[TOC]
Ceph在目前的网络层面上有三个重要概念,分别是 Messenger,Pipe,Connection。Messenger实际上可以理解为一个监听地址和多个连接的集合。比如每个OSD 中会有cluster_messenger 和public_messenger,顾名思义cluster_messenger 负责给OSD 与其他OSD 和Monitor 的通信并提供了一个监听地址,public_messenger负责与客户端的通信并提供了一个面向客户端的监听地址。因此cluster_messenger 中负责的连接会全部是面向其他OSD 或者Monitor 的连接。Pipe实际上是一个 Session 的载体,为了解决网络连接不稳定或者临时闪断连接的问题,Pipe会一直维护面向一个终端地址的会话状态,如类似 TCP 包序号的消息序号和发送队列。Connection 就是一个 socket 的 wrapper,它从属于某一个 Pipe。(引用)
上图是一个OSD端的网络逻辑。OSD继承自Dispatcher类,它其中有SimpleMessenger类的成员变量cluster_messenger和client_messenger。
SimpleMessenger类中有Accepter接收类、DispatchQueue派发类和成员为Pipe类的set。
Accepter中线程负责将图中Listen送入Pipe set中。
DispatchQueue类中线程负责将Pipe set中的数据拿出来处理,调用它中的Dispatcher类的成员函数ms_dispatcher将数据交给后端(Dispatcher类就是OSD,在创建SimpleMessenger就把自己传入了)
每一对通信的peer之间创建四个线程维护连接状态(每一端两个线程writer_thread和reader_thread,分别负责读和写)。参见:消息的接收-Pipe的connect和accept 章节步骤2。
以下分别对应了socket函数对应实现位置:
? ::socket------Accepter::bind
? ::bind------Accepter::bind
? ::listen------Accepter::bind
? ::accept------Accepter::entry
? ::send------Pipe::tcp_write
? ::recv------Pipe::tcp_read
? ::close------Accepter::stop
如图所示以OSD为例(在ceph_osd.cc中)描述了消息模块的生命周期 ,首先创建6个messenger(ms_public、ms_cluster、ms_hbclient、ms_hb_back_server、ms_hb_front_server、ms_objecter),然后分别进行相应IP地址及端口的绑定工作。绑定后开启消息模块进行通信,接着开始OSD服务的初始化工作,在OSD初始化过程中,添加任务到dispatch列表,然后进入工作状态,循环的接收请求,并启动osd相关处理线程,循环进行处理。当消息处理完毕后会一直处于wait状态,阻塞式的等待线程结束,如果线程结束,那么删除之前创建的messenger实体。这就是整个消息梳理的大致周期流程。
启动mon、osd、msd、fuse均会走类似的启动流程,消息处理机制也大致相同。本文以osd进程的启动介绍消息通信的处理。
详细处理过程如下:
OSD注册的Messenger实例列表:
Messenger实例名称 | 作用 |
---|---|
*ms_public | 用来处理OSD和Client之间的消息 |
*ms_cluster | 用来处理OSD和集群其他OSD及MON之间的消息 |
*ms_hbclient | 用来处理OSD和其它OSD保持心跳的消息 |
*ms_hb_back_server | 用来处理OSD接收心跳消息 |
*ms_hb_front_server | 用来处理OSD发送心跳消息 |
*ms_objecter | 用来处理OSD和Objecter之间的消息 |
struct Policy {
/// If true, the Connection is tossed out on errors.
bool lossy;
/// If true, the underlying connection can‘t be re-established from this end.
bool server;
/// If true, we will standby when idle
bool standby;
/// If true, we will try to detect session resets
bool resetcheck;
- lossy: 如果为true,当连接出现错误,就删除连接。
- server:如果为true,为服务端,都是被动连接。
- standby:如果为true,当空闲时,该连接处于等待状态。
- resetcheck:如果为true,连接出错后会进行重连。
该策略初始化在set_policy()函数时,已经设定好了。通过stateful_server()、stateless_server()、lossless_peer()、lossless_peer_reuse()、lossy_client()、lossless_client()进行初始化。
不同的消息实例会创建不同类型的策略,比如ms_public,OSD和mon之间的通信通过lossy_client()注册。后续消息处理的时候,会根据设置的策略,进入不同的处理机制。
通信的双方需要约定数据格式。否则收到对方发送的数据,不知道如何解析。Message提供了消息的基本结构:
class Message : public RefCountedObject {
protected:
ceph_msg_header header; // headerelope
ceph_msg_footer footer;
bufferlist payload; // "front" unaligned blob
bufferlist middle; // "middle" unaligned blob
bufferlist data; // data payload (page-alignment will be preserved where possible)
...
};
消息内容可以分为3个部分header、user data、footer。其中user data可以细分三个部分payload、middle、data。payload 一般是ceph操作的元数据 , middle是预留字段目前没有使用。 data是一般为读写的数据。
接下来先介绍header:
struct ceph_msg_header {
__le64 seq; /* message seq# for this session */
__le64 tid; /* transaction id */
__le16 type; /* message type */
__le16 priority; /* priority. higher value == higher priority */
__le16 version; /* version of message encoding */
__le32 front_len; /* bytes in main payload */
__le32 middle_len;/* bytes in middle payload */
__le32 data_len; /* bytes of data payload */
__le16 data_off; /* sender: include full offset;
receiver: mask against ~PAGE_MASK */
struct ceph_entity_name src;
/* oldest code we think can decode this. unknown if zero. */
__le16 compat_version;
__le16 reserved;
__le32 crc; /* header crc32c */
} __attribute__ ((packed));
因为payload/middle/data大小一般是变长,因此,为了能正确地解析三者,header中记录了三者的长度:
接下来看footer的数据结构:
struct ceph_msg_footer {
__le32 front_crc, middle_crc, data_crc;
// sig holds the 64 bits of the digital signature for the message PLR
__le64 sig;
__u8 flags;
} __attribute__ ((packed));
在footer中会计算payload/middle/data的crc,填入front_crc middle_crc和data_crc。
类accepter主要用来在server端监听,接收消息连接。
class Accepter : public Thread {
SimpleMessenger *msgr;
bool done;
int listen_sd; //监听的socket
uint64_t nonce; //
}
该类继承自Thread,本身也是一个线程类,循环的监听server的端口。
类DispatchQueue主要用于把接收到的请求保存在内部,通过其内部的线程,调用SimpleMessenger类注册的dispatch类的处理函数处理相应的消息。
class DispatchQueue {
class QueueItem {}
PrioritizedQueue<QueueItem, uint64_t> mqueue;//接受消息的优先队列
set<pair<double, Message*> > marrival;//接收到的消息集合 pair为(recv_time, message)
map<Message *, set<pair<double, Message*> >::iterator> marrival_map;//消息-> 所在集合位置的映射。
}
mqueue是一个优先级队列,用来保存消息,marrival保存接收到的消息集合,marrival_map保存消息到所在集合位置的映射。
函数 DispatchQueue::enqueue ()把接收到的消息,保存在mqueue消息队列中,函数DispatchQueue::entry()为线程处理函数,最终调用ms_dispatch()函数来处理消息。
类SimpleMessenger实现了messager接口。
class SimpleMessenger : public SimplePolicyMessenger {
Accepter accepter;//用于接收客户端的链接请求
DispatchQueue dispatch_queue;//接收到的消息分发队列
Pipe *add_accept_pipe(int sd);
bool did_bind;//是否绑定
__u32 global_seq;//全局消息seq号
ceph_spinlock_t global_seq_lock;
ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe;
set<Pipe*> accepting_pipes;//正在处理的pipes
set<Pipe*> pipes;//所有的pipes
list<Pipe*> pipe_reap_queue;//准备释放的pipes
}
Pipe类实现了两个端口之间类似管道的功能。每一个pipe,都有一个reader线程和一个write线程,分别用于处理消息的接收和请求的发送。
class Pipe : public RefCountedObject {
class Reader : public Thread {
Pipe *pipe;
public:
explicit Reader(Pipe *p) : pipe(p) {}
void *entry() { pipe->reader(); return 0; }
} reader_thread;//接收线程,接受数据
class Writer : public Thread {
Pipe *pipe;
public:
explicit Writer(Pipe *p) : pipe(p) {}
void *entry() { pipe->writer(); return 0; }
} writer_thread;//发送线程,发送数据
SimpleMessenger *msgr;//
uint64_t conn_id;//分配给pipe自己的唯一ID
int sd;//sockfd 标识一个套接字
struct iovec msgvec[SM_IOV_MAX]; //发送消息的iovec结构
int port;
int peer_type;
entity_addr_t peer_addr;
Messenger::Policy policy;
Mutex pipe_lock;
int state;
atomic_t state_closed; // 如果该值为非0,state = STATE_CLOSE,
PipeConnectionRef connection_state;//PipeConnection 的引用
map<int, list<Message*> > out_q; //准备发送的消息队列
DispatchQueue *in_q;//接收到消息的队列
list<Message*> sent;//当前要发送的消息
uint64_t out_seq;//发送序号
uint64_t in_seq, in_seq_acked;//接收序号、ACK信号
}
下面以CS模型进行分析消息的发送和接收的过程。主要以simple_client.cc和simple_server.cc示例代码展开分析。
test/messenge/simple_client.cc
1.创建messenger实例
Messenger *Messenger::create()函数创建一个messenger实例,指定消息类型为simple。默认支持simple、async、xio三种类型。
2.设置默认消息策略
调用函数messenger->set_default_policy() 创建默认的消息策略。
3.创建dispatcher,开启接收消息处理线程
调用函数messenger->add_dispatcher_head() 注册dispatcher,注册dispatcher,开启线程ms_dispatch和ms_local进行接收到的消息的处理。
4.启动messenger实例
调用函数SimpleMessenger::start(),开启实例。
5.获取与目的端连接
调用函数SimpleMessenger::get_connection(dest)获取与目的端的连接。在函数内进行判断本端地址和目的端地址是否相同,若相同,那么判断为本端消息通信,返回local_connection,即处理的是本地连接;若不相同,那么判断为远端消息通信,然后判断与远端的pipe是否存在,若存在,返回连接PipeConnectionRef,若不存在,创建pipe,pipe状态置位STATE_CONNECTING,并添加到rank_pipe中,Pipe *SimpleMessenger::connect_rank开启发送线程ms_pipe_write, 调用pipe->writer()发送消息,调用Pipe消息发送线程。 此时线程ms_pipe_write调用writer由于没有要发送的数据,进入休眠阶段。
Tips:客户端调用get_connection函数,创建pipe, 实际是在pipe::writer()函数中,调用connect() 发起连接请求,然后服务端线程ms_accept调用accept函数接收到请求,服务端创建pipe, 线程ms_pipe_read来处理接收的通信请求,实际是在pipe::reader()函数中调用accept()函数接收请求,而线程ms_accept继续调用pipe::accept函数循环接收请求连接。(对应接收消息整体流程章节的步骤4)
6.发送消息
调用函数conn->send_message(m)把消息发送出去。实际上调用的是SimpleMessenger::_send_message(Message m, Connection con)接口,然后通过函数SimpleMessenger::submit_message()对消息进行提交。在该函数里,判断如果pipe存在且为非closed状态,那么调用_send(Message m)函数从out_q消息队列中发送消息。如果pipe不存在,那么调用函数Pipe SimpleMessenger::connect_rank创建一个pipe,然后调用_send(Message m)函数从out_q消息队列中发送消息。然后也通过Pipe消息发送线程完成消息的发送工作。调用_send(Message m)函数时,会唤醒线程ms_pipe_write,进行消息的发送。
test/messenge/simple_server.cc
1.创建messenger实例
Messenger *Messenger::create()函数创建一个messenger实例,指定消息类型为simple。默认支持simple、async、xio三种类型。
2.设置默认消息策略
调用函数messenger->set_default_policy() 创建默认的消息策略。
3.bind指定IP和端口
调用函数SimpleMessenger::bind(),首先创建一个socket,然后绑定socket到指定的IP和端口,然后调用listen()函数监听消息的连接。绑定成功后设置did_bind = true。
4.创建dispatcher,开启接收消息线程
调用函数messenger->add_dispatcher_head() 注册dispatcher,注册dispatcher,开启线程ms_dispatch和ms_local进行接收到的消息的处理。同时由于步骤3绑定成功,did_bind = true,这里调用Accepter::start()函数创建线程ms_accepter,主要调用accept系统函数,循环接收请求连接。
然后调用函数SimpleMessenger::add_accept_pipe()创建pipe,pipe状态置为STATE_ACCEPTING,调用start_reader(),创建线程ms_pipe_read,开始pipe->reader(),进行通信请求处理,进入Pipe消息接收子流程。最后把该pipe加入accepting_pipes和pipe的set中。
注意:在函数add_accept_pipe中,创建专门的pipe线程ms_pipe_read负责此次通信,而ms_accepter线程继续accept,而不是停下来处理通信请求。
Tips:客户端调用get_connection函数,创建pipe, 实际是在pipe::writer()函数中,调用connect() 发起连接请求,然后服务端线程ms_accept调用accept函数接收到请求,服务端创建pipe, 线程ms_pipe_read来处理接收的通信请求,实际是在pipe::reader()函数中调用accept()函数接收请求,而线程ms_accept继续调用pipe::accept函数循环接收请求连接。(对应发送消息整体流程章节的步骤5)
5.启动messenger实例
调用函数SimpleMessenger::start(),开启实例。同时设置started == true。
6.循环处理消息。
调用函数SimpleMessenger::wait()阻塞方式等待线程ms_dispatch和ms_local结束,循环的处理消息。进入DispatchQueue线程处理流程。
在函数SimpleMessenger::wait()内部,当线程处理流程结束,开始进行清除相关工作,主要清除本地消息、停止接收线程ms_accepter接收连接、回收pipe信息并把started置位 false。
7.删除messenger
执行delete messenger删除相应的messenger。
线程处理函数主要是DispatchQueue线程存在的时候,处理分发队列中的消息。
对于client端,调用函数get_connection,如果需要创建新的pipe,会调用Pipe *SimpleMessenger::connect_rank(),
conn = messenger->get_connection(dest_server)
client端会创建Pipe的写进程,写进程的主函数是Pipe::writer , 而此时Pipe处于Pipe::STATE_CONNECTING状态。当Pipe处于Pipe::STATE_CONNECTING状态,writer函数会调用Pipe::connect函数,该函数负责与服务器建立连接,真正意义上的通信通道。
对于server端,服务器端的Accepter线程正阻塞在accept系统调用上,等待client调用connect系统调用来连,一旦服务器端的accept函数返回,Accepter中的线程就会调用add_accept_pipe函数来创建一个新的Pipe,全权负责和client的通信,新创建的Pipe处于Pipe::STATE_ACCEPTING,Pipe的读线程的主函数是Pipe::reader,在该函数中,如果Pipe状态是STATE_ACCEPTING,会调用Pipe::accept函数和client进行通信,创建会话。
在通信的初始化阶段,Pipe::connect和Pipe::accept是一对,他俩互相协商,互相通信,建立连接关系。下图是建立关系中主要流程图:
在客户端connect函数结束时开启reader线程,主要调用函数handle_ack()处理发送消息后的ack信息。在服务端accept函数结束时开启writer线程,主要调用函数write_ack()函数处理接收的消息处理后的ack信息。这就是pipe通信的机制,每端同时开启读写线程进行处理。
原文:https://blog.51cto.com/wendashuai/2870402