Amazon Kinesis | Databricks on AWS [2021/4/29時点]の翻訳です。
構造化ストリーミングにおけるKinesisコネクターがDatabricksランタイムに含まれています。
Amazon Kinesisの認証
Kinesisの認証を行うためには、デフォルトではAmazonのデフォルトクレデンシャルプロバイダーチェーンを使用します。Kinesisにアクセスできるインスタンスプロファイルが設定されたDatabricksクラスターを起動することをお勧めします。アクセスのためにキーを使いたいのであれば、オプションawsAccessKey
とawsSecretKey
を指定することができます。
あるいは、roleArn
オプションを用いてIAMロールを想定することができます。オプションとして、roleExternalId
に外部ID、roleSessionName
にセッション名を指定することができます。ロールを仮定するためには、ロールを想定する権限を持つクラスターを起動するか、awsAccessKey
とawsSecretKey
を通じてアクセスキーを指定することができます。クロスアカウント認証の際には、想定されたロールを保持するためにDatabricks AWSアカウントを通じて想定されるroleArn
の使用を推奨します。クロスアカウント認証に関する詳細については、Delegate Access Across AWS Accounts Using IAM Rolesをご覧ください。
注意
Kinesisのソースには、ListShards
、GetRecords
、GetShardIterator
権限が必要となります。例外Amazon: Access Denied
に遭遇した際には、お使いのユーザー、プロファイルにこれらの権限があることを確認してください。詳細はControlling Access to Amazon Kinesis Data Streams Resources Using IAMをご覧下さい。
注意
Databricksランタイム6.x以下では、お使いのKinesisストリームのシャードのリストを取得するために、デフォルトではListShards
ではなくDescribeStream
を使用します。これらのDatabricksランタイムバージョンでは、DescribeStream
権限が必要となります。
スキーマ
レコードのスキーマは以下の通りとなります。
カラム | 型 |
---|---|
partitionKey | string |
data | binary |
stream | string |
shardId | string |
sequenceNumber | string |
approximateArrivalTimestamp | timestamp |
data
カラムを明示的にデシアライズするためには、DataFrameのオペレーション(cast("string")
やudf(ユーザー定義関数)など)を使用してください。
クイックスタート
クイックスタート:ワードカウントからスタートしましょう。以下のノートブックでは、Kinesisと構造化ストリーミングを用いて、どのようにワードカウントを実行するのかを説明しています。
構造化ストリーミングによるKinesisワードカウントのノートブック
設定
警告!
Kinesisによるレート制限と、Kinesisにおける制限から、Kinesisを使用する際には一度のみのトリガー実行(Trigger.Once()
)はサポートされていません。
どのデータを読み取るのかを指定するための最も重要な設定を以下に示します。
オプション | 値 | デフォルト | 説明 |
---|---|---|---|
streamName | ストリーム名のカンマ区切りのリスト | なし(必須パラメーター) | サブスクライブするストリーム名 |
region | 指定するストリームのリージョン | ローカルで解決できるリージョン | ストリームが定義されているリージョン |
initialPosition | latest、trim_horizon、earliest (trim_horizonのエイリアス)、at_timestamp
|
latest | ストリームから読み込む際の開始位置 |
時間指定での読み取り開始
注意
本機能はDatabricksランタイム 7.3 LTS以降で利用できます。
時間指定で読み取りを開始するには、initialPosition
オプションにat_timestamp
の値を使用することができます。{"at_timestamp": "06/25/2020 10:23:45 PDT"}
のように、JSON文字列として値を指定することができます。ストリーミングクエリーは、指定されたタイムスタンプ以降(タイムスタンプを含みます)の全ての変更を読み取ります。これにより、Javaのタイムスタンプをパースするためのデフォルトフォーマットを使用することができます。{"at_timestamp": "06/25/2020 10:23:45 PDT", format: "MM/dd/yyyy HH:mm:ss ZZZ"}
のように、JSON文字列の追加フィールドに明示的にフォーマットを含めることができます。
さらに、Kinesisから読み込みを行う際のスループットとレーテンシーを制御する設定が存在します。KinesisソースはバックグラウンドスレッドでSparkジョブを実行し、定期的にKinesisデータをプリフェッチし、Sparkエグゼキューターのメモリーにキャッシュします。ストリーミングクエリーは、それぞれのプリフェッチステップが完了し、データが処理できるようになった後にのみ、キャッシュデータを処理します。このため、このプリフェッチステップが、計測されるエンドツーエンドのレーテンシーとスループットの多くを決定します。以下のオプションを用いることでパフォーマンスを制御することができます。
オプション | 値 | デフォルト | 説明 |
---|---|---|---|
maxRecordsPerFetch | 正の整数 | 10,000 | Kinesisに対するAPIリクエストにおいて読み取るレコード数。返却されるレコード数は、Kinesis Producer Libraryを用いてサブレコードがシングルレコードに集約されるかどうかに基づいて、多くなる場合があります。 |
maxFetchRate | MB/sで表現されるデータレートを表現する正の数値 | 1.0 (max = 2.0) | シャードごとのデータプレフェッチのスピード。これはフェッチのレートに制限をかけるものであり、Kinesisのスロットリングを回避するためのものです。2.0 MB/sがKinesisで許可される最大のレートです。 |
minFetchPeriod | 1秒が1s のように期間を示す文字列 |
400ms (min = 200ms) | 連続したプリフェッチのトライの間でどのくらい待つのか。これはフェッチの頻度を制限し、Kinesisのスロットリングを回避するためのものです。200msがKinesisが許可する最大値となり、秒あたり5回のフェッチとなります。 |
maxFetchDuration | 1分が1m のように期間を示す文字列 |
10s | 処理に利用できるようにする前に、プリフェッチした新規データをどのくらいの期間バフッファするのか。 |
fetchBufferSize |
2gb 、10mb のようなバイト文字列 |
20gb | 次のトリガーまでにどれだけのデータをバッファするのか。これは、停止条件として用いられ、上限を指定するものであり、この値で指定された以上のデータがバッファされる場合があります。 |
shardsPerTask | 正の整数 | 5 | Sparkタスク毎にどれだけのKinesisシャードが並列でプリフェッチを行うのか。最小のクエリーレーテンシーと最大のリソース使用量のために理想的には、クラスターのコア数 >= Kinesisのシャード数 / shardsPerTask となります。 |
shardFetchInterval | 2分が2m のように期間を示す文字列 |
1s | 再シャードのために、どのくらいの頻度でKinesisにポーリングするのか。 |
awsAccessKey | 文字列 | なし | AWSのアクセスキー |
awsSecretKey | 文字列 | なし | アクセスキーに対応するAWSのシークレットアクセスキー |
roleArn | 文字列 | なし | Kinesisにアクセスする際に想定するロールのAmazonリソース名(ARN) |
roleExternalId | 文字列 | なし | AWSアカウントにアクセスを移譲する際に用いるオプション値。詳細はHow to Use an External IDをご覧ください。 |
roleSessionName | 文字列 | なし | 異なるプリンシパルや別の理由で、同じロールが想定された際にセッションを一意に特定する想定ロールの識別子 |
注意
オプションのデフォルト値は、2つのリーダーがKinesisのレートリミットに達することなしに、同時にKinesisストリームを消費できるように選択されています。より多くのコンシューマーが存在する場合には、それに応じてオプションを調整する必要があります。例えば、maxFetchRate
を少なくして、minFetchPeriod
を多くするなどです。
特定のユースケースにおいて推奨するいくつかの設定を説明します。
KinesisからS3へのETL
長期ストレージに対するETLを行う際、少数の大規模ファイルの方が望ましいケースがあります。この場合、例えば、5-10分のように長いストリームトリガー周期に設定したいと考えるかもしれません。さらに、処理中に大きなブロックをバッファするようにmaxFetchDuration
を大きくし、トリガーの合間にあまりに早くフェッチを停止して、ストrームに遅れ始めないようにfetchBufferSize
を大きくしたいと考えるかもしれません。
低レーテンシーでのモニタリング、アラート
アラートを行うユースケースがある場合、より低いレーテンシーが必要になるでしょう。これを達成するには以下のことを行います。
- Kinesisのレートリミットに達することなしに、可能な限り高速にストリーミングクエリーがフェッチするように最適化するために、Kinesisストリームには単一のコンシューマーのみがあること(すなわち、ストリーミングクエリーは一つのみで他には存在しない)を確認します。
- 可能な限り高速にフェッチしたデータに処理を開始できるように
maxFetchDuration
を小さな値(例えば、200ms
)に設定します。 - オプション
minFetchPeriod
を210ms
に設定し、可能な限り頻繁にフェッチするようにします。 - オプション
shardsPerTask
やクラスターを# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask
となるように設定します。これによって、バックグラウンドのプリフェッチのタスクとストリーミングクエリータスクは同時に実行されます。
クエリーが5秒毎にデータを受け取っていることを観測した場合、Kinesisのレートリミットに達している可能性が高いです。設定を確認してください。
警告!
Kinesisストリームを削除して再作成した場合、ストリーミングクエリーを再開するために、あらゆる既存のチェックポイントのディレクトリを再利用することはできません。チェックポイントのディレクトリを削除して、クエリーを最初からスタートする必要があります。
メトリクス
注意
Databricksランタイム8.1以上で利用できます。
Kinesisは、それぞれのワークスペースでストリーミングの開始からコンシューマー遅れている時間をミリ秒でレポートします。avgMsBehindLatest
、maxMsBehindLatest
、minMsBehindLatest
メトリクスとして、ストリーミングクエリープロセス( https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively )における全ワークスペースにおける平均値、最小値、最大値をミリ秒で取得することができます。ストリーミングをノートブックで実行している場合には、これらのメトリクスをストリーミングクエリー進捗ダッシュボードのRaw Dataデータタブで確認することができます。
{
"sources" : [ {
"description" : "KinesisV2[stream]",
"metrics" : {
"avgMsBehindLatest" : "32000.0",
"maxMsBehindLatest" : "32000",
"minMsBehindLatest" : "32000"
},
} ]
}
Kinesisへの書き込み
以下のコードスニペットは、Kinesisにデータを書き込むForeachSink
として使用することができます。Dataset[(String, Array[Byte])]
が必要となります。
注意
以下のコードスニペットはexactly onceではなく、at least once
セマンティクスを提供しています。
Kinesis Foreach Sinkノートブック