LoginSignup
36
34

More than 5 years have passed since last update.

SQLBriteがテーブル変更を通知する仕組み

Posted at

この記事について

最近、SQLBriteというライブラリに触れ、データベースの変更がObservableとして購読できるということに感動しました。Githubのソースコードを見てみると、SQLBriteはたったの6ファイルで構成されていることがわかります。これなら読めるだろうと思ったので、ざっと目を通してみました。この記事では、SQLBriteのソースコードを読んでわかったSQLBriteがテーブル変更を通知する仕組みを説明します。

SQLBriteとは?

参考:
- Githubリポジトリ: https://github.com/square/sqlbrite
- 自分が書いた紹介記事: http://tech.furyu.jp/blog/?p=5233

SQLBriteとは、SQLiteOpenHelperとContentResolverをラップするSquare社製のライブラリです。SQLiteOpenHelperをラップしたBriteDatabase、ContentResolverをラップしたBriteContentResolverを提供してくれます。
BriteDatabaseは、BriteDatabase#createQuery(String table, String query)によって、QueryObservable extends Observable<Query>という購読できるクエリを返します。

createQuery
Observable<Query> users = db.createQuery("users", "SELECT * FROM users");
users.subscribe(new Action1<Query>() {
  @Override public void call(Query query) {
    Cursor cursor = query.run();
    // TODO parse data...
  }
});

https://github.com/square/sqlbrite#usage より引用)

INSERT, UPDATE, DELETEなどで、usersテーブルに変更があるとAction1#call(Query query)が呼ばれるので、Viewに反映することができます。

また、QueryObservable#mapToList(Func1<Cursor, T> mapper)メソッドでCursorからエンティティのリストに変換でき、

mapToList
db.createQuery("users", "SELECT * FROM users")
    .mapToList(new Func1<Cursor, User>() {
        @Override
        public User call(Cursor cursor) {
            long id = cursor.getLong(cursor.getColumnIndexOrThrow("_id"));
            String name = cursor.getString(cursor.getColumnIndexOrThrow("name"));
            return new User(id, name);
        }
    }).subscribe(new Action1<List<User>>() {
        @Override
        public void call(List<User> users) {
            // ユーザーリストの表示など
        }
    });

QueryObservable#mapToOne(Func1<Cursor, T> mapper)メソッドでCursorから単一のエンティティに変換できます。

mapToOne
db.createQuery("users", "SELECT count(*) user_count FROM users")
    .mapToOne(new Func1<Cursor, Long>() {
        @Override
        public Long call(Cursor cursor) {
            long count = cursor.getLong(cursor.getColumnIndexOrThrow("user_count"));
            return count;
        }
    }).subscribe(new Action1<Long>() {
        @Override
        public void call(Long count) {
            // ユーザー数の表示など
        }
    });

クラスの構成

SQLBriteを構成する主なクラスを列挙します。ファイル数は6つですが、内部クラスもあるので6以上あります。

クラス名 説明
SQLBrite BriteDatabaseの生成。
SQLBrite.Query 実行可能なクエリの抽象クラス
BriteDatabase SQLiteOpenHelperをラップしたクラス。createQueryメソッドでObservableを生成する。insert、delete、updateなどの処理や、トランザクションの管理も行う。
BriteDatabase.DatabaseQuery Queryを継承した具象クラス
BriteContentResolver ContentResolver(アプリケーション間のデータ共有に使われる)をラップしたクラス。購読できるQueryを生成する。
QueryObservable Observableを継承したクラス。mapToListメソッドやmapToOneメソッドを提供する。
QueryToListOperator Queryをエンティティのリストに変換するOperator
QueryToOneOperator Queryを単一のエンティティに変換するOperator

テーブル変更を通知する仕組み

テーブル変更を通知する仕組みは、BriteDatabaseクラスに書いてあります。private QueryObservable createQuery(Func1<Set<String>, Boolean>, String,
String...)
メソッドを見てみましょう。

@CheckResult @NonNull
private QueryObservable createQuery(Func1<Set<String>, Boolean> tableFilter, String sql,
    String... args) {
  if (transactions.get() != null) {
    throw new IllegalStateException("Cannot create observable query in transaction. "
        + "Use query() for a query inside a transaction.");
  }

  DatabaseQuery query = new DatabaseQuery(tableFilter, sql, args);
  final Observable<Query> queryObservable = triggers //
      .filter(tableFilter) // Only trigger on tables we care about.
      .map(query) // DatabaseQuery maps to itself to save an allocation.
      .onBackpressureLatest() // Guard against uncontrollable frequency of upstream emissions.
      .startWith(query) //
      .observeOn(scheduler) //
      .onBackpressureLatest() // Guard against uncontrollable frequency of scheduler executions.
      .doOnSubscribe(ensureNotInTransaction);
  // TODO switch to .extend when non-@Experimental
  return new QueryObservable(new Observable.OnSubscribe<Query>() {
    @Override public void call(Subscriber<? super Query> subscriber) {
      queryObservable.unsafeSubscribe(subscriber);
    }
  });
}

通知の開始

購読されるObservableは以下の箇所で定義されています。

  DatabaseQuery query = new DatabaseQuery(tableFilter, sql, args);
  final Observable<Query> queryObservable = triggers //
      .filter(tableFilter) // Only trigger on tables we care about.
      .map(query) // DatabaseQuery maps to itself to save an allocation.
      .onBackpressureLatest() // Guard against uncontrollable frequency of upstream emissions.
      .startWith(query) //
      .observeOn(scheduler) //
      .onBackpressureLatest() // Guard against uncontrollable frequency of scheduler executions.
      .doOnSubscribe(ensureNotInTransaction);

どうやら、triggersというものがObservableの源流となっているようです。では、このtriggersは何かと言うと、BriteDatabaseのメンバ変数として定義されています。

private final PublishSubject<Set<String>> triggers = PublishSubject.create();

ただの、PublishSubjectですね。ということは、triggers.onNextが呼ばれることでテーブル変更の通知が開始すると考えられます。triggers.onNextが呼ばれているのは以下の1箇所です。

void sendTableTrigger(Set<String> tables) {
  SqliteTransaction transaction = transactions.get();
  if (transaction != null) {
    transaction.addAll(tables);
  } else {
    if (logging) log("TRIGGER %s", tables);
    triggers.onNext(tables);
  }
}

sendTableTriggerメソッドは、BriteDatabaseのinsert, update, deleteメソッドなどで利用されています。たとえば、以下はinsertメソッドの中身です。

public long insert(@NonNull String table, @NonNull ContentValues values,
    @ConflictAlgorithm int conflictAlgorithm) {
  SQLiteDatabase db = getWriteableDatabase();

  if (logging) {
    log("INSERT\n  table: %s\n  values: %s\n  conflictAlgorithm: %s", table, values,
        conflictString(conflictAlgorithm));
  }
  long rowId = db.insertWithOnConflict(table, null, values, conflictAlgorithm);

  if (logging) log("INSERT id: %s", rowId);

  if (rowId != -1) {
    // Only send a table trigger if the insert was successful.
    sendTableTrigger(Collections.singleton(table));
  }
  return rowId;
}

INSERTに成功した場合に、sendTableTriggerメソッドにINSERTしたテーブル名を渡していますね。同様にUPDATEやDELETEの成功時にもsendTableTriggerが呼ばれます。これによって、テーブル変更があった時の通知が開始できるわけですね。

通知開始後の処理

購読されるObservableの定義箇所に戻りましょう。

  DatabaseQuery query = new DatabaseQuery(tableFilter, sql, args);
  final Observable<Query> queryObservable = triggers //
      .filter(tableFilter) // Only trigger on tables we care about.
      .map(query) // DatabaseQuery maps to itself to save an allocation.
      .onBackpressureLatest() // Guard against uncontrollable frequency of upstream emissions.
      .startWith(query) //
      .observeOn(scheduler) //
      .onBackpressureLatest() // Guard against uncontrollable frequency of scheduler executions.
      .doOnSubscribe(ensureNotInTransaction);

テーブルのフィルタリング

triggersからは変更されたテーブル名がSet<String>として流れてきます。これを次に処理するのは.filter(tableFilter)です。tableFilterは、publicなcreateQueryメソッドで生成されています。

@CheckResult @NonNull
public QueryObservable createQuery(@NonNull final String table, @NonNull String sql,
    @NonNull String... args) {
  Func1<Set<String>, Boolean> tableFilter = new Func1<Set<String>, Boolean>() {
    @Override public Boolean call(Set<String> triggers) {
      return triggers.contains(table);
    }

    @Override public String toString() {
      return table;
    }
  };
  return createQuery(tableFilter, sql, args);
}

createQueryの第1引数には、自分が通知を受けたいテーブル名を渡します。変更したテーブルの一覧に通知を受けたいテーブルが含まれていれば、次の処理ができるということです。

クエリの再実行

つづいて、.map(query)の説明です。queryは、

DatabaseQuery query = new DatabaseQuery(tableFilter, sql, args);

と定義されています。DatabaseQueryはQueryを継承したクラスです。Query#runによって、SQLクエリを実行してCursorを返します。

@Override public Cursor run() {
  if (transactions.get() != null) {
    throw new IllegalStateException("Cannot execute observable query in a transaction.");
  }

  long startNanos = nanoTime();
  Cursor cursor = getReadableDatabase().rawQuery(sql, args);

  if (logging) {
    long tookMillis = NANOSECONDS.toMillis(nanoTime() - startNanos);
    log("QUERY (%sms)\n  tables: %s\n  sql: %s\n  args: %s", tookMillis, tableFilter,
        indentSql(sql), Arrays.toString(args));
  }

  return cursor;
}

また、mapの中に入れられているのでわかると思いますが、DatabaseQueryはFunc1<Set<String>, Query>を実装しています。Func1#callで単純に値が流れてきたら自分自身を返すという実装をしています。

@Override public Query call(Set<String> ignored) {
  return this;
}

ここまでで、テーブル変更のトリガー発火→通知を受けたいテーブルだけ次の処理へ→Query(実体はDatabaseQuery) が流れる、という変更通知の仕組みが説明できました。

初回購読時の処理

テーブル変更があるとQueryが流れてくるという説明をずっとしていましたが、Queryが流れてきて欲しいタイミングとして忘れてはいけないのが、最初にsubscribeした時です。これについても、購読されるObservableの定義箇所で書かれています。

  DatabaseQuery query = new DatabaseQuery(tableFilter, sql, args);
  final Observable<Query> queryObservable = triggers
      .filter(tableFilter)
      .map(query)
      .onBackpressureLatest()
      .startWith(query) // ←ここです!
      .observeOn(scheduler)
      .onBackpressureLatest()
      .doOnSubscribe(ensureNotInTransaction);

.startWith(query)で、初回購読時にはqueryをそのまま流すようにしています。

まとめ

この記事では、SQLBriteがテーブル変更を通知する仕組みを説明しました。ざっくり言うと、

  • INSERT, UPDATE, DELETE時などに、PublishSubject#onNext(Set<String>)で変更したテーブルを流す
  • tableFilterで今見ているテーブルだけにフィルタリングする
  • .map(query)でQueryを流す
  • .startWith(query)で初回購読時もQueryを流す

という仕組みでした。

感想

何かの仕組みの説明を言葉だけで書くのはかなり難しいと感じました。とくに、RxJavaは画像などを駆使して説明しないとイメージがつかみにくいですね。本当は、mapToList, mapToOneメソッドの仕組みやトランザクションの扱い方なども説明したかったのですが、力尽きてしまいました。

しかし、他の人の(しかもSquare社の人の)コードをじっくり読んだことで、かなり勉強になりました。自分の知らなかったこと(startWithメソッドなど)を知ることができて良かったです。皆さんも気になるライブラリがあれば、ソースコードを読んでみてはいかがでしょうか。

以上です。

36
34
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
36
34