やりたいこと
AndroidでKinesisに直接ログデータを上げます。
とくに大量の時系列データをアップロードする必要がある場合、REST APIなどを経由するとその部分のコストや保守が無視できなくなることがあるかもしれません。
今回、AndroidやiOS側の処理リソースを使って、Cognitoから払い出されたクレデンシャルをもとに、直接Kinesisを呼び出し、データをアップロードすることを試します。
注意点としては、REST API経由と比較すると、デバイス側にKinesisの実装が入るため、クラウドとの依存関係は増します。デバイス数が増えてきた場合の最適化法を検討するうえでの一つの選択肢としてお考えください。
なお、今回は、us-east-1リージョンを使ってます。他のリージョンの場合は一部変更が必要です。
Amplify開発環境
npm install -g @aws-amplify/cli
Android Studio
空のプロジェクトをベースに、機能を実装してみます。
Android Studioのターミナルで以下を叩きます。
$ amplify init
? Enter a name for the project kinesisTest
? Enter a name for the environment dev
? Choose your default editor: IntelliJ IDEA
? Choose the type of app that you're building android
? Where is your Res directory: app/src/main/res
Using default provider awscloudformation
? Do you want to use an AWS profile? Yes
? Please choose the profile you want to use default
続いて認証を追加。
$ amplify add auth
Do you want to use the default authentication and security configuration? Default configuration
Warning: you will not be able to edit these selections.
How do you want users to be able to sign in? Email
Do you want to configure advanced settings? No, I am done.
$ amplify push
これであとは自動作成されます。
今回使いませんがログイン機能に必要な設定も行われます。
Cognito
Cognitoサービスを選択後、 federated identities -> identity pool を確認すると、新たにプールが追加されているのが確認できます。
ここでUnauthenticated roleを確認して、IAMでそれを編集します。
IAMサービスへ移って、Permissionを編集します。
簡単のために、AmazonKinesisFullAccessをアタッチしました。これで匿名ユーザーでもKinesisにデータが上げられます。(あくまでテストなのでこの設定です。実際はAuthRoleに対してアタッチします)
これでCognitoの設定は終わったので、実装に入ります。
実装
Amplifyのドキュメントを見ながら必要なモジュールを入れます。 追加で、Kinesisと、非同期処理のためのコルーチンを追加しました。
//Base SDK
implementation 'com.amazonaws:aws-android-sdk-core:2.15.+'
//AppSync SDK
implementation 'com.amazonaws:aws-android-sdk-appsync:2.8.+'
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
implementation 'com.amazonaws:aws-android-sdk-kinesis:2.15.+'
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3"
MainActivityを書き換えます。追加したのは数行です。
雑で乱暴ですが、これで大量のストリームをKinesisに送ります。
package com.example.kinesistest
import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import com.amazonaws.auth.CognitoCachingCredentialsProvider
import com.amazonaws.mobile.config.AWSConfiguration
import com.amazonaws.mobileconnectors.kinesis.kinesisrecorder.KinesisRecorder
import com.amazonaws.regions.Regions
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.json.JSONObject
class MainActivity : AppCompatActivity() {
private lateinit var kinesisRecorder: KinesisRecorder
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val credentialsProvider = CognitoCachingCredentialsProvider(
applicationContext,
AWSConfiguration(applicationContext)
)
kinesisRecorder =
KinesisRecorder(applicationContext.cacheDir, Regions.US_EAST_1, credentialsProvider)
GlobalScope.launch {
(0..1000).map { a ->
(0..1000).map { b ->
val json = JSONObject()
json.accumulate("time", System.currentTimeMillis())
json.accumulate("count", a)
json.accumulate("index", b)
kinesisRecorder.saveRecord(json.toString(), "mystream")
}
kinesisRecorder.submitAllRecords()
Log.d("kinesis", "count:$a")
}
}
}
}
kinesis Stream
Androidから送られたデータレコードを受けるために設定を行います。
mystreamという名前で、シャード数は1としています。
実際にはスループットを見ながらシャードの調整が必要です。
実行
Android Studioに戻って、実行します。
KinesisのMonitoringの画面で、レコードが入ってきたことを確認できます。
ここまでで、AndroidアプリからKinesisにデータをあげることができました。
クライアント側の実装もそこまで多くは無いので、ログだけを上げ続けたいようなユースケースでは検討の余地があるかもしれません。
Kinesisで受けたデータを処理するところは、また今度。。
kinesis Firehose
Kinesis Stream の代わりに、Firehoseを使う方法も書いておきます。
Android側の変更点は三点のみです。
//private lateinit var kinesisRecorder: KinesisRecorder
private lateinit var kinesisFirehoseRecorder: KinesisFirehoseRecorder
(中略)
//kinesisRecorder =
// KinesisRecorder(applicationContext.cacheDir, Regions.US_EAST_1, credentialsProvider)
kinesisFirehoseRecorder =
KinesisFirehoseRecorder(cacheDir, Regions.US_EAST_1, credentialsProvider)
(中略)
//kinesisRecorder.saveRecord(json.toString(), "mystream")
kinesisFirehoseRecorder.saveRecord(json.toString(), "mystream")
FirehoseのDelivery Streamも、 mystream
という名前で作成し、実行するとFirehoseから先のS3等にファイルを保存可能です。(詳細は割愛)