This post starts on XxlRpcInvoker (the client side). As with the Provider, let's first look at how it is used. What we might imagine is something like this:
    XxlRpcInvokerFactory invoker = new XxlRpcInvokerFactory();
    invoker.setXX(X x);
    ...
    invoker.start();
In practice, however, what you actually see is this:
    // init client
    XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
    referenceBean.setClient(NettyClient.class);
    referenceBean.setSerializer(HessianSerializer.class);
    referenceBean.setCallType(CallType.SYNC);
    referenceBean.setLoadBalance(LoadBalance.ROUND);
    referenceBean.setIface(DemoService.class);
    referenceBean.setVersion(null);
    referenceBean.setTimeout(500);
    referenceBean.setAddress("127.0.0.1:7080");
    referenceBean.setAccessToken(null);
    referenceBean.setInvokeCallback(null);
    referenceBean.setInvokerFactory(null);

    DemoService demoService = (DemoService) referenceBean.getObject();

    // test
    UserDTO userDTO = demoService.sayHi("[SYNC]jack");
    System.out.println(userDTO);
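Hmm, used this way it is a little bewildering at first, although the parameters themselves look familiar: NettyClient for communication, Hessian as the serialization scheme, and DemoService as the interface to call (version is mainly used to tell apart different versions of the same service). Because this is a test, the provider address is given directly as port 7080 on the local machine. You can also supply an InvokerFactory that carries the registry settings, so that the address of the requested service is looked up from the registry. That's right: the InvokerFactory here has two jobs, configuring the registry and delivering asynchronous results.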
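    DemoService demoService = (DemoService) referenceBean.getObject();

Note this line. It returns a DemoService object, but that object is not a local implementation: it is the handle through which our remote calls are made. It is built with Java's dynamic proxy: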
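To make the dynamic-proxy idea concrete before reading the real method, here is a minimal sketch. `GreetingService` and `ProxySketch` are hypothetical names invented for illustration and are not part of xxl-rpc; the handler only describes the call it intercepted, where a real RPC client would build and send a request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class ProxySketch {

    // Builds a proxy for the given interface. Every method call on the proxy
    // is routed to the handler instead of to a real implementation.
    static <T> T createProxy(Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) ->
                // A real RPC client would pack these values into a request here.
                method.getDeclaringClass().getSimpleName()
                        + "#" + method.getName() + Arrays.toString(args);
        Object instance = Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] { iface }, handler);
        return iface.cast(instance);
    }

    public static void main(String[] args) {
        GreetingService service = createProxy(GreetingService.class);
        // prints "GreetingService#sayHi[jack]"
        System.out.println(service.sayHi("jack"));
    }
}

// Hypothetical service interface, standing in for DemoService.
interface GreetingService {
    String sayHi(String name);
}
```

The caller only ever sees the interface, so whether the handler answers locally or goes over the network is invisible at the call site.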
    public Object getObject() throws Exception {

        // initClient
        initClient();

        // newProxyInstance
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[] { iface },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                        // method param
                        String className = method.getDeclaringClass().getName();    // iface.getName()
                        String varsion_ = version;
                        String methodName = method.getName();
                        Class<?>[] parameterTypes = method.getParameterTypes();
                        Object[] parameters = args;

                        // filter for generic
                        if (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) {

                            Class<?>[] paramTypes = null;
                            if (args[3]!=null) {
                                String[] paramTypes_str = (String[]) args[3];
                                if (paramTypes_str.length > 0) {
                                    paramTypes = new Class[paramTypes_str.length];
                                    for (int i = 0; i < paramTypes_str.length; i++) {
                                        paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]);
                                    }
                                }
                            }

                            className = (String) args[0];
                            varsion_ = (String) args[1];
                            methodName = (String) args[2];
                            parameterTypes = paramTypes;
                            parameters = (Object[]) args[4];
                        }

                        // filter method like "Object.toString()"
                        if (className.equals(Object.class.getName())) {
                            logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName);
                            throw new XxlRpcException("xxl-rpc proxy class-method not support");
                        }

                        // address
                        String finalAddress = address;
                        if (finalAddress==null || finalAddress.trim().length()==0) {
                            if (invokerFactory!=null && invokerFactory.getServiceRegistry()!=null) {
                                // discovery
                                String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
                                TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(serviceKey);
                                // load balance
                                if (addressSet==null || addressSet.size()==0) {
                                    // pass
                                } else if (addressSet.size()==1) {
                                    finalAddress = addressSet.first();
                                } else {
                                    finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
                                }
                            }
                        }
                        if (finalAddress==null || finalAddress.trim().length()==0) {
                            throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
                        }

                        // request
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                        xxlRpcRequest.setAccessToken(accessToken);
                        xxlRpcRequest.setClassName(className);
                        xxlRpcRequest.setMethodName(methodName);
                        xxlRpcRequest.setParameterTypes(parameterTypes);
                        xxlRpcRequest.setParameters(parameters);
                        xxlRpcRequest.setVersion(version);

                        // send
                        if (CallType.SYNC == callType) {
                            // future-response set
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
                            try {
                                // do invoke
                                clientInstance.asyncSend(finalAddress, xxlRpcRequest);

                                // future get
                                XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
                                if (xxlRpcResponse.getErrorMsg() != null) {
                                    throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                }
                                return xxlRpcResponse.getResult();
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            } finally{
                                // future-response remove
                                futureResponse.removeInvokerFuture();
                            }
                        } else if (CallType.FUTURE == callType) {
                            // future-response set
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
                            try {
                                // invoke future set
                                XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
                                XxlRpcInvokeFuture.setFuture(invokeFuture);

                                // do invoke
                                clientInstance.asyncSend(finalAddress, xxlRpcRequest);

                                return null;
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

                                // future-response remove
                                futureResponse.removeInvokerFuture();

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            }

                        } else if (CallType.CALLBACK == callType) {

                            // get callback
                            XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
                            XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
                            if (threadInvokeCallback != null) {
                                finalInvokeCallback = threadInvokeCallback;
                            }
                            if (finalInvokeCallback == null) {
                                throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
                            }

                            // future-response set
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
                            try {
                                clientInstance.asyncSend(finalAddress, xxlRpcRequest);
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

                                // future-response remove
                                futureResponse.removeInvokerFuture();

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            }

                            return null;
                        } else if (CallType.ONEWAY == callType) {
                            clientInstance.asyncSend(finalAddress, xxlRpcRequest);
                            return null;
                        } else {
                            throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");
                        }

                    }
                });
    }
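A block of code this long is enough to make your head spin. Going by the code above, though, it breaks down into roughly these steps:

1. initClient() prepares the client instance, and Proxy.newProxyInstance creates a proxy for iface.
2. On each call, the handler collects the class name, version, method name, parameter types and arguments. Generic calls through XxlRpcGenericService take these values from args instead, and methods declared on Object are rejected.
3. The target address is resolved: the configured address if there is one, otherwise discovery through the invokerFactory's service registry followed by load balancing.
4. An XxlRpcRequest is built, with a UUID as its request id.
5. The request goes out through clientInstance.asyncSend and the result is handled according to callType: SYNC blocks on futureResponse.get(timeout, ...), FUTURE stores an XxlRpcInvokeFuture and returns null, CALLBACK registers the callback and returns null, and ONEWAY just sends and returns null.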
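The address-resolution part of the method can be sketched in isolation. The class below is a hypothetical illustration, not xxl-rpc's router: `AddressResolverSketch` and `resolve` are invented names, and the round-robin counter is only an assumption about how a ROUND strategy might rotate through candidates. What it does mirror is the order of checks in the code above: a fixed address wins, a single discovered address is used directly, and only multiple candidates go through load balancing.

```java
import java.util.Arrays;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class AddressResolverSketch {

    // One counter per service key, so each service rotates independently.
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    String resolve(String fixedAddress, String serviceKey, TreeSet<String> discovered) {
        // 1. An explicitly configured address always wins.
        if (fixedAddress != null && !fixedAddress.trim().isEmpty()) {
            return fixedAddress;
        }
        // 2. Nothing configured and nothing discovered: fail loudly.
        if (discovered == null || discovered.isEmpty()) {
            throw new IllegalStateException("address empty for " + serviceKey);
        }
        // 3. A single candidate needs no load balancing.
        if (discovered.size() == 1) {
            return discovered.first();
        }
        // 4. Several candidates: rotate through them in sorted order.
        AtomicInteger counter = counters.computeIfAbsent(serviceKey, k -> new AtomicInteger());
        int index = Math.floorMod(counter.getAndIncrement(), discovered.size());
        return discovered.toArray(new String[0])[index];
    }

    public static void main(String[] args) {
        AddressResolverSketch resolver = new AddressResolverSketch();
        TreeSet<String> addresses = new TreeSet<>(Arrays.asList("127.0.0.1:7080", "127.0.0.1:7081"));
        for (int i = 0; i < 3; i++) {
            System.out.println(resolver.resolve(null, "DemoService", addresses));
        }
    }
}
```

Math.floorMod keeps the index non-negative even after the counter overflows, which a plain `%` would not guarantee.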
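So the interface object the client ends up with is really a proxy object produced by Proxy.newProxyInstance. Calling the interface's methods locally yields the result sent back from the remote side. Pretty magical!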
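The way the call result is obtained is a good illustration of asynchronous thinking, multithreading included. There is a lot here that is genuinely appealing, and projects like this make very good entry-level source-code reading. When I have time I will add notes on asynchronous returns.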
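As a rough illustration of that asynchronous idea, the sketch below shows how a blocking call can sit on top of an asynchronous send: a future is registered under the request id before sending, another thread completes it when the reply arrives, and the caller waits with a timeout and always cleans up afterwards. All names here (`SyncOverAsyncSketch`, `PendingCalls`, the fake network executor) are hypothetical, and CompletableFuture is a stand-in chosen for brevity; this is not how XxlRpcFutureResponse is implemented internally.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncOverAsyncSketch {

    // Sends "asynchronously", then blocks until the matching reply or the timeout.
    static String call(PendingCalls calls, ExecutorService fakeNetwork, String payload, long timeoutMillis) {
        String requestId = UUID.randomUUID().toString();
        // Register before sending, so a fast reply cannot arrive unmatched.
        CompletableFuture<String> future = calls.register(requestId);
        try {
            // Stand-in for an async send: returns at once, reply comes on another thread.
            fakeNetwork.execute(() -> calls.complete(requestId, "echo:" + payload));
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("invoke error, requestId=" + requestId, e);
        } finally {
            // Always drop the pending entry, mirroring the finally block in SYNC mode.
            calls.remove(requestId);
        }
    }

    public static void main(String[] args) {
        ExecutorService fakeNetwork = Executors.newSingleThreadExecutor();
        try {
            System.out.println(call(new PendingCalls(), fakeNetwork, "jack", 500));
        } finally {
            fakeNetwork.shutdown();
        }
    }
}

// Hypothetical registry of in-flight requests, keyed by request id.
class PendingCalls {
    private final ConcurrentMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called from the I/O side when a reply arrives; unknown ids are ignored.
    boolean complete(String requestId, String result) {
        CompletableFuture<String> future = pending.get(requestId);
        return future != null && future.complete(result);
    }

    void remove(String requestId) {
        pending.remove(requestId);
    }

    int size() {
        return pending.size();
    }
}
```

The request id is what ties a reply back to its caller, which is why the real code generates one with UUID.randomUUID() for every request.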
Original: https://www.cnblogs.com/hblearning/p/xxl_rpc_invoker.html