一般讨论socket的并发安全性,都是指线程的安全性。。。而且绝大多数的情况下socket都不是线程安全的。。
当然一些框架可能会对socket进行一层封装,让其成为线程安全的。。。例如java的netty框架就是如此,将socket封装成channel,然后让channel封闭到一个线程中,那么这个channel的所有的读写都在它所在的线程中串行的进行,那么自然也就是线程安全的了。。。。。
其实很早看Gevent的源码的时候,就已经看过这部分的东西了,当时就已经知道gevent的socket不是协程安全的,也就是说gevnet的socket不能在不同的协程中同时读取或者写。。。。
例如我们不能同时在两个协程中调用socket.recv方法。。。。
不过好像自己现在已经忘了,那就再看看,顺便写篇博客记录下来,以防以后又忘记了,还找不到资料
那么为什么呢。。?我们来分析一下源代码吧,这里就拿send方法来分析,先来看看gevent的send方法的定义:
#这里发送数据不会保证发送的数据都发送完,而是能发送多少发送多少
#然后返回发送的数据大小
def send(self, data, flags=0, timeout=timeout_default):
sock = self._sock
if timeout is timeout_default:
timeout = self.timeout
try:
return sock.send(data, flags)
except error:
#EWOULDBLOCK 当前操作可能会阻塞 ,对于非阻塞的socket,也就是说明缓冲区已经写满了
#那么这里要做的事情就是等待当前的write_event事件,然后再写数据
ex = sys.exc_info()[1]
if ex.args[0] != EWOULDBLOCK or timeout == 0.0:
raise
sys.exc_clear()
self._wait(self._write_event)
try:
return sock.send(data, flags)
except error:
ex2 = sys.exc_info()[1]
if ex2.args[0] == EWOULDBLOCK:
return 0
raise也就是说,如果当前没办反发送,那么就会调用_wait方法来等待_write_event事件,
#等待某一个watcher,可以是read或者write事件,这里可以带有超时
def _wait(self, watcher, timeout_exc=timeout('timed out')):
"""Block the current greenlet until *watcher* has pending events.
If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
By default *timeout_exc* is ``socket.timeout('timed out')``.
If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
"""
assert watcher.callback is None, 'This socket is already used by another greenlet: %r' % (watcher.callback, )
if self.timeout is not None: #在等待之前,先挂起timeout,如果超市了,将会执行timeout
#如果超时先结束,那么会返回到当前协程抛出异常
timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
else:
timeout = None
try:
self.hub.wait(watcher) #在hub上面等待这个watcher,在这个里面会切换到hub的运行,然后等到watcher有反应了再切换回来
finally:
if timeout is not None:
timeout.cancel()其实这里很简单,就是在hub上面等待当前socket的写事件的watcher
#当在调用gevent.sleep的时候如果传入了大于零的时间,将会用这里来处理
#watcher是一个在loop上面注册的事件,可能是读,写或者定时
#用于在loop上面注册watcher,然后将当前协程切换出去
def wait(self, watcher):
waiter = Waiter() #首先创建一个waiter对象
unique = object()
watcher.start(waiter.switch, unique) #当watcher超时的时候将会调用waiter的switch方法这样就可以切换回来当前的协程
try:
result = waiter.get() #调用waiter的get方法,主要是让将当前调用sleep的greenlet切换出去,然后切换到hub的运行
assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)
finally:
watcher.stop()
这个应该很简单吧。。创建一个waiter事件,然后调用watcher对象的start方法,回调方法就设置成当前waiter对象的switch方法,然后调用get方法,将当前的协程切换出去。。。。
那么来来IO类型的watcher的start方法吧:
#I/Owatcher的定义
cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:
WATCHER_BASE(io) #通过这个宏定义了一些基本的属性,例如libev的 watcher 引用等
#这个其实其实就是启动watcher
#这里的callback一般情况下都是waiter对象的switch方法,这样,当有IO事件之后就可以回到之前的协程了
def start(self, object callback, *args, pass_events=False):
CHECK_LOOP2(self.loop)
if callback is None:
raise TypeError('callback must be callable, not None')
self.callback = callback
if pass_events:
self.args = (GEVENT_CORE_EVENTS, ) + args
else:
self.args = args
LIBEV_UNREF
libev.ev_io_start(self.loop._ptr, &self._watcher) #在libev的loop上面启动这个io watcher
PYTHON_INCREF
ACTIVE
嗯,这个是cython的代码,所以稍微别扭一些。。。,那么接下来再来看看libev的ev_io_start函数的实现吧:
void noinline
ev_io_start (EV_P_ ev_io *w) EV_THROW
{
int fd = w->fd;
if (expect_false (ev_is_active (w)))
return;
assert (("libev: ev_io_start called with negative fd", fd >= 0));
assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE))));
EV_FREQUENT_CHECK;
ev_start (EV_A_ (W)w, 1); //将监视器设置为active
//判断当前的anfds的大小是否足够放入新的fd,如果不够的话,那么需要重新分配
array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);
//将这个watcher放到当前fd的wather队列的头部
wlist_add (&anfds[fd].head, (WL)w);
/* common bug, apparently */
assert (("libev: ev_io_start called with corrupted watcher", ((WL)w)->next != (WL)w));
fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY); //将该fd放到需要改变的数组,在合适的时候将会在loop上修改
w->events &= ~EV__IOFDSET;
EV_FREQUENT_CHECK;
}这里主要就是激活当前的watcher对象,然后将这个watcher对象放到当前文件描述符的watcher链表的头部。。。嗯。。也就是wlist_add方法要做的事情。。。其实看到这里就知道socket肯定不是协程安全的了。。。
嗯,相信大家也一定懂了。。。。不懂的话。。就再看看代码就知道了。。。。。
原文:http://blog.csdn.net/fjslovejhl/article/details/45956339