Kinesisストリームを使ったアプリケーションをPythonで作成するのは Amazon Kinesis Client Library for Python を使うのが楽です。そして、KCLワーカーはECSのコンテナとして起動すると運用がめちゃ楽です。
ECSのコンテナは、その出力をCloudWatchLogsにロギングしてくれるので、KCLワーカーのログもそうしたいですが、KCL for Pythonで使う MultiLangDaemon のログはデフォルトログレベルがINFOで、非常に冗長なので、このログレベルをWARNINGに変更したいと思います。
今回は、KCL for Pythonのサンプルアプリケーションを使ってやってみたいと思います。
まず KCL for Python を git clone します。
git clone https://github.com/awslabs/amazon-kinesis-client-python.git
README.md
の手順に従って、セットアップ、起動してみます。
python setup.py download_jars
python setup.py install
python samples/amazon_kclpy_helper.py --print_command --java `which java` --sample | /bin/bash
デフォルトのログはこんな感じになると思います。
Jul 20, 2018 2:23:37 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator getConfiguration
INFO: Value of workerId is not provided in the properties. WorkerId is automatically assigned as: 408939a3-1479-477d-99f6-d010aae841b3
Jul 20, 2018 2:23:37 AM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property initialPositionInStream with value TRIM_HORIZON
Jul 20, 2018 2:23:37 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig buildExecutorService
INFO: Using a cached thread pool.
Jul 20, 2018 2:23:37 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig <init>
INFO: Running PythonKCLSample to process stream words with executable sample_kclpy_app.py
Jul 20, 2018 2:23:37 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using workerId: 408939a3-1479-477d-99f6-d010aae841b3
Jul 20, 2018 2:23:37 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using credentials with access key id: XXXXXXXXXXXXXXXXXXXX
Jul 20, 2018 2:23:37 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: MultiLangDaemon is adding the following fields to the User Agent: amazon-kinesis-client-library-java-1.9.0 amazon-kinesis-multi-lang-daemon/1.0.1 python/2.7 sample_kclpy_app.py
Jul 20, 2018 2:23:37 AM com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator <init>
INFO: With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
Jul 20, 2018 2:23:37 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initialization attempt 1
Jul 20, 2018 2:23:37 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initializing LeaseCoordinator
^CJul 20, 2018 2:23:39 AM com.amazonaws.services.kinesis.multilang.MultiLangDaemon run
INFO: Process terminanted, will initiate shutdown.
それでは、ログレベルを変えてみましょう。
MultiLangDaemon は apache commons logging
を使っているので、ロギングシステムを簡単に変更できます。デフォルトのロガーはJdk14Loggerですが、今回はlog4jに変更して、log4jの設定でログレベルを変えていきます。
setup.py を編集して、ダウンロードする jar ファイル群に log4j
を追加します。ポイントはlog4j 2ではなく1.2であることです。
diff --git a/setup.py b/setup.py
index 4d21a36..c9c34fb 100644
--- a/setup.py
+++ b/setup.py
@@ -80,7 +80,8 @@ REMOTE_MAVEN_PACKAGES = [
('com.google.guava', 'guava', '18.0'),
('com.google.protobuf', 'protobuf-java', '2.6.1'),
('commons-lang', 'commons-lang', '2.6'),
- ('commons-logging', 'commons-logging', '1.1.3')
+ ('commons-logging', 'commons-logging', '1.1.3'),
+ ('log4j', 'log4j', '1.2.17')
]
ログシステムをlog4jに変更するよう commons-logging.properties
ファイルを作成します。
org.apache.commons.logging.Log=org.apache.commons.logging.impl.Log4JLogger
log4jの設定ファイルも作成します。ここでログレベルをWARNにしておきます。
log4j.rootLogger=WARN, kcl
log4j.appender.kcl=org.apache.log4j.ConsoleAppender
log4j.appender.kcl.Target = System.out
log4j.appender.kcl.layout=org.apache.log4j.PatternLayout
log4j.appender.kcl.layout.ConversionPattern=%d [%p] %C : %m %n
作った設定ファイルは、実行パスに移動します。例えば今回はvirtualenvを使っているので、${VIRTUAL_ENV}/lib/python3.5/site-packages/amazon_kclpy/samples
です。実行パスに移動する代わりに、javaのシステムプロパティにそれぞれ -Djava.util.logging.config.file
、 -Dlog4j.configuration
で渡しても良いみたいです。
それでは、もう一度セットアップをやり直して起動します。
python setup.py download_jars
python setup.py install
python samples/amazon_kclpy_helper.py --print_command --java `which java` --sample | /bin/bash
結果は、全然ログが出なくなりましたが、試しに存在しないストリームを見に行くとエラーログが出ました。
これでログレベルの変更は完了です。
$ python samples/amazon_kclpy_helper.py -j `which java` --print_command --sample | /bin/bash
2018-07-20 05:01:41,478 [ERROR] com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask : Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream words under account XXXXXXXXXXXX not found. (Service: AmazonKinesis; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: 42315af3-e4a2-4a2a-8c28-019086731416)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)