Posted at

Clojureで進捗を表示しながら並列処理

More than 1 year has passed since last update.

自然言語処理を目的に単語頻度をカウントするときのメモですが、並列処理させる部分だけ変更すれば簡単に他の目的に使えますのでたたき台にできると思います。

まずは使用するライブラリの依存関係をproject.cljに記載します。


project.clj



...
:dependencies [[org.clojure/clojure "1.9.0"]
[org.clojure/core.async "0.4.474"]
[clj-time "0.14.3"]]
...

次に本題のコードを記述しました。



(ns word-count
(:require
[clojure.string :as str]
[clojure.java.io :refer [reader]]
[clojure.core.async :refer [go-loop]]
[clj-time.local :as l]))

(defn progress-format [done all interval-done interval-ms unit]
(str "["(l/format-local-time (l/local-now) :basic-date-time-no-ms)"] "
done "/" all ", "
(if (zero? interval-ms)
"0.0"
(format "%.1f" (float (/ interval-done (/ interval-ms 1000)))))
" " unit " "
(format "(%.3f" (float (* 100 (/ done all))))
"%)"))

(defn word-count
[input-file & [option]]
(let [{:keys [min-count workers interval-ms step]
:or {min-count 5
workers 4
interval-ms 60000 ; 1 minute
step 1000000}} option
all-lines (with-open [r (reader input-file)] (count (line-seq r)))
over-all (atom 0)
done-lines (atom 0)
done-workers (atom 0)
all-wc (atom {})]
(with-open [r (reader input-file)]
(dotimes [w workers]
(go-loop [local-wc {} local-counter step]
(if-let [line (.readLine r)]
(let [word-freq (frequencies (remove #(or (= "<eos>" %) (= "" %) (= " " %) (= " " %)) (str/split line #" ")))
updated-wc (merge-with + local-wc word-freq)]
(swap! done-lines inc)
(if (zero? local-counter)
(do
(reset! all-wc (merge-with + @all-wc updated-wc))
(recur {} step))
(recur updated-wc (dec step))))
(do
(reset! all-wc (merge-with + @all-wc local-wc))
(swap! done-workers inc)))))
(loop [c 0]
(if-not (= @done-workers workers)
(let [diff @done-lines
updated-c (+ c diff)]
(reset! done-lines 0)
(println (progress-format updated-c all-lines diff interval-ms "lines/s"))
(Thread/sleep interval-ms)
(recur updated-c))
(reduce (fn [acc [k v]] (if (>= v min-count)
(assoc acc k (+ v (get acc k 0)))
(assoc acc "<unk>" (+ v (get acc "<unk>" 0)))))
{}
@all-wc))))))

上の関数は進捗を確認するための補助的な関数です。

下の関数が今回テーマになっている関数です。

clojure.asyncを使って並列処理をさせています。

毎回全体の単語頻度を表現するマップのall-wcに毎回マージするとかなり時間がかかったので、バッファリングをさせています。

最後のreduce内のifは最低頻度で単語を切るために使っているだけですので、カウントするだけなら消してください。

ゴテゴテした感じを隠蔽したければお好みでマクロで包むのもありだと思います。

実行すると次のように進捗が表示されながら並列で実行されます。

user=> (def wc (word-count "your-target-path"))

[20180510T000841+0900] 0/24441582, 0.0 lines/s (0.000%)
[20180510T000941+0900] 3751846/24441582, 62530.8 lines/s (15.350%)
[20180510T001041+0900] 7067907/24441582, 55267.7 lines/s (28.918%)
[20180510T001141+0900] 10362615/24441582, 54911.8 lines/s (42.397%)
[20180510T001241+0900] 13598096/24441582, 53924.7 lines/s (55.635%)
[20180510T001341+0900] 16897189/24441582, 54984.9 lines/s (69.133%)
[20180510T001441+0900] 20196605/24441582, 54990.3 lines/s (82.632%)
[20180510T001541+0900] 23314620/24441582, 51966.9 lines/s (95.389%)
#'user/wc
user=> (get wc "うどん")
2077

デフォルトで4スレッドで走らせて1分間隔で表示するようにしています。

ちなみに上記では24,419,168行のファイルを食わせています。

次のようにoptionを設定すると2スレッドで10秒間隔で表示されます。



(word-count "your-target-path" {:workers 2 :interval-ms 10000})

以上になります。