
Simulating a Distributed Service-Invocation Framework in Java

Date: 2019-11-29 18:03:55

This article takes the RPC approach, similar to Dubbo. If you prefer an HTTP-based style, reading the Feign source code is a good starting point.

This article is long. If you want to go straight to the code, the project source is at: https://github.com/hetutu5238/rpc-demo.git

1. In IDEA, create the parent project rpc-parent and the child projects rpc-common, rpc-client (test client), and rpc-server (test server)

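  The project is built with Maven. The creation steps are omitted; the result is one parent project containing the three modules listed above.

   The pom.xml of rpc-parent is as follows: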

  

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.maglith</groupId>
    <artifactId>rpc-parent</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>rpc-server</module>
        <module>rpc-client</module>
        <module>rpc-common</module>
    </modules>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.43.Final</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.nacos</groupId>
            <artifactId>nacos-client</artifactId>
            <version>1.1.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. Download and start Nacos

  This project uses Nacos as the registry.

  The setup is described in step 2 of https://www.cnblogs.com/hetutu-5238/p/11089577.html

 

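Now write the core common project

  Create the com.rpc package in the common project. All of the following steps in the common project are based on this package.

1. Create the util package and the Assert class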

public class Assert {

    // Utility class, not meant to be instantiated
    private Assert() {}

    // Throws when obj is null
    public static void notNull(Object obj, String message) {
        if (obj == null) {
            throw new RuntimeException(message);
        }
    }

    // Throws when flag is true, so pass the failure condition
    public static void on(boolean flag, String message) {
        if (flag) {
            throw new RuntimeException(message);
        }
    }
}

 

 

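2. Create the anno package and the RpcService annotation. It marks a class as a service provider; the annotated class must implement the interface given as the annotation value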

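import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcService {

    // The service interface that the annotated class implements
    Class<?> value();
}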
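
To make the role of the annotation concrete, here is a minimal, self-contained sketch of how a registry key can be derived from it by reflection. The annotation is redeclared so the sketch compiles alone, and GreetingService, GreetingServiceImpl and serviceNameOf are hypothetical names used only for illustration; the framework's own lookup lives in RegisterConfig.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class RpcServiceAnnotationDemo {

    // Same shape as the RpcService annotation, redeclared so this sketch stands alone
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RpcService {
        Class<?> value();
    }

    // Hypothetical service contract, for illustration only
    public interface GreetingService {
        String greet(String name);
    }

    // Hypothetical implementation marked as a provider of GreetingService
    @RpcService(GreetingService.class)
    public static class GreetingServiceImpl implements GreetingService {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    /** Returns the registry key for an annotated class: the interface's fully qualified name. */
    public static String serviceNameOf(Class<?> impl) {
        RpcService anno = impl.getAnnotation(RpcService.class);
        if (anno == null) {
            throw new IllegalArgumentException(impl.getName() + " is not annotated with @RpcService");
        }
        // Enforce the rule that the annotated class implements the declared interface
        if (!anno.value().isAssignableFrom(impl)) {
            throw new IllegalArgumentException(impl.getName() + " does not implement " + anno.value().getName());
        }
        return anno.value().getName();
    }

    public static void main(String[] args) {
        System.out.println(serviceNameOf(GreetingServiceImpl.class));
    }
}
```

The isAssignableFrom check is an addition of this sketch: it turns the "must implement the interface" rule into a startup error instead of a failure at call time.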

 

3. Create the repository package

  ClientRepository stores the connections used to call services

import io.netty.channel.Channel;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ClientRepository {

    // Connection cache keyed by "host:port"
    private static final Map<String, Channel> repo = new ConcurrentHashMap<>();

    public static void put(String key , Channel channel) {
        if ( key == null || channel == null ) {
            System.err.println("channel or its name can't be null");
            return;
        }
        repo.put(key , channel);
    }

    public static void remove(String key) {
        repo.remove(key);
    }

    public static Channel getChannel(String key) {
        Channel channel = repo.get(key);
        // Evict connections that are no longer active
        if ( channel != null && !channel.isActive() ) {
            repo.remove(key , channel);
            return null;
        }
        return channel;
    }
}

    ServiceRepository stores the services this process can provide

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ServiceRepository {

    // Service instances keyed by the fully qualified interface name
    private static final Map<String, Object> repo = new HashMap<>();

    public static void put(String serviceName , Object service) {
        repo.put(serviceName , service);
    }

    public static void remove(String key) {
        repo.remove(key);
    }

    public static Object getService(String serviceName) {
        return repo.get(serviceName);
    }

    public static Set<String> getAllServiceName() {
        return repo.keySet();
    }
}

  

  ServiceConnectionFactory creates new connections for calling services

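import com.rpc.netty.MyClientChannelInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class ServiceConnectionFactory {

    // One event loop group shared by all client connections;
    // creating a new group for every connection would leak threads
    private static final EventLoopGroup WORKER_GROUP = new NioEventLoopGroup(10);

    public static Channel createConnection(String host , int port) throws InterruptedException {
        String key = host + ":" + port;
        System.out.println("Creating new connection: " + key);
        if ( ClientRepository.getChannel(key) != null ) {
            ClientRepository.remove(key);
        }
        Bootstrap bootstrap = new Bootstrap();
        Channel c = bootstrap.group(WORKER_GROUP)
                .channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host , port))
                .handler(new MyClientChannelInitializer())
                .connect()
                .sync()
                .channel();
        ClientRepository.put(key , c);
        return c;
    }
}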

  MyClientChannelInitializer, which this class uses, is written later

4. Create the config package and the RegisterConfig class

package com.rpc.config;

import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;

import com.rpc.repository.ServiceRepository;
import com.rpc.util.Assert;
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.InputStream;
import java.net.URL;
import java.util.*;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-28
 * @Time: 16:24
 */
public class RegisterConfig {
    // Loaded configuration file
    private static Properties register;
    // Nacos naming service
    private static NamingService namingService;

    private static volatile boolean init = false;

    private static volatile boolean serverInit = false;

    public static boolean getInit() {
        return init;
    }

    public static boolean getServerInit() {
        return serverInit;
    }

    /**
     * Pass true to initialize as a server, false to initialize as a client
     * @param serverFlag whether this process starts as a server
     */
    public static synchronized void init(boolean serverFlag) {
        if ( (serverFlag && getServerInit()) ||  (!serverFlag && getInit()) ) {
            return;
        }
        System.out.println("Initialization started: " + new Date());
        Properties properties = new Properties();
        try (InputStream resourceAsStream = RegisterConfig.class.getClassLoader().getResourceAsStream("register.properties")) {
            Assert.notNull(resourceAsStream,"register.properties is required");
            properties.load(resourceAsStream);
            register = properties;
            // Obtain the Nacos naming service
            String registerHost = register.getProperty("register.host");
            Assert.notNull(registerHost,"register.host is required");
            namingService = NamingFactory.createNamingService(registerHost);
            if ( serverFlag ){
                regis();
                serverInit = true;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        System.out.println("Initialization finished: " + new Date());
        init = true;
    }

    public static Integer getPort() {
        Assert.notNull(register,"RegisterConfig must be initialized");
        return Integer.valueOf(Objects.toString(register.get("server.port")));
    }

    public static NamingService getNamingService() {
        Assert.notNull(namingService,"RegisterConfig must be initialized");
        return namingService;
    }

    /**
     * Register the services with the registry
     */
    private static void regis() {
        try {
            // getProperty returns null for a missing key; String.valueOf(null) would yield the string "null"
            String scanpkgs = register.getProperty("scanpkgs");
            Assert.on(StringUtils.isBlank(scanpkgs),"scan service package can't be null");
            initService(scanpkgs.split(","));
            String host = register.getProperty("server.host");
            int port = getPort();
            for (String serviceName : ServiceRepository.getAllServiceName()) {
                // Register each service under its interface name
                namingService.registerInstance(serviceName , host , port);
                System.out.println(String.format("Registered service: %s, address: %s:%s" , serviceName , host , port));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Instantiate the services
     * @param pkgs packages to scan
     */
    private static void initService(String[] pkgs) {
        List<Class<?>> classes = new ArrayList<>();
        // Collect every qualifying service class under the given packages
        try {
            for (String pkg : pkgs) {
                classes.addAll(getRpcClass(pkg.trim()));
            }
            for (Class<?> c : classes) {
                Object o = c.newInstance();
                // Store the instance in the service repository, keyed by the interface name
                ServiceRepository.put(c.getAnnotation(com.rpc.anno.RpcService.class).value().getName() , o);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }

    private static List<Class<?>> getRpcClass(String pkg) throws ClassNotFoundException {
        List<Class<?>> result = new ArrayList<>();
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Assert.notNull(classLoader,"Can't get ClassLoader,Thread: " + Thread.currentThread().getName());
        // Resolve the package to a directory on the classpath
        String packageName = pkg.replace('.' , '/');
        URL resource = classLoader.getResource(packageName);
        Assert.notNull(resource,"Can't get package,package doesn't exist: " + packageName);
        String fileUrl = resource.getFile();
        File f = new File(fileUrl);
        if ( f.exists() ) {
            File[] files = f.listFiles();
            if ( files != null ) {
                for (File file : files) {
                    String fileName;
                    // A class file: load it and keep it if it carries @RpcService
                    if ( !file.isDirectory() && (fileName = file.getName()).endsWith(".class") ) {
                        Class<?> c = Class.forName(pkg + "." + fileName.substring(0 , fileName.length() - 6));
                        if ( c.getAnnotation(com.rpc.anno.RpcService.class) != null ) {
                            result.add(c);
                        }
                    } else if ( file.isDirectory() ) {
                        // A sub-package: scan it recursively
                        result.addAll(getRpcClass(pkg + "." + file.getName()));
                    }
                }
            }
        } else {
            throw new RuntimeException("Can't get package,package doesn't exist: " + pkg);
        }
        return result;

    }


}

 

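This class initializes either the client or the server; the argument passed to init() selects which one.
register.properties is the configuration file. Add it to the client or server project later, not to the common project. Its content is as follows
# Required: address of the registry
register.host=127.0.0.1:8848
# Required on the server side: packages to scan, separated by ","
scanpkgs=com.rpc
# Required on the server side: local service address
server.host=127.0.0.1
# Required on the server side: local service port
server.port=8080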
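
As an illustration of how these keys can be read defensively, the sketch below parses the same properties from a string with java.util.Properties. The class and method names (RegisterPropertiesDemo, parse, serverPort, scanPackages) are hypothetical and not part of the framework. It relies on the fact that Properties.getProperty returns null for a missing key, whereas String.valueOf applied to a null Object yields the four-character string "null", which a blank check would not catch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class RegisterPropertiesDemo {

    /** Parses properties text; the framework reads register.properties from the classpath instead. */
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    /** Reads server.port and fails with a clear message when it is missing. */
    public static int serverPort(Properties props) {
        String port = props.getProperty("server.port");
        if (port == null) {
            throw new IllegalStateException("server.port is required on the server side");
        }
        return Integer.parseInt(port.trim());
    }

    /** Splits scanpkgs on "," and drops blanks, so "a, b" and "a,b" behave the same. */
    public static List<String> scanPackages(Properties props) {
        String raw = props.getProperty("scanpkgs", "");
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Properties p = parse("register.host=127.0.0.1:8848\nscanpkgs=com.rpc, com.demo\n"
                + "server.host=127.0.0.1\nserver.port=8080\n");
        System.out.println(serverPort(p) + " " + scanPackages(p));
    }
}
```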

 5. Create the support package. The classes in it are the transport objects

  RpcRequest, the service request class

package com.rpc.support;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-27
 * @Time: 10:52
 */
public class RpcRequest implements Serializable {
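    // Request id generator
    private static final AtomicLong REQUEST_ID = new AtomicLong(0);
    // Request id
    private long id;
    // Requested service name; in this project, the fully qualified name of the interface
    private String serviceName;
    // Requested method name
    private String methodName;
    // Parameter types of the request
    private Class<?>[] paramsTypes;
    // Arguments of the request
    private Object[] params;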

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public void newId() {
        this.id = REQUEST_ID.getAndIncrement();
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParamsTypes() {
        return paramsTypes;
    }

    public void setParamsTypes(Class<?>[] paramsTypes) {
        this.paramsTypes = paramsTypes;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }

    @Override
    public String toString() {
        return "RpcRequest{" +
                "id=" + id +
                ", serviceName='" + serviceName + '\'' +
                ", methodName='" + methodName + '\'' +
                '}';
    }
}

  RpcResponse, the response to a request

package com.rpc.support;

import java.io.Serializable;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-27
 * @Time: 10:52
 */
public class RpcResponse implements Serializable {

    // Error information
    private Throwable error;
    // Returned data
    private Object response;
    // Id of the corresponding request
    private long id;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public Throwable getError() {
        return error;
    }

    public void setError(Throwable error) {
        this.error = error;
    }

    public Object getResponse() {
        return response;
    }

    public void setResponse(Object response) {
        this.response = response;
    }


}

  RpcFuture stores a sent request and obtains its result

package com.rpc.support;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-28
 * @Time: 10:18
 */
public class RpcFuture {
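    // All requests that have been sent but not yet answered
    private static final Map<Long, RpcFuture> REPO = new ConcurrentHashMap<>();
    // Per-request lock
    private final Object lock = new Object();
    // Timeout in seconds
    private final int timeOut = 10;
    // Request id
    private long id;
    // Request content
    private RpcRequest rpcRequest;
    // Response that belongs to this request
    private volatile RpcResponse rpcResponse;

    public RpcFuture(long id , RpcRequest rpcRequest) {
        this.id = id;
        this.rpcRequest = rpcRequest;
    }

    public static void receive(RpcResponse resp) {
        RpcFuture future = REPO.remove(resp.getId());
        // The id may be unknown, or the request may already have timed out
        if ( future != null ) {
            future.doRecieve(resp);
        }
    }

    public static void putFuture(Long id , RpcFuture rpcFuture) {
        REPO.put(id , rpcFuture);
    }

    private boolean done() {
        return this.rpcResponse != null;
    }

    private void doRecieve(RpcResponse resp) {
        synchronized (lock) {
            this.rpcResponse = resp;
            // Wake up the caller waiting in getResponse()
            lock.notifyAll();
        }
    }

    public Object getResponse() {
        long deadline = System.currentTimeMillis() + timeOut * 1000L;
        // A BlockingQueue could replace the lock, but it also blocks the thread.
        // On Java 8, with low contention, synchronized is recommended over an explicit lock
        synchronized (lock) {
            try {
                while ( !done() ) {
                    long remaining = deadline - System.currentTimeMillis();
                    if ( remaining <= 0 ) {
                        break;
                    }
                    // Wait only for the time left, so spurious wake-ups cannot extend the timeout
                    lock.wait(remaining);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            if ( !done() ) {
                // Drop the pending entry so timed-out requests do not accumulate
                REPO.remove(id);
                throw new RuntimeException("Request timed out, id: " + id + ", request: " + rpcRequest);
            }
        }
        if ( rpcResponse.getError() != null ) {
            throw new RuntimeException("Remote call failed" , rpcResponse.getError());
        }
        return rpcResponse.getResponse();
    }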
}
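
For comparison, the same request/response correlation can be sketched with java.util.concurrent.CompletableFuture instead of a hand-written wait/notify loop. This is an alternative under stated assumptions, not the article's implementation: PendingCalls and its methods are hypothetical names, and the response is modeled as a plain Object rather than an RpcResponse.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PendingCalls {

    // Requests that were sent but not yet answered, keyed by request id
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Registers a request id; call this before writing the request to the channel. */
    public CompletableFuture<Object> register(long id) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);
        return future;
    }

    /** Called when a response arrives; returns false for unknown or expired ids. */
    public boolean complete(long id, Object result) {
        CompletableFuture<Object> future = pending.remove(id);
        return future != null && future.complete(result);
    }

    /** Blocks the caller until the response arrives or the timeout elapses. */
    public Object await(long id, CompletableFuture<Object> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("request " + id + " timed out", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for request " + id, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request " + id + " failed", e.getCause());
        } finally {
            // Never leave a timed-out or failed entry behind
            pending.remove(id);
        }
    }

    public int size() {
        return pending.size();
    }
}
```

Two properties matter in either design: the pending entry is registered before the request is written, so a fast response cannot arrive first, and it is removed on every exit path, so timed-out requests do not accumulate.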

 

6. Create the client package. Its classes are called by the client or the server

  RpcClient is used in the client role to send the request

package com.rpc.client;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.rpc.config.RegisterConfig;
import com.rpc.repository.ServiceConnectionFactory;
import com.rpc.support.RpcRequest;
import com.rpc.repository.ClientRepository;
import com.rpc.support.RpcFuture;
import com.rpc.util.Assert;
import io.netty.channel.Channel;


/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-27
 * @Time: 11:41
 */
public class RpcClient {


    public static Object transfer(RpcRequest rpcRequest) {
        try {
            RegisterConfig.init(false);
            // Ask the registry for one healthy instance; the registry already does the load balancing
            Instance ins = RegisterConfig.getNamingService().selectOneHealthyInstance(rpcRequest.getServiceName());
            Assert.notNull(ins,"service not found: "+rpcRequest.getServiceName());
            String key = ins.getIp() + ":" + ins.getPort();
            Channel c = ClientRepository.getChannel(key);
            if ( c == null ) {
                // No cached connection for this instance, so create one
                c = ServiceConnectionFactory.createConnection(ins.getIp() , ins.getPort());
            }
            long id = rpcRequest.getId();
            RpcFuture rpcFuture = new RpcFuture(id , rpcRequest);
            // Register the future before writing, so a fast response cannot arrive first
            RpcFuture.putFuture(id , rpcFuture);
            c.writeAndFlush(rpcRequest);
            return rpcFuture.getResponse();
        } catch (NacosException e) {
            throw new RuntimeException("register connection error" , e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("connection interrupted" , e);
        }

    }
}

  The Server class is used on the server side to start the service and listen for requests

package com.rpc.client;

import com.rpc.config.RegisterConfig;
import com.rpc.netty.MyServerChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;

/**
 * @Description: RPC server
 * @author: zhoum
 * @Date: 2019-11-14
 * @Time: 10:41
 */
public class Server {

    private static ServerBootstrap server;

    private static volatile boolean start = false;

    public static void init() throws InterruptedException {
        if ( start ) {
            return;
        }
        start = true;
        // Scan and register the services
        RegisterConfig.init(true);
        System.err.println("Starting server on port: " + RegisterConfig.getPort());
        EventLoopGroup parentGroup = parentGroup();
        EventLoopGroup workerGroup = workerGroup();
        try {
            server = new ServerBootstrap();
            Channel channel = server.group(parentGroup , workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler())
                    .childHandler(new MyServerChannelInitializer())
                    .option(ChannelOption.SO_BACKLOG , 100)
                    .childOption(ChannelOption.SO_KEEPALIVE , true)
                    .bind(RegisterConfig.getPort())
                    .sync()
                    .channel();

            // Block until the server channel is closed
            channel.closeFuture().sync();
        } finally {
            // Release the event loop threads once the server stops
            parentGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private static EventLoopGroup parentGroup() {
        return new NioEventLoopGroup(1);
    }

    private static EventLoopGroup workerGroup() {
        return new NioEventLoopGroup(10);
    }

}

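  MyServerChannelInitializer, which this class uses, is explained later

7. Create the netty package. It holds the serialization and deserialization of request and response data, the encoder and decoder, and the handlers that process requests and return results

 Under netty, create the serilaze package with the serializer interface and its implementation, which serialize the transported data. The project currently uses JSON serialization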


package com.rpc.netty.serilaze;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-28
 * @Time: 11:21
 */
public interface Serilazier {
    byte[] serialize(Object msg);

    <T> T deserialize(byte[] bytes , Class<T> clz);
}

package com.rpc.netty.serilaze;

import com.alibaba.fastjson.JSONObject;

import java.nio.charset.StandardCharsets;

/**
 * @Description: JSON serializer based on fastjson
 * @author: zhoum
 * @Date: 2019-11-28
 * @Time: 11:22
 */
public class JsonSerilizer implements Serilazier {

    // A fixed charset keeps client and server compatible regardless of platform defaults

    @Override
    public byte[] serialize(Object msg) {
        return JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public <T> T deserialize(byte[] bytes , Class<T> clz) {
        return JSONObject.parseObject(new String(bytes , StandardCharsets.UTF_8) , clz);
    }
}

   Under the netty package, create the codec package, and in it the encoder and decoder for the transported messages

package com.rpc.netty.codec;

import com.rpc.netty.serilaze.Serilazier;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-28
 * @Time: 11:45
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {

    private Serilazier serilazier;

    public RpcEncoder(Serilazier serilazier) {
        this.serilazier = serilazier;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx , Object msg , ByteBuf out) {
        byte[] serialize = serilazier.serialize(msg);
        out.writeInt(serialize.length);
        out.writeBytes(serialize);
    }
}
package com.rpc.netty.codec;

import com.rpc.netty.serilaze.Serilazier;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-28
 * @Time: 11:47
 */
public class RpcDecoder extends ByteToMessageDecoder {

    private Serilazier serilazier;

    private Class<?> c;

    public RpcDecoder(Serilazier serilazier , Class<?> c) {
        this.serilazier = serilazier;
        this.c = c;
    }


    @Override
    protected void decode(ChannelHandlerContext ctx , ByteBuf in , List<Object> out) throws Exception {
        if ( in.readableBytes() < 4 ) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if ( dataLength < 0 ) {
            // Invalid frame length: close the connection and stop decoding
            ctx.close();
            return;
        }
        if ( in.readableBytes() < dataLength ) {
            in.resetReaderIndex();
            return;
        }
        byte[] bytes = new byte[ dataLength ];
        in.readBytes(bytes);
        Object obj = serilazier.deserialize(bytes , c);
        out.add(obj);
    }
}
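
The encoder and decoder above implement length-prefixed framing: a 4-byte length followed by the payload, with the decoder waiting until a whole frame has arrived. The sketch below shows the same idea with java.nio.ByteBuffer only, so it runs without Netty. LengthPrefixFraming, frame and tryDecode are hypothetical names chosen for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LengthPrefixFraming {

    /** Prepends a 4-byte big-endian length to the payload, as RpcEncoder does with writeInt. */
    public static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Tries to read one frame from a buffer that is ready for reading.
     * Returns null and restores the position when the frame is still incomplete.
     */
    public static byte[] tryDecode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null; // the length header has not fully arrived
        }
        in.mark();
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalArgumentException("negative frame length: " + length);
        }
        if (in.remaining() < length) {
            in.reset(); // rewind so the header is read again on the next attempt
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] framed = frame("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        // Simulate a partial TCP read: the last two bytes are missing
        ByteBuffer partial = ByteBuffer.wrap(framed, 0, framed.length - 2);
        System.out.println(tryDecode(partial) == null);
        System.out.println(new String(tryDecode(ByteBuffer.wrap(framed)), StandardCharsets.UTF_8));
    }
}
```

The mark and reset pair plays the role of markReaderIndex and resetReaderIndex in RpcDecoder: an incomplete frame must leave the buffer untouched so that the next read can retry from the header.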

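  Under the netty package, create the RpcInvoker message-handling interface. Then create the invoke package under netty, with a handler for RpcRequest messages and a handler for RpcResponse messages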

package com.rpc.netty;

import io.netty.channel.Channel;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-28
 * @Time: 17:46
 */
public interface RpcInvoker {

    void handle(Channel channel , Object object);
}
package com.rpc.netty.invoke;

import com.rpc.netty.RpcInvoker;
import com.rpc.support.RpcRequest;
import com.rpc.support.RpcResponse;


import com.rpc.repository.ServiceRepository;
import io.netty.channel.Channel;

import java.lang.reflect.Method;

/**
 * Handler for RpcRequest messages
 *
 * @author: zhoum
 * @Date: 2019-11-26
 * @Time: 11:50
 */
public class RpcRequestInvoker implements RpcInvoker {

    @Override
    public void handle(Channel channel , Object object) {
        RpcResponse res = new RpcResponse();
        try {
            RpcRequest request;
            if ( !(object instanceof RpcRequest) ) {
                res.setError(new Exception("params must be instance of com.rpc.support.RpcRequest"));
                channel.writeAndFlush(res);
                return;
            }
            request = (RpcRequest) object;
            // Copy the request id to the response so the client can match them
            res.setId(request.getId());
            // Look up the implementation registered for this service
            Object service = ServiceRepository.getService(request.getServiceName());
            if ( service == null ) {
                res.setError(new Exception("can't find service for " + request.getServiceName()));
                channel.writeAndFlush(res);
                return;
            }
            // Invoke the method by reflection
            Method method = service.getClass().getMethod(request.getMethodName() , request.getParamsTypes());
            Object invoke = method.invoke(service , request.getParams());
            res.setResponse(invoke);
            channel.writeAndFlush(res);
        } catch (Exception e) {
            res.setError(e);
            channel.writeAndFlush(res);
        }
    }

}
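
The core of the request handler is a reflective dispatch: find the instance by service name, resolve the method by name and parameter types, then invoke it. A minimal stand-alone sketch of that step follows. EchoService, EchoServiceImpl and dispatch are hypothetical names for illustration, and unwrapping InvocationTargetException is an addition of this sketch so that the caller sees the service's own exception rather than the reflection wrapper.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ReflectiveDispatchDemo {

    // Hypothetical service contract, for illustration only
    public interface EchoService {
        String echo(String text);
    }

    public static class EchoServiceImpl implements EchoService {
        @Override
        public String echo(String text) {
            if (text == null) {
                throw new IllegalArgumentException("text is required");
            }
            return "echo:" + text;
        }
    }

    // Stand-in for ServiceRepository: instances keyed by interface name
    private static final Map<String, Object> SERVICES = new ConcurrentHashMap<>();

    static {
        SERVICES.put(EchoService.class.getName(), new EchoServiceImpl());
    }

    /** Looks up the service, resolves the method and invokes it with the given arguments. */
    public static Object dispatch(String serviceName, String methodName,
                                  Class<?>[] paramTypes, Object[] params) {
        Object service = SERVICES.get(serviceName);
        if (service == null) {
            throw new IllegalArgumentException("no service registered for " + serviceName);
        }
        try {
            Method method = service.getClass().getMethod(methodName, paramTypes);
            return method.invoke(service, params);
        } catch (InvocationTargetException e) {
            // The service method itself threw: surface its exception, not the wrapper
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(EchoService.class.getName(), "echo",
                new Class<?>[]{String.class}, new Object[]{"hi"}));
    }
}
```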
package com.rpc.netty.invoke;

import com.rpc.netty.RpcInvoker;
import com.rpc.support.RpcResponse;
import com.rpc.support.RpcFuture;
import io.netty.channel.Channel;

/**
 * Handler for RpcResponse messages
 * @author: zhoum
 * @Date: 2019-11-26
 * @Time: 11:50
 */
public class RpcResponseInvoker implements RpcInvoker {

    @Override
    public void handle(Channel channel , Object object) {
        if ( object instanceof RpcResponse ) {
            RpcResponse resp = (RpcResponse) object;
            // Hand the response to the request that is waiting for it
            RpcFuture.receive(resp);
        }
    }

}

  

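  Under the netty package, create AbstractHandleAdapter, which extends ChannelInboundHandlerAdapter so that a message handler must be passed in. Then create the Netty request handler and response handler, which delegate to the message handler given in the constructor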

package com.rpc.netty;

import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-29
 * @Time: 9:39
 */
public abstract class AbstractHandleAdapter extends ChannelInboundHandlerAdapter {

    protected RpcInvoker rpcInvoker;

    public AbstractHandleAdapter(RpcInvoker rpcInvoker) {
        this.rpcInvoker = rpcInvoker;
    }
}

 

 

package com.rpc.netty;

import io.netty.channel.ChannelHandlerContext;

/**
 * @Description: Netty handler for incoming requests
 * @author: zhoum
 * @Date: 2019-11-14
 * @Time: 11:40
 */
public class ChannelServerMessageHandler extends AbstractHandleAdapter {


    public ChannelServerMessageHandler(RpcInvoker rpcInvoker) {
        super(rpcInvoker);
    }

    /**
     * A new connection was established
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String address = ctx.channel().remoteAddress().toString();
        System.out.println("New connection: " + address);
        super.channelActive(ctx);
    }

    /**
     * The connection was closed
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("Connection closed: " + ctx.channel().remoteAddress().toString());
        super.channelInactive(ctx);
    }

    /**
     * A message was read
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception {
        System.out.println("Server received message: " + msg);
        // Delegate to the message handler
        rpcInvoker.handle(ctx.channel() , msg);
    }

    /**
     * An exception occurred
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx , Throwable cause) throws Exception {
        System.err.println("Exception on connection: " + ctx.channel().remoteAddress().toString());
        super.exceptionCaught(ctx , cause);
    }
}

 

 

package com.rpc.netty;

import com.rpc.repository.ClientRepository;
import io.netty.channel.ChannelHandlerContext;

import java.net.InetSocketAddress;


/**
 * @Description: Netty handler for incoming responses
 * @author: zhoum
 * @Date: 2019-11-14
 * @Time: 11:40
 */
public class ChannelClientMessageHandler extends AbstractHandleAdapter {

    public ChannelClientMessageHandler(RpcInvoker rpcInvoker) {
        super(rpcInvoker);
    }
    /**
     * A new connection was established
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String address = ctx.channel().remoteAddress().toString();
        System.out.println("New connection: "+address);
        super.channelActive(ctx);
    }

    /**
     * The connection was closed
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
        // Remove the cached connection
        ClientRepository.remove(ipSocket.getAddress().getHostAddress()+":"+ipSocket.getPort());
        System.out.println("Connection closed");
        super.channelInactive(ctx);
    }

    /**
     * A message was read
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx , Object msg)  {
        // Delegate to the message handler
        rpcInvoker.handle(ctx.channel() , msg);
    }

    /**
     * An exception occurred
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx , Throwable cause) throws Exception {
        System.err.println("Exception on connection: " + ctx.channel().remoteAddress().toString());
        InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
        // Remove the cached connection
        ClientRepository.remove(ipSocket.getAddress().getHostAddress()+":"+ipSocket.getPort());
        super.exceptionCaught(ctx , cause);
    }
}

 

 

Finally, under the netty package, create MyClientChannelInitializer and MyServerChannelInitializer (the classes referenced earlier). Netty uses them when initializing the client or the server

package com.rpc.netty;

import com.rpc.netty.serilaze.JsonSerilizer;
import com.rpc.netty.codec.RpcDecoder;
import com.rpc.netty.codec.RpcEncoder;
import com.rpc.netty.invoke.RpcRequestInvoker;
import com.rpc.support.RpcRequest;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;


/**
 * @Description: Server-side channel initializer
 * @author: zhoum
 * @Date: 2019-11-14
 * @Time: 11:25
 */
public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> {


    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // Add the codec: decode incoming RpcRequest, encode outgoing RpcResponse
        pipeline.addLast(new RpcDecoder(new JsonSerilizer() , RpcRequest.class));
        pipeline.addLast(new RpcEncoder(new JsonSerilizer()));
        // Add the message handler
        pipeline.addLast(new ChannelServerMessageHandler(new RpcRequestInvoker()));
    }


}
package com.rpc.netty;

import com.rpc.netty.serilaze.JsonSerilizer;
import com.rpc.netty.codec.RpcDecoder;
import com.rpc.netty.codec.RpcEncoder;
import com.rpc.netty.invoke.RpcResponseInvoker;
import com.rpc.support.RpcResponse;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

/**
 * @Description: Client-side channel initializer
 * @author: zhoum
 * @Date: 2019-11-14
 * @Time: 11:25
 */
public class MyClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // Add the codec: encode outgoing RpcRequest, decode incoming RpcResponse
        pipeline.addLast(new RpcEncoder(new JsonSerilizer()));
        pipeline.addLast(new RpcDecoder(new JsonSerilizer() , RpcResponse.class));
        // Add the message handler
        pipeline.addLast(new ChannelClientMessageHandler(new RpcResponseInvoker()));

    }


}

 

8. Create ProxyUtil in the util package to build the proxy objects

package com.rpc.util;

import com.alibaba.fastjson.JSONObject;
import com.rpc.client.RpcClient;
import com.rpc.support.RpcRequest;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-27
 * @Time: 14:09
 */
public class ProxyUtil implements InvocationHandler {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> t) {
        return (T) Proxy.newProxyInstance(t.getClassLoader() , new Class<?>[]{t} , this);
    }


    @Override
    public Object invoke(Object proxy , Method method , Object[] args) throws Throwable {
        // Build the request
        RpcRequest request = new RpcRequest();
        // Every request gets a fresh id
        request.newId();
        request.setServiceName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParams(args);
        request.setParamsTypes(method.getParameterTypes());
        Object resp = RpcClient.transfer(request);
        Assert.on(resp == null,"failed to deserialize the response");
        if ( resp instanceof JSONObject ) {
            // fastjson reads an Object field as a JSONObject, so convert it to the declared return type
            return ((JSONObject) resp).toJavaObject(method.getReturnType());
        }

        return resp;
    }
}
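
To show the proxy mechanism in isolation, the sketch below builds a JDK dynamic proxy whose handler answers locally instead of sending an RpcRequest. Calculator, proxyFor, recordingCalculator and CALLS are hypothetical names used only here. The handler records the same two pieces of information that ProxyUtil puts into the request, namely the declaring interface name and the method name.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class ProxyDemo {

    // Hypothetical contract; in the article this role is played by the service interface
    public interface Calculator {
        int add(int a, int b);
    }

    // Records "interfaceName#methodName" for every intercepted call
    public static final List<String> CALLS = new ArrayList<>();

    /** Creates a proxy that routes every call on the contract to the given handler. */
    @SuppressWarnings("unchecked")
    public static <T> T proxyFor(Class<T> contract, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(contract.getClassLoader(), new Class<?>[]{contract}, handler);
    }

    /** A Calculator whose calls are intercepted; a real handler would send them over the network. */
    public static Calculator recordingCalculator() {
        return proxyFor(Calculator.class, (proxy, method, args) -> {
            CALLS.add(method.getDeclaringClass().getName() + "#" + method.getName());
            // Primitive int arguments arrive boxed as Integer
            return Integer.valueOf((Integer) args[0] + (Integer) args[1]);
        });
    }

    public static void main(String[] args) {
        Calculator calculator = recordingCalculator();
        System.out.println(calculator.add(2, 3));
        System.out.println(CALLS);
    }
}
```

Because the caller only sees the interface, swapping the local handler for one that performs a remote call requires no change on the calling side, which is the property the framework relies on.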

 

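The framework code is now complete. Testing follows

Testing

Still in the common project, create the test package, then the test interface and the test entity class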

package com.rpc.test;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-27
 * @Time: 14:38
 */
public interface TestService {

    TestEntity getTest(String username , String password);

    TestEntity getTest1(String username , String password);
}
package com.rpc.test;

import java.io.Serializable;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-27
 * @Time: 14:38
 */
public class TestEntity implements Serializable {

    private String username;

    private String password;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "TestEntity{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}

  

  Server side

Next, open the pom.xml of the rpc-server project and add the common project as a dependency. The file should look like this
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rpc-parent</artifactId>
        <groupId>com.maglith</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-server</artifactId>
    <dependencies>
        <dependency>
            <artifactId>rpc-common</artifactId>
            <groupId>com.maglith</groupId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

    </dependencies>

</project>

  Create a register.properties file under resource in the rpc-server project with the following content

  

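# Required: address of the registry
register.host=127.0.0.1:8848
# Required on the server side: packages to scan; separate multiple packages with ","
scanpkgs=com.rpc
# Required on the server side: local service address
server.host=127.0.0.1
# Required on the server side: local service port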
server.port=8080
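
As an illustration of how such a file can be consumed, the sketch below parses the same keys with java.util.Properties and splits scanpkgs on ",". The class RegisterConfigSketch is hypothetical, and the framework's own loading code may differ. The text is parsed from a string here so that the example runs without a file on the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the original project.
public class RegisterConfigSketch {

    // Parses properties text; in a real project this would read the classpath resource.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Splits the comma-separated scanpkgs value into trimmed package names.
    public static List<String> scanPackages(Properties props) {
        String raw = props.getProperty("scanpkgs", "");
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Properties props = parse(String.join("\n",
                "register.host=127.0.0.1:8848",
                "scanpkgs=com.rpc, com.example",
                "server.host=127.0.0.1",
                "server.port=8080"));
        System.out.println(scanPackages(props));
        System.out.println(Integer.parseInt(props.getProperty("server.port")));
    }
}
```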

  Create the implementation class of the test interface above

package com.rpc.netty.invoke;

import com.rpc.anno.RpcService;
import com.rpc.test.TestEntity;
import com.rpc.test.TestService;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-27
 * @Time: 14:40
 */
@RpcService(TestService.class)
public class TestServiceImpl implements TestService {
    @Override
    public TestEntity getTest(String username , String password) {
        TestEntity testEntity = new TestEntity();
        testEntity.setUsername(username);
        testEntity.setPassword(password);
        return testEntity;
    }

    @Override
    public TestEntity getTest1(String username , String password) {
        TestEntity testEntity = new TestEntity();
        testEntity.setUsername(username);
        testEntity.setPassword(password);
        return testEntity;
    }
}
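
The annotation on the implementation tells the server which interface the class serves. The following sketch shows how a request that carries a service name, a method name, parameter types and parameters could be dispatched to such a class with reflection. The annotation Exposes and the classes EchoService and EchoServiceImpl are hypothetical stand-ins, and the framework's own scanning and invocation code is not reproduced here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class DispatchSketch {

    // Hypothetical stand-in for the framework's @RpcService annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface Exposes {
        Class<?> value();
    }

    public interface EchoService {
        String echo(String text);
    }

    @Exposes(EchoService.class)
    public static class EchoServiceImpl implements EchoService {
        @Override
        public String echo(String text) {
            return "echo:" + text;
        }
    }

    // interface name -> implementation instance
    private final Map<String, Object> services = new HashMap<>();

    // Registers an implementation under the interface named in its annotation.
    public void register(Object impl) {
        Exposes exposes = impl.getClass().getAnnotation(Exposes.class);
        if (exposes == null) {
            throw new IllegalArgumentException(impl.getClass() + " is not annotated");
        }
        services.put(exposes.value().getName(), impl);
    }

    // Looks up the implementation and invokes the requested method reflectively.
    public Object dispatch(String service, String method, Class<?>[] types, Object[] params) {
        Object impl = services.get(service);
        if (impl == null) {
            throw new IllegalStateException("no provider for " + service);
        }
        try {
            Method target = impl.getClass().getMethod(method, types);
            return target.invoke(impl, params);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invocation failed", e);
        }
    }

    public static void main(String[] args) {
        DispatchSketch server = new DispatchSketch();
        server.register(new EchoServiceImpl());
        System.out.println(server.dispatch(EchoService.class.getName(), "echo",
                new Class<?>[]{String.class}, new Object[]{"hi"}));
    }
}
```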

  Create the main class for the test


package com.rpc.util;


import com.rpc.client.Server;


/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-27
 * @Time: 14:44
 */
public class RpcServerMain {

    public static void main(String[] args) throws InterruptedException {
        Server.init();
    }
}

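  Run it. Output like that in the screenshot means the server has initialized successfully

[Image: screenshot from the original post]
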

  Client side

Open the rpc-client project and likewise add the common project as a dependency. The pom.xml is as follows

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rpc-parent</artifactId>
        <groupId>com.maglith</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-client</artifactId>
    <dependencies>
        <dependency>
            <artifactId>rpc-common</artifactId>
            <groupId>com.maglith</groupId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

    </dependencies>

</project>

  Create a register.properties file under resource in the rpc-client project with the following content

# Required: address of the registry
register.host=127.0.0.1:8848

  Create the test class that makes the calls

package com.rpc;


import com.rpc.util.ProxyUtil;
import com.rpc.test.TestEntity;
import com.rpc.test.TestService;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-27
 * @Time: 14:41
 */
public class Consumer {

    public static void main(String[] args) {
        ProxyUtil proxyUtil = new ProxyUtil();
        TestService proxy = proxyUtil.getProxy(TestService.class);
        int i = 1;
        // Each pass makes two remote calls; i is incremented once per call
        while ( i < 500 ) {
            TestEntity test = proxy.getTest("你好" + i++ , "helloword" + i);
            TestEntity test1 = proxy.getTest1("你好" + i++ , "helloword" + i);
            System.out.println("Received: " + test);
            System.out.println("Received: " + test1);
        }
    }
}

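  Then run it

[Image: screenshot from the original post]

  Now switch to the console of the server project, which shows the following information

[Image: screenshot from the original post]
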

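At this point the whole framework has been written and tested successfully.

The main idea is to build an RPC call framework on netty, with nacos as the service registration and discovery center (zookeeper works too, depending on preference). It can help in understanding the ideas behind distributed microservices.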
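
The role of the registry can be illustrated without a running nacos server. The sketch below is an in-memory stand-in, not the nacos client API: providers register an address under a service name, and consumers look one up in round-robin order. The class RegistrySketch and its method names are assumptions made for this illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory stand-in for a registry such as nacos; hypothetical, for illustration only.
public class RegistrySketch {

    // service name -> addresses ("host:port") of the providers
    private final Map<String, List<String>> providers = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    // Provider side: announce that this address serves the given interface.
    public void register(String serviceName, String address) {
        providers.computeIfAbsent(serviceName, k -> new CopyOnWriteArrayList<>()).add(address);
    }

    // Consumer side: pick one provider address, rotating through the list.
    public String discover(String serviceName) {
        List<String> addresses = providers.get(serviceName);
        if (addresses == null || addresses.isEmpty()) {
            throw new IllegalStateException("no provider registered for " + serviceName);
        }
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register("com.rpc.test.TestService", "127.0.0.1:8080");
        registry.register("com.rpc.test.TestService", "127.0.0.1:8081");
        System.out.println(registry.discover("com.rpc.test.TestService"));
        System.out.println(registry.discover("com.rpc.test.TestService"));
    }
}
```

A real registry additionally tracks instance health and pushes changes to consumers, which this sketch leaves out.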

The data flow of the project is shown in the figure below

[Image: data flow diagram from the original post]



Original post: https://www.cnblogs.com/hetutu-5238/p/11958615.html
