
A Beginner's Elixir |> Flow, GenStage |> Concurrent MapReduce

Presented at Elixir Conf Japan 2017 LT, 2017/04/01


Self Introduction

ちきさん
(Tomohiro Noguchi)

Twitter/GitHub/Qiita: @ovrmrw

I started learning Elixir a month ago.



What I talk about

:star2: Reading a JSON file and doing MapReduce.

:star2: The differences between Eager, Lazy, and Flow (Concurrent).

:star2: Flow & GenStage are extremely fast.

===

GitHub repository for this slide: ovrmrw/elixir_file_read_eager_lazy_flow


JSON file to read (300,000 lines, 100 MB)

names_300000.json
{"name":{"first":"Lue","last":"Laserna"},"gender":"female","birthday":"1983-09-18","contact":{"address":{"street":"19 Deer Loop","zip":"90732","city":"San Pedro","state":"CA"},"email":["lue.laserna@nosql-matters.org","laserna@nosql-matters.org"],"region":"310","phone":["310-8268551","310-7618427"]},"likes":["chatting"],"memberSince":"2011-05-05"}
{"name":{"first":"Jasper","last":"Grebel"},"gender":"male","birthday":"1989-04-29","contact":{"address":{"street":"19 Florida Loop","zip":"66840","city":"Burns","state":"KS"},"email":["jasper.grebel@nosql-matters.org"],"region":"316","phone":["316-2417120","316-2767391"]},"likes":["shopping"],"memberSince":"2011-11-10"}
{"name":{"first":"Kandra","last":"Beichner"},"gender":"female","birthday":"1963-11-21","contact":{"address":{"street":"6 John Run","zip":"98434","city":"Tacoma","state":"WA"},"email":["kandra.beichner@nosql-matters.org","kandra@nosql-matters.org"],"region":"253","phone":["253-0405964"]},"likes":["swimming"],"memberSince":"2012-03-18"}
{"name":{"first":"Jeff","last":"Schmith"},"gender":"male","birthday":"1977-10-14","contact":{"address":{"street":"14 198th St","zip":"72569","city":"Poughkeepsie","state":"AR"},"email":["jeff.schmith@nosql-matters.org"],"region":"870","phone":[]},"likes":["chatting","boxing","reading"],"memberSince":"2011-02-10"}
{"name":{"first":"Tuan","last":"Climer"},"gender":"male","birthday":"1951-04-06","contact":{"address":{"street":"6 Kansas St","zip":"07921","city":"Bedminster","state":"NJ"},"email":["tuan.climer@nosql-matters.org"],"region":"908","phone":["908-8376478"]},"likes":["ironing"],"memberSince":"2011-02-06"}
{.....}
{.....}

(The data is borrowed from arangodb/example-datasets.)


Reading JSON file and doing MapReduce.


What I want to do is so-called "word counting":

  • Read the file as a stream
  • Parse each line of JSON into a Map
  • Extract only the first name from each Map
  • Count how many times each first name appears
  • Sort in descending order of the count

Read file in stream

def map_reduce(file) do
  File.stream!(file) # ★

Parse JSON string to Map data line by line

def map_reduce(file) do
  File.stream!(file)
  |> Enum.map(&json_parse/1) # ★

Get only the first name from the Map

def map_reduce(file) do
  File.stream!(file)
  |> Enum.map(&json_parse/1)
  |> Enum.map(fn person -> 
    %{name: %{first: first}} = person 
    first
  end) # ★

Count how many times the same name appeared

def map_reduce(file) do
  File.stream!(file)
  |> Enum.map(&json_parse/1)
  |> Enum.map(fn person -> 
    %{name: %{first: first}} = person 
    first
  end)
  |> Enum.reduce(%{}, fn name, acc ->
    Map.update(acc, name, 1, & &1 + 1)
  end) # ★

Sort in descending order of the count

def map_reduce(file) do
  File.stream!(file)
  |> Enum.map(&json_parse/1)
  |> Enum.map(fn person -> 
    %{name: %{first: first}} = person 
    first
  end)
  |> Enum.reduce(%{}, fn name, acc ->
    Map.update(acc, name, 1, & &1 + 1)
  end)
  |> Enum.sort(fn {_, count1}, {_, count2} -> count1 >= count2 end) # ★
end
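The `json_parse/1` helper is never shown on the slide. A minimal sketch, assuming the Poison JSON library (the library choice is an assumption; `keys: :atoms` is what makes the later `%{name: %{first: first}}` pattern match work):

```elixir
# Hypothetical json_parse/1 — the slide does not show its definition.
# Assumes the Poison JSON library; `keys: :atoms` decodes string keys
# into atoms so that %{name: %{first: first}} pattern matches succeed.
defp json_parse(line) do
  Poison.decode!(line, keys: :atoms)
end
```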

The result is below.

[{"Clair", 191}, {"Jae", 190}, {"Quinn", 187}, {"Andrew", 185}, {"Daryl", 182},
 {"Brandon", 181}, {"Juan", 181}, {"Andre", 181}, {"Bernie", 181}, {"Dan", 179},
 {"Sean", 179}, {"Dorian", 179}, {"Hong", 179}, {"Mickey", 179}, {"Louis", 178},
 {"Jan", 178}, {"Sammie", 178}, {"Minh", 178}, {"Numbers", 177}, {"Avery", 176},
 {"Michal", 176}, {"Jewell", 176}, {"Terrell", 176}, {"Julio", 175},
 {"Russell", 175}, {"Logan", 175}, {"Elisha", 175}, {"Cruz", 175},
 {"Sung", 175}, {"Deon", 175}, {"Noel", 174}, {"Jessie", 174}, {"Donald", 174},
 {"Sidney", 174}, {"Blair", 174}, {"Arthur", 174}, {"Tory", 174},
 {"David", 174}, {"Roy", 173}, {"Martin", 172}, {"Cody", 172}, {"Carrol", 171},
 {"Adrian", 171}, {"Paris", 171}, {"Micah", 171}, {"Kasey", 171},
 {"Antonia", 171}, {"Julian", 170}, {"Drew", ...}, {...}, ...]

This shows that "Clair" was the most frequent name, appearing 191 times.


Differences between Eager, Lazy and Flow(Concurrent)


Eager (sequential processing)

def map_reduce(:eager, file) do
  File.stream!(file)
  |> Enum.map(&json_parse/1)
  |> Enum.map(fn person -> 
    %{name: %{first: first}} = person 
    first
  end)
  |> Enum.reduce(%{}, fn name, acc ->
    Map.update(acc, name, 1, & &1 + 1)
  end)
  |> Enum.sort(fn {_, count1}, {_, count2} -> count1 >= count2 end)
end
  • Simple. Easy to understand.

  • Bad for memory: each map step expands all 300,000 lines of the file at once.

  • Runs on a single thread (slow for big data).

  • Effective only when the data volume is very small.


Lazy (lazy evaluation)

def map_reduce(:lazy, file) do    
  File.stream!(file)
  |> Stream.map(&json_parse/1) # Enum ==> Stream
  |> Stream.map(fn person -> # Enum ==> Stream
    %{name: %{first: first}} = person
    first
  end)
  |> Enum.reduce(%{}, fn name, acc ->
    Map.update(acc, name, 1, & &1 + 1)
  end)
  |> Enum.sort(fn {_, count1}, {_, count2} -> count1 >= count2 end)
end
  • Good for memory: changing the map steps from Enum to Stream processes the file line by line.

  • Still runs on a single thread (slow for big data).
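For intuition on why Stream is memory-friendly: Stream composes lazily, so nothing runs until an Enum call consumes the pipeline, and only the elements actually demanded flow through the map steps.

```elixir
# Stream.map/2 builds a lazy computation; Enum.take/2 drives it, so
# only the first two elements are ever mapped.
1..5
|> Stream.map(&(&1 * 2))
|> Enum.take(2)
# => [2, 4]
```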


Flow (Concurrent)

def map_reduce(:flow, file) do    
  File.stream!(file)
  |> Flow.from_enumerable # added
  |> Flow.map(&json_parse/1) # Enum ==> Flow
  |> Flow.map(fn person -> # Enum ==> Flow
    %{name: %{first: first}} = person 
    first
  end)
  |> Flow.partition # added
  |> Flow.reduce(fn -> %{} end, fn name, acc -> # Enum ==> Flow
    Map.update(acc, name, 1, & &1 + 1)
  end)      
  |> Enum.sort(fn {_, count1}, {_, count2} -> count1 >= count2 end)
end
  • Both the map and reduce phases run concurrently.

  • Everything after Flow.from_enumerable runs in parallel (even the reduce!).

  • Flow.partition routes events by key hash, so every occurrence of the same name reaches the same reducer partition.

  • Ordering is given up for the sake of parallelism, so even with the final sort, the result differs slightly from Eager and Lazy (names with equal counts may appear in a different order — not a big deal).


Speed Comparison

CPU: Intel Core i5-2400 (4 cores)

  • Eager ... 12.2 sec :red_car:
  • Lazy ... 10.4 sec :bullettrain_side:
  • Flow(Concurrent) ... 2.7 sec :rocket:
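The slide does not show its benchmark harness; one way such numbers could be measured is Erlang's `:timer.tc/1` (the `MapReducer` module name here is a hypothetical placeholder for the module holding `map_reduce/2`):

```elixir
# :timer.tc/1 runs the function and returns {elapsed_microseconds, result}.
# `MapReducer` and the file name are assumptions for illustration.
{usec, _result} = :timer.tc(fn -> MapReducer.map_reduce(:flow, "names_300000.json") end)
IO.puts("Flow: #{usec / 1_000_000} sec")
```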

===

Flow is extremely fast because it uses GenStage, which draws out the full performance of multiple cores with back-pressure.


CPU usage graph

Eager ... uses only 1 core.


Lazy ... a single core stays pinned at its upper limit more than with Eager.


Flow (Concurrent) ... all cores are pinned at their upper limit.



This video will help you understand GenStage and Flow easily.

GenStage and Flow - José Valim (Lambda Days 2017) (YouTube)

===

(You can catch up on Elixir quite well by trawling through videos on YouTube.)


Thanks :raised_hands:
