この記事について
最近、SQLBriteというライブラリに触れ、データベースの変更がObservableとして購読できるということに感動しました。Githubのソースコードを見てみると、SQLBriteはたったの6ファイルで構成されていることがわかります。これなら読めるだろうと思ったので、ざっと目を通してみました。この記事では、SQLBriteのソースコードを読んでわかったSQLBriteがテーブル変更を通知する仕組みを説明します。
SQLBriteとは?
参考:
- Githubリポジトリ: https://github.com/square/sqlbrite
- 自分が書いた紹介記事: http://tech.furyu.jp/blog/?p=5233
SQLBriteとは、SQLiteOpenHelperとContentResolverをラップするSquare社製のライブラリです。SQLiteOpenHelperをラップしたBriteDatabase、ContentResolverをラップしたBriteContentResolverを提供してくれます。
BriteDatabaseは、BriteDatabase#createQuery(String table, String query)
によって、QueryObservable extends Observable<Query>
という購読できるクエリを返します。
Observable<Query> users = db.createQuery("users", "SELECT * FROM users");
users.subscribe(new Action1<Query>() {
@Override public void call(Query query) {
Cursor cursor = query.run();
// TODO parse data...
}
});
(https://github.com/square/sqlbrite#usage より引用)
INSERT, UPDATE, DELETEなどで、usersテーブルに変更があるとAction1#call(Query query)
が呼ばれるので、Viewに反映することができます。
また、QueryObservable#mapToList(Func1<Cursor, T> mapper)
メソッドでCursor
からエンティティのリストに変換でき、
db.createQuery("users", "SELECT * FROM users")
.mapToList(new Func1<Cursor, User>() {
@Override
public User call(Cursor cursor) {
long id = cursor.getLong(cursor.getColumnIndexOrThrow("_id"));
String name = cursor.getString(cursor.getColumnIndexOrThrow("name"));
return new User(id, name);
}
}).subscribe(new Action1<List<User>>() {
@Override
public void call(List<User> users) {
// ユーザーリストの表示など
}
});
QueryObservable#mapToOne(Func1<Cursor, T> mapper)
メソッドでCursor
から単一のエンティティに変換できます。
db.createQuery("users", "SELECT count(*) user_count FROM users")
.mapToOne(new Func1<Cursor, Long>() {
@Override
public Long call(Cursor cursor) {
long count = cursor.getLong(cursor.getColumnIndexOrThrow("user_count"));
return count;
}
}).subscribe(new Action1<Long>() {
@Override
public void call(Long count) {
// ユーザー数の表示など
}
});
クラスの構成
SQLBriteを構成する主なクラスを列挙します。ファイル数は6つですが、内部クラスもあるので6以上あります。
クラス名 | 説明 |
---|---|
SQLBrite | BriteDatabaseの生成。 |
SQLBrite.Query | 実行可能なクエリの抽象クラス |
BriteDatabase | SQLiteOpenHelperをラップしたクラス。createQueryメソッドでObservableを生成する。insert、delete、updateなどの処理や、トランザクションの管理も行う。 |
BriteDatabase.DatabaseQuery | Queryを継承した具象クラス |
BriteContentResolver | ContentResolver(アプリケーション間のデータ共有に使われる)をラップしたクラス。購読できるQuery を生成する。 |
QueryObservable | Observableを継承したクラス。mapToListメソッドやmapToOneメソッドを提供する。 |
QueryToListOperator | Queryをエンティティのリストに変換するOperator |
QueryToOneOperator | Queryを単一のエンティティに変換するOperator |
テーブル変更を通知する仕組み
テーブル変更を通知する仕組みは、BriteDatabaseクラスに書いてあります。private QueryObservable createQuery(Func1<Set<String>, Boolean>, String, String...)
メソッドを見てみましょう。
@CheckResult @NonNull
private QueryObservable createQuery(Func1<Set<String>, Boolean> tableFilter, String sql,
String... args) {
if (transactions.get() != null) {
throw new IllegalStateException("Cannot create observable query in transaction. "
+ "Use query() for a query inside a transaction.");
}
DatabaseQuery query = new DatabaseQuery(tableFilter, sql, args);
final Observable<Query> queryObservable = triggers //
.filter(tableFilter) // Only trigger on tables we care about.
.map(query) // DatabaseQuery maps to itself to save an allocation.
.onBackpressureLatest() // Guard against uncontrollable frequency of upstream emissions.
.startWith(query) //
.observeOn(scheduler) //
.onBackpressureLatest() // Guard against uncontrollable frequency of scheduler executions.
.doOnSubscribe(ensureNotInTransaction);
// TODO switch to .extend when non-@Experimental
return new QueryObservable(new Observable.OnSubscribe<Query>() {
@Override public void call(Subscriber<? super Query> subscriber) {
queryObservable.unsafeSubscribe(subscriber);
}
});
}
通知の開始
購読されるObservableは以下の箇所で定義されています。
DatabaseQuery query = new DatabaseQuery(tableFilter, sql, args);
final Observable<Query> queryObservable = triggers //
.filter(tableFilter) // Only trigger on tables we care about.
.map(query) // DatabaseQuery maps to itself to save an allocation.
.onBackpressureLatest() // Guard against uncontrollable frequency of upstream emissions.
.startWith(query) //
.observeOn(scheduler) //
.onBackpressureLatest() // Guard against uncontrollable frequency of scheduler executions.
.doOnSubscribe(ensureNotInTransaction);
どうやら、triggers
というものがObservableの源流となっているようです。では、このtriggers
は何かと言うと、BriteDatabaseのメンバ変数として定義されています。
private final PublishSubject<Set<String>> triggers = PublishSubject.create();
ただの、PublishSubjectですね。ということは、triggers.onNext
が呼ばれることでテーブル変更の通知が開始すると考えられます。triggers.onNext
が呼ばれているのは以下の1箇所です。
void sendTableTrigger(Set<String> tables) {
SqliteTransaction transaction = transactions.get();
if (transaction != null) {
transaction.addAll(tables);
} else {
if (logging) log("TRIGGER %s", tables);
triggers.onNext(tables);
}
}
sendTableTriggerメソッドは、BriteDatabaseのinsert, update, deleteメソッドなどで利用されています。たとえば、以下はinsertメソッドの中身です。
public long insert(@NonNull String table, @NonNull ContentValues values,
@ConflictAlgorithm int conflictAlgorithm) {
SQLiteDatabase db = getWriteableDatabase();
if (logging) {
log("INSERT\n table: %s\n values: %s\n conflictAlgorithm: %s", table, values,
conflictString(conflictAlgorithm));
}
long rowId = db.insertWithOnConflict(table, null, values, conflictAlgorithm);
if (logging) log("INSERT id: %s", rowId);
if (rowId != -1) {
// Only send a table trigger if the insert was successful.
sendTableTrigger(Collections.singleton(table));
}
return rowId;
}
INSERTに成功した場合に、sendTableTriggerメソッドにINSERTしたテーブル名を渡していますね。同様にUPDATEやDELETEの成功時にもsendTableTriggerが呼ばれます。これによって、テーブル変更があった時の通知が開始できるわけですね。
通知開始後の処理
購読されるObservableの定義箇所に戻りましょう。
DatabaseQuery query = new DatabaseQuery(tableFilter, sql, args);
final Observable<Query> queryObservable = triggers //
.filter(tableFilter) // Only trigger on tables we care about.
.map(query) // DatabaseQuery maps to itself to save an allocation.
.onBackpressureLatest() // Guard against uncontrollable frequency of upstream emissions.
.startWith(query) //
.observeOn(scheduler) //
.onBackpressureLatest() // Guard against uncontrollable frequency of scheduler executions.
.doOnSubscribe(ensureNotInTransaction);
テーブルのフィルタリング
triggers
からは変更されたテーブル名がSet<String>
として流れてきます。これを次に処理するのは.filter(tableFilter)
です。tableFilter
は、publicなcreateQueryメソッドで生成されています。
@CheckResult @NonNull
public QueryObservable createQuery(@NonNull final String table, @NonNull String sql,
@NonNull String... args) {
Func1<Set<String>, Boolean> tableFilter = new Func1<Set<String>, Boolean>() {
@Override public Boolean call(Set<String> triggers) {
return triggers.contains(table);
}
@Override public String toString() {
return table;
}
};
return createQuery(tableFilter, sql, args);
}
createQueryの第1引数には、自分が通知を受けたいテーブル名を渡します。変更したテーブルの一覧に通知を受けたいテーブルが含まれていれば、次の処理ができるということです。
クエリの再実行
つづいて、.map(query)
の説明です。query
は、
DatabaseQuery query = new DatabaseQuery(tableFilter, sql, args);
と定義されています。DatabaseQueryはQueryを継承したクラスです。Query#run
によって、SQLクエリを実行してCursorを返します。
@Override public Cursor run() {
if (transactions.get() != null) {
throw new IllegalStateException("Cannot execute observable query in a transaction.");
}
long startNanos = nanoTime();
Cursor cursor = getReadableDatabase().rawQuery(sql, args);
if (logging) {
long tookMillis = NANOSECONDS.toMillis(nanoTime() - startNanos);
log("QUERY (%sms)\n tables: %s\n sql: %s\n args: %s", tookMillis, tableFilter,
indentSql(sql), Arrays.toString(args));
}
return cursor;
}
また、mapの中に入れられているのでわかると思いますが、DatabaseQueryはFunc1<Set<String>, Query>
を実装しています。Func1#call
で単純に値が流れてきたら自分自身を返すという実装をしています。
@Override public Query call(Set<String> ignored) {
return this;
}
ここまでで、テーブル変更のトリガー発火→通知を受けたいテーブルだけ次の処理へ→Query(実体はDatabaseQuery) が流れる、という変更通知の仕組みが説明できました。
初回購読時の処理
テーブル変更があるとQueryが流れてくるという説明をずっとしていましたが、Queryが流れてきて欲しいタイミングとして忘れてはいけないのが、最初にsubscribeした時です。これについても、購読されるObservableの定義箇所で書かれています。
DatabaseQuery query = new DatabaseQuery(tableFilter, sql, args);
final Observable<Query> queryObservable = triggers
.filter(tableFilter)
.map(query)
.onBackpressureLatest()
.startWith(query) // ←ここです!
.observeOn(scheduler)
.onBackpressureLatest()
.doOnSubscribe(ensureNotInTransaction);
.startWith(query)
で、初回購読時にはqueryをそのまま流すようにしています。
まとめ
この記事では、SQLBriteがテーブル変更を通知する仕組みを説明しました。ざっくり言うと、
- INSERT, UPDATE, DELETE時などに、
PublishSubject#onNext(Set<String>)
で変更したテーブルを流す -
tableFilter
で今見ているテーブルだけにフィルタリングする -
.map(query)
でQueryを流す -
.startWith(query)
で初回購読時もQueryを流す
という仕組みでした。
感想
何かの仕組みの説明を言葉だけで書くのはかなり難しいと感じました。とくに、RxJavaは画像などを駆使して説明しないとイメージがつかみにくいですね。本当は、mapToList, mapToOneメソッドの仕組みやトランザクションの扱い方なども説明したかったのですが、力尽きてしまいました。
しかし、他の人の(しかもSquare社の人の)コードをじっくり読んだことで、かなり勉強になりました。自分の知らなかったこと(startWithメソッドなど)を知ることができて良かったです。皆さんも気になるライブラリがあれば、ソースコードを読んでみてはいかがでしょうか。
以上です。