概要
-
Bitcoin 論文を参考に Blockchain の実装をする
- 天下り的にならないよう,徐々にシステムをパワーアップさせていく方針で作る
-
目標は,参加者同士が競ってマイニングをし,コンセンサスを取りながらblockchainを作る様子を眺めること
-
本記事の実装はこちら (Jupyter Notebook): https://github.com/tamuhey/python_blockchain
※ Jupyterでの実装推奨
スタート:信用のある第三者を介する取引
- まずは第三者を介した取引をシミュレーションしてみる
- 取引の公正さは,全てこの第三者が保証する
取引はどのように行うか? - Transaction の実装
- 取引記録を発行することで,取引を行う
- Python3.7 から実装されたdataclassesを使ってみる
from __future__ import annotations
from dataclasses import dataclass
- 個人の識別を
address
で行う - 取引額を
value
とする
@dataclass
class Transaction:
sender_address: int
receiver_address: int
value: float
- sender は Transaction を発行して取引を行う
Wallet の実装
- とりあえず,Wallet は適当な 1 つの
int
を address として持つことにする - Transaction を発行する機能をもたせる
-
send
method で取引記録を作成できる
-
@dataclass
class Wallet:
address: int
def send(self, receiver_address, value):
return Transaction(self.address, receiver_address, value)
Ledger の実装
- transaction は信用のある第三者(以降Bank と呼ぶことにする)に提出されることで成立する
- Bank は Transaction を元帳(Ledger)で管理する
- Bank が全ての Transaction を管理し,精査することで不正を防止する
- Ledger はとりあえず単なるリストにする
Ledger=list
デモ: シンプルな取引
- Alice と Bob が取引する様子をデモする
alice=Wallet(1) # create Alice's wallet
bob=Wallet(2) # create Bob's wallet
ledger=Ledger() # create Bank's ledger
- Alice が transaction を発行して Bank に登録することで,取引が成立する
transaction=alice.send(bob.address, 5) # Alice send `5` to Bob
ledger.append(transaction)
- ここで例えば,第三者の信頼度がちょっと落ちたとする
- 取引内容の改ざんはどのように防げばよいだろうか?
電子署名を用いた transaction の発行
- sender は transaction に電子署名を行う
- 今回は RSA を用いることにする
- BitCoin はECDSAを用いている
- これによって transaction の改ざんを検知することができる
- 今回は RSA を用いることにする
- 電子署名の公開鍵はどのように扱う?
- 公開鍵を Wallet の address とする!
Transaction の改良 - 電子署名フィールドの追加
- 電子署名できるようにする
-
sign
を追加 - 署名の際,transaction を str で表すための
str_data
method も追加
-
- ついでにデータを json string に変換する method を追加する
- これは後々データのやり取りを簡単にするため
import json
import dataclasses as dc
@dataclass
class Transaction:
sender_address: str
receiver_address: str
value: float
sign: str = None
def str_data(self) -> str:
# return the str of data without `sign`
d=dc.asdict(self)
del d["sign"]
return json.dumps(d)
def json_dumps(self) -> str:
return json.dumps(dc.asdict(self))
@classmethod
def json_loads(cls, string) -> Transaction:
return cls(**json.loads(string))
Wallet の改良 - 電子署名機能の追加
- RSA による電子署名機能を実装する
- pycrypto を用いる
- 引き続き Wallet は address を 1 つのみ持つことにする
- 例えば BitCoin では匿名性の向上のため,一つの Wallet は複数のキーペアを持つことができる (参考)
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA
from Crypto.PublicKey import RSA
import binascii
- pycrypto の key instance を str に変換する helper function を実装する
- str にしておく理由は後ほどわかる
def decode_key(key):
if isinstance(key, str):
return key
return binascii.hexlify(key.exportKey(format='DER')).decode('ascii')
def encode_key(key):
if isinstance(key, RSA._RSAobj):
return key
return RSA.importKey(binascii.unhexlify(key))
Wallet の改良
class Wallet:
def __init__(self):
key=RSA.generate(1024)
self.private_key = decode_key(key)
self.address = decode_key(key.publickey())
def sign_transaction(self, transaction) -> Transaction:
# generate signer from self private key
signer=PKCS1_v1_5.new(encode_key(self.private_key))
# hash the transaction
h=SHA.new(transaction.str_data().encode())
# add signature to the transaction, and return it
return dc.replace(transaction, sign=signer.sign(h).hex())
def send(self, receiver_address, value) -> Transaction:
"""Generate transaction, and add signature"""
transaction=Transaction(self.address, receiver_address, value)
return self.sign_transaction(transaction)
- 秘密鍵
private_key
の追加 - address は対応する公開鍵とする
-
sign_transaction
の追加- transaction を受け取り,電子署名付き transaction を返す
-
send
を変更- 電子署名付き transaction を生成できるようになった!
ついでに,transaction の電子署名をチェックする helper function を作る
- signature の public key は transaction の address なので,transaction を見れば電子署名の正当性を判別できる
def verify_transaction(transaction) -> bool:
if transaction.sign is None:
return False
# hash the transaction
h=SHA.new(transaction.str_data().encode())
# generate verifier with public key
verifier=PKCS1_v1_5.new(encode_key(transaction.sender_address))
# is the signature correct?
return verifier.verify(h, binascii.unhexlify(transaction.sign))
デモ: RSA を用いた transaction の発行とチェック
先ほどと同じようにデモをしてみる
alice=Wallet() # now the address is automatically generated
bob=Wallet()
ledger=Ledger()
- transaction を発行し,電子署名を行い,bank に提出する
transaction=alice.send(bob.address, 5)
# Bank appends the transaction to its ledger
ledger.append(transaction)
- transaction の電子署名を確認してみる
verify_transaction(transaction)
True
- データをちょと変えてみると,不正な transaction になるのがわかる
transaction.value=7
verify_transaction(transaction)
False
- transaction の改竄リスクを低減することができた
- しかし依然として,double-spending problem を防ぐには信頼のある第三者が必要となる
- これはデータで通貨を作る際の最も大きな障害(データは簡単にコピーできるから)
- すべての transaction が公開されていれば,double-spending は防げるのでは?
- 公開されていれば receiver も記録をチェックできる
Timestamp Server による Transaction の公開
- いくつかの transaction をまとめて,timestamp をつけ,blockで公開することを考える
- 信頼のあるtimestamp serverが transaction をまとめて timestamp をつけ,電子署名をして公開する
- ある特定の時刻にその transaction が存在したことが保証される
- さらに,block には直前の timestamp の hash を含め,blockchainを作る
- block を改ざんすると,以降の block は全て不正なものとなる
- これで transaction をチェックして double-spending を防ぐ第三者は必要なくなった
- transaction は公開されているから,誰でも検証することができる
Block の実装
- str で表現できるよう,json method を追加する
- ついでに hash method も実装する
from typing import Tuple, Sequence
import hashlib
@dataclass
class Block:
time: float
transactions: Tuple[Transaction]
previous_hash: str
sign: str = None
def json_dumps(self) -> str:
dct=dc.asdict(self)
dct["transactions"]=[t.json_dumps() for t in self.transactions]
return json.dumps(dct)
@classmethod
def json_loads(cls, string) -> Block:
dct=json.loads(string)
dct["transactions"]=tuple([Transaction.json_loads(t) for t in dct["transactions"]])
return cls(**dct)
def hash(self) -> str:
block_bytes=self.json_dumps().encode()
return hashlib.sha256(block_bytes).hexdigest()
Timestamp Server の実装
-
timestamp server は以下の流れでblockchainを作る
- transaction のリストを受け取る
- 一つ前の block の hash と timestamp をつけて Block にまとめる
- 電子署名をする
- 公開する
-
block の正当性の保証は timestamp server の電子署名で行う
- server に RSA キーペアをもたせる
-
最初の block は previous block がないため,特別なブロック(genesis)を用意する (参考)
-
blockchain は今回はただのリストにする
from time import time
class TimestampServer:
def __init__(self):
key=RSA.generate(1024)
self.public_key=key.publickey()
self.signer=PKCS1_v1_5.new(key)
genesis=Block(time(), (), "0")
self.block_chain=[genesis]
def generate_block(self, transactions: Sequence[Transaction]):
# generate block
block=Block(time(), tuple(transactions), self.block_chain[-1].hash())
# sign the block
dct=dc.asdict(block)
del dct["sign"]
block.sign=self.signer.sign(SHA.new(json.dumps(dct).encode())).hex()
# publish the block
self.block_chain.append(block)
blockchain の正当性確認
-
blockchain の正当性を確認する helper function を用意する
- block の持つprevious_hashは正しいか?
- block に入っている transaction は正当か?
- timestamp server がつけた電子署名は正当か?
-
block の正当性確認
def verify_block(previous_block, block, timestamp_server_publickey) -> bool:
is_correct_hash = previous_block.hash() == block.previous_hash
is_correct_transactions = all(map(verify_transaction, block.transactions))
dct=dc.asdict(block)
del dct["sign"]
h=SHA.new(json.dumps(dct).encode())
verifier=PKCS1_v1_5.new(timestamp_server_publickey)
is_correct_sign=verifier.verify(h, binascii.unhexlify(block.sign))
return is_correct_hash and is_correct_transactions and is_correct_sign
- Blockchain の正当性確認
def verify_blockchain(chain, timestamp_server_publickey):
for i in range(len(chain)-1):
if not verify_block(chain[-i-2],chain[-i-1],timestamp_server_publickey):
return False
return True
デモ: Timestamp server による Block の公開と BlockChain への登録
- timestamp server をつくる
timestamp_server=TimestampServer()
- 取引を複数回行う
transactions=[]
transactions.append(alice.send(bob.address, 5))
transactions.append(bob.send(alice.address, 7))
- Timestamp server による Block の生成と公開
- timestamp_server の blockchain は誰でも見ることができる
timestamp_server.generate_block(transactions)
- 正当性確認
verify_blockchain(timestamp_server.block_chain, timestamp_server.public_key)
True
- これで transaction のチェックをすべての人が行えるようになった
- 確かにtransaction を検証する第三者は必要なくなったが,今度は信頼のある timestamp server が必要になってしまった
- timestamp server を第三者に任せない方法がほしい!
Proof-of-Work の実装
- これが blockchain の革新的要素
- 先程は"信頼の置ける" timestamp server の電子署名が,Block の正当性を保証していた
- では誰でも同じ署名ができれば,誰でも timestamp server の仕事ができるのでは?
- 電子署名でやると,当然ブロックの正当性の証明にはならない..
- HashCashを用いる
Block に nonce を加える
- これが timestamp server の電子署名の代わりとなる
- block のハッシュ値が適当な条件を満たすようにnonceを追加する
- 例えば上 4 桁が 0
- この制約をdifficultyと呼ぶ
- difficulty は適当に調整される. 例えば bitcoin の場合は,block の追加の interval が 10 分程度になるように調整されるようだ
- 今回は difficulty は固定にする
- nonce を見つける作業がmine
- nonce を見つけるのは難しい(たくさん試して,あたりを引くしかない)
- これによって block の生成を非常に高コストにすることができる
- nonce が正しいことを確認するのは簡単
- 電子署名と同様, 正当性の確認が簡単に行える
- nonce を見つけるのは難しい(たくさん試して,あたりを引くしかない)
- mineは誰でも行える
Block に nonce をつける
@dataclass
class Block:
time: float
transactions: Tuple[Transaction]
previous_hash: str
nonce: int = None
def json_dumps(self) -> str:
dct=dc.asdict(self)
dct["transactions"]=[t.json_dumps() for t in self.transactions]
return json.dumps(dct)
@classmethod
def json_loads(cls, string) -> Block:
dct=json.loads(string)
dct["transactions"]=tuple([Transaction.json_loads(t) for t in dct["transactions"]])
return cls(**dct)
def hash(self):
block_bytes=self.json_dumps().encode()
return hashlib.sha256(block_bytes).hexdigest()
nonceの検証コード
- blockのnonceは正しいか?
difficulty=3
def valid_proof(block):
return block.hash()[:difficulty] == "0" * difficulty
mine の実装
- あたりを引くまで手当たり次第試す
def mine(block):
nonce=0
block.nonce=nonce
while not valid_proof(block):
nonce += 1
block.nonce=nonce
return block
- ついでに verify_block も改良
-
previous_hash
は正しいか? - transactionは正当か?
- nonceは正しいか?
-
def verify_block(previous_block, block) -> bool:
is_correct_hash = previous_block.hash() == block.previous_hash
is_correct_transactions = all(map(verify_transaction, block.transactions))
is_correct_proof = valid_proof(block)
return is_correct_hash and is_correct_transactions and is_correct_proof
- BlockChain もちょっと改良
- 自身を str に変換する method をつける
class BlockChain(list):
def json_dumps(self) -> str:
return json.dumps([block.json_dumps() for block in self])
@classmethod
def json_loads(cls, string) -> BlockChain:
return cls([Block.json_loads(s) for s in json.loads(string)])
デモ: tranasction の発行から mining まで
genesis=Block(time(), (), "0")
block_chain=BlockChain([genesis])
# 取引
transactions=[]
transactions.append(alice.send(bob.address, 5))
transactions.append(bob.send(alice.address, 5))
- mine
previous_hash=block_chain[-1].hash()
block=mine(Block(time(), tuple(transactions), previous_hash))
block.nonce
2523
- かなり遅い実装をしているので,そこそこ時間がかかるかもしれない
- block を検証してみる
verify_block(block_chain[-1], block)
True
Node と Consensus Algorithm
- 前回までで分散管理の準備は整った
- 具体的に blockchain をつくるnodeを実装する
- node や sender, receiver が集まってnetworkを形成する
- network の動きは以下のようになる
- sender が transaction 生成.なるべく多くの node に broadcast する
- node は transaction を集めて block にまとめる
- mine
- 他の node にマイニングした block を broadcast する
- block を受け取った node は,block の正当性を確認し,double-spending のチェックをする
- チェックが通れば,次の block の作成を始める
- Node が複数いて,異なる BlockChain を持つときは,どのように正しさを判断するか?
- Consensus Algorithm
- 最も長い blockchain を,最も正しい blockchain とみなす
- なぜなら,chain が長ければ長いほど,多くの計算資源が注ぎ込まれたことになるから
- 多数決ではなく,one-CPU-one-vote
from uuid import uuid4
from time import sleep
from copy import deepcopy
import multiprocessing as mp
import random
from itertools import count
- node を実装する
- uuidでnodeの識別をする
- network まわりの細かい作業は,network interface に任せる
- あとで実装する
difficulty=3
class Node:
def __init__(self, network, genesis, uuid=None):
self.chain=BlockChain([genesis])
self.uuid=uuid or str(uuid4())
self.network=network
def work(self, verbose=False):
while True:
if not self.mine():
sleep(0.1) # wait due to no transactions
else:
if verbose: print(self.uuid, "Mined a block")
if self.add_block():
if verbose: print(self.uuid, "Added one block")
self.network.post_chain(self.chain, self.uuid) # publish my blockchain
if self.resolve_conflicts():
if verbose: print(self.uuid, "Change chain")
def mine(self):
transactions=self.network.get_transactions(self.uuid)
if len(transactions)==0:
return False # cannot mine due to no transactions
previous_hash=self.chain[-1].hash()
block=Block(time(), tuple(transactions), previous_hash)
for i, n in enumerate(self.random_generator()):
block.nonce=n
if self.verify_proof(block):
break
if i % 100 == 0 and self.add_block(): # someone mined a block
return True
if i % 100 == 0 and self.resolve_conflicts(): # someone mined a block
return True
self.network.broadcast_block(block, self.uuid)
return True # successfully mined
@staticmethod
def random_generator(step=None):
step = step or random.randint(1e4, 1e5)
for c in count():
for i in random.sample(range(c*step, (c+1)*step), step):
yield i
def add_block(self):
for block in self.network.get_blocks(self.uuid):
if self.verify_block(block):
self.chain.append(block)
if self.verify_chain(self.chain):
return True
else:
self.chain.pop(-1)
return False # no block is added
def resolve_conflicts(self):
"""Longest valid chain is authoritative"""
authoritative_chain=self.chain
for chain in self.network.get_neighbour_chains(self.uuid):
if not self.verify_chain(chain):
# node is incorrect
continue
if len(chain)>len(authoritative_chain):
# Longest valid chain is authoritative
authoritative_chain=deepcopy(chain)
self.chain=authoritative_chain
return self.chain is not authoritative_chain
@staticmethod
def verify_transaction(transaction):
if transaction.sign is None:
return False
h=SHA.new(transaction.str_data().encode())
verifier=PKCS1_v1_5.new(encode_key(transaction.sender_address))
return verifier.verify(h, binascii.unhexlify(transaction.sign))
@staticmethod
def verify_proof(block):
return block.hash()[:difficulty] == "0" * difficulty
def verify_block(self, block) -> bool:
is_correct_transactions = all(map(self.verify_transaction, block.transactions))
is_correct_proof = self.verify_proof(block)
return is_correct_transactions and is_correct_proof
def verify_chain(self, chain):
for i in range(len(chain)-1, 0, -1):
if not self.verify_block(chain[i]):
return False
if chain[i-1].hash() != chain[i].previous_hash:
return False
return True
-
全体的に,network の作業が追加されている
-
mine を少し変更している
- 複数の node が同じ方法でマイニングをするのは無駄が生じる可能性がある
- 例えば全く同じ Block のマイニングを複数のノードが開始した場合,確実に最も計算力の高い node がマイニングを成功させることになる
- 複数の node が同じ方法でマイニングをするのは無駄が生じる可能性がある
-
Wallet もネットワーク対応に
- transaction を発行したら,知っている node に transaction を送る
class Wallet:
def __init__(self, network, nodes=None):
key=RSA.generate(1024)
self.private_key = decode_key(key)
self.address = decode_key(key.publickey())
self.network=network
self.nodes=nodes or []
def sign_transaction(self, transaction):
signer=PKCS1_v1_5.new(encode_key(self.private_key))
h=SHA.new(transaction.str_data().encode())
return dc.replace(transaction, sign=signer.sign(h).hex())
def send(self, receiver_address, value):
transaction=Transaction(self.address, receiver_address, value)
self.broadcast(self.sign_transaction(transaction))
def broadcast(self, transaction):
for uuid in self.nodes:
self.network.post_transaction(transaction, uuid)
- network っぽいものを実装する
- Network class を中継点としてデータの遣り取りをする
- あとで multiprocess でノードを動かすために,データの管理は multiprocess.manager で行う
- flask とかで作っても面白いかもしれない
- 本質的でないので読み飛ばして良い
from typing import List
class Network:
def __init__(self, neighbours=()):
self.manager=mp.Manager()
self.chains=self.manager.dict()
self.blocks=self.manager.dict()
self.transactions=self.manager.dict()
self.neighbours=dict(neighbours)
def post_chain(self, chain, uuid):
self.chains[uuid]=chain.json_dumps()
def get_chain(self, uuid) -> BlockChain:
if uuid not in self.chains:
return []
return BlockChain.json_loads(self.chains[uuid])
def get_neighbour_chains(self, uuid) -> List[BlockChain]:
return [self.get_chain(neigh) for neigh in self.neighbours[uuid] if neigh != uuid]
def post_block(self, block, uuid):
if uuid not in self.blocks:
self.blocks[uuid]=self.manager.list([block.json_dumps()])
else:
self.blocks[uuid].append(block.json_dumps())
def broadcast_block(self, block, uuid):
for receiver in self.neighbours[uuid]:
self.post_block(block, receiver)
def get_blocks(self, uuid) -> List[Block]:
if uuid not in self.blocks:
return []
res=[Block.json_loads(s) for s in self.blocks[uuid]]
self.blocks[uuid][:]=[]
return res
def post_transaction(self, transaction, uuid):
if uuid not in self.transactions:
self.transactions[uuid]=self.manager.list([transaction.json_dumps()])
else:
self.transactions[uuid].append(transaction.json_dumps())
def get_transactions(self, uuid) -> Tuple[Transaction]:
if uuid not in self.transactions:
return []
res=[Transaction.json_loads(s) for s in self.transactions[uuid]]
self.transactions[uuid][:]=[]
return res
def shutdown(self):
self.manager.shutdown()
ノード 1 つの場合のテスト
genesis=Block(time(), (), "0")
uuid="test"
network=Network({uuid:[uuid]})
node=Node(network, genesis, uuid)
alice=Wallet(network, [uuid])
bob=Wallet(network, [uuid])
alice.send(bob.address, 5)
- これを実行してしばらく問題が起きなければ OK
- 適当に interrupt する
node.work(verbose=True)
- 後片付け
network.shutdown()
デモ: 分散 blockchain のデモ
- multiprocessing で network のデモを行う
genesis=Block(time(), (), "0")
network=Network()
- node 生成
- 適当に num_nodes を変更して遊ぶと良い
num_nodes=7
nodes=[str(uuid4()) for _ in range(num_nodes)]
- node の network の状態を定義する
- とりあえずは全結合にする
- つまり全ての Node 同士,直接やりとりできる
for uuid in nodes:
network.neighbours[uuid]=nodes
- multiprocess worker を作って起動する
def node_work(uuid):
node=Node(network, genesis, uuid)
node.work()
workers=[]
for uuid in nodes:
worker=mp.Process(target=node_work, args=(uuid,))
workers.append(worker)
worker.start()
- people を生成する
- これも適当に人数を変えて遊ぶと良い
num_people=5
people=[Wallet(network, random.sample(nodes, random.randint(1, len(nodes)))) for _ in range(num_people)]
blockchain の表示
- 各ノードの持つ blockchain がどの様になっているのか表示する
- 別 Process で行う
- Jupyter は賢いので output にきちんと表示してくれる
from IPython.display import clear_output
def draw_chains(network):
while True:
line=""
for k, v in network.chains.items():
line += f"(Node: {k[:4]}) "
chain=BlockChain.json_loads(v)
h=list(map(lambda x: x.hash()[:5], chain))
line += "--".join(map(lambda x: f"[{x}]", h))
line += "\n"
print(line.rstrip())
clear_output(True)
sleep(0.1)
worker=mp.Process(target=draw_chains, args=(network,))
worker.start()
(Node: 7301) [9e462]--[00001]
(Node: 0aab) [9e462]--[00001]
(Node: 76a6) [9e462]--[00001]
(Node: 381e) [9e462]--[00001]
(Node: 0887) [9e462]
(Node: 6207) [9e462]--[00001]
(Node: 0443) [9e462]
- 取引をして遊ぶ
- ランダムに取引が行われるコードを書いてみる
- 上の表示が変化する!
def generate_transactions():
for _ in range(random.randint(1,10)):
sender, receiver= random.sample(people, 2)
sender.send(receiver.address, random.randint(1, 100))
generate_transactions()
- network をちょと変えて遊んで見る
- ちょっと断線させる
for uuid in nodes:
network.neighbours[uuid]=random.sample(nodes, 3)
generate_transactions()
片付け
worker.terminate()
worker.kill()
for worker in workers:
worker.terminate()
worker.kill()
network.shutdown()
おわりに
- 本物の BitCoin とは異なる部分がたくさんあります
- 本記事の実装はこちら (Jupyter Notebook): https://github.com/tamuhey/python_blockchain
参考文献
-
Bitcoin white paper
- 専門外でも読みやすいです
-
Bitcoin Wiki
- 非常に情報量が多いです