本記事では、ログサービスへのデータ送信を支援する、使いやすく設定性の高いJavaライブラリ「Alibaba CloudのLOG Java Producer」を紹介します。
本ブログは英語版からの翻訳です。オリジナルはこちらからご確認いただけます。一部機械翻訳を使用しております。翻訳の間違いがありましたら、ご指摘いただけると幸いです。
背景
ログは至る所にあります。世の中の変化を記録するキャリアとして、ログはマーケティング、研究開発、運用、セキュリティ、BI、監査など多くの分野で広く利用されています。
アリババログサービスは、ログデータのオールインワンサービスプラットフォームです。その中核コンポーネントであるLogHubは、高スループット、低遅延、自動スケーリングなどの優れた機能により、ビッグデータ処理、特にリアルタイムデータ処理のインフラとなっています。Flink、Spark、Stormなどのビッグデータコンピューティングエンジン上で動作するジョブは、データ処理結果や中間結果をリアルタイムでLogHubに書き込みます。LogHubからのデータを利用して、下流のシステムは、クエリ分析、アラームの監視、機械学習、反復計算など多くのサービスを提供することができます。LogHubのビッグデータ処理アーキテクチャは次の図のようになっています。
システムが正常に動作するようにするためには、便利で効率の良いデータ書き込み方法を利用する必要があります。APIやSDKを直接利用するだけでは、ビッグデータシナリオにおけるデータ書き込み能力の要件を満たすには不十分です。そこで開発されたのが「Alibaba Cloud LOG Java Producer」です。
特徴
Alibaba Cloud LOG Java Producerは、使いやすく、高度に設定可能なJavaクラスライブラリです。以下の機能を備えています。
1、スレッドセーフ:Alibaba Cloud LOG Java Producer (以下「Producer」)によって公開されるすべてのメソッドはスレッドセーフです。
2、非同期送信:Producer の SEND メソッドへの呼び出しは、通常、すぐに返され、データの送信やサーバからの応答の受信を待つことはありません。Producerには、送信するデータを一括でキャッシュするための内部キャッシュ機構(LogAcccumulator)があり、データを一括で送信することでスループットを向上させています。
3、自動リトライ: Producerは、再試行可能な例外に対して、自動で設定可能なリトライ機構(RetryQueue)を提供します。RetryQueue の最大リトライ時間とバックオフ期間を設定することができます。
4、トレーサビリティ:コールバックやフューチャーを使用して、対象のデータが正常に送信されたかどうかや、データを送信するために行われた試行を知ることができます。この機能を使用することで、問題をトレースし、問題解決のための判断を下すことができます。
5、コンテキストリストア: 同じProducerで生成されたログは同じコンテキストにあり、あるログの前後の関連ログをサーバー側で確認することができます。
6、シャットダウン:closeメソッドが結果を返すと、Producerがキャッシュしたデータが全て処理され、それに応じた通知を受け取ることができます。
メリット
Producerを使用してLogHubにデータを書き込むことは、APIやSDKを使用した場合と比較して以下のようなメリットがあります。
ハイパフォーマンス
大量のデータと限られたリソースでは、所望のスループットを実現するために、マルチスレッド、キャッシュポリシー、バッチ処理、障害発生時のリトライなどの複雑なロジックを実装する必要があります。Producerは、アプリケーションのパフォーマンスを向上させ、アプリケーション開発プロセスを簡素化するために、上記のロジックを実装しています。
非同期かつノンブロッキングのタスク実行
十分なキャッシュメモリがあれば、ProducerはLogHubに送信するデータをキャッシュします。sendメソッドを呼び出すと、指定されたデータは処理をブロックすることなく即座に送信されます。これにより、演算とI/Oロジックの分離を実現しています。後日、返された未来のオブジェクトや登録されたコールバックからデータ送信結果を取得することができます。
制御可能なリソースの利用
送信するデータをキャッシュするためにProducerが使用するメモリのサイズは、データ送信タスクに使用するスレッド数と同様にパラメータで制御することができます。これにより、Producerが無制限にリソースを消費することを回避することができます。また、実情に応じてリソース消費量と書き込みスループットのバランスをとることができます。
まとめ
要約すると、Producerは、複雑な基礎となる詳細を自動的に処理し、シンプルなインターフェースを公開することで、多くの利点を提供します。また、上位レイヤサービスの正常な動作に影響を与えず、データアクセスの敷居を大幅に下げることができます。
メカニズムの説明
Producer のパフォーマンスをよりよく理解するために、このセクションでは、データ書き込みロジック、コアコンポーネントの実装、グレースフルシャットダウンを含む Producer がどのように動作するかを説明します。Producer の全体的なアーキテクチャは以下の図に示されています。
データ書き込み
Producerのデータ書き込みロジック:
1、producer.send()
メソッドを呼び出して指定したログストアにデータを送信した後、LogAccumulator内のProducerバッチにデータがロードされます。通常、sendメソッドはすぐに結果を返します。しかし、Producer インスタンスに目的のデータを格納する十分なスペースがない場合、以下の条件のいずれかが満たされるまで send メソッドはブロックされます。
------1、以前にキャッシュされたデータがバッチハンドラで処理され、そのデータで占有されていたメモリが解放されます。その結果、Producerは対象のデータを格納するための十分なスペースを持つことになります。
------2、指定されたブロッキング時間を超えると例外が発生します。
2、Producer.send() を呼び出すと、対象バッチのログ数が maxBatchCount を超えてしまったり、対象バッチに対象データを格納するための十分なスペースがない場合があります。この場合、Producerは最初にターゲットバッチを IOThreadPool に送信してから、ターゲットデータを格納するための新しいバッチを作成します。スレッドをブロックしないようにするために、IOThreadPool は無制限のブロッキングキューを使用します。Producer インスタンスにキャッシュできるログの数は限られているので、キューの長さが無限に伸びることはありません。
3、Mover は LogAccumulator の各 Producer バッチをトラバースし、最大キャッシュ時間を超えたバッチを expiredBatches に送ります。また、期限切れになっていないバッチの最も早い期限切れ時間(t)も記録します。
4、その後、LogAccumulator から IOThreadPool に期限切れバッチを送信します。
5、その後、Mover は RetryQueue から送信条件に合致したProducerバッチを取得します。条件を満たすバッチがない場合は、t の期間待機します。
6、そして、期限切れのバッチを RetryQueue から IOThreadPool に送信する。ステップ 6 が終了すると、Mover はステップ 3 から 6 を繰り返します。
7、IOThreadPool のワーカースレッドは、ブロックされたキューからターゲットのログストアにバッチを送ります。
8、バッチがログストアに送られた後、それは成功キューに行きます。
9、送信に失敗し、以下の条件のいずれかを満たす場合は、失敗キューに行きます。
-------1、失敗したバッチは再試行できません。
-------2、RetryQueueが閉じられます。
-------3、指定されたリトライ回数に達し、失敗キューのバッチ数が送信するバッチの総数の1/2を超えません。
10、そうでなければ、ワーカースレッドは失敗したバッチの次回の送信時刻を計算してRetryQueueに送信します。
11、SuccessBatchHandler スレッドは、成功キューからバッチを取り出し、このバッチに登録されているすべてのコールバックを実行します。
12、FailureBatchHandler スレッドは、失敗キューからバッチを取り出し、このバッチに登録されているすべてのコールバックを実行します。
コアコンポーネント
Producer のコアコンポーネントには、LogAccumulator、RetryQueue、Mover、IOThreadPool、SendProducerBatchTask、BatchHandler があります。
LogAccumulator
スループットを向上させるために、より大きなバッチにデータを蓄積し、バッチでデータを送信するのが一般的です。ここで説明するLogAccumulatorの主な役割は、データをバッチにマージすることです。異なるデータを大きなバッチにマージするには、データが同じプロジェクト、ログストア、トピック、ソース、およびshardHashプロパティを持っている必要があります。LogAccumulator は、これらのプロパティに基づいて、これらのデータを内部マップの異なる位置にキャッシュします。マップのキーは、上記の5つのプロパティの5つの要素であり、値はProducerBatchです。スレッドの安全性と高い並行性を確保するために、ConcurrentMapが使用されます。
LogAccumulatorのもう一つの機能は、キャッシュされたデータの合計サイズを制御することです。この制御ロジックを実装するためにSemaphoreを使用しています。SemaphoreはAbstractQueuedSynchronizerベース(AQSベース)の高性能同期ツールです。Semaphoreは、まずスピニングによる共有リソースの取得を試み、コンテキストスイッチのオーバーヘッドを削減します。
RetryQueue
RetryQueue は、送信に失敗して再試行を待っているバッチを保存するために使用されます。これらの各バッチは、バッチを送信する時間を示すフィールドを持っています。期限切れのバッチを効率的に引き出すために、プロデューサーはこれらのバッチを保存するためのDelayQueueを持っています。DelayQueue は時間ベースの優先度の高いキューで、最も早い期限切れのバッチが最初に処理されます。このキューはスレッドセーフです。
Mover
Mover は独立したスレッドです。LogAccumulator と RetryQueue から期限切れのバッチを定期的に IOThreadPool に送信します。Mover はアイドル状態でも CPU リソースを占有します。CPU リソースの無駄遣いを避けるために、Mover は、LogAccumulator および RetryQueue から送信される適格なバッチが見つからない間、RetryQueue からの期限切れバッチを待ちます。この期間は、構成した最大キャッシュ時間 lingerMs です。
IOThreadPool
IOThreadPool内のワーカースレッドは、ログストアにデータを送信します。IOThreadPool のサイズは ioThreadCount パラメータで指定でき、デフォルト値はプロセッサ数の 2 倍です。
SendProducerBatchTask
SendProducerBatchTaskは、バッチ送信ロジックでカプセル化されています。I/O スレッドのブロックを避けるために、SendProducerBatchTask は、ターゲットのバッチが正常に送信されたかどうかにかかわらず、コールバック実行のために別のキューにターゲットのバッチを送信します。さらに、失敗したバッチがリトライ条件を満たした場合、現在のI/Oスレッドではすぐに再送されません。すぐに再送された場合、通常は再び失敗します。その代わりに、SendProducerBatchTask は、指数的なバックオフポリシーに従って RetryQueue に送ります。
BatchHandler
プロデューサーは、送信に成功したバッチと失敗したバッチを処理するために、SuccessBatchHandler と FailureBatchHandler を起動します。ハンドラがコールバックの実行やバッチの未来の設定を完了した後、新しいデータを使用するために、このバッチが占有しているメモリを解放します。別々の処理は、正常に送信されたバッチと失敗したバッチが分離されていることを確実にします。これは Producer のスムーズな操作を保証します。
GracefulShutdown
GracefulShutdownを実装するには、以下の要件を満たす必要があります。
1、close メソッドが結果をあなたに返すとき、プロデューサーのすべてのスレッドが終了している必要があります。また、キャッシュされたデータが適切に処理されていること、自分で登録したコールバックがすべて実行されていること、自分に返す先物がすべて設定されていることが必要になります。
2、また、closeメソッドの最大待ち時間を設定できるようにしておく必要があります。メソッドは、スレッドが終了したかどうか、キャッシュされたデータが処理されたかどうかに関わらず、この期間を超えた後、直ちに結果をあなたに返さなければなりません。
3、closeメソッドは、マルチスレッド環境でも複数回呼び出すことができ、正常に動作します。
4、コールバックでcloseメソッドを呼び出すことは安全であり、アプリケーションにデッドロックを起こすことはありません。
前述の要件を満たすために、プロデューサーのクローズロジックは以下のように設計されています。
1、LogAccumulatorを閉じる。LogAccumulator にデータを書き続けると例外が発生します。
2、RetryQueue を閉じます。RetryQueue にバッチを送り続けると、例外がスローされます。
3、Mover を閉じて、完全に終了するのを待ちます。クローズシグナルを検出した後、Mover は送信条件を満たしているかどうかに関わらず、LogAccumulator と RetryQueue から残っているすべてのバッチを IOThreadPool に送信します。データ損失を避けるために、Mover は、他のスレッドが書き込みをしなくなるまで、常に LogAccumulator と RetryQueue からバッチを引っ張ってきます。
4、IOThreadPool を閉じて、送信されたすべてのタスクが完了するのを待ちます。RetryQueue が既に閉じられている場合、失敗したバッチは直接失敗キューに送られます。
5、SuccessBatchHandlerを閉じて、完全に終了するのを待ちます。コールバックでcloseメソッドが呼び出された場合、待機処理はスキップされます。Closeシグナルを検出した後、SuccessBatchHandlerは成功キューからすべてのバッチを取り出し、1つずつ処理します。
6、FailureBatchHandlerを閉じて、完全に終了するのを待つ。コールバックでcloseメソッドが呼び出された場合、待機処理はスキップされます。Close シグナルを検出した後、FailureBatchHandler は失敗キューからすべてのバッチを取り出し、1 つずつ処理します。
このように、データの流れの方向を基準にしてキューやスレッドを1つずつ閉じていくことで、優雅なシャットダウンと安全な終了を実現しています。
まとめ
Alibaba Cloud LOG Java Producerは、以前のバージョンのProducerの包括的なアップグレードです。ネットワーク例外が発生した場合のCPU使用率の高さや、Producerを終了する際のデータ損失の少なさなど、以前のバージョンでの多くの問題点を解決しています。さらに、フォールトトレランス機構が強化されました。Producerは、操作ミスをした後でも、適切なリソース使用量、高いスループット、厳密な隔離を確保することができます。
アリババクラウドは日本に2つのデータセンターを有し、世界で60を超えるアベラビリティーゾーンを有するアジア太平洋地域No.1(2019ガートナー)のクラウドインフラ事業者です。
アリババクラウドの詳細は、こちらからご覧ください。
アリババクラウドジャパン公式ページ