由于一篇Blog写太长无法发表,这里面我们将继续分析下dist.c中的setnode_3这个函数的作用和net_kernel得到连接成功之后又进行了什么操作。
BIF_RETTYPE setnode_3(BIF_ALIST_3)
{
BIF_RETTYPE ret;
Uint flags;
unsigned long version;
Eterm ic, oc;
Eterm *tp;
DistEntry *dep = NULL;
Port *pp = NULL;
/* Prepare for success */
ERTS_BIF_PREP_RET(ret, am_true);
/*
* Check and pick out arguments
*/
if (!is_node_name_atom(BIF_ARG_1) ||
is_not_internal_port(BIF_ARG_2) ||
(erts_this_node->sysname == am_Noname)) {
goto badarg;
}
if (!is_tuple(BIF_ARG_3))
goto badarg;
tp = tuple_val(BIF_ARG_3);
if (*tp++ != make_arityval(4))
goto badarg;
if (!is_small(*tp))
goto badarg;
flags = unsigned_val(*tp++);
if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0)
goto badarg;
ic = *(++tp);
oc = *(++tp);
if (!is_atom(ic) || !is_atom(oc))
goto badarg;
/* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */
if (!(DFLAG_EXTENDED_REFERENCES & flags)) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(dsbufp, "%T", BIF_P->common.id);
if (BIF_P->common.u.alive.reg)
erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name);
erts_dsprintf(dsbufp,
" attempted to enable connection to node %T "
"which is not able to handle extended references.\n",
BIF_ARG_1);
erts_send_error_to_logger(BIF_P->group_leader, dsbufp);
goto badarg;
}
/*
* Arguments seem to be in order.
*/
/* get dist_entry */
dep = erts_find_or_insert_dist_entry(BIF_ARG_1);
if (dep == erts_this_dist_entry)
goto badarg;
else if (!dep)
goto system_limit; /* Should never happen!!! */
//通过Port的ID获取Port的结构
pp = erts_id2port_sflgs(BIF_ARG_2,
BIF_P,
ERTS_PROC_LOCK_MAIN,
ERTS_PORT_SFLGS_INVALID_LOOKUP);
erts_smp_de_rwlock(dep);
if (!pp || (erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLG_EXITING))
goto badarg;
if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
goto badarg;
//如果当前cid和传入的Port的ID相同,且port的sist_entry和找到的dep相同
//那么直接进入结束阶段
if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
goto done; /* Already set */
if (dep->status & ERTS_DE_SFLG_EXITING) {
/* Suspend on dist entry waiting for the exit to finish */
ErtsProcList *plp = erts_proclist_create(BIF_P);
plp->next = NULL;
erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
erts_smp_mtx_lock(&dep->qlock);
erts_proclist_store_last(&dep->suspended, plp);
erts_smp_mtx_unlock(&dep->qlock);
goto yield;
}
ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING));
if (pp->dist_entry || is_not_nil(dep->cid))
goto badarg;
erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
/*
* Dist-ports do not use the "busy port message queue" functionality, but
* instead use "busy dist entry" functionality.
*/
{
ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
}
//更新Port所关联的dist
pp->dist_entry = dep;
dep->version = version;
dep->creation = 0;
ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
#if 1
dep->send = (pp->drv_ptr->outputv
? dist_port_commandv
: dist_port_command);
#else
dep->send = dist_port_command;
#endif
ASSERT(dep->send);
#ifdef DEBUG
erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize == 0);
erts_smp_mtx_unlock(&dep->qlock);
#endif
//更新dist_entry的cid
erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
create_cache(dep);
erts_smp_de_rwunlock(dep);
dep = NULL; /* inc of refc transferred to port (dist_entry field) */
//增加远程节点的数量
inc_no_nodes();
//发送监控信息到调用的进程
send_nodes_mon_msgs(BIF_P,
am_nodeup,
BIF_ARG_1,
flags & DFLAG_PUBLISHED ? am_visible : am_hidden,
NIL);
done:
if (dep && dep != erts_this_dist_entry) {
erts_smp_de_rwunlock(dep);
erts_deref_dist_entry(dep);
}
if (pp)
erts_port_release(pp);
return ret;
yield:
ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P,
BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
goto done;
badarg:
ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
goto done;
system_limit:
ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT);
goto done;
}
setnode_3首先是将,得到的远程节点的名字放入dist的hash表中,并且将这个表项和连接到远程节点的Port进行了关联。
接着将和远程节点进行连接的Port标记为ERTS_PORT_SFLG_DISTRIBUTION,这样在一个Port出现Busy的时候我们能区分出是普通的Port还是远程连接的Port,在一个Port被销毁的时候,是否要调用dist.c中的erts_do_net_exits来告诉Erts远程节点已经掉线了。当这些都顺利的完成了之后,会在这个Erts内部广播nodeup这个消息,那么nodeup的接收者又是谁呢?nodeup的接收者是那些通过process_flag函数设置了monitor_nodes标记的进程,当然monitor_nodes这选项文档中是没有的。如果我们想监听nodeup事件,只能通过net_kernel:monitors函数来完成。
我们上次说到负责连接远程节点的进程会通知net_kernel进程,让我们接着看下net_kernel收到消息做了什么。
handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}},
State) ->
case {Immediate, ets:lookup(sys_dist, Node)} of
{true, [Conn]} when Conn#connection.state =:= pending,
Conn#connection.owner =:= SetupPid ->
ets:insert(sys_dist, Conn#connection{state = up,
address = Address,
waiting = [],
type = Type}),
SetupPid ! {self(), inserted},
reply_waiting(Node,Conn#connection.waiting, true),
{noreply, State};
_ ->
SetupPid ! {self(), bad_request},
{noreply, State}
end;
更新ets中的状态,同时发送消息给所有等待的进程,告诉他们远程连接已经成功了,你们可以继续进行后续操作了。
这个时候你会惊奇的发现,心跳在什么地方呢?不急,我们再回头看下net_kernel的init函数
init({Name, LongOrShortNames, TickT}) ->
process_flag(trap_exit,true),
case init_node(Name, LongOrShortNames) of
{ok, Node, Listeners} ->
process_flag(priority, max),
Ticktime = to_integer(TickT),
Ticker = spawn_link(net_kernel, ticker, [self(), Ticktime]),
{ok, #state{name = Name,
node = Node,
type = LongOrShortNames,
tick = #tick{ticker = Ticker, time = Ticktime},
connecttime = connecttime(),
connections =
ets:new(sys_dist,[named_table,
protected,
{keypos, 2}]),
listen = Listeners,
allowed = [],
verbose = 0
}};
Error ->
{stop, Error}
end.
net_kernel首先创建了一个ticker进程,它专门负责发心跳给net_kernel进程,然后net_kernel进程会遍历所有远程连接的进程,让他们进行一次心跳。当我们改变了一个节点的心跳时间的时候,我们会开启一个aux_ticker进程帮助我们进行过度,直到所有节点都知道了我们改变了心跳周期为止,当所有节点都知道我们改变了心跳周期,这个aux_ticker进程也就结束了它的历史性任务,安静的退出了。
那么是如何发现远程节点退出的,当然是TCP数据传输发生了故障Port被清理掉了,这个可参见dist.c中的erts_do_net_exits。
当这些都完成了,我们将继续回到global模块和global_group模块中去分析下nodeup的时候,两个节点是如何同步他们的全局名字的。
原文:http://my.oschina.net/u/236698/blog/389751