Learning Coding Binary (Was Tim's Erlang Exercise - Round VI)
>>> Updated Nov 1:
Tim tested tbray5.erl on a T5120 with his 971,538,252-byte, 4,625,236-line log file, and got:
real 0m20.74s user 3m51.33s sys 0m8.00s
The result was what I expected, since the elapsed time of my code was three times Anders' on my machine. I'm glad that Erlang performs consistently (linearly) across different machines/OSes.
My code is not the fastest. I did not apply Boyer-Moore searching, so scan_chunk_1/4 has to test/skip the binary one byte at a time when there is no exact match. Anyway, this code shows how to code binaries efficiently, and demonstrates the performance of traversing a binary byte by byte (the performance is not so bad now, right?). It is also what I wanted: a balance between simplicity, readability and speed.
Another approach, for the lazy, is a bit of binary pattern-match hacking: we can modify scan_chunk_1/4 to test ten candidate start positions in a single case expression:
%% Unrolled variant of tbray5's scan_chunk_1/4. One case expression tests the
%% ten candidate start positions S .. S+9, so a miss advances S by 10 instead
%% of 1.
%%   Bin   - the chunk read from the log file.
%%   DataL - offset of the chunk's last newline; a match must end before it.
%%   S     - current scan offset into Bin.
%%   Dict  - counts so far, keyed by <<"YYYY/MM/DD/Name">>.
%% Returns the updated dict.
scan_chunk_1(Bin, DataL, S, Dict) when S < DataL - 34 ->
    %% Offset = distance from S to the first byte after the 34-byte prefix
    %% "GET /ongoing/When/NNNx/YYYY/MM/DD/", i.e. 34 + the bytes skipped.
    Offset =
        case Bin of
            <<_:S/binary,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
                34;
            <<_:S/binary,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
                35;
            <<_:S/binary,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
                36;
            <<_:S/binary,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
                37;
            <<_:S/binary,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
                38;
            <<_:S/binary,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
                39;
            <<_:S/binary,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
                40;
            <<_:S/binary,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
                41;
            <<_:S/binary,_,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
                42;
            <<_:S/binary,_,_,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
                43;
            _ -> undefined
        end,
    case Offset of
        undefined ->
            scan_chunk_1(Bin, DataL, S + 10, Dict);
        _ when S + Offset >= DataL ->
            %% This candidate (and therefore every later one) reaches the
            %% chunk's last newline, a region the byte-by-byte version never
            %% matches (its guard keeps S + 34 < DataL). Nothing is left here.
            Dict;
        _ ->
            case match_until_space_newline(Bin, S + Offset) of
                {true, E} ->
                    %% The key starts at the year, i.e. 11 bytes
                    %% ("YYYY/MM/DD/") before the end of the prefix.
                    Skip = S + Offset - 11,
                    L = E - Skip,
                    <<_:Skip/binary,Key:L/binary,_/binary>> = Bin,
                    scan_chunk_1(Bin, DataL, E + 1, dict:update_counter(Key, 1, Dict));
                {false, E} ->
                    scan_chunk_1(Bin, DataL, E + 1, Dict)
            end
    end;
scan_chunk_1(_, _, _, Dict) -> Dict.
The elapsed time immediately dropped to 1.424 sec vs 2.792 sec before, a speedup of about 100%, on my 4-CPU Linux box.
If you are patient, you can copy-paste 100 or more such lines :-) (in that case, I'd rather pick Boyer-Moore), and the elapsed time will drop a little more, but not much beyond 10 lines or so.
========
>>> Updated Oct 29:
Pihis updated his WideFinder, applied guideline II, and beat Ruby on his one-core box. He also modified Anders's code by removing all unnecessary remaining binary bindings (which cause unnecessary sub-binary splitting); Steve then tested the refined code and got 0.567s on his famous 8-CPU Linux box. We may now have reached the real disk I/O bound; should we try parallelized file reading? But I'm tired of more wide finders. BTW, may we have regexp pattern matching at the syntax level? The End.
========
>>> Updated Oct 26:
Code cleanup
========
A binary is usually more efficient than a list in Erlang.
The memory size of a binary is 3 to 6 words plus the data itself. The data can be allocated and deallocated in a global heap, so it can be shared across function calls, and across processes when passing messages (on the same node), without copying. That's why the heap size affects binaries so much.
The memory size of a list is 1 word per element plus the size of each element, and a list is always copied when it is sent in a message (within one process, function calls pass it by reference rather than copying it).
In my previous posts about Tim's exercise, I doubted the performance of binary traversal. But with more advice and experience, it seems binaries can work very efficiently as a dataset-processing structure.
But there are some guidelines for using binaries efficiently in Erlang, which I learned from the exercise and from experts; I'll try to give them here.
I. Don't split a binary unless the split binaries are what you exactly want
Splitting/combining binaries is expensive, so when you want to get values from a binary at some offsets:
Do
%% C is bound to the byte at Offset as an integer (the default segment
%% type), so no sub-binary is built for it.
<<_:Offset/binary, C, _/binary>> = Bin,
io:format("Char at Offset: ~p", [C]).
Do Not *
%% Binds C to a 1-byte sub-binary instead of an integer, which builds an
%% extra sub-binary per lookup. A /binary segment that is not the last one
%% in a pattern must have an explicit size, hence C:1/binary.
<<_:Offset/binary, C:1/binary, _/binary>> = Bin,
io:format("Char at Offset: ~p", [C]).
* This may be good in R12B
And when you want to split a binary to get Head or Tail only:
Do
%% Binds only Head, the first Offset bytes; the remainder is matched by _
%% and not bound to any variable.
<<Head:Offset/binary,_/binary>> = Bin.
Do Not
%% split_binary/2 returns both parts as a tuple; the second part is built
%% even though it is discarded here.
{Head, _} = split_binary(Bin, Offset).
II. Calculate the final offsets first, then split once you have the exact offsets
When you traverse a binary to test its bytes/bits, calculate and collect the final offsets first; don't split the binary (bind a named variable to a sub-binary) at that time. Once you have all the exact offsets, split out only what you want:
Do
%% Prints the N-th (1-based) space-separated word of Bin.
%% Word boundaries are collected as plain offsets first (calc_word_offsets/3);
%% the binary is split only once, for the word actually wanted.
%% Returns ok (the result of io:format/2).
get_nth_word(Bin, N) ->
    io:format("nth Word: ~p", [nth_word(Bin, N)]).

%% Returns the N-th (1-based) space-separated word of Bin, without the
%% separating space. Fails with badarg when Bin has fewer than N words.
nth_word(Bin, N) ->
    Offsets = calc_word_offsets(Bin, 0, [0]),
    S = element(N, Offsets),
    E = element(N + 1, Offsets),
    %% E is one past the space that ends the word, so the word is
    %% E - S - 1 bytes long.
    L = E - S - 1,
    <<_:S/binary, Word:L/binary, _/binary>> = Bin,
    Word.

%% Collects the start offset of every word (the byte after each space).
%% A final sentinel, byte_size(Bin) + 1, acts as a virtual trailing space
%% so the last word also has an end offset. Returns the offsets as a tuple
%% for element/2 access.
calc_word_offsets(Bin, Offset, Acc) when Offset < byte_size(Bin) ->
    case Bin of
        <<_:Offset/binary, $\s, _/binary>> ->
            calc_word_offsets(Bin, Offset + 1, [Offset + 1 | Acc]);
        _ ->
            calc_word_offsets(Bin, Offset + 1, Acc)
    end;
calc_word_offsets(Bin, _, Acc) ->
    list_to_tuple(lists:reverse([byte_size(Bin) + 1 | Acc])).

%% Example (Erlang shell):
%%   1> Bin = <<"This is a binary test">>,
%%   1> get_nth_word(Bin, 4). % <<"binary">>
Do Not
%% Counter-example for guideline II: prints the N-th space-separated word of
%% Bin, but binds every word (and the remaining tail) to a sub-binary while
%% scanning. Returns ok (the result of io:format/2).
get_nth_word_bad(Bin, N) ->
    Words = split_words(Bin, 0, []),
    Word = element(N, Words),
    io:format("nth Word: ~p", [Word]).

%% Splits Bin on each space byte and returns the words as a tuple.
%% Offset counts from the start of the current Bin. After each split,
%% scanning restarts at 0 on Rest. When no byte is left at Offset, the
%% remaining bytes become the last word.
split_words(Bin, Offset, Acc) ->
    case Bin of
        %% Binds Word and Rest: two sub-binaries per space found.
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            split_words(Rest, 0, [Word | Acc]);
        <<_:Offset/binary,_,_/binary>> ->
            split_words(Bin, Offset + 1, Acc);
        _ -> list_to_tuple(lists:reverse([Bin | Acc]))
    end.

%% Example (Erlang shell):
Bin = <<"This is a binary test">>,
get_nth_word_bad(Bin, 4). % <<"binary">>
III. Use "+h Size" option or [{min_heap_size, Size}] with spawn_opt
This is very important for binary performance; it is the key setting. With this option set properly, binaries perform very well; otherwise, noticeably worse.
IV. Others
- Don't forget to compile to native by adding "-compile([native])." in your code.
- Setting "+A Size" (the number of threads in the async thread pool) may also help a bit when doing I/O.
Practice
Steve and Anders have pushed widefinder in Erlang to 1.1 sec on 8-CPU linux box. Their code took about 1.9 sec on my 4-CPU box. Then, how about a concise version?
Following the guide above, and based on my previous code, Per's dict code and Luke's spawn_worker, I rewrote a concise and straightforward tbray5.erl (less than 80 LOC) for Tim's exercise, without any extra C driver modules. It took about 2.972 sec for a 1-million-line log file and 15.695 sec for 5 million lines, vs the non-parallelized Ruby's 4.161 sec and 20.768 sec, on my 2.80GHz 4-CPU Intel Xeon Linux box:
BTW, using ets instead of dict is almost the same.
$ erlc -smp tbray5.erl $ time erl +h 8192 -smp -noshell -run tbray5 start o1000k.ap -s erlang halt real 0m2.972s user 0m9.685s sys 0m0.748s $ time erl +h 8192 -smp -noshell -run tbray5 start o5000k.ap -s erlang halt real 0m15.695s user 0m53.551s sys 0m4.268s
On 2.0GHz 2-core MacBook (Ruby code took 2.447 sec):
$ time erl +h 8192 -smp -noshell -run tbray5 start o1000k.ap -s erlang halt real 0m3.034s user 0m4.853s sys 0m0.872s
The Code: tbray5.erl
%% tbray5: finds the ten most requested "/ongoing/When/..." articles in an
%% access log. The file is read in ?BUFFER_SIZE chunks. Each chunk is scanned
%% for "GET /ongoing/When/NNNx/YYYY/MM/DD/Name " by its own monitored worker,
%% and the per-worker dicts are merged at the end.
-module(tbray5).
-compile([native]).
-export([start/1]).

%% Bytes requested per file:pread/3 call; each chunk gets one worker.
-define(BUFFER_SIZE, (1024 * 10000)).

%% Entry point, e.g. `erl -noshell -run tbray5 start FILE -s erlang halt`.
%% FileName is passed to file:open/2 as-is; the one-element argument list
%% produced by -run is accepted there as a deep character list.
%% Prints the top ten "YYYY/MM/DD/Name" keys with their counts and returns
%% the list of io:format/2 results ([] when nothing matched).
start(FileName) ->
    Dicts = [wait_result(Worker) || Worker <- read_file(FileName)],
    print_result(merge_dicts(Dicts)).

%% Spawns one scanning worker per chunk; returns their {Pid, MonitorRef}s.
read_file(FileName) ->
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, 0, []).

read_file_1(File, Offset, Workers) ->
    case file:pread(File, Offset, ?BUFFER_SIZE) of
        eof ->
            file:close(File),
            Workers;
        {ok, Bin} ->
            {DataL, Next} = chunk_bounds(Bin, Offset),
            Worker = spawn_worker(self(), fun scan_chunk/1, {Bin, DataL}),
            read_file_1(File, Next, [Worker | Workers])
    end.

%% Decides how much of Bin (read at file offset Offset) its worker scans and
%% where the next read starts. Returns {DataL, NextOffset}.
chunk_bounds(Bin, Offset) ->
    case split_on_last_newline(Bin) of
        nomatch ->
            %% No newline at all: the file's last line lacks a trailing '\n'
            %% (or a single line is longer than ?BUFFER_SIZE). Scan all of it
            %% and continue right after it. Re-reading from Offset + 1 would
            %% crawl forward one byte (and one worker) at a time and never
            %% count this line.
            Size = byte_size(Bin),
            {Size, Offset + Size};
        NL ->
            %% Scan up to the last newline; the partial line after it is
            %% re-read as the start of the next chunk.
            {NL, Offset + NL + 1}
    end.

%% Returns the offset of the last $\n in Bin, or nomatch if there is none.
split_on_last_newline(Bin) ->
    split_on_last_newline_1(Bin, byte_size(Bin) - 1).

split_on_last_newline_1(Bin, S) when S >= 0 ->
    case Bin of
        <<_:S/binary, $\n, _/binary>> -> S;
        _ -> split_on_last_newline_1(Bin, S - 1)
    end;
split_on_last_newline_1(_, _) ->
    nomatch.

%% Worker body: counts matches in the first DataL bytes of Bin.
scan_chunk({Bin, DataL}) ->
    scan_chunk_1(Bin, DataL, 0, dict:new()).

%% Tests for the 34-byte prefix "GET /ongoing/When/NNNx/YYYY/MM/DD/" at
%% offset S. On a hit, the key runs from the year (S + 23) up to the space
%% that ends the name.
scan_chunk_1(Bin, DataL, S, Dict) when S < DataL - 34 ->
    case Bin of
        <<_:S/binary, "GET /ongoing/When/", _, _, _, $x, $/,
          _, _, _, _, $/, _, _, $/, _, _, $/, _/binary>> ->
            case match_until_space_newline(Bin, S + 34) of
                {true, E} ->
                    Skip = S + 23,
                    L = E - Skip,
                    <<_:Skip/binary, Key:L/binary, _/binary>> = Bin,
                    scan_chunk_1(Bin, DataL, E + 1, dict:update_counter(Key, 1, Dict));
                {false, E} ->
                    scan_chunk_1(Bin, DataL, E + 1, Dict)
            end;
        _ ->
            scan_chunk_1(Bin, DataL, S + 1, Dict)
    end;
scan_chunk_1(_, _, _, Dict) ->
    Dict.

%% Scans forward from S. Returns {true, E} when the byte at E is the space
%% ending the name, or {false, E} on a newline, a '.', or the end of Bin.
match_until_space_newline(Bin, S) when S < byte_size(Bin) ->
    case Bin of
        <<_:S/binary, 10, _/binary>> -> {false, S};
        <<_:S/binary, $., _/binary>> -> {false, S};
        <<_:S/binary, _, $\s, _/binary>> -> {true, S + 1};
        _ -> match_until_space_newline(Bin, S + 1)
    end;
match_until_space_newline(_, S) ->
    {false, S}.

%% Runs F(A) in a new monitored process that sends {Pid, Result} to Parent.
%% Returns {Pid, MonitorRef}.
spawn_worker(Parent, F, A) ->
    erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end).

%% Waits for the worker to exit. Returns its result on a normal exit;
%% otherwise exits the caller with the worker's exit reason.
wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, normal} ->
            receive {Pid, Result} -> Result end;
        {'DOWN', Ref, _, _, Reason} ->
            exit(Reason)
    end.

%% Sums the per-worker counts into one dict. An empty file yields no
%% workers, hence the [] clause.
merge_dicts([]) ->
    dict:new();
merge_dicts([D]) ->
    D;
merge_dicts([D1, D2 | Rest]) ->
    merge_dicts([dict:merge(fun(_, V1, V2) -> V1 + V2 end, D1, D2) | Rest]).

%% Prints the ten keys with the highest counts, highest first.
print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].
Tim's Erlang Exercise - Summary
>>> Updated Nov 1:
Tim tested my last attempt, tbray5.erl, described in Learning Coding Binary (Was Tim's Erlang Exercise - Round VI), on his 971,538,252-byte, 4,625,236-line log file, and got:
real 0m20.74s user 3m51.33s sys 0m8.00s
It's not the fastest, since I did not apply Boyer-Moore searching. But it's what I wanted: a balance between simplicity, readability and speed.
========
>>> Updated Oct 24:
The Erlang code can be faster than the un-parallelized Ruby: a new version runs in 2.97 sec on the 4-CPU box: Learning Coding Binary (Was Tim's Erlang Exercise - Round VI)
========
>>> Updated Oct 22:
Following Björn's suggestion, I added the "+h 4096" option to 'erl', which "sets the default heap size of processes to the size 4096". The elapsed time immediately dropped from 7.7s to 5.5s:
time erl +h 4096 -smp -noshell -run tbray4 start o1000k.ap 10 -s erlang halt
The +h option seems to affect the binary version a lot, but the list version only a little. Perhaps this is because lists are always copied, while binaries may be left in the heap and passed by pointer?
The default heap size of each process is 233 words. This number may suit a large number of concurrent processes, avoiding excessive memory use. But for some parallelization tasks, with fewer processes or with enough memory, the heap size can be set a bit larger.
Anyway, I think Erlang/OTP has been very good there for Concurrency, but there may be still room to optimize for Parallelization.
BTW, with the +h option and some tips for efficient binaries, the most concise binary version, tbray5.erl, now runs in about 3 sec.
========
This is a performance summary on Tim's Erlang exercise on large dataset processing, I only compare the results on a 4-CPU Intel Xeon 2.80G linux box:
| Log File | Time | Erlang (1 Proc) | Erlang (Many Proc) | Erlang (Many Proc) +h 4096 | Ruby |
|---|---|---|---|---|---|
| 1 million lines | real | 22.088s | 7.700s | 5.475s | 4.161s |
| | user | 21.161s | 25.750s | 18.785s | 3.592s |
| | sys | 0.924s | 3.552s | 1.352s | 0.568s |
| 5 million lines | real | 195.570s | 37.669s | 27.911s | 20.768s |
| | user | 192.496s | 126.296s | 98.162s | 19.009s |
| | sys | 3.480s | 17.789s | 7.344s | 3.116s |
Notice:
- The Erlang code is tbray4.erl, which can be found in previous blog, the Ruby code is from Tim's blog.
- Erlang code is parallelized, Ruby code not.
- The Erlang code is much longer, but parallelization is not a free lunch.
- On an 8-CPU box, the Erlang version should beat or come close to the non-parallelized Ruby version*.
- Although we are in the multi-core era, I'm not sure disk I/O is ready for it too.
* Per Steve's testing.
CN Erlounge II
Last weekend, the two-day CN Erlounge II was held in Zhuhai, China, covering topics in Erlang and FP:
- Why I choose Erlang? - by xushiwei
- Erlang emulator implementation - by mryufeng
- Port & driver - by codeplayer
- Py 2 Erl - by Zoom.Quiet
- STM: Lock free concurrent Overview - by Albert Lee
- mnesia - by mryufeng
And a new logo of "ERL.CHINA" was born as:

(Picture source - http://www.haokanbu.com/story/1104/)
More information about CN Erlounge II can be found here and some pictures.
I could not arrange to attend this meeting; maybe next time.
Learning Coding Parallelization (Was Tim's Erlang Exercise - Round V)
>>> Updated Oct 20:
After cleaned up my 4-CPU linux box, the result for the 1M records file is about 7.7 sec, for the 5M records file is about 38 sec.
========
>>> Updated Oct 16:
After testing my code on different machines, I found that disk I/O performance varied: for some very large files, reading the file in parallel may take longer (typically on non-server machines, which are not equipped for fast disk I/O). So I added another version, tbray4b.erl, in which only the file reading is not parallelized; all other code is the same. If you'd like to test it on your machine, please try both.
========
Well, I think I've learned a lot from doing Tim's exercise: not only about List vs Binary in Erlang, but also about computing in parallel. Coding concurrency is fairly easy in Erlang, but coding parallelization is not only about the language; it is a real problem in its own right.
I wrote tbray3.erl in The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV) and got a fairly good result by far on my 2-core MacBook. But things always are a bit complex. As Steve pointed in the comment, when he tried tbray3.erl on his 8-core linux box:
"I ran it in a loop 10 times, and the best time I saw was 13.872 sec, and user/CPU time was only 16.150 sec, so it’s apparently not using the multiple cores very well."
I also encountered this issue on my 4-CPU Intel Xeon 2.80GHz Debian box, where it runs even worse (8.420s) than on my 2-core MacBook (4.483s).
I thought about my code for a while, and found that it seems to spawn too many processes for scan_chunk. Since scan_chunk's performance has improved a lot, each process finishes its task very quickly, too quickly relative to the file reading. The additional CPUs get little chance to help: the 'read'-'spawn scan process' cycle is now almost sequential, with very few scanning processes alive at the same time. I think I have finally hit the file-reading bound.
But wait: as I claimed before, reading a file into memory is very fast in Erlang; for a 200M log file it takes less than 800ms. The elapsed time for tbray3.erl is about 4900ms, far more than 800ms, so why do I say file reading is now the bound?
The problem is this. Because I doubted the performance of traversing a binary byte by byte, I chose to convert the binary to a list to scan the words. According to my tests, a list beats a binary when it is not too long (in many cases, no more than several KBytes). To keep the code clear and readable, I also chose to split the big binary while reading the file, so I have to read the file in pieces of no more than n KBytes. For a very big file, reading is broken into tens of thousands of steps, which makes the total file-reading time rather long. That's bad.
So I decided to write another version. It reads the file in parallel (Round III), splits each chunk on its last newline (Round II) and scans the words using pattern matching (Round IV). And yes, this time it uses binaries instead of lists, trying to offset the slower binary traversal with parallelism on multiple cores.
The result is interesting, it's the first time I achieved around 10 sec in my 2-core MacBook when use binary match only, and it's also the first time, on my dummy 4-CPU Intel Xeon CPU 2.80GHz debian box, I got better result (7.700 sec) than my MacBook.
>>> Updated Oct 15:
Steve ran the code on his 8-core 2.33 GHz Intel Xeon Linux box; the best time was 4.920 sec:
"the best time I saw for your newest version was 4.920 sec on my 8-core Linux box. Fast! However, user time was only 14.751 sec, so I’m not sure it’s using all the cores that well. Perhaps you’re getting down to where I/O is becoming a more significant factor."
Please see Steve's One More Erlang Wide Finder and his widefinder attempts.
========
Result on 2.0GHz 2-core MacBook:
$ time erl -smp -noshell -run tbray4_bin start o1000k.ap 20 -s erlang halt 8900 : 2006/09/29/Dynamic-IDE 2000 : 2006/07/28/Open-Data 1300 : 2003/07/25/NotGaming 800 : 2003/10/16/Debbie 800 : 2003/09/18/NXML 800 : 2006/01/31/Data-Protection 700 : 2003/06/23/SamsPie 600 : 2006/09/11/Making-Markup 600 : 2003/02/04/Construction 600 : 2005/11/03/Cars-and-Office-Suites Time: 10527.50 ms real 0m10.910s user 0m13.927s sys 0m6.413s
Result on 4-CPU Intel Xeon CPU 2.80GHz debian box,:
# When process number is set to 20: time erl -smp -noshell -run tbray4_bin start o1000k.ap 20 -s erlang halt real 0m7.700s user 0m25.750s sys 0m3.552s # When process number is set to 1: $ time erl -smp -noshell -run tbray4_bin start o1000k.ap 1 -s erlang halt real 0m22.035s user 0m21.525s sys 0m0.512s # On a 940M 5 million lines log file: time erl -smp -noshell -run tbray4_bin start o5000k.ap 100 -s erlang halt 44500 : 2006/09/29/Dynamic-IDE 10000 : 2006/07/28/Open-Data 6500 : 2003/07/25/NotGaming 4000 : 2003/10/16/Debbie 4000 : 2003/09/18/NXML 4000 : 2006/01/31/Data-Protection 3500 : 2003/06/23/SamsPie 3000 : 2006/09/11/Making-Markup 3000 : 2003/02/04/Construction 3000 : 2005/11/03/Cars-and-Office-Suites Time: 37512.76 ms real 0m37.669s user 2m6.296s sys 0m17.789s
On the 4-CPU linux box, comparing the elapsed time between ProcNum = 20 and ProcNum = 1, the elapsed time of parallelized one was only 35% of un-parallelized one, speedup about 185%. The ratio was almost the same as my pread_file.erl testing on the same machine.
It's actually a combination of the code from my four previous posts. Although its performance is not as good as tbray3.erl's on my MacBook, I'm happy that this version is fully parallelized, from reading the file to scanning words, so it should scale better than all my previous versions.
%% tbray4: parallel solution to Tim Bray's log-scanning exercise.
%% - ProcNum reader processes each pread/3 one slice of the file.
%% - A collector process rejoins the line that straddles each slice boundary
%%   and spawns one scanner per slice, in slice order.
%% - A counter process merges the scanners' dicts and prints the ten most
%%   requested articles.
-module(tbray4).
-compile([native]).
-export([start/1, start/2]).
-include_lib("kernel/include/file.hrl").

%% Entry point for `erl -run tbray4 start FILE PROCNUM`, which passes the
%% arguments as strings.
start([FileName, ProcNum]) when is_list(ProcNum) ->
    start(FileName, list_to_integer(ProcNum)).

%% Scans FileName with ProcNum reader processes, prints the top ten articles
%% and the elapsed time, and returns ok once everything has been counted.
start(FileName, ProcNum) ->
    Start = now(),
    Main = self(),
    Counter = spawn(fun () -> count_loop(Main) end),
    Collector = spawn(fun () -> collect_loop(Counter) end),
    pread_file(FileName, ProcNum, Collector),
    %% don't terminate, wait here, until all tasks done.
    receive
        stop ->
            io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

pread_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    pread_file_1(FileName, ChunkSize, ProcNum, Collector).

%% Spawns reader I for each 0 =< I < ProcNum (see read_slice/5) and tells
%% the collector how many slices to expect.
pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
    [spawn(fun () -> read_slice(FileName, ChunkSize, ProcNum, I, Collector) end)
     || I <- lists:seq(0, ProcNum - 1)],
    %% One extra, empty slice after the last real one makes the collector
    %% scan the last real slice's Tail (a final line with no trailing '\n'),
    %% which would otherwise never be counted.
    Collector ! {seq, ProcNum, <<>>, <<>>},
    Collector ! {chunk_num, ProcNum + 1}.

%% Reader I: reads its slice starting at ChunkSize * I and sends
%% {seq, I, Data, Tail}. Data ends at the slice's last newline; Tail is the
%% partial line after it.
read_slice(FileName, ChunkSize, ProcNum, I, Collector) ->
    Length = if
                 %% The last slice also covers the Size rem ProcNum
                 %% (< ProcNum) bytes left over by get_chunk_size/2.
                 I == ProcNum - 1 -> ChunkSize + ProcNum;
                 true -> ChunkSize
             end,
    {ok, File} = file:open(FileName, [raw, binary]),
    Bin = case file:pread(File, ChunkSize * I, Length) of
              {ok, Read} -> Read;
              eof -> <<>>   % nothing to read at this offset (e.g. empty file)
          end,
    file:close(File),
    {Data, Tail} = split_on_last_newline(Bin),
    Collector ! {seq, I, Data, Tail}.

collect_loop(Counter) ->
    collect_loop_1([], <<>>, -1, Counter).

%% Chunks: slices that arrived before their predecessor was scanned.
%% PrevTail: Tail of the last scanned slice. LastSeq: that slice's index.
collect_loop_1(Chunks, PrevTail, LastSeq, Counter) ->
    receive
        {chunk_num, ChunkNum} ->
            Counter ! {chunk_num, ChunkNum},
            collect_loop_1(Chunks, PrevTail, LastSeq, Counter);
        {seq, I, Data, Tail} ->
            SortedChunks = lists:keysort(1, [{I, Data, Tail} | Chunks]),
            {Chunks1, PrevTail1, LastSeq1} =
                process_chunks(SortedChunks, [], PrevTail, LastSeq, Counter),
            collect_loop_1(Chunks1, PrevTail1, LastSeq1, Counter)
    end.

count_loop(Main) ->
    count_loop_1(Main, dict:new(), undefined, 0).

%% Merges one dict per scanned slice. Once ChunkNum dicts have arrived, it
%% prints the result and sends stop to Main.
count_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
count_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            count_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            count_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

%% Walks the slices in index order. A slice that directly follows LastSeq
%% gets a scanner over PrevTail ++ Data; any other slice is buffered.
%% Returns {Buffered, PrevTail, LastSeq}.
process_chunks([], ChunkBuf, PrevTail, LastSeq, _) ->
    {ChunkBuf, PrevTail, LastSeq};
process_chunks([{I, Data, Tail} = Chunk | T], ChunkBuf, PrevTail, LastSeq, Counter) ->
    case LastSeq + 1 of
        I ->
            spawn(fun () -> Counter ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            process_chunks(T, ChunkBuf, Tail, I, Counter);
        _ ->
            process_chunks(T, [Chunk | ChunkBuf], PrevTail, LastSeq, Counter)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

%% Bytes per reader: Size div ProcNum (the last reader also takes the rest).
get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size = Size}} = file:read_file_info(FileName),
    Size div ProcNum.

%% Splits Bin at its last newline into {Data, Tail}, dropping the newline.
%% Returns {Bin, <<>>} when Bin contains no newline.
split_on_last_newline(Bin) ->
    split_on_last_newline_1(Bin, byte_size(Bin) - 1).

%% Offset 0 is checked too, so a slice whose only newline is its first byte
%% splits there instead of being treated as newline-free.
split_on_last_newline_1(Bin, Offset) when Offset >= 0 ->
    case Bin of
        <<Data:Offset/binary, $\n, Tail/binary>> -> {Data, Tail};
        _ -> split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) ->
    {Bin, <<>>}.

scan_chunk(Bin) ->
    scan_chunk_1(Bin, 0, dict:new()).

%% Looks for "GET /ongoing/When/NNNx/YYYY/MM/DD/" at each Offset. On a hit,
%% the key is <<"YYYY/MM/DD/", Word>> and scanning restarts after Word.
scan_chunk_1(Bin, Offset, Dict) when Offset =< byte_size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary, "GET /ongoing/When/", _, _, _, $x, $/,
          Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/, Rest/binary>> ->
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} ->
                    scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} ->
                    Key = <<Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ ->
            scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) ->
    Dict.

%% Returns {Rest, Word} when a space ends Word (Rest follows the space), or
%% {Rest, <<>>} when a '.' or newline comes first; {<<>>, <<>>} at the end.
match_until_space_newline(Bin, Offset) when Offset < byte_size(Bin) ->
    case Bin of
        <<Word:Offset/binary, $\s, Rest/binary>> -> {Rest, Word};
        <<_:Offset/binary, $., Rest/binary>> -> {Rest, <<>>};
        <<_:Offset/binary, 10, Rest/binary>> -> {Rest, <<>>};
        _ -> match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) ->
    {<<>>, <<>>}.
>>> Updated Oct 16:
After testing my code on different machines, I found that disk I/O performance varied: for some very large files, reading the file in parallel may take longer (typically on non-server machines, which are not equipped for fast disk I/O). So I wrote another version, tbray4b.erl, in which only the file reading is not parallelized; all other code is the same. Here are the results for this version on a 940M file with 5 million lines, with ProcNum set to 200 and 400:
# On 2-core MacBook: $ time erl -smp -noshell -run tbray4b start o5000k.ap 200 -s erlang halt real 0m50.498s user 0m49.746s sys 0m11.979s # On 4-cpu linux box: $ time erl -smp -noshell -run tbray4b start o5000k.ap 400 -s erlang halt real 1m2.136s user 1m59.907s sys 0m7.960s
The code: tbray4b.erl
%% tbray4b: like tbray4, but the file is read sequentially in ChunkSize
%% pieces by the calling process. Splitting each piece on its last newline,
%% scanning and counting still run in separate processes.
-module(tbray4b).
-compile([native]).
-export([start/1, start/2]).
-include_lib("kernel/include/file.hrl").

%% Entry point for `erl -run tbray4b start FILE PROCNUM`, which passes the
%% arguments as strings.
start([FileName, ProcNum]) when is_list(ProcNum) ->
    start(FileName, list_to_integer(ProcNum)).

%% Scans FileName in roughly ProcNum chunks, prints the top ten articles and
%% the elapsed time, and returns ok once everything has been counted.
start(FileName, ProcNum) ->
    Start = now(),
    Main = self(),
    Counter = spawn(fun () -> count_loop(Main) end),
    Collector = spawn(fun () -> collect_loop(Counter) end),
    read_file(FileName, ProcNum, Collector),
    %% don't terminate, wait here, until all tasks done.
    receive
        stop ->
            io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).

%% Reads chunk I and hands it to a process that sends {seq, I, Data, Tail}
%% to the collector. Data ends at the chunk's last newline; Tail is the
%% partial line after it.
read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            %% One extra, empty chunk makes the collector scan the last real
            %% chunk's Tail (a final line with no trailing '\n'), which would
            %% otherwise never be counted.
            Collector ! {seq, I, <<>>, <<>>},
            Collector ! {chunk_num, I + 1};
        {ok, Bin} ->
            spawn(fun () ->
                          {Data, Tail} = split_on_last_newline(Bin),
                          Collector ! {seq, I, Data, Tail}
                  end),
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.

collect_loop(Counter) ->
    collect_loop_1([], <<>>, -1, Counter).

%% Chunks: chunks that arrived before their predecessor was scanned.
%% PrevTail: Tail of the last scanned chunk. LastSeq: that chunk's index.
collect_loop_1(Chunks, PrevTail, LastSeq, Counter) ->
    receive
        {chunk_num, ChunkNum} ->
            Counter ! {chunk_num, ChunkNum},
            collect_loop_1(Chunks, PrevTail, LastSeq, Counter);
        {seq, I, Data, Tail} ->
            SortedChunks = lists:keysort(1, [{I, Data, Tail} | Chunks]),
            {Chunks1, PrevTail1, LastSeq1} =
                process_chunks(SortedChunks, [], PrevTail, LastSeq, Counter),
            collect_loop_1(Chunks1, PrevTail1, LastSeq1, Counter)
    end.

count_loop(Main) ->
    count_loop_1(Main, dict:new(), undefined, 0).

%% Merges one dict per scanned chunk. Once ChunkNum dicts have arrived, it
%% prints the result and sends stop to Main.
count_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
count_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            count_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            count_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

%% Walks the chunks in index order. A chunk that directly follows LastSeq
%% gets a scanner over PrevTail ++ Data; any other chunk is buffered.
%% Returns {Buffered, PrevTail, LastSeq}.
process_chunks([], ChunkBuf, PrevTail, LastSeq, _) ->
    {ChunkBuf, PrevTail, LastSeq};
process_chunks([{I, Data, Tail} = Chunk | T], ChunkBuf, PrevTail, LastSeq, Counter) ->
    case LastSeq + 1 of
        I ->
            spawn(fun () -> Counter ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            process_chunks(T, ChunkBuf, Tail, I, Counter);
        _ ->
            process_chunks(T, [Chunk | ChunkBuf], PrevTail, LastSeq, Counter)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

%% Bytes per file:read/2 call: Size div ProcNum, but at least 1.
get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size = Size}} = file:read_file_info(FileName),
    %% file:read/2 with a size of 0 returns {ok, <<>>} instead of eof, so a
    %% zero chunk size (file smaller than ProcNum bytes, e.g. an empty file)
    %% would make read_file_1/4 loop forever.
    case Size div ProcNum of
        0 -> 1;
        ChunkSize -> ChunkSize
    end.

%% Splits Bin at its last newline into {Data, Tail}, dropping the newline.
%% Returns {Bin, <<>>} when Bin contains no newline.
split_on_last_newline(Bin) ->
    split_on_last_newline_1(Bin, byte_size(Bin) - 1).

%% Offset 0 is checked too, so a chunk whose only newline is its first byte
%% splits there instead of being treated as newline-free.
split_on_last_newline_1(Bin, Offset) when Offset >= 0 ->
    case Bin of
        <<Data:Offset/binary, $\n, Tail/binary>> -> {Data, Tail};
        _ -> split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) ->
    {Bin, <<>>}.

scan_chunk(Bin) ->
    scan_chunk_1(Bin, 0, dict:new()).

%% Looks for "GET /ongoing/When/NNNx/YYYY/MM/DD/" at each Offset. On a hit,
%% the key is <<"YYYY/MM/DD/", Word>> and scanning restarts after Word.
scan_chunk_1(Bin, Offset, Dict) when Offset =< byte_size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary, "GET /ongoing/When/", _, _, _, $x, $/,
          Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/, Rest/binary>> ->
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} ->
                    scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} ->
                    Key = <<Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ ->
            scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) ->
    Dict.

%% Returns {Rest, Word} when a space ends Word (Rest follows the space), or
%% {Rest, <<>>} when a '.' or newline comes first; {<<>>, <<>>} at the end.
match_until_space_newline(Bin, Offset) when Offset < byte_size(Bin) ->
    case Bin of
        <<Word:Offset/binary, $\s, Rest/binary>> -> {Rest, Word};
        <<_:Offset/binary, $., Rest/binary>> -> {Rest, <<>>};
        <<_:Offset/binary, 10, Rest/binary>> -> {Rest, <<>>};
        _ -> match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) ->
    {<<>>, <<>>}.
=======
The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV)
Playing with Tim's Erlang Exercise is so much fun.
I've been coding in Erlang for about 6 months as a newbie. In most cases I parse strings (or lists, whatever) without needing regular expressions, since Erlang's pattern matching solves most problems straightforwardly.
Tim's log file is also a good example for applying pattern matching the Erlang way. It's a continuous stream of data; after splitting it into line-bounded chunks for parallelization, we can match the whole {GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } directly on each chunk, with no need to split it into lines any more.
This led to my third solution, which matches the whole
{GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) }
likeness using the pattern:
"GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest]
and then fetches
[Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/] ++ match_until_space_newline(Rest, [])
as the matched key, with no need to split the chunk to lines.
But yes, we still need to split each chunk on its last newline to get an exactly accurate parallelized result.
On my 2-core 2 GHz MacBook, the best time I’ve got is 4.483 sec
# smp enabled: $ erlc -smp tbray3.erl $ time erl -smp +P 60000 -noshell -run tbray3 start o1000k.ap -s erlang halt 8900 : <<"2006/09/29/Dynamic-IDE">> 2000 : <<"2006/07/28/Open-Data">> 1300 : <<"2003/07/25/NotGaming">> 800 : <<"2003/10/16/Debbie">> 800 : <<"2003/09/18/NXML">> 800 : <<"2006/01/31/Data-Protection">> 700 : <<"2003/06/23/SamsPie">> 600 : <<"2006/09/11/Making-Markup">> 600 : <<"2003/02/04/Construction">> 600 : <<"2005/11/03/Cars-and-Office-Suites">> Time: 4142.83 ms real 0m4.483s user 0m5.804s sys 0m0.615s # no-smp: $ erlc tbray3.erl $ time erl -noshell -run tbray_list_no_line start o1000k.ap -s erlang halt real 0m7.050s user 0m6.183s sys 0m0.644s
The SMP-enabled run is about 57% faster.
On the 2.80GHz 4-cpu xeon debian box that I mentioned before in previous blog, the best result is:
real 0m8.420s user 0m11.637s sys 0m0.452s
I've also noticed that adjusting BUFFER_SIZE can balance the time consumed by the parallelized and un-parallelized parts. If the number of cores increases, we can also increase BUFFER_SIZE a bit. That gives fewer chunks, and so fewer un-parallelized split_on_last_newline/1 and file:read/2 steps, but more work per chunk for binary_to_list/1 and the parallelized scan_chunk/1 on longer lists.
The best BUFFER_SIZE on my computer is 4096 * 5 bytes; with it, the un-parallelized split_on_last_newline/1 took only about 0.226s in total.
The code:
%% tbray3: list-based solution to Tim Bray's log-scanning exercise.
%% The file is read sequentially in ?BUFFER_SIZE pieces. Each piece is
%% converted to a list and split on its last newline, and every complete
%% chunk is scanned in its own process. A collector merges the dicts and
%% prints the ten most requested articles.
-module(tbray3).
-compile([native]).
-export([start/1]).

%% The best Bin Buffer Size is 4096 * 1 - 4096 * 5
-define(BUFFER_SIZE, (4096 * 5)).

%% Scans FileName, prints the top ten articles and the elapsed time, and
%% returns ok once every chunk has been counted.
start(FileName) ->
    Start = now(),
    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    %% don't terminate, wait here, until all tasks done.
    receive
        stop ->
            io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(File, Collector) ->
    read_file_1(File, [], 0, Collector).

%% PrevTail: text after the last newline seen so far (an incomplete line).
%% I: number of chunks handed to scanners so far.
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            %% Whatever follows the file's last newline (a final line with
            %% no trailing '\n') is still in PrevTail: scan it as the last
            %% chunk instead of dropping it.
            spawn(fun () -> Collector ! {dict, scan_chunk(PrevTail)} end),
            Collector ! {chunk_num, I + 1},
            file:close(File);
        {ok, Bin} ->
            {Chunk, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)),
            spawn(fun () -> Collector ! {dict, scan_chunk(Chunk)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

%% Splits List at its last newline into {Before, After}, dropping the
%% newline. With no newline, everything is kept as the tail, {[], List}, so
%% it is carried into the next chunk rather than lost.
split_on_last_newline(List) ->
    split_on_last_newline_1(lists:reverse(List), []).

split_on_last_newline_1([], Tail) ->
    {[], Tail};
split_on_last_newline_1([$\n | Rest], Tail) ->
    {lists:reverse(Rest), Tail};
split_on_last_newline_1([C | Rest], Tail) ->
    split_on_last_newline_1(Rest, [C | Tail]).

collect_loop(Main) ->
    collect_loop_1(Main, dict:new(), undefined, 0).

%% Merges one dict per scanned chunk. Once ChunkNum dicts have arrived, it
%% prints the result and sends stop to Main.
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

scan_chunk(List) ->
    scan_chunk_1(List, dict:new()).

%% Walks the list looking for "GET /ongoing/When/NNNx/YYYY/MM/DD/". On a
%% hit, the key is the binary "YYYY/MM/DD/" ++ Word and scanning continues
%% after Word.
scan_chunk_1(List, Dict) ->
    case List of
        [] ->
            Dict;
        "GET /ongoing/When/" ++ [_, _, _, $x, $/, Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Rest] ->
            case match_until_space_newline(Rest, []) of
                {Rest1, []} ->
                    scan_chunk_1(Rest1, Dict);
                {Rest1, Word} ->
                    Key = list_to_binary([Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/, Word]),
                    scan_chunk_1(Rest1, dict:update_counter(Key, 1, Dict))
            end;
        [_ | Rest] ->
            scan_chunk_1(Rest, Dict)
    end.

%% Collects characters up to a space: {RestFromSpace, Word}. A newline, a
%% '.' or the end of the list yields an empty Word.
match_until_space_newline(List, Word) ->
    case List of
        [] -> {[], []};
        [10 | _] -> {List, []};
        [$. | _] -> {List, []};
        [$\s | _] -> {List, lists:reverse(Word)};
        [C | Rest] -> match_until_space_newline(Rest, [C | Word])
    end.
I also wrote a corresponding binary version, which is 2-3 times slower than the list version above on my machine, although the result may vary depending on how your Erlang/OTP was compiled and on the operating system. I will test it again when Erlang/OTP R12B is released, which is claimed to have optimized binary matching performance.
-module(tbray3_bin).
-compile([native]).
-export([start/1]).

-define(BUFFER_SIZE, (4096 * 10000)).

%% Entry point: reads FileName in ?BUFFER_SIZE binaries, scans every piece of
%% complete lines in its own process, then prints the 10 most frequent keys
%% ("YYYY/MM/DD/Word") with their counts, followed by the elapsed time.
start(FileName) ->
    Start = now(),
    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~p ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

collect_loop(Main) ->
    collect_loop_1(Main, dict:new(), undefined, 0).

%% Merges per-chunk dicts; stops once the number of merged dicts equals the
%% chunk count announced by the reader ({chunk_num, N}).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

read_file(File, Collector) ->
    read_file_1(File, <<>>, 0, Collector).

%% PrevTail holds the bytes after the last newline seen so far (an
%% incomplete line); I counts the scan processes spawned so far.
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            file:close(File),
            %% The file need not end with a newline: whatever is still in
            %% PrevTail is the last line and must be scanned as well.
            ChunkNum = case PrevTail of
                           <<>> -> I;
                           _ -> spawn_scan(PrevTail, Collector), I + 1
                       end,
            Collector ! {chunk_num, ChunkNum};
        {ok, Bin} ->
            %% Join first, then split, so a line longer than one read is
            %% carried over whole instead of being cut at the read boundary.
            {Data, NextTail} = split_on_last_newline(<<PrevTail/binary, Bin/binary>>),
            spawn_scan(Data, Collector),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

%% Scans Data in a new process and sends the resulting dict to Collector.
spawn_scan(Data, Collector) ->
    spawn(fun () -> Collector ! {dict, scan_chunk(Data)} end).

%% Splits Bin into {CompleteLines, Tail} at the last newline (the newline
%% itself is dropped). Every position, including 0, is checked; if there is
%% no newline at all, everything becomes Tail.
split_on_last_newline(Bin) ->
    split_on_last_newline_1(Bin, size(Bin) - 1).

split_on_last_newline_1(Bin, Offset) when Offset >= 0 ->
    case Bin of
        <<Data:Offset/binary, $\n, Tail/binary>> -> {Data, Tail};
        _ -> split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) ->
    {<<>>, Bin}.

scan_chunk(Bin) ->
    scan_chunk_1(Bin, 0, dict:new()).

%% Tries the "GET /ongoing/When/???x/YYYY/MM/DD/" pattern at every Offset;
%% the 34 is the length of that fixed-size prefix.
scan_chunk_1(Bin, Offset, Dict) when Offset < size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary, "GET /ongoing/When/", _, _, _, $x, $/,
          Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/, Rest/binary>> ->
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} ->
                    scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} ->
                    Key = <<Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ ->
            scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) ->
    Dict.

%% Returns {RestAfterTerminator, Word}: Word is the bytes before the first
%% space, or <<>> if a '.' or newline comes first or the input runs out.
match_until_space_newline(Bin, Offset) when Offset < size(Bin) ->
    case Bin of
        <<Word:Offset/binary, $\s, Rest/binary>> -> {Rest, Word};
        <<_:Offset/binary, $., Rest/binary>> -> {Rest, <<>>};
        <<_:Offset/binary, $\n, Rest/binary>> -> {Rest, <<>>};
        _ -> match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) ->
    {<<>>, <<>>}.
Take a Break as Trader
Updated Dec 7: Well, I bought some stocks again today. Let's see what 2008 brings.
I sold all my holdings on the Stock Exchange of China this morning, and will take a break until the end of this year. Wow, what a year.
Reading File in Parallel in Erlang (Was Tim Bray's Erlang Exercise - Round III)
My first solution for Tim's exercise tried to read the file in parallel, but by reading the file module's source code I just realized that file:open(FileName, Options) returns a process instead of an IO device. Well, this means a lot:
- It's a process, so when you request more data from it, you actually send it a message. Since you only send two integers (the offset and the length), sending the message should be very fast. But then this process (File) has to wait to receive the data from the disk/IO. Within one process, receiving is sequential rather than parallel.
- If we look at Erlang processes as ActiveObjects, which send/receive messages/data asynchronously, then, since receiving is sequential within one process, requesting/waiting on one process (or object) is almost always safe in parallel programming; you usually do not need to worry about locking/unlocking etc. (except for the outside world).
- We can open many File processes to read data in parallel; the limits are the disk/IO and the OS's resource limits.
I wrote some code to test reading a file in parallel. With the disk cache discarded, on my 2-core MacBook, reading the file with two processes is nearly twice as fast as reading it with one process.
The code:
-module(file_pread).
-compile([native]).
-export([start/2]).
-include_lib("kernel/include/file.hrl").

%% Times reading FileName sequentially (read_file/3) and then with ProcNum
%% parallel readers (pread_file/3), printing one elapsed time per strategy.
start(FileName, ProcNum) ->
    [start(FileName, ProcNum, Fun) || Fun <- [fun read_file/3, fun pread_file/3]].

start(FileName, ProcNum, Fun) ->
    Start = now(),
    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),
    Fun(FileName, ProcNum, Collector),
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

collect_loop(Main) ->
    collect_loop_1(Main, undefined, 0).

%% Counts {seq, _} reports; stops once it equals the announced chunk count.
collect_loop_1(Main, ChunkNum, ChunkNum) ->
    Main ! stop;
collect_loop_1(Main, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> collect_loop_1(Main, ChunkNumX, ProcessedNum);
        {seq, _Seq} -> collect_loop_1(Main, ChunkNum, ProcessedNum + 1)
    end.

get_file_size(FileName) ->
    {ok, #file_info{size = Size}} = file:read_file_info(FileName),
    Size.

%% Size div ProcNum, but never 0: file:read/2 with a length of 0 never returns
%% eof, so a zero chunk size would make read_file_1/4 loop forever on files
%% smaller than ProcNum bytes (including empty files).
get_chunk_size(Size, ProcNum) ->
    case Size div ProcNum of
        0 -> 1;
        ChunkSize -> ChunkSize
    end.

%% Sequential strategy: one File process, read chunk after chunk.
read_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(get_file_size(FileName), ProcNum),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).

read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, _Bin} ->
            Collector ! {seq, I},
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.

%% Parallel strategy: ProcNum processes, each opening its own File process
%% and reading its own chunk with file:pread/3.
pread_file(FileName, ProcNum, Collector) ->
    Size = get_file_size(FileName),
    ChunkSize = get_chunk_size(Size, ProcNum),
    [spawn(fun () -> pread_chunk(FileName, Size, ChunkSize, ProcNum, I, Collector) end)
     || I <- lists:seq(0, ProcNum - 1)],
    Collector ! {chunk_num, ProcNum}.

pread_chunk(FileName, Size, ChunkSize, ProcNum, I, Collector) ->
    Offset = ChunkSize * I,
    Length = chunk_length(Size, Offset, ChunkSize, I =:= ProcNum - 1),
    {ok, File} = file:open(FileName, [raw, binary]),
    case Length of
        0 -> ok;  % chunk lies beyond the end of a tiny file
        _ -> {ok, _} = file:pread(File, Offset, Length)
    end,
    Collector ! {seq, I},
    file:close(File).

%% Number of bytes the chunk starting at Offset reads. The last chunk takes
%% all remaining bytes (Size need not be divisible by ProcNum); no chunk
%% reads past the end of the file.
chunk_length(Size, Offset, _ChunkSize, true) when Offset < Size ->
    Size - Offset;
chunk_length(Size, Offset, ChunkSize, false) when Offset < Size ->
    lists:min([ChunkSize, Size - Offset]);
chunk_length(_Size, _Offset, _ChunkSize, _IsLast) ->
    0.
pread_file/3 is the parallelized version: each reading process opens its own File process instead of sharing one opened File process among all reading processes. read_file/3 is the non-parallelized version.
To evaluate (run each test at least twice so that the disk/IO cache state is comparable between runs):
$ erlc -smp file_pread.erl
$ erl -smp
1> file_pread:start("o100k.ap", 2).
time: 691.72 ms
time: 44.37 ms
[ok,ok]
2> file_pread:start("o100k.ap", 2).
time: 74.50 ms
time: 43.59 ms
[ok,ok]
3> file_pread:start("o1000k.ap", 2).
time: 1717.68 ms
time: 408.48 ms
[ok,ok]
4> file_pread:start("o1000k.ap", 2).
time: 766.00 ms
time: 393.71 ms
[ok,ok]
5>
Let's compare the results for each file (we pick the second testing result of each), the speedup:
- o100k.ap, 20M, 74.50 / 43.59 - 1= 70%
- o1000k.ap, 200M, 766.00 / 393.71 - 1 = 95%
On another 4-CPU debian machine, with 4 processes, the best result I got:
4> file_pread:start("o1000k.ap", 4).
time: 768.59 ms
time: 258.57 ms
[ok, ok]
5>
The speedup of parallelized reading is 768.59 / 258.57 - 1 = 197%.
I've updated my first solution based on this test, so it now opens a new File process for each reading process instead of sharing the same File process. Of course, the issues I pointed out in Tim Bray's Erlang Exercise on Large Dataset Processing - Round II still remain.
Although the above result can also be achieved in other languages, I find that coding parallelization in Erlang is a pleasure.
Tim Bray's Erlang Exercise on Large Dataset Processing - Round II
Updated Oct 09: Added more benchmark results under linux on other machines.
Updated Oct 07: More concise code.
Updated Oct 06: Fixed bugs: 1. Match "GET /ongoing/When/" instead of "/ongoing/When/"; 2. split_on_last_newline should not reverse Tail.
Back from a short vacation and sitting in front of my computer again, I'm thinking about Tim Bray's exercise once more.
As I realized, the most expensive procedure is splitting the dataset into lines. To get the multi-core benefit, we should parallelize this procedure, not just the file reading or the matching.
In my previous solution, there are at least two issues:
- Since file reading is fast in Erlang, parallelizing the file reading does not help much.
- The buffered_read actually can be merged with the buffered file reading.
And, Per's solution parallelizes process_match procedure only, based on a really fast divide_to_lines, but with hacked binary matching syntax.
After a couple of hours of work, I finally got the second version of tbray.erl (with some code from Per's solution).
- Read file to small pieces of binary (about 4096 bytes each chunk), then convert to list.
- Merge the previous tail for each chunk, search this chunk from tail, find the last new line mark, split this chunk to line-bounded data part, and tail part for next chunk.
- The above steps are difficult to parallelize. If we try, there will be about 30 more LOC, and not so readable.
- Spawn a new process at once to split line-bounded chunk to lines, process match and update dict.
- Thus we can keep reading the file without stopping.
- A collect_loop will receive dicts from each process, and merge them.
What I like about this version is that it scales almost linearly on multiple cores! On my 2.0GHz 2-core MacBook, it took about 13.522 seconds with non-smp and 7.624 seconds with smp enabled (for a 200M data file, with about 50,000 processes spawned). The 2-core smp result is about 77% faster than the non-smp result. I'm not sure how it will do on an 8-core computer, but we will eventually hit the limit set by the un-parallelized procedures.
The Erlang time results:
$ erlc tbray.erl $ time erl -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null real 0m13.522s user 0m12.265s sys 0m1.199s $ erlc -smp tbray.erl $ time erl -smp +P 60000 -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null real 0m7.624s user 0m13.302s sys 0m1.602s # For 5 million lines, 958.4M size: $ time erl -smp +P 300000 -noshell -run tbray start o5000k.ap -s erlang halt > /dev/null real 0m37.085s user 1m5.605s sys 0m7.554s
And the original Tim's Ruby version:
$ time ruby tbray.rb o1000k.ap > /dev/null real 0m2.447s user 0m2.123s sys 0m0.306s # For 5 million lines, 958.4M size: $ time ruby tbray.rb o5000k.ap > /dev/null real 0m12.115s user 0m10.494s sys 0m1.473s
Erlang time result on 2-core 1.86GHz CPU RedHat linux box, with kernel:
Linux version 2.6.18-1.2798.fc6 (brewbuilder@hs20-bc2-4.build.redhat.com) (gcc version 4.1.1 20061011 (Red Hat 4.1.1-30)) #1 SMP Mon Oct 16 14:37:32 EDT 2006
is 7.7 seconds.
Erlang time result on 2.80GHz 4-cpu xeon debian box, with kernel:
Linux version 2.6.15.4-big-smp-tidy (root@test) (gcc version 4.0.3 20060128 (prerelease) (Debian 4.0.2-8)) #1 SMP Sat Feb 25 21:24:23 CST 2006
The smp result on this 4-CPU computer is questionable: smp is only 50% faster than non-smp, an even smaller gain than on my 2.0GHz 2-core MacBook. I also tested the Big Bang benchmark on this machine, and its speedup was less than 50% too.
$ erlc tbray.erl $ time erl -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null real 0m22.279s user 0m21.597s sys 0m0.676s $ erlc -smp tbray.erl $ time erl -smp +S 4 +P 60000 -noshell -run tbray start o1000k.ap -s erlang halt > /dev/null real 0m14.765s user 0m28.722s sys 0m0.840s
Notice:
- All tests were run several times and the best result is reported, so the disk/IO cache state should be comparable.
- You may need to compile tbray.erl to two different BEAMs, one for smp version, and one for no-smp version.
- If you'd like to process bigger file, you can use +P processNum to get more simultaneously alive Erlang processes. For BUFFER_SIZE=4096, you can set +P arg as FileSize / 4096, or above. From Erlang's Efficiency Guide:
Processes
The maximum number of simultaneously alive Erlang processes is by default 32768. This limit can be raised up to at most 268435456 processes at startup (see documentation of the system flag +P in the erl(1) documentation). The maximum limit of 268435456 processes will at least on a 32-bit architecture be impossible to reach due to memory shortage.
To evaluate with smp enabled (Erlang/OTP R11B-5 for Windows may not support smp yet):
erl -smp +P 60000
> tbray:start("o1000k.ap").
The code: (pretty formatted by ErlyBird 0.15.1)
-module(tbray_blog).
-compile([native]).
-export([start/1]).

%% The best Bin Buffer Size is 4096
-define(BUFFER_SIZE, 4096).

%% Entry point: reads FileName in ?BUFFER_SIZE pieces, splits every piece of
%% complete lines into lines in its own process, then prints the 10 most
%% frequent keys ("YYYY/MM/DD/Word") with their counts and the elapsed time.
start(FileName) ->
    Start = now(),
    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    %% don't terminate, wait here, until all tasks done.
    receive
        stop ->
            io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(File, Collector) ->
    read_file_1(File, [], 0, Collector).

%% PrevTail holds the characters after the last newline seen so far (an
%% incomplete line); I counts the scan processes spawned so far.
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            %% The file need not end with a newline: whatever is still in
            %% PrevTail is the last line and must be scanned as well.
            ChunkNum = case PrevTail of
                           [] -> I;
                           _ -> spawn_scan(PrevTail, Collector), I + 1
                       end,
            Collector ! {chunk_num, ChunkNum},
            file:close(File);
        {ok, Bin} ->
            {Data, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)),
            spawn_scan(Data, Collector),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

%% Scans Data in a new process and sends the resulting dict to Collector.
spawn_scan(Data, Collector) ->
    spawn(fun () -> Collector ! {dict, scan_lines(Data)} end).

%% Splits List into {CompleteLines, Tail} at the last newline (the newline
%% itself is dropped). If there is no newline at all, everything becomes
%% Tail so it is carried over to the next read instead of being lost.
split_on_last_newline(List) ->
    split_on_last_newline_1(lists:reverse(List), []).

split_on_last_newline_1([], Tail) ->
    {[], Tail};
split_on_last_newline_1([$\n | Rest], Tail) ->
    {lists:reverse(Rest), Tail};
split_on_last_newline_1([C | Rest], Tail) ->
    split_on_last_newline_1(Rest, [C | Tail]).

collect_loop(Main) ->
    collect_loop_1(Main, dict:new(), undefined, 0).

%% Merges per-chunk dicts; stops once the number of merged dicts equals the
%% chunk count announced by the reader ({chunk_num, N}).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~p\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

scan_lines(List) ->
    scan_lines_1(List, [], dict:new()).

%% Accumulates the current line (reversed) and matches it at each newline
%% and at the end of the chunk.
scan_lines_1([], Line, Dict) ->
    match_and_update_dict(lists:reverse(Line), Dict);
scan_lines_1([$\n | Rest], Line, Dict) ->
    scan_lines_1(Rest, [], match_and_update_dict(lists:reverse(Line), Dict));
scan_lines_1([C | Rest], Line, Dict) ->
    scan_lines_1(Rest, [C | Line], Dict).

match_and_update_dict(Line, Dict) ->
    case process_match(Line) of
        false -> Dict;
        {true, Word} -> dict:update_counter(Word, 1, Dict)
    end.

%% Looks for "GET /ongoing/When/???x/YYYY/MM/DD/Word " anywhere in Line and
%% returns {true, "YYYY/MM/DD/Word"} or false.
process_match([]) ->
    false;
process_match("GET /ongoing/When/" ++ [_, _, _, $x, $/, Y1, Y2, Y3, Y4, $/,
                                      M1, M2, $/, D1, D2, $/ | Rest]) ->
    case match_until_space(Rest, []) of
        [] -> false;
        Word -> {true, [Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/] ++ Word}
    end;
process_match([_ | Rest]) ->
    process_match(Rest).

%% Returns the characters before the first space, or [] if a '.' comes first
%% or the line ends without a space.
match_until_space([], _Word) -> [];
match_until_space([$. | _], _Word) -> [];
match_until_space([$\s | _], Word) -> lists:reverse(Word);
match_until_space([C | Rest], Word) -> match_until_space(Rest, [C | Word]).
Lessons learnt:
- Split large binary to proper size chunks, then convert to list for further processing
- Parallelize the most expensive part (of course)
- We need a new or more complete Erlang Efficiency Guide
![(please configure the [header_logo] section in trac.ini)](/chrome/!97aa87b5/site/your_project_logo.png)
rss