Dart で Torrentクライアントを作ろう(10+?) Dart で Mainline DHT を Codingしてみよう (4) FindNodesを実装してみよう

  • 0
    いいね
  • 0
    コメント
    この記事は最終更新日から1年以上が経過しています。

    FindNodesを実装してみよう


    通信ライブラリとして、hetimanetを使用しています。APIのインターフェイスは他の通信ライブラリと大きな違いはありません。なので、お使いのものと読み替えて読み進めてください。

    RootingTableに、FindeNodeの機能を追加する

    所持しているPeerInfoを指定されたKIDでソートして探す。指定されたKIDと距離が近いNodeの一覧を返します。

    class KRootingTable {
      ...
      ...
      ...
      List<KPeerInfo> findNode(KId id) {
        List<KPeerInfo> ids = [];
        for (KBucket b in _kBuckets) {
          for (KPeerInfo i in b.iterable) {
            ids.add(i);
          }
        }
        ids.sort((KPeerInfo a, KPeerInfo b) {
          return a.id.xor(id).compareTo(b.id.xor(id));
        });
        List<KPeerInfo> ret = [];
         for (KPeerInfo p in ids) {
          ret.add(p);
           if (ret.length >= _kBucketSize) {
             return ret;
          }
        }
        return ret;
      }
      ...
      ...
    }
    

    (1) KNodeはUDPサーバー機能を持つ。

    まずは、UDPを用いて、通信部分を書いてみましょう。Torrentの仕様では、DHTとして動作するPeerをNodeと読んでいます。
    DHTの通信を行う主体として、KNode class を定義することにします。

    UDP serverは、メッセージを受け取るメッセージはbencodeなのでした。パースしてそのMessageを使うclassに渡します。

    class KNode {
      bool _isStart = false;
      bool get isStart => _isStart;
      HetiSocketBuilder _socketBuilder = null;
      HetiUdpSocket _udpSocket = null;
    
      KNode(HetiSocketBuilder socketBuilder) {
        this._socketBuilder = socketBuilder;
      }
    
      Future start({String ip: "0.0.0.0", int port: 28080}) async {
        (_isStart != false ? throw "already started" : 0);
        _udpSocket = this._socketBuilder.createUdpClient();
        return _udpSocket.bind(ip, port, multicast: true).then((int v) {
          _udpSocket.onReceive().listen((HetiReceiveUdpInfo info) {
            KrpcMessage.decode(info.data, this).then((KrpcMessage message) {
              onReceiveMessage(info, message);
            });
          });
          _isStart = true;
        });
      }
    
      Future stop() async {
        if (_isStart == false || _udpSocket == null) {
          return null;
        }
        return _udpSocket.close().whenComplete(() {
          _isStart = false;
          _ai.stop(this);
        });
      }
    }
    

    (2) Krpc Messageをパースする機能を持つ

    MainLine DHT では、PeerとPeerの通信には、Bencodeが利用されます。
    すでにBencodeのパーサーは作成ずみなので、難しいことはないはずです。

    class KrpcMessage {
      KrpcMessage.fromMap(Map map) {
        _messageAsMap = map;
      }
    
      static Future<KrpcMessage> decode(List<int> data) async {
        Map<String, Object> messageAsMap = null;
        try {
          Object v = Bencode.decode(data);
          messageAsMap = v;
        } catch (e) {
          throw {};
        }
        return  new KrpcMessage.fromMap(messageAsMap);
      }
    }
    

    こんな感じです。あとは、必要に応じて、パースした結果を読み取るだけです。

    class KrpcMessage {
    ...
    ...
      List<int> get transactionId => _messageAsMap["t"]);
      String get transactionIdAsString => UTF8.decode(transactionId);
    
      //
      List<int> get messageType => _messageAsMap["y"];
      String get messageTypeAsString => UTF8.decode(messageType);
    
      //
      List<int> get query => _messageAsMap["q"];
    ...
    ...
    }
    

    こんな感じです。BEP5のスペックをみると結構ありますがも気長にコーディングしていけば、半日くらいで終わると思います。

    (3) メッセージを送信する機能を持つ

    class KNode {
     ..
     ..
      sendMessage(KrpcMessage message, String ip, int port) {
          return _udpSocket.send(message.messageAsBencode, ip, port);
      }
     ..
    }
    
    class FindNode {
    
      static int queryID = 0;
    
      static KrpcMessage createQuery(List<int> queryingNodesId, List<int> targetNodeId) {
        List<int> transactionId = UTF8.encode("fi${queryID++}");
        return new KrpcMessage.fromMap({"a": {"id": queryingNodesId, "target": targetNodeId}, "q": "find_node", "t": transactionId, "y": "q"});
      }
    
      static KrpcMessage createResponse(List<int> compactNodeInfo, List<int> queryingNodesId, List<int> transactionId) {
        return new KrpcMessage.fromMap({"r": {"id": queryingNodesId, "nodes": compactNodeInfo}, "t": transactionId, "y": "r"});
      }
    }
    

    メッセージの送信は、受信用に作成したUDPSocketを利用します。こうすることで、受信相手に、ポート番号を伝えることができます。

    (4) ネットワークへの参加用のコードを書く

    RootingTableに所持しているデータの中から、自分自信ともっとも近いKIDをもつPeerへFindNodeクエリを送信する

    class KNodeWorkFindNode {
      ...
      ...
      updateP2PNetworkWithoutClear(KNode node) {
        node.rootingtable.findNode(node.nodeId).then((List<KPeerInfo> infos) {
          for (KPeerInfo info in infos) {
            if (!_findNodesInfo.rawsequential.contains(info)) {
              _findNodesInfo.addLast(info);
              node.sendFindNodeQuery(info.ipAsString, info.port, node.nodeId.value).catchError((_) {});
            }
          }
        });
      }
      ...
      ...
    }
    

    レスポンスを受けとったら、もう一度繰り返す

    class KNodeWorkFindNode {
      ...
      ...
      onReceiveQuery(KNode node, HetiReceiveUdpInfo info, KrpcMessage query) {
        if (query.queryAsString == KrpcMessage.QUERY_FIND_NODE) {
          KrpcFindNode findNode = query.toFindNode();
          return node.rootingtable.findNode(findNode.targetAsKId).then((List<KPeerInfo> infos) {
            return node.sendFindNodeResponse(info.remoteAddress, info.remotePort, query.transactionId, KPeerInfo.toCompactNodeInfos(infos)).catchError((_) {});
          });
        }
        node.rootingtable.update(new KPeerInfo(info.remoteAddress, info.remotePort, query.nodeIdAsKId));
        updateP2PNetworkWithoutClear(node);
      }
      ...
      ..
    }
    

    一定時間だったら、もう一度アクセスする

    class KNodeWorkFindNode {
      ...
      ...
    
      onTicket(KNode node) {
        _findNodesInfo.clear();
        updateP2PNetworkWithoutClear(node);
      }
    

    (5) FindeNodeクエリ受け取ったから、レスポンスを返せるようにしよう

    class KNodeWorkFindNode {
      ...
      ...
      onReceiveResponse(KNode node, HetiReceiveUdpInfo info, KrpcMessage response) {
        if (response.queryFromTransactionId == KrpcMessage.QUERY_FIND_NODE) {
          KrpcFindNode findNode = response.toFindNode();
          for (KPeerInfo info in findNode.compactNodeInfoAsKPeerInfo) {
            node.rootingtable.update(info);
          }
        }
        node.rootingtable.update(new KPeerInfo(info.remoteAddress, info.remotePort, response.nodeIdAsKId));
        updateP2PNetworkWithoutClear(node);
      }
      ..
    }
    

    次回、次次回について

    次回、実際にGetPeersクエリを解説します。DHT上でファイルをシェアすることができるようになります。

    Ref

    PS

    Qiitaに投稿した、Torrentのチュートリアルと、この文章は、gitbookの方でメンテナンスしていきます。もう少し詳しく知りたい方はこちらを参照してください。

    Dart用の作成したTorrent Libraryは以下で公開しています。
    - https://github.com/kyorohiro/dart_hetimatorrent
    - https://github.com/kyorohiro/dart_hetimatorrent/tree/master/example/TorrentDHT


    Kyorohiro work

    http://kyorohiro.strikingly.com