RyuseiSato
@RyuseiSato (SAI)

Are you sure you want to delete the question?

If your question is resolved, you may close it.

Leaving a resolved question undeleted may help others!

We hope you find it useful!

Fluentdを利用したツイート収集でcreated_atで日付型で保存したい。

解決したいこと

Fluentdを利用したツイート収集でdatetimeで日付型で保存したいです。

EC2のAmazon Linux2とRDSのデータベースを使い、Fluentdを経由してMySQLにツイートデータを格納しました。

mysql_bulkプラグインを使用し、日付をSTR_TO_DATE(?, '%a %b %d %H:%i:%s +0000 %Y')で保存使用としたのですが、mysql_bulkプラグインではsqlパラメータがサポートされておらず、保存することができませんでした。
日付型で保存するにはどう対応したら良いでしょうか。
解決方法を教えて欲しいです。

ただtext型で生の日付の取得はできました。

以下が出力されたデータです。

mysql> SELECT * FROM tweet LIMIT 3\G
*************************** 1. row ***************************
            id: 1485944507086835718
       user_id: 1042074786745262080
          text: #初リプ・初絡み大歓迎  #おやツイ 
 #藍井エイル 
raw_created_at: Tue Jan 25 11:55:52 +0000 2022
*************************** 2. row ***************************
            id: 1485944511302107136
       user_id: 1177086436782166021
          text: やっと帰ったー
raw_created_at: Tue Jan 25 11:55:53 +0000 2022
*************************** 3. row ***************************
            id: 1485944511268364289
       user_id: 776342196
          text: RT @mwhlll: لما احد يحسب اني قلتله كل شي بحياتي: https://t.co/KRVCmFDEHS
raw_created_at: Tue Jan 25 11:55:53 +0000 2022
*************************** 4. row ***************************

以下がMySQLの設定です。

CREATE TABLE IF NOT EXISTS tweet (
id BIGINT NOT NULL,
user_id BIGINT NOT NULL,
text TEXT,
raw_created_at TEXT,
KEY id (id),
KEY idx_user_id (user_id)
) DEFAULT CHARSET=utf8;

以下がfluentdの設定です。

<source>
  @type twitter
  consumer_key 〇〇〇〇
  consumer_secret 〇〇〇〇
  access_token 〇〇〇〇
  access_token_secret 〇〇〇〇
  tag input.twitter
  timeline sampling
  lang ja
  output_format flat
</source>

<match input.twitter>
  @type mysql_bulk
  host 〇〇〇〇
  database textdata
  column_names id,user_id,text,created_at
  sql INSERT INTO tweet (id, user_id, text, created_at) VALUES (?, ?, ?, STR_TO_DATE(?, "%H:%i:%s +0000 %Y %b %d"))
  table tweet
  username 〇〇〇〇
  password 〇〇〇〇
  <buffer>
   flush_interval 10s
  </buffer>
</match>
0

1Answer

TEXT型で取得できた」とおっしゃっていますが、
貼り付けてあるSELECTの結果はraw_created_atNULLになってますし、
またmatchのセクションでは列名がcreated_atだったり無効なsqlパラメータがあったりと、
何一つ整合的になってないです。

今一度、現状を確認し、自身でその意味を整理されたうえで貼りなおしてください。


以下、ご自身でも動作検証ができるように基本的なことを書いていきます。
これらの知識と、現状の把握と、少しの検索があれば、自ずとアプローチの仕方は分かるはずです。
(もちろん、それでも行き詰まった場合は再度お答えします。)

最小構成

まず、Fluentdの設定(td-agent.confとします)から、既存のtwittermysql_bulkの設定をコメントアウトして、代わりに次を記述します。

td-agent.conf
<source>
  @type tail
  tag input.twitter
  path /tmp/dammy_input
  <parse>
    @type json
  </parse>
</source>

<match input.twitter>
  @type stdout
</match>

この構成で、dammy_inputにメッセージを追記します。

echo '{"id": 12345, "user_id": 6789, "created_at": "Wed Oct 10 20:19:24 +0000 2018"}' >> /tmp/dammy_input

うまくいけば、ログに次のような行が現れます。

td-agent.log
2022-01-26 12:34:56.123456789 +0900 test.tweet: {"id":12345,"user_id":6789,"created_at":"Wed Oct 10 20:19:24 +0000 2018"}

ここまで単純明快だと思います。
input.twitterというタグを介して入力と出力が繋がっているのが分かるでしょうか。

ここからそれぞれのセクションを置き換えることで、twittermysql_bulk単体の動作を見ることができます。
また、何かあればいつでもこの状態に戻ってくることができます。
プログラミング同様、いきなり全体を構築するのは無茶です

タイムスタンプのパース

次に、本題の日付に取り掛かります。
対処法は一つではありませんが、今回はFluentdにタイムスタンプをパースさせる方法を試してみます。

Fluentdでは、メッセージは本文とは別に、その日時を保持しています。(上記ログに出力されています)
デフォルトではこれは現在時刻ですが、本文から取得することもできます。
そのためには、sourceセクションやmatchセクションの内部にparseセクションを追加すればいいです。

今回はsourceセクションの時点でパースしましょう。
内部ではRubyのstrptimeが使われているらしいので、そっちのドキュメントも参照しながら設定すると、こんな感じです。

td-agent.conf
<source>
  @type tail
  tag input.twitter
  path /tmp/dammy_input
  <parse>
    @type json
    time_key created_at
    time_type string
    time_format %a %b %d %H:%M:%S %z %Y
  </parse>
</source>

これで再度dummy_inputに追記すると、
本文からcreated_atが消えて、タイムスタンプに反映されます。
(タイムゾーンをパースした結果、日本時間になります)

td-agent.log
2018-10-10 05:19:24.000000000 +0900 test.tweet: {"id":12345,"user_id":6789}

created_atへの反映

あとは入力と出力をそれぞれのプラグインに置き換えていくだけです。
細かく動作を確認し、失敗の原因を切り分けることを忘れないでください。

mysql_bulkでタイムスタンプをINSERTするには、key_names${time}と指定すればいいです。

これはタイムスタンプをMySQLが直接パース可能な文字列に変換するので、
挿入先の型は文字列でもDATETIMEでも大丈夫です。

0Like

Comments

  1. @RyuseiSato

    Questioner

    回答ありがとうございます。
    raw_created_atのNULLはコピペミスでした。現在は修正し、日付もTEXT型で取得できました。
    最小構成のdammy_inputは探したところ見つからなかったため、新たに作成するファイルのことで合っているでしょうか?

  2. > raw_created_atのNULLはコピペミスでした。現在は修正し、日付もTEXT型で取得できました。

    matchセクションもおかしいです。
    再度確認の上、設定項目の意味を逐一ご理解ください。

    > 最小構成のdammy_inputは探したところ見つからなかったため、新たに作成するファイルのことで合っているでしょうか?

    はい。
    直後にechoで作ってます。

Your answer might help someone💌