Java 动态代理的一个简单 demo(用于与 Hadoop 中的动态代理进行对比):
/**
 * Service interface of the dynamic-proxy demo.
 *
 * <p>Instances are accessed through a JDK dynamic proxy, so every method
 * declared here can be intercepted by an invocation handler.
 */
public interface Hello {

    /**
     * Says hello to somebody.
     *
     * @param to the recipient of the greeting
     */
    void sayHello(String to);

    /**
     * Prints a piece of text.
     *
     * @param p the text to print
     */
    void print(String p);
}
/**
 * Plain implementation of {@link Hello} that writes its output to standard out.
 */
public class HelloImpl implements Hello {

    /**
     * Prints {@code "Say hello to "} followed by the recipient.
     *
     * @param to the recipient of the greeting
     */
    public void sayHello(String to) {
        String greeting = "Say hello to " + to;
        System.out.println(greeting);
    }

    /**
     * Prints {@code "print : "} followed by the given text.
     *
     * @param s the text to print
     */
    public void print(String s) {
        String line = "print : " + s;
        System.out.println(line);
    }
}
/**
 * Invocation handler that prints a marker before and after delegating each
 * proxied call to a wrapped target object.
 */
public class LogHandler implements InvocationHandler {

    /** The real object that proxied calls are forwarded to. */
    private final Object dele;

    /**
     * @param obj the target object that will receive the delegated calls
     */
    public LogHandler(Object obj) {
        this.dele = obj;
    }

    /**
     * Prints "before....", forwards the call to the target, prints "after...."
     * and returns the target's result.
     *
     * @param proxy  the proxy instance the method was invoked on (unused)
     * @param method the interface method being invoked
     * @param args   the call arguments, or {@code null} when there are none
     * @return whatever the target method returned
     * @throws Throwable the exception thrown by the target method itself
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        doBefore();
        Object result;
        try {
            // The delegation below could be dropped entirely and replaced by other logic.
            result = method.invoke(dele, args);
        } catch (java.lang.reflect.InvocationTargetException e) {
            // Reflection wraps the target's exception. Rethrow the real one so the
            // caller of the proxy does not receive an UndeclaredThrowableException.
            Throwable cause = e.getCause();
            throw cause != null ? cause : e;
        }
        // As before, "after...." is printed only when the target call succeeded.
        after();
        return result;
    }

    private void doBefore() {
        System.out.println("before....");
    }

    private void after() {
        System.out.println("after....");
    }
}
public class ProxyTest { public static void main(String[] args) { HelloImpl impl = new HelloImpl(); LogHandler handler = new LogHandler(impl); //这里把handler与impl新生成的代理类相关联 Hello hello = (Hello) Proxy.newProxyInstance(impl.getClass().getClassLoader(), impl.getClass().getInterfaces(), handler); //这里无论访问哪个方法,都是会把请求转发到handler.invoke hello.print("All the test"); hello.sayHello("Denny"); } }
/**
 * Demo IPC protocol. It extends {@code VersionedProtocol} and declares a
 * single query method.
 */
public interface IPCQueryStatus extends VersionedProtocol {

    /**
     * Looks up the status of a file.
     *
     * @param filename name of the file to query
     * @return the status object for that file
     */
    IPCFileStatus getFileStatus(String filename);
}
// Obtain a proxy that implements IPCQueryStatus via RPC.getProxy, passing the
// protocol version constant, the server address and a fresh Configuration.
IPCQueryStatus query = (IPCQueryStatus) RPC.getProxy(
        IPCQueryStatus.class, IPCQueryServer.IPC_VER, addr, new Configuration());

// The backslashes must be escaped: an unescaped "\t" is a TAB character, which
// would silently turn the path into <TAB>mp<TAB>estIPC.
IPCFileStatus status = query.getFileStatus("\\tmp\\testIPC");
/**
 * Creates a JDK dynamic proxy that implements the given protocol interface.
 * Every call on the returned proxy is handled by a new {@code Invoker}.
 * The {@code ......} markers stand for code elided from this excerpt.
 *
 * @param protocol      the protocol interface the proxy will implement; its
 *                      class loader is used to define the proxy class
 * @param clientVersion protocol version of the client (not used in the
 *                      visible part of this excerpt)
 * @param addr          passed to the {@code Invoker} constructor
 * @param ticket        passed to the {@code Invoker} constructor
 * @param conf          passed to the {@code Invoker} constructor
 * @param factory       passed to the {@code Invoker} constructor
 * @param rpcTimeout    passed to the {@code Invoker} constructor
 * @return the proxy, typed as {@code VersionedProtocol}
 * @throws IOException declared by this method; the throwing code is elided
 */
public static VersionedProtocol getProxy(
    Class<? extends VersionedProtocol> protocol, long clientVersion,
    InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
    SocketFactory factory, int rpcTimeout) throws IOException {
  ......
  VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
      protocol.getClassLoader(), new Class[] { protocol },
      new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
  ......
  return proxy;
  ......
}
private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId;// 用来标示一个connection,用以复用 private Client client;//最重要的成员变量,RPC客户端 private boolean isClosed = false; public Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf); this.client = CLIENTS.getClient(conf, factory);//★ } ...... public Object invoke(Object proxy, Method method, Object[] args) ...... ObjectWritable value = (ObjectWritable) Invocation(method, args), remoteId); ...... return value.get(); } }
/**
 * Sends {@code param} over the connection selected by {@code remoteId} and
 * blocks until the call is marked done.
 * The {@code ...} markers stand for code elided from this excerpt.
 *
 * @param param    the request payload, wrapped in a new {@code Call}
 * @param remoteId used to look up the connection via {@code getConnection}
 * @return {@code call.value} when the call finished without an error
 * @throws IOException          {@code call.error}, when it is set
 * @throws InterruptedException declared by this method; see the elided code
 */
public Writable call(Writable param, ConnectionId remoteId)
    throws InterruptedException, IOException {
  Call call = new Call(param);
  Connection connection = getConnection(remoteId, call);
  connection.sendParam(call); // send the parameter
  ...
  // Wait on the call's own monitor until its done flag is set.
  synchronized (call) {
    while (!call.done) {
      try {
        call.wait(); // wait for the result
      } catch (InterruptedException ie) {
        ...
      }
    }
    ...
    if (call.error != null) {
      ...
      throw call.error;
      ...
    } else {
      return call.value;
    }
  }
}
/**
 * Client-side connection thread: loops while {@code waitForWork()} returns
 * true and reads one response per iteration.
 * The {@code ......} markers stand for code elided from this excerpt.
 */
private class Connection extends Thread {
  ......
  /** Thread body: keeps receiving responses for as long as there is work. */
  public void run() {
    ......
    while (waitForWork()) { // wait here for work - read or close connection
      receiveResponse();
    }
    ......
  }
  ......
  /**
   * Reads one response from {@code in}: the call id, then a status code, then
   * either the value (SUCCESS) or two strings describing a remote error
   * (ERROR / FATAL). Any IOException marks the connection closed.
   */
  private void receiveResponse() {
    ......
    touch();
    try {
      int id = in.readInt(); // try to read an id
      ......
      Call call = calls.get(id);
      int state = in.readInt(); // read call status
      if (state == Status.SUCCESS.state) {
        Writable value = ReflectionUtils.newInstance(valueClass, conf);
        value.readFields(in); // read value
        call.setValue(value);
        calls.remove(id);
      } else if (state == Status.ERROR.state) {
        // Only this call is failed and removed; the connection is not marked closed.
        call.setException(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in)));
        calls.remove(id);
      } else if (state == Status.FATAL.state) {
        // Close the connection
        markClosed(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in)));
      }
    } catch (IOException e) {
      markClosed(e);
    }
  }
}
protected synchronized void callComplete() { this.done = true; notify(); // notify caller } public synchronized void setException(IOException error) { this.error = error; callComplete(); } public synchronized void setValue(Writable value) { this.value = value; callComplete(); }
// Server-side read path, shown as a chain of excerpts. The "Listener." and
// "Connection." prefixes appear to mark the class each method belongs to
// (they are not Java syntax). "..." and "......" stand for elided code.

/** Fetches the Connection attached to the selection key and lets it read. */
void Listener.doRead(SelectionKey key) throws InterruptedException {
  int count = 0;
  Connection c = (Connection)key.attachment();
  ...
  count = c.readAndProcess();
  ...
}

/** Passes the bytes held in {@code data} to processOneRpc. */
public int Connection.readAndProcess() throws IOException, InterruptedException {
  ......
  processOneRpc(data.array());
  ......
}

/** Treats the buffer as call data if the header has been read, else as the header. */
private void Connection.processOneRpc(byte[] buf) throws IOException, InterruptedException {
  if (headerRead) {
    processData(buf);
  } else {
    processHeader(buf);
    ......
  }
}

/**
 * Decodes one request: reads the call id, deserializes the parameter into a
 * new instance of {@code paramClass}, and puts the resulting Call on the queue.
 */
private void Connection.processData(byte[] buf) throws IOException, InterruptedException {
  DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
  int id = dis.readInt(); // try to read an id
  ......
  Writable param = ReflectionUtils.newInstance(paramClass, conf); // ★ open question: where is paramClass set? ★ (not visible in this excerpt)
  param.readFields(dis);
  Call call = new Call(id, param, this);
  callQueue.put(call); // queue the call; maybe blocked here
}