0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Google Drive 上の mp3 を 再生する Alexa スキル (python)

Last updated at Posted at 2024-12-16

 以下の記事で実装したスキルの python版 & 機能強化版です。

 Google Drive API の利用準備の説明が手厚い記事になってしまいましたので、コードをすぐ見たい!という方は 4.3. コード実装 に飛んでください。

1. できること

  • Google Drive のフォルダ ID を指定した再生が可能です
  • (従来の) json 形式で記述した再生リストを指定した再生が可能です
  • スキルに呼び掛ける際の再生リストの名前と上記 2 種類の再生対象との紐づけは root.json のみで管理し、スキル作成者でなくても再生リストをメンテナンス可能です
  • スキル指示一覧
    No 機能 発話例 (wake ワード省略) 備考
    1 起動 & 再生 {スキル名}{再生リスト名} を再生して
    2 起動 & シャッフル再生 {スキル名}{再生リスト名} をシャッフル再生して
    3 次に進む 次の曲
    4 前に戻る 前の曲
    5 指定番目の曲に飛ぶ {スキル名} でトラック {番号} 番号は 1 始まり
    6 最初から再生 最初から再生して
    7 1曲繰り返し リピート再生して 次に進む、前に戻る、指定番目の曲に飛ぶ、のいずれかで解除
    8 全体繰り返し ループ再生して
    9 全体繰り返し解除 ループ再生を解除
    10 シャッフル再生 シャッフル再生して
    11 シャッフル再生解除 シャッフル再生を解除
    12 一時停止 一時停止して
    13 再生再開 再開して
    14 再生中の曲情報 (再生中に){スキル名} でヘルプ 再生リストの記載情報か ID3タグの情報を返答
    15 再生リスト名一覧 (再生前に){スキル名} でヘルプ root.json の再生リスト名一覧を返答
    16 スキル終了 キャンセル 「終了して」だと一時停止になる

2. 主な改善点

 冒頭記事の Node.js 版スキルは再生リストを追加する際にスロットの値として定義しなければいけないことや、再生リストを json で作らなければいけないことなどが不便で、その解消ついでに Node.js から python に移行しました。

2.1. 再生リスト名のスロット定義廃止

 Amazon Echo に再生リスト名を指示しても意図通りの単語で認識されるとは限らないため、従来は同義語とともに再生リスト名をスロットとして定義しておき、認識した項目の ID をキーとして root.json を参照していました。Alexa Skill の仕組みで認識のブレを吸収できるのは良いのですが、新しい再生リストを追加するたびに Alexa Console でスロット定義を更新する必要が生じます。誤認識したリスト名に対する正しいリスト名を root.json に定義しておいて自前で変換することで、いちいちスロット定義を更新する必要を無くします。

 例えば EasyListening という再生リストがあったとして「Easy Listening」「イージーリスニング」「E G リスニング」などと認識されてしまう場合は以下のように定義します。

root.json
{
    "playlists": {
        "easylistening": { "type": "folder", "id": "1xxxxxxxxxxx" }
    },
    "synonyms": {
        "イージーリスニング": "easylistening",
        "egリスニング": "easylistening"
    }
}

なお大文字小文字や空白有無の組み合わせを吸収するためすべて小文字化して空白は削除する処理にしていますので、"playlists" 側を easylistening としておけば、"synonyms" として easy listeningEasy Listening などを定義する必要はありません。

2.2. フォルダー指定再生対応

 JSON 形式の再生リストには各音楽ファイルのファイル ID を記述する必要があり、Google Drive にファイルをアップロードしてからファイル ID を確認して記述する手間が必要でした。再生リストファイルにもメリットはありますが、フォルダー指定で再生できれば便利なことは言うまでもありません。

 フォルダー内のファイルやサブフォルダーを取得するには Google Drive API を利用する必要があり、GCP の Console にアクセスしてクレジットカード番号登録やプロジェクト作成などをする必要がありますが、API の利用に料金はかからないので少しの手間で大きな利便性を得られます。

 再生リストの種類が 2 つになるため root.json には再生リストの種類を type として記述します。
 Google Drive API を使えば id からファイルかフォルダーかは分かるのですが、処理時間 (待ち時間) を抑える意味でも API 実行を最小限にするため最初から分かっていることは記述することにしています。

root.json
{
    "playlists": {
        "ファイル指定": { "type": "file", "id": "1xxxxxxxxxxx" },
        "フォルダー指定": { "type": "folder", "id": "1xxxxxxxxxxx" }
    },
    "synonyms": {}
}

3. Google Drive API 利用準備

 必要なことは以下の 2 点です。

  • GCP で Google Drive API を有効化したサービスアカウントと秘密鍵を用意する
  • 楽曲ファイルの入った Google Drive フォルダで、上記サービスアカウントに対して編集者権限で共有設定する

 編集者権限としましたが、もう聞かなくなったファイルの削除機能とかを実装しようとしない限り閲覧者権限で良いです。

 上記だけでは分からない方向けに詳細な準備手順を載せておきます。初めて GCP を利用する場合はクレジットカードの登録などが必要ですが、その部分の手順までは書けませんので Web で検索するなどして対応してください。

詳細な準備手順 (折り畳み)
  1. Google Cloud Console にログインします
  2. プロジェクトの選択 リストをクリックし、ダイアログの 新しいプロジェクト をクリックします
    drive-api-01.png
  3. プロジェクト名 (この例では drive-api-for-alexa) を入力して 作成 ボタンをクリックします
    drive-api-02.png
  4. これ以降のすべての手順で、作成したプロジェクト (drive-api-for-alexa) が上部のリストで選択された状態であるものとします
  5. サイドバーの 有効な API とサービス を選択し、╋ API とサービスを有効にする をクリックします
    drive-api-03.png
  6. API の検索テキストボックスに google drive api と入力し、表示結果の中から Google Drive API をクリックします
    drive-api-04.png
  7. 表示された API 詳細画面の 有効にする をクリックします
    drive-api-05.png
    ここまでで Google Drive API の有効化は完了です。
    続いてプログラム (スキル) に割り当てる Google Drive API 用アカウントとなるサービスアカウントを作成します
  8. サイドバーの 認証情報 を選択し、╋ 認証情報を作成 をクリックして表示されるリストから サービスアカウント をクリックします
    drive-api-06.png
  9. サービスアカウントの作成画面で サービス アカウント名 (この例では drive-api-for-alexa) を入力します。サービス アカウント ID (メールアドレス) も連動して入力されますが、後でこのアドレスに対して Google Drive フォルダの編集権限を付与しますので、必要に応じて都合の良い ID に変更してください。
    アカウント名とアカウント ID が入力できたら 完了 をクリックします
    drive-api-07.png
  10. 作成が完了すると画面が更新されてサービスアカウントが一覧で表示されますので、作成したサービスアカウントのメールアドレスのリンク (drive-api-for-alexa@drive-api-for-alexa.iam.gserviceaccount.com) をクリックします
  11. サービスアカウントの詳細が表示されたら のタブを選択し、キーを追加 のリストから 新しい鍵を作成 をクリックします
    drive-api-08.png
  12. 秘密鍵の作成ダイアログが表示されますので、キーのタイプとして JSON を選択し、作成 をクリックします
    drive-api-09.png
  13. 秘密鍵の記録された json ファイルがダウンロードされますので無くさないように保管してください。
    紛失した場合は タブに作成した秘密鍵一覧が表示されますので、該当鍵の行のゴミ箱アイコンをクリックして秘密鍵を削除したあと、もう一度鍵を作成してそちらを使ってください
    drive-api-10.png
    ここまででサービスアカウントの作成は完了です。
    続いて作成したサービスアカウントに対して、Google Drive のアクセス権限を付与します
  14. Google Drive の楽曲ファイルが入ったフォルダ (この例では musics) の右クリックメニューから 共有 をクリックします
    drive-api-11.png
  15. ユーザー、グループ、カレンダーの予定を追加 と書かれたテキストボックスをクリックし、作成したサービスアカウントのメールアドレスを入力します
    drive-api-12.png
  16. 権限のリストから 編集者 を選び、通知 のチェックボックスを外してから 共有 をクリックします
    drive-api-13.png

Google Drive API を利用するための準備は以上です。

4. Alexa Skill の作成

 開発コンソールでのスキル作成手順は以前からほとんど変わっていないので要点のみ記述します。画面イメージは Node.js 版の記事 を参考にしてください。

4.1. スキル作成

  1. 開発コンソール にログインします
  2. スキルタブのスキル一覧画面右上の スキルの作成 ボタンをクリックします
  3. 名前、ロケール
    • スキル名を入力します (例 MyPlayer。スキル呼び出し名ではありません)
    • プライマリロケールとして 日本語 を選択します
    • 次へ ボタンをクリックします
  4. エクスペリエンス、モデル、ホスティングサービス
    • エクスペリエンスのタイプとして 音楽&オーディオ を選択します
    • モデルとして カスタム を選択します (選択済です)
    • ホスティングサービスとして Alexa-hosted (Python) を選択します
    • ホスト地域として 米国西部(オレゴン) を選択します
    • 次へ ボタンをクリックします
  5. テンプレート
    • Templates として スクラッチで作成 を選択します
    • 次へ ボタンをクリックします
  6. 審査
    • 内容を確認し、スキルを作成する ボタンをクリックします

4.2. スキル設定

注) 確認のため上記手順でスキルを作成したところ開発コンソールの表示が一部英語になってしまったため、以下の説明では日本語表示の場合と単語が異なっているかも知れません。

4.2.1. 呼び出し名

 ビルドタブ、サイドバーの 呼び出し を展開して 呼び出し名 を選択し、スキルの呼び出し名に任意の名称 (例 マイプレイヤー) を入力し、上部の モデルを保存 ボタンをクリックします。
 スキル呼び出し名が認識しずらいものだとスキル起動がままならないため、認識しやすい単語を色々試してみてください。

4.2.2. インターフェース

 ビルドタブ、サイドバーの アセット を展開して インターフェース を選択し、Audio Player のスイッチを ON にし、インターフェースを保存 ボタンをクリックします。

4.2.3 インテント

 ビルドタブ、サイドバーの 対話モデル を展開して インテント を選択し、インテント一覧の HelloWorldIntent を削除します。
 続いて +インテントを追加 ボタンをクリックし、以下 3 つのカスタムインテントを作成します。発話サンプルは 1 つのみ載せますので適宜バリエーションを追加してください。

インテント名 発話サンプル インテントスロット名 スロットタイプ
PlayIntent {playlist} を再生して playlist AMAZON.Artist
ShufflePlayIntent {playlist} をシャッフル再生して playlist AMAZON.Artist
JumpTrackIntent トラック {track} track AMAZON.NUMBER

 作成できたら モデルを保存 ボタンをクリックし、モデルをビルド ボタンをクリックします。

4.3. コード実装

 コードエディタタブを開き、以下の 4 ファイルを準備します。
 フォルダーツリーのファイル名をダブルクリックするとそのファイルのタブが表示され、編集できます。

4.3.1. requirements.txt

 以下の内容をコピペします。
 Google Drive API 用のライブラリーと、楽曲ファイルの ID3Tag を読み取るライブラリーを使います。

requirements.txt
ask-sdk-core==1.19.0
google-api-python-client==2.154.0
google-auth==2.36.0
tinytag==2.0.0

4.3.2. environments.py

 このファイルは元々存在しませんので新規に作成します。上部左側の ファイルを作成 アイコンをクリックし、ファイルのパスとして /lambda/envirionments.py と入力して ファイルを作成 をクリックします。
 ROOT_ID は Google Drive に置いた root.json のファイル ID、ACCOUNT_INFO はサービスアカウントの秘密鍵の情報でスキル作成者ごとに固有の内容になります。以下をコピペしても動きません ので本記事の内容を理解の上、記述してください。

environments.py
# Google Drive に置いた root.json のファイル ID
ROOT_ID = "1234567890abcdefghijklmnopqrstuvw"
# サービスアカウントの秘密鍵の情報
ACCOUNT_INFO = {
  "type": "service_account",
  "project_id": "drive-api-for-alexa",
  "private_key_id": "316e981abdcd87fe13ae3feb44b2b839f2f43b67",
  "private_key": "-----BEGIN PRIVATE KEY-----(省略)-----END PRIVATE KEY-----\n",
  "client_email": "drive-api-for-alexa@drive-api-for-alexa.iam.gserviceaccount.com",
  "client_id": "117638054734388579398",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/drive-api-for-alexa%40drive-api-for-alexa.iam.gserviceaccount.com",
  "universe_domain": "googleapis.com"
}

 秘密鍵などの情報はコードには記述せず環境変数に格納したものを参照するのがセオリーですが、Alexa-hosted のスキルでは環境変数を追加設定できません。本記事では py に記述しておいて import していますが、各自安全な方法に置き換えてください

4.3.3. lambda_function.py

 スキルの要求 (Intent) や AudioPlayer からの要求 (Request) に対応する本体部分です。
 後述する utils.py に定義されている PlaybackManagerPlaylistUtil クラスを利用して Intent や Request に応じた処理を実行しています。
 スキル応答としての発話内容を決定するのもこのファイル内での処理になります。

lambda_function.py (折り畳み)
lambda_function.py
# This sample demonstrates handling intents from an Alexa skill using the Alexa Skills Kit SDK for Python.
# Please visit https://alexa.design/cookbook for additional examples on implementing slots, dialog management,
# session persistence, api calls, and more.
# This sample is built using the handler classes approach in skill builder.
import logging

import ask_sdk_core.utils as ask_utils
from ask_sdk_core.dispatch_components import AbstractExceptionHandler, AbstractRequestHandler
from ask_sdk_core.handler_input import HandlerInput
from ask_sdk_core.skill_builder import SkillBuilder
from ask_sdk_core.utils.request_util import get_slot_value
from ask_sdk_model import Response
from utils import PlaybackManager, PlaylistUtil

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


# 標準ビルトインテント https://developer.amazon.com/ja-JP/docs/alexa/custom-skills/standard-built-in-intents.html
# AudioPlayer インターフェース https://developer.amazon.com/ja-JP/docs/alexa/custom-skills/audioplayer-interface-reference.html
class LaunchRequestHandler(AbstractRequestHandler):
    """Handler for Skill Launch."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.is_request_type("LaunchRequest")(handler_input)

    def handle(self, handler_input: HandlerInput) -> Response:
        answer = reprompt = "プレイリスト名を教えてください。"
        return handler_input.response_builder.speak(answer).ask(reprompt).response


class PlayIntentHandler(AbstractRequestHandler):
    """Handler for (Shuffle) Play Intent."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        names = ["PlayIntent", "ShufflePlayIntent"]
        return any(ask_utils.is_intent_name(name)(handler_input) for name in names)

    def handle(self, handler_input: HandlerInput) -> Response:
        slot_value = get_slot_value(handler_input, "playlist") or ""
        info = PlaylistUtil.find(slot_value)
        if not info:
            answer = "ルートジェイソンが読み込めません。"
            return handler_input.response_builder.speak(answer).response
        elif not info.type:
            answer = f"{slot_value} はプレイリストに登録されていません。"
            reprompt = "もう一度プレイリスト名を教えてください。"
            logger.info(answer)
            return handler_input.response_builder.speak(answer).ask(reprompt).response

        enable = ask_utils.is_intent_name("ShufflePlayIntent")(handler_input)
        answer = f"{info.name}{'シャッフル' if enable else ''}再生します。"
        PlaybackManager(handler_input, info).shuffle(enable).play()
        return handler_input.response_builder.speak(answer).response


class PlaybackRequestHandler(AbstractRequestHandler):
    """Handler for AudioPlayer Request."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.get_request_type(handler_input).startswith("AudioPlayer.Playback")

    def handle(self, handler_input: HandlerInput) -> Response:
        request_type = ask_utils.get_request_type(handler_input)
        if request_type == "AudioPlayer.PlaybackNearlyFinished":
            PlaybackManager(handler_input).next().reserve()

        return handler_input.response_builder.response


class GoForwardTracksHandler(AbstractRequestHandler):
    """Handler for Go Forward Tracks."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.is_intent_name("AMAZON.NextIntent")(handler_input)

    def handle(self, handler_input: HandlerInput) -> Response:
        PlaybackManager(handler_input).go_forward().play()
        return handler_input.response_builder.response


class GoBackTracksHandler(AbstractRequestHandler):
    """Handler for Go Back Tracks."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.is_intent_name("AMAZON.PreviousIntent")(handler_input)

    def handle(self, handler_input: HandlerInput) -> Response:
        PlaybackManager(handler_input).go_back().play()
        return handler_input.response_builder.response


class JumpTrackHandler(AbstractRequestHandler):
    """Handler for Jump Track."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.is_intent_name("JumpTrackIntent")(handler_input)

    def handle(self, handler_input: HandlerInput) -> Response:
        track = int(get_slot_value(handler_input, "track") or "0")
        answer = f"{track} 番目の曲を再生します。"
        PlaybackManager(handler_input).jump_to(track - 1).play()
        return handler_input.response_builder.speak(answer).response


class StartOverHandler(AbstractRequestHandler):
    """Handler for Start Over."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.is_intent_name("AMAZON.StartOverIntent")(handler_input)

    def handle(self, handler_input: HandlerInput) -> Response:
        answer = "最初から再生します。"
        PlaybackManager(handler_input).jump_to(0).play()
        return handler_input.response_builder.speak(answer).response


class ShuffleOnOffHandler(AbstractRequestHandler):
    """Handler for Shuffle on/off."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        names = ["AMAZON.ShuffleOnIntent", "AMAZON.ShuffleOffIntent"]
        return any(ask_utils.is_intent_name(name)(handler_input) for name in names)

    def handle(self, handler_input: HandlerInput) -> Response:
        enable = ask_utils.is_intent_name("AMAZON.ShuffleOnIntent")(handler_input)
        answer = "シャッフル再生します。" if enable else "順番に再生します。"
        PlaybackManager(handler_input).shuffle(enable).jump_to(0).play()
        return handler_input.response_builder.speak(answer).response


class LoopOnOffHandler(AbstractRequestHandler):
    """Handler for Loop on/off."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        names = ["AMAZON.LoopOnIntent", "AMAZON.LoopOffIntent"]
        return any(ask_utils.is_intent_name(name)(handler_input) for name in names)

    def handle(self, handler_input: HandlerInput) -> Response:
        enable = ask_utils.is_intent_name("AMAZON.LoopOnIntent")(handler_input)
        answer = f"ループ再生を {'オン' if enable else 'オフ'} にします。"
        PlaybackManager(handler_input).loop(enable).update_state()
        return handler_input.response_builder.speak(answer).response


class RepeatHandler(AbstractRequestHandler):
    """Handler for Repeat."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.is_intent_name("AMAZON.RepeatIntent")(handler_input)

    def handle(self, handler_input: HandlerInput) -> Response:
        answer = "現在の曲をリピート再生します。"
        PlaybackManager(handler_input).repeat().update_state()
        return handler_input.response_builder.speak(answer).response


class PauseHandler(AbstractRequestHandler):
    """Handler for Pause."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.is_intent_name("AMAZON.PauseIntent")(handler_input)

    def handle(self, handler_input: HandlerInput) -> Response:
        answer = "再生を一時停止します。"
        PlaybackManager(handler_input).pause()
        return handler_input.response_builder.speak(answer).response


class ResumeHandler(AbstractRequestHandler):
    """Handler for Resume."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.is_intent_name("AMAZON.ResumeIntent")(handler_input)

    def handle(self, handler_input: HandlerInput) -> Response:
        answer = "再生を再開します。"
        PlaybackManager(handler_input).update_state()
        return handler_input.response_builder.speak(answer).response


class HelpHandler(AbstractRequestHandler):
    """Handler for Help."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.is_intent_name("AMAZON.HelpIntent")(handler_input)

    def handle(self, handler_input: HandlerInput) -> Response:
        manager = PlaybackManager(handler_input)
        if info := manager.get_playing_info():
            orig_track = f"。トラック {info.original_track + 1} " if info.is_shuffled else ""
            answer = f"これは。全 {info.total} 曲中 {info.track + 1} 番目{orig_track}の曲で。{info.info}。です。"
        elif names := PlaylistUtil.get_names():
            answer = f"利用可能なプレイリストは。{''.join(names)} です。"
        elif names is not None:
            answer = "プレイリストが登録されていません。"
        else:
            answer = "ルートジェイソンが読み込めません。"

        return handler_input.response_builder.speak(answer).response


class CancelOrStopHandler(AbstractRequestHandler):
    """Single handler for Cancel and Stop Intent."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        names = ["AMAZON.CancelIntent", "AMAZON.StopIntent"]
        return any(ask_utils.is_intent_name(name)(handler_input) for name in names)

    def handle(self, handler_input: HandlerInput) -> Response:
        answer = "再生を終了します。"
        PlaybackManager(handler_input).stop()
        return handler_input.response_builder.speak(answer).response


class SessionEndedRequestHandler(AbstractRequestHandler):
    """Handler for Session End."""

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.is_request_type("SessionEndedRequest")(handler_input)

    def handle(self, handler_input: HandlerInput) -> Response:
        answer = "再生を終了します。"
        PlaybackManager(handler_input).stop()
        return handler_input.response_builder.speak(answer).response


class IntentReflectorHandler(AbstractRequestHandler):
    """The intent reflector is used for interaction model testing and debugging.
    It will simply repeat the intent the user said. You can create custom handlers
    for your intents by defining them above, then also adding them to the request
    handler chain below.
    """

    def can_handle(self, handler_input: HandlerInput) -> bool:
        return ask_utils.is_request_type("IntentRequest")(handler_input)

    def handle(self, handler_input: HandlerInput) -> Response:
        intent_name = ask_utils.get_intent_name(handler_input)
        answer = f"{intent_name} インテントがトリガーされました。"
        return handler_input.response_builder.speak(answer).response


class CatchAllExceptionHandler(AbstractExceptionHandler):
    """Generic error handling to capture any syntax or routing errors. If you receive an error
    stating the request handler chain is not found, you have not implemented a handler for
    the intent being invoked or included it in the skill builder below.
    """

    def can_handle(self, handler_input: HandlerInput, exception: Exception) -> bool:
        return True

    def handle(self, handler_input: HandlerInput, exception: Exception) -> Response:
        logger.error(exception, exc_info=True)
        speak_output = "問題が発生しました。もう一度試してください。"
        return handler_input.response_builder.speak(speak_output).ask(speak_output).response


# The SkillBuilder object acts as the entry point for your skill, routing all request and response
# payloads to the handlers above. Make sure any new handlers or interceptors you've
# defined are included below. The order matters - they're processed top to bottom.
sb = SkillBuilder()

sb.add_request_handler(LaunchRequestHandler())
sb.add_request_handler(PlayIntentHandler())
sb.add_request_handler(PlaybackRequestHandler())
sb.add_request_handler(GoForwardTracksHandler())
sb.add_request_handler(GoBackTracksHandler())
sb.add_request_handler(JumpTrackHandler())
sb.add_request_handler(StartOverHandler())
sb.add_request_handler(ShuffleOnOffHandler())
sb.add_request_handler(LoopOnOffHandler())
sb.add_request_handler(RepeatHandler())
sb.add_request_handler(PauseHandler())
sb.add_request_handler(ResumeHandler())
sb.add_request_handler(HelpHandler())
sb.add_request_handler(CancelOrStopHandler())
sb.add_request_handler(SessionEndedRequestHandler())
# make sure IntentReflectorHandler is last so it doesn't override your custom intent handlers
sb.add_request_handler(IntentReflectorHandler())

sb.add_exception_handler(CatchAllExceptionHandler())

lambda_handler = sb.lambda_handler()

4.3.4. utils.py

 AudioPlayer 固有処理を (ある程度) 隠ぺいして再生関連機能を提供する PlaybackManager と、Google Drive 固有処理を隠ぺいして再生リストアクセス機能を提供する PlaylistUtil を定義しています。

utils.py (折り畳み)
utils.py
import json
import logging
import random
import urllib.request
from dataclasses import dataclass
from enum import Enum
from io import BytesIO
from typing import Any, Optional

from ask_sdk_core.handler_input import HandlerInput
from ask_sdk_model.interfaces.audioplayer import (
    AudioItem,
    ClearBehavior,
    ClearQueueDirective,
    PlayBehavior,
    PlayDirective,
    StopDirective,
    Stream,
)
from environments import ACCOUNT_INFO, ROOT_ID
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import Resource, build
from googleapiclient.http import MediaIoBaseDownload
from tinytag import TinyTag

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class PlaylistType(Enum):
    """プレイリスト種別"""

    file = 1
    """JSON ファイル形式プレイリスト"""
    folder = 2
    """フォルダ内抽出形式プレイリスト"""


class PlayIntent(Enum):
    """再生意図"""

    PLAY = 1
    """即時再生"""
    RESERVE = 2
    """予約再生"""
    UPDATE = 3
    """状態更新"""


@dataclass
class PlaylistInfo:
    """プレイリスト情報"""

    name: str
    """プレイリスト名"""
    type: Optional[PlaylistType] = None
    """プレイリスト種別"""
    id: str = ""
    """プレイリストID
    * `file`: json ファイル ID
    * `folder`: フォルダー ID
    """


@dataclass
class PlayingInfo:
    """再生情報"""

    total: int
    """全トラック数"""
    track: int
    """現在トラック番号"""
    is_shuffled: bool
    """シャッフル再生中かどうか"""
    original_track: int
    """シャッフル無しの場合のトラック番号"""
    info: str
    """楽曲情報"""


class PlaybackManager:
    def __init__(self, handler_input: HandlerInput, list_info: Optional[PlaylistInfo] = None) -> None:
        """イニシャライザー

        Args:
            handler_input (HandlerInput): リクエストハンドラー入力
            list_info (PlaylistInfo | None): プレイリスト情報。スキル起動時のみ指定する
        """
        self._handler = handler_input
        token = self._handler.request_envelope.context.audio_player.token
        if not list_info and token:
            vals = self._handler.request_envelope.context.audio_player.token.split("||")
            self._type = PlaylistType(int(vals[0]))
            self._id = vals[1]
            self._total, self._track, self._index, self._shuffle = [int(v) for v in vals[2:6]]
            self._loop, self._repeat, self._fileid, self._info = vals[6:10]
        else:
            self._type = list_info.type if list_info else None
            self._id = list_info.id if list_info else ""
            self._total = self._track = self._index = self._shuffle = 0
            self._loop = self._repeat = self._fileid = self._info = ""

    @property
    def token(self) -> str:
        """str: 現在設定値に対応するトークン"""
        return (
            f"{self._type.value}||{self._id}||{self._total}||{self._track}||{self._index}||"
            f"{self._shuffle}||{self._loop}||{self._repeat}||{self._fileid}||{self._info}"
        )

    def next(self) -> "PlaybackManager":
        """トラック番号を次の曲に進める。

        リピート再生中は同じトラック番号を維持する
        """
        self._track += int(not self._repeat)
        return self

    def go_forward(self, tracks: int = 1) -> "PlaybackManager":
        """トラック番号を指定した曲数分進めてリピートを解除する"""
        self._track += tracks
        self._repeat = ""
        return self

    def go_back(self, tracks: int = 1) -> "PlaybackManager":
        """トラック番号を指定した曲数分戻してリピートを解除する"""
        self._track -= tracks
        self._repeat = ""
        return self

    def jump_to(self, track: int) -> "PlaybackManager":
        """トラック番号を指定した番号に変更してリピートを解除する"""
        self._track = track
        self._repeat = ""
        return self

    def shuffle(self, enable: bool) -> "PlaybackManager":
        """シャッフル再生の ON/OFF を切り替える"""
        self._shuffle = random.randint(100, 999) if enable else 0
        return self

    def loop(self, enable: bool) -> "PlaybackManager":
        """ループ再生の ON/OFF を切り替える"""
        self._loop = "on" if enable else ""
        return self

    def repeat(self) -> "PlaybackManager":
        """リピート再生を ON にする"""
        self._repeat = "on"
        return self

    def play(self) -> None:
        """現在設定値に対応する曲をすぐに再生する"""
        if directive := self._create_directive():
            self._handler.response_builder.add_directive(directive).set_should_end_session(True)

    def reserve(self) -> None:
        """現在設定値に対応する曲の再生を予約する"""
        if directive := self._create_directive(PlayIntent.RESERVE):
            self._handler.response_builder.add_directive(directive).set_should_end_session(True)

    def update_state(self) -> None:
        """現在設定値に応じた再生状態に更新する"""
        if directive := self._create_directive(PlayIntent.UPDATE):
            self._handler.response_builder.add_directive(directive).set_should_end_session(True)

    def pause(self) -> "PlaybackManager":
        """再生キューを維持した状態で再生を停止する"""
        directive = StopDirective()
        self._handler.response_builder.add_directive(directive).set_should_end_session(True)
        return self

    def stop(self) -> "PlaybackManager":
        """再生キューをクリアして再生を停止する"""
        directive = ClearQueueDirective(ClearBehavior.CLEAR_ALL)
        self._handler.response_builder.add_directive(directive).set_should_end_session(True)
        return self

    def get_playing_info(self) -> Optional[PlayingInfo]:
        """再生中の楽曲情報を取得する

        Returns:
            PlayingInfo|None: 再生中の楽曲情報。再生中でない場合 None
        """
        if not self._fileid:
            return None
        if not self._info:
            texts = []
            tag = PlaylistUtil.get_track_info(self._fileid)
            if tag.album:
                texts.extend(["アルバム", tag.album])
            if tag.artist:
                texts.append(tag.artist)
            if texts:
                texts.append("")
            if tag.title:
                texts.append(tag.title)
            elif tag.filename:
                texts.append(tag.filename)
            else:
                texts.append("曲名不明")
            self._info = "".join(texts)

        return PlayingInfo(
            total=self._total,
            track=self._track,
            is_shuffled=bool(self._shuffle),
            original_track=self._index,
            info=self._info,
        )

    def _create_directive(self, intent: PlayIntent = PlayIntent.PLAY) -> Optional[PlayDirective]:
        """PlayDirective を作成する

        Args:
            intent (Intent): 再生意図

        Returns:
            PlayDirective|None: PlayDirective インスタンス。再生トラックが決定できない場合 None
        """
        if not self._decide_track():
            return None

        url = PlaylistUtil.get_url(self._fileid)
        behavior, prev_token, offset = PlayBehavior.REPLACE_ALL, None, None
        if intent == PlayIntent.RESERVE:
            behavior = PlayBehavior.ENQUEUE
            prev_token = self._handler.request_envelope.context.audio_player.token
        elif intent == PlayIntent.UPDATE:
            offset = self._handler.request_envelope.context.audio_player.offset_in_milliseconds

        return PlayDirective(behavior, AudioItem(Stream(prev_token, self.token, url, offset)))

    def _decide_track(self) -> bool:
        """再生トラックを決定する

        Returns:
            bool: トラックを決定できた場合 True、全曲終了やエラーの場合 False
        """
        tracks = PlaylistUtil.get_tracks(self._type, self._id)
        if not tracks:
            return False

        self._total = len(tracks)
        track = self._track % self._total if self._loop else self._track
        if not 0 <= track < self._total:
            return False

        index = track
        if self._shuffle:
            random.seed(self._shuffle)
            index = random.sample(range(self._total), track + 1)[-1]

        info: dict = tracks[index]
        self._track = track
        self._index = index
        self._fileid = str(info["id"])
        self._info = str(info.get("info", ""))
        return True


class PlaylistUtil:
    """プレイリストユーティリティー"""

    @classmethod
    def get_names(cls) -> Optional[list]:
        """プレイリスト名のリストを取得する

        Returns:
            list[str]|None: プレイリスト名のリスト。root.json が存在しない場合 None
        """
        root: Optional[dict] = cls._load_json(ROOT_ID)
        if isinstance(root, dict):
            return list(root.get("playlists", {}).keys())
        else:
            return None

    @classmethod
    def find(cls, request_name: str) -> Optional[PlaylistInfo]:
        """プレイリスト情報を取得する

        要求プレイリスト名を正規化 (空白除去 + 小文字化 + synonyms 置換) した名称に
        対応するプレイリスト情報を取得する。

        Args:
            request_name (str): 要求プレイリスト名

        Returns:
            PlayListInfo|None: プレイリスト情報。root.json が存在しない場合 None。
                対応するプレイリストが存在しない場合は `name` のみセットされる。
        """
        root: Optional[dict] = cls._load_json(ROOT_ID)
        if not isinstance(root, dict):
            return None

        name = request_name.replace(" ", "").lower()
        list_name: str = root.get("synonyms", {}).get(name, name)
        info: dict = root.get("playlists", {}).get(list_name, {})
        list_type = PlaylistType._member_map_.get(info.get("type"))
        list_id: str = info.get("id")
        if list_type and list_id:
            return PlaylistInfo(name=list_name, type=list_type, id=list_id)
        else:
            return PlaylistInfo(name=list_name)

    @classmethod
    def get_tracks(cls, type: PlaylistType, id: str) -> Optional[list]:
        """プレイリストのトラックリストを取得する

        Args:
            type (PlaylistType): プレイリスト種別
            id (str): プレイリスト ID

        Returns:
            list|None: トラックリスト。取得エラーの場合 None
        """
        try:
            if type == PlaylistType.file:
                return list(cls._load_json(id))

            with cls._get_resource() as resource:
                return cls._list_audio_files(resource, id)
        except Exception:
            logger.exception(f"get_tracks({type, id}) failed.")

    @classmethod
    def get_url(cls, id: str) -> str:
        """ダウンロード用 URL を取得する

        Args:
            id (str): Google Drive ファイル ID

        Returns:
            str: ダウンロード用 URL
        """
        return f"https://drive.google.com/uc?export=download&id={id}"

    @classmethod
    def get_track_info(cls, id: str) -> Optional[TinyTag]:
        """楽曲情報 (タグ情報) を取得する

        Args:
            id (str): 楽曲ファイル ID

        Returns:
            TinyTag|None: タグ情報。取得エラーの場合 None
        """
        try:
            with cls._get_resource() as resource:
                filename: Optional[str] = resource.files().get(fileId=id, fields="name").execute().get("name")
                file = BytesIO()
                request = resource.files().get_media(fileId=id)
                downloader = MediaIoBaseDownload(file, request)
                while not downloader.next_chunk()[1]:
                    pass

            return TinyTag.get(filename, file, duration=False)
        except Exception:
            logger.exception(f"get_track_info({id}) failed.")

    @classmethod
    def _load_json(cls, id: str) -> Optional[Any]:
        """json ファイルをロードする

        Args:
            id (str): ファイル ID

        Returns:
            Any|None: ロードしたオブジェクト。エラーの場合 None
        """
        try:
            req = urllib.request.Request(cls.get_url(id))
            with urllib.request.urlopen(req) as res:
                return json.load(res)
        except Exception:
            logger.exception(f"load_json({id}) failed")

    @classmethod
    def _get_resource(cls) -> Resource:
        """Google Drive リソースを取得する

        Returns:
            Resource: Google Drive リソース
        """
        scopes = ["https://www.googleapis.com/auth/drive"]
        cred = Credentials.from_service_account_info(info=ACCOUNT_INFO, scopes=scopes)
        return build("drive", "v3", credentials=cred)

    @classmethod
    def _list_audio_files(cls, service: Resource, folder_id: str) -> list:
        """指定フォルダー以下の楽曲ファイルをリストアップする

        Args:
            service (Resource): Google Drive リソース
            folder_id (str): フォルダー ID

        Returns:
            list[dict[str,str]]: 楽曲ファイルリスト
        """
        # https://developers.google.com/drive/api/guides/search-files
        MIME_FOLDER = "application/vnd.google-apps.folder"
        query = (
            f"'{folder_id}' in parents and trashed = false "
            f"and (mimeType = '{MIME_FOLDER}' or mimeType contains 'audio/')"
        )
        result = service.files().list(q=query, fields="files(id,  mimeType)", orderBy="name").execute()
        files, sub_files = [], []
        for item in result.get("files", []):
            if item["mimeType"] == MIME_FOLDER:
                sub_files += cls._list_audio_files(service, item["id"])
            else:
                files.append({"id": item["id"]})

        return files + sub_files

4.4. デプロイ

 すべてのファイルを記述できたら上部右側の 保存 ボタンをクリックし、続いて デプロイ ボタンをクリックしてください。

4.5. 実機テスト有効化

 スキル開発者のアカウントと紐づいた Amazon Echo 実機でスキルを実行可能にします。
 テストタブを選択し、スキルテストが有効になっているステージとして 開発中 を選択します。

5. 再生関連ファイル準備

5.1. root.json (必須)

 environments.py の ROOT_ID にファイル ID を指定するファイルです。Google Drive 上に作成し、共有の設定から リンクを知っている全員 に対して閲覧者権限を付与してください。
 再生リストファイル指定で再生する場合は "type" に "file"、"id" にはファイル ID を記述し、フォルダー指定で再生する場合は "type" に "folder"、"id" にはフォルダー ID を記述します。
 "playlists"、"synonyms" ともにアルファベットは小文字化し、半角スペースを除去したものをキー名称として指定します。

root.json
{
    "playlists": {
        "ファイル指定": { "type": "file", "id": "1xxxxxxxxxxx" },
        "フォルダー指定": { "type": "folder", "id": "1xxxxxxxxxxx" },
        "easylistening": { "type": "folder", "id": "1xxxxxxxxxxx" }
    },
    "synonyms": {
        "イージーリスニング": "easylistening",
        "egリスニング": "easylistening"
    }
}

5.2. 再生リスト (任意)

 再生対象の楽曲ファイルを JSON のリストとして記述したファイルです。
 root.json の "playlists" には "type": "file" で記述します。ファイル ID でアクセスするためファイル名は何でも構いません。

 "info" の内容は、その曲を再生中に {スキル名} でヘルプ と指示した際にその曲の情報として返答する内容になります。"info" を定義していない場合はファイルに埋め込まれた ID3Tag の情報を使って返答します。

easylistening.json
[
  { "id": "1234567890abcdefghijklmnopqrstuvw", "info": "曲名1" },
  { "id": "1234567891abcdefghijklmnopqrstuvw", "info": "曲名2" },
  { "id": "1234567892abcdefghijklmnopqrstuvw" }
]

5.3. 楽曲ファイル (必須)

 再生対象の楽曲ファイルです。
 フォルダ単位、ファイル単位、どちらでも良いですが root.json と同様に共有の設定から リンクを知っている全員 に対して閲覧者権限を付与してください。

6. おわりに

 駆け足で記事を書いたので不親切な箇所が多々ありますが一旦公開します。時間がある時に少しずつ説明やスクショを追加できればと思います。
 Node.js 版よりもマシな仕組みになっていると思いますが参考になれば幸いです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?