Laravel Queue, Database拡張とドライバ追加のヒント

  • 3
    いいね
  • 0
    コメント

LaravelはFacadeやContainer以外にも、
Webアプリケーションで利用する多くの機能が用意されています。

非同期処理などに利用するQueueや、PDOを拡張したDatabaseコンポーネント(Eloquent, QueryBuilder)、
CacheやSession、AuthManagerなど、
多くのコンポーネントがデフォルトで用意されているドライバ以外の独自のドライバを簡単に追加して利用ができる様に設計されています。

今回はこれらを使ってQueueドライバを追加する方法を紹介します。1
追加するドライバはドキュメント指向型データベースのcouchbaseを利用します。2
Queueドライバを追加する前に、couchbaseの機能をフル活用するためにデータベースドライバを追加します。3
(Laravelに様々なドライバを追加するには、
それぞれのコンポーネントで用意されたManagerクラスを必ず利用しなければなりません。)

Extend Connection

データベースドライバを追加する場合に、Illuminate\Database\ConnectionInterfaceを実装する必要があります。
Illuminate\Database\Connection を継承するだけでも良いでしょう。
このConnectionクラスは接続と、Query発行時の動作をドライバにあわせて実装することができます。
QuryBuilderやSchemaBuilderの拡張、追加はここでは行わずに、
それぞれのBuilderクラスを継承する必要があります。
まずはConnectionに必要なものを用意していきます。

    /**
     * @return \CouchbaseCluster
     */
    protected function createConnection()
    {
        $this->setReconnector(function () {
            $this->connection = (new CouchbaseConnector)->connect($this->config);

            return $this;
        });

        return (new CouchbaseConnector)->connect($this->config);
    }

    /**
     * {@inheritdoc}
     */
    public function getDriverName()
    {
        return 'couchbase';
    }

    /**
     * @return \CouchbaseCluster
     */
    public function getCouchbase()
    {
        if (is_null($this->connection)) {
            $this->connection = $this->createConnection();
        }

        return $this->connection;
    }

ドライバを追加した場合は、ドライバ名を必ず返却する様にし、
切断時にリコネクトさせるためのリコネクタを用意する必要があります。
CouchbaseConnectorクラスは、
couchbaseのエクステンションのクラスである CouchbaseCluster クラスをラップしたものです。

couchbaseではtableではなく、bucketを利用するため、
用意されているtableメソッド(インターフェースに含まれています)を下記の様に実装し、
couchbaseでは利用できないTransactionなども実装します。

    /**
     * @param string $table
     *
     * @return QueryBuilder
     */
    public function table($table)
    {
        return $this->bucket($table)->query()->from($table);
    }

    /**
     * {@inheritdoc}
     */
    public function transaction(Closure $callback, $attempts = 1)
    {
        throw new NotSupportedException(__METHOD__);
    }

tableメソッドはクエリビルダを利用するために必ず Illuminate\Database\Query\Builder をreturnする必要があります。
他のメソッドなども同様にデータベースの機能に合わせて実装を行います。

Extend QueryBuilder

couchbaseではN1QLを利用するため、LaravelのQueryBuilderそのままでは動作させることが不可能なため、
Illuminate\Database\Query\Builder クラスを継承して拡張します。


    /**
     * @param array $column
     *
     * @return $this
     */
    public function returning(array $column = ['*'])
    {
        $this->returning = $column;

        return $this;
    }

    /**
     * Insert a new record into the database.
     *
     * @param array $values
     *
     * @return bool
     */
    public function insert(array $values)
    {
        if (empty($values)) {
            return true;
        }
        $values = $this->detectValues($values);
        $bindings = [];
        foreach ($values as $record) {
            foreach ($record as $key => $value) {
                $bindings[$key] = $value;
            }
        }

        $sql = $this->grammar->compileInsert($this, $values);

        return $this->connection->insert($sql, $bindings);
    }

    /**
     * supported N1QL upsert query.
     *
     * @param array $values
     *
     * @return bool|mixed
     */
    public function upsert(array $values)
    {
        if (empty($values)) {
            return true;
        }
        $values = $this->detectValues($values);
        $bindings = [];
        foreach ($values as $record) {
            foreach ($record as $key => $value) {
                $bindings[$key] = $value;
            }
        }

        $sql = $this->grammar->compileUpsert($this, $values);

        return $this->connection->upsert($sql, $bindings);
    }

couchbaseではinsert時にレコードの取得をreturning句を利用して指定することができます。
またMongoDBなどでもおなじみのupsertもサポートされているため、
拡張してこれらを追加します。
これでQueryBuilderからN1QLをコールできる様になりましたが、
Illuminate\Database\Query\Builder クラスはあくまで、Queryをオブジェクト指向ライクに記述するためのインターフェースであり、
実際のQueryを文字列にコンパイルするクラスは、 Illuminate\Database\Query\Grammer クラスが行います。

returningやupsertが利用できる様にこのクラスも拡張します。

Extend QueryGrammer

Databaseコンポーネントは、コンパイル時にプレフィックスにcompileがついたメソッドを実行します。
このルールに従ってN1QLに合わせてメソッドを実装します。

    /**
     * {@inheritdoc}
     *
     * notice: supported set query only
     */
    public function compileUpdate(Builder $query, $values)
    {
        // keyspace-ref:
        $table = $this->wrapTable($query->from);
        // use-keys-clause:
        $keyClause = $this->wrapKey($query->key);
        // returning-clause
        $returning = implode(', ', $query->returning);

        $columns = [];

        foreach ($values as $key => $value) {
            $columns[] = $this->wrap($key) . ' = ' . $this->parameter($value);
        }

        $columns = implode(', ', $columns);

        $joins = '';
        if (isset($query->joins)) {
            $joins = ' ' . $this->compileJoins($query, $query->joins);
        }
        $where = $this->compileWheres($query);

        return trim("update {$table} USE KEYS {$keyClause} {$joins} set $columns $where RETURNING {$returning}");
    }

    /**
     * {@inheritdoc}
     */
    public function compileInsert(Builder $query, array $values)
    {
        // keyspace-ref:
        $table = $this->wrapTable($query->from);
        // use-keys-clause:
        $keyClause = $this->wrapKey($query->key);
        // returning-clause
        $returning = implode(', ', $query->returning);

        if (!is_array(reset($values))) {
            $values = [$values];
        }
        $parameters = [];

        foreach ($values as $record) {
            $parameters[] = '(' . $this->parameterize($record) . ')';
        }
        $parameters = (!$keyClause) ? implode(', ', $parameters) : "({$keyClause}, \$parameters)";
        $keyValue = (!$keyClause) ? null : '(KEY, VALUE)';

        return "insert into {$table} {$keyValue} values $parameters RETURNING {$returning}";
    }

    /**
     * {@inheritdoc}
     *
     * @see http://developer.couchbase.com/documentation/server/4.1/n1ql/n1ql-language-reference/delete.html
     */
    public function compileDelete(Builder $query)
    {
        // keyspace-ref:
        $table = $this->wrapTable($query->from);
        // use-keys-clause:
        $keyClause = null;
        if ($query->key) {
            $key = $this->wrapKey($query->key);
            $keyClause = "USE KEYS {$key}";
        }
        // returning-clause
        $returning = implode(', ', $query->returning);
        $where = is_array($query->wheres) ? $this->compileWheres($query) : '';

        return trim("delete from {$table} {$keyClause} {$where} RETURNING {$returning}");
    }

    /**
     * @param QueryBuilder $query
     * @param array        $values
     *
     * @return string
     */
    public function compileUpsert(QueryBuilder $query, array $values)
    {
        // keyspace-ref:
        $table = $this->wrapTable($query->from);
        // use-keys-clause:
        $keyClause = $this->wrapKey($query->key);
        // returning-clause
        $returning = implode(', ', $query->returning);

        if (!is_array(reset($values))) {
            $values = [$values];
        }
        $parameters = [];

        foreach ($values as $record) {
            $parameters[] = '(' . $this->parameterize($record) . ')';
        }
        $parameters = (!$keyClause) ? implode(', ', $parameters) : "({$keyClause}, \$parameters)";
        $keyValue = (!$keyClause) ? null : '(KEY, VALUE)';

        return "UPSERT INTO {$table} {$keyValue} VALUES $parameters RETURNING {$returning}";
    }

この様にすることで、QueryBuilderのオブジェクトを利用し、GrammerクラスがN1QLに合わせてqueryを発行できる様になります。

前述したConnectionクラスは、ここでコンパイルされた文字列を利用して指定したデータベースへクエリを発行する仕組みとなっています。

Connectionクラスに戻り、実際にcouchbaseにクエリを投げる様にメソッドをオーバーライドします。

    /**
     * {@inheritdoc}
     */
    public function affectingStatement($query, $bindings = [])
    {
        return $this->run($query, $bindings, function ($me, $query, $bindings) {
            if ($me->pretending()) {
                return 0;
            }
            $query = \CouchbaseN1qlQuery::fromString($query);
            $query->consistency($this->consistency);
            $query->namedParams(['parameters' => $bindings]);
            $result = $this->executeQuery($query);
            $this->metrics = (isset($result->metrics)) ? $result->metrics : [];

            return (isset($result->rows[0])) ? $result->rows[0] : false;
        });
    }

実際にクエリを発行するメソッドはいくつかありますが、ここでは例としてaffectingStatementメソッドを紹介します。
このメソッドでcouchbaseにクエリを発行しますが、
consistencyの指定や、変数のバインドの機能はPDOと異なるため、中身は当然couchbaseに合わせて実装しなければなりません。
例えばcouchbaseはqueryの結果にドキュメントのmeta情報などが返却されます。
これら全てを返却してしまうと、他のdatabaseドライバと大きく挙動を変えてしまうため、
Eventを使ってリッスンできる様にするなどを行っています。

最後にServiceProviderを利用して、DatabaseManagerにcouchbaseドライバを追加します。

ServiceProvider実装例
 // add couchbase extension
 $this->app['db']->extend('couchbase', function ($config, $name) {
     return new CouchbaseConnection($config, $name);
 });

Managerクラスのextendメソッドでドライバを追加します。
これでdatabaseコンポーネントを通じて次の様にコールすることができる様になります。

N1QLquery
$value = [
    'click' => 'to edit',
    'content' => 'testing'
];
$key = 'insert:testing';
$result = \DB::connection('couchbase')
    ->table('testing')->key($key)->insert($value);

これでQueueドライバ追加の準備が整いました。
それではQueueドライバを追加していきましょう

Extend Queue

LaravelのQueueコンポーネントは、
Illuminate\Contracts\Queue\Factory,
Illuminate\Contracts\Queue\Monitor
の二つのインターフェースを実装したQueueManagerクラスを利用します。
Databaseコンポーネントのドライバ追加と大きくは変わりません。

このQueueManagerを使って追加するドライバは、
Connectorクラスは \Illuminate\Queue\Connectors\ConnectorInterface
Connectorのconnectメソッドの戻り値は CouchbaseQueue|\Illuminate\Contracts\Queue\Queue を実装したクラスでなければなりません。

Databaseドライバを組み合わせて実装する場合は次の様になります。

use Illuminate\Support\Arr;
use Illuminate\Queue\Connectors\ConnectorInterface;
use Illuminate\Database\ConnectionResolverInterface;

class CouchbaseConnector implements ConnectorInterface
{
    /** @var ConnectionResolverInterface */
    protected $connectionResolver;

    /**
     * CouchbaseConnector constructor.
     *
     * @param ConnectionResolverInterface $connectionResolver
     */
    public function __construct(ConnectionResolverInterface $connectionResolver)
    {
        $this->connectionResolver = $connectionResolver;
    }

    /**
     * @param array $config
     *
     * @return CouchbaseQueue|\Illuminate\Contracts\Queue\Queue
     */
    public function connect(array $config)
    {
        $connection = $this->connectionResolver->connection($config['driver']);

        return new CouchbaseQueue(
            $connection,
            $config['bucket'],
            $config['queue'],
            Arr::get($config, 'retry_after', 60)
        );
    }
}

指定したDatabaseドライバをQueueのコネクションに利用することができる様になります。
次に実際にQueueで利用されるメソッドを実装します。
ここではすでに前述したインターフェースを実装した Illuminate\Queue\DatabaseQueue クラスを継承し、
N1QLに合わせた実装に変更します。
(RDBMSのクエリを再利用できる、かつこれまでに簡単に紹介したQueryBuilder拡張を行っているため、
ほとんどがそのままで利用できる状態になっています)

use Illuminate\Queue\DatabaseQueue;
use Illuminate\Database\Query\Builder;
use Illuminate\Queue\Jobs\DatabaseJob;

class CouchbaseQueue extends DatabaseQueue
{
    /**  @var string */
    protected $table;

    /** @var CouchbaseConnection */
    protected $database;

    /**
     * {@inheritdoc}
     */
    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);
        if ($job = $this->getNextAvailableJob($queue)) {
            $job = $this->markJobAsReserved($job);

            return new DatabaseJob(
                $this->container, $this, $job, $queue
            );
        }

        return null;
    }

        // 省略
    /**
     * generate increment key
     *
     * @param int $initial
     *
     * @return int
     */
    protected function incrementKey($initial = 1)
    {
        $result = $this->database->openBucket($this->table)
            ->counter($this->identifier(), $initial, ['initial' => abs($initial)]);

        return $result->value;
    }

    /**
     * @param array $attributes
     *
     * @return string
     */
    protected function uniqueKey(array $attributes)
    {
        $array = array_only($attributes, ['queue', 'attempts', 'id']);

        return implode(':', $array);
    }

    /**
     * @return string
     */
    protected function identifier()
    {
        return __CLASS__ . ':sequence';
    }
}

couchbaseはRDBMSで利用可能なauto incrementは無いため、
ドキュメントの値を単純にincrementするcounterメソッドとkeyを組み合わせてほぼ同じ動作を行う様に実装します。4
このクラスでqueueで主に利用されるメソッドはpopメソッドになります。
引数に与えられる文字列が、Queue名になっており、

$ php artisan queue:work redis --queue=emails

などで指定されるものになります。
defaultで実行されない様にするために、検索されない様になっているのが理解できると思います。
他のメソッドは他のデータベースドライバとほぼ同等の動作をさせるために、
ドキュメントのロックやcasを利用する様になっています。

最後にDatabaseドライバ追加と同じ様に、ServiceProviderでQueueドライバを追加します。

protected function registerCouchbaseQueueDriver()
{
    /** @var QueueManager $queueManager */
    $queueManager = $this->app['queue'];
    $queueManager->addConnector('couchbase', function () {
        return new \Ytake\LaravelCouchbase\Queue\CouchbaseConnector($this->app['db']);
    });
}

追加後は、config/queue.phpに追加することで他のドライバと同様に動作させることができる様になります。

config/queue.php
    'connections' => [
        'couchbase' => [
            'driver' => 'couchbase',
            'bucket' => 'jobs',
            'queue' => 'default',
            'retry_after' => 90,
        ],
    ],
$ php artisan queue:work couchbase --queue=send_email

これで追加したqueueドライバが利用されるようになります。

最後に

フレームワークの機能拡張はこの様にして簡単に行うことができます。
他にもcontainerを通じてbindし直すなど多様な方法が利用できますが、
パッケージで公開、開発を行う場合は、
そのパッケージ内ではFacade, ヘルパーメソッドは使うべきでは無いことに注意しておきましょう。
Facadeの名前はconfig/app.phpで任意の名前に変更することができ、
ヘルパーメソッド(configやappなど)は、Containerを通じてデフォルトの挙動から大きく変更させることができるため、5
必ずフレームワークが利用しているContractインターフェースや、各コンポーネントのManagerクラスなどを
タイプヒントして実装することが望ましいです。
パッケージ公開時は必ずフレームワーク本体には依存しないコードを心掛けるとともに、
ユニットテストなど忘れずに必ず含めましょう。


  1. Laravel5.0~5.3までほぼ同様です。 

  2. couchbaseは一般的なKVSとしての機能と、SQLライクなN1QLとMapReduceを利用してドキュメントを検索することができます。 

  3. 尚、ここで紹介しているもののパッケージはLaravel-Couchbaseで利用できます 

  4. incrementKey メソッドがそれに当たります。 

  5. Eventはpeclに同名のエクステンションがあるなどしますので、Laravel4ではEventから名前を変更することが必須でした