本文较长,如果想直接看代码可以查看项目源码地址: https://github.com/hetutu5238/rpc-demo.git
项目基于maven ,项目创建过程略,效果如下即可
rpc-parent的pom.xml如下即可
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xmlns="http://maven.apache.org/POM/4.0.0" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>com.maglith</groupId> 8 <artifactId>rpc-parent</artifactId> 9 <packaging>pom</packaging> 10 <version>1.0-SNAPSHOT</version> 11 <modules> 12 <module>rpc-server</module> 13 <module>rpc-client</module> 14 <module>rpc-common</module> 15 </modules> 16 <properties> 17 <java.version>1.8</java.version> 18 </properties> 19 <dependencies> 20 <dependency> 21 <groupId>io.netty</groupId> 22 <artifactId>netty-all</artifactId> 23 <version>4.1.43.Final</version> 24 </dependency> 25 <dependency> 26 <groupId>com.alibaba</groupId> 27 <artifactId>fastjson</artifactId> 28 <version>1.2.62</version> 29 </dependency> 30 <dependency> 31 <groupId>com.alibaba.nacos</groupId> 32 <artifactId>nacos-client</artifactId> 33 <version>1.1.4</version> 34 </dependency> 35 </dependencies> 36 37 <build> 38 <plugins> 39 <plugin> 40 <groupId>org.apache.maven.plugins</groupId> 41 <artifactId>maven-compiler-plugin</artifactId> 42 <version>3.5.1</version> 43 <configuration> 44 <source>1.8</source> 45 <target>1.8</target> 46 </configuration> 47 </plugin> 48 </plugins> 49 </build> 50 </project>
本项目使用nacos作为注册中心
这个在 https://www.cnblogs.com/hetutu-5238/p/11089577.html 中的第2步已有说明
在common项目下创建com.rpc包 ,以下cmmon项目的操作均已该包为基础
public class Assert { private Assert(){}; public static void notNull(Object obj, String message) { if (obj == null) { throw new RuntimeException(message); } } public static void on(boolean flag, String message) { if (flag) { throw new RuntimeException(message); } } }
1 @Target(ElementType.TYPE) 2 @Retention(RetentionPolicy.RUNTIME) 3 public @interface RpcService { 4 5 Class value(); 6 }
ClientRepository 用来存储服务调用连接
1 public class ClientRepository { 2 3 private static Map<String, Channel> repo = new ConcurrentHashMap<>(); 4 5 public static void put(String key , Channel channel) { 6 if ( key == null || channel == null ) { 7 System.err.println("channel or it‘s name can‘t be null "); 8 return; 9 } 10 repo.put(key , channel); 11 } 12 13 public static void remove(String key) { 14 repo.remove(key); 15 } 16 17 public static Channel getChannel(String key) { 18 if ( repo.get(key) != null && !repo.get(key).isActive() ) { 19 remove(key); 20 } 21 return repo.get(key); 22 } 23 }
ServiceRepository 用来存储可提供服务信息
1 public class ServiceRepository { 2 3 private static final Map<String, Object> repo = new HashMap<>(); 4 5 public static void put(String serviceName , Object service) { 6 repo.put(serviceName , service); 7 } 8 9 public static void remove(String key) { 10 repo.remove(key); 11 } 12 13 public static Object getService(String serviceName) { 14 return repo.get(serviceName); 15 } 16 17 public static Set<String> getAllServiceName() { 18 return repo.keySet(); 19 } 20 }
ServiceConnectionFactory用来创建新的调用连接
1 public class ServiceConnectionFactory { 2 3 4 public static Channel createConnection(String host , int port) throws InterruptedException { 5 String key = host + ":" + port; 6 System.out.println("创建新连接:" + key); 7 if ( ClientRepository.getChannel(key) != null ) { 8 ClientRepository.remove(key); 9 } 10 Bootstrap server = new Bootstrap(); 11 Channel c = server.group(workerGroup()) 12 .channel(NioSocketChannel.class) 13 .remoteAddress(new InetSocketAddress(host , port)) 14 .handler(new MyClientChannelInitializer()) 15 .connect() 16 .sync() 17 .channel(); 18 ClientRepository.put(key , c); 19 return c; 22 } 23 24 private static EventLoopGroup workerGroup() { 25 return new NioEventLoopGroup(10); 26 } 27 }
该类中用的MyClientChannelInitializer会在后面写到
package com.rpc.config; import com.alibaba.nacos.api.naming.NamingFactory; import com.alibaba.nacos.api.naming.NamingService; import com.rpc.repository.ServiceRepository; import com.rpc.util.Assert; import org.apache.commons.lang3.StringUtils; import java.io.File; import java.io.InputStream; import java.net.URL; import java.util.*; /** * @Description: * @author: zhoum * @Date: 2019-11-28 * @Time: 16:24 */ public class RegisterConfig { //配置文件 private static Properties register; //nacos注册服务 private static NamingService namingService; private static volatile boolean init = false; private static volatile boolean serverInit = false; public static boolean getInit() { return init; } public static boolean getServerInit() { return serverInit; } /** * serverFlag为true则代表服务端启动 serverFlag为false则代表客户端启动 * @param serverFlag */ public static void init(boolean serverFlag) { if ( (serverFlag && getServerInit()) || (!serverFlag && getInit()) ) { return; } System.out.println("服务端初始化开始" + new Date()); Properties properties = new Properties(); try (InputStream resourceAsStream = RegisterConfig.class.getClassLoader().getResourceAsStream("register.properties");) { Assert.notNull(resourceAsStream,"register.properties is required"); properties.load(resourceAsStream); register = properties; //获取nacos服务 namingService = NamingFactory.createNamingService(String.valueOf(register.get("register.host"))); if ( serverFlag ){ regis(); serverInit = true; } } catch (Exception e) { throw new RuntimeException(e); } System.out.println("服务端初始化结束" + new Date()); init = true; } public static Integer getPort() { Assert.notNull(register,"RegisterConfig must be initialized"); return Integer.valueOf(Objects.toString(register.get("server.port"))); } public static NamingService getNamingService() { Assert.notNull(namingService,"RegisterConfig must be initialized"); return namingService; } /** * 注册到注册中心 */ private static void regis() { try { String scanpkgs = String.valueOf(register.get("scanpkgs")); Assert.on(StringUtils.isBlank(scanpkgs),"scan service package can‘t be null"); initService(scanpkgs.split(",")); //将服务注册到注册中心 Set<String> serviceNames = ServiceRepository.getAllServiceName(); if ( !serviceNames.isEmpty() ) { for (String serviceName : serviceNames) { //将服务注册到注册中心 namingService.registerInstance(serviceName , String.valueOf(register.get("server.host")) , Integer.valueOf(Objects.toString(register.get("server.port")))); System.out.println(String.format("已注册服务:%s,服务地址为:%s" , register.get("server.host") , register.get("server.port"))); } } } catch (Exception e) { throw new RuntimeException(e); } } /** * 初始化服务 * @param pkgs 需要扫描的包 */ private static void initService(String[] pkgs) { List<Class<?>> classes = new ArrayList<>(); //获取所有包名下的符合条件的服务 try { for (String pkg : pkgs) { classes.addAll(getRpcClass(pkg)); } for (Class<?> c : classes) { Object o = c.newInstance(); //将服务存储到服务仓库 ServiceRepository.put(c.getAnnotation(com.rpc.anno.RpcService.class).value().getName() , o); } } catch (Exception e) { throw new RuntimeException(e); } } private static List<Class<?>> getRpcClass(String pkg) throws ClassNotFoundException { List<Class<?>> result = new ArrayList<>(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Assert.notNull(classLoader,"Can‘t get ClassLoader,Thread: " + Thread.currentThread().getName()); //获取目录 String packageName = pkg.replace(‘.‘ , ‘/‘); URL resource = classLoader.getResource(packageName); Assert.notNull(resource,"Can‘t get package,package doesn‘t exist: " + packageName); String fileUrl = resource.getFile(); File f = new File(fileUrl); if ( f.exists() ) { File[] files = f.listFiles(); if ( files != null ) { for (File file : files) { String fileName; //如果是文件 if ( !file.isDirectory() && (fileName = file.getName()).endsWith(".class") ) { Class<?> c = Class.forName(pkg + "." + fileName.substring(0 , fileName.length() - 6)); if ( c.getAnnotation(com.rpc.anno.RpcService.class) != null ) { result.add(c); } } else if ( file.isDirectory() ) { List<Class<?>> rpcClass = getRpcClass(pkg + "." + file.getName()); if ( rpcClass != null && !rpcClass.isEmpty() ) { result.addAll(rpcClass); } } } } } else { throw new RuntimeException("Can‘t get package,package doesn‘t exist: " + pkg); } return result; } }
该类用来初始化客户端或者服务端,init()方法传入的参数即标识初始化类型
register.properties主要为配置类,在后面的客户端或者服务端项目添加 不要在common项目下添加,内容如下
#必须设置注册中心地址 register.host=127.0.0.1:8848 #如果为服务端则必须设置 设置扫描包 多个包以","分割 scanpkgs=com.rpc #如果为服务端则必须设置 设置本地服务地址 server.host=127.0.0.1 #如果为服务端则必须设置 设置本地服务端口 server.port=8080
RpcRequest 服务请求类
package com.rpc.support; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; /** * @Description: * @author: zhoum * @Date: 2019-11-27 * @Time: 10:52 */ public class RpcRequest implements Serializable { //任务id生成器 private static final AtomicLong REQUEST_ID = new AtomicLong(0); //请求id private long id; //请求的服务名 该项目为接口的全限定名 private String serviceName; //请求的方法名 private String methodName; //请求的参数类型 private Class<?>[] paramsTypes; //请求的参数 private Object[] params; public long getId() { return id; } public void setId(long id) { this.id = id; } public void newId() { this.id = REQUEST_ID.getAndIncrement(); } public String getServiceName() { return serviceName; } public void setServiceName(String serviceName) { this.serviceName = serviceName; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class<?>[] getParamsTypes() { return paramsTypes; } public void setParamsTypes(Class<?>[] paramsTypes) { this.paramsTypes = paramsTypes; } public Object[] getParams() { return params; } public void setParams(Object[] params) { this.params = params; } @Override public String toString() { return "RpcRequest{" + "id=" + id + ", serviceName=‘" + serviceName + ‘\‘‘ + ", methodName=‘" + methodName + ‘\‘‘ + ‘}‘; } }
RpcResponse类 即请求的回应类
package com.rpc.support; import java.io.Serializable; /** * @Description: * @author: zhoum * @Date: 2019-11-27 * @Time: 10:52 */ public class RpcResponse implements Serializable { //错误信息 private Throwable error; //返回数据 private Object response; //对应的请求id private long id; public long getId() { return id; } public void setId(Long id) { this.id = id; } public Throwable getError() { return error; } public void setError(Throwable error) { this.error = error; } public Object getResponse() { return response; } public void setResponse(Object response) { this.response = response; } }
RpcFuture 该类主要用来存储发送的请求并获得返回结果
package com.rpc.support; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @Description: * @author: zhoum * @Date: 2019-11-28 * @Time: 10:18 */ public class RpcFuture { //存储所有已发送 但还未得到响应的请求 private static final Map<Long, RpcFuture> REPO = new ConcurrentHashMap<>(); //每次请求对应的锁 private final Object lock = new Object(); //超时时间 private final int timeOut = 10; //任务id private long id; //请求内容 private RpcRequest rpcRequest; //请求对应的相应内容 private volatile RpcResponse rpcResponse; public RpcFuture(long id , RpcRequest rpcRequest) { this.id = id; this.rpcRequest = rpcRequest; } public static void receive(RpcResponse resp) { RpcFuture remove = REPO.remove(resp.getId()); remove.doRecieve(resp); } public static void putFuture(Long id , RpcFuture rpcFuture) { REPO.put(id , rpcFuture); } private boolean done() { return this.rpcResponse != null; } private void doRecieve(RpcResponse resp) { synchronized (lock) { this.rpcResponse = resp; //唤醒请求时的等待 lock.notifyAll(); } } public Object getResponse() { long millis = System.currentTimeMillis(); //也可以使用BlockingQueue代替锁 但是会使线程阻塞 java8的话竞争不激烈情况下建议synchronized代替显式锁 synchronized (lock) { if ( !done() ) { try { while ( !done() ) { lock.wait(timeOut * 1000); //被唤醒后判断下完成或者超时 if ( done() || System.currentTimeMillis() - millis > timeOut * 1000 ) { break; } } } catch (Exception e) { throw new RuntimeException(e); } if ( !done() ) { throw new RuntimeException("服务已超时 服务id:" + id + " 服务内容:" + rpcRequest); } } } return rpcResponse.getResponse(); } }
RpcClient 主要为客户端角色时使用,用来传输请求信息
package com.rpc.client; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.pojo.Instance; import com.rpc.config.RegisterConfig; import com.rpc.repository.ServiceConnectionFactory; import com.rpc.support.RpcRequest; import com.rpc.repository.ClientRepository; import com.rpc.support.RpcFuture; import com.rpc.util.Assert; import io.netty.channel.Channel; /** * @Description: * @author: zhoum * @Date: 2019-11-27 * @Time: 11:41 */ public class RpcClient { public static Object transfer(RpcRequest rpcRequest) { try { //配置中心已实现负载均衡 RegisterConfig.init(false); //配置中心拿配置 Instance ins = RegisterConfig.getNamingService().selectOneHealthyInstance(rpcRequest.getServiceName()); Assert.notNull(ins,"service not be find :"+rpcRequest.getServiceName()); String key = ins.getIp() + ":" + ins.getPort(); Channel c = ClientRepository.getChannel(key); if ( c == null ) { //如果系统没有缓存这个服务的连接则创建 String[] split = key.split(":"); c = ServiceConnectionFactory.createConnection(split[ 0 ] , Integer.valueOf(split[ 1 ])); } Long id = rpcRequest.getId(); RpcFuture rpcFuture = new RpcFuture(id , rpcRequest); RpcFuture.putFuture(id , rpcFuture); c.writeAndFlush(rpcRequest); return rpcFuture.getResponse(); } catch (NacosException e) { throw new RuntimeException("register connection error" , e); } catch (InterruptedException e) { throw new RuntimeException("register connection error" , e); } } }
Server类主要为服务端使用 用来创建服务 监听信息
package com.rpc.client; import com.rpc.config.RegisterConfig; import com.rpc.netty.MyServerChannelInitializer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LoggingHandler; /** * @Description: 客户1 * @author: zhoum * @Date: 2019-11-14 * @Time: 10:41 */ public class Server { private static ServerBootstrap server; private static volatile boolean start = false; public static void init() throws InterruptedException { if ( start ) { return; } System.err.println("开始初始化 端口:"); //注册服务 RegisterConfig.init(true); if ( server == null ) { server = new ServerBootstrap(); start = true; } Channel channel = server.group(parentGroup() , workerGroup()) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) .childHandler(new MyServerChannelInitializer()) .option(ChannelOption.SO_BACKLOG , 100) .childOption(ChannelOption.SO_KEEPALIVE , true) .bind(RegisterConfig.getPort()) .sync() .channel(); channel.closeFuture().sync(); } private static EventLoopGroup parentGroup() { return new NioEventLoopGroup(1); } private static EventLoopGroup workerGroup() { return new NioEventLoopGroup(10); } }
该类中用到的MyServerChannelInitializer类会在后面说明
在netty下创建serilaze包,创建序列化接口以及实现类 ,对传输数据进行序列化,本项目目前使用json序列化
package com.rpc.netty.serilaze; /** * @Description: * @author: zhoum * @Date: 2019-11-28 * @Time: 11:21 */ public interface Serilazier { byte[] serialize(Object msg); <T> T deserialize(byte[] bytes , Class<T> clz); }
package com.rpc.netty.serilaze; import com.alibaba.fastjson.JSONObject; import java.nio.charset.Charset; /** * @Description: * @author: zhoum * @Date: 2019-11-28 * @Time: 11:22 */ public class JsonSerilizer implements Serilazier { @Override public byte[] serialize(Object msg) { return JSONObject.toJSONString(msg).getBytes(Charset.defaultCharset()); } @Override public <T> T deserialize(byte[] bytes , Class<T> clz) { return JSONObject.parseObject(new String(bytes , Charset.defaultCharset()) , clz); } }
在netty包下创建codec包,在codec包下创建编码解码器 用来对传输的信息进行编码解码
package com.rpc.netty.codec; import com.rpc.netty.serilaze.Serilazier; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * @Description: * @author: zhoum * @Date: 2019-11-28 * @Time: 11:45 */ public class RpcEncoder extends MessageToByteEncoder<Object> { private Serilazier serilazier; public RpcEncoder(Serilazier serilazier) { this.serilazier = serilazier; } @Override protected void encode(ChannelHandlerContext ctx , Object msg , ByteBuf out) { byte[] serialize = serilazier.serialize(msg); out.writeInt(serialize.length); out.writeBytes(serialize); } }
package com.rpc.netty.codec; import com.rpc.netty.serilaze.Serilazier; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * @Description: * @author: zhoum * @Date: 2019-11-28 * @Time: 11:47 */ public class RpcDecoder extends ByteToMessageDecoder { private Serilazier serilazier; private Class<?> c; public RpcDecoder(Serilazier serilazier , Class<?> c) { this.serilazier = serilazier; this.c = c; } @Override protected void decode(ChannelHandlerContext ctx , ByteBuf in , List<Object> out) throws Exception { if ( in.readableBytes() < 4 ) { return; } in.markReaderIndex(); int dataLength = in.readInt(); if ( dataLength < 0 ) { ctx.close(); } if ( in.readableBytes() < dataLength ) { in.resetReaderIndex(); return; } byte[] bytes = new byte[ dataLength ]; in.readBytes(bytes); Object obj = serilazier.deserialize(bytes , c); out.add(obj); } }
在netty包下创建RpcInvoker消息处理接口,用来处理消息,并在netty包下创建invoke包,创建RpcRequest请求消息处理器,RpcResponse响应消息处理器
package com.rpc.netty; import io.netty.channel.Channel; /** * @Description: * @author: zhoum * @Date: 2019-11-28 * @Time: 17:46 */ public interface RpcInvoker { void handle(Channel channel , Object object); }
package com.rpc.netty.invoke; import com.rpc.netty.RpcInvoker; import com.rpc.support.RpcRequest; import com.rpc.support.RpcResponse; import com.rpc.repository.ServiceRepository; import io.netty.channel.Channel; import java.lang.reflect.Method; /** * RpcRequest请求处理器 * * @author: zhoum * @Date: 2019-11-26 * @Time: 11:50 */ public class RpcRequestInvoker implements RpcInvoker { @Override public void handle(Channel channel , Object object) { RpcResponse res = new RpcResponse(); try { RpcRequest request; if ( !(object instanceof RpcRequest) ) { res.setError(new Exception("params must be instance of com.rpc.support.RpcRequest")); channel.writeAndFlush(res); return; } request = (RpcRequest) object; //将请求的id直接赋给响应实体 res.setId(request.getId()); //找到该服务类的处理类 Object service = ServiceRepository.getService(request.getServiceName()); if ( service == null ) { res.setError(new Exception("can‘t find service for " + request.getServiceName())); channel.writeAndFlush(res); return; } //执行方法 Method method = service.getClass().getMethod(request.getMethodName() , request.getParamsTypes()); Object invoke = method.invoke(service , request.getParams()); res.setResponse(invoke); channel.writeAndFlush(res); } catch (Exception e) { res.setError(e); channel.writeAndFlush(res); } } }
package com.rpc.netty.invoke; import com.rpc.netty.RpcInvoker; import com.rpc.support.RpcResponse; import com.rpc.support.RpcFuture; import io.netty.channel.Channel; /** * RpcResponse响应处理器 * @author: zhoum * @Date: 2019-11-26 * @Time: 11:50 */ public class RpcResponseInvoker implements RpcInvoker { @Override public void handle(Channel channel , Object object) { if ( object instanceof RpcResponse ) { RpcResponse resp = (RpcResponse) object; //处理响应数据 RpcFuture.receive(resp); } } }
在netty包下创建AbstractHandleAdapter类继承ChannelInboundHandlerAdapter来约束必须传入消息处理器,并且创建netty请求处理器与响应处理器。并交由构造函数中的消息处理器处理
package com.rpc.netty; import io.netty.channel.ChannelInboundHandlerAdapter; /** * @Description: * @author: zhoum * @Date: 2019-11-29 * @Time: 9:39 */ public abstract class AbstractHandleAdapter extends ChannelInboundHandlerAdapter { protected RpcInvoker rpcInvoker; public AbstractHandleAdapter(RpcInvoker rpcInvoker) { this.rpcInvoker = rpcInvoker; } }
package com.rpc.netty; import io.netty.channel.ChannelHandlerContext; /** * @Description: 请求处理适配器 * @author: zhoum * @Date: 2019-11-14 * @Time: 11:40 */ public class ChannelServerMessageHandler extends AbstractHandleAdapter { public ChannelServerMessageHandler(RpcInvoker rpcInvoker) { super(rpcInvoker); } /** * 有新连接 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String address = ctx.channel().remoteAddress().toString(); System.out.println("收到信息: " + address); super.channelActive(ctx); } /** * 连接断开 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.err.println("断开连接: " + ctx.channel().remoteAddress().toString()); super.channelInactive(ctx); } /** * 读取到的消息 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception { System.out.println("服务器收到信息" + msg); //处理器处理 rpcInvoker.handle(ctx.channel() , msg); } /** * 出现异常 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx , Throwable cause) throws Exception { System.err.println("出现异常: " + ctx.channel().remoteAddress().toString()); super.exceptionCaught(ctx , cause); } }
package com.rpc.netty; import com.rpc.repository.ClientRepository; import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; /** * netty响应处理适配器 * @Description: * @author: zhoum * @Date: 2019-11-14 * @Time: 11:40 */ public class ChannelClientMessageHandler extends AbstractHandleAdapter { public ChannelClientMessageHandler(RpcInvoker rpcInvoker) { super(rpcInvoker); } /** * 有新连接 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String address = ctx.channel().remoteAddress().toString(); System.out.println("有新连接:"+address); super.channelActive(ctx); } /** * 连接断开 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress(); //移除连接 ClientRepository.remove(ipSocket.getAddress().getHostAddress()+":"+ipSocket.getPort()); System.out.println("连接断开"); super.channelInactive(ctx); } /** * 读取到的消息 */ @Override public void channelRead(ChannelHandlerContext ctx , Object msg) { //处理器处理 rpcInvoker.handle(ctx.channel() , msg); } /** * 出现异常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx , Throwable cause) throws Exception { System.err.println("出现异常: " + ctx.channel().remoteAddress().toString()); InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress(); //移除连接 ClientRepository.remove(ipSocket.getAddress().getHostAddress()+":"+ipSocket.getPort()); super.exceptionCaught(ctx , cause); } }
最后在netty包下创建 MyClientChannelInitializer ,MyServerChannelInitializer (填之前的坑) ,这个在netty初始化客户端或者服务端的时候用上
package com.rpc.netty; import com.rpc.netty.serilaze.JsonSerilizer; import com.rpc.netty.codec.RpcDecoder; import com.rpc.netty.codec.RpcEncoder; import com.rpc.netty.invoke.RpcRequestInvoker; import com.rpc.support.RpcRequest; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; /** * @Description: 服务端处理器 * @author: zhoum * @Date: 2019-11-14 * @Time: 11:25 */ public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); //添加编解码器 pipeline.addLast(new RpcDecoder(new JsonSerilizer() , RpcRequest.class)); pipeline.addLast(new RpcEncoder(new JsonSerilizer())); //添加消息处理器 pipeline.addLast(new ChannelServerMessageHandler(new RpcRequestInvoker())); } }
package com.rpc.netty; import com.rpc.netty.serilaze.JsonSerilizer; import com.rpc.netty.codec.RpcDecoder; import com.rpc.netty.codec.RpcEncoder; import com.rpc.netty.invoke.RpcResponseInvoker; import com.rpc.support.RpcResponse; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; /** * @Description: 客户端处理器 * @author: zhoum * @Date: 2019-11-14 * @Time: 11:25 */ public class MyClientChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); //添加编解码器 pipeline.addLast(new RpcEncoder(new JsonSerilizer())); pipeline.addLast(new RpcDecoder(new JsonSerilizer() , RpcResponse.class)); //添加消息处理器 pipeline.addLast(new ChannelClientMessageHandler(new RpcResponseInvoker())); } }
package com.rpc.util; import com.alibaba.fastjson.JSONObject; import com.rpc.client.RpcClient; import com.rpc.support.RpcRequest; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; /** * @Description: * @author: zhoum * @Date: 2019-11-27 * @Time: 14:09 */ public class ProxyUtil implements InvocationHandler { public <T> T getProxy(Class<T> t) { T o = (T) Proxy.newProxyInstance(t.getClassLoader() , new Class<?>[]{t} , this); return o; } @Override public Object invoke(Object proxy , Method method , Object[] args) throws Throwable { //创建请求 RpcRequest request = new RpcRequest(); //每次请求构造新的id request.newId(); request.setServiceName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParams(args); request.setParamsTypes(method.getParameterTypes()); Object resp = RpcClient.transfer(request); Assert.on(resp == null || resp == null,"返回结果序列化错误"); if ( resp instanceof JSONObject ) { Class<?> returnType = method.getReturnType(); JSONObject jobj = (JSONObject) resp; Object o = jobj.toJavaObject(returnType); return o; } return resp; } }
至此 框架代码已经开发完毕,测试如下
在依旧在common项目下建立test包 然后创建测试接口和测试类
package com.rpc.test; /** * @Description: * @author: zhoum * @Date: 2019-11-27 * @Time: 14:38 */ public interface TestService { TestEntity getTest(String username , String password); TestEntity getTest1(String username , String password); }
package com.rpc.test; import java.io.Serializable; /** * @Description: * @author: zhoum * @Date: 2019-11-27 * @Time: 14:38 */ public class TestEntity implements Serializable { private String username; private String password; public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public String toString() { return "TestEntity{" + "username=‘" + username + ‘\‘‘ + ", password=‘" + password + ‘\‘‘ + ‘}‘; } }
然后打开rpc-server项目的pom.xml 引入common项目,内容如下即可
<?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rpc-parent</artifactId> <groupId>com.maglith</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>rpc-server</artifactId> <dependencies> <dependency> <artifactId>rpc-common</artifactId> <groupId>com.maglith</groupId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
在rpc-server项目中的resource下创建register.properties文件 内容如下
#必须设置注册中心地址 register.host=127.0.0.1:8848 #如果为服务端则必须设置 设置扫描包 多个包以","分割 scanpkgs=com.rpc #如果为服务端则必须设置 设置本地服务地址 server.host=127.0.0.1 #如果为服务端则必须设置 设置本地服务端口 server.port=8080
创建上面测试接口的实现类
package com.rpc.netty.invoke; import com.rpc.anno.RpcService; import com.rpc.test.TestEntity; import com.rpc.test.TestService; /** * @Description: * @author: zhoum * @Date: 2019-11-27 * @Time: 14:40 */ @RpcService(TestService.class) public class TestServiceImpl implements TestService { @Override public TestEntity getTest(String username , String password) { TestEntity testEntity = new TestEntity(); testEntity.setUsername(username); testEntity.setPassword(password); return testEntity; } @Override public TestEntity getTest1(String username , String password) { TestEntity testEntity = new TestEntity(); testEntity.setUsername(username); testEntity.setPassword(password); return testEntity; } }
创建测试主类
package com.rpc.util; import com.rpc.client.Server; /** * @Description: * @author: zhoum * @Date: 2019-11-27 * @Time: 14:44 */ public class RpcServerMain { public static void main(String[] args) throws InterruptedException { Server.init(); } }
点击运行后出现如下即代表服务端初始化成功
打开rpc-client项目 依旧引入cmmon项目 内容如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rpc-parent</artifactId> <groupId>com.maglith</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>rpc-client</artifactId> <dependencies> <dependency> <artifactId>rpc-common</artifactId> <groupId>com.maglith</groupId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
在rpc-client项目中的resource下创建register.properties文件 内容如下
#必须设置注册中心地址
register.host=127.0.0.1:8848
创建调用测试类
1 package com.rpc; 2 3 4 import com.rpc.util.ProxyUtil; 5 import com.rpc.test.TestEntity; 6 import com.rpc.test.TestService; 7 8 /** 9 * @Description: 10 * @author: zhoum 11 * @Date: 2019-11-27 12 * @Time: 14:41 13 */ 14 public class Consumer { 15 16 public static void main(String[] args) { 17 ProxyUtil proxyUtil = new ProxyUtil(); 18 TestService proxy = proxyUtil.getProxy(TestService.class); 19 int i = 1; 20 while ( i < 500 ) { 21 TestEntity test = proxy.getTest("你好" + i++ , "helloword" + i); 22 TestEntity test1 = proxy.getTest1("你好" + i++ , "helloword" + i); 23 System.out.println("收到信息:" + test); 24 System.out.println("收到信息:" + test1); 25 } 26 } 27 }
然后点击运行
这时候切换到server项目的控制台,会出现如下信息
主要思想即使用netty构造一个rpc调用框架,并使用nacos作为服务注册与发现中心,也可以使用zookeeper,看个人喜好,可以用来理解分布式微服务的思想。
项目的数据流转如下图
原文:https://www.cnblogs.com/hetutu-5238/p/11958615.html