はじめに
スクレイピングした日経平均のチャート画像を毎日Twitter投稿するLambda関数をRustで作成しました。
成果物
環境など
- 実行環境:AWS Lambda (カスタムランタイム) + Amazon EventBridge + AWS S3
- 開発言語:Rust
- 開発環境:macOS + VSCode + AWS SAM + (Docker)
- 株価チャート取得先:StockCharts.com
- Twitter投稿:TwitterAPI
- 構成図作成: Python + diagrams + graphviz
概要
やっていることは以下です。
- Amazon EventBridgeで毎日1回Lambda関数1つ目を起動する
- AWS Lambda関数1つ目にて、チャート画像のURLをスクレイピングし取得した画像をS3へ保存
- AWS S3に画像が保存されたタイミングをトリガーにして2つ目のLambda関数を起動する
- AWS Lambda関数2つ目にて、S3から画像を取得してTwitterへ投稿
構成
Diagramsで作成しましたが、文字がずれてしまう・・・。
主なライブラリ
httpリクエストに使ったライブラリ:reqwest
htmlから画像URLを抽出するために使ったライブラリ:scraper
twitter投稿に使ったライブラリ:egg-mode
AWS S3まわり:aws-config
、ws-sdk-s3
、aws-types
AWS Lambdaまわり:lambda_runtimev
、lambda_http
Cargo.toml(折りたたみ)
[package]
name = "stock_data"
version = "0.1.0"
authors = ["c3drive <mihara000satsuki@yahoo.co.jp>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
lambda_runtime = "0.4"
lambda_http = "0.4.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.59"
# https://crates.io/crates/reqwest/0.11.4
reqwest = { version = "0.11.4", features = ["json"] }
tokio = { version = "1", features = ["full"] }
scraper = "0.12"
async-trait = "0.1.36"
url = { version = "2", features = ["serde"] }
# Make Custom Error
thiserror = "1.0.26"
# Get Server Time
chrono = "0.4"
# Added due to reqwest dependency problems when cross-compiling for RPi
openssl = { version = "0.10.32", features = ["vendored"] }
# Web Resources Download
bytes = "1"
# Put File on AWS S3
aws-config = { git = "https://github.com/awslabs/aws-sdk-rust", tag = "v0.0.18-alpha", package = "aws-config" }
aws-sdk-s3 = { git = "https://github.com/awslabs/aws-sdk-rust", tag = "v0.0.18-alpha", package = "aws-sdk-s3" }
aws-types = { git = "https://github.com/awslabs/aws-sdk-rust", tag = "v0.0.15-alpha", package = "aws-types" }
# Config
config = "0.11"
dotenv = "0.15.0"
# Global DATA
once_cell = "1"
# Twitter
egg-mode = "0.16"
[[bin]]
name = "hello"
path = "src/main.rs"
[[bin]]
name = "stock"
path = "src/stock.rs"
[[bin]]
name = "tweet"
path = "src/twitter.rs"
開発について
機能のわりにソース量が多くなってしまったので悩んだところを抜粋します。
Rustをオブジェクト指向っぽく書きたい
今回、http通信がいくつか発生するのでリクエストの基底クラスをつくり、継承することを考えていましたが、Rustにはクラスがありませんでしたのでそれっぽく作ります。
interface/mod.rs
に基底クラスとしてトレイトとリクエスト部を記載しました。
トレイトは、fn new() -> Self ;
と定義だけになっているものと、fn add_param(&mut self, _values: Vec<String>) { }
のようにデフォルトメソッドを実装しているものがあります。
トレイトは、implから使われ、定義された関数が実装されない場合コンパイルエラーになります。
今回、add_param
の関数の中身は何も定義していませんが、このように書くことで本トレイトを伴って実装するimpl側でadd_param
を実装しなくて良くなります。
※impl側で、実装しない場合はトレイトの関数が呼ばれる動作になり、impl側で、add_param
を実装した場合はimpl側の実装が優先されます。
なお、このトレイトはもう少し抽象度を高くすべきでした。引数と返り値の型制約のせいで汎用的に使えない場合がありました。
send
関数も、トレイトのデフォルトメソッドとして定義したかったのですが、selfを多用した形でのトレイトのコーディング作法がうまく整理できず諦めて外出ししました。トレイトを実装したメソッドから単なる関数として呼ばれています。
pub mod get_stockcharts;
pub mod get_stockchartsimg;
pub mod manage_s3;
pub mod post_tweet;
//####################################################
// ↓↓↓↓↓↓↓ Interface Base class ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//####################################################
use async_trait::{async_trait};
use bytes::Bytes;
use reqwest::{Response, StatusCode};
use std::collections::HashMap;
use thiserror::Error;
use stock_data::make_log;
#[derive(Error, Debug)]
pub enum ApiError {
#[error("[ERROR] NotFound(404 Not Found: code({0}), url({1})")]
NotFound(String, String),
#[error("[ERROR] not 200 http return : code({0}), url({1})")]
InterfaceException(String, String),
#[error("[ERROR] Failed to send a request: {0}")]
SendRequest(#[source] reqwest::Error),
#[error("[ERROR] Failed to read the response body: {0}")]
ResponseBody(#[source] reqwest::Error),
}
// IF実装する際のトレイト
#[async_trait]
pub trait Interface: Sync + Send {
// コンストラクタ
fn new() -> Self ;
// デフォルトは何もしない。パラメータがあれば各IFで実装
fn add_param(&mut self, _values: Vec<String>) { }
// HTTPリクエスト送信
async fn send_request(&mut self) -> Result<(), ApiError>;
// Responseを解析し、必要なデータを抽出&contentへ格納
async fn on_parse(&mut self, response: Response) -> Result<(), ApiError>;
// contentの返却
fn get_content(&self) -> HashMap<String, String>;
}
pub async fn send(url: &str) -> Result<Response, ApiError> {
make_log("[INFO]", "send_request", "reqwest::get start");
// TODO 関数化したい
let result = reqwest::get(url).await;
let response = match result {
Ok(result) => result,
Err(e) => {
return Err(ApiError::SendRequest(e));
}
};
make_log("[INFO]", "send_request", "reqwest::analyze start");
// Check if status is within 200-299.
if response.status().is_success() {
return Ok(response);
} else {
// not 200 http return
match response.status() {
StatusCode::NOT_FOUND => {
println!("error: 目的のページがありませんでした。");
return Err(ApiError::NotFound(response.status().to_string(), url.to_string()));
},
_ => {
println!("error: その他のエラーが発生しました。");
return Err(ApiError::InterfaceException(response.status().to_string(), url.to_string()));
}
}
}
}
//####################################################
// ↑↑↑↑↑↑↑↑ Interface Base class ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
//####################################################
トレイトの実装例として、チャート画像URLのあるHTMLへのリクエストをするモジュールをピックアップします。
struct GetStockChartsIF
クラスとして、リクエストに必要な情報とレスポンスを格納するフィールドを定義します。(レスポンスは別にすべきだったかもしれません)
mod.rs
で定義したトレイトInterface
を継承し実装します。
トレイトで定義した関数を実装しないとエラーになるので中身を実装します。add_param
は実装しなくても問題ありませんが、このGetStockChartsIF
クラスではリクエストパラメータにクエリを加えるため実装(トレイトのからの関数をオーバーライド)しています。
use async_trait::{async_trait};
use reqwest::Response;
use stock_data::*;
use std::collections::HashMap;
use url::Url;
use crate::interfaces::{ApiError, Interface, send};
pub struct GetStockChartsIF {
url: String,
body: String,
content: HashMap<String, String>,
}
#[async_trait]
impl Interface for GetStockChartsIF {
// コンストラクタ
fn new() -> GetStockChartsIF {
GetStockChartsIF {
url: String::from("https://stockcharts.com/h-sc/ui"),
body: String::new(),
content: HashMap::new(),
}
}
// パラメータセット(オーバーライド)
fn add_param(&mut self, values: Vec<String>) {
let keys = vec![String::from("s"),];
let params: HashMap<_, _> = keys.iter().zip(values.iter()).collect();
self.url = String::from(Url::parse_with_params(&self.url, params).unwrap());
}
// リクエスト送信
async fn send_request(&mut self) -> Result<(), ApiError> {
make_log("[INFO]", "send_request", "start");
make_log("[INFO]", "send_request", "send start");
let response = send(&self.url).await?;
self.on_parse(response).await;
make_log("[INFO]", "send_request", "end");
return Ok(());
}
// レスポンスパース
async fn on_parse(&mut self, response: Response) -> Result<(), ApiError> {
make_log("[INFO]", "on_parse", "start");
// パース
make_log("[INFO]", "on_parse", "reqwest::text start");
let text = response.text().await;
let httpxml = match text {
Ok(httpxml) => httpxml,
Err(e) => {
return Err(ApiError::ResponseBody(e));
}
};
let body = httpxml;
// チャート画像URL抜き出し(1つしかない想定なので、一番最初のURLを使う)
let links = get_links(&body, "https:".to_string());
let url = String::from(&links[0]);
// 結果格納
&self.body.push_str(&body);
&self.content.insert(String::from("body"), body);
&self.content.insert(String::from("url"), url);
make_log("[INFO]", "on_parse", "end");
return Ok(());
}
// 返却
fn get_content(&self) -> HashMap<String, String> {
self.content.clone()
}
}
GetStockChartsIF
クラスは、以下のように使います。
// SetUp
let mut chart = GetStockChartsIF::new();
let values = vec![String::from(&event.ticker),];
chart.add_param(values);
// Request
chart.send_request().await?;
// Result
let bodys = chart.get_content();
let url = &bodys["url"];
println!("{}", url);
参考
複数関数を含んだtemplate.yamlの書き方がわからない
以下のように書けました。
Resources:
StockRustFunction: # 1つ目
Type: AWS::Serverless::Function
Metadata:
BuildMethod: makefile
TweetRustFunction: # 2つ目
Type: AWS::Serverless::Function
Metadata:
BuildMethod: makefile
build-StockRustFunction:
cargo build --bin stock --release --target x86_64-unknown-linux-musl
cp ./target/x86_64-unknown-linux-musl/release/stock $(ARTIFACTS_DIR)/bootstrap
build-TweetRustFunction:
cargo build --bin tweet --release --target x86_64-unknown-linux-musl
cp ./target/x86_64-unknown-linux-musl/release/tweet $(ARTIFACTS_DIR)/bootstrap
[[bin]]
name = "stock"
path = "src/stock.rs"
[[bin]]
name = "tweet"
path = "src/twitter.rs"
######参考
https://techpilot.dev/article/aws-lambda-rust
####Amazon EventBridgeトリガーも一緒にデプロイしたい
以下のように書けました。
StockRustFunction
がAmazon EventBridgeをトリガーにしたいLambda関数です。
Events
には月〜金の0時に引数を伴って起動する(設定はGMTのため-9時間の15時)トリガーを設定しています。
今回は日経平均のみ利用していますが、関数自体は引数を変えることで日経平均以外も対応可能なように作っているためInput
に引数を設定しています。これによりトリガーは引数を伴って関数を起動します。
StockRustFunction:
Type: AWS::Serverless::Function
Events:
CWSchedule:
Type: Schedule
Properties:
Schedule: 'cron(0 15 ? * MON-FRI *)'
Name: GetStockChartDailySchedule
Description: get daily stockchart schedule
Input:
!Sub |
{
"ticker": "$NIKK"
}
Enabled: True
S3トリガーも一緒にデプロイしたい
以下のように書けました。
TweetRustFunction
がS3をトリガーにしたいLambda関数です。
Events
にはS3バケットと、トリガーにするプレフィックスを設定しています。こちらも日経平均のみを対象にしているのでプレフィックスでディレクトリを絞っています。
ProjectS3Bucket
はトリガーを設定するS3です。template.yamlのトリガーにS3を含める場合、S3も定義して合わせて作成する必要があります。(既存のS3を使うことができない)
Resources:
ProjectS3Bucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Ref AwsS3Bucket
TweetRustFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: TweetRust
Events:
S3exampleEvent:
Type: S3
Properties:
Bucket: !Ref ProjectS3Bucket
Events: s3:ObjectCreated:*
Filter:
S3Key:
Rules:
- Name: prefix
Value: stock_data/%24NIKK/
環境変数も一緒にデプロイしたい
こちらの記事にも書きましたが、④の方式です。
Rust側の環境設定ファイルの利用は、dotenv
、once_cell
を使いました。
ローカル環境では、.env
を参照しています。
Parameters
でsam実行時の引数を受け取ります。
Test
など定義した項目をEnvironment
のTEST: !Ref Test
で参照するようにします。
これにより、Lambdaの実行環境の環境変数TEST
に渡されたパラメータがセットされます。
なお内容はTwitterAPIのキーなどを環境変数にしています。使うのは、2つ目の関数だけなのですが、Rust側で共通的に読み込んでしまっているためどちらの関数にも同じEnvironment
を定義しています。
Parameters:
Test:
Type: String
AwsS3Bucket:
Type: String
ConsumerKey:
Type: String
ConsumerSecret:
Type: String
BearerToken:
Type: String
AccessToken:
Type: String
AccessTokenSecret:
Type: String
Resources:
StockRustFunction:
Type: AWS::Serverless::Function
Environment:
Variables:
TEST: !Ref Test
AWS_S3_BUCKET: !Ref AwsS3Bucket
CONSUMER_KEY: !Ref ConsumerKey
CONSUMER_SECRET: !Ref ConsumerSecret
BEARER_TOKEN: !Ref BearerToken
ACCESS_TOKEN: !Ref AccessToken
ACCESS_TOKEN_SECRET: !Ref AccessTokenSecret
今回は引数をsamconfig.yaml
に定義しました。
version = 0.1
[default]
[default.global.parameters]
parameter_overrides = "Test=HelloEnv AwsS3Bucket=********** ConsumerKey=********** ConsumerSecret=********** BearerToken=********** AccessToken=********** AccessTokenSecret=**********"
この書き方で、samを実行した時、どのような場合も引数を伴いますが、デプロイだけなら[default.deploy.parameters]
などに定義すれば良いと思います。build
時の定義は意味がありませんでした。
これにより、Test
に渡される値は、HelloEnv
になります。
ローカル環境で引数を伴うトリガーからLambda関数のテストをしたい
AWS SAMをローカルにインストールしていると、sam local invoke
でテストが可能です。
複数関数をビルドしている場合は、関数名を指定してsam local invoke StockRustFunction
とします。
今回は2つの関数どちらも引数を伴うイベントがトリガーなので、このままでは引数がないエラーとなります。
以下のように書きました。
$ sam local invoke StockRustFunction -e events/event.json
$ sam local invoke TweetRustFunction -e events/s3.json
引数に指定したjsonはAWSコンソールのLambda関数画面にて「テスト」からサンプルを取得して改変しました。
{
"ticker": "$NIKK"
}
{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
"awsRegion": "ap-northeast-1",
"eventTime": "2021-09-29T12:35:37.523Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "AWS:AROAWHUBMU2PID64MSMGG:StockRust"
},
"requestParameters": {
"sourceIPAddress": "54.248.9.188"
},
"responseElements": {
"x-amz-request-id": "V38TVB9J70XZ26CD",
"x-amz-id-2": "s9J3JrMTJyW4NiD25zfcb6JZoPsyDFGPb2dVHHpFtaIm6VUlQ3ZNJQmZArSTfXNcuaJC91ithRSN+8cys5DalVQeZUfMHXnl"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "hello",
"bucket": {
"name": "my-work-project-bucket",
"ownerIdentity": {
"principalId": "A3C3MPNETA4CEY"
},
"arn": "arn:aws:s3:::my-work-project-bucket"
},
"object": {
"key": "stock_data/$NIKK/$NIKK_20211002.png",
"size": 19714,
"eTag": "0139f2a60b23be511ce5ed1c5b9af8ef",
"sequencer": "0061545D9D5976EB66"
}
}
}
]
}
なお、テスト実行時の引数は、samconfig.yaml
に記載することで省略できます。
[default.local_invoke]
[default.local_invoke.parameters]
event = "events/event.json"
# sam local invoke StockRustFunction -e events/event.json
# ↓
$ sam local invoke StockRustFunction
S3トリガーのパラメータが多すぎる
前述のs3.json
の通りですがs3をトリガーにするとパラメータがものすごくたくさん渡ってきます。
この中で、関数から使いたいのはイベント時間、S3バケット名とキー(画像名)しかありません。
使いたいパラメータのみを定義した、strustを作成して、ハンドラーfunc
でevent: S3PutEvent
にて受け取ります。
知識不足で使いたいパラメータの親階層から順にstrustを定義していく形になっていますが、もっと良い方法がある気がします。
use serde::{Deserialize, Serialize};
struct S3PutEvent {
// 大文字始まりのjsonを小文字(records)で探してしまうので大文字(Records)明示
// #[serde(rename_all = "PascalCase")]でもOK
#[serde(rename = "Records")]
Records: Vec<Record>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Record {
eventTime: String,
s3: S3Data,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct S3Data {
bucket: Bucket,
object: Object,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Bucket {
name: String,
arn: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Object {
key: String,
size: usize,
#[serde(rename = "eTag")]
etag: String,
sequencer: String,
}
#[tokio::main]
async fn main() -> Result<(), Error> {
let func = handler_fn(func);
lambda_runtime::run(func).await?;
Ok(())
}
async fn func(event: S3PutEvent, _: Context) -> Result<CustomOutput, Error> {
// イベントを受け取る
let time = &event.Records[0].eventTime;
let s3_data = &event.Records[0].s3;
}
おわりに
これを作った経緯は、日経平均があまりに上がらないので、上がらない前提で売買できないものかと、IN出来そうなタイミングをチャート画像で分析していたためです。
※分析内容は単なる思いつきで、確度が高いものではありません。
分析にチャート画像を使っていたため、実際に注文を出す際も株価などの数値ではなく、毎日チャート画像を通知するものが欲しかったです。
完成する前に最高の売り場が来てしまいました。
ありがとうございました。
2023/05/13 追記
TwitterAPIを取り巻く環境の変化と本開発と運用における目的は果たせたのでトリガーとなっているイベントの「無効化」を行いました。
Amazon EventBridge > ルール > GetStockChartDailySchedule