5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

お題は不問!Qiita Engineer Festa 2023で記事投稿!

DiscordとLINEをPython+FastAPI+Dockerで連携させる【その4】LINEからDiscordへの動画

Last updated at Posted at 2023-06-16

挨拶

こんにちは。マグロです。
前回の続きです。

今回はLINEからDiscordに動画を送ります。

背景

前回の記事の通り、LINEはアップロードされたファイルはバイナリデータでサーバーに保存され、一定時間で削除されます。
動画も画像と同様に、バイナリデータで返されます。

前回使用したGyazoは、バイナリデータを直接アップロードすることで共有出来ましたが、動画となると一気にハードルが上がります。
理由として

  • Discordのファイル容量の上限は8MB(1分以上の動画なんて圧縮しても夢のまた夢)
  • 動画共有サービスの要件はGyazo同様、容量無制限、プライバシー保護、無料、APIありの要件を満たすこと
  • できれば一般的に認知度が高いサービス(危ない動画リンクは避ける)

非常にハードルが高いのですが、プライベートで長期的に使用するなら要件を満たさなければなりません。
ですがそんなサービスあるのでしょうか。

YouTubeを採用

思ったより身近にありました。YouTubeです。

  • 容量無制限(厳密には一日ごとに上限あり)
  • プライバシー保護(URLを知っている人のみ見れる、限定公開機能あり)
  • 無料
  • APIあり(GCPでアップロード用のYouTube Data APIがある)

さすが世界一の動画共有サービス、なんでもあります。
というわけでこいつを組み込みましょう。

設計

まずYouTube Data APIのリファレンスを読みます。
またコードはPython2で書かれているため、そのまま利用することはできません。

ざっくりまとめると

  • GCPでアップロード用のclient_secrets.jsonを生成。
  • それを基にOAuthで認証。
  • サンプルプログラムで動画投稿。

といった形で投稿しています。

こいつを

  • GCPでAPIの申請を行い、Googleアカウントで認証を行う。
  • 動画が投稿できるか、一度ローカルで試す。
  • 作成された認証情報を基にBotに組み込む。

といった形で組み込んでいきます。

はじめは

下準備

GCPの申請と動画投稿をします。
以下の記事を参考に進めてください。

作成されたclient_secret.jsonupload_video.py-oauth2.jsonos.environにあたる部分は環境変数として扱います。
控えておいてください。

client_secret.json
{
  "installed":
  	        {
  	            "client_id":os.environ["YOUTUBE_CLIENT_ID"],
  	            "project_id":os.environ["YOUTUBE_PROJECT_ID"],
  	            "auth_uri":"https://accounts.google.com/o/oauth2/auth",
  	            "token_uri":"https://oauth2.googleapis.com/token",
  	            "auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs",
  				"client_secret":os.environ["YOUTUBE_CLIENT_SECRET"],
  				"redirect_uris":["http://localhost"]
  			}
}
upload_video.py-oauth2.json
{
	    "access_token":os.environ["YOUTUBE_ACCESS_TOKEN"],
	    "client_id":os.environ["YOUTUBE_CLIENT_ID"],
	    "client_secret":os.environ["YOUTUBE_CLIENT_SECRET"],
	    "refresh_token":os.environ["YOUTUBE_REFRESH_TOKEN"],
	    "token_expiry": os.environ["YOUTUBE_TOKEN_EXPIRY"], 
	    "token_uri": "https://oauth2.googleapis.com/token",
	    "user_agent": None,
	    "revoke_uri": "https://oauth2.googleapis.com/revoke", 
	    "id_token": None, 
	    "id_token_jwt": None, 
	    "token_response": {
	        "access_token":os.environ["YOUTUBE_ACCESS_TOKEN"],
	        "expires_in": 3599, 
	        "scope": "https://www.googleapis.com/auth/youtube.upload", 
	        "token_type": "Bearer"
	    },
	    "scopes": ["https://www.googleapis.com/auth/youtube.upload"], 
	    "token_info_uri": "https://oauth2.googleapis.com/tokeninfo", 
	    "invalid": False, 
	    "_class": "OAuth2Credentials", 
	    "_module": "oauth2client.client"
}

コーデイング

ディレクトリ構成

$ tree
.
├── app
│   ├── cogs
│   │   └── mst_line.py  # DiscordからLINEへ
│   ├── core
│   │   └── start.py     # DiscordBot起動用
│   ├── message_type
│   │   ├── discord_type
│   │   │   ├── discord_type.py     # Discordのサーバーに関するクラス
│   │   │   └── message_creater.py  # DiscordAPIを直接叩く
│   │   ├── line_type
│   │   │   ├── line_type.py        # LINEのプロフィールなどのクラス
│   │   │   ├── line_event.py       # LINEのイベントに関するクラス
│   │   │   └── line_message.py     # LINEのメッセージに関するクラス
│   │   └── youtube_upload.py
│   ├── server.py       # サーバー立ち上げ
│   └── main.py  
├── Dockerfile
├── Profile
└── requirements.txt  

youtube_upload.py

youtube_upload.py
import http.client  # httplibはPython3はhttp.clientへ移行
import httplib2
import os
import random
import time
import io
import json

import aiofiles

import asyncio
from functools import partial

from googleapiclient.discovery import build,Resource
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload,MediaIoBaseUpload,HttpRequest
from oauth2client.client import flow_from_clientsecrets,Credentials,OAuth2WebServerFlow
from oauth2client.file import Storage
from oauth2client.tools import run_flow


# HTTPトランスポートライブラリに再試行を行わないよう明示的に伝える。
# リトライのロジックは本プログラムで処理するため。
httplib2.RETRIES = 1
# エラーが起きた際の最大再試行回数
MAX_RETRIES = 10
# これらの例外が発生した場合は常に再試行します。
RETRIABLE_EXCEPTIONS = (httplib2.HttpLib2Error,
                        IOError,
                        http.client.NotConnected,
                        http.client.IncompleteRead,
                        http.client.ImproperConnectionState,
                        http.client.CannotSendRequest,
                        http.client.CannotSendHeader,
                        http.client.ResponseNotReady,
                        http.client.BadStatusLine)

# これらのステータスコードエラーが発生した場合、 再試行を行います。
# コードが発生した場合は、常に再試行します。
RETRIABLE_STATUS_CODES = [500, 502, 503, 504]

"""
CLIENT_SECRETS_FILE変数は、client_idとclient_secretを含む、
このアプリケーションのOAuth 2.0情報を含むファイルの名前を指定します。
OAuth 2.0のクライアントIDとクライアントシークレットは、
以下のGoogle API Consoleから取得することができます。
https://console.cloud.google.com/

あなたのプロジェクトでYouTube Data APIが有効になっていることを確認してください.
YouTube Data APIにアクセスするためにOAuth2を利用する際の詳細な情報は、こちらを参照してください。
https://developers.google.com/youtube/v3/guides/authentication

client_secrets.jsonのファイルフォーマットに関する詳しい情報は, こちらを参照してください:
https://developers.google.com/api-client-library/python/guide/aaa_client_secrets
"""
CLIENT_SECRETS_FILE = f"client_secret_{os.environ['YOUTUBE_CLIENT_ID']}.json"
OAUTH2_FILE = "upload_video.py-oauth2.json"

# この変数はCLIENT_SECRETS_FILEが見つからない場合に表示されるメッセージを定義します。
MISSING_CLIENT_SECRETS_MESSAGE = f"""
WARNING: Please configure OAuth 2.0

To make this sample run you will need to populate the client_secrets.json file
found at:

   {os.path.abspath(
        os.path.join(
            os.path.dirname(__file__),
            CLIENT_SECRETS_FILE
        )
    )}

with information from the API Console
https://console.developers.google.com/

For more information about the client_secrets.json file format, please visit:
https://developers.google.com/api-client-library/python/guide/aaa_client_secrets
"""



"""
このOAuth 2.0のアクセススコープでは、アプリケーションが認証されたユーザーの YouTubeチャンネルにファイルをアップロードすることは許されますが、
それ以外のアクセスは許可されません。
認証されたユーザーの YouTube チャンネルにファイルをアップロードすることはできますが、
それ以外のアクセスはできません。
"""

YOUTUBE_UPLOAD_SCOPE = "https://www.googleapis.com/auth/youtube.upload"
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"

# publicで公開、privateで非公開、unlistedで限定公開
VALID_PRIVACY_STATUSES = ("public", "private", "unlisted")

class YouTubeUpload():
    def __init__(
        self,
        file_path:str = None,
        title:str = None,
        description:str = None,
        tag:str = None,
        category_id:int = 22,
        privacy_status:str = "unlisted"
    ) -> None:
        """
        YouTubeに動画をアップロードするオブジェクト

        file_path:str
        動画ファイルのパス。

        title:str
        動画タイトル。

        description:str
        動画の説明欄。

        tag:str
        動画のタグ。カンマ区切りで区分けできる

        category_id:int
        カテゴリーのid。詳細は下記参照。
        https://gist.github.com/dgp/1b24bf2961521bd75d6c

        privacy_status:str
        動画の公開形式。
        publicで公開、privateで非公開、unlistedで限定公開
        """
        self.file_path = file_path
        self.title = title
        self.description = description
        self.tag = tag
        self.category_id = category_id
        self.privacy_status = privacy_status
        self.loop = asyncio.get_event_loop()


    async def create_client_secret(self) -> None:
        # YouTubeの認証情報をjson形式で作成
        cli = {
            "installed":
                {
                    "client_id":os.environ["YOUTUBE_CLIENT_ID"],
                    "project_id":os.environ["YOUTUBE_PROJECT_ID"],
                    "auth_uri":"https://accounts.google.com/o/oauth2/auth",
                    "token_uri":"https://oauth2.googleapis.com/token",
                    "auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs",
			        "client_secret":os.environ["YOUTUBE_CLIENT_SECRET"],
			        "redirect_uris":["http://localhost"]
		        }
	    }
        async with aiofiles.open(CLIENT_SECRETS_FILE,"w") as f:
            json_data = json.dumps(cli)
            await f.write(json_data)
            await f.close()

    async def create_oauth(self) -> None:
        oau = {
            "access_token":os.environ["YOUTUBE_ACCESS_TOKEN"],
            "client_id":os.environ["YOUTUBE_CLIENT_ID"],
            "client_secret":os.environ["YOUTUBE_CLIENT_SECRET"],
            "refresh_token":os.environ["YOUTUBE_REFRESH_TOKEN"],
            "token_expiry": os.environ["YOUTUBE_TOKEN_EXPIRY"], 
            "token_uri": "https://oauth2.googleapis.com/token",
            "user_agent": None,
            "revoke_uri": "https://oauth2.googleapis.com/revoke", 
            "id_token": None, 
            "id_token_jwt": None, 
            "token_response": {
                "access_token":os.environ["YOUTUBE_ACCESS_TOKEN"],
                "expires_in": 3599, 
                "scope": "https://www.googleapis.com/auth/youtube.upload", 
                "token_type": "Bearer"
            },
            "scopes": ["https://www.googleapis.com/auth/youtube.upload"], 
            "token_info_uri": "https://oauth2.googleapis.com/tokeninfo", 
            "invalid": False, 
            "_class": "OAuth2Credentials", 
            "_module": "oauth2client.client"
        }
        async with aiofiles.open(OAUTH2_FILE,"w") as f:
            json_data = json.dumps(oau)
            await f.write(json_data)
            await f.close()


    async def get_authenticated_service(self) -> Resource:
        """
        認証フロー?を作成しYouTubeAPIのリソースを作成
        
        return
        youtube:Resource
        動的生成されたYouTubeAPIのオブジェクト
        """

        if not bool(os.path.isfile(CLIENT_SECRETS_FILE)):
            await self.create_client_secret()

        if not bool(os.path.isfile(OAUTH2_FILE)):
            await self.create_oauth()

        flow:OAuth2WebServerFlow = flow_from_clientsecrets(
            filename=CLIENT_SECRETS_FILE,
            scope=YOUTUBE_UPLOAD_SCOPE,
            message=MISSING_CLIENT_SECRETS_MESSAGE
        )

        # 認証情報を取得
        storage = Storage(OAUTH2_FILE)
        credentials:Credentials = storage.get()

        # OAuthファイルが見つからない場合
        if credentials is None or credentials.invalid:
            credentials = run_flow(flow = flow, storage = storage)

        # 生成したYouTubeAPIのオブジェクトを返す
        return build(
            serviceName=YOUTUBE_API_SERVICE_NAME,
            version=YOUTUBE_API_VERSION,
            http=credentials.authorize(httplib2.Http())
        )

    # 使用していません。本Botは下記のbyte_uploadを使用しバイナリデータから直接YouTubeにアップロードします。
    async def initialize_upload(self,youtube:Resource) -> None:
        """
        動画ファイルからYouTubeにアップロードする。

        param
        youtube:Resource
        動的生成されたYouTubeAPIのオブジェクト


        """
        # タグ(カンマ区切り)があった場合、格納
        tags:list = None
        if self.tag:
            tags = self.tag.split(",")

        # アップロード用のbody作成
        body = dict(
            snippet=dict(
                title=self.title,
                description=self.description,
                tags=tags,
                categoryId=self.category_id
            ),
            status=dict(
                privacyStatus=self.privacy_status
            )
        )


        """
        chunksizeパラメータは、一度にアップロードされるデータの各チャンクのサイズをバイト単位で指定します。
        より少ないチャンクでより高速なアップロードを行うため、信頼性の高い接続のために高い値を設定します。
        信頼性の低い接続での回復を良くするためには、低い値を設定してください。
        以下のコードで「chunksize」を -1 に設定すると、
        1 回の HTTP リクエストでファイル全体がアップロードされることを意味します。
        (これは通常ベストプラクティスですが、2.6より古いPythonを使用している場合、
        またはApp Engine上で動作している場合は、チャンクサイズを以下のように設定する必要があります。
        1024 * 1024 (1MB).
        """

        # API の videos.insert メソッドを呼び出して、動画の作成とアップロードを行います。
        insert_request:HttpRequest = youtube.videos().insert(
            part=",".join(body.keys()),
            body=body,
            media_body=MediaFileUpload(
                filename=self.file_path, 
                chunksize=-1, 
                resumable=True
            )
        )

        await self.resumable_upload(insert_request)


    async def byte_upload(
        self,
        video_bytes:io.BytesIO,
        youtube:Resource
    ) -> str:
        """
        動画のバイナリデータから直接YouTubeにアップロードする。

        param
        video_bytes:io.BytesIO
        動画のバイナリデータ

        youtube:Resource
        動的生成されたYouTubeAPIのオブジェクト

        return
        youtube_id:str
        アップロードした動画のid
        """
        # タグ(カンマ区切り)があった場合、格納
        tags:list = None
        if self.tag:
            tags = self.tag.split(",")
        
        # 動画バイナリからアップロード用のデータを生成
        media = MediaIoBaseUpload(
            fd=video_bytes,
            mimetype='video/*',
            chunksize=1024*1024,
            resumable=True
        )

        # videos.insertで動画をアップロード
        request:HttpRequest = await self.loop.run_in_executor(
            None,
            partial(
                youtube.videos().insert,
                part='snippet,status',
                body={
                    'snippet': {
                        'title': self.title,
                        'description': self.description,
                        'tags': tags,
                        'categoryId': self.category_id
                    },
                    'status': {
                        'privacyStatus': self.privacy_status
                    }
                },
                media_body=media
            )
        )

        # 動画のidを取得
        youtube_id = await self.resumable_upload(request)
        return youtube_id


    # 失敗したアップロードを再開するために指数関数的なバックオフ戦略を実装しています。
    async def resumable_upload(self,insert_request:HttpRequest) -> str:
        """
        正常なアップロードができるまでアップロードを繰り返します。

        param
        insert_request  :HttpRequest
        動画のアップロード情報を格納するオブジェクト

        return:
        youtube_id      :str
        
        """
        response:dict = None
        error:str = None
        retry:int = 0
        video_id:str = None
        print("アップロード中...")
        while response is None:
            try:
                status, response = await self.loop.run_in_executor(
                    None,
                    partial(
                        insert_request.next_chunk,
                    )
                )
                if response is not None:
                    if 'id' in response:
                        print(f"アップロードに成功しました。動画ID:{response['id']}")
                        video_id = response['id']
                    else:
                        exit(f"アップロードに失敗しました。レスポンス: {response}")
            except HttpError as e:
                if e.resp.status in RETRIABLE_STATUS_CODES:
                    error = f"HTTPエラー {e.resp.status} が発生しました。:\n{e.content}"
                else:
                    raise
            except RETRIABLE_EXCEPTIONS as e:
                error = f"リトライ可能なエラーが発生しました。: {e}"
            if error is not None:
                print(error)
                retry += 1
                if retry > MAX_RETRIES:
                    exit("リトライ回数の上限に達しました。")
                max_sleep = 2 ** retry
                sleep_seconds = random.random() * max_sleep
                print(f"{sleep_seconds}秒後、再アップロードを試みます。")
                await asyncio.sleep(sleep_seconds)

        return video_id

YouTubeAPIを使用してアップロードするクラスを宣言しています。
解説は後述。

line_message.py

line_message.py
import json
import requests
from requests import Response

import datetime

import os
import asyncio
from functools import partial

import aiohttp
import subprocess
from typing import List

from dotenv import load_dotenv
load_dotenv()


from message_type.line_type.line_type import Profile,GyazoJson
+ from message_type.youtube_upload import YouTubeUpload


NOTIFY_URL = 'https://notify-api.line.me/api/notify'
NOTIFY_STATUS_URL = 'https://notify-api.line.me/api/status'
LINE_BOT_URL = 'https://api.line.me/v2/bot'
LINE_CONTENT_URL = 'https://api-data.line.me/v2/bot'

# LINEのgetリクエストを行う
async def line_get_request(url: str, token: str) -> json:
    async with aiohttp.ClientSession() as session:
        async with session.get(
            url = url,
            headers = {'Authorization': 'Bearer ' + token}
        ) as resp:
            return await resp.json()

# LINEのpostリクエストを行う
async def line_post_request(url: str, headers: dict, data: dict) -> json:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url = url,
            headers = headers,
            data = data
        ) as resp:
            return await resp.json()

class LineBotAPI:
    def __init__(self, notify_token: str, line_bot_token: str, line_group_id: str) -> None:
        self.notify_token = notify_token
        self.line_bot_token = line_bot_token
        self.line_group_id = line_group_id
        self.loop = asyncio.get_event_loop()

    # LINE Notifyでテキストメッセージを送信
    async def push_message_notify(self, message: str) -> json:
        data = {'message': f'message: {message}'}
        return await line_post_request(
            url = NOTIFY_URL, 
            headers = {'Authorization': f'Bearer {self.notify_token}'}, 
            data = data
        )
        
    # LINE Messageing APIでテキストメッセージを送信
    async def push_message(self,message_text:str) -> json:
        data = {
            'to':self.line_group_id,
            'messages':[
                {
                    'type':'text',
                    'text':message_text
                }
            ]
        }
        return await line_post_request(
            url = LINE_BOT_URL + "/message/push",
            headers = {
                'Authorization': 'Bearer ' + self.line_bot_token,
                'Content-Type': 'application/json'
            },
            data = json.dumps(data)
        )

    # 送ったメッセージ数を取得
    async def totalpush(self) -> int:
        r = await line_get_request(
            LINE_BOT_URL + "/message/quota/consumption",
            self.line_bot_token
        )
        return int(r["totalUsage"])

    # LINE Notifyのステータスを取得
    async def notify_status(self) -> Response:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url = NOTIFY_STATUS_URL,
                headers = {'Authorization': 'Bearer ' + self.notify_token}
            ) as resp:
                return resp

    # LINE Notifyの1時間当たりの上限を取得
    async def rate_limit(self) -> int:
        resp = await self.notify_status()
        ratelimit = resp.headers.get('X-RateLimit-Limit')
        return int(ratelimit)

    # LINE Notifyの1時間当たりの残りの回数を取得
    async def rate_remaining(self) -> int:
        resp = await self.notify_status()
        ratelimit = resp.headers.get('X-RateLimit-Remaining')
        return int(ratelimit)

    # LINE Notifyの1時間当たりの画像送信上限を取得
    async def rate_image_limit(self) -> int:
        resp = await self.notify_status()
        ratelimit = resp.headers.get('X-RateLimit-ImageLimit')
        return int(ratelimit)

    # LINE Notifyの1時間当たりの残り画像送信上限を取得
    async def rate_image_remaining(self) -> int:
        resp = await self.notify_status()
        ratelimit = resp.headers.get('X-RateLimit-ImageRemaining')
        return int(ratelimit)

    # 友達数、グループ人数をカウント
    async def friend(self) -> str:
        # グループIDが有効かどうか判断
        try:
            r = await line_get_request(
                LINE_BOT_URL + "/group/" + self.line_group_id + "/members/count",
                self.line_bot_token,
            )
            return r["count"]
        # グループIDなしの場合、友達数をカウント
        except KeyError:
            # 日付が変わった直後の場合、前日を参照
            if datetime.datetime.now().strftime('%H') == '00':
                before_day = datetime.date.today() + datetime.timedelta(days=-1)
                url = LINE_BOT_URL + "/insight/followers?date=" + before_day.strftime('%Y%m%d')
            else:
                url = LINE_BOT_URL + "/insight/followers?date=" + datetime.date.today().strftime('%Y%m%d')
            r = await line_get_request(
                url,
                self.line_bot_token,
            )
            return r["followers"] 

    # 当月に送信できるメッセージ数の上限目安を取得(基本1000,23年6月以降は200)
    async def pushlimit(self) -> str:
        r = await line_get_request(
            LINE_BOT_URL + "/message/quota",
            self.line_bot_token
        )
        return r["value"]

    # LINEのユーザプロフィールから名前を取得
    async def get_proflie(self, user_id: str) -> Profile:
        # グループIDが有効かどうか判断
        try:
            r = await line_get_request(
                LINE_BOT_URL + f"/group/{self.line_group_id}/member/{user_id}",
                self.line_bot_token,
            )
        # グループIDが無効の場合、友達から判断
        except KeyError:
            r = await line_get_request(
                LINE_BOT_URL + f"/profile/{user_id}",
                self.line_bot_token,
            )
        return await Profile.new_from_json_dict(data=r)


    # LINEから画像データを取得し、Gyazoにアップロード
    async def image_upload(self, message_id: int) -> GyazoJson:
        """
        LINEから画像データを取得し、Gyazoにアップロードする

        param
        message_id:int
        LINEのメッセージのid

        return
        gayzo:GyazoJson
        Gyazoの画像のオブジェクト
        """
        # 画像のバイナリデータを取得
        async with aiohttp.ClientSession() as session:
            async with session.get(
                    url = LINE_CONTENT_URL + f'/message/{message_id}/content',
                    headers={
                        'Authorization': 'Bearer ' + self.line_bot_token
                    }
            ) as bytes:
                image_bytes = await bytes.read()

                # Gyazoにアップロードする
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                        url = 'https://upload.gyazo.com/api/upload',
                        headers={
                            'Authorization': 'Bearer ' + os.environ['GYAZO_TOKEN'],
                        },
                        data={
                            'imagedata': image_bytes
                        }
                    ) as gyazo_image:
                        return await GyazoJson.new_from_json_dict(await gyazo_image.json())
           
+   # LINEから受け取った動画を保存し、YouTubeに限定公開でアップロード
+   async def movie_upload(self, message_id: int, display_name: str) -> str:
+       """
+       LINEから受け取った動画を保存し、YouTubeに限定公開でアップロード
+
+       param
+       message_id:int
+       LINEのメッセージのid
+
+       display_name:str
+       LINEのユーザー名
+
+       return
+       youtube_id:str
+       YouTubeの動画id
+       """
+       # 動画のバイナリデータを取得
+       async with aiohttp.ClientSession() as session:
+           async with session.get(
+                   url = LINE_CONTENT_URL + f'/message/{message_id}/content',
+                   headers={
+                       'Authorization': 'Bearer ' + self.line_bot_token
+                   }
+           ) as bytes:
+
+               video_bytes = await bytes.read()
+                youtube_video = YouTubeUpload(
+                   title=f"{display_name}からの動画",
+                   description="LINEからの動画",
+                   privacy_status="unlisted"
+               )
+
+               youtube = await youtube_video.get_authenticated_service()
+
+               return await youtube_video.byte_upload(
+                   video_bytes=io.BytesIO(video_bytes),
+                   youtube=youtube
+               )

解説

        async with aiohttp.ClientSession() as session:
            async with session.get(
                    url = LINE_CONTENT_URL + f'/message/{message_id}/content',
                    headers={
                        'Authorization': 'Bearer ' + self.line_bot_token
                    }
            ) as bytes:

                video_bytes = await bytes.read()

/message/{message_id}/contentで動画のバイナリデータを取得します。

                youtube_video = YouTubeUpload(
                    title=f"{display_name}からの動画",
                    description="LINEからの動画",
                    privacy_status="unlisted"
                )

youtube_upload.pyで宣言したYouTubeUploadを使いインスタンスを作成します。
YouTubeの動画タイトルは{動画をアップロードしたLINEのユーザー名}の動画
動画説明欄はLINEからの動画
公開範囲は限定公開になります。

youtube = await youtube_video.get_authenticated_service()

実行されることで、環境変数からYouTubeAPIの認証情報を作成します。

                return await youtube_video.byte_upload(
                    video_bytes=io.BytesIO(video_bytes),
                    youtube=youtube
                )

LINEから受け取ったバイナリデータをそのままYouTubeにアップロードします。
戻り値はstrでアップロードされたYouTubeの動画idが返却されます。

server.py

アップロードした動画のYouTubeURLをDiscordに送信します。

server.py
from fastapi import FastAPI,Depends,HTTPException,Request,Header,Response
from fastapi.responses import HTMLResponse
from threading import Thread
import uvicorn

import base64
import hashlib
import hmac
import re

from dotenv import load_dotenv
load_dotenv()


from message_type.line_type.line_event import Line_Responses
from message_type.discord_type.message_creater import ReqestDiscord
from message_type.line_type.line_message import LineBotAPI


import os

bots_name = os.environ['BOTS_NAME'].split(",")
TOKEN = os.environ['TOKEN']

app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)

# LINE側のメッセージを受け取る
@app.post("/line_bot")
async def line_response(
    response:Line_Responses,
    byte_body:Request, 
    x_line_signature=Header(None)
):
    """
    response:Line_Responses
    LINEから受け取ったイベントの内容
    jsonがクラスに変換されている。
    
    byte_body:Request
    LINEから受け取ったイベントのバイナリデータ。
    LINEからのメッセージという署名の検証に必要。

    x_line_signature:Header
    LINEから受け取ったjsonのヘッダー。
    こちらも署名に必要。
    """

    # request.bodyを取得
    boo = await byte_body.body()
    body = boo.decode('utf-8')

    # channel_secretからbotの種類を判別する
    for bot_name in bots_name:
        channel_secret = os.environ[f'{bot_name}_CHANNEL_SECRET']
        # ハッシュ値を求める
        hash = hmac.new(
            channel_secret.encode('utf-8'),
            body.encode('utf-8'), 
            hashlib.sha256
        ).digest()

        # 結果を格納
        signature = base64.b64encode(hash)
        decode_signature = signature.decode('utf-8')

        if decode_signature == x_line_signature:
            channel_secret = os.environ[f'{bot_name}_CHANNEL_SECRET']
            # Discordサーバーのクラスを宣言
            discord_find_message = ReqestDiscord(
                guild_id = int(os.environ[f'{bot_name}_GUILD_ID']),
                limit = int(os.environ["USER_LIMIT"]), 
                token = TOKEN
            )
            # LINEのクラスを宣言
            line_bot_api = LineBotAPI(
                notify_token = os.environ.get(f'{bot_name}_NOTIFY_TOKEN'),
                line_bot_token = os.environ[f'{bot_name}_BOT_TOKEN'],
                line_group_id = os.environ.get(f'{bot_name}_GROUP_ID')
            )
            # メッセージを送信するDiscordのテキストチャンネルのID
            channel_id = int(os.environ[f'{bot_name}_CHANNEL_ID'])
            break

    # ハッシュ値が一致しなかった場合エラーを返す
    if decode_signature != x_line_signature: 
        raise Exception

    # 応答確認の場合終了
    if type(response.events) is list:
        return HTMLResponse("OK")

    # イベントの中身を取得
    event = response.events

    # LINEのプロフィールを取得(友達登録している場合)
    profile_name = await line_bot_api.get_proflie(user_id=event.source.userId)

    # テキストメッセージの場合
    if event.message.type == 'text':
        message = event.message.text
        # Discordのメンバー、ロール、チャンネルの指定があるか取得する
        """
        members_find
        テキストメッセージからユーザーのメンションを検出し、変換する。
        @ユーザー#0000#member → <@00000000000>
        roles_find
        テキストメッセージからロールのメンションを検出し、変換する。
        @ロール#role → <@&0000000000>
        channel_select
        テキストメッセージから送信場所を検出し、送信先のチャンネルidを返す。
        テキストチャンネルのみ送信可能。ただし、メッセージの先頭に書かれていなければ適用されない。
        /チャンネル名#channel → 削除
        """

        message = await discord_find_message.members_find(message=message)
        message = await discord_find_message.roles_find(message=message)
        channel_id, message = await discord_find_message.channel_select(channel_id=channel_id,message=message)

      # 画像が送信された場合
      if event.message.type == 'image':
          # バイナリデータを取得しGyazoに送信
          gyazo_json = await line_bot_api.image_upload(event.message.id)
          # Gyazoのurlを返す
          message = f"https://i.gyazo.com/{gyazo_json.image_id}.{gyazo_json.type}"

+   # 動画が送信された場合
+   if event.message.type == 'video':
+       # 動画をYouTubeにアップロードし、urlを返す
+       youtube_id = await line_bot_api.movie_upload(
+           message_id=event.message.id,
+           display_name=profile_name.display_name
+       )
+       message = f"https://youtu.be/{youtube_id}"

    # LINEの名前 「メッセージ」の形式で送信
    message = f'{profile_name.display_name} \n{message}'
    await discord_find_message.send_discord(channel_id=channel_id, message=message)

    # レスポンス200を返し終了
    return HTMLResponse(content="OK")

def run():
    uvicorn.run("server:app",  host="0.0.0.0", port=int(os.getenv("PORT", default=5000)), log_level="info")

# DiscordBotと並列で立ち上げる
def keep_alive():
    t = Thread(target=run)
    t.start()

# ローカルで実行する際
if __name__ == '__main__':
    uvicorn.run(app,host='localhost', port=8000)

画像と同様にURLを直接テキストに張り付けて送信します。

完成!!

試しに動画を送ってみましょう。(画像はイメージです)
image.png

URLがDiscordに書いてあれば成功です。

終わりに

お疲れ様でした。
だいぶ機能は充実したかと思います。
さて残りは

の2つなのですが、先月あたりにDiscordのファイル送信上限が8MBから25MBまで増設されたため、音声ファイルの送受信が現実味を帯びてきました。
というかすでに実装できています。

なので音声ファイルは番外編として扱いたいと思います。

次回は

を説明します。

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?