首页 > Web开发 > 详细

使用Netty实现通用二进制协议的高效数据传输

时间:2017-01-29 10:43:32      阅读:1224      评论:0      收藏:0      [点我收藏+]

Netty是一个高性能的NIO通信框架,提供异步的、事件驱动的网络编程模型。使用Netty可以方便用户开发各种常用协议的网络程序。例如:TCP、UDP、HTTP等等。

Netty的最新版本是3.2.7,官网地址是:http://www.jboss.org/netty

本文的主要目的是基于Netty实现一个通用二进制协议的高效数据传输。协议是通用的二进制协议,高效并且扩展性很好。

一个好的协议有两个标准:

(1)生成的传输数据要少,即数据压缩比要高。这样可以减少网络开销。

(2)传输数据和业务对象之间的转换速度要快。

(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen

一、协议的定义

无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后。

(1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
       编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、数据包长(4byte)
(2)数据:由数据包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
       数据格式定义:
       字段1键名长度    字段1键名 字段1值长度    字段1值
       字段2键名长度    字段2键名 字段2值长度    字段2值
       字段3键名长度    字段3键名 字段3值长度    字段3值
       …    …    …    …
       长度为整型,占4个字节

  代码中用两个Vo对象来表示:XLRequest和XLResponse。

  1技术分享package org.jboss.netty.example.xlsvr.vo;
  2技术分享
  3技术分享import java.util.HashMap;
  4技术分享import java.util.Map;
  5技术分享
  6技术分享/**
  7技术分享 *  @author hankchen
10技术分享 *  2012-2-3 下午02:46:52
11技术分享 */
12技术分享
13技术分享
14技术分享/**
15技术分享 * 响应数据
16技术分享 */
17技术分享
18技术分享/**
19技术分享 * 通用协议介绍
20技术分享 * 
21技术分享 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
22技术分享 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
23技术分享 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
24技术分享 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
25技术分享 * 数据格式定义:
26技术分享 * 字段1键名长度    字段1键名 字段1值长度    字段1值
27技术分享 * 字段2键名长度    字段2键名 字段2值长度    字段2值
28技术分享 * 字段3键名长度    字段3键名 字段3值长度    字段3值
29技术分享 * …    …    …    …
30技术分享 * 长度为整型,占4个字节
31技术分享 */
32技术分享public class XLResponse {
33技术分享    private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
34技术分享    private byte encrypt;// 加密类型。0表示不加密
35技术分享    private byte extend1;// 用于扩展协议。暂未定义任何值
36技术分享    private byte extend2;// 用于扩展协议。暂未定义任何值
37技术分享    private int sessionid;// 会话ID
38技术分享    private int result;// 结果码
39技术分享    private int length;// 数据包长
40技术分享    
41技术分享    private Map<String,String> values=new HashMap<String, String>();
42技术分享    
43技术分享    private String ip;
44技术分享    
45技术分享    public void setValue(String key,String value){
46技术分享        values.put(key, value);
47技术分享    }
48技术分享    
49技术分享    public String getValue(String key){
50技术分享        if (key==null) {
51技术分享            return null;
52技术分享        }
53技术分享        return values.get(key);
54技术分享    }
55技术分享
56技术分享    public byte getEncode() {
57技术分享        return encode;
58技术分享    }
59技术分享
60技术分享    public void setEncode(byte encode) {
61技术分享        this.encode = encode;
62技术分享    }
63技术分享
64技术分享    public byte getEncrypt() {
65技术分享        return encrypt;
66技术分享    }
67技术分享
68技术分享    public void setEncrypt(byte encrypt) {
69技术分享        this.encrypt = encrypt;
70技术分享    }
71技术分享
72技术分享    public byte getExtend1() {
73技术分享        return extend1;
74技术分享    }
75技术分享
76技术分享    public void setExtend1(byte extend1) {
77技术分享        this.extend1 = extend1;
78技术分享    }
79技术分享
80技术分享    public byte getExtend2() {
81技术分享        return extend2;
82技术分享    }
83技术分享
84技术分享    public void setExtend2(byte extend2) {
85技术分享        this.extend2 = extend2;
86技术分享    }
87技术分享
88技术分享    public int getSessionid() {
89技术分享        return sessionid;
90技术分享    }
91技术分享
92技术分享    public void setSessionid(int sessionid) {
93技术分享        this.sessionid = sessionid;
94技术分享    }
95技术分享
96技术分享    public int getResult() {
97技术分享        return result;
98技术分享    }
99技术分享
100技术分享    public void setResult(int result) {
101技术分享        this.result = result;
102技术分享    }
103技术分享
104技术分享    public int getLength() {
105技术分享        return length;
106技术分享    }
107技术分享
108技术分享    public void setLength(int length) {
109技术分享        this.length = length;
110技术分享    }
111技术分享
112技术分享    public Map<String, String> getValues() {
113技术分享        return values;
114技术分享    }
115技术分享
116技术分享    public String getIp() {
117技术分享        return ip;
118技术分享    }
119技术分享
120技术分享    public void setIp(String ip) {
121技术分享        this.ip = ip;
122技术分享    }
123技术分享
124技术分享    public void setValues(Map<String, String> values) {
125技术分享        this.values = values;
126技术分享    }
127技术分享
128技术分享    @Override
129技术分享    public String toString() {
130技术分享        return "XLResponse [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
131技术分享                + ", sessionid=" + sessionid + ", result=" + result + ", length=" + length + ", values=" + values + ", ip=" + ip + "]";
132技术分享    }
133技术分享}

 

  1技术分享package org.jboss.netty.example.xlsvr.vo;
  2技术分享
  3技术分享import java.util.HashMap;
  4技术分享import java.util.Map;
  5技术分享
  6技术分享/**
  7技术分享 *  @author hankchen
  8技术分享 *  2012-2-3 下午02:46:41
  9技术分享 */
10技术分享
11技术分享/**
12技术分享 * 请求数据
13技术分享 */
14技术分享
15技术分享/**
16技术分享 * 通用协议介绍
17技术分享 * 
18技术分享 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
19技术分享 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
20技术分享 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
21技术分享 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
22技术分享 * 数据格式定义:
23技术分享 * 字段1键名长度    字段1键名 字段1值长度    字段1值
24技术分享 * 字段2键名长度    字段2键名 字段2值长度    字段2值
25技术分享 * 字段3键名长度    字段3键名 字段3值长度    字段3值
26技术分享 * …    …    …    …
27技术分享 * 长度为整型,占4个字节
28技术分享 */
29技术分享public class XLRequest {
30技术分享    private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
31技术分享    private byte encrypt;// 加密类型。0表示不加密
32技术分享    private byte extend1;// 用于扩展协议。暂未定义任何值
33技术分享    private byte extend2;// 用于扩展协议。暂未定义任何值
34技术分享    private int sessionid;// 会话ID
35技术分享    private int command;// 命令
36技术分享    private int length;// 数据包长
37技术分享    
38技术分享    private Map<String,String> params=new HashMap<String, String>(); //参数
39技术分享    
40技术分享    private String ip;
41技术分享
42技术分享    public byte getEncode() {
43技术分享        return encode;
44技术分享    }
45技术分享
46技术分享    public void setEncode(byte encode) {
47技术分享        this.encode = encode;
48技术分享    }
49技术分享
50技术分享    public byte getEncrypt() {
51技术分享        return encrypt;
52技术分享    }
53技术分享
54技术分享    public void setEncrypt(byte encrypt) {
55技术分享        this.encrypt = encrypt;
56技术分享    }
57技术分享
58技术分享    public byte getExtend1() {
59技术分享        return extend1;
60技术分享    }
61技术分享
62技术分享    public void setExtend1(byte extend1) {
63技术分享        this.extend1 = extend1;
64技术分享    }
65技术分享
66技术分享    public byte getExtend2() {
67技术分享        return extend2;
68技术分享    }
69技术分享
70技术分享    public void setExtend2(byte extend2) {
71技术分享        this.extend2 = extend2;
72技术分享    }
73技术分享
74技术分享    public int getSessionid() {
75技术分享        return sessionid;
76技术分享    }
77技术分享
78技术分享    public void setSessionid(int sessionid) {
79技术分享        this.sessionid = sessionid;
80技术分享    }
81技术分享
82技术分享    public int getCommand() {
83技术分享        return command;
84技术分享    }
85技术分享
86技术分享    public void setCommand(int command) {
87技术分享        this.command = command;
88技术分享    }
89技术分享
90技术分享    public int getLength() {
91技术分享        return length;
92技术分享    }
93技术分享
94技术分享    public void setLength(int length) {
95技术分享        this.length = length;
96技术分享    }
97技术分享
98技术分享    public Map<String, String> getParams() {
99技术分享        return params;
100技术分享    }
101技术分享    
102技术分享    public void setValue(String key,String value){
103技术分享        params.put(key, value);
104技术分享    }
105技术分享    
106技术分享    public String getValue(String key){
107技术分享        if (key==null) {
108技术分享            return null;
109技术分享        }
110技术分享        return params.get(key);
111技术分享    }
112技术分享
113技术分享    public String getIp() {
114技术分享        return ip;
115技术分享    }
116技术分享
117技术分享    public void setIp(String ip) {
118技术分享        this.ip = ip;
119技术分享    }
120技术分享
121技术分享    public void setParams(Map<String, String> params) {
122技术分享        this.params = params;
123技术分享    }
124技术分享
125技术分享    @Override
126技术分享    public String toString() {
127技术分享        return "XLRequest [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
128技术分享                + ", sessionid=" + sessionid + ", command=" + command + ", length=" + length + ", params=" + params + ", ip=" + ip + "]";
129技术分享    }
130技术分享}
131技术分享

二、协议的编码和解码

对于自定义二进制协议,编码解码器往往是Netty开发的重点。这里直接给出相关类的代码。

 

1技术分享package org.jboss.netty.example.xlsvr.codec;
2技术分享
3技术分享import java.nio.ByteBuffer;
4技术分享
5技术分享import org.jboss.netty.buffer.ChannelBuffer;
6技术分享import org.jboss.netty.buffer.ChannelBuffers;
7技术分享import org.jboss.netty.channel.ChannelHandlerContext;
8技术分享import org.jboss.netty.channel.Channels;
9技术分享import org.jboss.netty.channel.MessageEvent;
10技术分享import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
11技术分享import org.jboss.netty.example.xlsvr.util.ProtocolUtil;
12技术分享import org.jboss.netty.example.xlsvr.vo.XLResponse;
13技术分享import org.slf4j.Logger;
14技术分享import org.slf4j.LoggerFactory;
15技术分享
16技术分享/**
17技术分享 *  @author hankchen
18技术分享 *  2012-2-3 上午10:48:15
19技术分享 */
20技术分享
21技术分享/**
22技术分享 * 服务器端编码器
23技术分享 */
24技术分享public class XLServerEncoder extends SimpleChannelDownstreamHandler {
25技术分享    Logger logger=LoggerFactory.getLogger(XLServerEncoder.class);
26技术分享    
27技术分享    @Override
28技术分享    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
29技术分享        XLResponse response=(XLResponse)e.getMessage();
30技术分享        ByteBuffer headBuffer=ByteBuffer.allocate(16);
31技术分享        /**
32技术分享         * 先组织报文头
33技术分享         */
34技术分享        headBuffer.put(response.getEncode());
35技术分享        headBuffer.put(response.getEncrypt());
36技术分享        headBuffer.put(response.getExtend1());
37技术分享        headBuffer.put(response.getExtend2());
38技术分享        headBuffer.putInt(response.getSessionid());
39技术分享        headBuffer.putInt(response.getResult());
40技术分享        
41技术分享        /**
42技术分享         * 组织报文的数据部分
43技术分享         */
44技术分享        ChannelBuffer dataBuffer=ProtocolUtil.encode(response.getEncode(),response.getValues()); 
45技术分享        int length=dataBuffer.readableBytes();
46技术分享        headBuffer.putInt(length);
47技术分享        /**
48技术分享         * 非常重要
49技术分享         * ByteBuffer需要手动flip(),ChannelBuffer不需要
50技术分享         */
51技术分享        headBuffer.flip();
52技术分享        ChannelBuffer totalBuffer=ChannelBuffers.dynamicBuffer();
53技术分享        totalBuffer.writeBytes(headBuffer);
54技术分享        logger.info("totalBuffer size="+totalBuffer.readableBytes());
55技术分享        totalBuffer.writeBytes(dataBuffer);
56技术分享        logger.info("totalBuffer size="+totalBuffer.readableBytes());
57技术分享        Channels.write(ctx, e.getFuture(), totalBuffer);
58技术分享    }
59技术分享
60技术分享}
61技术分享

 

 

1技术分享package org.jboss.netty.example.xlsvr.codec;
2技术分享
3技术分享import org.jboss.netty.buffer.ChannelBuffer;
4技术分享import org.jboss.netty.buffer.ChannelBuffers;
5技术分享import org.jboss.netty.channel.Channel;
6技术分享import org.jboss.netty.channel.ChannelHandlerContext;
7技术分享import org.jboss.netty.example.xlsvr.util.ProtocolUtil;
8技术分享import org.jboss.netty.example.xlsvr.vo.XLResponse;
9技术分享import org.jboss.netty.handler.codec.frame.FrameDecoder;
10技术分享
11技术分享/**
12技术分享 *  @author hankchen
13技术分享 *  2012-2-3 上午10:47:54
14技术分享 */
15技术分享
16技术分享/**
17技术分享 * 客户端解码器
18技术分享 */
19技术分享public class XLClientDecoder extends FrameDecoder {
20技术分享
21技术分享    @Override
22技术分享    protected Object decode(ChannelHandlerContext context, Channel channel, ChannelBuffer buffer) throws Exception {
23技术分享        if (buffer.readableBytes()<16) {
24技术分享            return null;
25技术分享        }
26技术分享        buffer.markReaderIndex();
27技术分享        byte encode=buffer.readByte();
28技术分享        byte encrypt=buffer.readByte();
29技术分享        byte extend1=buffer.readByte();
30技术分享        byte extend2=buffer.readByte();
31技术分享        int sessionid=buffer.readInt();
32技术分享        int result=buffer.readInt();
33技术分享        int length=buffer.readInt(); // 数据包长
34技术分享        if (buffer.readableBytes()<length) {
35技术分享            buffer.resetReaderIndex();
36技术分享            return null;
37技术分享        }
38技术分享        ChannelBuffer dataBuffer=ChannelBuffers.buffer(length);
39技术分享        buffer.readBytes(dataBuffer, length);
40技术分享        
41技术分享        XLResponse response=new XLResponse();
42技术分享        response.setEncode(encode);
43技术分享        response.setEncrypt(encrypt);
44技术分享        response.setExtend1(extend1);
45技术分享        response.setExtend2(extend2);
46技术分享        response.setSessionid(sessionid);
47技术分享        response.setResult(result);
48技术分享        response.setLength(length);
49技术分享        response.setValues(ProtocolUtil.decode(encode, dataBuffer));
50技术分享        response.setIp(ProtocolUtil.getClientIp(channel));
51技术分享        return response;
52技术分享    }
53技术分享
54技术分享}

 

  1技术分享package org.jboss.netty.example.xlsvr.util;
  2技术分享
  3技术分享import java.net.SocketAddress;
  4技术分享import java.nio.charset.Charset;
  5技术分享import java.util.HashMap;
  6技术分享import java.util.Map;
  7技术分享import java.util.Map.Entry;
  8技术分享
  9技术分享import org.jboss.netty.buffer.ChannelBuffer;
10技术分享import org.jboss.netty.buffer.ChannelBuffers;
11技术分享import org.jboss.netty.channel.Channel;
12技术分享
13技术分享/**
14技术分享 *  @author hankchen
15技术分享 *  2012-2-4 下午01:57:33
16技术分享 */
17技术分享public class ProtocolUtil {
18技术分享    
19技术分享    /**
20技术分享     * 编码报文的数据部分
21技术分享     * @param encode
22技术分享     * @param values
23技术分享     * @return
24技术分享     */
25技术分享    public static ChannelBuffer encode(int encode,Map<String,String> values){
26技术分享        ChannelBuffer totalBuffer=null;
27技术分享        if (values!=null && values.size()>0) {
28技术分享            totalBuffer=ChannelBuffers.dynamicBuffer();
29技术分享            int length=0,index=0;
30技术分享            ChannelBuffer [] channelBuffers=new ChannelBuffer[values.size()];
31技术分享            Charset charset=XLCharSetFactory.getCharset(encode);
32技术分享            for(Entry<String,String> entry:values.entrySet()){
33技术分享                String key=entry.getKey();
34技术分享                String value=entry.getValue();
35技术分享                ChannelBuffer buffer=ChannelBuffers.dynamicBuffer();
36技术分享                buffer.writeInt(key.length());
37技术分享                buffer.writeBytes(key.getBytes(charset));
38技术分享                buffer.writeInt(value.length());
39技术分享                buffer.writeBytes(value.getBytes(charset));
40技术分享                channelBuffers[index++]=buffer;
41技术分享                length+=buffer.readableBytes();
42技术分享            }
43技术分享            
44技术分享            for (int i = 0; i < channelBuffers.length; i++) {
45技术分享                totalBuffer.writeBytes(channelBuffers[i]);
46技术分享            }
47技术分享        }
48技术分享        return totalBuffer;
49技术分享    }
50技术分享    
51技术分享    /**
52技术分享     * 解码报文的数据部分
53技术分享     * @param encode
54技术分享     * @param dataBuffer
55技术分享     * @return
56技术分享     */
57技术分享    public static Map<String,String> decode(int encode,ChannelBuffer dataBuffer){
58技术分享        Map<String,String> dataMap=new HashMap<String, String>();
59技术分享        if (dataBuffer!=null && dataBuffer.readableBytes()>0) {
60技术分享            int processIndex=0,length=dataBuffer.readableBytes();
61技术分享            Charset charset=XLCharSetFactory.getCharset(encode);
62技术分享            while(processIndex<length){
63技术分享                /**
64技术分享                 * 获取Key
65技术分享                 */
66技术分享                int size=dataBuffer.readInt();
67技术分享                byte [] contents=new byte [size];
68技术分享                dataBuffer.readBytes(contents);
69技术分享                String key=new String(contents, charset);
70技术分享                processIndex=processIndex+size+4;
71技术分享                /**
72技术分享                 * 获取Value
73技术分享                 */
74技术分享                size=dataBuffer.readInt();
75技术分享                contents=new byte [size];
76技术分享                dataBuffer.readBytes(contents);
77技术分享                String value=new String(contents, charset);
78技术分享                dataMap.put(key, value);
79技术分享                processIndex=processIndex+size+4;
80技术分享            }
81技术分享        }
82技术分享        return dataMap;
83技术分享    }
84技术分享    
85技术分享    /**
86技术分享     * 获取客户端IP
87技术分享     * @param channel
88技术分享     * @return
89技术分享     */
90技术分享    public static String getClientIp(Channel channel){
91技术分享        /**
92技术分享         * 获取客户端IP
93技术分享         */
94技术分享        SocketAddress address = channel.getRemoteAddress();
95技术分享        String ip = "";
96技术分享        if (address != null) {
97技术分享            ip = address.toString().trim();
98技术分享            int index = ip.lastIndexOf(‘:‘);
99技术分享            if (index < 1) {
100技术分享                index = ip.length();
101技术分享            }
102技术分享            ip = ip.substring(1, index);
103技术分享        }
104技术分享        if (ip.length() > 15) {
105技术分享            ip = ip.substring(Math.max(ip.indexOf("/") + 1, ip.length() - 15));
106技术分享        }
107技术分享        return ip;
108技术分享    }
109技术分享}
110技术分享

三、服务器端实现

服务器端提供的功能是:

1、接收客户端的请求(非关闭命令),返回XLResponse类型的数据。

2、如果客户端的请求是关闭命令:shutdown,则服务器端关闭自身进程。

为了展示多协议的运用,这里客户端的请求采用的是基于问本行(\n\r)的协议。

具体代码如下:

1技术分享package org.jboss.netty.example.xlsvr;
2技术分享
3技术分享import java.net.InetSocketAddress;
4技术分享import java.util.concurrent.Executors;
5技术分享
6技术分享import org.jboss.netty.bootstrap.ServerBootstrap;
7技术分享import org.jboss.netty.channel.Channel;
8技术分享import org.jboss.netty.channel.ChannelPipeline;
9技术分享import org.jboss.netty.channel.group.ChannelGroup;
10技术分享import org.jboss.netty.channel.group.ChannelGroupFuture;
11技术分享import org.jboss.netty.channel.group.DefaultChannelGroup;
12技术分享import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
13技术分享import org.jboss.netty.example.xlsvr.codec.XLServerEncoder;
14技术分享import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
15技术分享import org.jboss.netty.handler.codec.frame.Delimiters;
16技术分享import org.jboss.netty.handler.codec.string.StringDecoder;
17技术分享import org.jboss.netty.util.CharsetUtil;
18技术分享import org.slf4j.Logger;
19技术分享import org.slf4j.LoggerFactory;
20技术分享
21技术分享/**
22技术分享 *  @author hankchen
23技术分享 *  2012-1-30 下午03:21:38
24技术分享 */
25技术分享
26技术分享public class XLServer {
27技术分享    public static final int port =8080;
28技术分享    public static final Logger logger=LoggerFactory.getLogger(XLServer.class);
29技术分享    public static final ChannelGroup allChannels=new DefaultChannelGroup("XLServer");
30技术分享    private static final ServerBootstrap serverBootstrap=new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
31技术分享    
32技术分享    public static void main(String [] args){
33技术分享        try {
34技术分享            XLServer.startup();
35技术分享        } catch (Exception e) {
36技术分享            e.printStackTrace();
37技术分享        }
38技术分享    }
39技术分享    
40技术分享    public static boolean startup() throws Exception{
41技术分享        /**
42技术分享         * 采用默认ChannelPipeline管道
43技术分享         * 这意味着同一个XLServerHandler实例将被多个Channel通道共享
44技术分享         * 这种方式对于XLServerHandler中无有状态的成员变量是可以的,并且可以提高性能!
45技术分享         */
46技术分享        ChannelPipeline pipeline=serverBootstrap.getPipeline(); 
47技术分享        /**
48技术分享         * 解码器是基于文本行的协议,\r\n或者\n\r
49技术分享         */
50技术分享        pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));
51技术分享        pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
52技术分享        pipeline.addLast("encoder", new XLServerEncoder());
53技术分享        pipeline.addLast("handler", new XLServerHandler());
54技术分享        
55技术分享        serverBootstrap.setOption("child.tcpNoDelay", true); //注意child前缀
56技术分享        serverBootstrap.setOption("child.keepAlive", true); //注意child前缀
57技术分享        
58技术分享        /**
59技术分享         * ServerBootstrap对象的bind方法返回了一个绑定了本地地址的服务端Channel通道对象
60技术分享         */
61技术分享        Channel channel=serverBootstrap.bind(new InetSocketAddress(port));
62技术分享        allChannels.add(channel);
63技术分享        logger.info("server is started on port "+port);
64技术分享        return false;
65技术分享    }
66技术分享    
67技术分享    public static void shutdown() throws Exception{
68技术分享        try {
69技术分享            /**
70技术分享             * 主动关闭服务器
71技术分享             */
72技术分享            ChannelGroupFuture future=allChannels.close();
73技术分享            future.awaitUninterruptibly();//阻塞,直到服务器关闭
74技术分享            //serverBootstrap.releaseExternalResources();
75技术分享        } catch (Exception e) {
76技术分享            e.printStackTrace();
77技术分享            logger.error(e.getMessage(),e);
78技术分享        }
79技术分享        finally{
80技术分享            logger.info("server is shutdown on port "+port);
81技术分享            System.exit(1);
82技术分享        }
83技术分享    }
84技术分享}
85技术分享

 

  1技术分享package org.jboss.netty.example.xlsvr;
  2技术分享
  3技术分享import java.util.Random;
  4技术分享
  5技术分享import org.jboss.netty.channel.Channel;
  6技术分享import org.jboss.netty.channel.ChannelFuture;
  7技术分享import org.jboss.netty.channel.ChannelHandlerContext;
  8技术分享import org.jboss.netty.channel.ChannelHandler.Sharable;
  9技术分享import org.jboss.netty.channel.ChannelStateEvent;
10技术分享import org.jboss.netty.channel.ExceptionEvent;
11技术分享import org.jboss.netty.channel.MessageEvent;
12技术分享import org.jboss.netty.channel.SimpleChannelHandler;
13技术分享import org.jboss.netty.example.xlsvr.vo.XLResponse;
14技术分享import org.slf4j.Logger;
15技术分享import org.slf4j.LoggerFactory;
16技术分享
17技术分享/**
18技术分享 *  @author hankchen
19技术分享 *  2012-1-30 下午03:22:24
20技术分享 */
21技术分享
22技术分享@Sharable
23技术分享public class XLServerHandler extends SimpleChannelHandler {
24技术分享    private static final Logger logger=LoggerFactory.getLogger(XLServerHandler.class);
25技术分享    
26技术分享    @Override
27技术分享    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
28技术分享        logger.info("messageReceived");
29技术分享        if (e.getMessage() instanceof String) {
30技术分享            String content=(String)e.getMessage();
31技术分享            logger.info("content is "+content);
32技术分享            if ("shutdown".equalsIgnoreCase(content)) {
33技术分享                //e.getChannel().close();
34技术分享                XLServer.shutdown();
35技术分享            }else {
36技术分享                sendResponse(ctx);
37技术分享            }
38技术分享        }else {
39技术分享            logger.error("message is not a String.");
40技术分享            e.getChannel().close();
41技术分享        }
42技术分享    }
43技术分享
44技术分享    @Override
45技术分享    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
46技术分享        logger.error(e.getCause().getMessage(),e.getCause());
47技术分享        e.getCause().printStackTrace();
48技术分享        e.getChannel().close();
49技术分享    }
50技术分享
51技术分享    @Override
52技术分享    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
53技术分享        logger.info("channelConnected");
54技术分享        sendResponse(ctx);
55技术分享    }
56技术分享
57技术分享    @Override
58技术分享    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
59技术分享        logger.info("channelClosed");
60技术分享        //删除通道
61技术分享        XLServer.allChannels.remove(e.getChannel());
62技术分享    }
63技术分享
64技术分享    @Override
65技术分享    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
66技术分享        logger.info("channelDisconnected");
67技术分享        super.channelDisconnected(ctx, e);
68技术分享    }
69技术分享
70技术分享    @Override
71技术分享    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
72技术分享        logger.info("channelOpen");
73技术分享        //增加通道
74技术分享        XLServer.allChannels.add(e.getChannel());
75技术分享    }
76技术分享
77技术分享    /**
78技术分享     * 发送响应内容
79技术分享     * @param ctx
80技术分享     * @param e
81技术分享     * @return
82技术分享     */
83技术分享    private ChannelFuture sendResponse(ChannelHandlerContext ctx){
84技术分享        Channel channel=ctx.getChannel();
85技术分享        Random random=new Random();
86技术分享        XLResponse response=new XLResponse();
87技术分享        response.setEncode((byte)0);
88技术分享        response.setResult(1);
89技术分享        response.setValue("name","hankchen");
90技术分享        response.setValue("time", String.valueOf(System.currentTimeMillis()));
91技术分享        response.setValue("age",String.valueOf(random.nextInt()));
92技术分享        /**
93技术分享         * 发送接收信息的时间戳到客户端
94技术分享         * 注意:Netty中所有的IO操作都是异步的!
95技术分享         */
96技术分享        ChannelFuture future=channel.write(response); //发送内容
97技术分享        return future;
98技术分享    }
99技术分享}
100技术分享

四、客户端实现

客户端的功能是连接服务器,发送10次请求,然后发送关闭服务器的命令,最后主动关闭客户端。

关键代码如下:

1技术分享/**
2技术分享 *  Copyright (C): 2012
3技术分享 *  @author hankchen
4技术分享 *  2012-1-30 下午03:21:26
5技术分享 */
6技术分享
7技术分享/**
8技术分享 * 服务器特征:
9技术分享 * 1、使用专用解码器解析服务器发过来的数据
10技术分享 * 2、客户端主动关闭连接
11技术分享 */
12技术分享public class XLClient {
13技术分享    public static final int port =XLServer.port;
14技术分享    public static final String host ="localhost";
15技术分享    private static final Logger logger=LoggerFactory.getLogger(XLClient.class);
16技术分享    private static final NioClientSocketChannelFactory clientSocketChannelFactory=new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
17技术分享    private static final ClientBootstrap clientBootstrap=new ClientBootstrap(clientSocketChannelFactory);
18技术分享    
19技术分享    /**
20技术分享     * @param args
21技术分享     * @throws Exception 
22技术分享     */
23技术分享    public static void main(String[] args) throws Exception {
24技术分享        ChannelFuture future=XLClient.startup();
25技术分享        logger.info("future state is "+future.isSuccess());
26技术分享    }
27技术分享    
28技术分享    /**
29技术分享     * 启动客户端
30技术分享     * @return
31技术分享     * @throws Exception
32技术分享     */
33技术分享    public static ChannelFuture startup() throws Exception {
34技术分享        /**
35技术分享         * 注意:由于XLClientHandler中有状态的成员变量,因此不能采用默认共享ChannelPipeline的方式
36技术分享         * 例如,下面的代码形式是错误的:
37技术分享         * ChannelPipeline pipeline=clientBootstrap.getPipeline();
38技术分享         * pipeline.addLast("handler", new XLClientHandler());
39技术分享         */
40技术分享        clientBootstrap.setPipelineFactory(new XLClientPipelineFactory()); //只能这样设置
41技术分享        /**
42技术分享         * 请注意,这里不存在使用“child.”前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象
43技术分享         */
44技术分享        clientBootstrap.setOption("tcpNoDelay", true);
45技术分享        clientBootstrap.setOption("keepAlive", true);
46技术分享        
47技术分享        ChannelFuture future=clientBootstrap.connect(new InetSocketAddress(host, port));
48技术分享        /**
49技术分享         * 阻塞式的等待,直到ChannelFuture对象返回这个连接操作的成功或失败状态
50技术分享         */
51技术分享        future.awaitUninterruptibly();
52技术分享        /**
53技术分享         * 如果连接失败,我们将打印连接失败的原因。
54技术分享         * 如果连接操作没有成功或者被取消,ChannelFuture对象的getCause()方法将返回连接失败的原因。
55技术分享         */
56技术分享        if (!future.isSuccess()) {
57技术分享            future.getCause().printStackTrace();
58技术分享        }else {
59技术分享            logger.info("client is connected to server "+host+":"+port);
60技术分享        }
61技术分享        return future;
62技术分享    }
63技术分享    
64技术分享    /**
65技术分享     * 关闭客户端
66技术分享     * @param future
67技术分享     * @throws Exception
68技术分享     */
69技术分享    public static void shutdown(ChannelFuture future) throws Exception{
70技术分享        try {
71技术分享            /**
72技术分享             * 主动关闭客户端连接,会阻塞等待直到通道关闭
73技术分享             */
74技术分享            future.getChannel().close().awaitUninterruptibly();
75技术分享            //future.getChannel().getCloseFuture().awaitUninterruptibly();
76技术分享            /**
77技术分享             * 释放ChannelFactory通道工厂使用的资源。
78技术分享             * 这一步仅需要调用 releaseExternalResources()方法即可。
79技术分享             * 包括NIO Secector和线程池在内的所有资源将被自动的关闭和终止。
80技术分享             */
81技术分享            clientBootstrap.releaseExternalResources();
82技术分享        } catch (Exception e) {
83技术分享            e.printStackTrace();
84技术分享            logger.error(e.getMessage(),e);
85技术分享        }
86技术分享        finally{
87技术分享            System.exit(1);
88技术分享            logger.info("client is shutdown to server "+host+":"+port);
89技术分享        }
90技术分享    }
91技术分享}

 

1技术分享public class XLClientPipelineFactory implements ChannelPipelineFactory{
2技术分享
3技术分享    @Override
4技术分享    public ChannelPipeline getPipeline() throws Exception {
5技术分享        ChannelPipeline pipeline=Channels.pipeline();
6技术分享        /**
7技术分享         * 使用专用的解码器,解决数据分段的问题
8技术分享         * 从业务逻辑代码中分离协议处理部分总是一个很不错的想法。
9技术分享         */
10技术分享        pipeline.addLast("decoder", new XLClientDecoder());
11技术分享        /**
12技术分享         * 有专门的编码解码器,这时处理器就不需要管数据分段和数据格式问题,只需要关注业务逻辑了!
13技术分享         */
14技术分享        pipeline.addLast("handler", new XLClientHandler());
15技术分享        return pipeline;
16技术分享    }
17技术分享
18技术分享}

 

1技术分享/**
2技术分享 *  Copyright (C): 2012
3技术分享 *  @author hankchen
4技术分享 *  2012-1-30 下午03:21:52
5技术分享 */
6技术分享
7技术分享/**
8技术分享 * 服务器特征:
9技术分享 * 1、使用专用的编码解码器,解决数据分段的问题
10技术分享 * 2、使用POJO替代ChannelBuffer传输
11技术分享 */
12技术分享public class XLClientHandler extends SimpleChannelHandler {
13技术分享    private static final Logger logger=LoggerFactory.getLogger(XLClientHandler.class);
14技术分享    private final AtomicInteger count=new AtomicInteger(0); //计数器
15技术分享    
16技术分享    @Override
17技术分享    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
18技术分享        processMethod1(ctx, e); //处理方式一
19技术分享    }
20技术分享    
21技术分享    /**
22技术分享     * @param ctx
23技术分享     * @param e
24技术分享     * @throws Exception
25技术分享     */
26技术分享    public void processMethod1(ChannelHandlerContext ctx, MessageEvent e) throws Exception{
27技术分享        logger.info("processMethod1……,count="+count.addAndGet(1));
28技术分享        XLResponse serverTime=(XLResponse)e.getMessage();
29技术分享        logger.info("messageReceived,content:"+serverTime.toString());
30技术分享        Thread.sleep(1000);
31技术分享        
32技术分享        if (count.get()<10) {
33技术分享            //从新发送请求获取最新的服务器时间
34技术分享            ctx.getChannel().write(ChannelBuffers.wrappedBuffer("again\r\n".getBytes()));
35技术分享        }else{
36技术分享            //从新发送请求关闭服务器
37技术分享            ctx.getChannel().write(ChannelBuffers.wrappedBuffer("shutdown\r\n".getBytes()));
38技术分享        }
39技术分享    }
40技术分享    
41技术分享    @Override
42技术分享    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
43技术分享        logger.info("exceptionCaught");
44技术分享        e.getCause().printStackTrace();
45技术分享        ctx.getChannel().close();
46技术分享    }
47技术分享
48技术分享    @Override
49技术分享    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
50技术分享        logger.info("channelClosed");
51技术分享        super.channelClosed(ctx, e);
52技术分享    }
53技术分享    
54技术分享    
55技术分享}

全文代码较多,写了很多注释,希望对读者有用,谢谢!

(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen

http://www.blogjava.net/hankchen/archive/2012/02/04/369378.html

 

使用Netty实现通用二进制协议的高效数据传输

原文:http://www.cnblogs.com/findumars/p/6357305.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!