はじめに
AWS IoTにはJobと呼ばれるデバイスで実行するタスク管理して実行する仕組みがあります。AWS上のマニュアルを読んでもJobを使うと何が嬉しいのかよくわからなかったので実際に試してみました。
#AWS IoT Jobについて
以下の本家のサイトにAWS IoT Jobについての說明があります。Jobとは1つ以上のデバイスに送信され実行されるリモート操作です、とあります。イメージ的にはデバイスのFirmwareのアップデートやなんらかのまとまった処理をデバイス上で実行したいというような用途で使用します。
#何が嬉しいのか?
Job自体はJob単位にそのJobの状態を管理することができるため、例えばMQTTのConnectionが通信品質の問題で一時的に断絶してしまったような状況でも、定義したJobの状態を管理して、必要に応じて再実行ということができます。JOBの状態は、IN_PROGRESS、QUEUEDもしくはFAILED、SUCCEEDED、CANCELED、TIMED_OUT、REJECTED、または REMOVEDのように管理され、AWSマネジメントコンソール上から以下のように確認することができます。(以下の場合は1つのJobが成功した)
Job用に以下のような特別なTopicが用意されていて、デバイス側はこれらのTopicをSubscribeし、必要に応じてPublishすることで現在の状態をAWS側に伝えます
$aws/things/MyThing/jobs/notify
$aws/things/MyThing/jobs/notify-next
$aws/things/MyThing/jobs/get/accepted
$aws/things/MyThing/jobs/get/rejected
$aws/things/MyThing/jobs/jobId/get/accepted
$aws/things/MyThing/jobs/jobId/get/rejected
また、Job DocumentとよばれるJSONのファイルをユーザで定義することができて、そのファイルをデバイス側に渡すことができます。例えば、Firmwareのバージョン情報を渡すようにして、デバイス側のバージョンよりも新しければアップデートするというような制御が可能になります。
やってみた
では実際にやってみたいと思います。まずはS3にバケットを1つ作り、Job Documentを定義します。今回は適当に以下のようなJSONファイルを定義しました。JobTypeとかCommandとかを定義して、Commandによっては対応していないのでJobをデバイス側でRejectする、とかできるようにできればよいかと。
{
"jobType": "Test",
"Command": "Update",
"Timeout": "30",
"Status": "normal"
}
これをtestjob1.jsonという名前でS3 Bucket上に保存します。
次に、AWS IoT Coreのマネジメントコンソール上からジョブを選択し、カスタムジョブの作成をクリックします。
ジョブIDを設定し、デバイスを選択します。デバイスは事前に作成しておいたThingを選択しました。(Thingを作成し、証明書等をデバイスに入れておく必要があります。)ジョブファイルの追加のところで、先程S3上にアップロードしたJSONファイルを選択し、次へ。
高度な設定のページでは特になにも設定せず、作成を行います。Jobが作成されるので状態を確認すると、1つのジョブがキューに入っている状態となっています。
次にこのJOBを実行するデバイス側を用意します。デバイスにはPCを使用し、Python用のDevice SDKを使って通信をすることとします。このリンクのGithubからソースコードをCloneすると、samplesフォルダの下に「jobs.py」というテストプログラムがあります。こちらはJobを使用するデバイスで実行するプログラムのサンプルです。内容をみるとわかりますが、AWS SDKで用意されているiobjobsの関数を使って、上記に紹介したJob用のTopicをSubscribeし、また状態を変化させるためにTopicにPublishしています。
IoT Core上でThingを作成し、証明書をPC上に保存した状態で、実行してみます。証明書情報は~/cert以下においています。endpointは適宜自分のIoT Coreのend pointに置き換えてください。
$ python3 jobs.py --endpoint xxxxxx-ats.iot.ap-northeast-1.amazonaws.com --cert ~/cert/device.pem.crt --key ~/cert/private.pem.key --root-ca ~/cert/Amazon-root-CA-1.pem --thing-name MyIoTThing --job-time 5 --signing-region ap-northeast-1
そうすると、
Connected!
Subscribing to Next Changed events...
Subscribing to Start responses...
Subscribing to Update responses...
Trying to start the next job...
Publishing request to start next job...
Published request to start the next job.
Request to start next job was accepted. job_id:Job001 job_document:{'jobType': 'Test', 'Command': 'Update', 'Timeout': '30', 'Status': 'normal'}
Starting local work on job...
{'jobType': 'Test', 'Command': 'Update', 'Timeout': '30', 'Status': 'normal'}
Done working on job.
Publishing request to update job status to SUCCEEDED...
Published request to update job.
Request to update job was accepted.
Received Next Job Execution Changed event: None. Waiting for further jobs...
のようにJobが実行されます。Jobを実行する際に定義したJob Documentの内容が確認できるので、その内容によってデバイス側の処理を自由に変更することができます。どのようなときにJobが成功して、どのようなときにJobが失敗する、というようなことはデバイス側で自由に設定することができます。
結果、先程作成したJobを再度マネジメントコンソールから見ると、状態が成功に変化していることがわかります。
試しにソースコードを以下のように変更して、CommandがDeleteのときはFailedになるようにします。
def job_thread_fn(job_id, job_document):
try:
print("Starting local work on job...")
time.sleep(args.job_time)
print(job_document)
print(job_document["jobType"])
print(job_document["Command"])
print("Done working on job.")
if(job_document["Command"]=="Delete"):
#not supported command
print(" Command not supported: Publishing request to update job status to rejected...")
request = iotjobs.UpdateJobExecutionRequest(
thing_name=args.thing_name,
job_id=job_id,
status=iotjobs.JobStatus.FAILED)
else:
print("Publishing request to update job status to SUCCEEDED...")
request = iotjobs.UpdateJobExecutionRequest(
thing_name=args.thing_name,
job_id=job_id,
status=iotjobs.JobStatus.SUCCEEDED)
publish_future = jobs_client.publish_update_job_execution(request, mqtt.QoS.AT_LEAST_ONCE)
publish_future.add_done_callback(on_publish_update_job_execution)
except Exception as e:
exit(e)
その上で、以下のようなJobDocumentを作成し、JSONファイルをS3上に置きます。
{
"jobType": "Test2",
"Command": "Delete",
"Timeout": "60",
"Status": "normal"
}
Jobを作成し、このJobファイルを選んで実行した場合、結果が以下のように失敗となります。デバイス側から成功したか失敗したかをPublishすることでAWS側に状態を知らせています。
まとめ
AWS IoTのJobの仕組み、おおよその使い方の概要がご理解いただけたかと思います。デバイス側とハンドシェイクすることによって状態を管理する仕組みであると言えると思います。ジョブの状態変化はライフサイクルイベントとして取得することもできるのでデバイスの状態を管理して処理を行うというような場合に便利な機能と言えるかなと思います。