11
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

fastapiとngrokを使って爆速でオウム返しLINEbotを作る

Last updated at Posted at 2022-04-06

概要?

爆速とか言ってますが、個人の観測範囲でfastapiを使ってlinebot作る方法があまり紹介されていないような感じがしたので書いてみました。
ちなみに、私が開発した某夢の国のアトラクションの待ち時間やスタンバイパスの発行状況、ランダムにアトラクションを紹介してくれるLINEbotもfastapiで動いています。
あ、登録も忘れずにお願いします。
登録→https://lin.ee/SLBk5e1

fastapiがなんぞやって方は公式ドキュメントを読んでみると良いでしょう。Flaskっぽい書き方でサクッとAPI開発できますよ!ドキュメントも一部日本語で書いてあります。

準備

公式アカウントの準備

公式サイトを参考に行う。

サーバーの準備

LINEbotの動作を確認するにはサーバーを公開しないといけないのですが、このためにherokuを準備するのは少々手間だったりするので今回はngrokというものを使用しようと思います。
各種設定は非常に簡単なので公式の通りに設定を進めてください。

必要になるライブラリのインストール

fastapi系

fastapiのインストール

pip install fastapi

uvicornのインストール

pip install uvicorn

MessagingAPI系

genjwtkeyのインストール
アサーション用のキーペア作成時に使用します。

pip install genjwtkey

dotenvのインストール
private_keyやkidを環境変数として保存しておくのに使用します。

pip install python-dotenv

実装

クイックスタートを一部参考に実装を進めていきます。

オウム返しまでの手順

  1. サーバーを起動し、アクセストークンなどを発行します。
  2. LINEからのメッセージを待ちます。
  3. メッセージを受信します。
  4. 受信したメッセージがLINEプラットフォームから送られたことを確認するために、リクエストヘッダーのx-line-signatureに含まれる署名を検証します。
  5. 正しいものであれば送られてきたテキストメッセージの内容を受信相手に向かって返信します。
  6. LINE上でオウム返しされる。

ディレクトリ構成

.
├── .env
├── keys
│   ├── private_key.json
│   └── public_key.json
└── main.py

ソースコード全体

GitHubに上げてあります。

main.pyの作成

チャネルアクセストークンの発行とかでしばらく出番はあまり多くはありませんが、一旦以下の内容でmain.pyを作成します。

Python main.py
# fastapi関係
from fastapi import FastAPI, HTTPException, Request, Header


# fastapiの実行
app = FastAPI()

チャネルアクセストークンの種類

チャネルアクセストークンには3つ種類があります。

  1. 任意の有効期間を指定できるチャネルアクセストークン(チャネルアクセストークンv2.1) → 使用が推奨されている。有効期限も自由に設定できて便利だが、発行数に制限があるので注意すること。
  2. 短期のチャネルアクセストークン → 一度発行すれば30日間は使える。発行数に制限はないし、古いものから使えなくなる。
  3. 長期のチャネルアクセストークン → その名の通り。無効化するまで使える。コンソールからの発行が可能。

チャネルアクセストークンv2.1の発行

publick_keyとprivate_keyの生成

プロジェクトのディレクトリにkeysとかわかりやすい名前のディレクトリを作成して、そこに移動します。

$ python -m genjwtkey

します。するとpublic_key.jsonprivate_key.jsonの二つが生成されます。
public_key.jsonの中身をコピーし、コンソールにアクセスします。
アサーション署名キーの欄にある公開鍵を登録するにコピーした内容を登録し、生成されたkidを控えておきます。

.envの作成

生成したprivate_keyの内容と先ほど控えたkidを.envに書き込みましょう。
後々、使用することになるのでコンソールからチャンネルIDチャネルシークレットをコピーしてきてついでに書き込んでおきましょう。
.envやkeysディレクトリはgithubみたいなところに公開してはいけないし、それらの内容をコード直書きしない。
(cat keys/private_key.json > .env みたいな感じにしてvscodeなりvimなりで一括で{ }とか"を削除して、:=に一斉置換した後に、kid={kid}みたいな感じで書き足すと爆速。)
.envの書き方はこんな感じ。

alg={algのvalue}
d={dのvalue}
dp={dpのvalue}
dq={dqのvalue}
e={eのvalue}
kty={ktyのvalue}
n={nのvalue}
p={pのvalue}
q={qのvalue}
qi={qiのvalue}
use={useのvalue}
kid={控えたkid}
channel_id={コンソールから拾ってきたchannel_id}
client_secret={コンソールから拾ってきたclient_secret}

main.pyで.envを読み込んでjwtを生成する関数の作成

main.pyに以下を追記します。

Python main.py
# jwt生成関係
import os
import jwt
from jwt.algorithms import RSAAlgorithm
import time
from dotenv import load_dotenv

# アクセストークンに関するclass
# 今後、アクセストークンの破棄方法や、別のアクセストークン発行方法について紹介する場合に機能を書き足していく。
class AccessToken:
    def __init__(self):
        # .envをロード
        load_dotenv(override=True)
        # アクセストークンの破棄などで使い回す可能性があるのでこの時点でchannel_idを.envから読み込んでおく。
        self.channel_id = os.environ['channel_id']
        # 短期のアクセストークンの発行などで使い回す可能性があるのでこの時点でclient_secretを.envから読み込んでおく。
        self.client_secret = os.environ['client_secret']
        # よく使い回すのでインスタンス変数としておくとなんとなく便利
        self.base_domain = 'https://api.line.me'
        # 認証用uri
        self.auth_uri = f'{self.base_domain}/oauth2/v2.1'


    # jwt生成関数
    def encode_jwt(self, token_exp=60*60*24*30):
        # token_expをデフォルトで30日に指定
        # .envの内容を取得。
        private_key = {
                'alg':os.environ['alg'],
                'd':os.environ['d'],
                'dp':os.environ['dp'],
                'dq':os.environ['dq'],
                'e':os.environ['e'],
                'kty':os.environ['kty'],
                'n':os.environ['n'],
                'p':os.environ['p'],
                'q':os.environ['q'],
                'qi':os.environ['qi'],
                'use':os.environ['use']
                }

        # headerを以下の内容で指定
        headers = {
                'alg':'RS256',
                'typ':'JWT',
                'kid':os.environ['kid']
                }

        # payloadを以下の内容で指定
        payload = {
                'iss':self.channel_id,
                'sub':self.channel_id,
                'aud':f'{self.base_domain}/',
                'exp':int(time.time())+(60*30),
                'token_exp':token_exp
                }

        key = RSAAlgorithm.from_jwk(private_key)
        JWT = jwt.encode(payload, key, algorithm='RSA256', headers=headers, json_encoder=None)

        return JWT


# fastapiの実行
app = FastAPI()

# サーバー起動時にアクセストークンの発行を行う。
at = AccessToken()
# 今回は1時間の有効期限のアクセストークン
at.encode_jwt(token_exp=3600)

encode_jwt()でこのあと生成させるアクセストークンの有効期限を指定できます。戻り値がJWTになります。何も引数を指定しなければ30日の期限で生成されるようにできます。token_expに秒数で有効期限を指定すればその分だけ使用可能なチャネルアクセストークンv2.1を発行させることが可能になります。
発行できる数には上限があるのでちょっとお試しとか動作確認するくらいなら60分とか短めに指定しておくとチャネルアクセストークンv2.1が発行できなくなることを防ぐことができます。

もちろん、jwtにも有効期限を設けることができます。
payloadの中のexpにてそれを行っています。jwtのアサーションの最大有効期間が30分であるため、UNIX時間に1800(s)を足しています。

チャネルアクセストークンv2.1の発行を行う関数の作成

main.pyに以下を追記します。

Python main.py
# アクセストークンの生成やメッセージの送信関係
import json
import requests

class AccessToken:
    def __init__(self):
        # 記載済みのため省略


    def gen_jwt(self, token_exp=60*60*24*30):
        # 記載済みのため省略


    # チャネルアクセストークンv2.1の発行を行う関数
    def issue_access_token(self, JWT):
        # トークン発行のエンドポイントに各種パラメータをセットしてpostリクエストを行う。
        # ヘッダとボディは以下のように指定する。        
        uri = f'{self.auth_uri}/token'
        headers = {
                'Content-Type':'application/x-www-form-urlencoded'
                }
        data = {
                'grant_type':'client_credentials',
                'client_assertion_type':'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
                'client_assertion':JWT
                }

        r = requests.post(url=uri, headers=headers, data=data)
        r_json = r.json()

        # 生成に失敗したときのハンドリング。status_codeでバリデーションした方が良いかもしれない。
        if not 'error' in r_json:
            return r_json

        else:
            status_code = r.status_code
            r_json['status_code'] = r.status_code
            return r_json


# fastapiの実行
app = FastAPI()

# サーバー起動時にアクセストークンの発行を行う。
at = AccessToken()
# 今回は1時間の有効期限のアクセストークン
JWT = at.encode_jwt(token_exp=3600)
# ↓↓↓以下も追記する↓↓↓
# アクセストークンの発行処理。
issued_at = at.issue_access_token(JWT)
access_token = issued_at['access_token']

issue_access_token()でチャネルアクセストークンv2.1を発行できます。引数には先ほど作成した関数の戻り値であるJWTを入れます。作成が成功すると以下の辞書型の戻り値を得ることができます。

{
    "access_token": "{アクセストークン}",
    "token_type": "Bearer",
    "expires_in": {アクセストークンが発酵されてから期限切れになるまでの秒数(int)},
    "key_id": "{アクセストークンを識別するID}"
}

key_idはアクセストークンを破棄したくなった場合などに使用するのでaccess_tokenと一緒に破棄するまで安全な場所で保管するのが望ましい。
今回、アクセストークンの破棄や他のアクセストークンの発行方法などは紹介しない。

これで完成度が大体50%くらいです。完成まであと少し!!!

LINEからのメッセージを受け取る

やっとそれらしいところまで来ました。
これからメッセージを受信する部分に関して記述していきます。

LINEからのPOSTリクエストを受け取る

main.pyの最終行に以下を追記します。

Python main.py
@app.post('/line-messaging-api/handle_request')
async def line(data:Request, x_line_signature:str=Header(None)):
    # このあと色々処理を書きます。一旦割愛。
    pass

上記は/line-messaging-api/handle_requestというエンドポイントでPOSTリクエストをうけつけますよ!という意味になります。正直、エンドポイントはなんでも良いのですが、Messaging APIを受け付けるエンドポイントという意味でこんな感じにしてみました。

@app.post('/line-messaging-api/handle_request')

の部分は/line-messaging-api/handle_requestPOSTを受け付けますよという意味になります。

async def line(data:Request, x_line_signature:str=Header(None)):

の部分はRequestdataに、headerx_line_signatureに格納しますよという意味になります。
一応、これでLINEからのリクエストを受け取れるようになりました。

受信したメッセージがLINEから送られたものなのかを検証

LINEからのリクエストかどうかを識別するため、main.pyに以下を追記します。

Python main.py
########################################
# 以下をimport文が書いてある下に列挙する
########################################
# 署名の検証関係
import base64
import hashlib
import hmac

########################################
# 以下をAccessTokenクラスの2行したくらい下に書く
########################################
# 署名の検証に関するclass
class VerifySignature(AccessToken):
    def __init__(self):
        # AccessToken()でprivate_keyを読み込んでいるので継承
        super().__init__()


    def verify_signature(self, l_signature, data):
        client_secret = self.client_secret
        # ボディのダイジェスト値を求めます。
        hash_ = hmac.new(client_secret.encode('utf-8'), data, hashlib.sha256).digest()
        # ダイジェスト値をBase64エンコードします。
        b_signature = base64.b64encode(hash_)
        # 署名をutf-8にデコード
        signature = b_signature.decode('utf-8')

        # リクエストボディとリクエストヘッダの署名が一致するかを確かめます。
        if l_signature == signature:
            return True

        else:
            return False


# fastapiの実行
app = FastAPI()

# サーバー起動時にアクセストークンの発行を行う。
at = AccessToken()
# 今回は1時間の有効期限のアクセストークン
JWT = at.encode_jwt(token_exp=3600)
# アクセストークンの発行処理。
issued_at = at.issue_access_token(JWT)
access_token = issued_at['access_token']

# ↓↓↓以下を追記↓↓↓
# 署名の検証
vs = VerifySignature()

新たに署名を検証するため、VerifySignatureというclassを作成し、その中のverify_signature()関数で署名の検証をします。
この関数の引数には次に記述する内容で出てくる、リクエストヘッダリクエストボディを与えます。

データを抽出して署名を検証する

Python main.py
@app.post('/line-messaging-api/handle_request')
async def line(data:Request, x_line_signature:str=Header(None)):
    # ↓↓↓以下から記載↓↓↓
    # Requestからbodyのデータを抽出
    b_data = await data.body()
    # 署名の検証
    l_signature = vs.verify_signature(x_line_signature, b_data)
    # 署名を検証し、一致しなければ`invalid_requests`を返す。
    if not l_signature:
        raise HTTPException(
                status_code=400,
                detail='invalid_requests',
                headers={'Content-type':'application/json'}
                )

    # Requestからjsonを抽出
    d_json = await data.json()

先ほど作成した署名を検証する関数を組み込んで検証しただけの処理を組んでいます。
fastapiでは例外としてレスポンスをカスタマイズすることができます。それがraise HTTPExceprionの部分です。

メッセージの内容を受け取る。

ここまで、メッセージを受け取るまでの準備を行なってきました。ついにここではユーザから送られてきたメッセージを抽出します。
main.pyに以下を追記します。

Python main.py
#↓↓↓importが列挙されている部分の一番下に以下を記述
########################################
# 以下をVerifySignatureクラスの2行したくらい下に書く
########################################
class Message(AccessToken):
    def __init__(self):
        super().__init__()
        # メッセージ送信用ドメイン
        self.msg_uri = f'{self.base_domain}/v2/bot/message'

    def send_message(self, access_token, message, send_type, reply_token):
        # 後ほどこの関数の紹介&説明を行う。
        pass


    def recieve(self, d_json):
        # d_jsonに含まれるeventsの内容を抽出する。
        events = d_json['events']
        # eventsの中には返信を行うのに重要なreply_tokenやメッセージの内容が含まれている場合がある。
        # 順番に返事が行えるようにreply_tokenを格納するlistを先に準備しておく。
        reply_tokens = list()
        # 複数の返信先にメッセージを返せるように返信用メッセージを格納するlistを先に準備しておく。
        msgs = list()
        # eventsの中身を一つずつ取り出す。
        for event in events:
            # reply_tokenを抽出してlistに格納
            reply_token = event['replyToken']
            reply_tokens.append(reply_token)
            # 受信したメッセージを抽出
            message_data = event['message']
            # 写真や動画の場合もあるので受信したメッセージの種類を抽出する。
            messege_type = message_data['type']
            # 受信内容がテキストメッセージならmessageに内容を格納
            if messege_type == 'text':
                message = message_data['text']
                msgs.append(message)
            else:
                # 今は一旦pass
                pass

########################################
# AccessToken関係の処理は省略
# vs = VerifySignatureの下に以下を記述
########################################
# メッセージ関係
msg = Message()

@app.post('/line-messaging-api/handle_request')
async def line(data:Request, x_line_signature:str=Header(None)):
    # Requestからbodyのデータを抽出
    b_data = await data.body()
    # 署名の検証
    l_signature = vs.verify_signature(x_line_signature, b_data)
    # 署名を検証し、一致しなければ`invalid_requests`を返す。
    if not l_signature:
        raise HTTPException(
                status_code=400,
                detail='invalid_requests',
                headers={'Content-type':'application/json'}
                )

    # Requestからjsonを抽出
    d_json = await data.json()
    # ↓↓↓以下を記述↓↓↓
    # 受信したメッセージの解析と返信処理を行う関数
    msg.Message(d_json)
    return {}

まず、メッセージ関連のMessageクラスを作成しました。recieve()は受け取ったメッセージの内容を解析して送信する処理を行う関数です。
送信処理に関しては次の項目で記述します。
引数に指定されたd_jsonは以下のような辞書型変数です。

{
    "destination": "xxxxxxxxxx",
    "events": [
        {
            "type": "message",
            "message": {
                "type": "text",
                "id": "14353798921116",
                "text": "Hello, world"
            },
            "timestamp": 1625665242211,
            "source": {
                "type": "user",
                "userId": "U80696558e1aa831..."
            },
            "replyToken": "757913772c4646b784d4b7ce46d12671",
            "mode": "active"
        },...
    ]
}

eventには以上のような情報が含まれているのでreply_tokenmessageの内容を抽出してあげればオウム返し機能を実現させることができます。

返信の処理をする

返信を行うにはmain.pyに以下の内容を書き加えます。

Python main.py
    # メッセージ送信関数
    def send_message(self, access_token, message, send_type, reply_token):
        # ↓↓↓以下の内容を記述↓↓↓
        # 返信用メッセージの生成。
        # 同一reply_tokenに5個までメッセージを送信できる。
        # 今回はreply_token1つにつき1つのメッセージを返信する。
        content = [
                {
                    'type':'text',
                    'text':message
                    }
                ]

        # メッセージ送信用エンドポイントの指定
        uri = f'{self.msg_uri}/{send_type}'

        # アクセストークンをAutorizationのなかにBearer形式で指定
        headers = {
                'Content-Type':'application/json',
                'Authorization':f'Bearer {access_token}'
                }

        # 返信用のメッセージデータをreply_tokenと一緒に指定
        data = {
                'messages':content,
                'replyToken':reply_token
                }

        # メッセージの返信
        r = requests.post(url=uri, headers=headers, data=json.dumps(data))
        r_json = r.json()

        # エラーハンドリング
        if not 'error' in r_json:
            return r_json

        else:
            status_code = r.status_code
            r_json['status_code'] = r.status_code
            return r_json


    # 受信内容解析&返信実行関数
    def recieve(self, d_json):
        # d_jsonに含まれるeventsの内容を抽出する。
        events = d_json['events']
        # eventsの中には返信を行うのに重要なreply_tokenやメッセージの内容が含まれている場合がある。
        # 順番に返事が行えるようにreply_tokenを格納するlistを先に準備しておく。
        reply_tokens = list()
        # 複数の返信先にメッセージを返せるように返信用メッセージを格納するlistを先に準備しておく。
        msgs = list()
        # eventsの中身を一つずつ取り出す。
        for event in events:
            # reply_tokenを抽出してlistに格納
            reply_token = event['replyToken']
            reply_tokens.append(reply_token)
            # 受信したメッセージを抽出
            message_data = event['message']
            # 写真や動画の場合もあるので受信したメッセージの種類を抽出する。
            messege_type = message_data['type']
            # 受信内容がテキストメッセージならmessageに内容を格納
            if messege_type == 'text':
                message = message_data['text']
                msgs.append(message)
            else:
                # 今は一旦pass
                pass

        # ↓↓↓以下を記述↓↓↓
        # 返信処理の実行
        for reply_token in reply_tokens:
            # index番号の取得
            index = reply_tokens.index(reply_token)
            # reply_tokenのindex番号に対応するmessageを取得
            msg = msgs[index]
            # access_tokenなどを引数にしてメッセージの返信処理関数を実行
            self.send_message(access_token, msg, 'reply', reply_token) 

send_message()で返信の処理を行います。この関数の引数にreceive()で取得したreply_tokenや遥かなたで取得したaccess_tokenを引数として指定することで返信を行うことができるようになります。あとは、receive()を然るべき場所で呼び出してあげればオウム返しbotの完成です。

msg.receive()を呼び出す。

main.pyに以下の内容を追記します。

Python main.py
@app.post('/line-messaging-api/handle_request')
async def line(data:Request, x_line_signature:str=Header(None)):
    # Requestからbodyのデータを抽出
    b_data = await data.body()
    # 署名の検証
    l_signature = vs.verify_signature(x_line_signature, b_data)
    # 署名を検証し、一致しなければ`invalid_requests`を返す。
    if not l_signature:
        raise HTTPException(
                status_code=400,
                detail='invalid_requests',
                headers={'Content-type':'application/json'}
                )

    # Requestからjsonを抽出
    d_json = await data.json()
    # ↓↓↓以下を記載↓↓↓
    # 受け取ったメッセージを返信処理する関数を実行
    msg.recieve(d_json)
    # 空のレスポンスを返す
    return {}

最後に、recieve()関数を呼び出してオウム返しbotの完成です!!!

サーバーの起動

ターミナルを開き、

$ uvicorn main:app --reload
$ ngrok http 8000

以上二つを実行します。
http://127.0.0.1:4040/inspect/http にアクセスし、https://から始まるURLをコピーし、LINE DevelopersコンソールのMessaging API設定内Webhook設定{先ほどコピーしたurl}/line-messaging-api/handle_request(/line-messaging-api/handle_requestは作成したエンドポイント)を貼り付け保存すればLINEでデバッグができるようになります。

完成

こんな感じで動けば成功です。

11
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?