Hadoop版本Hadoop2.6
RPC主要分为3个部分:(1)交互协议(2)客户端(3)服务端
(2)客户端
先展示RPC客户端实例代码
public class LoginController { public static void main(String[] args) throws IOException { //获取RPC LoginServiceInterface协议接口的代理对象 LoginServiceInterface proxy= RPC.getProxy(LoginServiceInterface.class,1L,new InetSocketAddress("localhost",10000),new Configuration()); String msg=proxy.login("xiaoming","123123"); System.out.println(msg); } }
(1)进入上述的RPC.getProxy方法,会发现是通过获取RpcEngine接口(默认实现是WritableRpcEngine),利用WritableRpcEngine的getProxy方法获取Proxy代理,如下所示
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth) throws IOException { if (connectionRetryPolicy != null) { throw new UnsupportedOperationException( "Not supported: connectionRetryPolicy=" + connectionRetryPolicy); } T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, fallbackToSimpleAuth)); return new ProtocolProxy<T>(protocol, proxy, true); }
(2)上述就是客户端获取代理的过程,但是其中是如何从服务端获取通过动态代理类Invoker实现,并将代理封装成ProtocolProxy类,在本文上述的例子中,该ProtocolProxy类没有干什么,只是通过getProxy()方法将封装的代理返回给客户端
那么我们接着分析动态代理类Invoker
Invoker成员有Clinet类,并且全局变量ClientCache对Client进行缓存。
动态代理类Invoker在代理对象发送请求时会自动执行invoke()方法,如下所示:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { long startTime = 0; if (LOG.isDebugEnabled()) { startTime = Time.now(); } TraceScope traceScope = null; if (Trace.isTracing()) { traceScope = Trace.startSpan( method.getDeclaringClass().getCanonicalName() + "." + method.getName()); } ObjectWritable value; try { value = (ObjectWritable) client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId, fallbackToSimpleAuth); } finally { if (traceScope != null) traceScope.close(); } if (LOG.isDebugEnabled()) { long callTime = Time.now() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); }
3、上述中动态代理通过client.call方法向服务器发送请求获取返回值。
我们还看到Invocation类封装了方法和参数,Invocation通过实现Writable实现序列化,方便数据在网络中传输,作为数据传输层,相当于VO。
因此我们接着进入Clinet类,查看call方法干了什么。
首先我们先看看Client类的结构,Client类包含了几个内部类:
Call :用于封装Invocation对象,作为VO,写到服务端,同时也用于存储从服务端返回的数据
Connection :用以处理远程连接对象。继承了Thread
ConnectionId :唯一确定一个连接
Client类中call()方法如下所示:
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException { final Call call = createCall(rpcKind, rpcRequest);//将传入的数据封装成call对象 Connection connection = getConnection(remoteId, call, serviceClass, fallbackToSimpleAuth);//获得一个连接 try { connection.sendRpcRequest(call); // send the rpc request向服务端发送call对象 } catch (RejectedExecutionException e) { throw new IOException("connection has been closed", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.warn("interrupted waiting to send rpc request to server", e); throw new IOException(e); } boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // wait for the result } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } if (interrupted) { // set the interrupt flag now that we are done waiting Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { // local exception InetSocketAddress address = connection.getRemoteAddress(); throw NetUtils.wrapException(address.getHostName(), address.getPort(), NetUtils.getHostname(), 0, call.error); } } else { return call.getRpcResponse(); } } }
4、从上述可以看到,rpcRequest是将方法和参数封装后的可序列号的对象,当做请求参数发送给服务端。
在上述方法中主要使用了两个类Call和Connection.
Call:封装了与服务端请求的状态,包括:
final int id; // call id该请求连接ID final int retry; // retry count该请求重试次数 final Writable rpcRequest; // the serialized rpc request该请求参数 Writable rpcResponse; // null if rpc has error该请求的返回值 IOException error; // exception, null if success该请求成功标示 final RPC.RpcKind rpcKind; // Rpc EngineKind使用RpcEngine的类型 boolean done; // true when call is done该请求完成标示
Connection则是实现了与服务端建立连接,发送请求,获取数据等功能。
5、Connection类解析
Connection类继承线程类Thread.
从3步可以看到在Clinet的call()方法通过getConnection()方法获取Connection,如下所示:
可以看出Client使用connections对客户端每一个connection进行缓存,
并通过setupIOstreams()方法与服务器建立Socket连接,并创建输入输出流connection.in,connection.out,
并通过start()方法启动该线程也就是运行Connection类的run()方法,等待服务端传回数据。
因此Connection类主要通过run()方法接受数据,通过sendRpcRequest()向服务端发送请求。
private Connection getConnection(ConnectionId remoteId, Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException { if (!running.get()) { // the client is stopped throw new IOException("The client is stopped"); } Connection connection; /* we could avoid this allocation for each RPC by having a * connectionsId object and with set() method. We need to manage the * refs for keys in HashMap properly. For now its ok. */ do { synchronized (connections) { connection = connections.get(remoteId); if (connection == null) { connection = new Connection(remoteId, serviceClass); connections.put(remoteId, connection); } } } while (!connection.addCall(call)); //we don‘t invoke the method below inside "synchronized (connections)" //block above. The reason for that is if the server happens to be slow, //it will take longer to establish a connection and that will slow the //entire system down. connection.setupIOstreams(fallbackToSimpleAuth); return connection; }
5.1 Connection 的sendRpcRequest()向服务端发送请求
public void sendRpcRequest(final Call call) throws InterruptedException, IOException { if (shouldCloseConnection.get()) { return; } // Serialize the call to be sent. This is done from the actual // caller thread, rather than the sendParamsExecutor thread, // so that if the serialization throws an error, it is reported // properly. This also parallelizes the serialization. // // Format of a call on the wire: // 0) Length of rest below (1 + 2) // 1) RpcRequestHeader - is serialized Delimited hence contains length // 2) RpcRequest // // Items ‘1‘ and ‘2‘ are prepared here. final DataOutputBuffer d = new DataOutputBuffer(); RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry, clientId); header.writeDelimitedTo(d); call.rpcRequest.write(d); synchronized (sendRpcRequestLock) { Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() { @Override public void run() { try { synchronized (Connection.this.out) { if (shouldCloseConnection.get()) { return; } if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); byte[] data = d.getData(); int totalLength = d.getLength(); out.writeInt(totalLength); // Total Length out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest out.flush(); } } catch (IOException e) { // exception at this point would leave the connection in an // unrecoverable state (eg half a call left on the wire). // So, close the connection, killing any outstanding calls markClosed(e); } finally { //the buffer is just an in-memory buffer, but it is still polite to // close early IOUtils.closeStream(d); } } }); try { senderFuture.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); // cause should only be a RuntimeException as the Runnable above // catches IOException if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } else { throw new RuntimeException("unexpected checked exception", cause); } } } }
5.2 Connection 的run()获取服务端返回的数据
可以看到通过receiveRpcResponse()方法通过之前建立的输入流in获取服务器传来的数据,并将数据value传给call数据对象call.setRpcResponse(value);,
在call.setRpcResponse(value)方法中通过callComplete()将call数据对象设置成已完成,并通过notify()唤醒该call对象。
在Client的call()方法中,检测到call对象已完成后,就将call对象中的响应数据返回给调用者。
至此,一个完整的RPC远程过程调用的过程就完成了。
public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); try { while (waitForWork()) {//wait here for work - read or close connection循环等待获取服务端数据 receiveRpcResponse();//获取服务端数据的具体实现 } } catch (Throwable t) { // This truly is unexpected, since we catch IOException in receiveResponse // -- this is only to be really sure that we don‘t leave a client hanging // forever. LOG.warn("Unexpected error reading responses on connection " + this, t); markClosed(new IOException("Error reading responses", t)); } close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); }
private void receiveRpcResponse() { if (shouldCloseConnection.get()) { return; } touch(); try { int totalLen = in.readInt(); RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in); checkResponse(header); int headerLen = header.getSerializedSize(); headerLen += CodedOutputStream.computeRawVarint32Size(headerLen); int callId = header.getCallId(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + callId); Call call = calls.get(callId); RpcStatusProto status = header.getStatus(); if (status == RpcStatusProto.SUCCESS) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value calls.remove(callId); call.setRpcResponse(value); // verify that length was correct // only for ProtobufEngine where len can be verified easily if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) { ProtobufRpcEngine.RpcWrapper resWrapper = (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse(); if (totalLen != headerLen + resWrapper.getLength()) { throw new RpcClientException( "RPC response length mismatch on rpc success"); } } } else { // Rpc Request failed // Verify that length was correct if (totalLen != headerLen) { throw new RpcClientException( "RPC response length mismatch on rpc error"); } final String exceptionClassName = header.hasExceptionClassName() ? header.getExceptionClassName() : "ServerDidNotSetExceptionClassName"; final String errorMsg = header.hasErrorMsg() ? header.getErrorMsg() : "ServerDidNotSetErrorMsg" ; final RpcErrorCodeProto erCode = (header.hasErrorDetail() ? header.getErrorDetail() : null); if (erCode == null) { LOG.warn("Detailed error code not set by server on rpc error"); } RemoteException re = ( (erCode == null) ? new RemoteException(exceptionClassName, errorMsg) : new RemoteException(exceptionClassName, errorMsg, erCode)); if (status == RpcStatusProto.ERROR) { calls.remove(callId); call.setException(re); } else if (status == RpcStatusProto.FATAL) { // Close the connection markClosed(re); } } } catch (IOException e) { markClosed(e); } }
原文:http://www.cnblogs.com/arbitrary/p/5628737.html