※この投稿はwordpress.comから実験的に移植したものです
https://stofublog.wordpress.com/
Sedによる文字列置換(ファイル読み込み・書き込みあり)をPowershellで再現したコードが遅すぎるという話(このシリーズ、パイプの処理速度全般に関して書いてるんですけど、前回と今回は比較対象がsedのような処理)
前回は
パイプのかわりにBlockingQueueをバッファとしてファイル読み込み・文字列置換・差いる書き込みをするコマンドレットを作ってみたところ、実行速度が最大で6.3倍になった。(C#コマンドレットをPowershellから起動)
今回はF#で同様なパイプ処理を書いてみようと思います
・今回のF#コード
open System
open System.Collections.Concurrent
open System.Threading.Tasks
open System.IO
open System.Text
open System.Threading.Tasks
open System.Text.RegularExpressions
let cat (encoding :string) (filename :string) =
let output =new BlockingCollection<string>()
let encoding = Encoding.GetEncoding(encoding)
use reader = new StreamReader(filename,encoding)
let mainLoop =
let mutable line=""
while line <> null do
line <- reader.ReadLine();
output.Add(line);
output.CompleteAdding()
let mainLoopTask =
Task.Factory.StartNew(Action(fun () ->mainLoop))
mainLoopTask |> ignore;
output;;
let ( |||| ) (input : BlockingCollection<'TI>) (fx: ( 'TI -> 'TO)) =
let output =new BlockingCollection<'TO>()
let mainLoop =
while not input.IsCompleted do
let line=input.Take();
if line <> null then
output.Add(fx line)
let mainLoopTasks = [| for i in 1 .. Environment.ProcessorCount -> Task.Factory.StartNew(Action (fun () -> mainLoop)) |]
let terminate =
Task.WaitAll mainLoopTasks
output.CompleteAdding()
let teriminateTask =
Task.Factory.StartNew(Action(fun () ->terminate))
teriminateTask |> ignore;
output;;
let ( ||||> ) (input : BlockingCollection<string>) (filename :string) =
let encodeString = "utf-8"
let encoding = Encoding.GetEncoding(encodeString)
use writer = new StreamWriter(filename,false,encoding)
let mainLoop =
while not input.IsCompleted do
let line = input.Take()
if line <> null then
writer.WriteLine(line);
let mainLoopTask =
Task.Factory.StartNew(Action(fun () ->mainLoop))
mainLoopTask
let sync ( task :Task ) =
task.Wait()
[<EntryPoint>]
let main argv =
cat "utf-8" @"c:\tmp\evtlog.csv" |||| ( fun line -> Regex.Replace(line,"イベント","お弁当")) ||||> @"c:\tmp\evtlog7.csv" |> sync
0 // 整数の終了コードを返します
・ベンチ結果
powershell パイプライン
PS D:\Users\naomasa\Documents\Visual Studio 2013\Projects\ParalellPipeline\Paral
ellPipeline\bin\Debug> 1..5 | % {(Measure-Command {cat c:\tmp\evtlog.csv | ? { $
_ -replace "イベント","お弁当"} | Out-File c:\tmp\evtlog8.csv -encoding utf8}).T
otalMilliseconds}
17412.2273
17407.1302
18177.021
17622.5944
17464.0425
F#パイプライン
PS D:\Users\naomasa\Documents\Visual Studio 2013\Projects\ParalellPipeline\Paral
ellPipeline\bin\Debug> 1..5 | % {(Measure-Command {.\ParalellPipeline.exe}).Tota
lMilliseconds}
626.2075
582.9214
605.5008
581.5356
580.3243
F#のパイプラインだと平均29.6倍 早くなりました
今回、F#の演算子定義を使って、パイプを定義。
バッファリングにBlockingQueue使っているので並列処理もできる。
(ただし、並列だと出力順序は保障されない)
テストに使用した論理8コアのPCにあわせて、8並列での処理になっているが、最初のPowershellのコードに比べて30倍弱の速度が出た。
当方のパイプの定義は、Unixのパイプをイメージしています。F#というかO'Camlのもともとのパイプ(|>)は関数(Function)と同義でバッファリングがないです。
(Unixのパイプはバッファリングがある。参考資料1の項目「実装」参照)
なので、F#のパイプにBlockingCollectionのバッファを付け足してみました、という実装になっています。
BlockingCollectionはスレッドセーフ(むしろそれをメインで使う)なので、非同期・並列処理をがしがしやってる今回のコードになりました。
(なので、今回の||||を並列パイプ ParalellPipeと呼んでます)
非同期・並列を惜しみなく投入して、かつF#を使った(F#の元になったO'Camlは非常に高速な処理系)ので
今回のようなベンチ結果になったとお思います。
・参考資料1
パイプ (コンピュータ)
http://ja.wikipedia.org/wiki/%E3%83%91%E3%82%A4%E3%83%97_(%E3%82%B3%E3%83%B3%E3%83%94%E3%83%A5%E3%83%BC%E3%82%BF)