ゴール
1時間に1回タイムラインから質問ツイートを取得してAIに答えさせる
使ったもの
- Go 1.15.7
- AWS Lambda
- Serverless Framework
- SSM Parameter Store
Talk APIとは
リクルートさんが無料で提供しているAIプロジェクトA3RT(アート)のうちの1つのAPIです。
https://a3rt.recruit-tech.co.jp/product/talkAPI/
入力文に対して自動で応答文が返ってきます。
APIリクエストに入力文を載せてレスポンスとして応答文を受け取るイメージです。
仕様
- botのタイムラインから「?」が末尾のツイートを取得する
- 取得したツイートをTalkAPIへリクエストとして投げる
- 取得したツイートとレスポンスの応答文をQ&A形式にしてツイートする
- これを1時間に1回実行する
構成
twitter_bot/
├── Gopkg.lock
├── Gopkg.toml
├── Makefile
├── README.md
├── bin
│ └── tweet
├── serverless.yml
└── tweet
├── main.go
├── main_test.go
└── .env(gitignore対象)
Makefileはserverless create -t aws-go-dep -p myservice
で生成されるテンプレートを使っています。
Serverless Framework example for Golang and Lambda
ファイルを分割することも考えましたが、分量がそこまで多くなかったのでmain.goにすべて書いています。
実装
構造体
type TalkResponse struct {
Status int `json:"status"`
Massage string `json:"message"`
Results []Result `json:"results"`
}
type Result struct {
Perplexity float64 `json:"perplexity"`
Reply string `json:"reply"`
}
タイムラインからツイート一覧を取得
const (
COUNT = "1000"
)
func main() {
// local用
godotenv.Load(".env")
api := connectTwitterAPi()
tweets := getTweetFromTimeLine(api, COUNT)
}
func connectTwitterAPi() *anaconda.TwitterApi {
return anaconda.NewTwitterApiWithCredentials(
os.Getenv("ACCESS_TOKEN"),
os.Getenv("ACCESS_TOKEN_SECRET"),
os.Getenv("CONSUMER_KEY"),
os.Getenv("CONSUMER_SECRET"))
}
func getTweetFromTimeLine(api *anaconda.TwitterApi, count string) []anaconda.Tweet {
v := url.Values{}
v.Set("count", count)
tweets, err := api.GetHomeTimeline(v)
checkError(err)
return tweets
}
ローカルでは.env
で環境変数を管理し、
Lambda関数ではSSM Parameter Storeから取得します
ACCESS_TOKEN=
ACCESS_TOKEN_SECRET=
CONSUMER_KEY=
CONSUMER_SECRET=
APIKEY=
functions:
tweet:
handler: bin/tweet
environment:
ACCESS_TOKEN: ${ssm:/twitter/accessToken}
ACCESS_TOKEN_SECRET: ${ssm:/twitter/accessTokenSecret}
CONSUMER_KEY: ${ssm:/twitter/consumerKey}
CONSUMER_SECRET: ${ssm:/twitter/consumerSecret}
APIKEY: ${ssm:/twitter/a3rt/apikey}
末尾が「?」のツイートを抽出してTALK APIへリクエスト
const (
BASEURL = "https://api.a3rt.recruit-tech.co.jp/talk/v1/smalltalk"
COUNT = "1000"
)
func main() {
// local用
godotenv.Load(".env")
api := connectTwitterAPi()
tweets := getTweetFromTimeLine(api, COUNT)
for _, tweet := range tweets {
if HasSuffix(tweet) {
talkResponse := requestTalkAPI(tweet)
os.Exit(0)
}
}
}
func HasSuffix(tweet anaconda.Tweet) bool {
if strings.HasSuffix(tweet.FullText, "?") {
return true
}
return false
}
func requestTalkAPI(tweet anaconda.Tweet) TalkResponse {
req := buildRequest(tweet)
client := buildClient()
resp, err := client.Do(req)
checkError(err)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
var talkResponse TalkResponse
err = json.Unmarshal(body, &talkResponse)
checkError(err)
// debug
fmt.Println(tweet.FullText)
fmt.Println(talkResponse)
return talkResponse
}
func checkError(err error) {
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
func buildRequest(tweet anaconda.Tweet) *http.Request {
endpoint := fmt.Sprintf("%s?apikey=%s", BASEURL, os.Getenv("APIKEY"))
values := make(url.Values)
values.Set("query", tweet.FullText)
req, err := http.NewRequest("POST", endpoint, strings.NewReader(values.Encode()))
checkError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
return req
}
func buildClient() *http.Client {
client := &http.Client{}
client.Timeout = time.Second * 15
return client
}
質問ツイートと応答分をツイート
func main() {
// local用
godotenv.Load(".env")
api := connectTwitterAPi()
tweets := getTweetFromTimeLine(api, COUNT)
for _, tweet := range tweets {
if HasSuffix(tweet) {
talkResponse := requestTalkAPI(tweet)
postTweet(talkResponse, tweet, api)
os.Exit(0)
}
}
}
func postTweet(talkResponse TalkResponse, tweet anaconda.Tweet, api *anaconda.TwitterApi) {
for _, result := range talkResponse.Results {
status := fmt.Sprintf("Q. %s\nA. %s", tweet.FullText, result.Reply)
api.PostTweet(status, nil)
}
}
全体
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/ChimeraCoder/anaconda"
"github.com/joho/godotenv"
)
const (
BASEURL = "https://api.a3rt.recruit-tech.co.jp/talk/v1/smalltalk"
COUNT = "1000"
)
func main() {
// local用
godotenv.Load(".env")
api := connectTwitterAPi()
tweets := getTweetFromTimeLine(api, COUNT)
for _, tweet := range tweets {
if HasSuffix(tweet) {
talkResponse := requestTalkAPI(tweet)
postTweet(talkResponse, tweet, api)
os.Exit(0)
}
}
}
func connectTwitterAPi() *anaconda.TwitterApi {
return anaconda.NewTwitterApiWithCredentials(
os.Getenv("ACCESS_TOKEN"),
os.Getenv("ACCESS_TOKEN_SECRET"),
os.Getenv("CONSUMER_KEY"),
os.Getenv("CONSUMER_SECRET"))
}
func getTweetFromTimeLine(api *anaconda.TwitterApi, count string) []anaconda.Tweet {
v := url.Values{}
v.Set("count", count)
tweets, err := api.GetHomeTimeline(v)
checkError(err)
return tweets
}
func requestTalkAPI(tweet anaconda.Tweet) TalkResponse {
req := buildRequest(tweet)
client := buildClient()
resp, err := client.Do(req)
checkError(err)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
var talkResponse TalkResponse
err = json.Unmarshal(body, &talkResponse)
checkError(err)
// debug
fmt.Println(tweet.FullText)
fmt.Println(talkResponse)
return talkResponse
}
func checkError(err error) {
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
func HasSuffix(tweet anaconda.Tweet) bool {
if strings.HasSuffix(tweet.FullText, "?") {
return true
}
return false
}
func buildRequest(tweet anaconda.Tweet) *http.Request {
endpoint := fmt.Sprintf("%s?apikey=%s", BASEURL, os.Getenv("APIKEY"))
values := make(url.Values)
values.Set("query", tweet.FullText)
req, err := http.NewRequest("POST", endpoint, strings.NewReader(values.Encode()))
checkError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
return req
}
func buildClient() *http.Client {
client := &http.Client{}
client.Timeout = time.Second * 15
return client
}
func postTweet(talkResponse TalkResponse, tweet anaconda.Tweet, api *anaconda.TwitterApi) {
for _, result := range talkResponse.Results {
status := fmt.Sprintf("Q. %s\nA. %s", tweet.FullText, result.Reply)
api.PostTweet(status, nil)
}
}
type TalkResponse struct {
Status int `json:"status"`
Massage string `json:"message"`
Results []Result `json:"results"`
}
type Result struct {
Perplexity float64 `json:"perplexity"`
Reply string `json:"reply"`
}
1時間に1回の定期実行
serverless.ymlにeventsを追加します。
functions:
tweet:
handler: bin/tweet
environment:
ACCESS_TOKEN: ${ssm:/twitter/accessToken}
ACCESS_TOKEN_SECRET: ${ssm:/twitter/accessTokenSecret}
CONSUMER_KEY: ${ssm:/twitter/consumerKey}
CONSUMER_SECRET: ${ssm:/twitter/consumerSecret}
APIKEY: ${ssm:/twitter/a3rt/apikey}
events:
- schedule: rate(1 hour)
まとめ
twitter botは割と簡単に作成できてしまうので、試したことの無い方はぜひオリジナルのbotを作ってみては?
こちらソースコードです。
あとtestも雑ですが一応書いてあります。