-
-
Save lemonhall/47babdc519ef75d1f40b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
last edit: 20140822 15:11 | |
本文分析的代码是 nanomsg-0.4-beta版本。 | |
要创建一个 PUB/SUB 服务,只需要六个 API,分别是: | |
1. nn_socket | |
2. nn_bind | |
3. nn_connect | |
4. nn_send | |
5. nn_recv | |
6. nn_close | |
1. nn_socket | |
1.1 nn_global_init | |
1.2 nn_global_create_socket | |
1.1.1 nn_global_add_transport (nn_tcp); | |
1.1.2 nn_global_add_socktype (nn_pub_socktype); | |
1.1.3 nn_global_add_socktype (nn_sub_socktype); | |
1.1.4 nn_pool_init (&self.pool); | |
1.1.5 nn_fsm_init_root (&self.fsm, nn_global_handler, nn_global_shutdown, &self.ctx); | |
1.1.6 nn_ctx_init (&self.ctx, nn_global_getpool (), NULL); | |
1.1.7 nn_fsm_start (&self.fsm); | |
nn_global 中的 transport 和 socktype 分别代表了底层实现方式和用户的使用方式。 | |
这里的 transport 和 socktype 都是 list, nn_global_add_transport 函数可以添加随意多的 transport 和 socktype。 | |
在创建 socket 的时候,遍历查找。 | |
此刻我们只关心 nn_tcp 和 nn_pub_socktype, nn_sub_socktype。 | |
定义如下: | |
static struct nn_socktype nn_sub_socktype_struct = { | |
AF_SP, NN_SUB, | |
NN_SOCKTYPE_FLAG_NOSEND, | |
nn_xsub_create, nn_xsub_ispeer, | |
NN_LIST_ITEM_INITIALIZER | |
}; | |
static struct nn_socktype nn_pub_socktype_struct = { | |
AF_SP, NN_PUB, | |
NN_SOCKTYPE_FLAG_NORECV, | |
nn_xpub_create, nn_xpub_ispeer, | |
NN_LIST_ITEM_INITIALIZER | |
}; | |
static struct nn_transport nn_tcp_vfptr = { | |
"tcp", NN_TCP, NULL, NULL, | |
nn_tcp_bind, nn_tcp_connect, | |
nn_tcp_optset, | |
NN_LIST_ITEM_INITIALIZER | |
}; | |
struct nn_transport *nn_tcp = &nn_tcp_vfptr; | |
1.1.4 nn_pool_init | |
本部分尚未完全完成,每个 pool 中只有一个 worker。 | |
nn_pool_init == nn_worker_init | |
1.1.4.1 nn_worker_init | |
int nn_worker_init (struct nn_worker *self) | |
{ | |
此处 efd 用于进程间的通信方式,底层使用 socketfd 实现。 | |
1.1.4.1.1 nn_poller_init (&self->poller); | |
1.1.4.1.2 nn_poller_add (&self->poller, nn_efd_getfd (&self->efd), &self->efd_hndl); | |
1.1.4.1.3 nn_thread_init (&self->thread, nn_worker_routine, self); | |
} | |
1.1.4.1.1 nn_poller_init | |
poller 底层用 epoll 实现,是一个标准的 poller。 | |
1.1.4.1.2 nn_poller_add | |
此处将前面的 efd 加入到 worker 的 poller,此时 &self->efd_hndl == NULL | |
1.1.4.1.3 nn_thread_init | |
此处启动新线程,将 self 传入,进入事件循环。 | |
可以预见,user 线程将事件加入到 worker 线程中后,通过 efd 通知 worker 线程有新事件。 | |
1.1.5 nn_fsm_init_root | |
此函数将 self 加入到 fsm 中去。处理函数为 nn_global_handler 和 nn_global_shutdown。 | |
其 ctx 即 context 上下文是 self.ctx。 | |
fsm 是类似于 c++ 的一种用法,将数据和处理的函数绑定在一起,处理函数以状态机实现。 | |
fsm 有 owner 的概念,即上层 fsm。一个事件本层的 fsm 处理完以后,想传给下层的 fsm 只需要直接调用即可。 | |
想传给上层的 fsm 则需要调用 nn_fsm_raise 这个函数,另外,nn_fsm_raiseto 可以将事件传递给没关系的某个 fsm。 | |
init_root 则表明此 fsm 没有上层。 | |
1.1.6 nn_ctx_init | |
ctx 即上下文,每个 ctx 包含一个 pool,每个 pool 中可以包含多个 worker(未完成,当前每个 pool 中一个 worker)。 | |
即每个 ctx 当前有一个 worker 线程。 | |
1.1.7 nn_fsm_start | |
fsm 状态变化,调用 nn_global_handler 处理。 | |
1.2 nn_global_create_socket | |
查找合适的 domain 和 protocol,占用一个 self.sock 的位置。然后调用 | |
1.2.1 nn_sock_init | |
int nn_sock_init (struct nn_sock *self, struct nn_socktype *socktype, int fd) | |
{ | |
1.2.1.1 nn_ctx_init (&self->ctx, nn_global_getpool (), nn_sock_onleave); | |
1.2.1.2 nn_fsm_init_root (&self->fsm, nn_sock_handler, nn_sock_shutdown, &self->ctx); | |
self->state = NN_SOCK_STATE_INIT; | |
本函数中初始化了两个 efd,分别是 send 和 recv,如果不需要,则关闭。 | |
1.2.1.3 rc = socktype->create ((void*) self, &self->sockbase); | |
self->socktype = socktype; | |
1.2.1.4 nn_fsm_start (&self->fsm); | |
} | |
1.2.1.1 nn_ctx_init | |
可以看出每个 sock 有自己独立的 ctx,但是共享一个全局的 pool,即共享一个全局的 worker 线程。 | |
1.2.1.2 nn_fsm_init_root | |
每个 sock 的 fsm 是独立的,没有上层。使用自己的 ctx,每个 sock 是一个相对独立的单位,不同 | |
的 sock 之间肯定是可以多线程处理的。 | |
1.2.1.3 socktype->create == nn_xpub_create | |
此函数将 sock 与本身的 sockbase 处理函数连接起来。 | |
static const struct nn_sockbase_vfptr nn_xpub_sockbase_vfptr = { | |
NULL, nn_xpub_destroy, nn_xpub_add, nn_xpub_rm, | |
nn_xpub_in, nn_xpub_out, nn_xpub_events, nn_xpub_send, | |
NULL, nn_xpub_setopt, nn_xpub_getop, }; | |
每个 sock 有自己的 socktype,负责上层调用,比如 create 和 ispeer,以及一些设定。 | |
而本身的 sockbase 则是为了处理数据的,比如 add 一个链接等。 | |
1.2.1.4 nn_fsm_start | |
此函数将本 sock 的状态从 INIT 改为 ACTIVE,表示本 sock 可以使用了。 | |
到此初始化工作完毕,可以看出 nanomsg 使用了层级式的结构,将概念抽象出来。 | |
添加 transport 和 socktype 十分方便,跟 linux 内核的处理方式颇为相像。 | |
事件的传递也是层级式的,比如一个 sendable 事件,首先将 socket 加入到最底层的 poller 中,当 socket 变 | |
为 sendable 之后,通过 fsm 通知上层,并改变状态到 sendable,上层收到事件以后,改状态,上传,直至用户态, | |
用户调用 send。 | |
send 事件同理,用户 send 数据,状态机层层改变,直至最底层,改变状态,发送数据,再次加入 poller。 | |
目前每个 ctx 一个 pool,每个 pool 一个 worker。以后如果 worker 可调节,则用户态处理逻辑,worker 处理链接。 | |
如果是网络型的业务,worker 间的协调应该和 nginx 类似,轮流 accept。 | |
如果是计算型的业务,用户多个线程之间类似,可以轮流 recv/send。 | |
2. nn_bind | |
nn_bind 是服务器端调用的函数,比如 PUB。 | |
2.1 nn_global_create_ep | |
查找合适的 transport 名字,调用 nn_sock_add_ep | |
2.1.1 nn_sock_add_ep (self.socks [s], tp, bind, addr); | |
int nn_sock_add_ep (struct nn_sock *self, struct nn_transport *transport, | |
int bind, const char *addr) | |
{ | |
2.1.1.1 rc = nn_ep_init (ep, NN_SOCK_SRC_EP, self, self->eid, transport, | |
bind, addr); | |
2.1.1.2 nn_ep_start (ep); | |
此处将 ep 加入到 sock 里面的 eps 中, eid 从 1 开始递增。 | |
} | |
2.1.1.1 nn_ep_init | |
nanomsg 把每一个端点叫做 endpoints,类似于 IP:Port 这样。 | |
类似 socktype, ep 也有 epbase,负责 ep 的销毁。 | |
每个 ep 有自己独立的 fsm,此时,全局 global 有自己的 fsm,每个 sock 有自己的 fsm。 | |
每个 sock 中的每个 ep 有自己的 fsm | |
int nn_ep_init (struct nn_ep *self, int src, struct nn_sock *sock, int eid, | |
struct nn_transport *transport, int bind, const char *addr) | |
{ | |
此处将 fsm 置于 IDLE | |
if (bind) | |
2.1.1.1.1 rc = transport->bind ((void*) self, &self->epbase); | |
else | |
2.1.1.1.2 rc = transport->connect ((void*) self, &self->epbase); | |
} | |
我们这里使用 TCP,则此处的两个函数分别为 nn_btcp_create 和 nn_ctcp_create | |
2.1.1.1.1 nn_btcp_create | |
nn_btcp 表示 bind tcp,其结构里包含 atcp,表示 accept tcp | |
int nn_btcp_create (void *hint, struct nn_epbase **epbase) | |
{ | |
2.1.1.1.1.1 nn_epbase_init (&self->epbase, &nn_btcp_epbase_vfptr, hint); | |
2.1.1.1.1.2 nn_fsm_init_root (&self->fsm, nn_btcp_handler, nn_btcp_shutdown, | |
nn_epbase_getctx (&self->epbase)); | |
2.1.1.1.1.3 nn_usock_init (&self->usock, NN_BTCP_SRC_USOCK, &self->fsm); | |
2.1.1.1.1.4 nn_fsm_start (&self->fsm); | |
} | |
nn_epbase_init | |
将 btcp_epbase 和本 ep 链接起来。 | |
nn_fsm_init_root | |
btcp 的 fsm 没有上层。 | |
nn_usock_init | |
usock 有自己的 fsm,其上层的 fsm 是 btcp 的 fsm | |
usock 是最底层实现功能的组件,这个结构体里包括了 in 和 out 两个结构体,表示要发出的数据和接收缓存的数据。 | |
包含了一堆的 task,和 worker thread 通信,将任务打包成 nn_worker_task 结构体。 | |
包含了一堆的 event,是本身发起的一系列事件。 | |
nn_fsm_start | |
btcp 的 fsm start 里执行了两个函数, nn_btcp_start_listening 和 nn_btcp_start_accepting。 | |
然后将状态改为 ACTIVE。表示 btcp 已经可以用了。 | |
nn_btcp_start_listening 里调用了 | |
nn_usock_start, 执行 OS 层的 socket,状态 STARTING | |
nn_usock_bind, 执行 OS 层的 bind | |
nn_usock_listen,执行 OS 层的 listen,状态 LISTENING | |
nn_btcp_start_accepting 里调用了 | |
nn_atcp_init | |
nn_atcp_start | |
进入 atcp 的逻辑 | |
tmp.1 nn_atcp_init | |
btcp.fsm 是 atcp.fsm 的上层 | |
atcp 有自己的 usock,即 atcp 的 usock 为 fd = accept(ls) 中的 fd,而 btcp 的 usock 为其中的 ls。 | |
atcp 又初始化了 stcp,stcp.fsm 的上层为 atcp.fsm。 | |
stcp 没有自己对应的 usock,包含了一个 pipebase,和前类似,pipe 表示一个链接。 | |
stcp 表示 stream tcp,其数据通信前包含一个 streamhdr。 | |
streamhdr 的 fsm 属于 stcp,fsm 真他妈多啊。 | |
pipebase 有自己的 fsm,但他的 fsm 上层是对应的 sock。 | |
tmp.2 nn_atcp_start | |
swap_owner 之后,atcp 拥有了 btcp 的 usock 的 ownership。 | |
同时还拥有自己的 usock。 | |
自己的 usock 的状态变为 BEING_ACCEPTED | |
listener 的 usock 状态为 ACCEPT | |
调用 nn_usock_accept,完成之后,状态从 IDLE 到 ACCEPTING | |
此时进行 OS 的 accept,如果成功,则 listener 和 atcp 同时到 DONE 状态。 | |
如果失败,则将两者包装成 nn_worker_task,加进 worker 队列,并 signal efd | |
worker 通过 nn_fsm_feed (task->owner, task->src, NN_WORKER_TASK_EXECUTE, task); | |
又会调用 atcp 的 accept 进行 accept,死循环。 | |
2.1.1.2 nn_ep_start | |
修改本 ep 状态为 ACTIVE | |
nn_bind 将下层的 TCP 链接准备好至 listen 状态,并加入到 worker 的 poller 中去。 | |
可以看到 sock 之下是 ep,ep 之下是 transport,transport 下面是 usock。因为 TCP 复杂,分了 | |
三个 transport:atcp, btcp, stcp。 | |
同时可以看到众多层 fsm 的好处,那就是,将任何一层单元包装成一个含有 fsm 的结构,在 thread 之间 | |
传递,任何线程都可以操作,而且其状态总是一致的,真正做到了异步。 | |
3. nn_connect | |
略 | |
4. nn_send | |
4.1 nn_sendmsg | |
4.1.1 nn_sock_send | |
4.1.1.1 self->sockbase->vfptr->send (self->sockbase, msg); | |
此处的 send 为 nn_xpub_send, nn_xpub_send 又调用 nn_dist_send 将数据最终通过合适的 pipe 发送出去。 | |
如果 self->sockbase->vfptr->send 失败,则没有合适的 send pipe。 | |
此时进入 nn_sock->sndfd 等待,返回之后继续 send | |
可见,nanomsg 整个逻辑是, user 线程只处理同步的操作,异步的操作交给 worker thread,中间通过 efd 来同 | |
步,通过 task 传递事件数据。 | |
5. nn_recv | |
略 | |
6. nn_close | |
略 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment