文章目录
- 0 前言
- 1 区块链基础
- 1.1 比特币内部结构
- 1.2 实现的区块链数据结构
- 1.3 注意点
- 1.4 区块链的核心-工作量证明算法
- 1.4.1 拜占庭将军问题
- 1.4.2 解决办法
- 1.4.3 代码实现
- 2 快速实现一个区块链
- 2.1 什么是区块链
- 2.2 一个完整的快包含什么
- 2.3 什么是挖矿
- 2.4 工作量证明算法:
- 2.5 实现代码
- 3 最后
0 前言
🔥 优质竞赛项目系列,今天要分享的是
python区块链实现 - proof of work工作量证明共识算法
该项目较为新颖,适合作为竞赛课题方向,学长非常推荐!
🧿 更多资料, 项目分享:
https://gitee.com/dancheng-senior/postgraduate
1 区块链基础
学长以比特币的结构向大家详解区块链的组成部分
1.1 比特币内部结构
- previous hash(前一个区块的hash)
- merkle root(默克尔树根节点,内部存储交易数据)
- timestamp(当前区块生成的时间)
- nonce(旷工计算hash值次数)
1.2 实现的区块链数据结构
- index 当前第几个区块
- timestamp 该区块创建时的时间戳
- data 交易信息
- previousHash 前一个区块的hash
- hash 当前区块的hash
1.3 注意点
第一个区块叫做创世区块(genesis block),区块链创建的时候默认生产的这里用的是单纯的链表,不是用默克尔树存储
示例代码
from hashlib import sha256//区块schemaclass Block:def __init__(self,index,timestamp,data,previousHash=""):self.index = indexself.timestamp = timestampself.data = dataself.previousHash = previousHashself.hash = self.calculateHash()//计算当前区块的hash值def calculateHash(self):plainData = str(self.index)+str(self.timestamp)+str(self.data)return sha256(plainData.encode('utf-8')).hexdigest()def __str__(self):return str(self.__dict__)//区块链schemaclass BlockChain://初始化的时候 创建 创世区块def __init__(self):self.chain = [self.createGenesisBlock()]//构建创世区块def createGenesisBlock(self):return Block(0,"01/01/2018","genesis block","0")//获取最后一个区块def getLatestBlock(self):return self.chain[len(self.chain)-1]//往区块链里面添加区块def addBlock(self,newBlock):newBlock.previousHash = self.getLatestBlock().hashnewBlock.hash = newBlock.calculateHash()self.chain.append(newBlock)def __str__(self):return str(self.__dict__) //校验区块链是不是有效的 有没有人被篡改def chainIsValid(self):for index in range(1,len(self.chain)):currentBlock = self.chain[index]previousBlock = self.chain[index-1]if (currentBlock.hash != currentBlock.calculateHash()):return Falseif previousBlock.hash != currentBlock.previousHash:return Falsereturn TruemyCoin = BlockChain()
myCoin.addBlock(Block(1,"02/01/2018","{amount:4}"))
myCoin.addBlock(Block(2,"03/01/2018","{amount:5}"))#print block info 打印区块链信息
print("print block info ####:")
for block in myCoin.chain:print(block)
#check blockchain is valid 检查区块链是不是有效的
print("before tamper block,blockchain is valid ###")
print(myCoin.chainIsValid())
#tamper the blockinfo 篡改区块2的数据
myCoin.chain[1].data = "{amount:1002}"
print("after tamper block,blockchain is valid ###")
print(myCoin.chainIsValid())
输出结果
print block info ####:
{'index': 0, 'timestamp': '01/01/2018', 'data': 'genesis block', 'previousHash': '0', 'hash': 'd8d21e5ba33780d5eb77d09d3b407ceb8ade4e5545ef951de1997b209d91e264'}
{'index': 1, 'timestamp': '02/01/2018', 'data': '{amount:4}', 'previousHash': 'd8d21e5ba33780d5eb77d09d3b407ceb8ade4e5545ef951de1997b209d91e264', 'hash': '15426e32db30f4b26aa719ba5e573f372f41e27e4728eb9e9ab0bea8eae63a9d'}
{'index': 2, 'timestamp': '03/01/2018', 'data': '{amount:5}', 'previousHash': '15426e32db30f4b26aa719ba5e573f372f41e27e4728eb9e9ab0bea8eae63a9d', 'hash': '75119e897f21c769acee6e32abcefc5e88e250a1f35cc95946379436050ac2f0'}
before tamper block,blockchain is valid ###
True
after tamper block,blockchain is valid ###
False
1.4 区块链的核心-工作量证明算法
上面学长介绍了区块链的基本结构,我在之前的基础上来简单实现一下工作量证明算法(proof of
work),在介绍pow之前先思考一下为什么要工作量证明算法,或者再往前想一步为什么比特币如何解决信任的问题?
1.4.1 拜占庭将军问题
比特币出现之前就有了拜占庭将军问题,主要思想是,如何在分布式系统环境里去相信其他人发给你的信息?
一组拜占庭将军分别各率领一支军队共同围困一座城市。为了简化问题,将各支军队的行动策略限定为进攻或撤离两种。因为部分军队进攻部分军队撤离可能会造成灾难性后果,因此各位将军必须通过投票来达成一致策略,即所有军队一起进攻或所有军队一起撤离。因为各位将军分处城市不同方向,他
系统的问题在于,将军中可能出现叛徒,他们不仅可能向较为糟糕的策略投票,还可能选择性地发送投票信息。假设有9位将军投票,其中1名叛徒。8名忠诚的将军中出现了4人投进攻,4人投撤离的情况。这时候叛徒可能故意给4名投进攻的将领送信表示投票进攻,而给4名投撤离的将领送信表示投撤离。这样一来在4名投进攻的将领看来,投票结果是5人投进攻,从而发起进攻;而在4名投撤离的将军看来则是5人投撤离。这样各支军队的一致协同就遭到了破坏。
1.4.2 解决办法
拜占庭将军问题主要问题是,中间人可以拦截消息,进行修改;上述的那些士兵可以理解成比特币中的一些节点,不是所有节点拿到消息后都是可以直接处理的,先去解决一个数学问题,就是工作量证明,只有拥有特定的计算能力解决了问题之后才能去修改或者校验(验证,打包,上链)。
上图就是简单的工作量证明算法流程,一串数字后面有个x,x之前的数可以理解成交易数据,然后需要找到一个x,让整个数的hash值的开头有n个0,如果hash是很均匀的话,那么生成的hash值每一位为0或者1都是等可能的,所以前n个都为0的概率就是2的n次方/2的hash值位数,上图给出了如果hash值是5个bit的情况下的所有可能
1.4.3 代码实现
from hashlib import sha256import timeclass Block:def __init__(self,index,timestamp,data,previousHash=""):self.index = indexself.timestamp = timestampself.data = dataself.previousHash = previousHashself.nonce = 0 //代表当前计算了多少次hash计算self.hash = self.calculateHash()def calculateHash(self):plainData = str(self.index)+str(self.timestamp)+str(self.data)+str(self.nonce)return sha256(plainData.encode('utf-8')).hexdigest()#挖矿 difficulty代表复杂度 表示前difficulty位都为0才算成功def minerBlock(self,difficulty):while(self.hash[0:difficulty]!=str(0).zfill(difficulty)):self.nonce+=1self.hash = self.calculateHash()def __str__(self):return str(self.__dict__)class BlockChain:def __init__(self):self.chain = [self.createGenesisBlock()]self.difficulty = 5def createGenesisBlock(self):return Block(0,"01/01/2018","genesis block")def getLatestBlock(self):return self.chain[len(self.chain)-1]#添加区块前需要 做一道计算题😶,坐完后才能把区块加入到链上def addBlock(self,newBlock):newBlock.previousHash = self.getLatestBlock().hashnewBlock.minerBlock(self.difficulty)self.chain.append(newBlock)def __str__(self):return str(self.__dict__) def chainIsValid(self):for index in range(1,len(self.chain)):currentBlock = self.chain[index]previousBlock = self.chain[index-1]if (currentBlock.hash != currentBlock.calculateHash()):return Falseif previousBlock.hash != currentBlock.previousHash:return Falsereturn TruemyCoin = BlockChain()# 下面打印了每个区块挖掘需要的时间 比特币通过一定的机制控制在10分钟出一个块 # 其实就是根据当前网络算力 调整我们上面difficulty值的大小,如果你在# 本地把上面代码difficulty的值调很大你可以看到很久都不会出计算结果startMinerFirstBlockTime = time.time()print("start to miner first block time :"+str(startMinerFirstBlockTime))myCoin.addBlock(Block(1,"02/01/2018","{amount:4}"))print("miner first block time completed" + ",used " +str(time.time()-startMinerFirstBlockTime) +"s")startMinerSecondBlockTime = time.time()print("start to miner first block time :"+str(startMinerSecondBlockTime))myCoin.addBlock(Block(2,"03/01/2018","{amount:5}"))print("miner second block time completed" + ",used " +str(time.time()-startMinerSecondBlockTime) +"s\n")#print block infoprint("print block info ####:\n")for block in myCoin.chain:print("\n")print(block)#check blockchain is validprint("before tamper block,blockchain is valid ###")print(myCoin.chainIsValid())#tamper the blockinfomyCoin.chain[1].data = "{amount:1002}"print("after tamper block,blockchain is valid ###")print(myCoin.chainIsValid())
输出
2 快速实现一个区块链
2.1 什么是区块链
区块链是一个不可变得,有序的被称之为块的记录链,它们可以包含交易、文件或者任何你喜欢的数据,但最重要的是,它们用hash连接在一起。
2.2 一个完整的快包含什么
一个索引,一个时间戳,一个事物列表,一个校验, 一个前快的散链表
2.3 什么是挖矿
挖矿其实非常简单就做了以下三件事:
1、计算工作量证明poW
2、通过新增一个交易赋予矿工(自已)一个币
3、构造新区块并将其添加到链中
2.4 工作量证明算法:
使用该算法来证明是如何在区块上创建和挖掘新的区块,pow的目标是计算出一个符合特定条件的数字,这个数字对于所有人而言必须在计算上非常困难,但易于验证,这就是工作证明背后的核心思想计算难度与目标字符串需要满足的特定字符串成正比。
2.5 实现代码
import hashlibimport jsonimport requestsfrom textwrap import dedentfrom time import timefrom uuid import uuid4from urllib.parse import urlparsefrom flask import Flask, jsonify, requestclass Blockchain(object):def __init__(self):...self.nodes = set()# 用 set 来储存节点,避免重复添加节点....self.chain = []self.current_transactions = []#创建创世区块self.new_block(previous_hash=1,proof=100)def reister_node(self,address):"""在节点列表中添加一个新节点:param address::return:"""prsed_url = urlparse(address)self.nodes.add(prsed_url.netloc)def valid_chain(self,chain):"""确定一个给定的区块链是否有效:param chain::return:"""last_block = chain[0]current_index = 1while current_index<len(chain):block = chain[current_index]print(f'{last_block}')print(f'{block}')print("\n______\n")# 检查block的散列是否正确if block['previous_hash'] != self.hash(last_block):return False# 检查工作证明是否正确if not self.valid_proof(last_block['proof'], block['proof']):return Falselast_block = blockcurrent_index += 1return Truedef ressolve_conflicts(self):"""共识算法:return:"""neighbours = self.nodesnew_chain = None# 寻找最长链条max_length = len(self.chain)# 获取并验证网络中的所有节点的链for node in neighbours:response = requests.get(f'http://{node}/chain')if response.status_code == 200:length = response.json()['length']chain = response.json()['chain']# 检查长度是否长,链是否有效if length > max_length and self.valid_chain(chain):max_length = lengthnew_chain = chain# 如果发现一个新的有效链比当前的长,就替换当前的链if new_chain:self.chain = new_chainreturn Truereturn Falsedef new_block(self,proof,previous_hash=None):"""创建一个新的块并将其添加到链中:param proof: 由工作证明算法生成证明:param previous_hash: 前一个区块的hash值:return: 新区块"""block = {'index':len(self.chain)+1,'timestamp':time(),'transactions':self.current_transactions,'proof':proof,'previous_hash':previous_hash or self.hash(self.chain[-1]),}# 重置当前交易记录self.current_transactions = []self.chain.append(block)return blockdef new_transaction(self,sender,recipient,amount):# 将新事务添加到事务列表中"""Creates a new transaction to go into the next mined Block:param sender:发送方的地址:param recipient:收信人地址:param amount:数量:return:保存该事务的块的索引"""self.current_transactions.append({'sender':sender,'recipient':recipient,'amount':amount,})return self.last_block['index'] + 1@staticmethoddef hash(block):"""给一个区块生成 SHA-256 值:param block::return:"""# 必须确保这个字典(区块)是经过排序的,否则将会得到不一致的散列block_string = json.dumps(block,sort_keys=True).encode()return hashlib.sha256(block_string).hexdigest()@propertydef last_block(self):# 返回链中的最后一个块return self.chain[-1]def proof_of_work(self,last_proof):# 工作算法的简单证明proof = 0while self.valid_proof(last_proof,proof)is False:proof +=1return proof@staticmethoddef valid_proof(last_proof,proof):# 验证证明guess = f'{last_proof}{proof}'.encode()guess_hash = hashlib.sha256(guess).hexdigest()return guess_hash[:4] =="0000"# 实例化节点app = Flask(__name__)# 为该节点生成一个全局惟一的地址node_identifier = str(uuid4()).replace('-','')# 实例化Blockchain类blockchain = Blockchain()# 进行挖矿请求@app.route('/mine',methods=['GET'])def mine():# 运行工作算法的证明来获得下一个证明。last_block = blockchain.last_blocklast_proof = last_block['proof']proof = blockchain.proof_of_work(last_proof)# 必须得到一份寻找证据的奖赏。blockchain.new_transaction(sender="0",recipient=node_identifier,amount=1,)# 通过将其添加到链中来构建新的块previous_hash = blockchain.hash(last_block)block = blockchain.new_block(proof,previous_hash)response = {'message': "New Block Forged",'index': block['index'],'transactions': block['transactions'],'proof': block['proof'],'previous_hash': block['previous_hash'],}return jsonify(response), 200# 创建交易请求@app.route('/transactions/new',methods=['POST'])def new_transactions():values = request.get_json()# 检查所需要的字段是否位于POST的data中required = ['seder','recipient','amount']if not all(k in values for k in request):return 'Missing values',400#创建一个新的事物index = blockchain.new_transaction(values['sender'], values['recipient'], values['amount'])response = {'message': f'Transaction will be added to Block {index}'}return jsonify(response), 201# 获取所有快信息@app.route('/chain',methods=['GET'])def full_chain():response = {'chain':blockchain.chain,'length':len(blockchain.chain),}return jsonify(response),200# 添加节点@app.route('/nodes/register',methods=['POST'])def register_nodes():values = request.get_json()nodes = values.get('nodes')if nodes is None:return "Error: Please supply a valid list of nodes", 400for node in nodes:blockchain.register_node(node)response = {'message': 'New nodes have been added','total_nodes': list(blockchain.nodes),}return jsonify(response), 201# 解决冲突@app.route('/nodes/resolve', methods=['GET'])def consensus():replaced = blockchain.resolve_conflicts()if replaced:response = {'message': 'Our chain was replaced','new_chain': blockchain.chain}else:response = {'message': 'Our chain is authoritative','chain': blockchain.chain}return jsonify(response), 200if __name__ == '__main__':app.run(host='0.0.0.0',port=5000)
代码弄好启动你的项目以后打开Postman 完成以下操作
学长通过请求 http://localhost:5000/mine进行采矿
3 最后
🧿 更多资料, 项目分享:
https://gitee.com/dancheng-senior/postgraduate