

I want to use TreasureData as a Spark source

Posted at 2016-07-29

At first I was going to expand everything into a List and then call sc.parallelize, but it clearly wasn't going to fit in the master's memory.
Apparently there's something called JdbcRDD, so let's use that instead.

Straight to the implementation:

  // Connection properties for the TreasureData JDBC driver
  val props = new Properties() {
    setProperty("apikey", conf.getString("apikey"))
    setProperty("useSSL", conf.getString("useSSL"))
    setProperty("type", conf.getString("jobType"))
  }

  val connection = DriverManager.getConnection(conf.getString("url"), props)
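The JdbcRDD below takes a connection factory called TreasureData.getConnection(), which isn't shown in the post; presumably the snippet above lives inside a small helper. This is only a guessed shape for that helper (the parameter names and the connectionProperties method are made up here, not taken from the original code):

```scala
import java.util.Properties

// Hypothetical helper wrapping the connection setup shown above.
object TreasureData {
  def connectionProperties(apikey: String, useSSL: String, jobType: String): Properties = {
    val props = new Properties()
    props.setProperty("apikey", apikey)
    props.setProperty("useSSL", useSSL)
    props.setProperty("type", jobType)
    props
  }

  // In the real code this would call DriverManager.getConnection(url, props)
  // with the TreasureData JDBC driver on the classpath.
  // def getConnection(): java.sql.Connection = ...
}

val p = TreasureData.connectionProperties("my-apikey", "true", "presto")
println(p.getProperty("type"))
```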

  val query = """
      select *
      from ad_impressions ai
      where TD_TIME_RANGE(ai.time, ?, ?, 'UTC')
      order by time asc
  """

  val treasuredataLogs: JdbcRDD[ArticleImpression] = new JdbcRDD(
    sc,                                 // SparkContext
    () => TreasureData.getConnection(), // JDBC connection factory
    query,                              // query with two placeholders
    from.map(_.toDateTime(DateTimeZone.UTC).getMillis / 1000).getOrElse(Long.MinValue), // lowerBound: value bound to the first placeholder
    to.map(_.toDateTime(DateTimeZone.UTC).getMillis / 1000).getOrElse(Long.MaxValue),   // upperBound: value bound to the second placeholder
    10,                                 // numPartitions (1-20)
    rs => ArticleImpressionDxo.fromEntity(rs) // maps each ResultSet row
  )

Judging from the JdbcRDD source, it seems to close the connection for you automatically (´ω`*)

  override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T]
    {
      context.addTaskCompletionListener{ context => closeIfNeeded() }
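That listener ends up calling close() exactly once even if it fires alongside normal iterator exhaustion. Here's a simplified, self-contained sketch of the idempotent-close pattern that Spark's NextIterator uses (the CloseOnce trait name is made up for illustration):

```scala
// Simplified sketch of NextIterator's closeIfNeeded():
// it may be triggered more than once (task-completion listener,
// iterator exhaustion), but the underlying close() runs exactly once.
trait CloseOnce {
  private var closed = false
  protected def close(): Unit
  def closeIfNeeded(): Unit = if (!closed) { closed = true; close() }
}

var closeCount = 0
val stmt = new CloseOnce {
  protected def close(): Unit = closeCount += 1 // e.g. rs/stmt/conn close calls
}

stmt.closeIfNeeded()
stmt.closeIfNeeded() // second call is a no-op
```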

So it works, but it's pretty slow...
I wonder why.

Apparently the query is being executed multiple times?
TreasureData's job history shows the query ran 10 times.

 * @param numPartitions the number of partitions.
 *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
 *   the query would be executed twice, once with (1, 10) and once with (11, 20)

You put the time range into lowerBound and upperBound, and give the query two placeholders.
It seems JdbcRDD then splits that range into numPartitions chunks and runs one query per chunk.

Since I specified 10 this time, it split the work into 10 queries.
Sure enough, the job history shows the range divided into 10 chunks:

select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468998720, 1469016000) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468981440, 1468998719) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468964160, 1468981439) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468946880, 1468964159) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468929600, 1468946879) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468912320, 1468929599) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468895040, 1468912319) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468877760, 1468895039) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468860480, 1468877759) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468843200, 1468860479) order by time asc
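The chunks above can be reproduced outside Spark. This is a minimal sketch of the arithmetic in JdbcRDD.getPartitions (BigInt to avoid overflow; the +1 and -1 make both ends of each chunk inclusive), plugged with the bounds from the job history:

```scala
// Bounds taken from the job history above (epoch seconds, inclusive).
val lowerBound    = 1468843200L
val upperBound    = 1469016000L
val numPartitions = 10

// Same split arithmetic as JdbcRDD.getPartitions.
val length = BigInt(1) + upperBound - lowerBound
val ranges = (0 until numPartitions).map { i =>
  val start = lowerBound + ((i * length) / numPartitions)
  val end   = lowerBound + (((i + 1) * length) / numPartitions) - 1
  (start.toLong, end.toLong)
}

// Each pair becomes one TD_TIME_RANGE(ai.time, start, end) query;
// the last pair is the newest chunk in the history above.
ranges.foreach { case (s, e) => println(s"TD_TIME_RANGE(ai.time, $s, $e)") }
```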

Thank you, JdbcRDD.
