概要
以下の記事で__Pythonスクリプト__から各APIを操作する方法をを紹介しました。
今回は、
- QiitaAPIを使って記事一覧からランダムで取得
- TwitterAPIを使っていい感じに投稿する
- Incoming Webhookを使って実行結果をSlackに通知する
- AWS lambdaに乗せて実行をする
- AWS CloutWatchで定期実行する
という部分を実装してみようと思います。
手順
環境
- Python 3.6.1
- requests-oauthlib==0.8.0
- requests==2.18.4
前提
以下のものは__事前__に用意しておいてください。
-
Python関連
- requestsのインストール
- requests-oauthlibのインストール
-
Twitter関連
- Twitterアカウント
- Consumer Key
- Consumer Secret
- Access Token
- Access Token Secret
-
AWS関連
- AWSアカウント
-
Qiita関連
- Qiitaアカウント
- Qiita記事
-
Slack関連
- Incoming Webhookの設定
- 通知先Chanel
また、今回は__リファクタリング等__はしていないので__冗長的なコーディング__になってしまっている部分がありますが、本質ではないので今回はそのままとさせていただきますorz
Twitterへの投稿
まずは__QiitaAPI__を利用して、__ランダムで投稿記事を取得__し、__Twitterに投稿__するところまで実装してみましょう。
CONSUMER_KEY = "{コンシューマーキー}"
CONSUMER_SECRET = "{コンシューマーシークレット}"
ACCESS_TOKEN = "{アクセストークン}"
ACCESS_TOKEN_SECRET = "{アクセストークンシークレット}"
import json, config, requests, random, sys #必要なライブラリの読み込み
from requests_oauthlib import OAuth1Session #OAuthのライブラリの読み込み
CONSUMER_KEY = config.CONSUMER_KEY
CONSUMER_SECRET = config.CONSUMER_SECRET
ACCESS_TOKEN = config.ACCESS_TOKEN
ACCESS_TOKEN_SECRET = config.ACCESS_TOKEN_SECRET
twitter = OAuth1Session(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET) #認証処理
TWITTER_API_HOST = "https://api.twitter.com/1.1" #TwitterAPIホスト
STATUSES_UPDATE_ENDPOINT = "/statuses/update.json" #ツイートエンドポイント
TARGET_USER_NAME = "{Qiitaユーザー名}" #取得対象のQiitaユーザー名
QIITA_API_HOST = "https://qiita.com/api/v2" #QiitaAPIホスト
USER_ENDPOINT = f"/users/{TARGET_USER_NAME}" #ユーザー情報取得エンドポイント
USER_ITEMS_ENDPOINT = f"/users/{TARGET_USER_NAME}/items" #ユーザー記事一覧取得エンドポイント
PAGE_PER_COUNT = 20 #1ページ辺りの記事取得件数
response = requests.get(
QIITA_API_HOST + USER_ENDPOINT
).json() #対象Qiitaアカウント情報をJsonで取得
totalItemCount = response['items_count'] #投稿総数を取得
pageCount = totalItemCount // PAGE_PER_COUNT #1ページ辺りの記事取得件数で取得出来たページ数を取得
hasSurplus = totalItemCount % PAGE_PER_COUNT != 0 #余りの記事数を取得
totalPageCount = pageCount + 1 if hasSurplus else pageCount #余りの記事数があった場合、端数分のページ数を追加
params = {
"page": random.randint(1,totalPageCount), #ページ番号をランダム生成
"per_page": PAGE_PER_COUNT
} #記事一覧をページ番号と1ページ辺りの表示件数を指定するためのパラメータを生成
response = requests.get(
QIITA_API_HOST + USER_ITEMS_ENDPOINT,
params=params
).json() #対象Qiitaアカウントの記事一覧を取得
if( response == [] ): #記事一覧が空だった場合
print("アイテムを取得出来ませんでしたので終了します。")
sys.exit() #処理終了
item = random.choice(response) #取得した一覧からランダムで記事を取得
url = item["url"] #記事URLを取得
title = item["title"] #記事タイトルを取得
tags = map(
lambda element: ("#" + element["name"]),
item["tags"]
) #タグ一覧をTwitterハッシュ化して配列として再生成
tagsStr = ' '.join(list(tags)) #タグ一覧を文字列結合
params = {
"status" : "【Qiita】" +title + "\n" + tagsStr + "\n" + url #ツイート内容を生成
} #Postするパラメータを生成
res = twitter.post(
TWITTER_API_HOST + STATUSES_UPDATE_ENDPOINT ,
params = params
) #APIにPost通信
if res.status_code == 200: #正常投稿出来た場合
print("Success.")
else: #正常投稿出来なかった場合
print("Failed. : %s"% vars(res))
確認
python script.py

それっぽくツイート出来てますね!
ただ、Qiitaの__タグ__で「.」や「-」が含まれてると、__Twitterのハッシュタグがうまく動きません__が、それは今回は__妥協__しましょうorz
Slackへの通知
次に__処理結果をSlackに通知__するところを実装してみましょう。
import json, config, requests, random, sys #必要なライブラリの読み込み
from requests_oauthlib import OAuth1Session #OAuthのライブラリの読み込み
CONSUMER_KEY = config.CONSUMER_KEY
CONSUMER_SECRET = config.CONSUMER_SECRET
ACCESS_TOKEN = config.ACCESS_TOKEN
ACCESS_TOKEN_SECRET = config.ACCESS_TOKEN_SECRET
twitter = OAuth1Session(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET) #認証処理
INCOMING_WEBHOOK_URL = config.INCOMING_WEBHOOK_URL
TWITTER_API_HOST = "https://api.twitter.com/1.1" #TwitterAPIホスト
STATUSES_UPDATE_ENDPOINT = "/statuses/update.json" #ツイートエンドポイント
TWITTER_USER_NAME = "Bakira_Tech_Bot"
QIITA_USER_NAME = "bakira" #取得対象のQiitaアカウント名
QIITA_API_HOST = "https://qiita.com/api/v2" #QiitaAPIホスト
USER_ENDPOINT = f"/users/{QIITA_USER_NAME}" #ユーザー情報取得エンドポイント
USER_ITEMS_ENDPOINT = f"/users/{QIITA_USER_NAME}/items" #ユーザー記事一覧取得エンドポイント
PAGE_PER_COUNT = 20 #1ページ辺りの記事取得件数
response = requests.get(
QIITA_API_HOST + USER_ENDPOINT
).json() #対象Qiitaアカウント情報をJsonで取得
totalItemCount = response['items_count'] #投稿総数を取得
pageCount = totalItemCount // PAGE_PER_COUNT #1ページ辺りの記事取得件数で取得出来たページ数を取得
hasSurplus = totalItemCount % PAGE_PER_COUNT != 0 #余りの記事数を取得
totalPageCount = pageCount + 1 if hasSurplus else pageCount #余りの記事数があった場合、端数分のページ数を追加
params = {
"page": random.randint(1,totalPageCount), #ページ番号をランダム生成
"per_page": PAGE_PER_COUNT
} #記事一覧をページ番号と1ページ辺りの表示件数を指定するためのパラメータを生成
response = requests.get(
QIITA_API_HOST + USER_ITEMS_ENDPOINT,
params=params
).json() #対象Qiitaアカウントの記事一覧を取得
if( response == [] ): #記事一覧が空だった場合
requests.post(
INCOMING_WEBHOOK_URL,
data = json.dumps(
{
"attachments":[
{
"fallback":"Qiita Article is Empty.",
"pretext":"Bot Result.",
"color":"#FF0000",
"fields":[
{
"title":"Qiita Item is Empty",
"value":"There was no Article"
}
]
}
]
}
))
sys.exit() #処理終了
item = random.choice(response) #取得した一覧からランダムで記事を取得
url = item["url"] #記事URLを取得
title = item["title"] #記事タイトルを取得
tags = map(
lambda element: ("#" + element["name"]),
item["tags"]
) #タグ一覧をTwitterハッシュ化して配列として再生成
tagsStr = ' '.join(list(tags)) #タグ一覧を文字列結合
params = {
"status" : "【Qiita】" +title + "\n" + tagsStr + "\n" + url #ツイート内容を生成
} #Postするパラメータを生成
res = twitter.post(
TWITTER_API_HOST + STATUSES_UPDATE_ENDPOINT ,
params = params
) #APIにPost通信
if res.status_code == 200: #正常投稿出来た場合
resDic = json.loads(res.text)
id = resDic["id"]
requests.post(
INCOMING_WEBHOOK_URL,
data = json.dumps(
{
"attachments":[
{
"fallback":"Tweet Success.",
"pretext":"Bot Result.",
"color":"#00FF00",
"fields":[
{
"title":"Tweet Success.",
"value":f"Tweet URL : https://twitter.com/{TWITTER_USER_NAME}/status/{id}"
}
]
}
]
}
))
else: #正常投稿出来なかった場合
requests.post(
INCOMING_WEBHOOK_URL,
data = json.dumps(
{
"attachments":[
{
"fallback":"Tweet Failed.",
"pretext":"Bot Result.",
"color":"#FF0000",
"fields":[
{
"title":"Tweet Failed.",
"value":f"Error.[{res.status_code}]"
}
]
}
]
}
))

細かい部分は__今後の課題__として、一旦しっかり__Slackへの通知__が出来ていそうです。
AWS-lambdaへ登録
Scriptをlambda向けに修正
先ほど作ったスクリプト__はコマンドから実行可能__ですが、lambda__上では動かないので__少し手を加えます。
修正点としては、
- ファイル名を
lambda_function.py
に変更 - 全体を
lambda_handler(event, context)
関数にする -
設定値を環境変数化
- ※環境変数の指定の仕方は後述します。
修正後のファイルは以下となります。
import json, os, requests, random, sys #必要なライブラリの読み込み
from requests_oauthlib import OAuth1Session #OAuthのライブラリの読み込み
def lambda_handler(event, context):
CONSUMER_KEY = os.environ.get('CONSUMER_KEY')
CONSUMER_SECRET = os.environ.get('CONSUMER_SECRET')
ACCESS_TOKEN = os.environ.get('ACCESS_TOKEN')
ACCESS_TOKEN_SECRET = os.environ.get('ACCESS_TOKEN_SECRET')
twitter = OAuth1Session(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET) #認証処理
INCOMING_WEBHOOK_URL = os.environ.get('INCOMING_WEBHOOK_URL')
TWITTER_API_HOST = "https://api.twitter.com/1.1" #TwitterAPIホスト
STATUSES_UPDATE_ENDPOINT = "/statuses/update.json" #ツイートエンドポイント
TWITTER_USER_NAME = os.environ.get('TWITTER_USER_NAME')
QIITA_USER_NAME = os.environ.get('QIITA_USER_NAME') #取得対象のQiitaアカウント名
QIITA_API_HOST = "https://qiita.com/api/v2" #QiitaAPIホスト
USER_ENDPOINT = f"/users/{QIITA_USER_NAME}" #ユーザー情報取得エンドポイント
USER_ITEMS_ENDPOINT = f"/users/{QIITA_USER_NAME}/items" #ユーザー記事一覧取得エンドポイント
PAGE_PER_COUNT = 20 #1ページ辺りの記事取得件数
response = requests.get(
QIITA_API_HOST + USER_ENDPOINT
).json() #対象Qiitaアカウント情報をJsonで取得
totalItemCount = response['items_count'] #投稿総数を取得
pageCount = totalItemCount // PAGE_PER_COUNT #1ページ辺りの記事取得件数で取得出来たページ数を取得
hasSurplus = totalItemCount % PAGE_PER_COUNT != 0 #余りの記事数を取得
totalPageCount = pageCount + 1 if hasSurplus else pageCount #余りの記事数があった場合、端数分のページ数を追加
params = {
"page": random.randint(1,totalPageCount), #ページ番号をランダム生成
"per_page": PAGE_PER_COUNT
} #記事一覧をページ番号と1ページ辺りの表示件数を指定するためのパラメータを生成
response = requests.get(
QIITA_API_HOST + USER_ITEMS_ENDPOINT,
params=params
).json() #対象Qiitaアカウントの記事一覧を取得
if( response == [] ): #記事一覧が空だった場合
requests.post(
INCOMING_WEBHOOK_URL,
data = json.dumps(
{
"attachments":[
{
"fallback":"Qiita Article is Empty.",
"pretext":"Bot Result.",
"color":"#FF0000",
"fields":[
{
"title":"Qiita Item is Empty",
"value":"There was no Article"
}
]
}
]
}
))
sys.exit() #処理終了
item = random.choice(response) #取得した一覧からランダムで記事を取得
url = item["url"] #記事URLを取得
title = item["title"] #記事タイトルを取得
tags = map(
lambda element: ("#" + element["name"]),
item["tags"]
) #タグ一覧をTwitterハッシュ化して配列として再生成
tagsStr = ' '.join(list(tags)) #タグ一覧を文字列結合
params = {
"status" : "【Qiita】" +title + "\n" + tagsStr + "\n" + url #ツイート内容を生成
} #Postするパラメータを生成
res = twitter.post(
TWITTER_API_HOST + STATUSES_UPDATE_ENDPOINT ,
params = params
) #APIにPost通信
if res.status_code == 200: #正常投稿出来た場合
resDic = json.loads(res.text)
id = resDic["id"]
requests.post(
INCOMING_WEBHOOK_URL,
data = json.dumps(
{
"attachments":[
{
"fallback":"Tweet Success.",
"pretext":"Bot Result.",
"color":"#00FF00",
"fields":[
{
"title":"Tweet Success.",
"value":f"Tweet URL : https://twitter.com/{TWITTER_USER_NAME}/status/{id}"
}
]
}
]
}
))
else: #正常投稿出来なかった場合
requests.post(
INCOMING_WEBHOOK_URL,
data = json.dumps(
{
"attachments":[
{
"fallback":"Tweet Failed.",
"pretext":"Bot Result.",
"color":"#FF0000",
"fields":[
{
"title":"Tweet Failed.",
"value":f"Error.[{res.status_code}]"
}
]
}
]
}
))
ライブラリを同階層にインストール
lambda__ではpipコマンドでモジュールをインストールすることが出来ない__ので、__サードパーティ製のモジュール__が必要な場合は__zip化してアップロード__する必要があります。
今回は
- requests
- requests_oauthlib
を__zipに含める__必要があります。
__作業ディレクトリ__で以下のコマンドを実行してください。
pip install requests -t .
pip install requests_oauthlib -t .
そうすると以下のように__展開__されます。
.
├── certifi
├── certifi-2017.11.5.dist-info
├── chardet
├── chardet-3.0.4.dist-info
├── idna
├── idna-2.6.dist-info
├── oauthlib
├── oauthlib-2.0.6.dist-info
├── requests
├── requests-2.18.4.dist-info
├── requests_oauthlib
├── requests_oauthlib-0.8.0.dist-info
├── lambda_function.py
├── urllib3
└── urllib3-1.22.dist-info
zip化
次にlambdaにアップロードするために__zip化__しましょう。
__作業ディレクトリ__で以下を実行してください。
zip -r upload.zip *
そうすると、__upload.zip__というファイルが出来ていると思います。
これを__アップロード__します。
AWS Lambdaの設定
Lambda関数の作成
次にAWSに__ログイン__し、__コンピューティング__メニューの__Lambda__を選択します。

__関数の作成ボタン__をクリックします。

__一から作成__を選択し、以下を設定の上、
- 名前 : 任意の名称
- ランタイム : Python 3.6
- ロール : テンプレートから新しいロールを作成
- ロール名 : 任意の名称
- ポリシーテンプレート : プルダウンからBasic Edge Lambda アクセス権限を選択
__関数の作成ボタン__をクリックしてください。

そうすると、Lambda関数が作成されるので__詳細__を設定していきます。

関数コード
コードエントリータイプを__.ZIPファイルをアップロード__、ランタイムを__Python 3.6__、ハンドラを__lambda_function.lambda_handler__、先ほど作成した__zipファイル__を選択します。

環境変数
Lambda関数単位で__環境変数__を指定することが出来ます。
今回は先ほど__config.py__に記載していたものを環境変数として設定してみましょう。

テスト
設定が出来たら__テスト実行__してみましょう。

このような表示がされたら__無事にスクリプトが動いた__という事になります。

Slackにも__通知__が来ていますね。
Lambda関数を使うメリット
__サーバーを用意する必要がない__ので__サーバーレス構成__を実現することが可能です。
Lambdaは他にも__AWSサービス群への操作をトリガーに処理を実行することが出来る__ので非常に便利です!!
また、__ログファイル__についても__デフォルト__で__CloudWatch Logsに吐き出してくれる__のでとても助かりますね。

AWS CloudWatchの設定
スケジューリング登録
次はこのスクリプトを__定期的に実行__するように仕込みます。
今回は__毎日00:00__に投稿するようにスケジューリングしてみましょう。
実現するには__CloudWatch Events__を利用します。
__管理ツールメニュー__の__CloudWatch__を選択します。

イベントから__ルールの作成__ボタンをクリックします。

__イベントソース__の__スケジュール__を選択し、Cron式に0 0 * * ? 0
を入力します。
__ターゲット__は__Lambda関数__にし、機能は__先ほど作ったLambda関数__を選択します。
そして、__設定の詳細ボタン__をクリックします。

名前を__任意の名称__にして__ルールの作成ボタン__をクリックします。

これでルールが作成されたので__スケジューリングは完了__です。

JSTとUTCの違い
ここですっかり__次の日の00:00__にLambda関数が実行されると思って油断していたら、、、

むむ。。。
__9:00__にLambda関数が__実行__されていました。
CloudWatchで指定するCronは__UTC(GMT)__で実行されるので、__JST__に置き換えた時間指定をする必要があります。
厳密には__9時間__のズレがあるので__-9時間__をすれば良いのです。
今回の場合__00:00__に実行したいので、0 15 * * ? 0
と指定します。

これで正常に__毎日00:00__にLambda関数が実行されるようになりました♪
※__処理のラグ__で多少通知時間がズレてますが良しとしましょう!!笑
