9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

dots.女子部Advent Calendar 2016

Day 19

embulkアンチパターン

Last updated at Posted at 2016-12-20

こんばんわ、yuunaです。
.dots女子部 Adventカレンダー19日目の予定だった記事です。何故「だったか」というと時差ボケですっとぼけていたからです。

いい加減人生でcsvを変換するコードを書くのは疲れました。
私はRubyistの端くれなのでrubyで書きますが、ぶっちゃけPHPでもperlでもPythonでも似たようなことをやっています。人生いままでたぶん3万回はCSVを開いてきて閉じてきて、たぶん死ぬまでにあと10万回は開いたり閉じたりするのでしょう。

まあこんなコード書きますよね

CSV.open("./to.csv", "w", encoding:"CP932", headers: header, write_headers: true) do |csv|
  CSV.open("./from.csv", encoding:"CP932:UTF-8", headers: true).each do |row|
    row["hoge"] = row["hoge"].to_i * 2
    csv << row
  end
end

このコード自体、今朝書いたコードなのはさておき、だからなんだって感じですが、まあ正直バグの温床だし使い捨てでもないかぎり辞めたいですよね。そうですよね。

とそんなところに出てきたのがembulkですよ!CSV界隈の銀の弾丸ですよ!!!詳しくはぐぐってください。端的にいえばムーディー勝山みたいにデーターを右から左にながしてくれるんです。
※なんでこの時期にムーディー勝山かというとうちのちびすけがエグスプロージョンの大ファンでひたすら見させられたからなのですよ。

そしてCSVの閉じたりとか開けたりとかエンコード変換とかdemilter変換とかほんとにめんどくさいのを自動でやってくれるんでほんともうこれなきゃいけいけないです。

でもまあfilterの書き方あんまりでてないので今回はfilterの書き方でも話をしてみたいとおもいます。

正直基本的なembulkの話は適当にさがしてください。もうそゆのかく人生は疲れました。(やり投げ)

example.rb

TDさんの偉いところは簡単なサンプルなら作ってくれることですね。

$ embulk example

とするとあら不思議example.rbが出来ています。

module Embulk
  module Filter

    class ExampleFilterPlugin < FilterPlugin
      # filter plugin file name must be: embulk/filter/<name>.rb
      Plugin.register_filter('example', self)

      def self.transaction(config, in_schema, &control)
        task = {
          'key' => config.param('key', :string, default: "filter_key"),
          'value' => config.param('value', :string, default: "filter_value")
        }

        idx = in_schema.size
        out_columns = in_schema + [Column.new(idx, task['key'], :string)]

        puts "Example filter started."
        yield(task, out_columns)
        puts "Example filter finished."
      end

      def initialize(task, in_schema, out_schema, page_builder)
        super
        @value = task['value']
      end

      def close
      end

      def add(page)
        page.each do |record|
          @page_builder.add(record + [@value])
        end
      end

      def finish
        @page_builder.finish
      end
    end

  end
end

コードの解説をすると

  • Plugin.register_filter('example', self)でプラグイン名を設定
  • self.transactionに受け取るパラメーター、スキーマーの設定をする。
  • あとはaddでpageから呼び出してここで変換処理する。

で、pageから呼び出したrecordをなんかてきと〜に加工して@page_builder.addにArrayをわたせばOK!なのですがこれが以外と分かりずらい、というわけでこんなコードにしています。rowが100とか200になると指おって数えるのも発狂するわけですよ。

まあ、なれるとExcelで開いてAZだから51とか一瞬で出てくるんですが人生にはなにもプラスにならないわけでして。

def add(page)
  page.each do |record|
    hash = Hash[page.schema.names.zip(record)]
    #ここでhashに処理する
    row = page.schema.names.map { |key| hash[key]}
    @page_builder.add(row)
  end
end

こんなかんじすればみんな大好きhashでアクセスできるので直感的ですよね。

先日書いたのは住所を分割するプラグインなのですが、addの部分はこんなかんじで書いています。

def add(page)
  page.each do |record|
    hash = Hash[page.schema.names.zip(record)]
    address = hash[@key]
    rex = /(...??[都道府県])((?:旭川|伊達|石狩|盛岡|奥州|田村|南相馬|那須塩原|東村山|武蔵村山|羽村|十日町|上越|富山|野々市|大町|蒲郡|四日市|姫路|大和郡山|廿日市|下松|岩国|田川|大村|宮古|富良野|別府|佐伯|黒部|小諸|塩尻|玉野|周南)市|(?:余市|高市|[^市]{2,3}?)郡(?:玉村|大町|.{1,5}?)[町村]|(?:.{1,4}市)?[^町]{1,4}?区|.{1,7}?[市町村])(.+)/
    a = address.match(rex).captures
    @page_builder.add(record + a)
  end
end

酷いコードですよねー。でもこれで住所分割がembulkという安定基盤の上で実行できるのでほんと楽です。

マルチスレッドの話

折角なので、embulkっぽくない使い方の話をするとそもそもembulkってマルチスレッドでゲート・オブ・バビロンみたいに一気に召喚、じゃなくて転送するところがいいのですが、CSVだとファイル分割されて非常に不便なのでスレッドを1に制限してやったりしています。まさにTreasureDataの想定の斜め上。

また、適宜分割されてaddが呼ばれるのでされているので前後の行関係をみるツールの場合は、addではクラス変数に追加していおいてfinishで処理みたいな実装にします。


class ExampleFilterPlugin < FilterPlugin
@pages = []
#略。

def add(page)
  page.each do |record|
    @pages << record
  end
end

def finish
  @pages.each do |row|
    @page_builder.add(row)
  end
  @page_builder.finish
end

こんなかんじ。

formatter

exampleではできませんが、formatterもディレクトリーつくればロードしてくれます。
追記: newすればできるのですがいちいちgemにもしたくないですし。おすし。

module Embulk
  module Formatter

    class Avalon < FormatterPlugin
      Plugin.register_formatter("avalon", self)


      def self.transaction(config, schema, &control)
        # configuration code:
        task = {
          'servant' => config.param('servant', :string)
        }
        yield(task)
      end

      def init
        # your data
        @current_file == nil
        @current_file_size = 0
      end

      def close
      end
      def add(page)
        page.each do row
           @current_file.write row.join(",")
        end
      end
    end
  end
end

こんなかんじ。お空の上でかいてるので、上のコードはテストしていないのであしからず。

かわった事例の紹介

NextEngineっていうEC業界ではかなりつかわれているAPIのRubySDKをつかってる私は当然ながらembulkのNextEngineプラグインを書いてみました。すごい誰得なんだけど。

みたい人いたら公開するのでコメントかいてください。
(どうせ誰も居ないだろうという)

バルクでアクセスしてくれるので、めっちゃ便利です。

終わりに

本業は数人で、物流バックエンドの改善コンサルティングやっています。とはいえ、ありとあらゆるテクニックを駆使するエンジニアリングを必要とするので仕事にご興味ある方は適当にご連絡ください。

先週もAccess97とOracleからデーターを抜き出す簡単なお仕事していました。もうね。。。

9
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?