3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Redisにコマンドを増やしてみた

Last updated at Posted at 2020-11-19

Redis

はじめに:@tmgauss

この記事は、東京大学工学部電子情報工学科及び電気電子工学科の実習「大規模ソフトウェアを手探る」のレポートとして書かれています。「大規模ソフトウェアを手探る」とは、世の中で実際に使われている大規模なオープンソースソフトウェアを改良し、機能拡張しようという実習・実験です。
我々のチームは数多のOSSの中から対象としてRedisを選択し、実際にプルリクエストを送るところまで完了しました。

何を達成したか:@tmgauss

実際に取り組んだことは以下の3つです。

  1. helpコマンドの修正
  2. lallコマンドの追加
  3. copyコマンドの追加
    • プルリクエスト申請中

11/17更新
copyコマンドのプルリクが通ってマージされました。

Redisとは:@tmgauss

Redisとは端的に言うと、Key-Value型のNoSQLデータベース(Pythonの辞書のようにkeyから連想されるvalueをとってくるイメージ)で、「REmote DIctionary Server」の略です。Redisには以下に示すような特徴があります。

  • Client-Serverモデルを採用
  • インメモリデータベース
    • 保存しなければ揮発してしまう
    • 高速たる所以のひとつ
  • C言語で書かれたOSS
    • 弊学科の公用語
  • 様々なデータ構造をサポート
    • 文字列型だけでなく、リストやハッシュなども
  • 様々な言語をサポート
    • 有名言語ならだいたいサポート
      ActionScript, C, C++, C#, Chicken Scheme, Clojure, Common Lisp, D, Dart, Erlang, Go, Haskell, Haxe, Io, Java, JavaScript (Node.js), Julia, Lua, Nim, Objective-C, OCaml, Perl, PHP, Pure Data, Python, R, Racket, Ruby, Rust, Scala, Smalltalk, Tcl
      

環境・ビルド:小林

開発環境

チームの各メンバーが改変したコードはチームのGitLabリポジトリを用いて共有していました。

GitLabを用いるにあたり、はじめにGitHubにあるRedisのリポジトリをローカルにcloneしてきました。

$ cd ~/redis
$ git clone https://github.com/redis/redis.git

次に、cloneしてきた内容をそのままチームのGitLabリポジトリにpushしようとしたのですが、zeroPaddedFilemode: contains zero-padded file modesなどのエラーが出てpushできませんでした。

この問題はgit fast-exportgit fast-importを用いて新しいローカルリポジトリを作成し、その新しいリポジトリからpushすることで解決しました。(stackoverflowの投稿を参考にしました。)

$ mkdir ~/new-redis
$ cd ~/new-redis
$ git init
$ cd ~/redis
$ git fast-export --all | (cd ~/new-redis && git fast-import)

ビルド

makeでビルドすることができ、make testで正常にビルドされたか確認した後、make installでインストールできます。

$ make
$ make test
$ make PREFIX=~/redis-install install

デバッガを使用するためには、makeの際にコンパイラオプションを指定するためmake CFLAGS="-g -O0"とする必要があります。

変更その1:helpコマンド:小林

概要

Redisのhelpコマンドは指定されたコマンドの内容が存在しないときに空行を返します。これを空行ではなくCommand not found!と表示するようにしました。

コードを読む

デバッガを使いながらhelpに関係のありそうな箇所を探すと、redis-cli.cの620行目付近にcliOutputHelp()という関数が見つかりました。

static void cliOutputHelp(int argc, char **argv) {
    int i, j, len;
    int group = -1;
    helpEntry *entry;
    struct commandHelp *help;

    if (argc == 0) {
        cliOutputGenericHelp();
        return;
    } else if (argc > 0 && argv[0][0] == '@') {
        len = sizeof(commandGroups)/sizeof(char*);
        for (i = 0; i < len; i++) {
            if (strcasecmp(argv[0]+1,commandGroups[i]) == 0) {
                group = i;
                break;
            }
        }
    }

    assert(argc > 0);
    for (i = 0; i < helpEntriesLen; i++) {
        entry = &helpEntries[i];
        if (entry->type != CLI_HELP_COMMAND) continue;

        help = entry->org;
        if (group == -1) {
            /* Compare all arguments */
            if (argc == entry->argc) {
                for (j = 0; j < argc; j++) {
                    if (strcasecmp(argv[j],entry->argv[j]) != 0) break;
                }
                if (j == argc) {
                    cliOutputCommandHelp(help,1);
                }
            }
        } else {
            if (group == help->group) {
                cliOutputCommandHelp(help,0);
            }
        }
    }
    printf("\r\n");
}

この関数の動作を見ていきます。

helpの後にコマンドを指定しなかった場合はcliOutputGenericHelp()を呼び出してメッセージを表示させています。また、helpにはhelp <cmd>としてコマンドの詳細を確認する他にhelp @<group>として特定のデータ構造等に関連するコマンドの一覧を得る使い方もあります。後者の場合に指定したグループのインデックスを取得する操作を以下の部分で行っています。

if (argc == 0) {
    cliOutputGenericHelp();
    return;
} else if (argc > 0 && argv[0][0] == '@') {
    len = sizeof(commandGroups)/sizeof(char*);
    for (i = 0; i < len; i++) {
        if (strcasecmp(argv[0]+1,commandGroups[i]) == 0) {
            group = i;
            break;
        }
    }
}

その後、for (i = 0; i < helpEntriesLen; i++)以降の部分では用意されているエントリそれぞれについて、コマンドで指定されたものであればcliOutputCommandHelp()を呼び出して出力しています。group変数はhelp <cmd>とした場合は-1になっています。

if (group == -1) {
    /* Compare all arguments */
    if (argc == entry->argc) {
        for (j = 0; j < argc; j++) {
            if (strcasecmp(argv[j],entry->argv[j]) != 0) break;
        }
        if (j == argc) {
            cliOutputCommandHelp(help,1);
        }
    }
} else {
    if (group == help->group) {
        cliOutputCommandHelp(help,0);
    }
}

コードを変更する

ここでの目的を振り返っておくと、helpコマンドで出力されるエントリが存在しない場合に代わりにメッセージを出力するというものでした。

コードを読んだ結果、cliOutputCommandHelp()が呼び出される回数と出力されるエントリの数が等しいことがわかりました。そこで、cliOutputCommandHelpCountという変数を用意しcliOutputCommandHelp()の呼び出し回数を数えておき、全てのエントリに対する出力処理が終わった時点でcliOutputCommandHelpCount0であればメッセージを出力するようにコードを追加しました。

static void cliOutputHelp(int argc, char **argv) {
    int i, j, len;
    int group = -1;
    int cliOutputCommandHelpCount = 0;
    helpEntry *entry;
    struct commandHelp *help;

    if (argc == 0) {
        cliOutputGenericHelp();
        return;
    } else if (argc > 0 && argv[0][0] == '@') {
        len = sizeof(commandGroups)/sizeof(char*);
        for (i = 0; i < len; i++) {
            if (strcasecmp(argv[0]+1,commandGroups[i]) == 0) {
                group = i;
                break;
            }
        }
    }

    assert(argc > 0);
    for (i = 0; i < helpEntriesLen; i++) {
        entry = &helpEntries[i];
        if (entry->type != CLI_HELP_COMMAND) continue;

        help = entry->org;
        if (group == -1) {
            /* Compare all arguments */
            if (argc == entry->argc) {
                for (j = 0; j < argc; j++) {
                    if (strcasecmp(argv[j],entry->argv[j]) != 0) break;
                }
                if (j == argc) {
                    cliOutputCommandHelp(help,1);
                    cliOutputCommandHelpCount++;
                }
            }
        } else {
            if (group == help->group) {
                cliOutputCommandHelp(help,0);
                cliOutputCommandHelpCount++;
            }
        }
    }
    if(cliOutputCommandHelpCount == 0){
        printf("Command not found!");
    }
    printf("\r\n");
}

ビルドして確認してみるとうまく動きました。

> help notCommand
Command not found!

変更その2:lallコマンド:@tmgauss

現状の問題点

現在のRedisではリストの一覧表示としてlrangeコマンドが使えます。lrangeコマンドは本来

> lrange <key> <start> <end>

の形で <key> で参照されるリストの <start> 番目から <end> 番目までの要素を表示するものです。例えば、lrange mykey 0 5とすると、mykeyで参照されるリストの0番目から5番目の要素が表示されます。

ここでlrange mykey 0 -1とすると、リストの最初から最後までの要素を出力します。しかし、これだともしリストの要素数が100個だとすると、出力が荒れてしまうことになります。一方、例えばlrange mykey 0 9を入力すると表示は10個に制限されますが、全体要素数を取得するには別途llen mykeyを実行しなければなりません。つまり、

  • 要素の中身をちょっとみたい
  • 全体の要素数を確認したい

この2つの要求をたった1つのコマンドで実現できない、これが我々が着目した問題点です。

解決策の概要

 lall コマンドの仕様(最初の想定)は以下の通りです。

> lall <key>
1) 1
2) 2
...
10) 10
the list has more 12 element(s)

もし、リストの要素数(長さ)が10以下であれば、lrange <key> 0 -1と変わりません。

解決策の実装

1. サーバーの中身を探る

コマンドをredis-cli から redis-serverに送る流れについて

(gdb) bt
#0  redisNetWrite (c=0x1004052b0) at net.c:83
#1  0x000000010002a745 in redisBufferWrite (c=0x1004052b0, done=0x7ffeefbff544) at hiredis.c:969
#2  0x000000010002a964 in redisGetReply (c=0x1004052b0, reply=0x7ffeefbff5b0) at hiredis.c:1025
#3  0x000000010002005a in cliReadReply (output_raw_strings=0) at redis-cli.c:1275
#4  0x000000010002232d in cliSendCommand (argc=3, argv=0x100204f00, repeat=0) at redis-cli.c:1436
#5  0x0000000100021c5d in issueCommandRepeat (argc=3, argv=0x100204f00, repeat=1)
    at redis-cli.c:1933
#6  0x0000000100013cd9 in repl () at redis-cli.c:2129
#7  0x000000010000ee64 in main (argc=0, argv=0x7ffeefbff728) at redis-cli.c:8319

net.cの83行目において、send関数を用いて、serverのほうにコマンドを送っていることを発見しました。

deps/hiredis/net.c
ssize_t nwritten = send(c->fd, c->obuf, hi_sdslen(c->obuf), 0);

また、結果の受け取りのほうは、net.cの60行目付近の

deps/hiredis/net.c
ssize_t redisNetRead(redisContext *c, char *buf, size_t bufcap) {
...
ssize_t nread = recv(c->fd, buf, bufcap, 0);
...
}

において行っていると予想されます。

今までの処理をまとめるとhiredis.cの1013行目の以下の関数において

deps/hiredis/hiredis.c
int redisGetReply(redisContext *c, void **reply) {
    int wdone = 0;
    void *aux = NULL;

    /* Try to read pending replies */
    if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
        return REDIS_ERR;

    /* For the blocking context, flush output buffer and read reply */
    if (aux == NULL && c->flags & REDIS_BLOCK) {
        /* Write until done */
        /* ここでコマンドを送る */
        do {
            if (redisBufferWrite(c,&wdone) == REDIS_ERR)
                return REDIS_ERR;
        } while (!wdone);
        /* Read until there is a reply */
        /* ここで結果を受け取る*/
        do {
            if (redisBufferRead(c) == REDIS_ERR)
                return REDIS_ERR;

            /* We loop here in case the user has specified a RESP3
             * PUSH handler (e.g. for client tracking). */
            /* ここのredisGetReplyFromReader関数で受け取った結果の処理を行い結果をreplyに代入している。 */
            do {
                if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
                    return REDIS_ERR;
            } while (redisHandledPushReply(c, aux));
        } while (aux == NULL);
    }

    /* Set reply or free it if we were passed NULL */
    if (reply != NULL) {
        *reply = aux;
    } else {
        freeReplyObject(aux);
    }

    return REDIS_OK;
}

(日本語のコメントアウト参照)

補足: デバッガについて

  • クライアントからコマンドを受け取った時のサーバーの処理について流れを追いました。macOSだとgdbのattachを使ってアプリを実行途中から追跡することができなかったので、学科pcのwindows wsl環境を使いました。最初makeがあまり上手くいきませんでしたが、適当にsudo apt install とかmake MALLOC=libcとかしていればうまくいきました。
  • 参考にしたのは以下のリンクです。

2. クライアントの中身を探る

ae.c のaeMainという関数のなかのwhileループでずっと待っているんだろうなということが予想されました。

src/ae.c
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

あとserverという変数にサーバー関連の情報がたくさんあり、cという変数が恐らくクライアント関連の情報を持っているとも思われます。redis-cliを起動しながらredis-serverをgdbで追って、適当にコマンドを打ちながら動作を追いました。

168         int ret = write(conn->fd, data, data_len);
(gdb) bt
#0  connSocketWrite (conn=0x5600ba1a4f20, data=0x5600ba1a51c8, data_len=5)
    at connection.c:168
#1  0x00005600b80a191c in connWrite (conn=0x5600ba1a4f20, 
    data=0x5600ba1a51c8, data_len=5) at connection.h:140
#2  0x00005600b80a53d6 in writeToClient (c=0x5600ba1a4f70, 
    handler_installed=0) at networking.c:1363
#3  0x00005600b80a5834 in handleClientsWithPendingWrites ()
    at networking.c:1481
#4  0x00005600b80aa819 in handleClientsWithPendingWritesUsingThreads ()
    at networking.c:3158
#5  0x00005600b808f4de in beforeSleep (eventLoop=0x5600ba123fa0)
    at server.c:2270
#6  0x00005600b808875a in aeProcessEvents (eventLoop=0x5600ba123fa0, 
    flags=27) at ae.c:391
#7  0x00005600b8088a8c in aeMain (eventLoop=0x5600ba123fa0) at ae.c:487
#8  0x00005600b8097e1f in main (argc=1, argv=0x7ffdf3e710f8)
    at server.c:5442

少なくともreplyに代入された結果はredis-cli.cのcliReadReply関数内の1328行目

src/redis-cli.c
out = cliFormatReply(reply, config.output, output_raw_strings);
fwrite(out,sdslen(out),1,stdout);
sdsfree(out);

において出力されていることが分かりました。

3. コマンドを登録する

  • server.cの中にあるstruct redisCommand redisCommandTable[]にコマンドの情報を足しました。lrangeは
src/server.c
    {"lrange",lrangeCommand,4,
     "read-only @list",
     0,NULL,1,1,1,0,0,0},

こんな風になっていたので、

src/server.c
    {"lall",lallCommand,2,
     "read-only @list",
     0,NULL,1,1,1,0,0,0},

としました。それぞれの項の説明はserver.cの先頭にあります。(4→2としたのは引数の個数。)

  • server.hに宣言を足す。
src/server.h
void lrangeCommand(client *c);

lrangeはこんなふうになっていたので、下のように足しました。

src/server.h
void lallCommand(client *c);

4. lallCommand関数の実装

lrangeCommand関数はt_list.cに書いてあったので、その直下にlallCommand を足すことにしました。

lrangeCommand

src/t_list.c
void lrangeCommand(client *c) {
    robj *o;
    long start, end, llen, rangelen;

    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL
         || checkType(c,o,OBJ_LIST)) return;
    llen = listTypeLength(o);

    /* convert negative indexes */
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;

    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptyarray);
        return;
    }
    if (end >= llen) end = llen-1;
    rangelen = (end-start)+1;

    /* Return the result in form of a multi-bulk reply */
    addReplyArrayLen(c,rangelen);
    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
        listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);

        while(rangelen--) {
            listTypeEntry entry;
            listTypeNext(iter, &entry);
            quicklistEntry *qe = &entry.entry;
            if (qe->value) {
                addReplyBulkCBuffer(c,qe->value,qe->sz);
            } else {
                addReplyBulkLongLong(c,qe->longval);
            }
        }
        listTypeReleaseIterator(iter);
    } else {
        serverPanic("List encoding is not QUICKLIST!");
    }
}

だったので、lallCommandでは

src/t_list.c
void lallCommand(client *c) {
    robj *o;
    const int maxlen = 10;
    long llen, rangelen, leftlen;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL
         || checkType(c,o,OBJ_LIST)) return;
    llen = listTypeLength(o);

    /* judge the last displayed index */
    if (llen <= maxlen) {
        rangelen = llen;
        leftlen = 0;
        addReplyArrayLen(c,rangelen);
    } else {
        rangelen = maxlen;
        leftlen = llen - maxlen;
        addReplyArrayLen(c,rangelen+1);
    }

    /* Return the result in form of a multi-bulk reply */
    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
        listTypeIterator *iter = listTypeInitIterator(o, 0, LIST_TAIL);

        for (int i = 0; i < rangelen; i++) {
            listTypeEntry entry;
            listTypeNext(iter, &entry);
            quicklistEntry *qe = &entry.entry;
            if (qe->value) {
                addReplyBulkCBuffer(c,qe->value,qe->sz);
            } else {
                addReplyBulkLongLong(c,qe->longval);
            }
        }

        if (leftlen) {
            const int msg_body_len = 64;
            char msg_body[msg_body_len];
            snprintf(msg_body, msg_body_len, "[NOTE] The list has more %ld element(s)", leftlen);
            sds sdstext = sdsnew(msg_body);
            addReplyBulkSds(c, sdstext);
            sds_free(sdstext);
        }

        listTypeReleaseIterator(iter);
    } else {
        serverPanic("List encoding is not QUICKLIST!");
    }
}

とすることで、すべてのリストが返ってくるようにしました。

完成

> lall <key>
1) 1
2) 2
...
10) 10
11) the list has more 12 element(s)

結局、redisの保守性から、リスト形式のReplyは以上のようなx)が最初につく形式しか認められず、11) the list has more 12 element(s)で妥協することにしました。

変更その3:copyコマンド:@swamp0407

達成したこと

copyコマンドを実装しました。現在PRを出しているところです。
プルリクは全部で500行以上になりました。

やろうと思ったきっかけ

lallコマンドの実装が終わって、次に何をやろうか考えていたときにcopyコマンドのissueを見つけ、moveコマンドをちょっと変えればすぐにできるんじゃないかという話になったからです。
(issueにはThis is not trivialってかかれてたけど、やろうと思ったときはその理由がよくわかってませんでした。)

実装の流れ

実装の流れにおいては、失敗も含めて書いておきます。

1. 手探る

詳細

lallコマンドを追加したことで、コマンドの追加方法はわかってきていたので、とりあえず、copyコマンドと似ていることをしているmoveコマンドの実装を見てどのようなことをすればいいのか探りました。
また、issueを読み、どのような仕様のコマンドにすればいいかを理解しました。
 

仕様

COPY <key> <new-key> [DB <dest-db>] [REPLACE]
  • <key>の値を<new-key>にコピーする。
  • 返り値はInteger。成功したら1、失敗したら0
  • [DB <dest-db>]でコピー先のDB指定。デフォルトは<key>が存在するDB(つまり現在地)
  • [REPLACE]ですでに<new-key>が存在していた場合、置き換えるか指定。デフォルトは置き換えない。
    (COPY key new key DB 3 REPLACEみたいな感じにするのが正しいですが、PR出すまで、COPY key new key 3 REPLACEというふうに実装していました。)

2. 実装する

詳細

以上の仕様を踏まえた上で、下のような箇所を変更すればよいと分かりました。db.cはもともとmoveコマンドがあった箇所、server.cはコマンドの一覧があるところ、server.hはコマンドの宣言があるところであり、keyspace.tclはmoveコマンドのテストコードがあったところです。

おそらく変えないといけないところ
  • db.ccopyCommand関数を追加
  • server.cserver.hCOPYコマンドとcopyCommand関数を追加
  • tests/unit/keyspace.tclにテストコードを追加

moveコマンドはあるデータベースから、違うデータベースにキーを移動させるというキーです。これはredisのサーバの内部的には、keyに対応するオブジェクトを見つけて、そのデータベースから削除して、指定されたデータベースに加えるという作業をしています。そのため、データベースから削除するという作業を無くせば、コピーコマンドが実装できるのではないかと考えました。(実際には違います。)

実際にコードを書いてみた(1回目)

下のようなコードを書いた。

copyコマンドのソース
src/db.c
void copyCommand(client *c) {
    robj *o;
    redisDb *src, *dst;
    int srcid;
    long long dbid, expire;
    int replace = 0;
    if (server.cluster_enabled) {
        addReplyError(c,"COPY is not allowed in cluster mode");
        return;
    }
    
    if (c->argc >= 5){
        addReply(c,shared.syntaxerr);
        return;
    }

    if (c->argc == 4){
        if (!strcasecmp(c->argv[4]->ptr,"replace")) {
            replace = 1;
        }
    }

    /* Obtain source and target DB pointers 
     * Default target DB is the same as the source DB */
    src = c->db;
    srcid = c->db->id;
    dst = c->db;
    dbid = c->db->id;

    if (c->argc >= 3){
        if (getLongLongFromObject(c->argv[3],&dbid) == C_ERR ||
            dbid < INT_MIN || dbid > INT_MAX ||
            selectDb(c,dbid) == C_ERR)
        {
            addReply(c,shared.outofrangeerr);
            return;
        }
        dst = c->db;
        selectDb(c,srcid); /* Back to the source DB */
    }

    /* Check if the element exists and get a reference */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);
        return;
    }
    expire = getExpire(c->db,c->argv[1]);

    /* Return zero if the key already exists in the target DB */
    if (!replace && lookupKeyWrite(dst,c->argv[2]) != NULL) {
        addReply(c,shared.czero);
        return;
    }
    dbAdd(dst,c->argv[1],o);
    if (expire != -1) setExpire(c,dst,c->argv[1],expire);
    incrRefCount(o);

    /* OK! key copied */
    signalModifiedKey(c,dst,c->argv[2]);
    notifyKeyspaceEvent(NOTIFY_GENERIC,
                "copy_to",c->argv[2],dst->id);

    server.dirty++;
    addReply(c,shared.cone);
    }
コードを読んでも、よくわからないと思うので、大まかに説明します。
  • コピー先のDBを取得。
    getLongLongFromObject(c->argv[3],&dbid)

  • replaceオプションが指定されているか確認
    strcasecmp(c->argv[4]->ptr,"replace")

  • keyに対応するオブジェクトが存在するか確認し、存在したら取得
    o = lookupKeyWrite(c->db,c->argv[1]);

  • オブジェクトに期限が設定されているか確認し、存在したら取得。(オブジェクトに期限を設定でき、期限が来たら削除されるようになっている。)
    expire = getExpire(c->db,c->argv[1]);

  • オブジェクトを新しいDBにコピーし、もとのオブジェクトに期限があったら期限も設定する。
    dbAdd(dst,c->argv[1],o); 
    setExpire(c,dst,c->argv[1],expire);

  • ここらへんはあんまりCOPYコマンドに関係ないので説明しないが、PUB・SUBやクライエントキャッシュというものに関係するところ。
    signalModifiedKey(c,dst,c->argv[2]);
    notifyKeyspaceEvent(NOTIFY_GENERIC, "copy_to",c->argv[2],dst->id);

copyコマンドを実際に試してみる

makeしてcopyコマンドを試してみた。上のコードを少し変えた状態でmakeしてみたところ、コピーすることは可能だった。しかし、コピー元を書き換えると、コピー先も変わってしまうことが分かった。
つまり、参照しているオブジェクトが同じで、正しい意味でコピーできていないことが分かった。shallow copy的な感じ?

3. 手探る(2回目)

詳細

3.でテストをしたことで、コピーを実装するためには、新しいオブジェクトを作る必要があることが分かりました。
しかし、redisには様々なデータタイプがあり、それに合わせてオブジェクトの作り方が違うため、単純に一つの関数でコピーをすることはできません。これが、copyコマンドというありふれてそうなコマンドが実装されていなかった理由かもと思います。オブジェクトのコピーの仕方の手探りを進めていくと、object.cにdupStringObjectという関数を見つけました。これはStringタイプのオブジェクトをコピーするコマンドだったため、あとはList型、Set型、Sorted Set型、Stream型、Module型に対して似たような関数を作ればいいということが分かりました。

4. 実装する(2回目)

詳細

ここでは、実装したそれぞれの関数について見ていきます。
それぞれの関数のコードはコメント等含めて全部で、300行以上あるので、実装がとても大変でした。それぞれの関数の実装については、長くなるので、下に折りたたみで表示します。

それぞれの関数の実装について
dupListObject関数について

List型のオブジェクトをコピーする関数。

まずは、List型のオブジェクトを新しく作り要素を追加するコマンドについて調べました。
要素を追加するコマンドはlpushコマンドだったので、lpushコマンドの流れについて追いました。

次に、List型のオブジェクトの中身を見るコマンドについて調べました。これはlallコマンドを実装するときに参考にしたlrangeコマンドだったので理解するのは比較的流れを追うのは簡単でした。

以上を踏まえた上で、dupListObject関数の実装を開始しました。まずはlpushコマンドでやっているように新しくList型のオブジェクトを作り、lrangeでやっているように元のオブジェクトから要素を取り出し、lpushコマンドでやっているように値を新しいオブジェクトに追加しました。
完成形は下のよう。

dupListObject関数
src/object.c
robj *dupListObject(robj *o) {
    robj *lobj;
    long llen, start = 0;
    char buf[64];

    serverAssert(o->type == OBJ_LIST);

    switch (o->encoding) {
        case OBJ_ENCODING_QUICKLIST:
            lobj = createQuicklistObject();
            break;
        default:
            serverPanic("Wrong encoding.");
            break;
    }
    quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size, server.list_compress_depth);
    llen = listTypeLength(o);
    listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
    while (llen--) {
        listTypeEntry entry;
        listTypeNext(iter, &entry);
        quicklistEntry *qe = &entry.entry;
        if (qe->value) {
            robj *obj = createObject(OBJ_STRING, sdsnewlen((const char *)qe->value, qe->sz));
            listTypePush(lobj, obj, LIST_TAIL);
        } else {
            ll2string(buf, 64, qe->longval);
            robj *obj = createObject(OBJ_STRING, sdsnewlen((const char *)buf, strlen(buf)));
            listTypePush(lobj, obj, LIST_TAIL);
        }
    }
    listTypeReleaseIterator(iter);
    return lobj;
}
dupSetObject関数について

Set型のオブジェクトをコピーする関数。
List型と同様に新しくオブジェクトを作り、要素を追加するコマンドを調べ、次に要素を取得するコマンドを調べました。(それぞれ、sadd関数と、smembers関数)
完成形は下のよう。

dupSetObject関数
src/object.c
robj *dupSetObject(robj *o) {
    robj *set;
    setTypeIterator *si;
    sds elesds;
    int64_t intobj;
    int encoding;

    serverAssert(o->type == OBJ_SET);

    /* Create a new set object that have the same encoding as the original object's encoding */
    switch (o->encoding) {
        case OBJ_ENCODING_INTSET:
            set = createIntsetObject();
            break;
        case OBJ_ENCODING_HT:
            set = createSetObject();
            break;
        default:
            serverPanic("Wrong encoding.");
            break;
    }
    si = setTypeInitIterator(o);
    while ((encoding = setTypeNext(si, &elesds, &intobj)) != -1) {
        if (encoding == OBJ_ENCODING_INTSET) {
            elesds = sdsfromlonglong(intobj);
            setTypeAdd(set, elesds);
            sdsfree(elesds);
        } else {
            setTypeAdd(set, elesds);
        }
    }
    setTypeReleaseIterator(si);
    return set;
}
dupZsetObject関数について

Zset型のオブジェクトをコピーする関数。
List型と同様に新しくオブジェクトを作り、要素を追加するコマンドを調べ、次に要素を取得するコマンドを調べました。(それぞれ、zadd関数と、zrange関数)
完成形は下のよう。

dupZsetObject関数
src/object.c
robj *dupZsetObject(robj *o) {
    robj *zobj;
    char buf[64];
    long llen, start = 0;
    int retflags = ZADD_NONE;

    serverAssert(o->type == OBJ_ZSET);
    
    /* Create a new zset object that have the same encoding as the original object's encoding */
    switch (o->encoding) {
        case OBJ_ENCODING_ZIPLIST:
            zobj = createZsetZiplistObject();
            break;
        case OBJ_ENCODING_SKIPLIST:
            zobj = createZsetObject();
            break;
        default:
            serverPanic("Wrong encoding.");
            break;
    }
    llen = zsetLength(o);
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = o->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;
        double score;

        eptr = ziplistIndex(zl, 2 * start);
        sptr = ziplistNext(zl, eptr);


        /* Extract score-element pair from an original zset object. 
         * add a score-element pair to a new zset object which encoding is ZIPLIST.*/
        while (llen--) {
            ziplistGet(eptr, &vstr, &vlen, &vlong);
            score = zzlGetScore(sptr);
            if (vstr == NULL) {
                ll2string(buf, 64, vlong);
                sds ele = sdsnewlen((const char *)buf, strlen(buf));
                zsetAdd(zobj, score, ele, &retflags, NULL);
                sdsfree(ele);
            } else {
                sds ele = sdsnewlen((const char *)vstr, vlen);
                zsetAdd(zobj, score, ele, &retflags, NULL);
                sdsfree(ele);
            }
            zzlNext(zl, &eptr, &sptr);
        }

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;
        sds ele;

        /* Add a score-element pair to a new zset object which encoding is SKIPLIST. */
        ln = zsl->header->level[0].forward;
        while (llen--) {
            ele = ln->ele;
            zsetAdd(zobj, ln->score, ele, &retflags, NULL);
            ln = ln->level[0].forward;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return zobj;
}
dupHashObject関数について

Hash型のオブジェクトをコピーする関数。
List型と同様に新しくオブジェクトを作り、要素を追加するコマンドを調べ、次に要素を取得するコマンドを調べました。(それぞれ、hadd関数と、hget関数)
完成形はしたのよう。

dupHashObject関数
src/object.c
robj *dupHashObject(robj *o) {
    robj *hobj;
    char buf[64];
    hashTypeIterator *hi;

    serverAssert(o->type == OBJ_HASH);

    switch (o->encoding) {
        case OBJ_ENCODING_ZIPLIST:
            hobj = createHashObject();
            break;
        case OBJ_ENCODING_HT:
            hobj = createHashObject();
            hashTypeConvert(hobj, OBJ_ENCODING_HT);
            break;
        default:
            serverPanic("Wrong encoding.");
            break;
    }

    hi = hashTypeInitIterator(o);

    while (hashTypeNext(hi) != C_ERR) {
        if (hi->encoding == OBJ_ENCODING_ZIPLIST) {
            unsigned char *vstr = NULL;
            unsigned int vlen = UINT_MAX;
            long long vll = LLONG_MAX;
            unsigned char *kstr = NULL;
            unsigned int klen = UINT_MAX;
            long long kll = LLONG_MAX;
            sds hkey, hvalue;

            /* Extract a key-value pair from an original hash object.*/
            hashTypeCurrentFromZiplist(hi, OBJ_HASH_KEY, &kstr, &klen, &kll);
            if (kstr) {
                hkey = sdsnewlen((const char *)kstr, klen);
            } else {
                ll2string(buf, 64, kll);
                hkey = sdsnewlen((const char *)buf, strlen(buf));
            }
            hashTypeCurrentFromZiplist(hi, OBJ_HASH_VALUE, &vstr, &vlen, &vll);
            if (vstr) {
                hvalue = sdsnewlen((const char *)vstr, vlen);
            } else {
                ll2string(buf, 64, vll);
                hvalue = sdsnewlen((const char *)buf, strlen(buf));
            }

            /* Add a key-value pair to a new hash object. */
            hashTypeSet(hobj, hkey, hvalue, HASH_SET_COPY);
            sdsfree(hkey);
            sdsfree(hvalue);
        } else if (hi->encoding == OBJ_ENCODING_HT) {
            sds hkey, hvalue;
            /* Extract a key-value pair from an original hash object.*/
            hkey = hashTypeCurrentFromHashTable(hi, OBJ_HASH_KEY);
            hvalue = hashTypeCurrentFromHashTable(hi, OBJ_HASH_VALUE);

            /* Add a key-value pair to a new hash object. */
            hashTypeSet(hobj, hkey, hvalue, HASH_SET_COPY);
            sdsfree(hkey);
            sdsfree(hvalue);
        } else {
            serverPanic("Unknown hash encoding");
        }
    }
    hashTypeReleaseIterator(hi);
    return hobj;
}
dupStreamObject関数について

Stream型のオブジェクトをコピーする関数。
List型と同様に新しくオブジェクトを作り、要素を追加するコマンドを調べ、次に要素を取得するコマンドを調べた。それに加えて、Stream型のデータはConsumer Groupという情報も持っているので、特に実装が大変でした。
完成形は下のよう。

dupStream関数
src/object.c
robj *dupStreamObject(robj *o) {
    robj *sobj;
    
    serverAssert(o->type == OBJ_STREAM);

    switch (o->encoding) {
        case OBJ_ENCODING_STREAM:
            sobj = createStreamObject();
            break;
        default:
            serverPanic("Wrong encoding.");
            break;
    }

    streamID id;
    stream *s;
    stream *new_s;
    streamID startid;
    streamID endid;
    int64_t numfields;
    int rev = 0;

    startid.ms = startid.seq = 0;
    endid.ms = endid.seq = UINT64_MAX;
    s = o->ptr;
    new_s = sobj->ptr;

    streamIterator si;
    streamIteratorStart(&si, s, &startid, &endid, rev);
    while (streamIteratorGetID(&si, &id, &numfields)) {
        /* Extract field-value pairs from an original stream object
         * and, add these to a new stream object. */
        robj **argv;
        argv = zmalloc(sizeof(robj *) * numfields * 2);
        for (int j = 0; j < numfields; j++) {
            unsigned char *key, *value;
            int64_t key_len, value_len;

            streamIteratorGetField(&si, &key, &value, &key_len, &value_len);
            argv[j * 2] = createObject(OBJ_STRING, sdsnewlen((const char *)key, key_len));
            argv[j * 2 + 1] = createObject(OBJ_STRING, sdsnewlen((const char *)value, value_len));
        }
        streamAppendItem(new_s, argv, numfields, &id, &id);
        zfree(argv);
    }
    streamIteratorStop(&si);

    if (s->cgroups == NULL) {
        /*Nothing to do*/
    } else {
        /* Consumer Groups */
        raxIterator ri_cgroups;
        raxStart(&ri_cgroups, s->cgroups);
        raxSeek(&ri_cgroups, "^", NULL, 0);
        while (raxNext(&ri_cgroups)) {
            streamCG *cg = ri_cgroups.data;
            streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
                                              ri_cgroups.key_len, &cg->last_id);
            /* If already exists */
            if (new_cg == NULL) {
                new_cg = raxFind(s->cgroups, ri_cgroups.key, ri_cgroups.key_len);
            }

            /* Consumers */
            raxIterator ri_consumers;
            raxStart(&ri_consumers, cg->consumers);
            raxSeek(&ri_consumers, "^", NULL, 0);
            while (raxNext(&ri_consumers)) {
                streamConsumer *consumer = ri_consumers.data;
                streamConsumer *new_consumer = raxFind(new_cg->consumers, 
                                (unsigned char *)consumer->name, sdslen(consumer->name));
                if (new_consumer == raxNotFound) {
                    new_consumer = zmalloc(sizeof(*new_consumer));
                    new_consumer->name = sdsdup(consumer->name);
                    new_consumer->pel = raxNew();
                    raxInsert(new_cg->consumers, (unsigned char *)new_consumer->name, 
                                    sdslen(new_consumer->name), new_consumer, NULL);
                    new_consumer->seen_time = consumer->seen_time;
                }

                /* Consumer PEL */
                raxIterator ri_cpel;
                raxStart(&ri_cpel, consumer->pel);
                raxSeek(&ri_cpel, "^", NULL, 0);
                while (raxNext(&ri_cpel)) {
                    unsigned char buf[sizeof(streamID)];
                    streamNACK *nack = ri_cpel.data;
                    streamDecodeID(ri_cpel.key,&id);
                    streamEncodeID(buf, &id);
                    /* Insert NACK. */
                    streamNACK *new_nack = zmalloc(sizeof(*new_nack));
                    new_nack->delivery_time = nack->delivery_time;
                    new_nack->delivery_count = nack->delivery_count;
                    new_nack->consumer = new_consumer;
                    int group_inserted = raxTryInsert(new_cg->pel, buf, sizeof(buf), new_nack, NULL);
                    int consumer_inserted = raxTryInsert(new_consumer->pel, buf, sizeof(buf), new_nack, NULL);

                    if (group_inserted == 0) {
                        streamFreeNACK(new_nack);
                        new_nack = raxFind(new_cg->pel, buf, sizeof(buf));
                        serverAssert(new_nack != raxNotFound);
                        raxRemove(new_nack->consumer->pel, buf, sizeof(buf), NULL);
                        /* Update the consumer and NACK metadata. */
                        new_nack->delivery_time = nack->delivery_time;
                        new_nack->delivery_count = nack->delivery_count;
                        new_nack->consumer = new_consumer;
                        /* Add the entry in the new consumer local PEL. */
                        raxInsert(new_consumer->pel, buf, sizeof(buf), new_nack, NULL);
                    } else if (group_inserted == 1 && consumer_inserted == 0) {
                        serverPanic(
                            "NACK half-created. Should not be possible.");
                    }
                }
                raxStop(&ri_cpel);
            }
            raxStop(&ri_consumers);
        }
        raxStop(&ri_cgroups);
    }
    return sobj;
}

上記のこと以外にもテストコードを実装したり、コードにコメントを書いたりしました。

おまけ

PRを出す

以上が実装できたので、PRを出しました。現在コメントもらった箇所を修正したりしているところです。
仕様の誤りや、テストコードをもっと追加したほうがいいこと、memcpyなどを使ったもっと効率的な実装の仕方があることなどを指摘され、修正しました。
マージしてもらえると嬉しいです。

11/17更新
PRが通ってマージされました!すごい達成感でした。結局、PRを出したときのコードから7割くらい変更することになりました。
PRを出した段階ではコピー元のオブジェクトから一つづつ要素を取り出して新しいオブジェクトに一つづつ加えるという実装でした。(それ以外のやり方わかりませんでした。)しかし、redisの内部ではただのbinaryとして保存しているオブジェクトもあることをreviewerの方に教えてもらい単純にmemcpyでまとめてコピーするというより効率的な実装に変更しました。

最終的に変更したコードはここから見れます。上のコードと比べて大分変更した形跡が見て取れると思います。

おまけ
PRの修正が終わりかけてきた段階でredisの内部実装についてかなり詳しくまとめてくれているDive Deep Redis ~ 入門から実装の確認まで ~というサイトを見つけました。実験中に手探ったような内容や知らなかった内容が沢山載っておりもっと早く知れていたらという気持ちになりました。もし来年度以降、実験でredisを手探ってみようという人がいたら参考にしてみたらいいと思います。

最後に:みんな

感想1

本当にチームのメンバーに助けられたので、この場を借りてまずメンバーに感謝したいと思います。実際、実習前までは本実習が「学生のお遊戯」にすぎず、実際にマージされることなどほとんどないのだろうと思っていました。しかし、実際に自分で手を動かしたり、他チームの話を聞くうちに、だんだんとOSSは意外と近い存在であることが分かりました。また、データベース自体もこれまで割とブラックボックスとして使っていましたが、中身はこれまで学習してきたことの実装にすぎないのだとも実感しました。これからもそのような「とりあえず触ってみる」精神を大切にしたいと思います。貴重な機会をありがとうございました!

感想2

最初は班員誰もredisについて詳しくなくて、僕自身は聞いたこともないレベルでしたが、COPYコマンドのPRを出せるところまで実装できてよかったです。サーバ・クライエント間のやり取りを追うことで、redisのサーバー側とクライエント側の実装について、どっちも結構詳しく慣れました。また、コピーコマンドの実装を通じて、redisのデータ型についてもかなり理解が深まりました。githubにもそんなに触れたことがなかったので、実験を始める前まではOSSにPRを出すことはまったく考えてませんでしたが、他の班の人がPRを出してるのを見て、やる気が出てきて頑張れました。楽しかったです。

感想3

今まではRedisのように大きなソフトウェアを触ったことはなかったため、取り掛かるまでは私に扱えるのかどうかとても不安でした。しかし、優秀なチームメンバーと優しいTAの方々のおかげで最後までやり切ることができ、非常に感謝しています。個人的には、初めて触るソフトウェアに対してデバッガを用いて構造を調べる過程を経験できたことが一番ためになりました。大規模なソフトウェアでも自分の理解できる範囲のコードを読めば機能追加はできるということがわかったので、今後他のOSSに興味を持った時には今回の経験を活かして積極的に開発に関わっていきたいと思います。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?