本来周末是最好的学习时间,不过这周末收房子,可想而知事情自然也不会少。这段时间的周末,可能很少有时间学习了。见缝插针吧。
不说废话了,好好学习。上回通过代码理解了Netty底层信息的流的传递机制,不过只是一个感性上的认识。教会你应该如何使用和使用的时候应该注意的方面。但是有一些细节的问题,并没有提及。比如在上回(《Java NIO框架Netty教程(四)- ChannelBuffer》http://www.it165.net/pro/html/201207/3198.html)的代码里,我们通过:
1.
private
void
sendMessageByFrame(ChannelStateEvent e) {
2.
String msgOne =
"Hello, "
;
3.
String msgTwo =
"I‘m "
;
4.
String msgThree =
"client."
;
5.
e.getChannel().write(tranStr2Buffer(msgOne));
6.
e.getChannel().write(tranStr2Buffer(msgTwo));
7.
e.getChannel().write(tranStr2Buffer(msgThree));
8.
}
这样的方式,连续返送三次消息。但是如果你在服务端进行接收计数的话,你会发现,大部分时候都是接收到两次的事件请求。不过消息都是完整的。网上也有人提到过,进行10000次的连续放松,往往接受到的消息个数是999X的,总是就是消息数目上不匹配,这又是为何呢?笔者也只能通过阅读Netty的源码来找原因,我们一起来慢慢分析吧www.it165.net。
起点自然是选择在e.getChannel().writer()方法上。一路跟踪首先来到了:AbstractNioWorker.java类
001.
protected
void
write0(AbstractNioChannel<?> channel) {
002.
boolean
open =
true
;
003.
boolean
addOpWrite =
false
;
004.
boolean
removeOpWrite =
false
;
005.
boolean
iothread = isIoThread(channel);
006.
007.
long
writtenBytes =
0
;
008.
009.
final
SocketSendBufferPool sendBufferPool =
this
.sendBufferPool;
010.
final
WritableByteChannel ch = channel.channel;
011.
final
Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
012.
final
int
writeSpinCount = channel.getConfig().getWriteSpinCount();
013.
synchronized
(channel.writeLock) {
014.
channel.inWriteNowLoop =
true
;
015.
for
(;;) {
016.
MessageEvent evt = channel.currentWriteEvent;
017.
SendBuffer buf;
018.
if
(evt ==
null
) {
019.
if
((channel.currentWriteEvent = evt = writeBuffer.poll()) ==
null
) {
020.
removeOpWrite =
true
;
021.
channel.writeSuspended =
false
;
022.
break
;
023.
}
024.
025.
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
026.
}
else
{
027.
buf = channel.currentWriteBuffer;
028.
}
029.
030.
ChannelFuture future = evt.getFuture();
031.
try
{
032.
long
localWrittenBytes =
0
;
033.
for
(
int
i = writeSpinCount; i >
0
; i --) {
034.
localWrittenBytes = buf.transferTo(ch);
035.
if
(localWrittenBytes !=
0
) {
036.
writtenBytes += localWrittenBytes;
037.
break
;
038.
}
039.
if
(buf.finished()) {
040.
break
;
041.
}
042.
}
043.
044.
if
(buf.finished()) {
045.
// Successful write - proceed to the next message.
046.
buf.release();
047.
channel.currentWriteEvent =
null
;
048.
channel.currentWriteBuffer =
null
;
049.
evt =
null
;
050.
buf =
null
;
051.
future.setSuccess();
052.
}
else
{
053.
// Not written fully - perhaps the kernel buffer is full.
054.
addOpWrite =
true
;
055.
channel.writeSuspended =
true
;
056.
057.
if
(localWrittenBytes >
0
) {
058.
// Notify progress listeners if necessary.
059.
future.setProgress(
060.
localWrittenBytes,
061.
buf.writtenBytes(), buf.totalBytes());
062.
}
063.
break
;
064.
}
065.
}
catch
(AsynchronousCloseException e) {
066.
// Doesn‘t need a user attention - ignore.
067.
}
catch
(Throwable t) {
068.
if
(buf !=
null
) {
069.
buf.release();
070.
}
071.
channel.currentWriteEvent =
null
;
072.
channel.currentWriteBuffer =
null
;
073.
buf =
null
;
074.
evt =
null
;
075.
future.setFailure(t);
076.
if
(iothread) {
077.
fireExceptionCaught(channel, t);
078.
}
else
{
079.
fireExceptionCaughtLater(channel, t);
080.
}
081.
if
(t
instanceof
IOException) {
082.
open =
false
;
083.
close(channel, succeededFuture(channel));
084.
}
085.
}
086.
}
087.
channel.inWriteNowLoop =
false
;
088.
089.
// Initially, the following block was executed after releasing
090.
// the writeLock, but there was a race condition, and it has to be
091.
// executed before releasing the writeLock:
092.
//
094.
//
095.
if
(open) {
096.
if
(addOpWrite) {
097.
setOpWrite(channel);
098.
}
else
if
(removeOpWrite) {
099.
clearOpWrite(channel);
100.
}
101.
}
102.
}
103.
if
(iothread) {
104.
fireWriteComplete(channel, writtenBytes);
105.
}
else
{
106.
fireWriteCompleteLater(channel, writtenBytes);
107.
}
108.
}
这里, buf.transferTo(ch);的就是调用底层WritableByteChannel的write方法,把buffer写到管道里,传递过去。通过Debug可以看到,没调用一次这个方法,服务端的messageReceived方法就会进入断点一次。当然这个也只是表相,或者说也是在预料之内的。因为笔者从开始就怀疑是连续写入过快导致的问题,所以测试过每次write后停顿1秒。再write下一次。结果一切正常。
那么我们跟到这里的意义何在呢?笔者的思路是先证明不是在write端出现的写覆盖的问题,这样就可以从read端寻找问题。这里笔者也在这里加入了一个计数,测试究竟transferTo了几次。结果确实是3次。
1.
for
(
int
i = writeSpinCount; i >
0
; i --) {
2.
localWrittenBytes = buf.transferTo(ch);
3.
System.out.println(++count);
接下来就从接收端找找原因,在NioWorker的read方法,实现如下:
01.
@Override
02.
protected
boolean
read(SelectionKey k) {
03.
final
SocketChannel ch = (SocketChannel) k.channel();
04.
final
NioSocketChannel channel = (NioSocketChannel) k.attachment();
05.
06.
final
ReceiveBufferSizePredictor predictor =
07.
channel.getConfig().getReceiveBufferSizePredictor();
08.
final
int
predictedRecvBufSize = predictor.nextReceiveBufferSize();
09.
10.
int
ret =
0
;
11.
int
readBytes =
0
;
12.
boolean
failure =
true
;
13.
14.
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
15.
try
{
16.
while
((ret = ch.read(bb)) >
0
) {
17.
readBytes += ret;
18.
if
(!bb.hasRemaining()) {
19.
break
;
20.
}
21.
}
22.
failure =
false
;
23.
}
catch
(ClosedChannelException e) {
24.
// Can happen, and does not need a user attention.
25.
}
catch
(Throwable t) {
26.
fireExceptionCaught(channel, t);
27.
}
28.
29.
if
(readBytes >
0
) {
30.
bb.flip();
31.
32.
final
ChannelBufferFactory bufferFactory =
33.
channel.getConfig().getBufferFactory();
34.
final
ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
35.
buffer.setBytes(
0
, bb);
36.
buffer.writerIndex(readBytes);
37.
38.
recvBufferPool.release(bb);
39.
40.
// Update the predictor.
41.
predictor.previousReceiveBufferSize(readBytes);
42.
43.
// Fire the event.
44.
fireMessageReceived(channel, buffer);
45.
}
else
{
46.
recvBufferPool.release(bb);
47.
}
48.
49.
if
(ret <
0
|| failure) {
50.
k.cancel();
// Some JDK implementations run into an infinite loop without this.
51.
close(channel, succeededFuture(channel));
52.
return
false
;
53.
}
54.
55.
return
true
;
56.
}
在这个方法的外层是一个循环,不停的遍历,如果有SelectionKey k存在,则进入此方法读取buffer中的数据。这个SelectionKey 区分只是一种类型,这个设计到Java NIO中的Seletor机制,这个笔者准备下讲穿插一下。属于Netty底层的一个重要的机制。
messageReceived事件的触发,是在读取完当前缓冲池中所有的信息之后在触发的。这倒是可以解释,为什么即使我们收到事件的次数少,但是消息是完整的。
从目前来看,Netty通过Java 的NIO机制传递数据,数据读写跟事件没有严格的绑定机制。数据是以流的形式独立存在,读写都有一个缓冲池。
不过,这些还远未解决笔者的疑惑。笔者决定先了解一下Seletor机制,再回头来探索这个问题。
待解决……
Java NIO框架Netty教程(五) 消息收发次数不匹配的问题
原文:http://www.cnblogs.com/hashcoder/p/7648407.html