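Mnesia is a database that is distributed by design. To keep data consistent across nodes, it replicates every non-read operation between nodes.
dirty_write has to replicate data too. It differs from an ordinary write in two ways:
1. It takes no locks.
2. The replication request does not wait for an acknowledgement.
This makes the operation much faster. After dirty_write is called, mnesia sends a message to every other connected node telling it which operation to perform, and then runs mnesia_tm:do_dirty itself. Each node that receives the replication request also calls mnesia_tm:do_dirty.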
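The difference between a write that waits for a receipt and one that does not can be illustrated with a toy model. The sketch below is not mnesia's implementation: the module name replica_sketch, the message shapes, and the 5-second timeout are all assumptions made for illustration. A single "replica" process holds a map; async_write/3 returns as soon as the message is sent, while sync_write/3 blocks until the replica acknowledges.

```erlang
-module(replica_sketch).
-export([start/0, async_write/3, sync_write/3, read/2]).

%% Start a toy replica process that keeps its data in a map.
start() ->
    spawn(fun() -> loop(#{}) end).

loop(Store) ->
    receive
        %% Fire-and-forget write: apply it, send nothing back.
        {write, Key, Val} ->
            loop(Store#{Key => Val});
        %% Acknowledged write: apply it, then reply to the caller.
        {write, Key, Val, From, Ref} ->
            From ! {ack, Ref},
            loop(Store#{Key => Val});
        {read, Key, From, Ref} ->
            From ! {value, Ref, maps:get(Key, Store, undefined)},
            loop(Store)
    end.

%% Returns immediately after sending; the caller never learns
%% whether the replica applied the write.
async_write(Pid, Key, Val) ->
    Pid ! {write, Key, Val},
    ok.

%% Blocks until the replica acknowledges (or the timeout fires).
sync_write(Pid, Key, Val) ->
    Ref = make_ref(),
    Pid ! {write, Key, Val, self(), Ref},
    receive
        {ack, Ref} -> ok
    after 5000 ->
        {error, timeout}
    end.

read(Pid, Key) ->
    Ref = make_ref(),
    Pid ! {read, Key, self(), Ref},
    receive
        {value, Ref, Val} -> Val
    after 5000 ->
        {error, timeout}
    end.
```

In this model the asynchronous caller pays only for the send, which is the saving the second point above describes; the price is that a lost or failed write goes unnoticed by the caller.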
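I then ran a test with different numbers of nodes, each time spawning 100,000 concurrent processes that dirty_write to the same mnesia table. The results:
1. With a single node, 100,000 concurrent processes doing dirty writes took about 18 microseconds per write on average.
2. With 2 nodes, 100,000 concurrent processes doing dirty writes took about 54 microseconds per write on average.
3. With 3 nodes, 100,000 concurrent processes doing dirty writes took about 82 microseconds per write on average.
In other words, the cost of sending replication messages rises with the number of nodes. If a project has more than 10 nodes, a single do_dirty operation would become unbearably slow. What can be done about it?
To find a solution, we need to walk through the dirty_write flow once more and locate the bottleneck of the whole operation.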
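1. My first guess was that sending messages to each additional node accounted for the extra 20 to 30 microseconds. But when I tested one node sending messages to another with 100,000 concurrent processes, a send took only about 2 microseconds on average. That did not add up: where was the rest of the time going? To confirm whether message sending really affected the whole flow, I modified the mnesia_tm module, commented out the code that sends the messages, and repeated the test above. With 3 nodes, the average dropped to about 25 microseconds. So message sending really does account for the extra time. Why, then, did the standalone send test show such a small cost? My guess is that in a standalone test with hundreds of thousands of "concurrent" processes, very few processes actually run at the same time when the work is trivial: the first process may well have finished before the 1000th has even started, so the real concurrency never gets above 1000, and the standalone send test is therefore not accurate. In the mnesia test the whole flow takes much longer, so tens of thousands of processes may be alive at once, tens of thousands of sends may be in flight at once, and the time per send grows accordingly. My conclusion is that the more concurrent senders there are, the longer each Erlang send takes. So there is no way to speed up the message sending; the current situation is already as good as it gets.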
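2. That leaves mnesia_tm:do_dirty as the place to optimize. Every node, including the local one, uses do_dirty to update the ets table, so blocking there can be avoided by operating on several ets tables at once, that is, by splitting the table: instead of reading and writing one table, read and write several tables with the same structure. Test results:
1. Still 3 nodes, split across 3 tables, 100,000 concurrent processes doing dirty writes: about 54 microseconds per write on average.
2. Still 3 nodes, split across 4 tables, 100,000 concurrent processes doing dirty writes: about 28 microseconds per write on average.
This is a clear improvement, which shows that the approach works.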
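A benchmark can pick a split table at random, but real code has to find a record again when it reads, so the table is usually derived from the key. The following is a minimal sketch under that assumption; the module name shard_sketch and the function table_for are hypothetical, and the table names simply mirror the ones used in the test code in this post.

```erlang
-module(shard_sketch).
-export([table_for/1, table_for/2]).

%% Hypothetical set of split tables, all with the same structure.
-define(TABLES, {test_mnesia, test_mnesia1, test_mnesia2, test_mnesia3}).

%% Map a key to one of the default split tables.
table_for(Key) ->
    table_for(Key, ?TABLES).

%% erlang:phash2(Key, N) returns an integer in 0..N-1, so adding 1
%% gives a valid tuple index. The same key always maps to the same
%% table, which is what lets a later read find the record.
table_for(Key, Tables) when is_tuple(Tables), tuple_size(Tables) > 0 ->
    element(erlang:phash2(Key, tuple_size(Tables)) + 1, Tables).
```

Assuming every split table was created with the option {record_name, test_mnesia}, a write could then look like mnesia:dirty_write(shard_sketch:table_for(UserId), Rec), and a read like mnesia:dirty_read(shard_sketch:table_for(UserId), UserId).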
Here is the test code.
Testing table splitting and multi-node performance:
-module(test_mnesia_split).
-export([start/0, dw/0, tdw/0, stop/0, dw1/0, tdw1/0]).

-record(test_mnesia, {userid, pid}).
-record(test_mnesia1, {userid, pid}).
-record(test_mnesia2, {userid, pid}).
-record(test_mnesia3, {userid, pid}).

%% Recreate the schema on all connected nodes and create the test tables
%% as ram_copies on every node.
start() ->
    NodeList = node_list(),
    rpc_call(mnesia, stop, [], stopped),
    mnesia:delete_schema(NodeList),
    mnesia:create_schema(NodeList),
    rpc_call(mnesia, start, [], ok),
    {atomic, ok} = mnesia:create_table(test_mnesia, [{ram_copies, NodeList},
        {attributes, record_info(fields, test_mnesia)}]),
    {atomic, ok} = mnesia:create_table(test_mnesia1, [{ram_copies, NodeList},
        {attributes, record_info(fields, test_mnesia1)}]),
    {atomic, ok} = mnesia:create_table(test_mnesia2, [{ram_copies, NodeList},
        {attributes, record_info(fields, test_mnesia2)}]),
    {atomic, ok} = mnesia:create_table(test_mnesia3, [{ram_copies, NodeList},
        {attributes, record_info(fields, test_mnesia3)}]),
    {atomic, ok} = mnesia:add_table_index(test_mnesia, pid).
    % rpc_call(mnesia, stop, [], stopped).

%% Stop mnesia on all nodes and delete the schema.
stop() ->
    NodeList = node_list(),
    rpc_call(mnesia, stop, [], stopped),
    mnesia:delete_schema(NodeList).

%% Dirty write to the single, unsplit table.
dw() ->
    %% The random number is not used here; it is generated anyway so that
    %% dw/0 pays the same per-call overhead as dw1/0.
    %% rand replaces the deprecated random module.
    <<A:32, B:32, C:32>> = crypto:strong_rand_bytes(12),
    rand:seed(exsplus, {A, B, C}),
    _Random = rand:uniform(3),
    UserId = <<"user_1@android">>,
    mnesia:dirty_write(#test_mnesia{userid = UserId, pid = UserId}).

%% tc:ct/4 is an external timing helper that is not shown in this post.
tdw() ->
    tc:ct(?MODULE, dw, [], 100000).

%% Dirty write to a randomly chosen split table.
dw1() ->
    <<A:32, B:32, C:32>> = crypto:strong_rand_bytes(12),
    rand:seed(exsplus, {A, B, C}),
    Random = rand:uniform(3),
    UserId = <<"user_1@android">>,
    case Random of
        1 ->
            mnesia:dirty_write(#test_mnesia1{userid = UserId, pid = UserId});
        2 ->
            mnesia:dirty_write(#test_mnesia2{userid = UserId, pid = UserId});
        3 ->
            mnesia:dirty_write(#test_mnesia3{userid = UserId, pid = UserId});
        _ ->
            %% Unreachable with rand:uniform(3), which only yields 1..3.
            %% Use rand:uniform(4) to spread the writes over four tables.
            mnesia:dirty_write(#test_mnesia{userid = UserId, pid = UserId})
    end.

tdw1() ->
    tc:ct(?MODULE, dw1, [], 100000).

%% ===================================================================
%% Internal functions
%% ===================================================================

node_list() ->
    [node() | nodes()].

%% Call Module:Function(Args) on every node and assert that each call
%% returns Result.
rpc_call(Module, Function, Args, Result) ->
    rpc_call(node_list(), Module, Function, Args, Result).

rpc_call([H|T], Module, Function, Args, Result) ->
    Result = rpc:call(H, Module, Function, Args),
    rpc_call(T, Module, Function, Args, Result);
rpc_call([], _, _, _, _) ->
    ok.
Testing message-sending performance:
-module(ts).
-export([start_rec/0, send/0, start/1]).

%% Run on the receiving node: start and register the receiver process.
%% The process is only spawned if no receiver is registered yet, so a
%% repeated call does not leave a stray process behind.
start_rec() ->
    case whereis(rec) of
        undefined ->
            register(rec, spawn(fun() -> rec() end));
        _ ->
            ok
    end.

%% Receiver loop: discard every message until stop arrives.
rec() ->
    receive
        stop ->
            stop;
        _ ->
            rec()
    end.

%% Send 999 untimed messages, then time the 1000th and report it.
send() ->
    send(1000).

send(N) when N > 1 ->
    {rec, 's2@192.168.1.137'} ! a,
    send(N - 1);
send(1) ->
    {Microsecond, _} = timer:tc(erlang, send, [{rec, 's2@192.168.1.137'}, a]),
    collector ! {result, Microsecond},
    ok.

%% Collect N results, tracking the maximum, minimum and sum.
collector(Max, Min, Sum, N, I, List) when N >= I ->
    receive
        {result, Microsecond} ->
            NewSum = Sum + Microsecond,
            if
                Max == 0 ->
                    NewMax = NewMin = Microsecond;
                Max < Microsecond ->
                    NewMax = Microsecond,
                    NewMin = Min;
                Min > Microsecond ->
                    NewMax = Max,
                    NewMin = Microsecond;
                true ->
                    NewMax = Max,
                    NewMin = Min
            end,
            collector(NewMax, NewMin, NewSum, N, I + 1, [Microsecond|List])
    end;
collector(Max, Min, Sum, N, _, List) ->
    Aver = Sum / N,
    {Less5, Less10, Less20, Less50, Less100, Less500, Other} = distribution(List, Aver),
    {Max, Min, Sum, Aver, Less5, Less10, Less20, Less50, Less100, Less500, Other}.

%% Count how many results fall into each latency bucket.
distribution(List, Aver) ->
    distribution(List, Aver, 0, 0, 0, 0, 0, 0, 0).

distribution([H|T], Aver, Less5, Less10, Less20, Less50, Less100, Less500, Other) ->
    if
        H < 5 ->
            distribution(T, Aver, Less5 + 1, Less10, Less20, Less50, Less100, Less500, Other);
        H < 10 ->
            distribution(T, Aver, Less5, Less10 + 1, Less20, Less50, Less100, Less500, Other);
        H < 20 ->
            distribution(T, Aver, Less5, Less10, Less20 + 1, Less50, Less100, Less500, Other);
        H < 50 ->
            distribution(T, Aver, Less5, Less10, Less20, Less50 + 1, Less100, Less500, Other);
        H < 100 ->
            distribution(T, Aver, Less5, Less10, Less20, Less50, Less100 + 1, Less500, Other);
        H < 500 ->
            distribution(T, Aver, Less5, Less10, Less20, Less50, Less100, Less500 + 1, Other);
        true ->
            distribution(T, Aver, Less5, Less10, Less20, Less50, Less100, Less500, Other + 1)
    end;
distribution([], _Aver, Less5, Less10, Less20, Less50, Less100, Less500, Other) ->
    {Less5, Less10, Less20, Less50, Less100, Less500, Other}.

%% Run on the sending node: spawn N sender processes and print statistics.
start(N) ->
    case whereis(collector) of
        undefined ->
            register(collector, self());
        _ ->
            ok
    end,
    loop(N),
    {Max, Min, Sum, Aver, Less5, Less10, Less20, Less50, Less100, Less500, Other} = collector(0, 0, 0, N, 1, []),
    io:format("=====================~n"),
    io:format("spawn [~p] processes of erlang:send/2:~n", [N]),
    io:format("Maximum: ~p(μs)\t~p(s)~n", [Max, Max / 1000000]),
    io:format("Minimum: ~p(μs)\t~p(s)~n", [Min, Min / 1000000]),
    io:format("Sum: ~p(μs)\t~p(s)~n", [Sum, Sum / 1000000]),
    io:format("Average: ~p(μs)\t~p(s)~n", [Aver, Aver / 1000000]),
    io:format("0 <= x < 5: ~p~n", [Less5]),
    io:format("5 <= x < 10: ~p~n", [Less10]),
    io:format("10 <= x < 20: ~p~n", [Less20]),
    io:format("20 <= x < 50: ~p~n", [Less50]),
    io:format("50 <= x < 100: ~p~n", [Less100]),
    io:format("100 <= x < 500: ~p~n", [Less500]),
    io:format("x >= 500: ~p~n", [Other]),
    io:format("=====================~n").

loop(N) when N > 0 ->
    spawn(fun ts:send/0),
    loop(N - 1);
loop(0) ->
    ok.
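The analysis earlier in this post suspects that in a send test like this one only a few of the spawned processes are ever alive at the same time. One rough way to check that suspicion is to sample the VM's process count while spawning. The sketch below is an assumption-laden helper, not part of the original test: the module name peak_sketch and the function peak_alive are hypothetical, and the figure it returns is only approximate because unrelated processes in the VM also start and exit.

```erlang
-module(peak_sketch).
-export([peak_alive/2]).

%% Spawn N processes running Fun and return the largest number of
%% extra processes seen alive at once, sampled after every spawn.
peak_alive(Fun, N) when is_function(Fun, 0), is_integer(N), N >= 0 ->
    %% Baseline: processes that already exist before the test starts.
    Base = erlang:system_info(process_count),
    spawn_loop(Fun, N, Base, 0).

spawn_loop(_Fun, 0, _Base, Peak) ->
    Peak;
spawn_loop(Fun, N, Base, Peak) ->
    spawn(Fun),
    %% Processes currently alive beyond the baseline.
    Alive = erlang:system_info(process_count) - Base,
    spawn_loop(Fun, N - 1, Base, max(Peak, Alive)).
```

For example, comparing peak_sketch:peak_alive(fun ts:send/0, 100000) with peak_sketch:peak_alive(fun test_mnesia_split:dw/0, 100000) would indicate whether the mnesia workload really keeps far more processes alive at once than the plain send workload.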
Original post: http://wudixiaotie.iteye.com/blog/2237009