Hadoop RPC客户端(Client)向RPC建立连接时向RPC服务器发送两部分内容:RPC Header和Header。RPC Header的格式如下:
private void writeRpcHeader(OutputStream outStream) throws IOException { DataOutputStream out = new DataOutputStream( new BufferedOutputStream(outStream)); // Write out the header, version and authentication method out.write(Server.HEADER.array()); out.write(Server.CURRENT_VERSION); authMethod.write(out); out.flush(); }
其中Server.HEADER为ByteBuffer.wrap("hrpc".getBytes()),SERVER.CURRENT_VERSION为 4,最后为authMethod。
RPC Header发送完成后,Client会发送header部分,其具体内容如下:
private void writeHeader() throws IOException { // Write out the ConnectionHeader DataOutputBuffer buf = new DataOutputBuffer(); header.write(buf); // Write out the payload length int bufLen = buf.getLength(); out.writeInt(bufLen); out.write(buf.getData(), 0, bufLen); }
Header的主要包含三部分的内容:
class ConnectionHeader implements Writable { private String protocol; private UserGroupInformation ugi = null; private AuthMethod authMethod; ...... }
RPC Header与Header在连接建立的时候发送并且只发送一次,在此之后,Client会发送每一次方法调用相关的信息:
Call call = new Call(param); Connection connection = getConnection(remoteId, call); connection.sendParam(call); // send the parameter
下面看一下Call的具体内容:
private class Call { int id; // call id Writable param; // parameter ...... }
其中id是本次方法调用的唯一标识,param是Invocation的一个实例,Invocation包含了三部分内容:要调用方法的名字、方法调用参数类型数组、参数数组。
private static class Invocation implements Writable, Configurable { private String methodName; private Class[] parameterClasses; private Object[] parameters; ...... }
Call发送的具体代码如下:
d = new DataOutputBuffer(); d.writeInt(call.id); call.param.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); out.writeInt(dataLength); // first put the data length out.write(data, 0, dataLength);// write the data out.flush();
综上所述,RPC协议的具体格式如下:
RPC Header |
Header |
Call |
Call |
..... |
RPC Header:
‘hrpc’ |
4 |
authMethod |
Header:
Header长度(Int) |
ConnectionHeader |
Call
Call长度(INT) |
Call id(int) |
方法名 |
参数类型数组 |
参数数组 |
下面我们看一下RPC服务器端是如何采用上面的协议进行交互的。按照设想,RPC服务器接受客户端的连接请求后,服务器首先读取RPC Header,再读取Header,最后不断的读取方法调用(Call)。服务器端的Socket读取都是通过Reader内部类最终由Server内部类Connection类来读取。为了顺利读取Server.Connection设置了几个内部变量,如下图所示。这里要特别说明一下dataLengthBuffer这个变量,因为这个变量的名字存在歧义。在第一次读取的时候(读取RPC Header的时候),dataLengthBuffer保存的是‘hrpc’的byte数组;在其他时候(读取Header的时候或者Call的时候)保存的是长度(int)。除此之外,还需要注意的是RPC采用的异步通信的模式,服务器采用如下的方式判断一次读取已经完成——如果读取的结果为-1或者buffer没有读满,则表明本次读取已经完成但是数据还没有完全读取完成,需要继续等待下一次读取。
boolean rpcHeaderRead = false; //判断RPCHeader是否已经读取完成 boolean headerRead = false; //判断Header是否已经读取完成 ByteBuffer data;//Header的数据或者每一次Call的数据 ByteBuffer dataLengthBuffer;//长度,Header的长度、Call的长度或者
原文:http://blog.csdn.net/liuwei271551048/article/details/23920357