LoginSignup
6
0
お題は不問!Qiita Engineer Festa 2023で記事投稿!

DiscordとLINEをPython+FastAPI+Dockerで連携させる【番外編】音声ファイルの送受信

Posted at

挨拶

こんにちは、マグロです。
前回、一通りのやり取りを実装しました。

これで一応完成ではあるのですが、今回は番外編として音声ファイルの送受信を行います。

背景

LINEにはボイスメッセージ機能が存在し、APIの方でも送受信に関する方法が記載されています。

送信

受信

ですが、連携をする際に以下のような問題に直面しました。

  • Discordのファイルアップロード上限は8MB(画像はGyazo,動画はYouTubeを使用して回避)
  • LINEに音声を送信できるファイルはm4aのみ
  • 音声を保存し、共有できるサービスが見当たらない

上記に加えて音声の送受信は携帯で録音して送るということが多く、PCユーザーの多いDiscordでは需要がないと考え、実装を見送っていました。

しかし2~3ヶ月ほど前、Discordにこんなアナウンスが流れました。

Discord側がモバイル向けに音声メッセージの機能を付け加えるとの声明が出ました。
API対応は執筆時点でまだされていないようですが、音声ファイルの送受信に需要があると判断したのでしょう。
風向きが変わり始めました。

そして、、、

ついにファイルの送信上限が25MBに拡張されることがアナウンスされました。
動画ファイルはともかく、音声ファイルの送受信には十分な容量です。

というわけで、音声ファイルの送受信を実装します。

設計

今までで一番の曲者です。
まずLINEに送る場合は音声ファイルをm4aに変換する必要があります。
音声ファイルの変換にはffmpegを使用します。
第一回でDockerでインストールさせてあるため、特にすることはありません。
流れは以下の通りです。

Discord→LINE

  • 前回と同様にDiscord側のファイル拡張子から音声ファイルを判別。
  • ffmpegでm4aに変換。
  • 変換したm4aファイルをDiscordにアップロード。
  • アップロードしたファイルのURLを取得し、LINEBotで送信。

何度も言いますがLINEはファイルをバイナリデータとして扱います。
ファイルの種類はimage,video,audioといった形でLINE側から送られてきますが、拡張子まではわかりません。
ファイルの送信にはDiscordAPIを直接使用してアップロードしますが、拡張子を含めたファイル名を送信の際に求められます。

ではどうするかというと、バイナリデータから判別します。
ファイルのバイナリにはマジックナンバーと呼ばれるファイルの拡張子を示すものが存在します。
詳しく説明すると大きく話がそれるので、気になった方は調べてみてください。
流れは以下のようになります。

LINE→Discord

  • ファイルのバイナリデータを取得。
  • バイナリデータからマジックナンバーを識別、該当する拡張子をファイル名の末尾に置く。
  • DiscordAPIを使用してファイルをアップロード。

コーデイング

ディレクトリ構成

$ tree
.
├── app
│   ├── cogs
│   │   └── mst_line.py  # DiscordからLINEへ
│   ├── core
│   │   └── start.py     # DiscordBot起動用
│   ├── message_type
│   │   ├── discord_type
│   │   │   ├── discord_type.py     # Discordのサーバーに関するクラス
│   │   │   └── message_creater.py  # DiscordAPIを直接叩く
│   │   ├── line_type
│   │   │   ├── line_type.py        # LINEのプロフィールなどのクラス
│   │   │   ├── line_event.py       # LINEのイベントに関するクラス
│   │   │   └── line_message.py     # LINEのメッセージに関するクラス
│   │   ├── file_type.py
│   │   └── youtube_upload.py
│   ├── server.py       # サーバー立ち上げ
│   └── main.py  
├── Dockerfile
├── Profile
└── requirements.txt  

file_type.py

file_type.py
import io
import os

class Audio_Files:
    """
    ファイルのバイナリのデータを格納するクラス
    
    param:
    byte:bytes
    ファイルのバイトデータ

    filename:str
    ファイル名、拡張子は付けても付けなくてもいい

    付かなかった場合マジックナンバーから推測する
    """
    def __init__(
        self,
        byte:bytes,
        filename:str = None
    )-> None:
        """
        Discordのファイルの送信を行う際のクラス

        param:
        byte:bytes
        ファイルのバイナリデータ

        iobyte:io.BytesIO
        ファイルのバイナリデータ

        filename:str
        ファイル名、拡張子も付ける

        ついていない場合はマジックナンバーから推測する
        """
        self.byte = byte
        self.iobyte = io.BytesIO(byte)
        if len(os.path.splitext(filename)[1]) == 0:
            extension = self.detect_audio_file()
            self.filename = filename + extension.capitalize()
        else:
            self.filename = filename
        
    def detect_audio_file(self) -> str:
        """
        バイナリデータのマジックナンバーから音声ファイルの拡張子を識別する。

        param:
        file_byte:io.BytesIO
        ファイルのバイナリデータ

        return
        拡張子の文字列:str
        """
        header = self.iobyte.read(12)
            
        # AIFFファイルのマジックナンバー
        if header.startswith(b'FORM') and header[8:12] == b'AIFF':
            return '.aiff'
            
        # AIFF-Cファイルのマジックナンバー
        if header.startswith(b'FORM') and header[8:12] == b'AIFC':
            return '.aifc'
            
        # WAVEファイルのマジックナンバー
        if header.startswith(b'RIFF') and header[8:12] == b'WAVE':
            return '.wav'
            
        # MP3ファイルのマジックナンバー
        if header.startswith(b'\xFF\xFB') or header.startswith(b'\xFF\xF3') or \
        header.startswith(b'\xFF\xF2') or header.startswith(b'\xFF\xF4'):
            return '.mp3'

        # FLACファイルのマジックナンバー
        if header.startswith(b'fLaC'):
            return '.flac'

        # OGG Vorbisファイルのマジックナンバー
        if header.startswith(b'OggS') and header[28:31] == b'vorb':
            return '.ogg'

        # AACファイルのマジックナンバー
        if header.startswith(b'\xFF\xF1') or header.startswith(b'\xFF\xF9'):
            return '.aac'

        # AC-3ファイルのマジックナンバー
        if header.startswith(b'\x0B\x77') or header.startswith(b'\x77\x0B'):
            return '.ac3'

        # AMRファイルのマジックナンバー
        if header.startswith(b'#!AMR'):
            return '.amr'

        # GSMファイルのマジックナンバー
        if header.startswith(b'\x00\x01\x00\x01'):
            return '.gsm'
        
        # M4Aのマジックナンバー(LINEボイスメッセージの標準規格)
        if header.startswith(b'\x00\x00\x00\x1c') and header[8:12] == b'M4A ':
            return '.m4a'

音声ファイルのマジックナンバーは先頭の数バイトから識別することができます。
マジックナンバーの詳細は省きます。

message_creater.py

message_creater.py
import os
import re
+ import io

import aiohttp
import requests
import asyncio
import time

from dotenv import load_dotenv
load_dotenv()

from functools import partial
from typing import List,Tuple

import asyncio

from message_type.discord_type.discord_type import Discord_Member,Discord_Role,Discord_Channel
+ from message_type.file_type import Audio_Files

class ReqestDiscord:
    def __init__(self, guild_id: int, limit: int, token: str) -> None:
        self.guild_id = guild_id
        self.limit = limit
        self.headers = {
            'Authorization': f'Bot {token}',
            'Content-Type': 'application/x-www-form-urlencoded',
        }
+       # ファイルアップロードの際にContent-Typeが邪魔になるので取り除く
+       self.no_content_headers = {
+           'Authorization': f'Bot {token}'
+       }

    async def member_get(self) -> List[Discord_Member]:
        """
        サーバーのユーザーを取得する。
        戻り値
        Discord_Member
        """
        
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url = f'https://discordapp.com/api/guilds/{self.guild_id}/members?limit={self.limit}',
                headers = self.headers
            ) as resp:
                # 取得したユーザー情報を展開
                res = await resp.json()
                member_list = []
                for member in res:
                    r = Discord_Member.new_from_json_dict(member)
                    member_list.append(r)
        
        return member_list
            

    async def role_get(self) -> List[Discord_Role]:
        """
        ロールを取得する。
        戻り値
        Discord_Role
        """

        async with aiohttp.ClientSession() as session:
            async with session.get(
                url = f'https://discordapp.com/api/guilds/{self.guild_id}/roles',
                headers = self.headers
            ) as resp:
                # 取得したロール情報を取得
                res = await resp.json()
                role_list = []
                for role in res:
                    r = Discord_Role.new_from_json_dict(role)
                    role_list.append(r)

        return role_list

    async def channel_get(self) -> List[Discord_Channel]:
        """
        チャンネルを取得する。
        戻り値
        Discord_Channel
        """

        async with aiohttp.ClientSession() as session:
            async with session.get(
                url = f'https://discordapp.com/api/guilds/{self.guild_id}/channels',
                headers = self.headers
            ) as resp:
                # 取得したチャンネルを展開
                res = await resp.json()
                channel_list = []
                for channel in res:
                    r = Discord_Channel.new_from_json_dict(channel)
                    channel_list.append(r)

        return channel_list

    async def members_find(self, message: str) -> str:
        """
        テキストメッセージのメンションを変換する。
        @ユーザ名#4桁の数字#member → @ユーザ名

        戻り値
        message      変更後の文字列: str
        """
        
        # @{空白以外の0文字以上}#{0以上の数字}#member
        member_mention_list = re.findall("@\S*?#\d*?#member",message,re.S)

        if not member_mention_list:
            return message
        
        get_member_list = await self.member_get()

        for member in get_member_list:
            # ユーザー名の空白文字を削除
            member.user.username = re.sub("[\u3000 \t]", "",member.user.username)

            # メッセージに「@{ユーザー名}#{4桁の数字}member」が含まれていた場合
            if f'@{member.user.username}#{member.user.discreminator}#member' in member_mention_list:
                message = message.replace(f'@{member.user.username}#{member.user.discreminator}#member',f'<@{member.user.id}>')
                member_mention_list = [
                    user for user in member_mention_list 
                    if user != f'@{member.user.username}#{member.user.discreminator}#member'
                ]
            if not member_mention_list:
                return message

        return message


    async def roles_find(self, message: str) -> str:
        """
        テキストメッセージのメンションを変換する。
        @ロール名#role → @ロール名

        戻り値
        message      変更後の文字列: str
        """
        
        role_list = re.findall("@\S*?#role",message,re.S)

        if not role_list:
            return message
        
        get_role_list = await self.role_get()

        for role in get_role_list:
            # ロール名の空白文字を削除
            role.name = re.sub("[\u3000 \t]", "",role.name)

            # メッセージに「@{ロール名}#role」が含まれていた場合
            if f'@{role.name}#role' in role_list:
                message = message.replace(f'@{role.name}#role',f'<@&{role.id}>')
                role_list = [
                    rolename for rolename in role_list 
                    if rolename != f'@{role.name}#role'
                ]
            if not role_list:
                return message

        return message
                
        
    async def channel_select(self, channel_id: int, message: str) -> Tuple[int,str]:
        """
        テキストメッセージから送信場所を読み取り変更する。
        テキストチャンネルのみ可能。
        /チャンネル名#channel → 削除

        戻り値
        channel_id      送信先のチャンネル      :id
        message         指定したチャンネル名    :str
        """
        
        channel_list = re.findall("\A/\S*?#channel",message,re.S)

        if not channel_list or message.find('/') != 0:
            return channel_id, message
        
        get_channel_list = await self.channel_get()

        for channel in get_channel_list:
            # チャンネル名の空白文字を削除
            channel.name = re.sub("[\u3000 \t]", "",channel.name)

            # メッセージの先頭に「/{チャンネル名}#channel」が含まれていた場合
            if message.find(f'/{channel.name}#channel') == 0 and channel.type == 0:
                message = message.lstrip(f'/{channel.name}#channel')
                channel_id = channel.id
                return channel_id, message

        return channel_id, message

    async def send_discord(self, channel_id: int, message: str):
        """
        Discordへメッセージを送信する。

        channel_id  :int
            Discordのテキストチャンネルのid
        message     :str
            テキストメッセージ
        """
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url = f'https://discordapp.com/api/channels/{channel_id}/messages',
                headers = self.headers,data = {'content': f'{message}'}
            ) as resp:
                return await resp.json()

+   async def send_discord_file(
+       self, 
+       channel_id: int, 
+       message: str, 
+       fileobj:Audio_Files
+   ) -> Dict:
+       """
+       Discordへファイル付きのメッセージを送信する。
+       
+       channel_id  :int
+           Discordのテキストチャンネルのid
+       message     :str
+           テキストメッセージ
+       fileobj     :Audio_Files
+           音声ファイルのオブジェクト
+       """
+
+       with aiohttp.MultipartWriter("form-data") as mpwriter:
+           # ファイルを送付
+           mpwriter.append(
+               obj=io.BytesIO(fileobj.byte)
+           ).set_content_disposition(
+               disptype='form-data', 
+               name=fileobj.filename, 
+               filename=fileobj.filename
+           )
+
+           # テキストメッセージを送付
+           mpwriter.append(
+               obj=message
+           ).set_content_disposition(
+               disptype='form-data',
+               name="content"
+           )
+
+           # Discordにファイルとメッセージを送信
+           # 'Content-Type': 'application/x-www-form-urlencoded'が邪魔なので取り除く
+           async with aiohttp.ClientSession() as session:
+               async with session.post(
+                   url = f'{DISCORD_BASE_URL}/channels/{channel_id}/messages',
+                   headers = self.no_content_headers,
+                   data = mpwriter
+               ) as resp:
+                   return await resp.json()

ポイント

        with aiohttp.MultipartWriter("form-data") as mpwriter:
            # ファイルを送付
            mpwriter.append(
                obj=io.BytesIO(fileobj.byte)
            ).set_content_disposition(
                disptype='form-data', 
                name=fileobj.filename, 
                filename=fileobj.filename
            )

            # テキストメッセージを送付
            mpwriter.append(
                obj=message
            ).set_content_disposition(
                disptype='form-data',
                name="content"
            )

with aiohttp.MultipartWriter("form-data") as mpwriter:で非同期でフォームデータの作成をしています。
obj=fileobj.iobyteはファイルのバイナリデータで、set_content_dispositionで関連する情報を追加しています。name,filenameにはファイル名(拡張子含む)を入れます。

こうすることでバイナリデータのまま、Discordに音声ファイルを送信できます。

テキストメッセージはobj=messageにテキストを入れ、set_content_dispositionname="content"と指定することでテキストを送付できます。

            # Discordにファイルとメッセージを送信
            # 'Content-Type': 'application/x-www-form-urlencoded'が邪魔なので取り除く
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url = f'{DISCORD_BASE_URL}/channels/{channel_id}/messages',
                    headers = self.no_content_headers,
                    data = mpwriter
                ) as resp:
                    return await resp.json()

aiohttpで非同期でメッセージを送信します。
application/x-www-form-urlencodedはテキストを送信する形式なのでファイル送信には使用できません。
content-typeなしで送信させます。

line_message.py

import json
import requests
from requests import Response

import datetime

import os
import asyncio
from functools import partial

import aiohttp
import subprocess
from typing import List

from dotenv import load_dotenv
load_dotenv()

from message_type.line_type.line_type import Profile,GyazoJson
from message_type.youtube_upload import YouTubeUpload
+ from message_type.file_type import Audio_Files

NOTIFY_URL = 'https://notify-api.line.me/api/notify'
NOTIFY_STATUS_URL = 'https://notify-api.line.me/api/status'
LINE_BOT_URL = 'https://api.line.me/v2/bot'
LINE_CONTENT_URL = 'https://api-data.line.me/v2/bot'

# LINEのgetリクエストを行う
async def line_get_request(url: str, token: str) -> json:
    async with aiohttp.ClientSession() as session:
        async with session.get(
            url = url,
            headers = {'Authorization': 'Bearer ' + token}
        ) as resp:
            return await resp.json()

+ class Voice_File:
+   """
+   Discordの音声ファイルのURLと秒数を格納するクラス
+
+   param:
+   url     :str
+   音声ファイルのURL
+
+   second  :float
+   音声ファイルの秒数(秒)
+   """
+   def __init__(self,url:str,second:float) -> None:
+       self.url = url
+       self.second = second


# LINEのpostリクエストを行う
async def line_post_request(url: str, headers: dict, data: dict) -> json:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url = url,
            headers = headers,
            data = data
        ) as resp:
            return await resp.json()

class LineBotAPI:
    def __init__(self, notify_token: str, line_bot_token: str, line_group_id: str) -> None:
        self.notify_token = notify_token
        self.line_bot_token = line_bot_token
        self.line_group_id = line_group_id
        self.loop = asyncio.get_event_loop()

    # LINE Notifyでテキストメッセージを送信
    async def push_message_notify(self, message: str) -> json:
        data = {'message': f'message: {message}'}
        return await line_post_request(
            url = NOTIFY_URL, 
            headers = {'Authorization': f'Bearer {self.notify_token}'}, 
            data = data
        )
        
    # LINE Messageing APIでテキストメッセージを送信
    async def push_message(self,message_text:str) -> json:
        data = {
            'to':self.line_group_id,
            'messages':[
                {
                    'type':'text',
                    'text':message_text
                }
            ]
        }
        return await line_post_request(
            url = LINE_BOT_URL + "/message/push",
            headers = {
                'Authorization': 'Bearer ' + self.line_bot_token,
                'Content-Type': 'application/json'
            },
            data = json.dumps(data)
        )

    
    # LINE Notifyで画像を送信
    async def push_image_notify(self, message: str, image_url: str) -> Dict:
        """
        LINE Notifyで画像を送信

        param
        message:str
        送信するテキストメッセージ

        image_url:str
        送信する画像のURL

        return
        resp.json:json
        レスポンス
        """
        if len(message) == 0:
            message = "画像を送信しました。"
        data = {
            'imageThumbnail': f'{image_url}',
            'imageFullsize': f'{image_url}',
            'message': f'{message}',
        }
        return await line_post_request(
            url = NOTIFY_URL, 
            headers = {'Authorization': f'Bearer {self.notify_token}'}, 
            data = data
        )

    # 動画の送信(動画のみ)
    async def push_movie(self, preview_image: str, movie_urls: List[str]) -> Dict:
        """
        LINEBotで動画の送信(動画のみ)

        param
        preview_image:str
        プレビュー画像のURL

        movie_urls:List[str]
        動画のURL(複数)

        return
        resp.json:json
        レスポンス
        """
        data = []
        # 動画を1個ずつ格納
        for movie_url in movie_urls:
            data.append({
                "type": "video",
                "originalContentUrl": movie_url,
                "previewImageUrl": preview_image
            })
        datas = {
            "to": self.line_group_id,
            "messages": data
        }
        return await line_post_request(
            url = LINE_BOT_URL + "/message/push",
            headers = {
                'Authorization': 'Bearer ' + self.line_bot_token,
                'Content-Type': 'application/json'
            },
            data = json.dumps(datas)
        )
    
+   async def push_voice(self,voice_file:List[Voice_File]) -> Dict:
+       """
+       LINEBotで音声の送信
+
+       param
+       voice_file:List[Voice_File]
+       送信する音声ファイルのクラス
+
+       return
+       Dict
+       レスポンス
+       """
+       data = []
+       for voice in voice_file:
+           data.append({
+               'type':'audio',
+               'originalContentUrl':voice.url,
+               'duration':int(voice.second * 1000)
+           })
+       datas = {
+           'to': self.line_group_id,
+           'messages': data
+       }
+       return await line_post_request(
+           url = LINE_BOT_URL + "/message/push",
+           headers = {
+               'Authorization': 'Bearer ' + self.line_bot_token,
+               'Content-Type': 'application/json'
+           },
+           data = json.dumps(datas)
+       )

    # 送ったメッセージ数を取得
    async def totalpush(self) -> int:
        r = await line_get_request(
            LINE_BOT_URL + "/message/quota/consumption",
            self.line_bot_token
        )
        return int(r["totalUsage"])

    # LINE Notifyのステータスを取得
    async def notify_status(self) -> Response:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url = NOTIFY_STATUS_URL,
                headers = {'Authorization': 'Bearer ' + self.notify_token}
            ) as resp:
                return resp

    # LINE Notifyの1時間当たりの上限を取得
    async def rate_limit(self) -> int:
        resp = await self.notify_status()
        ratelimit = resp.headers.get('X-RateLimit-Limit')
        return int(ratelimit)

    # LINE Notifyの1時間当たりの残りの回数を取得
    async def rate_remaining(self) -> int:
        resp = await self.notify_status()
        ratelimit = resp.headers.get('X-RateLimit-Remaining')
        return int(ratelimit)

    # LINE Notifyの1時間当たりの画像送信上限を取得
    async def rate_image_limit(self) -> int:
        resp = await self.notify_status()
        ratelimit = resp.headers.get('X-RateLimit-ImageLimit')
        return int(ratelimit)

    # LINE Notifyの1時間当たりの残り画像送信上限を取得
    async def rate_image_remaining(self) -> int:
        resp = await self.notify_status()
        ratelimit = resp.headers.get('X-RateLimit-ImageRemaining')
        return int(ratelimit)

    
    # LINEのユーザプロフィールから名前を取得
    async def get_proflie(self, user_id: str) -> Profile:
        """
        LINEのユーザプロフィールから名前を取得

        param
        user_id:str
        LINEのユーザーid

        return
        profile:Profile
        LINEユーザーのプロフィールオブジェクト
        """
        # グループIDが有効かどうか判断
        
        r = await line_get_request(
            LINE_BOT_URL + f"/group/{self.line_group_id}/member/{user_id}",
            self.line_bot_token,
        )
        
        # グループIDが無効の場合、友達から判断
        if r.get('message') != None:
            r = await line_get_request(
                LINE_BOT_URL + f"/profile/{user_id}",
                self.line_bot_token,
            )
        return await Profile.new_from_json_dict(data=r)

    # LINEから画像データを取得し、Gyazoにアップロード
    async def image_upload(self, message_id: int) -> GyazoJson:
        """
        LINEから画像データを取得し、Gyazoにアップロードする

        param
        message_id:int
        LINEのメッセージのid

        return
        gayzo:GyazoJson
        Gyazoの画像のオブジェクト
        """
        # 画像のバイナリデータを取得
        async with aiohttp.ClientSession() as session:
            async with session.get(
                    url = LINE_CONTENT_URL + f'/message/{message_id}/content',
                    headers={
                        'Authorization': 'Bearer ' + self.line_bot_token
                    }
            ) as bytes:
                image_bytes = await bytes.read()

                # Gyazoにアップロードする
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                        url = 'https://upload.gyazo.com/api/upload',
                        headers={
                            'Authorization': 'Bearer ' + os.environ['GYAZO_TOKEN'],
                        },
                        data={
                            'imagedata': image_bytes
                        }
                    ) as gyazo_image:
                        return await GyazoJson.new_from_json_dict(await gyazo_image.json())
        
    # LINEから受け取った動画を保存し、YouTubeに限定公開でアップロード
    async def movie_upload(self, message_id: int, display_name: str) -> str:
        """
        LINEから受け取った動画を保存し、YouTubeに限定公開でアップロード

        param
        message_id:int
        LINEのメッセージのid

        display_name:str
        LINEのユーザー名

        return
        youtube_id:str
        YouTubeの動画id
        """
        # 動画のバイナリデータを取得
        async with aiohttp.ClientSession() as session:
            async with session.get(
                    url = LINE_CONTENT_URL + f'/message/{message_id}/content',
                    headers={
                        'Authorization': 'Bearer ' + self.line_bot_token
                    }
            ) as bytes:

                video_bytes = await bytes.read()

                youtube_video = YouTubeUpload(
                    title=f"{display_name}からの動画",
                    description="LINEからの動画",
                    privacy_status="unlisted"
                )

                youtube = await youtube_video.get_authenticated_service()

                return await youtube_video.byte_upload(
                    video_bytes=io.BytesIO(video_bytes),
                    youtube=youtube
                )

+   # LINEから受け取った音声データを取得し、Discordにアップロード
+   async def voice_get(self ,message_id: int) -> Audio_Files:
+       """
+       LINEから受け取った音声データを取得し、Discordにアップロード
+
+       param:
+       message_id:int
+       LINEのメッセージのid
+
+       return
+       Audio_File
+       アップロードする音声データのクラス
+       """
+       # 音声のバイナリデータを取得
+       async with aiohttp.ClientSession() as session:
+           async with session.get(
+                   url = LINE_CONTENT_URL + f'/message/{message_id}/content',
+                   headers={
+                       'Authorization': 'Bearer ' + self.line_bot_token
+                   }
+           ) as bytes:
+
+               voice_bytes = await bytes.read()
+
+               # アップロードするファイルを指定する
+               return Audio_Files(
+                   byte=voice_bytes,
+                   filename='line_audio'
+               )

    # 当月に送信できるメッセージ数の上限目安を取得(基本1000,23年6月以降は200)
    async def pushlimit(self) -> str:
        r = await line_get_request(
            LINE_BOT_URL + "/message/quota",
            self.line_bot_token
        )
        return r["value"]

ポイント

class Voice_File:
    """
    Discordの音声ファイルのURLと秒数を格納するクラス

    param:
    url     :str
    音声ファイルのURL

    second  :float
    音声ファイルの秒数(秒)
    """
    def __init__(self,url:str,second:float) -> None:
        self.url = url
        self.second = second

Discordの音声ファイルのURLと秒数を格納するクラスです。
詳細は後述。

    async def push_voice(self,voice_file:List[Voice_File]) -> Dict:
        """
        LINEBotで音声の送信

        param
        voice_file:List[Voice_File]
        送信する音声ファイルのクラス

        return
        Dict
        レスポンス
        """
        data = []
        for voice in voice_file:
            data.append({
                'type':'audio',
                'originalContentUrl':voice.url,
                'duration':int(voice.second * 1000)
            })
        datas = {
            'to': self.line_group_id,
            'messages': data
        }
        return await line_post_request(
            url = LINE_BOT_URL + "/message/push",
            headers = {
                'Authorization': 'Bearer ' + self.line_bot_token,
                'Content-Type': 'application/json'
            },
            data = json.dumps(datas)
        )

LINEに音声を送信させます。
動画と同じく、リストで複数の音声を送信できます。
'duration':int(voice.second * 1000)は音声の長さ(ミリ秒)を示しています。
これがないと送信できないようです。

    # LINEから受け取った音声データを取得し、Discordにアップロード
    async def voice_get(self ,message_id: int) -> Audio_Files:
        """
        LINEから受け取った音声データを取得し、Discordにアップロード

        param:
        message_id:int
        LINEのメッセージのid

        return
        Audio_File
        アップロードする音声データのクラス
        """
        # 音声のバイナリデータを取得
        async with aiohttp.ClientSession() as session:
            async with session.get(
                    url = LINE_CONTENT_URL + f'/message/{message_id}/content',
                    headers={
                        'Authorization': 'Bearer ' + self.line_bot_token
                    }
            ) as bytes:

                voice_bytes = await bytes.read()

                # アップロードするファイルを指定する
                return Audio_Files(
                    byte=voice_bytes,
                    filename='line_audio'
                )

LINEから受け取った音声データを取得し、Audio_Filesとしてインスタンスを作ります。

mst_line.py

mst_line.py
import discord
from discord.ext import commands
import os
from typing import List,Tuple,Union

from dotenv import load_dotenv
load_dotenv()

from message_type.line_type.line_message import LineBotAPI
from core.start import DBot

class mst_line(commands.Cog):
    def __init__(self, bot : DBot):
        self.bot = bot

    # DiscordからLINEへ
    @commands.Cog.listener(name='on_message')
    async def on_message(self, message:discord.Message):

        # メッセージがbot、閲覧注意チャンネル、ピン止め、ボイスチャンネルの場合終了
        if (message.author.bot is True or
            message.channel.nsfw is True or
            message.type == discord.MessageType.pins_add or
            message.channel.type == discord.ChannelType.voice):
            return

        # FIVE_SECONDs,FIVE_HOUR
        # ACCESS_TOKEN,GUILD_ID,TEMPLE_ID (それぞれ最低限必要な環境変数)
        bots_name=os.environ['BOTS_NAME'].split(",")

        for bot_name in bots_name:
            # メッセージが送られたサーバーを探す
            if os.environ.get(f"{bot_name}_GUILD_ID") == str(message.guild.id):
                line_bot_api = LineBotAPI(
                    notify_token = os.environ.get(f'{bot_name}_NOTIFY_TOKEN'),
                    line_bot_token = os.environ[f'{bot_name}_BOT_TOKEN'],
                    line_group_id = os.environ.get(f'{bot_name}_GROUP_ID')
                )
                break

        # line_bot_apiが定義されなかった場合、終了
        # 主な原因はLINEグループを作成していないサーバーからのメッセージ
        if not bool('line_bot_api' in locals()):
            return

        # 送信NGのチャンネル名の場合、終了
        ng_channel = os.environ.get(f"{bot_name}_NG_CHANNEL").split(",")
        if message.channel.name in ng_channel:
            return

        # テキストメッセージ
        messagetext=f"{message.channel.name}にて、{message.author.name}"

        if message.type == discord.MessageType.new_member:
            messagetext=f"{message.author.name}が参加しました。"

        if message.type == discord.MessageType.premium_guild_subscription:
            messagetext=f"{message.author.name}がサーバーブーストしました。"

        if message.type == discord.MessageType.premium_guild_tier_1:
            messagetext=f"{message.author.name}がサーバーブーストし、レベル1になりました!!!!!!!!"

        if message.type == discord.MessageType.premium_guild_tier_2:
            messagetext=f"{message.author.name}がサーバーブーストし、レベル2になりました!!!!!!!!!"

        if message.type == discord.MessageType.premium_guild_tier_3:
            messagetext=f"{message.author.name}がサーバーブーストし、レベル3になりました!!!!!!!!!!!"


        # LINEに送信する動画、画像、音声ファイルのリスト
        imagelist = []
        videolist = []
+       voicelist = []

        # ユーザーネームの空白文字を削除
        user_name = re.sub("[\u3000 \t]", "",message.author.name)


        # 送付ファイルがあった場合
        if message.attachments:
            # 画像か動画であるかをチェック
            imagelist, message.attachments = await image_checker(message.attachments)
            videolist, message.attachments = await video_checker(message.attachments)
+           voicelist, message.attachments = await voice_checker(message.attachments,message)

            messagetext += "が、"

            # 送信された動画と画像の数を格納
            if len(imagelist) > 0:
                messagetext += f"画像を{len(imagelist)}枚、"

            if len(videolist) > 0:
                messagetext += f"動画を{len(videolist)}個、"

+           if len(voicelist) > 0:
+               messagetext += f"音声を{len(voicelist)}個、"

            # 画像と動画以外のファイルがある場合、urlを直接書き込む
            if message.attachments:
                for attachment in message.attachments:
                    messagetext += f"\n{attachment.url} "

            messagetext += "送信しました。"

        # メッセージ本文を書き込む
        messagetext += f"{message.clean_content}"

        # スタンプが送付されている場合
        if message.stickers:
            # 動くスタンプは送信不可のため終了
            if message.stickers[0].url.endswith(".json"):
                return
            # 画像として送信
            else:
                messagetext = f'{messagetext} スタンプ:{message.stickers[0].name}'
                imagelist, message.stickers = await image_checker(message.stickers)

        # 画像を一個ずつNotifyで送信
        if len(imagelist) > 0:
            for image in imagelist:
                await line_bot_api.push_image_notify(message=messagetext,image_url=image)

        # 動画を送信
        if len(videolist) > 0:
            if hasattr(message.guild.icon,'url'):
                icon_url = message.guild.icon.url
            else:
                icon_url = message.author.display_avatar.url

            await line_bot_api.push_message_notify(message=messagetext)
            await line_bot_api.push_movie(preview_image=icon_url,movie_urls=videolist)

+       # 音声を送信
+       if len(voicelist) > 0:
+           await line_bot_api.push_message_notify(message=messagetext)
+           await line_bot_api.push_voice(voice_file=voicelist)

        # ファイルなしの場合、テキストを送信
-       if len(imagelist) + len(videolist) == 0:
+       if len(imagelist) + len(videolist) + len(voicelist) == 0:
            await line_bot_api.push_message_notify(message=messagetext)



# 画像を識別
async def image_checker(
    attachments:List[discord.Attachment]
) -> Tuple[
    List[str],
    Union[List[discord.Attachment],List[discord.StickerItem]]
]:
    """
    Discordの送付ファイルから、画像を抜き出す。
    引数:      attachments:    Discordの送付ファイル
    戻り値:    image_urls:     画像かスタンプのurlリスト
               attachments:    画像を抜き出したDiscordの送付ファイル
    """
    image = (".jpg", ".png", ".JPG", ".PNG", ".jpeg", ".gif", ".GIF")
    image_urls = []
    for attachment in attachments[:]:
        # 画像があった場合、urlを画像のリストに追加し、送付ファイルのリストから削除
        if attachment.url.endswith(image):
            image_urls.append(attachment.url)
            attachments.remove(attachment)

    return image_urls, attachments

# 動画を識別
async def video_checker(
    attachments:List[discord.Attachment]
) -> Tuple[
    List[str],
    List[discord.Attachment]
]:
    """
    Discordの送付ファイルから、動画を抜き出す。
    引数:      attachments:    Discordの送付ファイル
    戻り値:    video_urls:     動画のurlリスト
               attachments:    動画を抜き出したDiscordの送付ファイル
    """
    video = (".mp4", ".MP4", ".MOV", ".mov", ".mpg", ".avi", ".wmv")
    video_urls = []
    for attachment in attachments[:]:
        # 動画があった場合、urlを動画のリストに追加し、送付ファイルのリストから削除
        if attachment.url.endswith(video):
            video_urls.append(attachment.url)
            attachments.remove(attachment)

    return video_urls, attachments

+ # 音声を識別
+ async def voice_checker(
+   attachments:List[discord.Attachment],
+   message:discord.Message
+) -> Tuple[
+   List[Voice_File],
+   List[discord.Attachment]
+]:
+   """
+   Discordの送付ファイルから、音声を抜き出す。
+   m4a以外のファイルは、ffmpegで変換しDiscordに送信する。
+   引数:      attachments:    Discordの送付ファイル
+   戻り値:    video_urls:     音声のurlリスト
+              attachments:    音声を抜き出したDiscordの送付ファイル
+   """
+   voice = (".wav",".mp3",".flac",".aif",".m4a",".oga",".ogg")
+   voice_files = []
+   loop = asyncio.get_event_loop()
+   for attachment in attachments[:]:
+       # 動画があった場合、urlを動画のリストに追加し、送付ファイルのリストから削除
+       if attachment.url.endswith(voice):
+           # 音声ファイルをダウンロードする
+           await attachment.save(attachment.filename)
+
+           # m4aの場合はそのまま格納
+           if attachment.url.endswith('.m4a'):
+               voice_url = attachment.url
+               attachments.remove(attachment)
+           else:
+               # ffmpegを使用して音声ファイルをm4aに変換する
+               output_filename = f"{os.path.splitext(attachment.filename)[0]}.m4a"
+               await loop.run_in_executor(
+                   None,
+                   partial(
+                       subprocess.run,['ffmpeg', '-i', attachment.filename, output_filename],
+                       check=True
+                   )
+               )
+
+               # aiofilesで非同期でファイルを開く
+               async with aiofiles.open(output_filename, 'rb') as f:
+                   m4a_data = await f.read()
+                   # Discordにファイルを送信する
+                   m4a_file = discord.File(
+                       fp=io.BytesIO(m4a_data), 
+                       filename=output_filename
+                   )
+                   m4a_file_message = await message.channel.send(f"m4aファイルに変換します。: {attachment.filename} -> {output_filename}", file=m4a_file)
+
+               voice_url = m4a_file_message.attachments[0].url
+               # 変換したm4aファイルのurl
+               attachments.remove(attachment)
+
+           # m4aファイルの秒数を計算
+           ogg_sound = AudioSegment.from_file(output_filename,format="m4a")
+           sound_second = ogg_sound.duration_seconds
+
+           voice_files.append(Voice_File(
+               url=voice_url,
+               second=sound_second
+           ))
+
+           # 変換したファイルを削除する
+           os.remove(output_filename)
+           os.remove(attachment.filename)
+
+   return voice_files, attachments


def setup(bot:DBot):
    return bot.add_cog(mst_line(bot))

ポイント

# 音声を識別
async def voice_checker(
    attachments:List[discord.Attachment],
    message:discord.Message
) -> Tuple[
    List[Voice_File],
    List[discord.Attachment]
]:
    """
    Discordの送付ファイルから、音声を抜き出す。
    m4a以外のファイルは、ffmpegで変換しDiscordに送信する。
    引数:      attachments:    Discordの送付ファイル
    戻り値:    video_urls:     音声のurlリスト
               attachments:    音声を抜き出したDiscordの送付ファイル
    """
    voice = (".wav",".mp3",".flac",".aif",".m4a",".oga",".ogg")
    voice_files = []
    loop = asyncio.get_event_loop()
    for attachment in attachments[:]:
        # 動画があった場合、urlを動画のリストに追加し、送付ファイルのリストから削除
        if attachment.url.endswith(voice):
            # 音声ファイルをダウンロードする
            await attachment.save(attachment.filename)
            
            # m4aの場合はそのまま格納
            if attachment.url.endswith('.m4a'):
                voice_url = attachment.url
                attachments.remove(attachment)
            else:
                # ffmpegを使用して音声ファイルをm4aに変換する
                output_filename = f"{os.path.splitext(attachment.filename)[0]}.m4a"
                await loop.run_in_executor(
                    None,
                    partial(
                        subprocess.run,['ffmpeg', '-i', attachment.filename, output_filename],
                        check=True
                    )
                )

                # aiofilesで非同期でファイルを開く
                async with aiofiles.open(output_filename, 'rb') as f:
                    m4a_data = await f.read()
                    # Discordにファイルを送信する
                    m4a_file = discord.File(
                        fp=io.BytesIO(m4a_data), 
                        filename=output_filename
                    )
                    m4a_file_message = await message.channel.send(f"m4aファイルに変換します。: {attachment.filename} -> {output_filename}", file=m4a_file)

                voice_url = m4a_file_message.attachments[0].url
                # 変換したoggファイルのurl
                attachments.remove(attachment)

            # m4aファイルの秒数を計算
            ogg_sound = AudioSegment.from_file(output_filename,format="m4a")
            sound_second = ogg_sound.duration_seconds

            voice_files.append(Voice_File(
                url=voice_url,
                second=sound_second
            ))

            # 変換したファイルを削除する
            os.remove(output_filename)
            os.remove(attachment.filename)

    return voice_files, attachments

基本的にはimage_checkervideo_checkerと同じく、拡張子から音声ファイルかどうかを認識していますが、音声ファイルはいったんローカルに保存します。
拡張子がm4aの場合はそのまま格納しますが、それ以外の場合はffmpegm4aに変換します。

変換後、Discord上にm4aファイルをアップロードします。

m4a_file_messageにアップロードされたファイルのurlがあるのでそれを取得します。
pydubを使用しダウンロード、もしくは変換したm4aの秒数を計測した後、urlとその秒数をもとにVoice_Fileを定義し格納します。

これでLINEへ音声ファイルを送信する準備が整います。

server.py

server.py
from fastapi import FastAPI,Depends,HTTPException,Request,Header,Response
from fastapi.responses import HTMLResponse
from threading import Thread
import uvicorn

import base64
import hashlib
import hmac
import re

from dotenv import load_dotenv
load_dotenv()


from message_type.line_type.line_event import Line_Responses
from message_type.discord_type.message_creater import ReqestDiscord
from message_type.line_type.line_message import LineBotAPI


import os

bots_name = os.environ['BOTS_NAME'].split(",")
TOKEN = os.environ['TOKEN']

app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)

# LINE側のメッセージを受け取る
@app.post("/line_bot")
async def line_response(
    response:Line_Responses,
    byte_body:Request, 
    x_line_signature=Header(None)
):
    """
    response:Line_Responses
    LINEから受け取ったイベントの内容
    jsonがクラスに変換されている。
    
    byte_body:Request
    LINEから受け取ったイベントのバイナリデータ。
    LINEからのメッセージという署名の検証に必要。

    x_line_signature:Header
    LINEから受け取ったjsonのヘッダー。
    こちらも署名に必要。
    """

    # request.bodyを取得
    boo = await byte_body.body()
    body = boo.decode('utf-8')

    # channel_secretからbotの種類を判別する
    for bot_name in bots_name:
        channel_secret = os.environ[f'{bot_name}_CHANNEL_SECRET']
        # ハッシュ値を求める
        hash = hmac.new(
            channel_secret.encode('utf-8'),
            body.encode('utf-8'), 
            hashlib.sha256
        ).digest()

        # 結果を格納
        signature = base64.b64encode(hash)
        decode_signature = signature.decode('utf-8')

        if decode_signature == x_line_signature:
            channel_secret = os.environ[f'{bot_name}_CHANNEL_SECRET']
            # Discordサーバーのクラスを宣言
            discord_find_message = ReqestDiscord(
                guild_id = int(os.environ[f'{bot_name}_GUILD_ID']),
                limit = int(os.environ["USER_LIMIT"]), 
                token = TOKEN
            )
            # LINEのクラスを宣言
            line_bot_api = LineBotAPI(
                notify_token = os.environ.get(f'{bot_name}_NOTIFY_TOKEN'),
                line_bot_token = os.environ[f'{bot_name}_BOT_TOKEN'],
                line_group_id = os.environ.get(f'{bot_name}_GROUP_ID')
            )
            # メッセージを送信するDiscordのテキストチャンネルのID
            channel_id = int(os.environ[f'{bot_name}_CHANNEL_ID'])
            break

    # ハッシュ値が一致しなかった場合エラーを返す
    if decode_signature != x_line_signature: 
        raise Exception

    # 応答確認の場合終了
    if type(response.events) is list:
        return HTMLResponse("OK")

    # イベントの中身を取得
    event = response.events

    # LINEのプロフィールを取得(友達登録している場合)
    profile_name = await line_bot_api.get_proflie(user_id=event.source.userId)

    # テキストメッセージの場合
    if event.message.type == 'text':
        message = event.message.text
        # Discordのメンバー、ロール、チャンネルの指定があるか取得する
        """
        members_find
        テキストメッセージからユーザーのメンションを検出し、変換する。
        @ユーザー#0000#member → <@00000000000>
        roles_find
        テキストメッセージからロールのメンションを検出し、変換する。
        @ロール#role → <@&0000000000>
        channel_select
        テキストメッセージから送信場所を検出し、送信先のチャンネルidを返す。
        テキストチャンネルのみ送信可能。ただし、メッセージの先頭に書かれていなければ適用されない。
        /チャンネル名#channel → 削除
        """

        message = await discord_find_message.members_find(message=message)
        message = await discord_find_message.roles_find(message=message)
        channel_id, message = await discord_find_message.channel_select(channel_id=channel_id,message=message)

      # 画像が送信された場合
      if event.message.type == 'image':
          # バイナリデータを取得しGyazoに送信
          gyazo_json = await line_bot_api.image_upload(event.message.id)
          # Gyazoのurlを返す
          message = f"https://i.gyazo.com/{gyazo_json.image_id}.{gyazo_json.type}"


    # 動画が送信された場合
    if event.message.type == 'video':
        # 動画をYouTubeにアップロードし、urlを返す
        youtube_id = await line_bot_api.movie_upload(
            message_id=event.message.id,
            display_name=profile_name.display_name
        )
        message = f"https://youtu.be/{youtube_id}"

+   # 音声が送信された場合
+   if event.message.type == 'audio':
+       # 音声ファイルのデータを取得し、Discordに送信
+       fileobj = await line_bot_api.voice_get(message_id=event.message.id)
+       await discord_find_message.send_discord_file(
+           channel_id=channel_id,
+           message=f'{profile_name.display_name}',
+           fileobj=fileobj
+       )
+       # レスポンス200を返し終了
+       return HTMLResponse(content="OK")


    # LINEの名前 「メッセージ」の形式で送信
    message = f'{profile_name.display_name} \n{message}'
    await discord_find_message.send_discord(channel_id=channel_id, message=message)

    # レスポンス200を返し終了
    return HTMLResponse(content="OK")

def run():
    uvicorn.run("server:app",  host="0.0.0.0", port=int(os.getenv("PORT", default=5000)), log_level="info")

# DiscordBotと並列で立ち上げる
def keep_alive():
    t = Thread(target=run)
    t.start()

# ローカルで実行する際
if __name__ == '__main__':
    uvicorn.run(app,host='localhost', port=8000)

実行

以下のように送受信できていれば成功です。
念のため音声が聞こえるかも確認しましょう。
image.png
image.png

image.png

終わりに

全5+1回に及ぶDiscordとLINE連携もこれで終わりです。本当にお疲れ様でした。
音声も追加したことで、よりコミュニケーション表現の幅が広くなったといえます。

本Botはこれにいくつか機能を追加して身内で使用しているのですが、改良を重ねており、本記事のLINE連携をWebUI上で制御できるようにしています。
以下のように実装しています。

FastAPIとjinja2を使用してフルスタックアプリとして運用していますが、フロントと分けた方がよいと考えたためまた改良しようと考えています。

完成して、やる気があればまた記事にしたいと思います。

ここまで読んでいただきありがとうございます。

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0