#イントロ
s_rae です。
頑張ってプログラミングの勉強をしています。
アプリ開発の練習としてそのロボットを操作できるインタフェースを作ってみました。
https://github.com/samanthanium/dullahan_local
でソースコードを見れます。
前回の投稿の続きとして今回はソースの説明を少し行います。
前回: https://qiita.com/s_rae/items/d808c3eccd8e173dc55b
ホビー目的で作られたものなのでエラーハンドリングは抜いて説明いたします。
#使ったフレームワークやライブラリー
このプロジェクトにはこちらのテクノロジーを使ってます。
- Djangoフレームワーク
- Django Channels
- Pybluez
###Djangoフレームワーク
Djangoでは主にHTTPでウェブページをサーブし、セッションでユーザーを管理しsqliteデータベースの作業を行いました。
ちなみにDjangoプロジェクトの中は複数の’app’(他のプロジェクトにも使えるように再利用出来るコード)で構成されてます。
このプロジェクトは
- dullahan(プロジェクトメインのapp)
- pages(Home, About, Loginウェブページ用)
- control_system(コマンドを送ったりデバイスを登録するためのページ)
- bluetooth_interface(Bluetooth機器のConsumer, functionを含めてます)
の’app’で構成されています。
Djangoを使った理由は機能が多く含まれて素早く作成が出来からです。あとPythonの練習もやりたいと思いました。
###Django Channels
DjangoChannelsはDjangoフレームワークのために作られたライブラリーです。Websocketなどの機能をDjangoフレームワークに追加する事が可能です。
Channelsは'Consumer'クラスを使ってWebSocketのメッセージを処理する仕組みです。Djangoフレームワークのviewと似てる機能を行えます。
###Pybluez
PybluezはシステムのBluetooth機能をPythonで使えるために作られたライブラリーです。
こちらではPybluezのコードは説明しませんが、socketモジュールと似てる感じでコードを書くことが可能です。
#コードの説明
ユーザーがログインする --> デバイスを追加する --> デバイスにコマンドを送る
の感じの流れを説明いたします。
#...
def log_in(req):
form = AuthenticationForm()
if req.method == 'POST':
form = AuthenticationForm(data=req.POST)
if form.is_valid():
login(req, form.get_user())
return redirect(reverse('get_home'))
else:
return render(req, 'login.html', {'form': form})
return render(req, 'login.html', {'form': form})
#...
def sign_up(req):
form = UserCreationForm()
if req.method == 'POST':
form = UserCreationForm(data=req.POST)
if form.is_valid():
form.save()
return redirect(reverse('log_in'))
else:
print(form.errors)
return render(req, 'signup.html', {'form': form})
ここにユーザのログインのためのfunctionがあります。DjangoのFormsに含まれているmethod”form.get_user”を使えば簡単にログインさせる事が出来ます。
その後からは
@login_required
のdecoratorをviewの中に使えば現在のユーザーの確認が出来ます。
次では周りのBluetoothデバイスを検索し同録する画面です。
from bluetooth_interface import bt_functions as bt
#...
@login_required
def device_search_ajax(req):
try:
if req.method == "GET":
nearby_devices = json.dumps(bt.search())
return JsonResponse({'nearby_devices':nearby_devices}, status=200)
else:
#...
except:
#エラーハンドル
#...
周りのBluetoothデバイスを検索するときはAjaxリクエストで行ってます。
bt.searchはPybluezを使ってBluetoothデバイスを検索するfunctionです。
こちらかわはHTTPではなくWebsocketを使いロボットとのメッセージのやり取りします。
#...
class CommandConsumer(WebsocketConsumer):
def connect(self):
# ユーザーが選択したデバイスのモデルを探す
self.device_id = self.scope['url_route']['kwargs']['device_id']
self.device_group_name = 'command_%s' % self.device_id
device = Device.objects.get(id=self.device_id)
# Bluetooth用のconsumerにデバイスとシリアル通信が出来るようにする
async_to_sync(self.channel_layer.send)('bt-process', {
'type': 'bt_connect',
'group_name': self.device_group_name,
'uuid': device.uuid,
'device' : device.id,
}
)
# ユーザーをgroupに追加する
async_to_sync(self.channel_layer.group_add)(
self.device_group_name,
self.channel_name
)
self.accept()
#...
def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
device_id = text_data_json['device']
# コマンドをデバイスに送信する
async_to_sync(self.channel_layer.send)('bt-process', {
'type': 'bt_send_serial',
'device': device_id,
'message': message,
}
)
#...
# Bluetoothデバイスからもらった返信のメッセージを保存、フォワード
def command_message(self, event):
message = event['message']
device_id = event['device']
# timestamp付きでデバイスのモデルに保存する
device = Device.objects.get(id=device_id)
new_history = CommandHistory(device=device, command=message )
new_history.save()
command_history = {}
all_history = device.history.all()
for command in all_history:
command_history[command.timestamp.strftime('%H:%M:%S')] = command.command + '\n'
# コマンド履歴をユーザーに送信する
self.send(text_data=json.dumps({
'message': command_history,
'device': device_id
}))
こちらが少し複雑ですが簡単に説明すると
-
connectでユーザーとのWebsocket通信が開設されBluetoothデバイスに通信が出来るようにします。’bt-process’とは現在バックグラウンドで行われてるBluetooth機器用のconsumerを示してます。’bt-process’の’bt_connect’を使おうとする感じです。
-
receiveではユーザーがウェブページを使ってメッセージを送った時そのメッセージに含まれてるデバイスへのコマンドをロボットに送信します。こちらも上と同じように’bt-process’にメッセージをおくります。
-
command_messageではロボットから受信した返信をロボットのデータベースモデルに保存します。そのメッセージはロボットのモデルとMany-to-one関係になってます。ロボットのidを使って繋がってる形です。
次はロボットと通信するためのconsumerクラスです。
#...
from bluetooth_interface import bt_functions
#...
class BTConsumer(SyncConsumer):
def bt_connect(self, message):
# ユーザーにメッセージを送れるために覚えておく
self.device_group_name = message['group_name']
self.sock = {}
# Bluetooth通信port番号
self.port_num = 1
# 通信用ソケットを作る
sock, msg = bt_functions.connect(message['uuid'], self.port_num)
# ソケットをデバイスのidと一緒に記録
if sock != 1:
self.sock = {message['device']:[sock, self.port_num]}
# 通信状態をユーザーに送る
async_to_sync(self.channel_layer.group_send)(
self.device_group_name,
{
'type': 'command_message',
'device': message['device'],
'message': msg,
}
)
#...
def bt_send_serial(self, event):
# ユーザーが要請してるデバイスのidとメッセージを取る
device_id = int(event['device'])
message = event['message']
# そのデバイスとの通信ようソケットを選んでメッセージを送信する
if device_id in self.sock.keys():
result = bt_functions.send_serial(self.sock[device_id][0], message)
# ユーザーに返信のメッセージを送る
async_to_sync(self.channel_layer.group_send)(
self.device_group_name,
{
'type': 'command_message',
'device': device_id,
'message': result,
}
)
#...
ここでどうやって行うかかなり迷いました。
結果としては
- ロボットがいつでもメッセージ(センサーのデーター、コマンドへの返信など)を送信できるようなconsumerを作成
- ユーザーがいつでもメッセージをロボットに送信できるような機能を追加
が出来るようにしました。
Django ChannelsではChannels Layerて言うのを使いConsumerクラスやプロセスの間の通信が出来ます。
なので、ユーザーがログアウトしたり他の作業を行っても別のプロセスでロボットと常に繋がってます。
まだ作成してないですが、こう言う形でロボットやIoTデバイスから常にセンサーのデーターがとれます。
詳しいことはDjango Channelsのドキュメントを参考して下さい。
https://channels.readthedocs.io/en/latest/topics/channel_layers.html
https://channels.readthedocs.io/en/latest/topics/worker.html
#最後に
Asyncが使いやすいNode.jsなどでプロジェクトを作ったら楽だったと思います。
ですがDjangoの多く含まれてる機能(セキュリティー、ユーザー管理、Formsなど)を使いたかったのでDjangoとDjangoChannelsを使いました。
ここまで作るのにかかった時間は2週間~くらいでした。新しい事が多かったので大体はドキュメントをよんだり勉強する時間でした。
今回もここまで読んで下さってありがとうございます。