10
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

DBのデータをTDに流し込む話第一弾

Last updated at Posted at 2016-02-29

はじめに

私は現在お仕事でdots.の開発をしています。
dots.での分析基盤を整えるためにTDを使うことになりました。
DBのデータをTDに流しこんでデータを扱いやすくしたいよねってなったのでその時の話です。
いろんなところで躓いたのでメモを残しておきます。
今回はその中でもDBとTDをどうやって繋ぐことにしたかっていうところを書きます。

環境

td-agentのバージョン

$ rpm -qi td-agent
Name        : td-agent                     Relocations: /
Version     : 2.2.0
・・・

td commandのバージョン

$ td --version
0.11.8.2

DB=>TDにデータを流し込む方法を選ぶ

DB=>TDにデータを流し込む方法は以下の2つを考えました

  1. Data Connector
  2. Bulk Import

Data Connectorだとcrontabで設定せず、tdコマンドでcronを設定でき簡潔だったのでまずはData Connectorを選択しました。

Data Connector

設定ファイルを作成

Data Connectorを使うにはまず設定ファイルを作成します

seed.yml
in:
  type: mysql
  host: mysql_host_name
  port: 3306
  user: test_user
  password: test_password
  database: test_database
  table: test_table
  select: "*"
out:
  mode: replace
  • type
  • TDに流し込むデータ元が何なのかを指定
  • 今回はmysqlから流し込むのでmysqlにしました
  • host 〜 table
  • 接続先サーバーの情報で埋めます
  • select
  • selectするカラムを指定
  • 複数ある場合は","で区切ります

その他よく使いそうなオプション

  • where
  • where条件入れたいよってときはwhereに条件式を書いてあげるとよいらしいです
  • query
  • selectではなくqueryにするとクエリをかけますJOINとかしたいときはqueryがよさそう

▼その他、詳しくはgithub repositoryのREADMEを読んで下さい
embulk/embulk-input-jdbc

さっそくコマンドを叩いてみる

設定ファイルを作成したら下記コマンドをたたきます

$ td connector:guess seed.yml -o load.yml

わろた

'connector' is not a td command. Run 'td' to show the list.
$ td --version
0.11.8.2

Install ‘td’ command v0.11.9 or later
ってドキュメントに書いてあったー/(^o^)\
ちなみにこの時td-agentのバージョンを2.3.0にあげてもらって0.13.0のtd commandをいれてもらいました。

$ rpm -qi td-agent
Name        : td-agent                     Relocations: /
Version     : 2.3.0
・・・
$ td --version
0.13.0

td commandのバージョンを上げてもらったので再挑戦

む・・・

java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

  The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
  	at com.google.common.base.Throwables.propagate(Throwables.java:160)
  	at org.embulk.input.jdbc.AbstractJdbcInputPlugin.transaction(AbstractJdbcInputPlugin.java:144)
  	at org.embulk.plugin.compat.InputPluginWrapper.transaction(InputPluginWrapper.java:57)
  	at org.embulk.exec.PreviewExecutor.doPreview(PreviewExecutor.java:93)
  	at org.embulk.exec.PreviewExecutor.access$000(PreviewExecutor.java:27)
  	at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:67)
  	at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:63)
  	at org.embulk.spi.Exec.doWith(Exec.java:25)
  	at org.embulk.exec.PreviewExecutor.preview(PreviewExecutor.java:63)
  	at com.treasuredata.embulk.http.server.PreviewService.doService(PreviewService.java:38)
  	at sun.reflect.GeneratedMethodAccessor72.invoke(Unknown Source)
  	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  	at java.lang.reflect.Method.invoke(Method.java:497)
  	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
  	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:151)
  	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:171)
  	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:195)
  	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:104)
  	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:384)
  	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:342)
  	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:101)
  	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:271)
  	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
  	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
  	at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
  	at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
  	at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
  	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:297)
  	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:254)
  	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1030)
  	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:373)
  	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:381)
  	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:344)
  	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:221)
  	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:717)
  	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552)
  	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
  	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
  	at org.eclipse.jetty.server.handler.RequestLogHandler.handle(RequestLogHandler.java:95)
  	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
  	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:479)
  	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1059)
  	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
  	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
  	at org.eclipse.jetty.server.Server.handle(Server.java:497)
  	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)
  	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:248)
  	at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
  	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:610)
  	at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:539)
  	at java.lang.Thread.run(Thread.java:745)
  Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

  The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
  	at sun.reflect.GeneratedConstructorAccessor288.newInstance(Unknown Source)
  	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
  	at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
  	at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1036)
  	at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:338)
  	at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2232)
  	at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2265)
  	at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2064)
  	at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:790)
  	at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:44)
  	at sun.reflect.GeneratedConstructorAccessor206.newInstance(Unknown Source)
  	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
  	at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
  	at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:395)
  	at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:325)
  	at org.embulk.input.MySQLInputPlugin.newConnection(MySQLInputPlugin.java:102)
  	at org.embulk.input.MySQLInputPlugin.newConnection(MySQLInputPlugin.java:13)
  	at org.embulk.input.jdbc.AbstractJdbcInputPlugin.transaction(AbstractJdbcInputPlugin.java:141)
  	... 49 more
  Caused by: java.net.ConnectException: Connection refused
  	at java.net.PlainSocketImpl.socketConnect(Native Method)
  	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
  	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  	at java.net.Socket.connect(Socket.java:589)
  	at com.mysql.jdbc.StandardSocketFactory.connect(StandardSocketFactory.java:213)
  	at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:297)
  	... 63 more

Error: 422: BulkLoad job preview failed: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.

どうやらサーバーにグローバルIPをふっていないため、TD側からうちのDBにクエリ叩きにいけないらしい。
そんな根本的なところかよ・・ざわざわ

うちの環境ではDataConnector使えない

ということで、bulk importを使ってみることに。

#Bulk Import

Bulk Importはsessionつくってたくさんタスクつめこんでcommitするみたいな手順があるんですが今回はautoでやっちゃうので特に触れません。
知りたい方はドキュメント読んで下さいね。

##コマンドの書き方を確認
td importコマンドを使います

DB: dots_local
table: USERS

TD上にこちらのテーブルを用意します
MysqlのDB上にあるusersというテーブルを流し込みます

td import:auto \
  --auto-create dots_local.USERS \
  --format mysql \
  --db-url jdbc:mysql://my_sql_host/my_sql_db \
  --db-user my_user \
  --db-password my_pass \
  --time-column created_at \
  users
  • auto-create
  • 特定のテーブルのbulk importのsessionを自動的に作成する
  • format
  • TDに流し込むデータ元が何なのかを指定
  • 今回はmysqlから流し込むのでmysqlにしました
  • db-url
  • my_sql_hostとmy_sql_dbにそれぞれ繋ぎに行くmysqlのhostとDB名を記載
  • db-user
  • mysqlに接続するユーザー
  • db-password
  • mysqlに接続するユーザーのパスワード
  • time-column
  • timeカラムに指定するmysqlのカラム
  • created_at(作成日時)をtimeカラムにしました
  • テーブル名
  • DB上のテーブル名を記載

###その他よく使いそうなオプション
最初2つは実際に使ったオプション

  • --columns
  • 流しこむカラムを指定
  • 複数の場合は","区切りで指定します
  • --column-type
  • カラムの型を指定できます
  • 指定がなければ勝手に型を判断して決めてくれます
  • 複数指定したい場合はこれも","区切りかと思いきや、--column-type = col1:string --column-type = col2:intって感じで書かなきゃいけないみたいです
  • --column-types
  • これを指定すれば↑みたいに何度も--column-type指定しなくていいんじゃないの・・・って思うじゃないですか
  • --column-types TYPE,TYPE,...って使うのでカラムの順番に全部のカラムの型を指定しなくちゃいけない
  • --all-string
  • 全てのカラムの型を強制的にstringにするやり方もあります
  • --exclude-columns
  • ↑とは逆に除きたいカラムを指定もできます

とか色々オプションがあってけっこう使い勝手がよい様子
他にもオプションはたくさんあるので気になる方は
td import:autoとかうつとたくさん説明でてくるので見てみて下さい

さっそくコマンドを叩いてみる

$ td import:auto \
  --auto-create dots_local.USERS \
  --format mysql \
  --db-url jdbc:mysql://my_sql_host/my_sql_db \
  --db-user my_user \
  --db-password my_pass \
  --time-column created_at \
  users

わろた

Java is not installed. 'td import' command requires Java (version 1.6 or later).
Alternatively, you can use the 'bulk_import' commands.
Since they are implemented in Ruby, they perform significantly slower.
$java -version
-bash: java: コマンドが見つかりません

java入ってなくて使えなかったーーwww
ってことでjavaを入れてもらって再挑戦。

$ java -version
openjdk version "1.8.0_71"
OpenJDK Runtime Environment (build 1.8.0_71-b15)
OpenJDK 64-Bit Server VM (build 25.71-b15, mixed mode)
Create dots_local_USERS_2016_02_29_1456741407652 bulk_import session

Uploading prepared sources
  Session    : dots_local_users_2016_02_29_1456741407652
  Source     : users (0 bytes)

Converting 'users'...
Connected successfully to jdbc:mysql://my_sql_host/my_sql_db

Prepare status:
  Elapsed time: 2 sec.
  Source     : users
    Status          : ERROR
    Read lines      : 0
    Valid rows      : 0
    Invalid rows    : 0


Next steps:
  => check td-bulk-import.log and original users: unsupported jdbc type: -4.


Upload status:
  Elapsed time: 5 sec.


Next Steps:

Cannot execute your command: null (java.lang.RuntimeException)
Error: Bulk Import returned error 1. Please check the 'td-bulk-import.log' logfile for details.
Next steps:
  => check td-bulk-import.log and original users: unsupported jdbc type: -4.

あんさぽーてっど(´・ω・`)??

###unsupported jdbc type: -4.
色々調べたところ、このusersというテーブルにはblob型のカラムがあってそれが悪さをしているようでした。
じゃあ、カラム指定すりゃいいじゃんと思って指定してみましたがエラーはからわず。
テーブル自体にサポート外の型があると動いてくれないようです。

##どうしよう
他のテーブルでも試してみたら文字列と数字と時間だけのテーブルはうまく流し込めました。
なので、下記の手段をとることにしました。

  • blob型のカラムをもっているテーブル以外はmysqlから直接流しこむ
  • blob型のカラムをもっているテーブルはまずselectした結果をcsv形式のファイルにする
  • csvからtdに流し込む

まとめ

  • ‘td’ command v0.11.9以上じゃないとData Connector使えなかった
  • Data Connectorめっちゃ便利そうだったけどサーバーにグローバルIP割り当ててないと使えなかった
  • java入ってないとtd import使えなかった
  • テーブルに文字列、数字、時間以外の型が入ってるとmysqlのtd import使えなかった
  • 使えなかったから一旦csvにしてからtdに流し込むことにした

反省点
対応しているバージョンと必要なものをちゃんとドキュメントから読み取りましょう

今回は以上にします
今回でやり方がわかったのでDBからTDに流しこみを行うscriptを作成した時の(苦労)話を書きます

10
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?