Netty是一个高性能的NIO通信框架,提供异步的、事件驱动的网络编程模型。使用Netty可以方便用户开发各种常用协议的网络程序。例如:TCP、UDP、HTTP等等。
Netty的最新版本是3.2.7,官网地址是:http://www.jboss.org/netty
本文的主要目的是基于Netty实现一个通用二进制协议的高效数据传输。协议是通用的二进制协议,高效并且扩展性很好。
一个好的协议有两个标准:
(1)生成的传输数据要少,即数据压缩比要高。这样可以减少网络开销。
(2)传输数据和业务对象之间的转换速度要快。
(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen)
一、协议的定义
无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后。
(1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、数据包长(4byte)
(2)数据:由数据包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
数据格式定义:
字段1键名长度 字段1键名 字段1值长度 字段1值
字段2键名长度 字段2键名 字段2值长度 字段2值
字段3键名长度 字段3键名 字段3值长度 字段3值
… … … …
长度为整型,占4个字节
代码中用两个Vo对象来表示:XLRequest和XLResponse。
1

package org.jboss.netty.example.xlsvr.vo;
2

3

import java.util.HashMap;
4

import java.util.Map;
5

6
/**
7
* @author hankchen
10
* 2012-2-3 下午02:46:52
11
*/
12
13
14
/**
15
* 响应数据
16
*/
17
18
/**
19
* 通用协议介绍
20
*
21
* 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
22
* (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
23
* 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
24
* (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
25
* 数据格式定义:
26
* 字段1键名长度 字段1键名 字段1值长度 字段1值
27
* 字段2键名长度 字段2键名 字段2值长度 字段2值
28
* 字段3键名长度 字段3键名 字段3值长度 字段3值
29
* … … … …
30
* 长度为整型,占4个字节
31
*/
32
public class XLResponse {
33
private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
34
private byte encrypt;// 加密类型。0表示不加密
35
private byte extend1;// 用于扩展协议。暂未定义任何值
36
private byte extend2;// 用于扩展协议。暂未定义任何值
37
private int sessionid;// 会话ID
38
private int result;// 结果码
39
private int length;// 数据包长
40
41
private Map<String,String> values=new HashMap<String, String>();
42
43
private String ip;
44
45
public void setValue(String key,String value){
46
values.put(key, value);
47
}
48
49
public String getValue(String key){
50
if (key==null) {
51
return null;
52
}
53
return values.get(key);
54
}
55
56
public byte getEncode() {
57
return encode;
58
}
59
60
public void setEncode(byte encode) {
61
this.encode = encode;
62
}
63
64
public byte getEncrypt() {
65
return encrypt;
66
}
67
68
public void setEncrypt(byte encrypt) {
69
this.encrypt = encrypt;
70
}
71
72
public byte getExtend1() {
73
return extend1;
74
}
75
76
public void setExtend1(byte extend1) {
77
this.extend1 = extend1;
78
}
79
80
public byte getExtend2() {
81
return extend2;
82
}
83
84
public void setExtend2(byte extend2) {
85
this.extend2 = extend2;
86
}
87
88
public int getSessionid() {
89
return sessionid;
90
}
91
92
public void setSessionid(int sessionid) {
93
this.sessionid = sessionid;
94
}
95
96
public int getResult() {
97
return result;
98
}
99
100
public void setResult(int result) {
101
this.result = result;
102
}
103
104
public int getLength() {
105
return length;
106
}
107
108
public void setLength(int length) {
109
this.length = length;
110
}
111
112
public Map<String, String> getValues() {
113
return values;
114
}
115
116
public String getIp() {
117
return ip;
118
}
119
120
public void setIp(String ip) {
121
this.ip = ip;
122
}
123
124
public void setValues(Map<String, String> values) {
125
this.values = values;
126
}
127
128
@Override
129
public String toString() {
130
return "XLResponse [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
131
+ ", sessionid=" + sessionid + ", result=" + result + ", length=" + length + ", values=" + values + ", ip=" + ip + "]";
132
}
133
}
1

package org.jboss.netty.example.xlsvr.vo;
2

3

import java.util.HashMap;
4

import java.util.Map;
5

6
/**
7
* @author hankchen
8
* 2012-2-3 下午02:46:41
9
*/
10
11
/**
12
* 请求数据
13
*/
14
15
/**
16
* 通用协议介绍
17
*
18
* 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
19
* (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
20
* 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
21
* (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
22
* 数据格式定义:
23
* 字段1键名长度 字段1键名 字段1值长度 字段1值
24
* 字段2键名长度 字段2键名 字段2值长度 字段2值
25
* 字段3键名长度 字段3键名 字段3值长度 字段3值
26
* … … … …
27
* 长度为整型,占4个字节
28
*/
29
public class XLRequest {
30
private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
31
private byte encrypt;// 加密类型。0表示不加密
32
private byte extend1;// 用于扩展协议。暂未定义任何值
33
private byte extend2;// 用于扩展协议。暂未定义任何值
34
private int sessionid;// 会话ID
35
private int command;// 命令
36
private int length;// 数据包长
37
38
private Map<String,String> params=new HashMap<String, String>(); //参数
39
40
private String ip;
41
42
public byte getEncode() {
43
return encode;
44
}
45
46
public void setEncode(byte encode) {
47
this.encode = encode;
48
}
49
50
public byte getEncrypt() {
51
return encrypt;
52
}
53
54
public void setEncrypt(byte encrypt) {
55
this.encrypt = encrypt;
56
}
57
58
public byte getExtend1() {
59
return extend1;
60
}
61
62
public void setExtend1(byte extend1) {
63
this.extend1 = extend1;
64
}
65
66
public byte getExtend2() {
67
return extend2;
68
}
69
70
public void setExtend2(byte extend2) {
71
this.extend2 = extend2;
72
}
73
74
public int getSessionid() {
75
return sessionid;
76
}
77
78
public void setSessionid(int sessionid) {
79
this.sessionid = sessionid;
80
}
81
82
public int getCommand() {
83
return command;
84
}
85
86
public void setCommand(int command) {
87
this.command = command;
88
}
89
90
public int getLength() {
91
return length;
92
}
93
94
public void setLength(int length) {
95
this.length = length;
96
}
97
98
public Map<String, String> getParams() {
99
return params;
100
}
101
102
public void setValue(String key,String value){
103
params.put(key, value);
104
}
105
106
public String getValue(String key){
107
if (key==null) {
108
return null;
109
}
110
return params.get(key);
111
}
112
113
public String getIp() {
114
return ip;
115
}
116
117
public void setIp(String ip) {
118
this.ip = ip;
119
}
120
121
public void setParams(Map<String, String> params) {
122
this.params = params;
123
}
124
125
@Override
126
public String toString() {
127
return "XLRequest [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
128
+ ", sessionid=" + sessionid + ", command=" + command + ", length=" + length + ", params=" + params + ", ip=" + ip + "]";
129
}
130
}
131
二、协议的编码和解码
对于自定义二进制协议,编码解码器往往是Netty开发的重点。这里直接给出相关类的代码。
1

package org.jboss.netty.example.xlsvr.util;
2

3

import java.net.SocketAddress;
4

import java.nio.charset.Charset;
5

import java.util.HashMap;
6

import java.util.Map;
7

import java.util.Map.Entry;
8

9

import org.jboss.netty.buffer.ChannelBuffer;
10

import org.jboss.netty.buffer.ChannelBuffers;
11

import org.jboss.netty.channel.Channel;
12

13
/**
14
* @author hankchen
15
* 2012-2-4 下午01:57:33
16
*/
17
public class ProtocolUtil {
18
19
/**
20
* 编码报文的数据部分
21
* @param encode
22
* @param values
23
* @return
24
*/
25
public static ChannelBuffer encode(int encode,Map<String,String> values){
26
ChannelBuffer totalBuffer=null;
27
if (values!=null && values.size()>0) {
28
totalBuffer=ChannelBuffers.dynamicBuffer();
29
int length=0,index=0;
30
ChannelBuffer [] channelBuffers=new ChannelBuffer[values.size()];
31
Charset charset=XLCharSetFactory.getCharset(encode);
32
for(Entry<String,String> entry:values.entrySet()){
33
String key=entry.getKey();
34
String value=entry.getValue();
35
ChannelBuffer buffer=ChannelBuffers.dynamicBuffer();
36
buffer.writeInt(key.length());
37
buffer.writeBytes(key.getBytes(charset));
38
buffer.writeInt(value.length());
39
buffer.writeBytes(value.getBytes(charset));
40
channelBuffers[index++]=buffer;
41
length+=buffer.readableBytes();
42
}
43
44
for (int i = 0; i < channelBuffers.length; i++) {
45
totalBuffer.writeBytes(channelBuffers[i]);
46
}
47
}
48
return totalBuffer;
49
}
50
51
/**
52
* 解码报文的数据部分
53
* @param encode
54
* @param dataBuffer
55
* @return
56
*/
57
public static Map<String,String> decode(int encode,ChannelBuffer dataBuffer){
58
Map<String,String> dataMap=new HashMap<String, String>();
59
if (dataBuffer!=null && dataBuffer.readableBytes()>0) {
60
int processIndex=0,length=dataBuffer.readableBytes();
61
Charset charset=XLCharSetFactory.getCharset(encode);
62
while(processIndex<length){
63
/**
64
* 获取Key
65
*/
66
int size=dataBuffer.readInt();
67
byte [] contents=new byte [size];
68
dataBuffer.readBytes(contents);
69
String key=new String(contents, charset);
70
processIndex=processIndex+size+4;
71
/**
72
* 获取Value
73
*/
74
size=dataBuffer.readInt();
75
contents=new byte [size];
76
dataBuffer.readBytes(contents);
77
String value=new String(contents, charset);
78
dataMap.put(key, value);
79
processIndex=processIndex+size+4;
80
}
81
}
82
return dataMap;
83
}
84
85
/**
86
* 获取客户端IP
87
* @param channel
88
* @return
89
*/
90
public static String getClientIp(Channel channel){
91
/**
92
* 获取客户端IP
93
*/
94
SocketAddress address = channel.getRemoteAddress();
95
String ip = "";
96
if (address != null) {
97
ip = address.toString().trim();
98
int index = ip.lastIndexOf(‘:‘);
99
if (index < 1) {
100
index = ip.length();
101
}
102
ip = ip.substring(1, index);
103
}
104
if (ip.length() > 15) {
105
ip = ip.substring(Math.max(ip.indexOf("/") + 1, ip.length() - 15));
106
}
107
return ip;
108
}
109
}
110
三、服务器端实现
服务器端提供的功能是:
1、接收客户端的请求(非关闭命令),返回XLResponse类型的数据。
2、如果客户端的请求是关闭命令:shutdown,则服务器端关闭自身进程。
为了展示多协议的运用,这里客户端的请求采用的是基于问本行(\n\r)的协议。
具体代码如下:
1

package org.jboss.netty.example.xlsvr;
2

3

import java.util.Random;
4

5

import org.jboss.netty.channel.Channel;
6

import org.jboss.netty.channel.ChannelFuture;
7

import org.jboss.netty.channel.ChannelHandlerContext;
8

import org.jboss.netty.channel.ChannelHandler.Sharable;
9

import org.jboss.netty.channel.ChannelStateEvent;
10

import org.jboss.netty.channel.ExceptionEvent;
11

import org.jboss.netty.channel.MessageEvent;
12

import org.jboss.netty.channel.SimpleChannelHandler;
13

import org.jboss.netty.example.xlsvr.vo.XLResponse;
14

import org.slf4j.Logger;
15

import org.slf4j.LoggerFactory;
16

17
/**
18
* @author hankchen
19
* 2012-1-30 下午03:22:24
20
*/
21
22
@Sharable
23
public class XLServerHandler extends SimpleChannelHandler {
24
private static final Logger logger=LoggerFactory.getLogger(XLServerHandler.class);
25
26
@Override
27
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
28
logger.info("messageReceived");
29
if (e.getMessage() instanceof String) {
30
String content=(String)e.getMessage();
31
logger.info("content is "+content);
32
if ("shutdown".equalsIgnoreCase(content)) {
33
//e.getChannel().close();
34
XLServer.shutdown();
35
}else {
36
sendResponse(ctx);
37
}
38
}else {
39
logger.error("message is not a String.");
40
e.getChannel().close();
41
}
42
}
43
44
@Override
45
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
46
logger.error(e.getCause().getMessage(),e.getCause());
47
e.getCause().printStackTrace();
48
e.getChannel().close();
49
}
50
51
@Override
52
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
53
logger.info("channelConnected");
54
sendResponse(ctx);
55
}
56
57
@Override
58
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
59
logger.info("channelClosed");
60
//删除通道
61
XLServer.allChannels.remove(e.getChannel());
62
}
63
64
@Override
65
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
66
logger.info("channelDisconnected");
67
super.channelDisconnected(ctx, e);
68
}
69
70
@Override
71
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
72
logger.info("channelOpen");
73
//增加通道
74
XLServer.allChannels.add(e.getChannel());
75
}
76
77
/**
78
* 发送响应内容
79
* @param ctx
80
* @param e
81
* @return
82
*/
83
private ChannelFuture sendResponse(ChannelHandlerContext ctx){
84
Channel channel=ctx.getChannel();
85
Random random=new Random();
86
XLResponse response=new XLResponse();
87
response.setEncode((byte)0);
88
response.setResult(1);
89
response.setValue("name","hankchen");
90
response.setValue("time", String.valueOf(System.currentTimeMillis()));
91
response.setValue("age",String.valueOf(random.nextInt()));
92
/**
93
* 发送接收信息的时间戳到客户端
94
* 注意:Netty中所有的IO操作都是异步的!
95
*/
96
ChannelFuture future=channel.write(response); //发送内容
97
return future;
98
}
99
}
100
四、客户端实现
客户端的功能是连接服务器,发送10次请求,然后发送关闭服务器的命令,最后主动关闭客户端。
关键代码如下:
全文代码较多,写了很多注释,希望对读者有用,谢谢!
(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen)
http://www.blogjava.net/hankchen/archive/2012/02/04/369378.html
使用Netty实现通用二进制协议的高效数据传输
原文:http://www.cnblogs.com/findumars/p/6357305.html