0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

DatabricksにおけるAmazon Kinesisの活用

Last updated at Posted at 2021-11-18

Amazon Kinesis | Databricks on AWS [2021/4/29時点]の翻訳です。

構造化ストリーミングにおけるKinesisコネクターがDatabricksランタイムに含まれています。

Amazon Kinesisの認証

Kinesisの認証を行うためには、デフォルトではAmazonのデフォルトクレデンシャルプロバイダーチェーンを使用します。Kinesisにアクセスできるインスタンスプロファイルが設定されたDatabricksクラスターを起動することをお勧めします。アクセスのためにキーを使いたいのであれば、オプションawsAccessKeyawsSecretKeyを指定することができます。

あるいは、roleArnオプションを用いてIAMロールを想定することができます。オプションとして、roleExternalIdに外部ID、roleSessionNameにセッション名を指定することができます。ロールを仮定するためには、ロールを想定する権限を持つクラスターを起動するか、awsAccessKeyawsSecretKeyを通じてアクセスキーを指定することができます。クロスアカウント認証の際には、想定されたロールを保持するためにDatabricks AWSアカウントを通じて想定されるroleArnの使用を推奨します。クロスアカウント認証に関する詳細については、Delegate Access Across AWS Accounts Using IAM Rolesをご覧ください。

注意
Kinesisのソースには、ListShardsGetRecordsGetShardIterator権限が必要となります。例外Amazon: Access Deniedに遭遇した際には、お使いのユーザー、プロファイルにこれらの権限があることを確認してください。詳細はControlling Access to Amazon Kinesis Data Streams Resources Using IAMをご覧下さい。

注意
Databricksランタイム6.x以下では、お使いのKinesisストリームのシャードのリストを取得するために、デフォルトではListShardsではなくDescribeStreamを使用します。これらのDatabricksランタイムバージョンでは、DescribeStream権限が必要となります。

スキーマ

レコードのスキーマは以下の通りとなります。

カラム
partitionKey string
data binary
stream string
shardId string
sequenceNumber string
approximateArrivalTimestamp timestamp

dataカラムを明示的にデシアライズするためには、DataFrameのオペレーション(cast("string")やudf(ユーザー定義関数)など)を使用してください。

クイックスタート

クイックスタート:ワードカウントからスタートしましょう。以下のノートブックでは、Kinesisと構造化ストリーミングを用いて、どのようにワードカウントを実行するのかを説明しています。

構造化ストリーミングによるKinesisワードカウントのノートブック

設定

警告!
Kinesisによるレート制限と、Kinesisにおける制限から、Kinesisを使用する際には一度のみのトリガー実行(Trigger.Once())はサポートされていません。

どのデータを読み取るのかを指定するための最も重要な設定を以下に示します。

オプション デフォルト 説明
streamName ストリーム名のカンマ区切りのリスト なし(必須パラメーター) サブスクライブするストリーム名
region 指定するストリームのリージョン ローカルで解決できるリージョン ストリームが定義されているリージョン
initialPosition latest、trim_horizon、earliest (trim_horizonのエイリアス)、at_timestamp latest ストリームから読み込む際の開始位置

時間指定での読み取り開始

注意
本機能はDatabricksランタイム 7.3 LTS以降で利用できます。

時間指定で読み取りを開始するには、initialPositionオプションにat_timestampの値を使用することができます。{"at_timestamp": "06/25/2020 10:23:45 PDT"}のように、JSON文字列として値を指定することができます。ストリーミングクエリーは、指定されたタイムスタンプ以降(タイムスタンプを含みます)の全ての変更を読み取ります。これにより、Javaのタイムスタンプをパースするためのデフォルトフォーマットを使用することができます。{"at_timestamp": "06/25/2020 10:23:45 PDT", format: "MM/dd/yyyy HH:mm:ss ZZZ"}のように、JSON文字列の追加フィールドに明示的にフォーマットを含めることができます。

さらに、Kinesisから読み込みを行う際のスループットとレーテンシーを制御する設定が存在します。KinesisソースはバックグラウンドスレッドでSparkジョブを実行し、定期的にKinesisデータをプリフェッチし、Sparkエグゼキューターのメモリーにキャッシュします。ストリーミングクエリーは、それぞれのプリフェッチステップが完了し、データが処理できるようになった後にのみ、キャッシュデータを処理します。このため、このプリフェッチステップが、計測されるエンドツーエンドのレーテンシーとスループットの多くを決定します。以下のオプションを用いることでパフォーマンスを制御することができます。

オプション デフォルト 説明
maxRecordsPerFetch 正の整数 10,000 Kinesisに対するAPIリクエストにおいて読み取るレコード数。返却されるレコード数は、Kinesis Producer Libraryを用いてサブレコードがシングルレコードに集約されるかどうかに基づいて、多くなる場合があります。
maxFetchRate MB/sで表現されるデータレートを表現する正の数値 1.0 (max = 2.0) シャードごとのデータプレフェッチのスピード。これはフェッチのレートに制限をかけるものであり、Kinesisのスロットリングを回避するためのものです。2.0 MB/sがKinesisで許可される最大のレートです。
minFetchPeriod 1秒が1sのように期間を示す文字列 400ms (min = 200ms) 連続したプリフェッチのトライの間でどのくらい待つのか。これはフェッチの頻度を制限し、Kinesisのスロットリングを回避するためのものです。200msがKinesisが許可する最大値となり、秒あたり5回のフェッチとなります。
maxFetchDuration 1分が1mのように期間を示す文字列 10s 処理に利用できるようにする前に、プリフェッチした新規データをどのくらいの期間バフッファするのか。
fetchBufferSize 2gb10mbのようなバイト文字列 20gb 次のトリガーまでにどれだけのデータをバッファするのか。これは、停止条件として用いられ、上限を指定するものであり、この値で指定された以上のデータがバッファされる場合があります。
shardsPerTask 正の整数 5 Sparkタスク毎にどれだけのKinesisシャードが並列でプリフェッチを行うのか。最小のクエリーレーテンシーと最大のリソース使用量のために理想的には、クラスターのコア数 >= Kinesisのシャード数 / shardsPerTaskとなります。
shardFetchInterval 2分が2mのように期間を示す文字列 1s 再シャードのために、どのくらいの頻度でKinesisにポーリングするのか。
awsAccessKey 文字列 なし AWSのアクセスキー
awsSecretKey 文字列 なし アクセスキーに対応するAWSのシークレットアクセスキー
roleArn 文字列 なし Kinesisにアクセスする際に想定するロールのAmazonリソース名(ARN)
roleExternalId 文字列 なし AWSアカウントにアクセスを移譲する際に用いるオプション値。詳細はHow to Use an External IDをご覧ください。
roleSessionName 文字列 なし 異なるプリンシパルや別の理由で、同じロールが想定された際にセッションを一意に特定する想定ロールの識別子

注意
オプションのデフォルト値は、2つのリーダーがKinesisのレートリミットに達することなしに、同時にKinesisストリームを消費できるように選択されています。より多くのコンシューマーが存在する場合には、それに応じてオプションを調整する必要があります。例えば、maxFetchRateを少なくして、minFetchPeriodを多くするなどです。

特定のユースケースにおいて推奨するいくつかの設定を説明します。

KinesisからS3へのETL

長期ストレージに対するETLを行う際、少数の大規模ファイルの方が望ましいケースがあります。この場合、例えば、5-10分のように長いストリームトリガー周期に設定したいと考えるかもしれません。さらに、処理中に大きなブロックをバッファするようにmaxFetchDurationを大きくし、トリガーの合間にあまりに早くフェッチを停止して、ストrームに遅れ始めないようにfetchBufferSizeを大きくしたいと考えるかもしれません。

低レーテンシーでのモニタリング、アラート

アラートを行うユースケースがある場合、より低いレーテンシーが必要になるでしょう。これを達成するには以下のことを行います。

  • Kinesisのレートリミットに達することなしに、可能な限り高速にストリーミングクエリーがフェッチするように最適化するために、Kinesisストリームには単一のコンシューマーのみがあること(すなわち、ストリーミングクエリーは一つのみで他には存在しない)を確認します。
  • 可能な限り高速にフェッチしたデータに処理を開始できるようにmaxFetchDurationを小さな値(例えば、200ms)に設定します。
  • オプションminFetchPeriod210msに設定し、可能な限り頻繁にフェッチするようにします。
  • オプションshardsPerTaskやクラスターを# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTaskとなるように設定します。これによって、バックグラウンドのプリフェッチのタスクとストリーミングクエリータスクは同時に実行されます。

クエリーが5秒毎にデータを受け取っていることを観測した場合、Kinesisのレートリミットに達している可能性が高いです。設定を確認してください。

警告!
Kinesisストリームを削除して再作成した場合、ストリーミングクエリーを再開するために、あらゆる既存のチェックポイントのディレクトリを再利用することはできません。チェックポイントのディレクトリを削除して、クエリーを最初からスタートする必要があります。

メトリクス

注意
Databricksランタイム8.1以上で利用できます。

Kinesisは、それぞれのワークスペースでストリーミングの開始からコンシューマー遅れている時間をミリ秒でレポートします。avgMsBehindLatestmaxMsBehindLatestminMsBehindLatestメトリクスとして、ストリーミングクエリープロセス( https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively )における全ワークスペースにおける平均値、最小値、最大値をミリ秒で取得することができます。ストリーミングをノートブックで実行している場合には、これらのメトリクスをストリーミングクエリー進捗ダッシュボードRaw Dataデータタブで確認することができます。

JSON
{
  "sources" : [ {
    "description" : "KinesisV2[stream]",
    "metrics" : {
      "avgMsBehindLatest" : "32000.0",
      "maxMsBehindLatest" : "32000",
      "minMsBehindLatest" : "32000"
    },
  } ]
}

Kinesisへの書き込み

以下のコードスニペットは、Kinesisにデータを書き込むForeachSinkとして使用することができます。Dataset[(String, Array[Byte])]が必要となります。

注意
以下のコードスニペットはexactly onceではなく、at least onceセマンティクスを提供しています。

Kinesis Foreach Sinkノートブック

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?