概要?
爆速とか言ってますが、個人の観測範囲でfastapiを使ってlinebot作る方法があまり紹介されていないような感じがしたので書いてみました。
ちなみに、私が開発した某夢の国のアトラクションの待ち時間やスタンバイパスの発行状況、ランダムにアトラクションを紹介してくれるLINEbotもfastapiで動いています。
あ、登録も忘れずにお願いします。
登録→https://lin.ee/SLBk5e1
アトラクションをランダムに選ぶだけでなく、待ち時間そのものも見られるような機能を追加しました!
— いんちょ〰️©️‼️ (@inchoXD) April 5, 2022
↓友達追加はこちら↓https://t.co/hFWXCOjxMe#TDR_now #TDR_info#TDR_today https://t.co/5cm6DuetN0 pic.twitter.com/8puahVBqPZ
fastapiがなんぞやって方は公式ドキュメントを読んでみると良いでしょう。Flaskっぽい書き方でサクッとAPI開発できますよ!ドキュメントも一部日本語で書いてあります。
準備
公式アカウントの準備
公式サイトを参考に行う。
サーバーの準備
LINEbotの動作を確認するにはサーバーを公開しないといけないのですが、このためにherokuを準備するのは少々手間だったりするので今回はngrokというものを使用しようと思います。
各種設定は非常に簡単なので公式の通りに設定を進めてください。
必要になるライブラリのインストール
fastapi系
fastapiのインストール
pip install fastapi
uvicornのインストール
pip install uvicorn
MessagingAPI系
genjwtkeyのインストール
アサーション用のキーペア作成時に使用します。
pip install genjwtkey
dotenvのインストール
private_keyやkidを環境変数として保存しておくのに使用します。
pip install python-dotenv
実装
クイックスタートを一部参考に実装を進めていきます。
オウム返しまでの手順
- サーバーを起動し、アクセストークンなどを発行します。
- LINEからのメッセージを待ちます。
- メッセージを受信します。
- 受信したメッセージがLINEプラットフォームから送られたことを確認するために、リクエストヘッダーの
x-line-signature
に含まれる署名を検証します。 - 正しいものであれば送られてきたテキストメッセージの内容を受信相手に向かって返信します。
- LINE上でオウム返しされる。
ディレクトリ構成
.
├── .env
├── keys
│ ├── private_key.json
│ └── public_key.json
└── main.py
ソースコード全体
GitHubに上げてあります。
main.pyの作成
チャネルアクセストークンの発行とかでしばらく出番はあまり多くはありませんが、一旦以下の内容でmain.py
を作成します。
# fastapi関係
from fastapi import FastAPI, HTTPException, Request, Header
# fastapiの実行
app = FastAPI()
チャネルアクセストークンの種類
チャネルアクセストークンには3つ種類があります。
- 任意の有効期間を指定できるチャネルアクセストークン(チャネルアクセストークンv2.1) → 使用が推奨されている。有効期限も自由に設定できて便利だが、発行数に制限があるので注意すること。
- 短期のチャネルアクセストークン → 一度発行すれば30日間は使える。発行数に制限はないし、古いものから使えなくなる。
- 長期のチャネルアクセストークン → その名の通り。無効化するまで使える。コンソールからの発行が可能。
チャネルアクセストークンv2.1の発行
publick_keyとprivate_keyの生成
プロジェクトのディレクトリにkeys
とかわかりやすい名前のディレクトリを作成して、そこに移動します。
$ python -m genjwtkey
します。するとpublic_key.json
とprivate_key.json
の二つが生成されます。
public_key.json
の中身をコピーし、コンソールにアクセスします。
アサーション署名キーの欄にある公開鍵を登録するにコピーした内容を登録し、生成されたkidを控えておきます。
.envの作成
生成したprivate_keyの内容と先ほど控えたkidを.env
に書き込みましょう。
後々、使用することになるのでコンソールからチャンネルIDとチャネルシークレットをコピーしてきてついでに書き込んでおきましょう。
.envやkeysディレクトリはgithubみたいなところに公開してはいけないし、それらの内容をコード直書きしない。
(cat keys/private_key.json > .env
みたいな感じにしてvscodeなりvimなりで一括で{ }
とか"
を削除して、:
を=
に一斉置換した後に、kid={kid}
みたいな感じで書き足すと爆速。)
.envの書き方はこんな感じ。
alg={algのvalue}
d={dのvalue}
dp={dpのvalue}
dq={dqのvalue}
e={eのvalue}
kty={ktyのvalue}
n={nのvalue}
p={pのvalue}
q={qのvalue}
qi={qiのvalue}
use={useのvalue}
kid={控えたkid}
channel_id={コンソールから拾ってきたchannel_id}
client_secret={コンソールから拾ってきたclient_secret}
main.pyで.envを読み込んでjwtを生成する関数の作成
main.pyに以下を追記します。
# jwt生成関係
import os
import jwt
from jwt.algorithms import RSAAlgorithm
import time
from dotenv import load_dotenv
# アクセストークンに関するclass
# 今後、アクセストークンの破棄方法や、別のアクセストークン発行方法について紹介する場合に機能を書き足していく。
class AccessToken:
def __init__(self):
# .envをロード
load_dotenv(override=True)
# アクセストークンの破棄などで使い回す可能性があるのでこの時点でchannel_idを.envから読み込んでおく。
self.channel_id = os.environ['channel_id']
# 短期のアクセストークンの発行などで使い回す可能性があるのでこの時点でclient_secretを.envから読み込んでおく。
self.client_secret = os.environ['client_secret']
# よく使い回すのでインスタンス変数としておくとなんとなく便利
self.base_domain = 'https://api.line.me'
# 認証用uri
self.auth_uri = f'{self.base_domain}/oauth2/v2.1'
# jwt生成関数
def encode_jwt(self, token_exp=60*60*24*30):
# token_expをデフォルトで30日に指定
# .envの内容を取得。
private_key = {
'alg':os.environ['alg'],
'd':os.environ['d'],
'dp':os.environ['dp'],
'dq':os.environ['dq'],
'e':os.environ['e'],
'kty':os.environ['kty'],
'n':os.environ['n'],
'p':os.environ['p'],
'q':os.environ['q'],
'qi':os.environ['qi'],
'use':os.environ['use']
}
# headerを以下の内容で指定
headers = {
'alg':'RS256',
'typ':'JWT',
'kid':os.environ['kid']
}
# payloadを以下の内容で指定
payload = {
'iss':self.channel_id,
'sub':self.channel_id,
'aud':f'{self.base_domain}/',
'exp':int(time.time())+(60*30),
'token_exp':token_exp
}
key = RSAAlgorithm.from_jwk(private_key)
JWT = jwt.encode(payload, key, algorithm='RSA256', headers=headers, json_encoder=None)
return JWT
# fastapiの実行
app = FastAPI()
# サーバー起動時にアクセストークンの発行を行う。
at = AccessToken()
# 今回は1時間の有効期限のアクセストークン
at.encode_jwt(token_exp=3600)
encode_jwt()
でこのあと生成させるアクセストークンの有効期限を指定できます。戻り値がJWT
になります。何も引数を指定しなければ30日の期限で生成されるようにできます。token_exp
に秒数で有効期限を指定すればその分だけ使用可能なチャネルアクセストークンv2.1を発行させることが可能になります。
発行できる数には上限があるのでちょっとお試しとか動作確認するくらいなら60分とか短めに指定しておくとチャネルアクセストークンv2.1が発行できなくなることを防ぐことができます。
もちろん、jwtにも有効期限を設けることができます。
payload
の中のexp
にてそれを行っています。jwtのアサーションの最大有効期間が30分であるため、UNIX時間に1800(s)を足しています。
チャネルアクセストークンv2.1の発行を行う関数の作成
main.py
に以下を追記します。
# アクセストークンの生成やメッセージの送信関係
import json
import requests
class AccessToken:
def __init__(self):
# 記載済みのため省略
def gen_jwt(self, token_exp=60*60*24*30):
# 記載済みのため省略
# チャネルアクセストークンv2.1の発行を行う関数
def issue_access_token(self, JWT):
# トークン発行のエンドポイントに各種パラメータをセットしてpostリクエストを行う。
# ヘッダとボディは以下のように指定する。
uri = f'{self.auth_uri}/token'
headers = {
'Content-Type':'application/x-www-form-urlencoded'
}
data = {
'grant_type':'client_credentials',
'client_assertion_type':'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion':JWT
}
r = requests.post(url=uri, headers=headers, data=data)
r_json = r.json()
# 生成に失敗したときのハンドリング。status_codeでバリデーションした方が良いかもしれない。
if not 'error' in r_json:
return r_json
else:
status_code = r.status_code
r_json['status_code'] = r.status_code
return r_json
# fastapiの実行
app = FastAPI()
# サーバー起動時にアクセストークンの発行を行う。
at = AccessToken()
# 今回は1時間の有効期限のアクセストークン
JWT = at.encode_jwt(token_exp=3600)
# ↓↓↓以下も追記する↓↓↓
# アクセストークンの発行処理。
issued_at = at.issue_access_token(JWT)
access_token = issued_at['access_token']
issue_access_token()
でチャネルアクセストークンv2.1を発行できます。引数には先ほど作成した関数の戻り値であるJWT
を入れます。作成が成功すると以下の辞書型の戻り値を得ることができます。
{
"access_token": "{アクセストークン}",
"token_type": "Bearer",
"expires_in": {アクセストークンが発酵されてから期限切れになるまでの秒数(int)},
"key_id": "{アクセストークンを識別するID}"
}
key_id
はアクセストークンを破棄したくなった場合などに使用するのでaccess_token
と一緒に破棄するまで安全な場所で保管するのが望ましい。
今回、アクセストークンの破棄や他のアクセストークンの発行方法などは紹介しない。
これで完成度が大体50%くらいです。完成まであと少し!!!
LINEからのメッセージを受け取る
やっとそれらしいところまで来ました。
これからメッセージを受信する部分に関して記述していきます。
LINEからのPOST
リクエストを受け取る
main.py
の最終行に以下を追記します。
@app.post('/line-messaging-api/handle_request')
async def line(data:Request, x_line_signature:str=Header(None)):
# このあと色々処理を書きます。一旦割愛。
pass
上記は/line-messaging-api/handle_request
というエンドポイントでPOST
リクエストをうけつけますよ!という意味になります。正直、エンドポイントはなんでも良いのですが、Messaging API
を受け付けるエンドポイントという意味でこんな感じにしてみました。
@app.post('/line-messaging-api/handle_request')
の部分は/line-messaging-api/handle_request
でPOST
を受け付けますよという意味になります。
async def line(data:Request, x_line_signature:str=Header(None)):
の部分はRequest
をdata
に、header
をx_line_signature
に格納しますよという意味になります。
一応、これでLINEからのリクエストを受け取れるようになりました。
受信したメッセージがLINEから送られたものなのかを検証
LINEからのリクエストかどうかを識別するため、main.py
に以下を追記します。
########################################
# 以下をimport文が書いてある下に列挙する
########################################
# 署名の検証関係
import base64
import hashlib
import hmac
########################################
# 以下をAccessTokenクラスの2行したくらい下に書く
########################################
# 署名の検証に関するclass
class VerifySignature(AccessToken):
def __init__(self):
# AccessToken()でprivate_keyを読み込んでいるので継承
super().__init__()
def verify_signature(self, l_signature, data):
client_secret = self.client_secret
# ボディのダイジェスト値を求めます。
hash_ = hmac.new(client_secret.encode('utf-8'), data, hashlib.sha256).digest()
# ダイジェスト値をBase64エンコードします。
b_signature = base64.b64encode(hash_)
# 署名をutf-8にデコード
signature = b_signature.decode('utf-8')
# リクエストボディとリクエストヘッダの署名が一致するかを確かめます。
if l_signature == signature:
return True
else:
return False
# fastapiの実行
app = FastAPI()
# サーバー起動時にアクセストークンの発行を行う。
at = AccessToken()
# 今回は1時間の有効期限のアクセストークン
JWT = at.encode_jwt(token_exp=3600)
# アクセストークンの発行処理。
issued_at = at.issue_access_token(JWT)
access_token = issued_at['access_token']
# ↓↓↓以下を追記↓↓↓
# 署名の検証
vs = VerifySignature()
新たに署名を検証するため、VerifySignature
というclassを作成し、その中のverify_signature()
関数で署名の検証をします。
この関数の引数には次に記述する内容で出てくる、リクエストヘッダとリクエストボディを与えます。
データを抽出して署名を検証する
@app.post('/line-messaging-api/handle_request')
async def line(data:Request, x_line_signature:str=Header(None)):
# ↓↓↓以下から記載↓↓↓
# Requestからbodyのデータを抽出
b_data = await data.body()
# 署名の検証
l_signature = vs.verify_signature(x_line_signature, b_data)
# 署名を検証し、一致しなければ`invalid_requests`を返す。
if not l_signature:
raise HTTPException(
status_code=400,
detail='invalid_requests',
headers={'Content-type':'application/json'}
)
# Requestからjsonを抽出
d_json = await data.json()
先ほど作成した署名を検証する関数を組み込んで検証しただけの処理を組んでいます。
fastapiでは例外としてレスポンスをカスタマイズすることができます。それがraise HTTPExceprion
の部分です。
メッセージの内容を受け取る。
ここまで、メッセージを受け取るまでの準備を行なってきました。ついにここではユーザから送られてきたメッセージを抽出します。
main.py
に以下を追記します。
#↓↓↓importが列挙されている部分の一番下に以下を記述
########################################
# 以下をVerifySignatureクラスの2行したくらい下に書く
########################################
class Message(AccessToken):
def __init__(self):
super().__init__()
# メッセージ送信用ドメイン
self.msg_uri = f'{self.base_domain}/v2/bot/message'
def send_message(self, access_token, message, send_type, reply_token):
# 後ほどこの関数の紹介&説明を行う。
pass
def recieve(self, d_json):
# d_jsonに含まれるeventsの内容を抽出する。
events = d_json['events']
# eventsの中には返信を行うのに重要なreply_tokenやメッセージの内容が含まれている場合がある。
# 順番に返事が行えるようにreply_tokenを格納するlistを先に準備しておく。
reply_tokens = list()
# 複数の返信先にメッセージを返せるように返信用メッセージを格納するlistを先に準備しておく。
msgs = list()
# eventsの中身を一つずつ取り出す。
for event in events:
# reply_tokenを抽出してlistに格納
reply_token = event['replyToken']
reply_tokens.append(reply_token)
# 受信したメッセージを抽出
message_data = event['message']
# 写真や動画の場合もあるので受信したメッセージの種類を抽出する。
messege_type = message_data['type']
# 受信内容がテキストメッセージならmessageに内容を格納
if messege_type == 'text':
message = message_data['text']
msgs.append(message)
else:
# 今は一旦pass
pass
########################################
# AccessToken関係の処理は省略
# vs = VerifySignatureの下に以下を記述
########################################
# メッセージ関係
msg = Message()
@app.post('/line-messaging-api/handle_request')
async def line(data:Request, x_line_signature:str=Header(None)):
# Requestからbodyのデータを抽出
b_data = await data.body()
# 署名の検証
l_signature = vs.verify_signature(x_line_signature, b_data)
# 署名を検証し、一致しなければ`invalid_requests`を返す。
if not l_signature:
raise HTTPException(
status_code=400,
detail='invalid_requests',
headers={'Content-type':'application/json'}
)
# Requestからjsonを抽出
d_json = await data.json()
# ↓↓↓以下を記述↓↓↓
# 受信したメッセージの解析と返信処理を行う関数
msg.Message(d_json)
return {}
まず、メッセージ関連のMessage
クラスを作成しました。recieve()
は受け取ったメッセージの内容を解析して送信する処理を行う関数です。
送信処理に関しては次の項目で記述します。
引数に指定されたd_jsonは以下のような辞書型変数です。
{
"destination": "xxxxxxxxxx",
"events": [
{
"type": "message",
"message": {
"type": "text",
"id": "14353798921116",
"text": "Hello, world"
},
"timestamp": 1625665242211,
"source": {
"type": "user",
"userId": "U80696558e1aa831..."
},
"replyToken": "757913772c4646b784d4b7ce46d12671",
"mode": "active"
},...
]
}
event
には以上のような情報が含まれているのでreply_token
とmessage
の内容を抽出してあげればオウム返し機能を実現させることができます。
返信の処理をする
返信を行うにはmain.py
に以下の内容を書き加えます。
# メッセージ送信関数
def send_message(self, access_token, message, send_type, reply_token):
# ↓↓↓以下の内容を記述↓↓↓
# 返信用メッセージの生成。
# 同一reply_tokenに5個までメッセージを送信できる。
# 今回はreply_token1つにつき1つのメッセージを返信する。
content = [
{
'type':'text',
'text':message
}
]
# メッセージ送信用エンドポイントの指定
uri = f'{self.msg_uri}/{send_type}'
# アクセストークンをAutorizationのなかにBearer形式で指定
headers = {
'Content-Type':'application/json',
'Authorization':f'Bearer {access_token}'
}
# 返信用のメッセージデータをreply_tokenと一緒に指定
data = {
'messages':content,
'replyToken':reply_token
}
# メッセージの返信
r = requests.post(url=uri, headers=headers, data=json.dumps(data))
r_json = r.json()
# エラーハンドリング
if not 'error' in r_json:
return r_json
else:
status_code = r.status_code
r_json['status_code'] = r.status_code
return r_json
# 受信内容解析&返信実行関数
def recieve(self, d_json):
# d_jsonに含まれるeventsの内容を抽出する。
events = d_json['events']
# eventsの中には返信を行うのに重要なreply_tokenやメッセージの内容が含まれている場合がある。
# 順番に返事が行えるようにreply_tokenを格納するlistを先に準備しておく。
reply_tokens = list()
# 複数の返信先にメッセージを返せるように返信用メッセージを格納するlistを先に準備しておく。
msgs = list()
# eventsの中身を一つずつ取り出す。
for event in events:
# reply_tokenを抽出してlistに格納
reply_token = event['replyToken']
reply_tokens.append(reply_token)
# 受信したメッセージを抽出
message_data = event['message']
# 写真や動画の場合もあるので受信したメッセージの種類を抽出する。
messege_type = message_data['type']
# 受信内容がテキストメッセージならmessageに内容を格納
if messege_type == 'text':
message = message_data['text']
msgs.append(message)
else:
# 今は一旦pass
pass
# ↓↓↓以下を記述↓↓↓
# 返信処理の実行
for reply_token in reply_tokens:
# index番号の取得
index = reply_tokens.index(reply_token)
# reply_tokenのindex番号に対応するmessageを取得
msg = msgs[index]
# access_tokenなどを引数にしてメッセージの返信処理関数を実行
self.send_message(access_token, msg, 'reply', reply_token)
send_message()
で返信の処理を行います。この関数の引数にreceive()
で取得したreply_token
や遥かなたで取得したaccess_token
を引数として指定することで返信を行うことができるようになります。あとは、receive()
を然るべき場所で呼び出してあげればオウム返しbotの完成です。
msg.receive()
を呼び出す。
main.py
に以下の内容を追記します。
@app.post('/line-messaging-api/handle_request')
async def line(data:Request, x_line_signature:str=Header(None)):
# Requestからbodyのデータを抽出
b_data = await data.body()
# 署名の検証
l_signature = vs.verify_signature(x_line_signature, b_data)
# 署名を検証し、一致しなければ`invalid_requests`を返す。
if not l_signature:
raise HTTPException(
status_code=400,
detail='invalid_requests',
headers={'Content-type':'application/json'}
)
# Requestからjsonを抽出
d_json = await data.json()
# ↓↓↓以下を記載↓↓↓
# 受け取ったメッセージを返信処理する関数を実行
msg.recieve(d_json)
# 空のレスポンスを返す
return {}
最後に、recieve()
関数を呼び出してオウム返しbotの完成です!!!
サーバーの起動
ターミナルを開き、
$ uvicorn main:app --reload
$ ngrok http 8000
以上二つを実行します。
http://127.0.0.1:4040/inspect/http にアクセスし、https://
から始まるURLをコピーし、LINE DevelopersコンソールのMessaging API設定内のWebhook設定に{先ほどコピーしたurl}/line-messaging-api/handle_request
(/line-messaging-api/handle_request
は作成したエンドポイント)を貼り付け保存すればLINEでデバッグができるようになります。
完成
こんな感じで動けば成功です。
眠れないのでオウム返しbotをどれくらいの時間で作れるかRTAしてみた。
— いんちょ〰️©️‼️ (@inchoXD) April 6, 2022
(ちゃんと測ってないけど1時間かからないくらいでサクッと終わった) pic.twitter.com/mACpiuPdhc