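riak core is the main building block of Riak and is responsible for the distributed part. Riak ships with its own storage backends, but other storage backends can be plugged in as well.
On every node, riak core uses a master/worker arrangement that executes as one unit of work. The riak core workers are vnode workers, started on each node under riak_core_sup (through riak_core_vnode_sup, as the process dumps below show). The module behind a vnode worker is riak_core_vnode.
For example:
Node 1: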
Node: 'mfmn2@127.0.0.1', Process: <0.80.0>
[{registered_name,riak_core_vnode_sup},
{current_function,{gen_server,loop,6}},
{initial_call,{proc_lib,init_p,5}},
{status,waiting},
{message_queue_len,0},
{messages,[]},
{links,[<0.148.0>,<0.152.0>,<0.154.0>,<0.155.0>,<0.153.0>,<0.150.0>,
<0.151.0>,<0.149.0>,<0.140.0>,<0.144.0>,<0.146.0>,<0.147.0>,
<0.145.0>,<0.142.0>,<0.143.0>,<0.141.0>,<0.136.0>,<0.138.0>,
<0.139.0>,<0.137.0>,<0.77.0>,<0.135.0>]},
{dictionary,[{'$ancestors',[riak_core_sup,<0.76.0>]},
{'$initial_call',{supervisor_pre_r14b04,init,1}}]},
{trap_exit,true},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.75.0>},
{total_heap_size,3571},
{heap_size,2584},
{stack_size,9},
{reductions,4359},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,10},
{minor_gcs,6}]},
{suspending,[]}]
Node 2:
Node: 'mfmn1@127.0.0.1', Process: <0.80.0>
[{registered_name,riak_core_vnode_sup},
{current_function,{gen_server,loop,6}},
{initial_call,{proc_lib,init_p,5}},
{status,waiting},
{message_queue_len,0},
{messages,[]},
{links,[<0.183.0>,<0.274.0>,<0.337.0>,<0.375.0>,<0.387.0>,<0.371.0>,
<0.310.0>,<0.326.0>,<0.226.0>,<0.262.0>,<0.210.0>,<0.218.0>,
<0.153.0>,<0.165.0>,<0.177.0>,<0.171.0>,<0.159.0>,<0.135.0>,
<0.147.0>,<0.141.0>,<0.123.0>,<0.129.0>,<0.77.0>]},
{dictionary,[{'$ancestors',[riak_core_sup,<0.76.0>]},
{'$initial_call',{supervisor_pre_r14b04,init,1}}]},
{trap_exit,true},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.75.0>},
{total_heap_size,1974},
{heap_size,987},
{stack_size,9},
{reductions,8777},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,10},
{minor_gcs,2}]},
{suspending,[]}]
Node 3:
Node: 'mfmn3@127.0.0.1', Process: <0.80.0>
[{registered_name,riak_core_vnode_sup},
{current_function,{gen_server,loop,6}},
{initial_call,{proc_lib,init_p,5}},
{status,waiting},
{message_queue_len,0},
{messages,[]},
{links,[<0.152.0>,<0.167.0>,<0.179.0>,<0.185.0>,<0.182.0>,<0.173.0>,
<0.176.0>,<0.170.0>,<0.161.0>,<0.164.0>,<0.158.0>,<0.155.0>,
<0.137.0>,<0.143.0>,<0.149.0>,<0.146.0>,<0.140.0>,<0.128.0>,
<0.134.0>,<0.131.0>,<0.125.0>,<0.77.0>]},
{dictionary,[{'$ancestors',[riak_core_sup,<0.76.0>]},
{'$initial_call',{supervisor_pre_r14b04,init,1}}]},
{trap_exit,true},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.75.0>},
{total_heap_size,3194},
{heap_size,2584},
{stack_size,9},
{reductions,4507},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,10},
{minor_gcs,10}]},
{suspending,[]}]
The vnode workers on the 3 nodes add up to exactly 22 + 23 + 22 - 3 = 64, where 22, 23 and 22 are the lengths of the three links lists above:
(mfmn3@127.0.0.1)3> supervisor:count_children(riak_core_vnode_sup).
[{specs,1},{active,21},{supervisors,0},{workers,21}]
(mfmn3@127.0.0.1)4>
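The reason for subtracting 3: each links list also contains one process that is not a vnode worker (<0.77.0>, which appears in all three lists and is presumably the parent supervisor, riak_core_sup). The count_children output confirms this for mfmn3: 22 links, but only 21 workers.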
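The arithmetic can be double-checked with a small sketch. The module and function names (vnode_count, workers/1, total/1) are hypothetical; the inputs are just the lengths of the three links lists, and the code assumes that exactly one link per supervisor is not a vnode worker.

```erlang
-module(vnode_count).
-export([workers/1, total/1]).

%% Number of vnode workers under one riak_core_vnode_sup, assuming that
%% exactly one of its links is the parent supervisor and not a worker.
workers(LinkCount) when is_integer(LinkCount), LinkCount >= 1 ->
    LinkCount - 1.

%% Total vnode workers in the cluster, given each node's link count.
total(LinkCounts) ->
    lists:sum([workers(N) || N <- LinkCounts]).
```

Calling vnode_count:total([22, 23, 22]) returns 64, the same figure the count above arrives at.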
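As the figure above shows, riak_core_vnode_master is responsible for communicating with the vnodes, and each of these vnodes is an FSM. For example:
Get all vnodes on the current node: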
(mfmn3@127.0.0.1)8> riak_core_vnode_master:all_nodes(mfmn_vnode).
[<0.173.0>,<0.179.0>,<0.185.0>,<0.143.0>,<0.155.0>,
<0.164.0>,<0.182.0>,<0.170.0>,<0.161.0>,<0.140.0>,<0.146.0>,
<0.152.0>,<0.158.0>,<0.176.0>,<0.167.0>,<0.149.0>,<0.137.0>,
<0.125.0>,<0.128.0>,<0.131.0>,<0.134.0>]
(mfmn3@127.0.0.1)9>
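This confirms once more that this node has 21 vnodes.
Sending a ping request to a vnode
This example is taken from try-try-try:
The request carries a Pid up front. That Pid is looked up in an ETS table by hash and is the Pid of the vnode, so it identifies the concrete vnode to talk to. In the end, Mod:handle_command is the callback function supplied by the user.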
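The lookup step described above can be sketched with a plain ETS table. This is not riak_core's code: the module name idx_lookup, the table layout {Idx, Pid} and the message shape {command, From, Request} are all assumptions made for illustration.

```erlang
-module(idx_lookup).
-export([new/0, add/3, idx2pid/2, dispatch/3]).

%% Create a private ETS table that maps a partition index to a vnode Pid.
new() ->
    ets:new(idx_table, [set, private]).

%% Remember which process serves partition Idx.
add(Tab, Idx, Pid) when is_pid(Pid) ->
    true = ets:insert(Tab, {Idx, Pid}),
    ok.

%% Resolve a partition index to the Pid stored for it, if any.
idx2pid(Tab, Idx) ->
    case ets:lookup(Tab, Idx) of
        [{Idx, Pid}] -> {ok, Pid};
        []           -> no_match
    end.

%% Forward a request to the process that owns Idx (hypothetical message shape).
dispatch(Tab, Idx, Request) ->
    case idx2pid(Tab, Idx) of
        {ok, Pid} ->
            Pid ! {command, self(), Request},
            ok;
        no_match ->
            {error, no_vnode}
    end.
```

In the real system the receiving process would then invoke the user's Mod:handle_command callback; here the message is simply delivered to whatever Pid is registered for the index.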
If the master does not find the corresponding vnode, it starts a new one:
get_vnode(Idx, State=#state{vnode_mod=Mod}) ->
    case idx2vnode(Idx, State) of
        no_match ->
            %% No vnode for this index yet: start one and monitor it.
            {ok, Pid} = riak_core_vnode_sup:start_vnode(Mod, Idx),
            MonRef = erlang:monitor(process, Pid),
            add_vnode_rec(#idxrec{idx=Idx,pid=Pid,monref=MonRef}, State),
            Pid;
        X -> X
    end.
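The same get-or-start pattern can be tried without riak_core. In the sketch below, the module name lazy_worker, the {Idx, Pid, MonRef} table layout and the trivial worker loop are assumptions for illustration; only the shape (look up, otherwise spawn, monitor and record) mirrors get_vnode.

```erlang
-module(lazy_worker).
-export([get_worker/2]).

%% Return the worker for Idx, spawning and monitoring one on first use.
%% Tab is an ETS set owned by the calling process.
get_worker(Idx, Tab) ->
    case ets:lookup(Tab, Idx) of
        [{Idx, Pid, _MonRef}] ->
            Pid;
        [] ->
            Pid = spawn(fun loop/0),
            MonRef = erlang:monitor(process, Pid),
            true = ets:insert(Tab, {Idx, Pid, MonRef}),
            Pid
    end.

%% Placeholder worker: ignores every message until told to stop.
loop() ->
    receive
        stop -> ok;
        _Other -> loop()
    end.
```

Asking twice for the same index returns the same Pid, while a different index gets its own process. A fuller version would also handle the 'DOWN' message from the monitor and remove the dead entry.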
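Because the storage backend sits underneath the vnode, locating the vnode is all it takes to reach the backend storage.
Here is the corresponding code from try try try: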
start(_StartType, _StartArgs) ->
    case mfmn_sup:start_link() of
        {ok, Pid} ->
            %% Register the vnode module with riak core.
            ok = riak_core:register([{vnode_module, mfmn_vnode}]),
            %% Subscribe to ring and node-watcher events.
            ok = riak_core_ring_events:add_guarded_handler(mfmn_ring_event_handler, []),
            ok = riak_core_node_watcher_events:add_guarded_handler(mfmn_node_event_handler, []),
            %% Announce that the mfmn service is up on this node.
            ok = riak_core_node_watcher:service_up(mfmn, self()),
            {ok, Pid};
        {error, Reason} ->
            {error, Reason}
    end.

stop(_State) ->
    ok.
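It starts the master, registers the vnode, and adds mfmn_ring_event_handler, mfmn_node_event_handler...
Next comes a walk through the try try try example. Experts, please go easy on me!
It is a Real Time Statistics application, RTS for short.
The system has two problems to solve. The first is parsing and dispatching records, which is handled by the entry vnode. The second is receiving the real-time statistics, which is handled by the stat vnode.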
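###What is a vnode
A vnode is an Erlang process.
As you can see, a virtual node has a lot to take care of, but Basho has already handled that for us: we only need to implement the vnode behaviour it provides. The user just has to understand the inputs and outputs and then define the required callback functions.
The init/1 and terminate/2 callbacks mark the boundaries of a virtual node's life, that is, where it starts and where it ends. When a process linked to the vnode crashes, handle_exit/3 is called.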
###init([Index]) -> Result
Index :: int() >= 0
Result :: {ok, State}
State :: term()
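A minimal sketch of the callbacks named so far may help make the inputs and outputs concrete. The module name sketch_vnode and the state record are hypothetical, the -behaviour(riak_core_vnode) attribute is left out so that the module compiles with plain OTP, and a real vnode has to implement further callbacks (handoff, coverage and so on) that are not shown here.

```erlang
-module(sketch_vnode).
%% A real vnode would declare -behaviour(riak_core_vnode) and export the
%% remaining callbacks; they are omitted to keep this sketch standalone.
-export([init/1, handle_command/3, handle_exit/3, terminate/2]).

-record(state, {partition}).

%% Start of the vnode's life: remember which partition index it serves.
init([Index]) ->
    {ok, #state{partition = Index}}.

%% Answer a ping with the partition index; ignore unknown requests.
handle_command(ping, _Sender, State) ->
    {reply, {pong, State#state.partition}, State};
handle_command(_Unknown, _Sender, State) ->
    {noreply, State}.

%% Called when a process linked to the vnode exits; keep running.
handle_exit(_Pid, _Reason, State) ->
    {noreply, State}.

%% End of the vnode's life: nothing to clean up in this sketch.
terminate(_Reason, _State) ->
    ok.
```

Called directly, sketch_vnode:init([0]) yields {ok, State}, and passing that State to handle_command(ping, Sender, State) yields a {reply, {pong, 0}, State} tuple.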
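rts registers 3 kinds of vnode: rts_vnode, rts_entry_vnode and rts_stat_vnode. The corresponding vnode masters are rts_vnode_master, rts_entry_vnode_master and rts_stat_vnode_master.
Each kind of vnode is responsible for a different service.
rts provides an HTTP interface so that users can submit data. The data handled by rts_wm_entry:process_post has the following structure: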
Client: "progski"
Entry: "0.0.0.0 - - [21/Mar/2011:18:18:19 +0000] \"GET /blog/2011/aol_meet_riak.html HTTP/1.1\" 200 5865 \"http://www.google.com/\" \"Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.648.151 Safari/534.16\""
Client is the argument (progski) passed in by the command gunzip -c progski.access.log.gz | ./replay progski.
Then an available vnode is looked up for the rts_entry service:
    %% DocIdx is the hash of the key, computed earlier (not shown in this excerpt).
    PrefList = riak_core_apl:get_apl(DocIdx, 1, rts_entry),
    [IdxNode] = PrefList,
    rts_entry_vnode:entry(IdxNode, Client, Entry).
Sending the command:
    riak_core_vnode_master:command(IdxNode,
                                   {entry, Client, Entry},
                                   ?MASTER).
Here ?MASTER is rts_entry_vnode_master.
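How a key ends up at one particular partition can be illustrated with a simplified consistent-hashing sketch. It is not riak_core's implementation: the module name ring_sketch is hypothetical, the successor rule (first partition index greater than the hash, wrapping around) is a simplifying assumption, and node ownership and availability, which riak_core_apl:get_apl takes into account, are ignored entirely.

```erlang
-module(ring_sketch).
-export([partitions/1, hash_key/1, owner/2]).

-define(RING_TOP, (1 bsl 160)).   % SHA-1 produces 160-bit integers

%% Evenly spaced partition indices for a ring of N partitions, ascending.
partitions(N) when is_integer(N), N > 0 ->
    Inc = ?RING_TOP div N,
    [I * Inc || I <- lists:seq(0, N - 1)].

%% Hash any Erlang term onto the ring.
hash_key(Key) ->
    <<Int:160/integer>> = crypto:hash(sha, term_to_binary(Key)),
    Int.

%% First partition index greater than the hash, wrapping to the start.
owner(Hash, Partitions) ->
    case [P || P <- Partitions, P > Hash] of
        [First | _] -> First;
        []          -> hd(Partitions)
    end.
```

With Ps = ring_sketch:partitions(64), the call ring_sketch:owner(ring_sketch:hash_key(Key), Ps) always returns one of the 64 indices. Pairing that index with the node that owns it would give something shaped like the IdxNode used in the command above.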
As shown in the figure:
Finally, the entry is handed over to the stat vnode, which does the statistics.
Source: http://blog.csdn.net/laohan_/article/details/39972673