前几天在群里看到这样一个图片,引起了我的兴趣:如果要用UDP实现类似TCP的可靠传输,一般需要手工实现的机制有那些?接下来我就以我的理解来讨论一下这个问题。
那么先说结论吧:
我们都知道TCP是面向连接的,进行可靠性传输的协议。接下来我们来看看TCP使用哪些技术来实现可靠的传输协议呢?
所谓三次握手,就是在建立TCP连接时,需要客户端和服务端总共发送3个包确认连接的建立。在socket编程中,这一过程有客户端执行connect触发。整个流程如下:
简单来说,就是:
通俗的说,加入你老大要跟你说你的程序有bug,给你打电话的过程如下:
老大:你听的到吗?
你:我听的到,你听得到吗?
老大:嗯,我也听得到。
确认双方都能听得到对方的讲话,开始讲正事。。。。
所谓四次挥手,即断开TCP连接时,需要客户端和服务端总共发送4个包以确认连接的断开,在socket编程中,这一过程由客户端或者服务端任一方执行close来触发。由于TCP是双全工的,因此每个方向都必须要单独关闭,这一原则是当一方完成数据发送后,发送一个FIN来终止这一方的连接,收到一个FIN只是意味着这一方向上没有数据流动了,即不会再发送数据了,但是在这个TCP连接上仍然能够发送数据,直到灵一方向也发送了FIN。首先进行关闭的一方将执行主动关闭,而另一方则执行被动关闭。整个流程如下:
简单来说,就是:
TCP将每个字节的数据都进行了编号,即为序列号(seq),确认序号=序号+1,每个ACK都有对应的确认序列号,意思是告诉发送者已经收到了数据,下一个数据应该从哪里发送
所谓流量控制,就是让发送方的发送速率不要太快,要让对方来的及接受。
当发送一次数据,等到确认应答时才可以发送下一个数据段,这样的效率会很低,我们利用滑动窗口,无需等待确认应答而可以继续发送数据的最大值;收到第一个ACK后,滑动窗口向后移;操作系统为了维护这个滑动窗口,需要开辟发送缓冲区来记录当前还有那些数据没有应答,只有确认应答过的数据,才能从缓冲区删除掉;窗口越大,网络的吞吐率越高TCP为每一个连接设有一个持续计时器,只要TCP连接的一方收到对方的零窗口通知,就启动持续计时器,若持续计时器设置的时间到期,就发送一个零窗口探测报文段(仅携带一个字节的数据),对方就在确认这个探测报文段时给出了现在的窗口 值,如果仍然是0,那么收到这个报文段的一方就重新设置持续计时器;如果窗口不是0,那么死锁的僵局就可以打破
在这传输过程中,出现了丢包的情况,这里就不做解说了。
虽然有了滑动窗口机制,如果一开始就发送大量数据,很有可能引发很多问题。所以TCP加入慢启动机制,先发少量的数据探探路,看看当前网络的拥塞状态,再决定按照多大的速率进行传送,刚开始时,定义拥塞窗口的大小为1,每次接收到一个ACK应答,拥塞窗口值加1,每次发送数据包的时候,将拥塞窗口和接收端主机反馈的窗口大小做比较,取较小的值作为实际发送的窗口。这样 的拥塞窗口增长的速度是指数级别的,慢启动只是指初始时慢,但是增长速度很快,不久就可以造成网 络拥塞。为了不让窗口一直加倍增长,我们引入一个慢启动的阈值,当拥塞窗口超过这个阈值的时候,不在按指数方式增长,而是按照线性方式增长。
拥塞控制,归根结底是TCP协议想尽可能快的把数据传输给对方,但又要避免给网络造成最大压力的最好方案
UDP是User Datagram Protocol,一种无连接的传输层协议,提供面向事务的简单不可靠信息传送服务。可靠性由上层应用实现,所以要实现udp可靠性传输,必须通过应用层来实现和控制。像实时视频,直播等要求以稳定的速度发送,能够容忍一些数据的丢失,但是不允许又较大的时延,就会采用UDP协议。
UDP提供的是不可靠传输服务,具有TCP没有的优势:
package com.example.charon.entity;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Arrays;
/**
* @className: MessageEntity
* @description: 请求发送消息实体的结构定义
* @author: charon
* @create: 2020-09-18 08:20
*/
public class RequestMessage {
/**定义数据的长度*/
private int totalLen;
/**生成唯一的id*/
private int id;
/**数据*/
private byte[] data;
/**发送次数*/
private int sendCount = 0;
/**最后一次发送时间*/
private Long lastSendTime = System.currentTimeMillis();
/**发送者接受应答的地址*/
private SocketAddress recvRespAddr;
/**接收者的地址*/
private SocketAddress remoteAddr;
public RequestMessage(int id, byte[] data) {
this.id = id;
this.data = data;
// 4+4是因为每个int类型占4个字节
this.totalLen = 4 + 4 + data.length;
}
/**
* 构造器将收到的udp数据解析为tcp的requestMessage对象
* @param udpData udp数据
*/
public RequestMessage(byte[] udpData){
try {
ByteArrayInputStream bais = new ByteArrayInputStream(udpData);
DataInputStream dis = new DataInputStream(bais);
this.totalLen = dis.readInt();
this.id = dis.readInt();
this.data = new byte[totalLen - 4 - 4];
dis.readFully(data);
} catch (IOException e) {
e.printStackTrace();
}
}
public byte[] toByte(){
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
// 写和读需要一一对应
dos.writeInt(totalLen);
dos.writeInt(id);
dos.write(data);
dos.flush();
return baos.toByteArray();
}catch (IOException e){
e.printStackTrace();
}
return null;
}
public int getTotalLen() {
return totalLen;
}
public void setTotalLen(int totalLen) {
this.totalLen = totalLen;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
public int getSendCount() {
return sendCount;
}
public void setSendCount(int sendCount) {
this.sendCount = sendCount;
}
public Long getLastSendTime() {
return lastSendTime;
}
public void setLastSendTime(Long lastSendTime) {
this.lastSendTime = lastSendTime;
}
/**
* Gets the value of recvRespAddr
*
* @return the value of recvRespAddr
*/
public SocketAddress getRecvRespAddr() {
return recvRespAddr;
}
/**
* Sets the recvRespAddr
*
* @param recvRespAddr recvRespAddr
*/
public void setRecvRespAddr(SocketAddress recvRespAddr) {
this.recvRespAddr = recvRespAddr;
}
/**
* Gets the value of remoteAddr
*
* @return the value of remoteAddr
*/
public SocketAddress getRemoteAddr() {
return remoteAddr;
}
/**
* Sets the remoteAddr
*
* @param remoteAddr remoteAddr
*/
public void setRemoteAddr(SocketAddress remoteAddr) {
this.remoteAddr = remoteAddr;
}
@Override
public String toString() {
return "RequestMessage{" +
"totalLen=" + totalLen +
", id=" + id +
", data=" + Arrays.toString(data) +
", sendCount=" + sendCount +
", lastSendTime=" + lastSendTime +
‘}‘;
}
}
package com.example.charon.entity;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Arrays;
/**
* @className: ResponseMessage
* @description: 响应信息实体的结构定义
* @author: charon
* @create: 2020-09-18 09:02
*/
public class ResponseMessage {
/**总长度*/
private int totalLen;
/**对应接收到消息的id*/
private int repId;
/**响应的数据*/
private byte[] data;
/**接收状态 0:正确接收 其他:错误 */
private int state = 0;
/**应答方的发送时间*/
private Long resTime;
/**发送次数*/
private int sendCount;
/**最后一次发送时间*/
private Long lastSendTime = System.currentTimeMillis();
/**接收者的地址*/
private SocketAddress remoteAddr;
public ResponseMessage(int repId, int state, byte[] data) {
this.repId = repId;
this.state = state;
this.data = data;
// 4+4+4是因为每个int类型占4个字节
this.totalLen = 4 + 4 + 4 + data.length;
}
public ResponseMessage(byte[] udpData){
try {
ByteArrayInputStream bais = new ByteArrayInputStream(udpData);
DataInputStream dis = new DataInputStream(bais);
this.totalLen = dis.readInt();
this.repId = dis.readInt();
this.state = dis.readInt();
this.data = new byte[totalLen - 4 - 4 -4 ];
dis.readFully(data);
} catch (IOException e) {
e.printStackTrace();
}
}
public byte[] toByte(){
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
// 写和读需要一一对应
dos.writeInt(totalLen);
dos.writeInt(repId);
dos.writeInt(state);
dos.write(data);
dos.flush();
return baos.toByteArray();
}catch (IOException e){
e.printStackTrace();
}
return null;
}
public int getTotalLen() {
return totalLen;
}
public void setTotalLen(int totalLen) {
this.totalLen = totalLen;
}
public int getRepId() {
return repId;
}
public void setRepId(int repId) {
this.repId = repId;
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
public Long getResTime() {
return resTime;
}
public void setResTime(Long resTime) {
this.resTime = resTime;
}
/**
* Gets the value of sendCount
*
* @return the value of sendCount
*/
public int getSendCount() {
return sendCount;
}
/**
* Sets the sendCount
*
* @param sendCount sendCount
*/
public void setSendCount(int sendCount) {
this.sendCount = sendCount;
}
/**
* Gets the value of lastSendTime
*
* @return the value of lastSendTime
*/
public Long getLastSendTime() {
return lastSendTime;
}
/**
* Sets the lastSendTime
*
* @param lastSendTime lastSendTime
*/
public void setLastSendTime(Long lastSendTime) {
this.lastSendTime = lastSendTime;
}
/**
* Gets the value of remoteAddr
*
* @return the value of remoteAddr
*/
public SocketAddress getRemoteAddr() {
return remoteAddr;
}
/**
* Sets the remoteAddr
*
* @param remoteAddr remoteAddr
*/
public void setRemoteAddr(SocketAddress remoteAddr) {
this.remoteAddr = remoteAddr;
}
/**
* Gets the value of data
*
* @return the value of data
*/
public byte[] getData() {
return data;
}
/**
* Sets the data
*
* @param data data
*/
public void setData(byte[] data) {
this.data = data;
}
@Override
public String toString() {
return "ResponseMessage{" +
"totalLen=" + totalLen +
", repId=" + repId +
", data=" + Arrays.toString(data) +
", state=" + state +
", resTime=" + resTime +
", sendCount=" + sendCount +
", lastSendTime=" + lastSendTime +
‘}‘;
}
}
package com.example.charon;
import com.example.charon.entity.RequestMessage;
import com.example.charon.entity.ResponseMessage;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @className: DataGramSend
* @description: 数据报文发送:
* 1.发送消息线程负责发送,发送后将消息放入容器中等待应答,
* 2.接受线程接收应答,并发送消息给接收端自己已收到信息,从容器中匹配后删除
* 3.重发线程负责重发,未收到应答的消息,发送3次后移出
* @author: charon
* @create: 2020-09-18 23:47
*/
public class DatagramSend {
/**本地要发送的地址对象*/
private SocketAddress localAddress;
/**发送的socket对象*/
private DatagramSocket datagramSender;
/**目标地址*/
private SocketAddress remoteAddress;
/**本地缓存已发送的消息 Map key 为消息ID,value为消息对象本身*/
private Map<Integer, RequestMessage> msgQueue = new ConcurrentHashMap<>();
public static void main(String[] args) throws SocketException {
new DatagramSend();
}
public DatagramSend() throws SocketException {
localAddress = new InetSocketAddress("127.0.0.1",13000);
datagramSender = new DatagramSocket(localAddress);
remoteAddress = new InetSocketAddress("127.0.0.1",14000);
// 启动三个线程
// 1.发送消息线程
startSendThread();
// 接收线程接收应答
startRecvResponseThread();
// 重发线程负责重发
startReSendThread();
}
/**
* 启动重发线程
*/
@SuppressWarnings("all")
private void startReSendThread() {
new Thread(new Runnable() {
@Override
public void run() {
try{
while (true){
resendMsg();
Thread.sleep(1000);
}
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}
/**
* 重发业务:判断map中的消息,如果超过3s未收到应答,则重发
*/
private void resendMsg() {
// 返回队列中所有的key
Set<Integer> keySet = msgQueue.keySet();
Iterator<Integer> iterator = keySet.iterator();
while(iterator.hasNext()){
Integer key = iterator.next();
RequestMessage requestMessage = msgQueue.get(key);
// 如果重发超过3次,则移出
if(requestMessage.getSendCount() >= 3){
iterator.remove();
System.out.println("发送端--检测道丢失的消息:" + requestMessage);
}
long startTime = System.currentTimeMillis();
// 等待时间不超过3s
if((startTime - requestMessage.getLastSendTime()) > 3000 && requestMessage.getSendCount() < 3 ){
byte[] buffer = requestMessage.toByte();
try {
DatagramPacket datagramPacket = new DatagramPacket(buffer,buffer.length,requestMessage.getRemoteAddr());
datagramSender.send(datagramPacket);
requestMessage.setSendCount(requestMessage.getSendCount()+1);
System.out.println("客户端重新发送消息:"+requestMessage);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 启动接受应答线程
*/
@SuppressWarnings("all")
private void startRecvResponseThread() {
new Thread(new Runnable() {
@Override
public void run() {
try{
recvResponse();
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}
/**
* 接受应答消息
*/
private void recvResponse() throws IOException {
System.out.println("接收端-接受应答线程启动");
while (true){
byte[] recvData = new byte[100];
//创建接受数据包对象
DatagramPacket recvRespPacket = new DatagramPacket(recvData,recvData.length);
//发送
datagramSender.receive(recvRespPacket);
//接受返回数据
RequestMessage requestMessage = new RequestMessage(recvRespPacket.getData());
int repId = requestMessage.getId();
RequestMessage requestMessage1 = msgQueue.get(new Integer(repId));
if(requestMessage1 != null){
System.out.println("发送端-原来发送的数据:"+requestMessage1);
System.out.println("接受的数据:" + requestMessage);
System.out.println("发送端-已收到接收端返回的信息:"+new String(requestMessage.getData()));
msgQueue.remove(repId);
//发送端需要告诉接收端,返回的数据已经收到
//发送的数据
byte[] msgData = (repId+" 数据已收到").getBytes();
//创建要发送的消息对象
RequestMessage sendMessage = new RequestMessage(repId,msgData);
//要发送的数据,将要发送的数据转为字节数组
byte[] buffer = sendMessage.toByte();
//创建书包,指定内容,指定目标地址
DatagramPacket datagramSocket = new DatagramPacket(buffer,buffer.length,remoteAddress);
//发送数据
datagramSender.send(datagramSocket);
}
}
}
/**
* 发送消息线程
*/
@SuppressWarnings("all")
private void startSendThread() {
new Thread(new Runnable() {
@Override
public void run() {
try{
send();
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}
/**
* 模拟发送消息
*/
private void send() throws IOException, InterruptedException {
System.out.println("发送端-发送数据线程启动");
// 确认机制,id从0开始
int id = 0;
//模拟发送10个请求
while (id < 1){
id++;
//发送的数据
byte[] msgData = (id+" hello").getBytes();
//创建要发送的消息对象
RequestMessage sendMessage = new RequestMessage(id,msgData);
//要发送的数据,将要发送的数据转为字节数组
byte[] buffer = sendMessage.toByte();
//创建书包,指定内容,指定目标地址
DatagramPacket datagramSocket = new DatagramPacket(buffer,buffer.length,remoteAddress);
//发送数据
datagramSender.send(datagramSocket);
// 缓存当前发送的请求
sendMessage.setSendCount(1);
sendMessage.setRemoteAddr(remoteAddress);
msgQueue.put(id,sendMessage);
System.out.println("客户端-数据已发送,缓存:"+sendMessage);
Thread.sleep(1000);
}
}
}
package com.example.charon;
import com.example.charon.entity.RequestMessage;
import com.example.charon.entity.ResponseMessage;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @className: DatagramRecive
* @description: 数据报文接受方
* @author: charon
* @create: 2020-09-20 17:56
*/
public class DatagramRecive {
private SocketAddress localAddress;
private DatagramSocket datagramSender;
/**目标地址*/
private SocketAddress remoteAddress;
/**本地缓存已发送的消息 Map key 为消息ID,value为消息对象本身*/
private Map<Integer, ResponseMessage> msgQueue = new ConcurrentHashMap<>();
public static void main(String[] args) throws IOException {
new DatagramRecive();
}
public DatagramRecive() throws SocketException {
localAddress = new InetSocketAddress("127.0.0.1",14000);
datagramSender = new DatagramSocket(localAddress);
remoteAddress = new InetSocketAddress("127.0.0.1",13000);
// 启动接收线程
startDecvThread();
// 接收线程接收应答
startDecvResponseThread();
// 重发线程负责重发
startReSendThread();
}
/**
* 重发
*/
@SuppressWarnings("all")
private void startReSendThread() {
new Thread(new Runnable() {
@Override
public void run() {
try{
while (true){
resendMsg();
Thread.sleep(1000);
}
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}
private void resendMsg() {
// 返回队列中所有的key
Set<Integer> keySet = msgQueue.keySet();
Iterator<Integer> iterator = keySet.iterator();
while(iterator.hasNext()){
Integer key = iterator.next();
ResponseMessage responseMessage = msgQueue.get(key);
// 如果重发超过3次,则移出
if(responseMessage.getSendCount() >= 3){
iterator.remove();
System.out.println("发送端--检测道丢失的消息:" + responseMessage);
}
long startTime = System.currentTimeMillis();
// 等待时间不超过3s
if((startTime - responseMessage.getLastSendTime()) > 3000 && responseMessage.getSendCount() < 3 ){
byte[] buffer = responseMessage.toByte();
try {
DatagramPacket datagramPacket = new DatagramPacket(buffer,buffer.length,responseMessage.getRemoteAddr());
datagramSender.send(datagramPacket);
responseMessage.setSendCount(responseMessage.getSendCount()+1);
System.out.println("客户端重新发送消息:"+responseMessage);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@SuppressWarnings("all")
private void startDecvResponseThread() {
new Thread(new Runnable() {
@Override
public void run() {
try{
decvResponse();
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}
private void decvResponse() throws IOException {
System.out.println("发送端-接受应答线程启动");
while (true){
byte[] recvData = new byte[100];
//创建接受数据包对象
DatagramPacket recvRespPacket = new DatagramPacket(recvData,recvData.length);
//接受数据
datagramSender.receive(recvRespPacket);
//接受返回数据
RequestMessage requestMessage = new RequestMessage(recvRespPacket.getData());
int repId = requestMessage.getId();
System.out.println("接收端接受到的数据id:" + repId);
ResponseMessage responseMessage = msgQueue.get(new Integer(repId));
if(responseMessage != null){
System.out.println("接收端发送的源数据:"+responseMessage);
System.out.println("接收端已收到发送端返回的数据:"+ new String(requestMessage.getData()));
msgQueue.remove(repId);
}
}
}
@SuppressWarnings("all")
private void startDecvThread() {
new Thread(new Runnable() {
@Override
public void run() {
try{
recvMsg();
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}
private void recvMsg() throws IOException {
System.out.println("启动接收线程");
while (true){
// 接受发送端发送过来的数据 100表示缓存的长度
byte[] recvData = new byte[100];
DatagramPacket datagramPacket = new DatagramPacket(recvData,recvData.length);
datagramSender.receive(datagramPacket);
//获取接收端发送的数据
RequestMessage requestMessage = new RequestMessage(datagramPacket.getData());
String requestMessageData = new String(requestMessage.getData());
System.out.println("接收端收到发送端的数据:" + requestMessageData);
// 将接收到的数据发送给发送端,
byte[] responseData = (requestMessageData+" world").getBytes();
ResponseMessage responseMessage = new ResponseMessage(requestMessage.getId(),0,responseData);
System.out.println("接收端返回的数据:" + new String(responseMessage.getData()));
byte[] data = responseMessage.toByte();
DatagramPacket dp = new DatagramPacket(data,data.length,remoteAddress);
datagramSender.send(dp);
//将接收端返回的数据存入队列中,用于后面监听重发机制
responseMessage.setLastSendTime(System.currentTimeMillis());
responseMessage.setSendCount(1);
responseMessage.setData(responseMessage.getData());
//对于接收端来说,它需要返回的地址就是请求消息的本地
responseMessage.setRemoteAddr(remoteAddress);
msgQueue.put(requestMessage.getId(),responseMessage);
System.out.println("接收端-已发送应答:" + responseMessage);
}
}
}
参考文章:http://www.360doc.com/content/13/0602/11/11220452_289877920.shtml
https://blog.csdn.net/codes_first/article/details/78453713?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-4.channel_param&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-4.channel_param
原文:https://www.cnblogs.com/pluto-charon/p/13777837.html