这篇开始讲XxlRpcInvoker(客户端),然后也像Provider那样去看看使用情况,然后我们幻想的应该是这样的:
XxlRpcInvokerFactory invoker = new XxlRpcInvokerFactory(); invoker.setXX(X x); ... invoker.start();
然而,实际上会看到这么个东西:
// init client
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
referenceBean.setClient(NettyClient.class);
referenceBean.setSerializer(HessianSerializer.class);
referenceBean.setCallType(CallType.SYNC);
referenceBean.setLoadBalance(LoadBalance.ROUND);
referenceBean.setIface(DemoService.class);
referenceBean.setVersion(null);
referenceBean.setTimeout(500);
referenceBean.setAddress("127.0.0.1:7080");
referenceBean.setAccessToken(null);
referenceBean.setInvokeCallback(null);
referenceBean.setInvokerFactory(null);
DemoService demoService = (DemoService) referenceBean.getObject();
// test
UserDTO userDTO = demoService.sayHi("[SYNC]jack");
System.out.println(userDTO);
嗯,这样使用起来多少有点迷茫。。虽然看到这些参数也比较亲切:设置NettyClient通信、序列化方案为Hessian、需要访问的接口名DemoService(version主要用来区分统一服务的不同版本),因为是测试,这里直接指定的服务提供者的地址是本地的7080端口,其实也可以通过指定InvokerFactory来设置注册中心的信息,从而从注册中心获取指定服务的地址。没错,这里的InvokerFactory有两个功能,设置注册中心和进行异步结果的返回。
DemoService demoService = (DemoService) referenceBean.getObject();
注意到这段代码,返回了一个DemoService对象,其实就是我们远程调用的结果。这里使用的Java的动态代理来实现:
public Object getObject() throws Exception {
// initClient
initClient();
// newProxyInstance
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// method param
String className = method.getDeclaringClass().getName(); // iface.getName()
String varsion_ = version;
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Object[] parameters = args;
// filter for generic
if (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) {
Class<?>[] paramTypes = null;
if (args[3]!=null) {
String[] paramTypes_str = (String[]) args[3];
if (paramTypes_str.length > 0) {
paramTypes = new Class[paramTypes_str.length];
for (int i = 0; i < paramTypes_str.length; i++) {
paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]);
}
}
}
className = (String) args[0];
varsion_ = (String) args[1];
methodName = (String) args[2];
parameterTypes = paramTypes;
parameters = (Object[]) args[4];
}
// filter method like "Object.toString()"
if (className.equals(Object.class.getName())) {
logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName);
throw new XxlRpcException("xxl-rpc proxy class-method not support");
}
// address
String finalAddress = address;
if (finalAddress==null || finalAddress.trim().length()==0) {
if (invokerFactory!=null && invokerFactory.getServiceRegistry()!=null) {
// discovery
String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(serviceKey);
// load balance
if (addressSet==null || addressSet.size()==0) {
// pass
} else if (addressSet.size()==1) {
finalAddress = addressSet.first();
} else {
finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
}
}
}
if (finalAddress==null || finalAddress.trim().length()==0) {
throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
}
// request
XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
xxlRpcRequest.setAccessToken(accessToken);
xxlRpcRequest.setClassName(className);
xxlRpcRequest.setMethodName(methodName);
xxlRpcRequest.setParameterTypes(parameterTypes);
xxlRpcRequest.setParameters(parameters);
xxlRpcRequest.setVersion(version);
// send
if (CallType.SYNC == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// do invoke
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
} finally{
// future-response remove
futureResponse.removeInvokerFuture();
}
} else if (CallType.FUTURE == callType) {
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
try {
// invoke future set
XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
XxlRpcInvokeFuture.setFuture(invokeFuture);
// do invoke
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
return null;
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
}
} else if (CallType.CALLBACK == callType) {
// get callback
XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
if (threadInvokeCallback != null) {
finalInvokeCallback = threadInvokeCallback;
}
if (finalInvokeCallback == null) {
throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
}
// future-response set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
try {
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
// future-response remove
futureResponse.removeInvokerFuture();
throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
}
return null;
} else if (CallType.ONEWAY == callType) {
clientInstance.asyncSend(finalAddress, xxlRpcRequest);
return null;
} else {
throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");
}
}
});
}
看到这么长一段代码头还是头还是很大的。但是其实简单来讲可以分为这几个流程:
因此,最终客户端获得的接口对象实际是Proxy.newInstance产生的代理对象,通过本地执行接口的对应方法取得回传的执行结果。还是蛮神奇的!
其实获取调用结果的过程,还是很能体现异步这个思想的,包括多线程,很多东西看起来非常吸引人,以这些项目作为入门级源码月度还是非常好的。有时间再加上异步返回的笔记好了。
原文:https://www.cnblogs.com/hblearning/p/xxl_rpc_invoker.html