avatar

计算机网络作业3-2 基于UDP服务设计可靠传输协议并编程实现

实验3:基于UDP服务设计可靠传输协议并编程实现

实验内容:

任务3-2:在任务3-1的基础上,将停等机制改成基于滑动窗口的流量控制机制,采用固定窗口大小,支持累积确认,完成给定测试文件的传输。

报文格式

image-20201213165003849
#pragma pack(1)//以下内容按1Byte对齐,如果没有这条指令会以4Byte对齐,例如u_short类型会用2B存信息,2B补零,方便后续转换成char*格式
struct message//报文格式
{
//ACK=0x01, SYN=0x02, FIN=0x04,EXIST 0x10,startfile 0x20,endfile 0x40
int flag;//标志位
DWORD SendIP, RecvIP;//发送端IP和接收端IP
u_short SendPort, RecvPort;//发送端端口和接收端端口
int msgseq;//消息序号
int ackseq;//恢复ack时确认对方的消息的序号
int index;//用于描述文件大小,需要多少条消息才能传输完成(1条index=0,2条index=1,以此类推)
int filelength;//本次实验使用固定报文长度,故只需要告诉对方有多少条和最后一条除首部信息外有多少位有效信息
int fill;//补零成为16bit的倍数,便于计算校验和
u_short checksum;//校验和
char msg[BUF_SIZE];//报文的具体内容,本次实验简化为固定长度
};
#pragma pack()//恢复4Byte编址格式

滑动窗口与累积确认

代码见附录-消息处理流程

image-20201213183125281

变量

client端:

  • msgsend:用于存储发送的消息,一维数组

  • base:窗口底部,代表消息msgsend[0:base-1]部分已经全部传输完毕,且收到了对方返回的对应ACKmsgsend[base]是下面要接收对应ACK的消息

  • N:窗口大小,msgsend[base:base+N-1]是当前允许发送的消息,本次实验取N=10

  • sendnextseq:下一条要发送的消息

server端:

  • msgrecv:用于存储接收的消息,一维数组

  • recvnextseq:下一条要接收并返回ACK的消息,编号小于recvnextseq的消息已经全部返回ACK

执行过程

server端:

image-20201213182752647
  • 按顺序接收对方发来的消息msgseq(收到的消息序号)==recvnextseq,且校验和正确返回对应的ACK(ackseq=msgseq)
  • 如果发生消息失序,即msgseq!=recvnextseq,或校验和错误,则丢弃消息,返回第recvnextseq-1条消息的ACK

client端:

注:

  • 为了实现消息的同时收发,提高效率,使用多线程编程

  • 为了避免线程间信息交互带来的问题,计时器部分全部在发送线程完成

靠client端的接收线程完成

image-20201213182855094
  • 接收对方返回的ACK,其中的ackseq代表msgsend[0:ackseq](包含ackseq)已经全部被对方收到且确认
    • 如果base<ackseq+1,则说明server端对新消息进行了确认,滑动窗口,使base=ackseq+1,并重置计时器
    • 如果base>=ackseq,则说明server端或client端发送的消息发生了失序,由于msgsend[0:base-1]已经被确认,所以可以不对该项情况进行处理,直接跳过等待接收下一条消息即可

发送线程

image-20201213183055317
  • 滑动窗口机制需要在发送前预先将窗口内的消息存储至内存中
  • 如果窗口未满且当前状态不需要重传,则继续发送信息,发送完毕后重置计时器
  • 如果超时未收到对方发来的所需的ACK,则按超时处理,重新发送窗口中已发送但是未确认的消息
  • 如果出现重发一定次数依然无法收到所需ACK的情况,进行断网处理
  • 当所有消息都被对方确认即base==buffersize(发送缓冲区),结束当前发送线程

交互流程

由于采用的是滑动窗口机制,在client端在发送消息之前,需要将要发送的消息存储输入msgsend中,如果传输的消息过大,应采用分片处理,即分多次存入数组,为了减小重传代价,每片都应添加SYNFIN消息。这里由于需要发送的文件较小,略过分片功能

注:下面所说的”client端发送“是指将消息添加至发送缓冲区

client端:(采用滑动窗口机制,没有明确的状态设置)

  • 建立连接:发送一条带有标识SYN的消息,表示希望建立连接

  • 发送文件

  • 消息1,文件开始标识位SF=1index=发送文件需要的消息条数-1filelength=最后一条消息msg成员的有效位数

    • 发送文件,每条msg的有效长度固定为1024,最后一条EF=1
  • 断开连接:发送一条标识为FIN的消息

server端

​ - 初始:状态0

  • 建立连接:收到一条SYN,回复对应的ACK,连接建立成功,进入状态1

  • 发送文件:收到包含SFindexfilelength的消息,开始接收文件,进入状态2。对于每条消息都返回一条ACK 。当收到包含标志位EF的消息,检查index与收到文件消息的条数,若一致,则返回ACK,退回状态1

  • 断开连接:收到包含标识为FIN的消息,返回ACK,进入状态0

注:当server端和client端在一定时间内没有接收或不能接收到对方的消息,需要做断网处理,即自动退回到状态0

建立连接

由于只需实现文件的单向传输,所以只需两次握手即可建立连接

image-20201213095738718
int buildconnectionCli()//客户端,连接发起方
{
message a;
a.set_syn();//SYN
a.msgseq = buffersize;
message::copy(msgsend[buffersize++], a);
return 1;
}

连接断开

与连接建立类似,由于是单向传输,两次挥手即可

int byecli()//server端只需要回复ACK,功能比较简单没有封装成函数,处理流程详见附录
{
message a;
a.set_fin();//FIN
a.msgseq = buffersize;
message::copy(msgsend[buffersize++], a);//添加至发送消息队列中
return 1;
}

差错重传

使用校验和方式检测是否出现差错,校验和计算方式与rdt3.0类似,只是根据本次实验设计的报文格式进行了简单修改。校验和计算包括0-255bit。

image-20201213165032779

计算方法如下

void message::setchecksum()//发送方计算校验和并填入响应位置
{
int sum = 0;
u_char* temp = (u_char*)this;//以1B为单位进行处理,注意设置为unsigned类型,否则在后续相加时会按照默认最高位是正负号,影响计算结果
for (int i = 0; i < 16; i++)//取报文的前16组,每组16bit,共计32字节256bit
{
sum += temp[2 * i] << 8 + temp[2 * i + 1];
while (sum >= 0x10000)
{//溢出
int t = sum >> 16;//将最高位回滚添加至最低位
sum += t;
}
}
this->checksum = ~(u_short)sum;//按位取反,方便校验计算
}
bool message::checkchecksum()//接收方对校验和进行检验
{
int sum = 0;
u_char* temp = (u_char*)this;
for (int i = 0; i < 16; i++)
{
sum += temp[2 * i] << 8 + temp[2 * i + 1];
while (sum >= 0x10000)
{//溢出
int t = sum >> 16;//计算方法与设置校验和相同
sum += t;
}
}
//把计算出来的校验和和报文中该字段的值相加,如果等于0xffff,则校验成功
if (checksum + (u_short)sum == 65535)
return true;
return false;
}

当server端检测到校验和错误时,不发送NAK,只返回对当前已经确认的消息序号,等待client端超时重新发送

文件传输

image-20201213202840946

client端:

  • 读入文件至内存中
  • 文件起始消息SF置1,将发送文件所需要的消息条数index最后一条消息msg段有效位数填入报头中indexfilelength相应位置,msg成员填入文件名
  • 将内存中的消息复制到message类对象的msg段
  • 最后一条消息EF置1,告知对方文件发送结束
//读入文件
void readfile(char* name,char content[10000][1024],int &length, int & index)
{
index = 0;
length = 0;
ifstream in(name, ifstream::binary);//以二进制方式读入文件
if (!in)
{
cout << "文件无效" << endl;
return;
}
char t = in.get();
while (in)//如果读入失败(读入完成)则退出
{//将文件内容存入content数组中
content[index][length % 1024] = t;//每行1024个字节,对应msg成员长度为1024
length++;
if (length % 1024 == 0)//上一条缓冲区
{
index++; length = 0;
}
t = in.get;
}
in.close();
}
int sendfile(char* name)//发送文件
{
//开始发送文件消息
message a;
a.set_startfile();//设置SF标识
int index = 0; int length = 0;
readfile(name, content, length, index);//读入文件,复制到content二维数组
a.index = index;//文件大小相关信息
a.filelength = length;
strcpy(a.msg, name);//文件基本信息
a.msgseq = buffersize;//设置消息序号
message::copy(msgsend[buffersize++], a);//添加到发送消息队列中
if (index==0 && length == 0)//读入0个字节
{
return 0;//文件读入失败
}
//文件具体内容
for (int i = 0; i < index; i++)
{
for (int j = 0; j < 1024; j++)
{
msgsend[buffersize].msg[j] = content[i][j];
}
msgsend[buffersize].msgseq = buffersize;
buffersize++;//发送缓冲区大小增加
}
for (int i = index; i <= index; i++)
{//添加文件的最后一条消息
message b;
for (int j = 0; j < length; j++)
{
b.msg[j] = content[i][j];
}
b.msgseq = buffersize;
b.set_endfile();//添加EF标识
message::copy(msgsend[buffersize++], b);
}
return 1;
}

server端:

  • 接收到包含SF=1的消息,读出index、filelength和文件名,返回ACK

  • 循环接收对方发来的消息,使用多线程msg段复制到内存中,逐条返回ACK

    //处理接收到的文件消息
    DWORD WINAPI filehandler(LPVOID lparam)
    {//多线程函数只能传入一个参数,多个参数需要打包成class或者struct
    filepacket* pkt = (filepacket*)(LPVOID)lparam;
    memset(content[pkt->index], 0, 1024);
    for(int j=0;j<pkt->length;j++)
    {//可能中间会有'\0'不可以直接使用strcpy
    content[pkt->index][j] = pkt->a.msg[j];
    }
    return 0;
    }
  • 当收到的消息包含标识为EF时,表示该条消息msg段有效位数为filelength,将其复制到内存中。校验index与收到文件消息的条数,如果正确返回ACK

  • 将内存中的数据写入文件

void outfile(char* name, char content[50000][1024], int length, int& index)
{
ofstream fout(name, ofstream::binary);//以二进制方式写入文件
for (int i = 0; i < index; i++)
{
for (int j = 0; j < FILE_PACKET_LENGTH; j++)
fout << content[i][j];//以字节为单位进行写入
}
for (int j = 0; j < length; j++)
fout << content[index][j];
fout.close();
}

附录

socket的创建

头文件

#include <WinSock2.h>//windows socket 编程头文件
#pragma comment(lib,"ws2_32.lib")//链接ws2_32.lib库文件到此项目中
//加载socket库
WSADATA wsaData;
//MAKEWORD(2.2),成功返回0
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
{
cout << "socket初始化失败" << endl;
return 0;
}
//创建Socket
//创建一个socket,并将该socket绑定到一个特定的传输层
sock = socket(AF_INET, SOCK_DGRAM, 0);//地址类型(ipv4),数据报套接字
if (sock == INVALID_SOCKET)
{
cout << "socket创建失败";
return -1;
}

//设置recv函数为非阻塞
struct timeval timeout;
timeout.tv_sec = 1;//秒
timeout.tv_usec = 0;//微秒
if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) == -1) {
cout << "setsockopt failed:";
}
cout << "server3-1" << endl;
//初始化地址,代码支持输入IP,这里只演示了使用默认IP的情况
addrop.sin_addr.s_addr = inet_addr("192.168.89.1");
addrop.sin_family = AF_INET;//IPv4
addrop.sin_port = htons(SPORT);//端口,30000

addr.sin_addr.s_addr = inet_addr("192.168.89.1");
addr.sin_family = AF_INET;
addr.sin_port = htons(CPORT);//6666
//绑定
//强制类型转换,SOCKADDR_IN方便赋值 SOCKADDR方便传输
//server端需要bind而client端不需要
if (bind(sock, (SOCKADDR*)&addrop, sizeof(SOCKADDR)) == -1)
{
cout << "bind error" << endl; return -1;
}
......//程序执行,消息收发
closesocket(sock);//关闭socket
WSACleanup();
return 0;

消息的发送与接收

为了简化后续代码,将send和recv简化封装成函数

注意这里由于使用的是UDP,每次发送和接收都需要指明目的或者来源IP地址

void simplesend(message& a)
{
a.set_exist();//表示该条消息存在,方便非阻塞式recv判断是否接收到了消息
a.setchecksum();//设置校验和
//用sock发送a中的内容,注意第二个参数是char*,所以需要强制类型转换
//第三个参数为发送缓冲区长度,注意不能设置过长,否则会造成缓冲区溢出
//第四个代表相关操作,一般设置为0
//第五个是目的IP地址,即向哪个IP发送消息
//第六个是addr的长度
if (sendto(sock, (char*)&a, sizeof(message), 0, (struct sockaddr*)&addr, sizeof(sockaddr)) == SOCKET_ERROR);//如果发送失败,这里由于后续有确认重传机制,不在发送函数中进行处理
{
}
//打印每条发送报文标志位相关信息,方便观察传输过程,由于IO时间消耗很大,这里注释掉
//if (a.flag) { cout << "发送 "; a.print(); }
}

void simplerecv(message& a)
{
memset(a.msg, 0, sizeof(a.msg));//初始化全为0,防止收到前面收到消息的影响
//参数与sendto函数类似,只是第五项的目的IP地址改为源IP地址,即从哪个IP接收消息
recvfrom(sock, (char*)&a, sizeof(message), 0, (struct sockaddr*)&addr, &addr_len);
//if (a.flag) { cout << "接收 "; a.print(); }
}

支持用户输入IP

//这里仅以一端作为演示
cout << "是否使用默认IP?是1,否2 ";
int i;
cin >> i;
if (i == 2)
{
char s[20] = {};
char c[20] = {};
cout << "请输入服务器IP: " ;
cin >> s;
cout << "请输入客户端IP: ";
cin >> c;
addrop.sin_addr.s_addr = inet_addr(s);
addr.sin_addr.s_addr = inet_addr(c);
}

message类

初始化

message::message() {
memset(this, 0, sizeof(message));//初始化全0
SendPort = SPORT;//这里以server端为例
RecvPort = CPORT;
SendIP = addr.sin_addr.s_addr;
RecvIP = addrop.sin_addr.s_addr;
}

标志位的获取与设置

int message::get_syn()
{
if (this->flag & 0x02)//flag的右第二位是0还是1
return 1;
else return 0;
}
void message::set_syn()
{
if (get_syn() == 0)
flag += 0x02;
}

消息处理流程

server

//server端
clockstart = clock();//设置timer
while (1)
{
while (1)
{
int flag = 0;
simplerecv(msgrecv[recvnextseq]);//接收消息,填入收消息缓冲区
if (msgrecv[recvnextseq].get_exist())//收到消息且顺序正确
{
if (msgrecv[recvnextseq].msgseq == recvnextseq&&msgrecv[recvnextseq].checkchecksum())//如果的消息序号正确,校验和正确
{//回复对该消息的ACK确认
message b;
b.set_ack();
b.ackseq = recvnextseq;
simplesend(b);//发送消息b
clockstart = clock();//重启计时器
if (status && msgrecv[recvnextseq].get_fin())
{
status = 0;//断开连接,状态转换
cout << "断开连接" << endl;
memset(msgrecv, 0, sizeof(msgrecv));//重置缓冲区
recvnextseq = 0;//重置
filestatus = 0;
fileseq = 0;
break;//可以断开连接
}
else
{
if (msgrecv[recvnextseq].get_syn())//建立连接请求
{
if (!buildconnectionSer())
return 0;//建立连接失败
else status = 1;
}
else if (status && msgrecv[recvnextseq].get_startfile())
{
cout << "接收文件" << endl;
//接收文件基本信息
memset(name, 0, sizeof(name));
filestatus = 1;//接收文件状态
index = msgrecv[recvnextseq].index;
length = msgrecv[recvnextseq].filelength;
strcpy(name, msgrecv[recvnextseq].msg);//文件名
}
else if (status && filestatus && msgrecv[recvnextseq].get_endfile())//文件发送结束
{
filestatus = 0;
if (fileseq != index)//核对接收到的文件消息条数和基本信息是否一致
{
cout << "出错" << endl;
return 0;
}
filepacket* packet = new filepacket;
packet->a = msgrecv[recvnextseq];
packet->index = index;
packet->length = length;
//这里为了不让写入内存过程影响消息收发,采用多线程
//多线程函数只能传入一个参数,多余一个的需要打包成结构体
hThread1 = ::CreateThread(NULL, NULL, filehandler, LPVOID(packet), 0, &dwThreadId1);
fileseq++;
fileseq = 0;
cout << "文件接收结束" << endl;
}
else if (status && filestatus)
{
fileseq++;
filepacket* packet = new filepacket;
packet->a = msgrecv[recvnextseq];
packet->index = fileseq - 1;//文件消息序号
packet->length = 1024;
hThread1 = ::CreateThread(NULL, NULL, filehandler, LPVOID(packet), 0, &dwThreadId1);
}
}
recvnextseq++;//正确处理当前消息,下面可以处理下一条消息了
}
else//消息乱序或校验和失败
{
if (flag % 3 == 0)//防止发送过多重复信息
{
message a;
a.set_ack();
a.ackseq = recvnextseq - 1;//返回对最近的确认消息的ACK
simplesend(a);
}
flag++;
}
}
else//没有收到消息
{
clockend = clock();//计时
if (status && (clockend - clockstart) / CLOCKS_PER_SEC >= WAIT_TIME*SENT_TIMES && recvnextseq)//需要接收到消息且超时
{
status=0;
break;//做断网处理
}
}
}
WaitForSingleObject(hThread1, INFINITE);//等待当前所有进程执行完毕
outfile(name, content, length, index);//文件写入
int op;
cout << "接收文件1,退出0" << endl;
cin >> op;
if (op == 0)
break;
}

client

while (1)
{
int op;
cout << "传输文件1,退出0 ";
cin >> op;
if (op == 0)
break;
char name[30];
cout << "请输入文件名 ";
cin >> name;
//将要发送的消息添加到缓冲区中
buildconnectionCli();
sendfile(name);
byecli();
clock_t timestart = clock();//开始计时
if (base < buffersize);//还有消息未发送或者未确认
{//收线程和发线程
hThread2 = ::CreateThread(NULL, NULL, sendhandler, LPVOID(i), 0, &dwThreadId2);
hThread1 = ::CreateThread(NULL, NULL, recvhandler, LPVOID(i), 0, &dwThreadId1);
//等待线程执行完毕
WaitForSingleObject(hThread1,INFINITE);
WaitForSingleObject(hThread2, INFINITE);
if (base == buffersize)
{
cout << "文件发送结束" << endl;
}
}
clock_t timeend = clock();
double endtime = (double)(timeend - timestart) / CLOCKS_PER_SEC;
cout << "Total time:" << endtime << endl; //s为单位
cout << "吞吐率:" << (double)(buffersize) * sizeof(message)/endtime * 8 / 1024 / 1024 << "Mbps" << endl;
//重置,防止影响下一次发送消息
memset(name, 0, sizeof(name));
buffersize = 0;
base = 0;
sendnextseq = 0;
memset(msgsend, 0, sizeof(msgsend));
}
Author: Michelle19l
Link: https://gitee.com/michelle19l/michelle19l/2021/01/15/大三上/计网/计网作业3-2/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.
Donate
  • 微信
    微信
  • 支付寶
    支付寶