5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

日経平均のチャート画像を毎日Twitter投稿するLambda関数(Rust)を作る

Last updated at Posted at 2021-10-15

はじめに

スクレイピングした日経平均のチャート画像を毎日Twitter投稿するLambda関数をRustで作成しました。

成果物

環境など

  • 実行環境:AWS Lambda (カスタムランタイム) + Amazon EventBridge + AWS S3
  • 開発言語:Rust
  • 開発環境:macOS + VSCode + AWS SAM + (Docker)
  • 株価チャート取得先:StockCharts.com
  • Twitter投稿:TwitterAPI
  • 構成図作成: Python + diagrams + graphviz

概要

やっていることは以下です。

  • Amazon EventBridgeで毎日1回Lambda関数1つ目を起動する
  • AWS Lambda関数1つ目にて、チャート画像のURLをスクレイピングし取得した画像をS3へ保存
  • AWS S3に画像が保存されたタイミングをトリガーにして2つ目のLambda関数を起動する
  • AWS Lambda関数2つ目にて、S3から画像を取得してTwitterへ投稿

構成

event_processing.png
Diagramsで作成しましたが、文字がずれてしまう・・・。

主なライブラリ

httpリクエストに使ったライブラリ:reqwest
htmlから画像URLを抽出するために使ったライブラリ:scraper
twitter投稿に使ったライブラリ:egg-mode
AWS S3まわり:aws-configws-sdk-s3aws-types
AWS Lambdaまわり:lambda_runtimevlambda_http

Cargo.toml(折りたたみ)
Cargo.toml
[package]
name = "stock_data"
version = "0.1.0"
authors = ["c3drive <mihara000satsuki@yahoo.co.jp>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
lambda_runtime = "0.4"
lambda_http = "0.4.0"

serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.59"

# https://crates.io/crates/reqwest/0.11.4
reqwest = { version = "0.11.4", features = ["json"] }
tokio = { version = "1", features = ["full"] }

scraper = "0.12"
async-trait = "0.1.36"
url = { version = "2", features = ["serde"] }

# Make Custom Error
thiserror = "1.0.26"

# Get Server Time
chrono = "0.4"

# Added due to reqwest dependency problems when cross-compiling for RPi
openssl = { version = "0.10.32", features = ["vendored"] }

# Web Resources Download
bytes = "1"

# Put File on AWS S3
aws-config = { git = "https://github.com/awslabs/aws-sdk-rust", tag = "v0.0.18-alpha", package = "aws-config" }
aws-sdk-s3 = { git = "https://github.com/awslabs/aws-sdk-rust", tag = "v0.0.18-alpha", package = "aws-sdk-s3" }
aws-types = { git = "https://github.com/awslabs/aws-sdk-rust", tag = "v0.0.15-alpha", package = "aws-types" }

# Config
config = "0.11"
dotenv = "0.15.0"
# Global DATA
once_cell = "1"

# Twitter
egg-mode = "0.16"

[[bin]]
name = "hello"
path = "src/main.rs"

[[bin]]
name = "stock"
path = "src/stock.rs"

[[bin]]
name = "tweet"
path = "src/twitter.rs"

開発について

機能のわりにソース量が多くなってしまったので悩んだところを抜粋します。

Rustをオブジェクト指向っぽく書きたい

今回、http通信がいくつか発生するのでリクエストの基底クラスをつくり、継承することを考えていましたが、Rustにはクラスがありませんでしたのでそれっぽく作ります。

interface/mod.rsに基底クラスとしてトレイトとリクエスト部を記載しました。
トレイトは、fn new() -> Self ;と定義だけになっているものと、fn add_param(&mut self, _values: Vec<String>) { }のようにデフォルトメソッドを実装しているものがあります。
トレイトは、implから使われ、定義された関数が実装されない場合コンパイルエラーになります。
今回、add_paramの関数の中身は何も定義していませんが、このように書くことで本トレイトを伴って実装するimpl側でadd_paramを実装しなくて良くなります。
※impl側で、実装しない場合はトレイトの関数が呼ばれる動作になり、impl側で、add_paramを実装した場合はimpl側の実装が優先されます。

なお、このトレイトはもう少し抽象度を高くすべきでした。引数と返り値の型制約のせいで汎用的に使えない場合がありました。

send関数も、トレイトのデフォルトメソッドとして定義したかったのですが、selfを多用した形でのトレイトのコーディング作法がうまく整理できず諦めて外出ししました。トレイトを実装したメソッドから単なる関数として呼ばれています。

mod.rs
pub mod get_stockcharts;
pub mod get_stockchartsimg;
pub mod manage_s3;
pub mod post_tweet;

//####################################################
// ↓↓↓↓↓↓↓ Interface Base class ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//####################################################
use async_trait::{async_trait};
use bytes::Bytes;
use reqwest::{Response, StatusCode};
use std::collections::HashMap;
use thiserror::Error;

use stock_data::make_log;

#[derive(Error, Debug)]
pub enum ApiError {
    #[error("[ERROR] NotFound(404 Not Found: code({0}), url({1})")]
    NotFound(String, String),
    #[error("[ERROR] not 200 http return : code({0}), url({1})")]
    InterfaceException(String, String),
    #[error("[ERROR] Failed to send a request: {0}")]
    SendRequest(#[source] reqwest::Error),
    #[error("[ERROR] Failed to read the response body: {0}")]
    ResponseBody(#[source] reqwest::Error),
}
// IF実装する際のトレイト
#[async_trait]
pub trait Interface: Sync + Send {
    // コンストラクタ
    fn new() -> Self ;
    // デフォルトは何もしない。パラメータがあれば各IFで実装
    fn add_param(&mut self, _values: Vec<String>) {    }
    // HTTPリクエスト送信
    async fn send_request(&mut self) -> Result<(), ApiError>;
    // Responseを解析し、必要なデータを抽出&contentへ格納
    async fn on_parse(&mut self, response: Response) -> Result<(), ApiError>;
    // contentの返却
    fn get_content(&self) -> HashMap<String, String>;
}
pub async fn send(url: &str) -> Result<Response, ApiError> {
    make_log("[INFO]", "send_request", "reqwest::get start");
    // TODO 関数化したい
    let result = reqwest::get(url).await;
    let response = match result {
        Ok(result) => result,
        Err(e) => {
            return Err(ApiError::SendRequest(e));
        }
    };

    make_log("[INFO]", "send_request", "reqwest::analyze start");
    // Check if status is within 200-299.
    if response.status().is_success() {
        return Ok(response);
    } else {
        // not 200 http return
        match response.status() {
            StatusCode::NOT_FOUND => {
                println!("error: 目的のページがありませんでした。");
                return Err(ApiError::NotFound(response.status().to_string(), url.to_string()));
            },
            _ => {
                println!("error: その他のエラーが発生しました。");
                return Err(ApiError::InterfaceException(response.status().to_string(), url.to_string()));
            }
        }
    }
}
//####################################################
// ↑↑↑↑↑↑↑↑ Interface Base class ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
//####################################################

トレイトの実装例として、チャート画像URLのあるHTMLへのリクエストをするモジュールをピックアップします。
struct GetStockChartsIFクラスとして、リクエストに必要な情報とレスポンスを格納するフィールドを定義します。(レスポンスは別にすべきだったかもしれません)
mod.rsで定義したトレイトInterfaceを継承し実装します。
トレイトで定義した関数を実装しないとエラーになるので中身を実装します。add_paramは実装しなくても問題ありませんが、このGetStockChartsIFクラスではリクエストパラメータにクエリを加えるため実装(トレイトのからの関数をオーバーライド)しています。

get_stockcharts.rs
use async_trait::{async_trait};
use reqwest::Response;
use stock_data::*;
use std::collections::HashMap;
use url::Url;
use crate::interfaces::{ApiError, Interface, send};

pub struct GetStockChartsIF {
    url: String,
    body: String,
    content: HashMap<String, String>,
}

#[async_trait]
impl Interface for GetStockChartsIF {

    // コンストラクタ
    fn new() -> GetStockChartsIF {        
        GetStockChartsIF {
            url: String::from("https://stockcharts.com/h-sc/ui"),
            body: String::new(),
            content: HashMap::new(),
        }
    }
    
    // パラメータセット(オーバーライド)
    fn add_param(&mut self, values: Vec<String>) {
        let keys = vec![String::from("s"),];
        let params: HashMap<_, _> = keys.iter().zip(values.iter()).collect();
        self.url = String::from(Url::parse_with_params(&self.url, params).unwrap());
    }

    // リクエスト送信
    async fn send_request(&mut self) -> Result<(), ApiError> {
        make_log("[INFO]", "send_request", "start");

        make_log("[INFO]", "send_request", "send start");
        let response = send(&self.url).await?;
        self.on_parse(response).await;

        make_log("[INFO]", "send_request", "end");
        return Ok(());
    }

    // レスポンスパース
    async fn on_parse(&mut self, response: Response) -> Result<(), ApiError> {
        make_log("[INFO]", "on_parse", "start");

        // パース
        make_log("[INFO]", "on_parse", "reqwest::text start");
        let text = response.text().await;
        let httpxml = match text {
            Ok(httpxml) => httpxml,
            Err(e) => {
                return Err(ApiError::ResponseBody(e));
            }
        };
        let body = httpxml;

        // チャート画像URL抜き出し(1つしかない想定なので、一番最初のURLを使う)
        let links = get_links(&body, "https:".to_string());
        let url = String::from(&links[0]);
    
        // 結果格納
        &self.body.push_str(&body);

        &self.content.insert(String::from("body"), body);
        &self.content.insert(String::from("url"), url);

        make_log("[INFO]", "on_parse", "end");
        return Ok(());
    }

    // 返却
    fn get_content(&self) -> HashMap<String, String> {
        self.content.clone()
    }
    
}

GetStockChartsIFクラスは、以下のように使います。

stock.rs
    // SetUp
    let mut chart = GetStockChartsIF::new();
    let values = vec![String::from(&event.ticker),];
    chart.add_param(values);

    // Request
    chart.send_request().await?;

    // Result
    let bodys = chart.get_content();
    let url = &bodys["url"];
    println!("{}", url);
参考

複数関数を含んだtemplate.yamlの書き方がわからない

以下のように書けました。

template.yaml
Resources:
  StockRustFunction: # 1つ目
    Type: AWS::Serverless::Function
    Metadata:
      BuildMethod: makefile
  TweetRustFunction: # 2つ目
    Type: AWS::Serverless::Function
    Metadata:
      BuildMethod: makefile
Makefile
build-StockRustFunction:
	cargo build --bin stock --release --target x86_64-unknown-linux-musl
	cp ./target/x86_64-unknown-linux-musl/release/stock $(ARTIFACTS_DIR)/bootstrap

build-TweetRustFunction:
	cargo build --bin tweet --release --target x86_64-unknown-linux-musl
	cp ./target/x86_64-unknown-linux-musl/release/tweet $(ARTIFACTS_DIR)/bootstrap
Cargo.toml
[[bin]]
name = "stock"
path = "src/stock.rs"

[[bin]]
name = "tweet"
path = "src/twitter.rs"

######参考
https://techpilot.dev/article/aws-lambda-rust

####Amazon EventBridgeトリガーも一緒にデプロイしたい

以下のように書けました。

StockRustFunctionがAmazon EventBridgeをトリガーにしたいLambda関数です。
Eventsには月〜金の0時に引数を伴って起動する(設定はGMTのため-9時間の15時)トリガーを設定しています。
今回は日経平均のみ利用していますが、関数自体は引数を変えることで日経平均以外も対応可能なように作っているためInputに引数を設定しています。これによりトリガーは引数を伴って関数を起動します。

template.yaml
  StockRustFunction:
    Type: AWS::Serverless::Function
      Events:
        CWSchedule:
          Type: Schedule
          Properties:
            Schedule: 'cron(0 15 ? * MON-FRI *)'
            Name: GetStockChartDailySchedule
            Description: get daily stockchart schedule
            Input:
              !Sub |
                {
                    "ticker": "$NIKK"
                }
            Enabled: True

S3トリガーも一緒にデプロイしたい

以下のように書けました。

TweetRustFunctionがS3をトリガーにしたいLambda関数です。
EventsにはS3バケットと、トリガーにするプレフィックスを設定しています。こちらも日経平均のみを対象にしているのでプレフィックスでディレクトリを絞っています。
ProjectS3Bucketはトリガーを設定するS3です。template.yamlのトリガーにS3を含める場合、S3も定義して合わせて作成する必要があります。(既存のS3を使うことができない)

template.yaml

Resources:
  ProjectS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref AwsS3Bucket

  TweetRustFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: TweetRust
      Events:
        S3exampleEvent:
          Type: S3
          Properties:
            Bucket: !Ref ProjectS3Bucket
            Events: s3:ObjectCreated:*
            Filter:
              S3Key:
                Rules:
                - Name: prefix
                  Value: stock_data/%24NIKK/

環境変数も一緒にデプロイしたい

こちらの記事にも書きましたが、④の方式です。
Rust側の環境設定ファイルの利用は、dotenvonce_cellを使いました。
ローカル環境では、.envを参照しています。

Parametersでsam実行時の引数を受け取ります。
Testなど定義した項目をEnvironmentTEST: !Ref Testで参照するようにします。
これにより、Lambdaの実行環境の環境変数TESTに渡されたパラメータがセットされます。
なお内容はTwitterAPIのキーなどを環境変数にしています。使うのは、2つ目の関数だけなのですが、Rust側で共通的に読み込んでしまっているためどちらの関数にも同じEnvironmentを定義しています。

template.yaml
Parameters:
  Test:
    Type: String
  AwsS3Bucket:
    Type: String
  ConsumerKey:
    Type: String
  ConsumerSecret:
    Type: String
  BearerToken:
    Type: String
  AccessToken:
    Type: String
  AccessTokenSecret:
    Type: String

Resources:
  StockRustFunction:
    Type: AWS::Serverless::Function
      Environment:
          Variables:
            TEST: !Ref Test
            AWS_S3_BUCKET: !Ref AwsS3Bucket
            CONSUMER_KEY: !Ref ConsumerKey
            CONSUMER_SECRET: !Ref ConsumerSecret
            BEARER_TOKEN: !Ref BearerToken
            ACCESS_TOKEN: !Ref AccessToken
            ACCESS_TOKEN_SECRET: !Ref AccessTokenSecret

今回は引数をsamconfig.yamlに定義しました。

samconfig.yaml
version = 0.1
[default]
[default.global.parameters]
parameter_overrides = "Test=HelloEnv AwsS3Bucket=********** ConsumerKey=********** ConsumerSecret=********** BearerToken=********** AccessToken=********** AccessTokenSecret=**********"

この書き方で、samを実行した時、どのような場合も引数を伴いますが、デプロイだけなら[default.deploy.parameters]などに定義すれば良いと思います。build時の定義は意味がありませんでした。
これにより、Testに渡される値は、HelloEnvになります。

ローカル環境で引数を伴うトリガーからLambda関数のテストをしたい

AWS SAMをローカルにインストールしていると、sam local invokeでテストが可能です。
複数関数をビルドしている場合は、関数名を指定してsam local invoke StockRustFunctionとします。
今回は2つの関数どちらも引数を伴うイベントがトリガーなので、このままでは引数がないエラーとなります。
以下のように書きました。

実行コマンド
$ sam local invoke StockRustFunction -e events/event.json
$ sam local invoke TweetRustFunction -e events/s3.json

引数に指定したjsonはAWSコンソールのLambda関数画面にて「テスト」からサンプルを取得して改変しました。

events.json
{
    "ticker": "$NIKK"
}
s3.json
{
    "Records": [
      {
        "eventVersion": "2.1",
        "eventSource": "aws:s3",
        "awsRegion": "ap-northeast-1",
        "eventTime": "2021-09-29T12:35:37.523Z",
        "eventName": "ObjectCreated:Put",
        "userIdentity": {
          "principalId": "AWS:AROAWHUBMU2PID64MSMGG:StockRust"
        },
        "requestParameters": {
          "sourceIPAddress": "54.248.9.188"
        },
        "responseElements": {
          "x-amz-request-id": "V38TVB9J70XZ26CD",
          "x-amz-id-2": "s9J3JrMTJyW4NiD25zfcb6JZoPsyDFGPb2dVHHpFtaIm6VUlQ3ZNJQmZArSTfXNcuaJC91ithRSN+8cys5DalVQeZUfMHXnl"
        },
        "s3": {
          "s3SchemaVersion": "1.0",
          "configurationId": "hello",
          "bucket": {
            "name": "my-work-project-bucket",
            "ownerIdentity": {
              "principalId": "A3C3MPNETA4CEY"
            },
            "arn": "arn:aws:s3:::my-work-project-bucket"
          },
          "object": {
            "key": "stock_data/$NIKK/$NIKK_20211002.png",
            "size": 19714,
            "eTag": "0139f2a60b23be511ce5ed1c5b9af8ef",
            "sequencer": "0061545D9D5976EB66"
          }
        }
      }
    ]
  }

なお、テスト実行時の引数は、samconfig.yamlに記載することで省略できます。

samconfig.yaml
[default.local_invoke]
[default.local_invoke.parameters]
event = "events/event.json"
実行コマンド
# sam local invoke StockRustFunction -e events/event.json
# ↓
$ sam local invoke StockRustFunction

S3トリガーのパラメータが多すぎる

前述のs3.jsonの通りですがs3をトリガーにするとパラメータがものすごくたくさん渡ってきます。
この中で、関数から使いたいのはイベント時間、S3バケット名とキー(画像名)しかありません。

使いたいパラメータのみを定義した、strustを作成して、ハンドラーfuncevent: S3PutEventにて受け取ります。
知識不足で使いたいパラメータの親階層から順にstrustを定義していく形になっていますが、もっと良い方法がある気がします。

tweet.rs
use serde::{Deserialize, Serialize};

struct S3PutEvent {
    // 大文字始まりのjsonを小文字(records)で探してしまうので大文字(Records)明示
    // #[serde(rename_all = "PascalCase")]でもOK
    #[serde(rename = "Records")]
    Records: Vec<Record>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Record {
    eventTime: String,
    s3: S3Data,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct S3Data {
    bucket: Bucket,
    object: Object,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Bucket {
    name: String,
    arn: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Object {
    key: String,
    size: usize,
    #[serde(rename = "eTag")]
    etag: String,
    sequencer: String,
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let func = handler_fn(func);
    lambda_runtime::run(func).await?;
    Ok(())
}
async fn func(event: S3PutEvent, _: Context) -> Result<CustomOutput, Error> {
    // イベントを受け取る
    let time = &event.Records[0].eventTime;
    let s3_data = &event.Records[0].s3;
}

おわりに

これを作った経緯は、日経平均があまりに上がらないので、上がらない前提で売買できないものかと、IN出来そうなタイミングをチャート画像で分析していたためです。
1633868018728.jpg
※分析内容は単なる思いつきで、確度が高いものではありません。

分析にチャート画像を使っていたため、実際に注文を出す際も株価などの数値ではなく、毎日チャート画像を通知するものが欲しかったです。
完成する前に最高の売り場が来てしまいました。
sc.png

ありがとうございました。

2023/05/13 追記

TwitterAPIを取り巻く環境の変化と本開発と運用における目的は果たせたのでトリガーとなっているイベントの「無効化」を行いました。
Amazon EventBridge > ルール > GetStockChartDailySchedule

5
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?