Posted at

Python3でRICOH THETA API v2.1を叩く

2018.8.1時点において、RICOH THETA API v2.1経由でTheta Sの撮影・保存をする方法です。

開発環境


  • macOS Sierra (ver 10.12.3)

  • Python3.4

  • デバイス: Ricoh Theta S


RICOH THETAへの接続とインターネット接続を両立する

THETAのAPIなどを叩くためには、PC(ここではMac)とTHETAをWi-Fで接続する必要があります。

しかし、MacをTHETAに接続すると、内蔵のWi-FiがTHETAへの接続に専有されてしまい、インターネットに接続できなくなるので非常に不便です。

解決策として、外付けのwifiドングルを使うことでWi-Fの接続口を2つにすることです。

① macの内蔵Wi-Fi → インターネットに接続

② 外付けのWi-Fiドングル → THETAと接続

という感じで、これでTHETAとインターネットに同時に接続できるようになります。

外付けのWi-Fiドングルはなんでも良いわけではなく、特にmac対応しているものではないといけません。

もし手元に持っていなくて新規に買う場合はこちらがおすすめです(Macと接続できることを確認済みで、値段も安く安定利用できることも確認済み)

以下、上記のwifiドングルを使った場合での接続手順です。

ドライバのダウンロードページ(Mac,Win,Linux)はこちら

Macの場合はドライバーをインストールしてMacを再起動すれば特に問題なく動いてくれます。

しかしLinux(ここでは特にRaspberryPiでやりたかったのでRaspbianOS)でこのwifiドングルを動かす場合は少し面倒です。しかし、ありがたいことに対処法をまとめてくれている方がいました。

自分の場合、「WiFiドライバのインストール」の項では、自動インストーラがうまく動きませんでしたが、手動でドライバをインストールする方の方法でうまく動かすことができました。


THETA APIを叩く

実際の手順は以下のページが参考になります。

こちらで紹介されているShell ScriptでAPIを叩く方法を一度試してみるとTHETA APIの動きがパッと理解できるので一度試してみることをおすすめします。(非常に簡単です)


Python3でTHETA APIを叩く

上記のページで紹介されているPythonでのAPI操作方法を参考にしましたが、コードがPython2だったためここではPython3で書き直しました。

また、上記ページで紹介されているPython2系コードでは、最新の撮影画像の取得部分にバグがあり、実際に動かしてみたところ、n回目の撮影時に保存する画像ファイルは(n-1)回目に撮影された画像になります。(一つ前に撮影された画像が保存されてしまう)。

以下のコードではTheta APIで取得できる状態ID(fingerprint)と最新ファイルURL(state._latestFileUrl)情報を比較し、正しく”最新の画像”を取得できるようにバグを修正しています。


コードの簡単な解説

※コード全体はGitHubに公開しています。

大きな流れとしては、


  1. セッションの作成

  2. fingerprintの記録

  3. 撮影

  4. 撮影した写真のURL取得

  5. 写真の保存

  6. セッションのクローズ

になります。


1. セッションの作成

Theta(192.168.1.1)にアクセスし、sessionIdを取得します。

urllib.request.urlopen("http://192.168.1.1/osc/info").read()

# create session
print('Create Session')
data = json.dumps({"name":"camera.startSession"}).encode('ascii')
res = urllib.request.urlopen('http://192.168.1.1/osc/commands/execute', data)
sessionId = json.loads(res.read().decode('utf-8'))["results"]["sessionId"]


2. fingerprintの記録

後ほど、最新の画像かどうか確認するためにfingerpirntをとっておく

# record fingerprint before taking photo

res = urllib.request.urlopen('http://192.168.1.1/osc/state', urllib.parse.urlencode({}).encode('ascii'))
prev_fingerprint = fingerprint = json.loads(res.read().decode('utf-8'))["fingerprint"]


3. 撮影

# take a picture

data = json.dumps({"name":"camera.takePicture", "parameters": {"sessionId": sessionId}}).encode('ascii')
urllib.request.urlopen('http://192.168.1.1/osc/commands/execute', data)
print('Took a photo')


4. 撮影した写真のURL取得

最新のfingerprintが古いfingerprintと同じだった場合や、_latestFileUriが取得できなかった場合は処理をスキップする。この処理を入れておかないと一つ前の画像のURLが取得されてしまう。

# get photo url

fileUri = ""
while True:
res = urllib.request.urlopen('http://192.168.1.1/osc/state', urllib.parse.urlencode({}).encode('ascii'))
j = json.loads(res.read().decode('utf-8'))
fingerprint = j["fingerprint"]
fileUri = j["state"]["_latestFileUri"]
file_name = os.path.basename(fileUri)
if not fileUri or fingerprint == prev_fingerprint:
time.sleep(0.2) # avoid a load by frequent requesting on theta.
else:
print('new_photo_file:', file_name)
break


5. 写真の保存

# save photo

print('Saving a photo')
content = None
while content is None:
try:
data = json.dumps({"name":"camera.getImage", "parameters": {"fileUri": fileUri}}).encode('ascii')
res = urllib.request.urlopen('http://192.168.1.1/osc/commands/execute', data)
content = res.read()
with open(os.path.join(save_dir, file_name), "wb") as file:
file.write(content)
except urllib.error.HTTPError as err:
if err.code != 400:
print("taken photo may not be saved to the theta storage yet.")
raise err
else:
time.sleep(0.2)

print('Saved a photo')


6.セッションのクローズ

# close session

data = json.dumps({"name":"camera.closeSession", "parameters": {"sessionId": sessionId}}).encode('ascii')
urllib.request.urlopen('http://192.168.1.1/osc/commands/execute', data)
print('Closed session')


参考