はじめに
私は現在お仕事でdots.の開発をしています。
dots.での分析基盤を整えるためにTDを使うことになりました。
DBのデータをTDに流しこんでデータを扱いやすくしたいよねってなったのでその時の話です。
いろんなところで躓いたのでメモを残しておきます。
今回はその中でもDBとTDをどうやって繋ぐことにしたかっていうところを書きます。
環境
td-agentのバージョン
$ rpm -qi td-agent
Name : td-agent Relocations: /
Version : 2.2.0
・・・
td commandのバージョン
$ td --version
0.11.8.2
DB=>TDにデータを流し込む方法を選ぶ
DB=>TDにデータを流し込む方法は以下の2つを考えました
Data Connectorだとcrontabで設定せず、tdコマンドでcronを設定でき簡潔だったのでまずはData Connectorを選択しました。
Data Connector
設定ファイルを作成
Data Connectorを使うにはまず設定ファイルを作成します
in:
type: mysql
host: mysql_host_name
port: 3306
user: test_user
password: test_password
database: test_database
table: test_table
select: "*"
out:
mode: replace
- type
- TDに流し込むデータ元が何なのかを指定
- 今回はmysqlから流し込むのでmysqlにしました
- host 〜 table
- 接続先サーバーの情報で埋めます
- select
- selectするカラムを指定
- 複数ある場合は","で区切ります
その他よく使いそうなオプション
- where
- where条件入れたいよってときはwhereに条件式を書いてあげるとよいらしいです
- query
- selectではなくqueryにするとクエリをかけますJOINとかしたいときはqueryがよさそう
▼その他、詳しくはgithub repositoryのREADMEを読んで下さい
embulk/embulk-input-jdbc
さっそくコマンドを叩いてみる
設定ファイルを作成したら下記コマンドをたたきます
$ td connector:guess seed.yml -o load.yml
わろた
'connector' is not a td command. Run 'td' to show the list.
$ td --version
0.11.8.2
Install ‘td’ command v0.11.9 or later
ってドキュメントに書いてあったー/(^o^)\
ちなみにこの時td-agentのバージョンを2.3.0にあげてもらって0.13.0のtd commandをいれてもらいました。
$ rpm -qi td-agent
Name : td-agent Relocations: /
Version : 2.3.0
・・・
$ td --version
0.13.0
td commandのバージョンを上げてもらったので再挑戦
む・・・
java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at com.google.common.base.Throwables.propagate(Throwables.java:160)
at org.embulk.input.jdbc.AbstractJdbcInputPlugin.transaction(AbstractJdbcInputPlugin.java:144)
at org.embulk.plugin.compat.InputPluginWrapper.transaction(InputPluginWrapper.java:57)
at org.embulk.exec.PreviewExecutor.doPreview(PreviewExecutor.java:93)
at org.embulk.exec.PreviewExecutor.access$000(PreviewExecutor.java:27)
at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:67)
at org.embulk.exec.PreviewExecutor$1.run(PreviewExecutor.java:63)
at org.embulk.spi.Exec.doWith(Exec.java:25)
at org.embulk.exec.PreviewExecutor.preview(PreviewExecutor.java:63)
at com.treasuredata.embulk.http.server.PreviewService.doService(PreviewService.java:38)
at sun.reflect.GeneratedMethodAccessor72.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:151)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:171)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:195)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:104)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:384)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:342)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:101)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:271)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:297)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:254)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1030)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:373)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:381)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:344)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:221)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:717)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.handler.RequestLogHandler.handle(RequestLogHandler.java:95)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:479)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1059)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.Server.handle(Server.java:497)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:248)
at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:610)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:539)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at sun.reflect.GeneratedConstructorAccessor288.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1036)
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:338)
at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2232)
at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2265)
at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2064)
at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:790)
at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:44)
at sun.reflect.GeneratedConstructorAccessor206.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:395)
at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:325)
at org.embulk.input.MySQLInputPlugin.newConnection(MySQLInputPlugin.java:102)
at org.embulk.input.MySQLInputPlugin.newConnection(MySQLInputPlugin.java:13)
at org.embulk.input.jdbc.AbstractJdbcInputPlugin.transaction(AbstractJdbcInputPlugin.java:141)
... 49 more
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at com.mysql.jdbc.StandardSocketFactory.connect(StandardSocketFactory.java:213)
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:297)
... 63 more
Error: 422: BulkLoad job preview failed: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
どうやらサーバーにグローバルIPをふっていないため、TD側からうちのDBにクエリ叩きにいけないらしい。
そんな根本的なところかよ・・ざわざわ
うちの環境ではDataConnector使えない
ということで、bulk importを使ってみることに。
#Bulk Import
Bulk Importはsessionつくってたくさんタスクつめこんでcommitするみたいな手順があるんですが今回はautoでやっちゃうので特に触れません。
知りたい方はドキュメント読んで下さいね。
##コマンドの書き方を確認
td importコマンドを使います
DB: dots_local
table: USERS
TD上にこちらのテーブルを用意します
MysqlのDB上にあるusersというテーブルを流し込みます
td import:auto \
--auto-create dots_local.USERS \
--format mysql \
--db-url jdbc:mysql://my_sql_host/my_sql_db \
--db-user my_user \
--db-password my_pass \
--time-column created_at \
users
- auto-create
- 特定のテーブルのbulk importのsessionを自動的に作成する
- format
- TDに流し込むデータ元が何なのかを指定
- 今回はmysqlから流し込むのでmysqlにしました
- db-url
- my_sql_hostとmy_sql_dbにそれぞれ繋ぎに行くmysqlのhostとDB名を記載
- db-user
- mysqlに接続するユーザー
- db-password
- mysqlに接続するユーザーのパスワード
- time-column
- timeカラムに指定するmysqlのカラム
- created_at(作成日時)をtimeカラムにしました
- テーブル名
- DB上のテーブル名を記載
###その他よく使いそうなオプション
最初2つは実際に使ったオプション
- --columns
- 流しこむカラムを指定
- 複数の場合は","区切りで指定します
- --column-type
- カラムの型を指定できます
- 指定がなければ勝手に型を判断して決めてくれます
- 複数指定したい場合はこれも","区切りかと思いきや、
--column-type = col1:string --column-type = col2:int
って感じで書かなきゃいけないみたいです - --column-types
- これを指定すれば↑みたいに何度も--column-type指定しなくていいんじゃないの・・・って思うじゃないですか
-
--column-types TYPE,TYPE,...
って使うのでカラムの順番に全部のカラムの型を指定しなくちゃいけない - --all-string
- 全てのカラムの型を強制的にstringにするやり方もあります
- --exclude-columns
- ↑とは逆に除きたいカラムを指定もできます
とか色々オプションがあってけっこう使い勝手がよい様子
他にもオプションはたくさんあるので気になる方は
td import:auto
とかうつとたくさん説明でてくるので見てみて下さい
さっそくコマンドを叩いてみる
$ td import:auto \
--auto-create dots_local.USERS \
--format mysql \
--db-url jdbc:mysql://my_sql_host/my_sql_db \
--db-user my_user \
--db-password my_pass \
--time-column created_at \
users
わろた
Java is not installed. 'td import' command requires Java (version 1.6 or later).
Alternatively, you can use the 'bulk_import' commands.
Since they are implemented in Ruby, they perform significantly slower.
$java -version
-bash: java: コマンドが見つかりません
java入ってなくて使えなかったーーwww
ってことでjavaを入れてもらって再挑戦。
$ java -version
openjdk version "1.8.0_71"
OpenJDK Runtime Environment (build 1.8.0_71-b15)
OpenJDK 64-Bit Server VM (build 25.71-b15, mixed mode)
Create dots_local_USERS_2016_02_29_1456741407652 bulk_import session
Uploading prepared sources
Session : dots_local_users_2016_02_29_1456741407652
Source : users (0 bytes)
Converting 'users'...
Connected successfully to jdbc:mysql://my_sql_host/my_sql_db
Prepare status:
Elapsed time: 2 sec.
Source : users
Status : ERROR
Read lines : 0
Valid rows : 0
Invalid rows : 0
Next steps:
=> check td-bulk-import.log and original users: unsupported jdbc type: -4.
Upload status:
Elapsed time: 5 sec.
Next Steps:
Cannot execute your command: null (java.lang.RuntimeException)
Error: Bulk Import returned error 1. Please check the 'td-bulk-import.log' logfile for details.
Next steps:
=> check td-bulk-import.log and original users: unsupported jdbc type: -4.
あんさぽーてっど(´・ω・`)??
###unsupported jdbc type: -4.
色々調べたところ、このusersというテーブルにはblob型のカラムがあってそれが悪さをしているようでした。
じゃあ、カラム指定すりゃいいじゃんと思って指定してみましたがエラーはからわず。
テーブル自体にサポート外の型があると動いてくれないようです。
##どうしよう
他のテーブルでも試してみたら文字列と数字と時間だけのテーブルはうまく流し込めました。
なので、下記の手段をとることにしました。
- blob型のカラムをもっているテーブル以外はmysqlから直接流しこむ
- blob型のカラムをもっているテーブルはまずselectした結果をcsv形式のファイルにする
- csvからtdに流し込む
まとめ
- ‘td’ command v0.11.9以上じゃないとData Connector使えなかった
- Data Connectorめっちゃ便利そうだったけどサーバーにグローバルIP割り当ててないと使えなかった
- java入ってないとtd import使えなかった
- テーブルに文字列、数字、時間以外の型が入ってるとmysqlのtd import使えなかった
- 使えなかったから一旦csvにしてからtdに流し込むことにした
反省点
対応しているバージョンと必要なものをちゃんとドキュメントから読み取りましょう
今回は以上にします
今回でやり方がわかったのでDBからTDに流しこみを行うscriptを作成した時の(苦労)話を書きます