2
1

More than 3 years have passed since last update.

[Android] Jetpack DataStoreのRead/Writeについて読んだので覚え書き

Posted at

はじめに

Jetpack DataStoreがどうやってデータの管理をしているのかを知っておこうと思って、ちょっとソースを読んでみたのでその覚え書きです。
自身のメモ的な要素が多分に含まれております。予めご了承いただければ幸いです。

ソースについて

今回読んだソースコードは2020/10/30時点のものとなります。
DataStoreは現時点ではAlphaなのですぐに実装が様変わりすることも十分あるかと思います。

また、DataStoreにはPreferences DataStoreProto DataStoreがありますが、どちらもCoreの部分は同じコードで動いていて、データの扱い方がちょっと違う(Preferences DataStoreでは内部で汎用的なProtoファイルを定義して処理をしている)だけっぽかったので、今回は共通部分に関してのものになります。
なのでPreferencesの方はあまり考慮せずに書いてます

Utils

今回特に詳細述べませんが出現するクラスは以下となります。

  • DataAndHash: データ自体と、そのデータのHash値をセットにしたdata class意図しない改ざんなどが起こっていないことを判断するためのクラス
  • UncloseableOutputStream: 書き込み時に、DataStore利用者がstreamをCloseできないようにするためのWrapperクラス
  • ConflatedBroadcastChannel: 読み込まれたり更新されたデータが流れてくるChannel

Read/Write実行の起点

そもそものDataStoreの初期化はコレ

datastore/datastore/src/main/java/androidx/datastore/DataStoreFactory.kt
public fun <T> Context.createDataStore(...): DataStore<T> = DataStoreFactory.create(...)

そしてDataStoreFactory.create(...)の定義はココ

datastore/datastore-core/src/main/java/androidx/datastore/core/DataStoreFactory.kt
public fun <T> create(...): DataStore<T> = SingleProcessDataStore(...)

このSingleProcessDataStoreが今回の目的物です。
すべての変更(read/write)はクラス内で生成するactor: SendChannel<Message<T>>内で処理されます。

  private val actor: SendChannel<Message<T>> = scope.actor(
    capacity = UNLIMITED
  ) {
    try {
      messageConsumer@ for (msg in channel) {
      ...

      try {
        readAndInitOnce(msg.dataChannel) // Read実行
      } catch (ex: Throwable) {
        resetDataChannel(ex)
        continue@messageConsumer
      }

      // We have successfully read data and sent it to downstreamChannel.

      if (msg is Message.Update) {
        msg.ack.completeWith(
          runCatching {
            transformAndWrite(msg.transform, downstreamChannel()) // Write実行
          }
        )
      }
    ...
  }

Read

開始位置の定義はココ

  private suspend fun readAndInitOnce(dataChannel: ConflatedBroadcastChannel<DataAndHash<T>>) {
    if (dataChannel.valueOrNull != null) {
      // If we already have cached data, we don't try to read it again.
      return
    }

    val updateLock = Mutex()
    var initData = readDataOrHandleCorruption()

    ...

    dataChannel.offer(DataAndHash(initData, initData.hashCode()))
  }

読み込み実行は1度だけ行われ、すでに読み込みが行われている場合は関数最初のベースケースでEarly Returnして処理をスキップします。
実際に読み込むのはreadDataOrHandleCorruption()関数内のreadData()呼び出しです。

  private suspend fun readDataOrHandleCorruption(): T {
    try {
      return readData()
    } catch (ex: CorruptionException) {

      val newData: T = corruptionHandler.handleCorruption(ex)

      try {
        writeData(newData)
      } catch (writeEx: IOException) {
        // If we fail to write the handled data, add the new exception as a suppressed
        // exception.
        ex.addSuppressed(writeEx)
        throw ex
      }

      // If we reach this point, we've successfully replaced the data on disk with newData.
      return newData
    }
  }

データの読み込みに失敗した場合、Catchブロックが実行されることになり、defaultではcorruptionHandlerはExceptionをそのまま返しますが、SingleProcessDataStoreクラスのコンストラクタで、カスタムハンドラを渡すこともできます。
その場合はハンドラで取得できるデータを利用してデータ一旦の書き込みを行います。

  private suspend fun readData(): T {
    try {
      FileInputStream(file).use { stream ->
        return serializer.readFrom(stream)
      }
    } catch (ex: FileNotFoundException) {
      if (file.exists()) {
        throw ex
      }
      return serializer.defaultValue
    }
  }

ここでわかる通り、データファイルはserializerでシリアライズしたものをファイルに読み込み/書き出していて、後ろにRoomやSharedPreference等があるとかそういうものではないようですね。
またファイルがないとき、基本的にはserializer.defaultValueが返却されますが、異常ケースの場合はExceptionがreadAndInitOnceの呼び出し元まで飛び、処理自体が無効化されます。

Write

書き込みに関しても、まずは読み込みの処理が走ります。
万が一読み込み前に書き込みが走ってもいいように配慮しているんしょうか。
開始位置の定義はココ

  private suspend fun transformAndWrite(
    transform: suspend (t: T) -> T,
    /**
     * This is the channel that contains the data that will be used for the transformation.
     * It *must* already have a value -- otherwise this will throw IllegalStateException.
     * Once the transformation is completed and data is durably persisted to disk, and the new
     * value will be offered to this channel.
     */
    updateDataChannel: ConflatedBroadcastChannel<DataAndHash<T>>
  ): T {
    val curDataAndHash = updateDataChannel.value
    curDataAndHash.checkHashCode()
    val curData = curDataAndHash.value
    val newData = transform(curData)

    // Check that curData has not changed...
    curDataAndHash.checkHashCode()

    return if (curData == newData) {
      curData
    } else {
      writeData(newData)
      updateDataChannel.offer(DataAndHash(newData, newData.hashCode()))
      newData
    }
  }

まずはデータのtransform。
現在のDataのHashをチェックして、保持しているデータ自体が変更されていないことを確認します(データはImmutableであることが要求されている)
その後transform関数でデータを変換し、再度現在のデータのHashを確認します。これはtransform関数が副作用を持っていないことを保証するためのようです。
変換前と後でデータが変わっていなかったら何もせず、更新後のデータ書き込みはwriteData(...)関数で行います。

  internal fun writeData(newData: T) {
    file.createParentDirectories()

    val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX)
    try {
      FileOutputStream(scratchFile).use { stream ->
        serializer.writeTo(newData, UncloseableOutputStream(stream))
        stream.fd.sync()
        // TODO(b/151635324): fsync the directory, otherwise a badly timed crash could
        //  result in reverting to a previous state.
      }

      if (!scratchFile.renameTo(file)) {
        throw IOException("$scratchFile could not be renamed to $file")
      }
    } catch (ex: IOException) {
      if (scratchFile.exists()) {
        scratchFile.delete()
      }
      throw ex
    }
  }

書き込みは直接データファイルに行うのではなく、SCRATCH_SUFFIX(".tmp")に対して実行し、書き込みが完了してからrenameすることで行っているようです。
Linuxファイルシステムのやってるようなこと(データ破損対策)を表現してるんですかね。

Read/Write実行の呼び出し

Read

DataStoreのデータはFlow型として扱うことができ、

  override val data: Flow<T> = flow {
    val curChannel = downstreamChannel()
    actor.offer(Message.Read(curChannel)) // Read実行呼び出し
    emitAll(curChannel.asFlow().map { it.value })
  }

と定義されています。
このFlowを作るときにactorにイベントを渡すことでReadの実行が呼び出されています。
そして、curChannel(ConflatedBroadcastChannel)から流れてきたデータを渡しています。
なので、データを取得したり変更したときにはこのChannelに対してOfferして検知できるようにしています。

Write

データ更新は以下の関数から行われます。

  override suspend fun updateData(transform: suspend (t: T) -> T): T {
    val ack = CompletableDeferred<T>()
    val dataChannel = downstreamChannel()
    val updateMsg = Message.Update<T>(transform, ack, dataChannel)

    actor.send(updateMsg)

    // If no read has succeeded yet, we need to wait on the result of the next read so we can
    // bubble exceptions up to the caller. Read exceptions are not bubbled up through ack.
    if (dataChannel.valueOrNull == null) {
      dataChannel.asFlow().first()
    }

    // Wait with same scope as the actor, so we're not waiting on a cancelled actor.
    return withContext(scope.coroutineContext) { ack.await() }
  }

書き込みに関しても、読み込みとを同じくactorにイベントを渡して、ackがcompleteされるまで待っています。

おわりに

ざっくりとではありますが、Read/Writeに関して書きました。
scope.actor(...)はobsoleteであったりしていますが、この辺もVersionが進めば変わってきそうですね。

また読んでいて思った点としては、
例えばActivity AでもActivity Bでも同じDataを見ていて、 A->B->A(page back)と遷移したとき、Bでの変更はAでは検知できず、データがズレてしまうのでは?
そもそも使い方として間違った使い方なのかもしれませんが、やむにやまれずそうせざるを得なくなったときどうするべきなんでしょ。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1