46
50

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Python で Blockchain の実装をする

Last updated at Posted at 2019-01-09

概要

  • Bitcoin 論文を参考に Blockchain の実装をする

    • 天下り的にならないよう,徐々にシステムをパワーアップさせていく方針で作る
  • 目標は,参加者同士が競ってマイニングをし,コンセンサスを取りながらblockchainを作る様子を眺めること

    • 完成後のデモはこんな感じ
    • demo.gif
  • 本記事の実装はこちら (Jupyter Notebook): https://github.com/tamuhey/python_blockchain

※ Jupyterでの実装推奨

スタート:信用のある第三者を介する取引

  • まずは第三者を介した取引をシミュレーションしてみる
  • 取引の公正さは,全てこの第三者が保証する

取引はどのように行うか? - Transaction の実装

  • 取引記録を発行することで,取引を行う
    • Python3.7 から実装されたdataclassesを使ってみる
from __future__ import annotations
from dataclasses import dataclass
  • 個人の識別をaddressで行う
  • 取引額をvalueとする
@dataclass
class Transaction:
    sender_address: int
    receiver_address: int
    value: float
  • sender は Transaction を発行して取引を行う

Wallet の実装

  • とりあえず,Wallet は適当な 1 つのintを address として持つことにする
  • Transaction を発行する機能をもたせる
    • send method で取引記録を作成できる
@dataclass
class Wallet:
    address: int
    def send(self, receiver_address, value):
        return Transaction(self.address, receiver_address, value)

Ledger の実装

  • transaction は信用のある第三者(以降Bank と呼ぶことにする)に提出されることで成立する
    • Bank は Transaction を元帳(Ledger)で管理する
    • Bank が全ての Transaction を管理し,精査することで不正を防止する
  • Ledger はとりあえず単なるリストにする
Ledger=list

デモ: シンプルな取引

  • Alice と Bob が取引する様子をデモする
alice=Wallet(1) # create Alice's wallet
bob=Wallet(2)   # create Bob's wallet
ledger=Ledger() # create Bank's ledger
  • Alice が transaction を発行して Bank に登録することで,取引が成立する
transaction=alice.send(bob.address, 5) # Alice send `5` to Bob
ledger.append(transaction)
  • ここで例えば,第三者の信頼度がちょっと落ちたとする
    • 取引内容の改ざんはどのように防げばよいだろうか?

電子署名を用いた transaction の発行

  • sender は transaction に電子署名を行う
    • 今回は RSA を用いることにする
      • BitCoin はECDSAを用いている
    • これによって transaction の改ざんを検知することができる
  • 電子署名の公開鍵はどのように扱う?
    • 公開鍵を Wallet の address とする!

Transaction の改良 - 電子署名フィールドの追加

  • 電子署名できるようにする
    • signを追加
    • 署名の際,transaction を str で表すためのstr_data method も追加
  • ついでにデータを json string に変換する method を追加する
    • これは後々データのやり取りを簡単にするため
import json
import dataclasses as dc
@dataclass
class Transaction:
    sender_address: str
    receiver_address: str
    value: float
    sign: str = None

    def str_data(self) -> str:
        # return the str of data without `sign`
        d=dc.asdict(self)
        del d["sign"]
        return json.dumps(d)

    def json_dumps(self) -> str:
        return json.dumps(dc.asdict(self))
    @classmethod
    def json_loads(cls, string) -> Transaction:
        return cls(**json.loads(string))

Wallet の改良 - 電子署名機能の追加

from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA
from Crypto.PublicKey import RSA
import binascii
  • pycrypto の key instance を str に変換する helper function を実装する
    • str にしておく理由は後ほどわかる
def decode_key(key):
    if isinstance(key, str):
        return key
    return binascii.hexlify(key.exportKey(format='DER')).decode('ascii')
def encode_key(key):
    if isinstance(key, RSA._RSAobj):
        return key
    return RSA.importKey(binascii.unhexlify(key))

Wallet の改良

class Wallet:
    def __init__(self):
        key=RSA.generate(1024)
        self.private_key = decode_key(key)
        self.address = decode_key(key.publickey())

    def sign_transaction(self, transaction) -> Transaction:
        # generate signer from self private key
        signer=PKCS1_v1_5.new(encode_key(self.private_key))
        # hash the transaction
        h=SHA.new(transaction.str_data().encode())
        # add signature to the transaction, and return it
        return dc.replace(transaction, sign=signer.sign(h).hex())

    def send(self, receiver_address, value) -> Transaction:
        """Generate transaction, and add signature"""
        transaction=Transaction(self.address, receiver_address, value)
        return self.sign_transaction(transaction)
  • 秘密鍵private_keyの追加
  • address は対応する公開鍵とする
  • sign_transactionの追加
    • transaction を受け取り,電子署名付き transaction を返す
  • sendを変更
    • 電子署名付き transaction を生成できるようになった!

ついでに,transaction の電子署名をチェックする helper function を作る

  • signature の public key は transaction の address なので,transaction を見れば電子署名の正当性を判別できる
def verify_transaction(transaction) -> bool:
    if transaction.sign is None:
        return False
    # hash the transaction
    h=SHA.new(transaction.str_data().encode())
    # generate verifier with public key
    verifier=PKCS1_v1_5.new(encode_key(transaction.sender_address))
    # is the signature correct?
    return verifier.verify(h, binascii.unhexlify(transaction.sign))

デモ: RSA を用いた transaction の発行とチェック

先ほどと同じようにデモをしてみる

alice=Wallet() # now the address is automatically generated
bob=Wallet()
ledger=Ledger()
  • transaction を発行し,電子署名を行い,bank に提出する
transaction=alice.send(bob.address, 5)
# Bank appends the transaction to its ledger
ledger.append(transaction)
  • transaction の電子署名を確認してみる
verify_transaction(transaction)
True
  • データをちょと変えてみると,不正な transaction になるのがわかる
transaction.value=7
verify_transaction(transaction)
False
  • transaction の改竄リスクを低減することができた
    • しかし依然として,double-spending problem を防ぐには信頼のある第三者が必要となる
    • これはデータで通貨を作る際の最も大きな障害(データは簡単にコピーできるから)
  • すべての transaction が公開されていれば,double-spending は防げるのでは?
    • 公開されていれば receiver も記録をチェックできる

Timestamp Server による Transaction の公開

  • いくつかの transaction をまとめて,timestamp をつけ,blockで公開することを考える
    • 信頼のあるtimestamp serverが transaction をまとめて timestamp をつけ,電子署名をして公開する
    • ある特定の時刻にその transaction が存在したことが保証される
  • さらに,block には直前の timestamp の hash を含め,blockchainを作る
    • block を改ざんすると,以降の block は全て不正なものとなる
  • これで transaction をチェックして double-spending を防ぐ第三者は必要なくなった
    • transaction は公開されているから,誰でも検証することができる

Block の実装

  • str で表現できるよう,json method を追加する
  • ついでに hash method も実装する
from typing import Tuple, Sequence
import hashlib
@dataclass
class Block:
    time: float
    transactions: Tuple[Transaction]
    previous_hash: str
    sign: str = None

    def json_dumps(self) -> str:
        dct=dc.asdict(self)
        dct["transactions"]=[t.json_dumps() for t in self.transactions]
        return json.dumps(dct)
    @classmethod
    def json_loads(cls, string) -> Block:
        dct=json.loads(string)
        dct["transactions"]=tuple([Transaction.json_loads(t) for t in dct["transactions"]])
        return cls(**dct)

    def hash(self) -> str:
        block_bytes=self.json_dumps().encode()
        return hashlib.sha256(block_bytes).hexdigest()

Timestamp Server の実装

  • timestamp server は以下の流れでblockchainを作る

    1. transaction のリストを受け取る
    2. 一つ前の block の hash と timestamp をつけて Block にまとめる
    3. 電子署名をする
    4. 公開する
  • block の正当性の保証は timestamp server の電子署名で行う

    • server に RSA キーペアをもたせる
  • 最初の block は previous block がないため,特別なブロック(genesis)を用意する (参考)

  • blockchain は今回はただのリストにする

from time import time
class TimestampServer:
    def __init__(self):
        key=RSA.generate(1024)
        self.public_key=key.publickey()
        self.signer=PKCS1_v1_5.new(key)

        genesis=Block(time(), (), "0")
        self.block_chain=[genesis]

    def generate_block(self, transactions: Sequence[Transaction]):
        # generate block
        block=Block(time(), tuple(transactions), self.block_chain[-1].hash())

        # sign the block
        dct=dc.asdict(block)
        del dct["sign"]
        block.sign=self.signer.sign(SHA.new(json.dumps(dct).encode())).hex()

        # publish the block
        self.block_chain.append(block)

blockchain の正当性確認

  • blockchain の正当性を確認する helper function を用意する

    1. block の持つprevious_hashは正しいか?
    2. block に入っている transaction は正当か?
    3. timestamp server がつけた電子署名は正当か?
  • block の正当性確認

def verify_block(previous_block, block, timestamp_server_publickey) -> bool:
    is_correct_hash = previous_block.hash() ==  block.previous_hash
    is_correct_transactions = all(map(verify_transaction, block.transactions))

    dct=dc.asdict(block)
    del dct["sign"]
    h=SHA.new(json.dumps(dct).encode())
    verifier=PKCS1_v1_5.new(timestamp_server_publickey)
    is_correct_sign=verifier.verify(h, binascii.unhexlify(block.sign))
    return is_correct_hash and is_correct_transactions and is_correct_sign
  • Blockchain の正当性確認
def verify_blockchain(chain, timestamp_server_publickey):
    for i in range(len(chain)-1):
        if not verify_block(chain[-i-2],chain[-i-1],timestamp_server_publickey):
            return False
    return True

デモ: Timestamp server による Block の公開と BlockChain への登録

  • timestamp server をつくる
timestamp_server=TimestampServer()
  • 取引を複数回行う
transactions=[]
transactions.append(alice.send(bob.address, 5))
transactions.append(bob.send(alice.address, 7))
  • Timestamp server による Block の生成と公開
    • timestamp_server の blockchain は誰でも見ることができる
timestamp_server.generate_block(transactions)
  • 正当性確認
verify_blockchain(timestamp_server.block_chain, timestamp_server.public_key)
True
  • これで transaction のチェックをすべての人が行えるようになった
  • 確かにtransaction を検証する第三者は必要なくなったが,今度は信頼のある timestamp server が必要になってしまった
  • timestamp server を第三者に任せない方法がほしい!

Proof-of-Work の実装

  • これが blockchain の革新的要素
  • 先程は"信頼の置ける" timestamp server の電子署名が,Block の正当性を保証していた
  • では誰でも同じ署名ができれば,誰でも timestamp server の仕事ができるのでは?
    • 電子署名でやると,当然ブロックの正当性の証明にはならない..
  • HashCashを用いる

Block に nonce を加える

  • これが timestamp server の電子署名の代わりとなる
  • block のハッシュ値が適当な条件を満たすようにnonceを追加する
    • 例えば上 4 桁が 0
    • この制約をdifficultyと呼ぶ
    • difficulty は適当に調整される. 例えば bitcoin の場合は,block の追加の interval が 10 分程度になるように調整されるようだ
    • 今回は difficulty は固定にする
  • nonce を見つける作業がmine
    • nonce を見つけるのは難しい(たくさん試して,あたりを引くしかない)
      • これによって block の生成を非常に高コストにすることができる
    • nonce が正しいことを確認するのは簡単
      • 電子署名と同様, 正当性の確認が簡単に行える
  • mineは誰でも行える

Block に nonce をつける

@dataclass
class Block:
    time: float
    transactions: Tuple[Transaction]
    previous_hash: str
    nonce: int = None

    def json_dumps(self) -> str:
        dct=dc.asdict(self)
        dct["transactions"]=[t.json_dumps() for t in self.transactions]
        return json.dumps(dct)

    @classmethod
    def json_loads(cls, string) -> Block:
        dct=json.loads(string)
        dct["transactions"]=tuple([Transaction.json_loads(t) for t in dct["transactions"]])
        return cls(**dct)

    def hash(self):
        block_bytes=self.json_dumps().encode()
        return hashlib.sha256(block_bytes).hexdigest()

nonceの検証コード

  • blockのnonceは正しいか?
difficulty=3
def valid_proof(block):
    return block.hash()[:difficulty] == "0" * difficulty

mine の実装

  • あたりを引くまで手当たり次第試す
def mine(block):
    nonce=0
    block.nonce=nonce
    while not valid_proof(block):
        nonce += 1
        block.nonce=nonce
    return block
  • ついでに verify_block も改良
    1. previous_hashは正しいか?
    2. transactionは正当か?
    3. nonceは正しいか?
def verify_block(previous_block, block) -> bool:
    is_correct_hash = previous_block.hash() ==  block.previous_hash
    is_correct_transactions = all(map(verify_transaction, block.transactions))
    is_correct_proof = valid_proof(block)
    return is_correct_hash and is_correct_transactions and is_correct_proof
  • BlockChain もちょっと改良
    • 自身を str に変換する method をつける
class BlockChain(list):
    def json_dumps(self) -> str:
        return json.dumps([block.json_dumps() for block in self])
    @classmethod
    def json_loads(cls, string) -> BlockChain:
        return cls([Block.json_loads(s) for s in json.loads(string)])

デモ: tranasction の発行から mining まで

genesis=Block(time(), (), "0")
block_chain=BlockChain([genesis])

# 取引
transactions=[]
transactions.append(alice.send(bob.address, 5))
transactions.append(bob.send(alice.address, 5))
  • mine
previous_hash=block_chain[-1].hash()
block=mine(Block(time(), tuple(transactions), previous_hash))
block.nonce
2523
  • かなり遅い実装をしているので,そこそこ時間がかかるかもしれない
  • block を検証してみる
verify_block(block_chain[-1], block)
True

Node と Consensus Algorithm

  • 前回までで分散管理の準備は整った
  • 具体的に blockchain をつくるnodeを実装する
  • node や sender, receiver が集まってnetworkを形成する
  • network の動きは以下のようになる
  1. sender が transaction 生成.なるべく多くの node に broadcast する
  2. node は transaction を集めて block にまとめる
  3. mine
  4. 他の node にマイニングした block を broadcast する
  5. block を受け取った node は,block の正当性を確認し,double-spending のチェックをする
  6. チェックが通れば,次の block の作成を始める
  • Node が複数いて,異なる BlockChain を持つときは,どのように正しさを判断するか?
    • Consensus Algorithm
    • 最も長い blockchain を,最も正しい blockchain とみなす
      • なぜなら,chain が長ければ長いほど,多くの計算資源が注ぎ込まれたことになるから
      • 多数決ではなく,one-CPU-one-vote
from uuid import uuid4
from time import sleep
from copy import deepcopy
import multiprocessing as mp
import random
from itertools import count
  • node を実装する
    • uuidでnodeの識別をする
  • network まわりの細かい作業は,network interface に任せる
    • あとで実装する
difficulty=3
class Node:
    def __init__(self, network, genesis, uuid=None):
        self.chain=BlockChain([genesis])
        self.uuid=uuid or str(uuid4())
        self.network=network

    def work(self, verbose=False):
        while True:
            if not self.mine():
                sleep(0.1) # wait due to no transactions
            else:
                if verbose: print(self.uuid, "Mined a block")
            if self.add_block():
                if verbose: print(self.uuid, "Added one block")
            self.network.post_chain(self.chain, self.uuid) # publish my blockchain
            if self.resolve_conflicts():
                if verbose: print(self.uuid, "Change chain")

    def mine(self):
        transactions=self.network.get_transactions(self.uuid)
        if len(transactions)==0:
            return False # cannot mine due to no transactions
        previous_hash=self.chain[-1].hash()
        block=Block(time(), tuple(transactions), previous_hash)

        for i, n in enumerate(self.random_generator()):
            block.nonce=n
            if self.verify_proof(block):
                break

            if i % 100 == 0 and self.add_block(): # someone mined a block
                return True
            if i % 100 == 0 and self.resolve_conflicts(): # someone mined a block
                return True

        self.network.broadcast_block(block, self.uuid)
        return True # successfully mined

    @staticmethod
    def random_generator(step=None):
        step = step or random.randint(1e4, 1e5)
        for c in count():
            for i in random.sample(range(c*step, (c+1)*step), step):
                yield i

    def add_block(self):
        for block in self.network.get_blocks(self.uuid):
            if self.verify_block(block):
                self.chain.append(block)
                if self.verify_chain(self.chain):
                    return True
                else:
                    self.chain.pop(-1)
        return False # no block is added

    def resolve_conflicts(self):
        """Longest valid chain is authoritative"""
        authoritative_chain=self.chain
        for chain in self.network.get_neighbour_chains(self.uuid):
            if not self.verify_chain(chain):
                # node is incorrect
                continue
            if len(chain)>len(authoritative_chain):
                # Longest valid chain is authoritative
                authoritative_chain=deepcopy(chain)
        self.chain=authoritative_chain
        return self.chain is not authoritative_chain

    @staticmethod
    def verify_transaction(transaction):
        if transaction.sign is None:
            return False
        h=SHA.new(transaction.str_data().encode())
        verifier=PKCS1_v1_5.new(encode_key(transaction.sender_address))
        return verifier.verify(h, binascii.unhexlify(transaction.sign))

    @staticmethod
    def verify_proof(block):
        return block.hash()[:difficulty] == "0" * difficulty

    def verify_block(self, block) -> bool:
        is_correct_transactions = all(map(self.verify_transaction, block.transactions))
        is_correct_proof = self.verify_proof(block)
        return is_correct_transactions and is_correct_proof

    def verify_chain(self, chain):
        for i in range(len(chain)-1, 0, -1):
            if not self.verify_block(chain[i]):
                return False
            if chain[i-1].hash() != chain[i].previous_hash:
                return False
        return True
  • 全体的に,network の作業が追加されている

  • mine を少し変更している

    • 複数の node が同じ方法でマイニングをするのは無駄が生じる可能性がある
      • 例えば全く同じ Block のマイニングを複数のノードが開始した場合,確実に最も計算力の高い node がマイニングを成功させることになる
  • Wallet もネットワーク対応に

    • transaction を発行したら,知っている node に transaction を送る
class Wallet:
    def __init__(self, network, nodes=None):
        key=RSA.generate(1024)
        self.private_key = decode_key(key)
        self.address = decode_key(key.publickey())
        self.network=network
        self.nodes=nodes or []

    def sign_transaction(self, transaction):
        signer=PKCS1_v1_5.new(encode_key(self.private_key))
        h=SHA.new(transaction.str_data().encode())
        return dc.replace(transaction, sign=signer.sign(h).hex())

    def send(self, receiver_address, value):
        transaction=Transaction(self.address, receiver_address, value)
        self.broadcast(self.sign_transaction(transaction))

    def broadcast(self, transaction):
        for uuid in self.nodes:
            self.network.post_transaction(transaction, uuid)
  • network っぽいものを実装する
    • Network class を中継点としてデータの遣り取りをする
  • あとで multiprocess でノードを動かすために,データの管理は multiprocess.manager で行う
  • flask とかで作っても面白いかもしれない
  • 本質的でないので読み飛ばして良い
from typing import List
class Network:
    def __init__(self, neighbours=()):
        self.manager=mp.Manager()
        self.chains=self.manager.dict()
        self.blocks=self.manager.dict()
        self.transactions=self.manager.dict()
        self.neighbours=dict(neighbours)

    def post_chain(self, chain, uuid):
        self.chains[uuid]=chain.json_dumps()

    def get_chain(self, uuid) -> BlockChain:
        if uuid not in self.chains:
            return []
        return BlockChain.json_loads(self.chains[uuid])

    def get_neighbour_chains(self, uuid) -> List[BlockChain]:
        return [self.get_chain(neigh) for neigh in self.neighbours[uuid] if neigh != uuid]

    def post_block(self, block, uuid):
        if uuid not in self.blocks:
            self.blocks[uuid]=self.manager.list([block.json_dumps()])
        else:
            self.blocks[uuid].append(block.json_dumps())

    def broadcast_block(self, block, uuid):
        for receiver in self.neighbours[uuid]:
            self.post_block(block, receiver)

    def get_blocks(self, uuid) -> List[Block]:
        if uuid not in self.blocks:
            return []
        res=[Block.json_loads(s) for s in self.blocks[uuid]]
        self.blocks[uuid][:]=[]
        return res

    def post_transaction(self, transaction, uuid):
        if uuid not in self.transactions:
            self.transactions[uuid]=self.manager.list([transaction.json_dumps()])
        else:
            self.transactions[uuid].append(transaction.json_dumps())

    def get_transactions(self, uuid) -> Tuple[Transaction]:
        if uuid not in self.transactions:
            return []
        res=[Transaction.json_loads(s) for s in self.transactions[uuid]]
        self.transactions[uuid][:]=[]
        return res

    def shutdown(self):
        self.manager.shutdown()

ノード 1 つの場合のテスト

genesis=Block(time(), (), "0")
uuid="test"
network=Network({uuid:[uuid]})
node=Node(network, genesis, uuid)
alice=Wallet(network, [uuid])
bob=Wallet(network, [uuid])
alice.send(bob.address, 5)
  • これを実行してしばらく問題が起きなければ OK
    • 適当に interrupt する
node.work(verbose=True)
  • 後片付け
network.shutdown()

デモ: 分散 blockchain のデモ

  • multiprocessing で network のデモを行う
genesis=Block(time(), (), "0")
network=Network()
  • node 生成
  • 適当に num_nodes を変更して遊ぶと良い
num_nodes=7
nodes=[str(uuid4()) for _ in range(num_nodes)]
  • node の network の状態を定義する
  • とりあえずは全結合にする
    • つまり全ての Node 同士,直接やりとりできる
for uuid in nodes:
    network.neighbours[uuid]=nodes
  • multiprocess worker を作って起動する
def node_work(uuid):
    node=Node(network, genesis, uuid)
    node.work()
workers=[]
for uuid in nodes:
    worker=mp.Process(target=node_work, args=(uuid,))
    workers.append(worker)
    worker.start()
  • people を生成する
  • これも適当に人数を変えて遊ぶと良い
num_people=5
people=[Wallet(network, random.sample(nodes, random.randint(1, len(nodes)))) for _ in range(num_people)]

blockchain の表示

  • 各ノードの持つ blockchain がどの様になっているのか表示する
  • 別 Process で行う
    • Jupyter は賢いので output にきちんと表示してくれる
from IPython.display import clear_output
def draw_chains(network):
    while True:
        line=""
        for k, v in network.chains.items():
            line += f"(Node: {k[:4]}) "
            chain=BlockChain.json_loads(v)
            h=list(map(lambda x: x.hash()[:5], chain))
            line += "--".join(map(lambda x: f"[{x}]", h))
            line += "\n"
        print(line.rstrip())
        clear_output(True)
        sleep(0.1)

worker=mp.Process(target=draw_chains, args=(network,))
worker.start()
(Node: 7301) [9e462]--[00001]
(Node: 0aab) [9e462]--[00001]
(Node: 76a6) [9e462]--[00001]
(Node: 381e) [9e462]--[00001]
(Node: 0887) [9e462]
(Node: 6207) [9e462]--[00001]
(Node: 0443) [9e462]
  • 取引をして遊ぶ
    • ランダムに取引が行われるコードを書いてみる
    • 上の表示が変化する!
def generate_transactions():
    for _ in range(random.randint(1,10)):
        sender, receiver= random.sample(people, 2)
        sender.send(receiver.address, random.randint(1, 100))
generate_transactions()
  • network をちょと変えて遊んで見る
    • ちょっと断線させる
for uuid in nodes:
    network.neighbours[uuid]=random.sample(nodes, 3)
generate_transactions()

片付け

worker.terminate()
worker.kill()
for worker in workers:
    worker.terminate()
    worker.kill()
network.shutdown()

おわりに

参考文献

46
50
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
46
50

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?