实验3:基于UDP服务设计可靠传输协议并编程实现 实验内容: 任务3-2:在任务3-1的基础上,将停等机制改成基于滑动窗口的流量控制机制,采用固定窗口大小,支持累积确认,完成给定测试文件的传输。
报文格式
#pragma pack(1) struct message //报文格式{ int flag; DWORD SendIP, RecvIP; u_short SendPort, RecvPort; int msgseq; int ackseq; int index; int filelength; int fill ; u_short checksum; char msg[BUF_SIZE]; }; #pragma pack()
滑动窗口与累积确认 代码见附录-消息处理流程
变量 client端:
server端:
执行过程 server端:
按顺序接收对方发来的消息msgseq(收到的消息序号)==recvnextseq ,且校验和正确 ,返回对应的ACK(ackseq=msgseq)
如果发生消息失序 ,即msgseq!=recvnextseq ,或校验和错误 ,则丢弃消息,返回第recvnextseq-1条消息的ACK
client端:
注:
靠client端的接收线程 完成
接收对方返回的ACK,其中的ackseq 代表msgsend[0:ackseq] (包含ackseq)已经全部被对方收到且确认
如果base<ackseq+1 ,则说明server端对新消息进行了确认,滑动窗口,使base=ackseq+1 ,并重置计时器
如果base>=ackseq ,则说明server端或client端发送的消息发生了失序,由于msgsend[0:base-1]已经被确认 ,所以可以不对该项情况进行处理,直接跳过等待接收下一条消息即可
发送线程
滑动窗口机制需要在发送前预先将窗口内的消息存储至内存中
如果窗口未满且当前状态不需要重传,则继续发送信息,发送完毕后重置计时器
如果超时未收到对方发来的所需的ACK,则按超时处理 ,重新发送窗口中已发送但是未确认的消息
如果出现重发一定次数依然无法收到所需ACK的情况,进行断网处理
当所有消息都被对方确认即base==buffersize (发送缓冲区),结束当前发送线程
交互流程 由于采用的是滑动窗口机制 ,在client端在发送消息之前,需要将要发送的消息存储输入msgsend 中,如果传输的消息过大,应采用分片 处理,即分多次存入数组,为了减小重传代价 ,每片都应添加SYN 和FIN 消息。这里由于需要发送的文件较小,略过分片功能
注:下面所说的”client端发送“是指将消息添加至发送缓冲区
client端 :(采用滑动窗口机制,没有明确的状态设置)
server端 :
- 初始:状态0
建立连接 :收到一条SYN ,回复对应的ACK ,连接建立成功,进入状态1
发送文件 :收到包含SF 、index 、filelength 的消息,开始接收文件,进入状态2。对于每条消息都返回一条ACK 。当收到包含标志位EF 的消息,检查index与收到文件消息的条数 ,若一致,则返回ACK ,退回状态1
断开连接 :收到包含标识为FIN 的消息,返回ACK ,进入状态0
注: 当server端和client端在一定时间内没有接收或不能接收到对方的消息 ,需要做断网处理 ,即自动退回到状态0
建立连接 由于只需实现文件的单向传输,所以只需两次握手即可建立连接
int buildconnectionCli () { message a; a.set_syn(); a.msgseq = buffersize; message::copy(msgsend[buffersize++], a); return 1 ; }
连接断开 与连接建立类似,由于是单向传输,两次挥手即可
int byecli () { message a; a.set_fin(); a.msgseq = buffersize; message::copy(msgsend[buffersize++], a); return 1 ; }
差错重传 使用校验和 方式检测是否出现差错,校验和计算方式与rdt3.0类似,只是根据本次实验设计的报文格式进行了简单修改。校验和计算包括0-255bit。
计算方法如下
void message::setchecksum () { int sum = 0 ; u_char* temp = (u_char*)this ; for (int i = 0 ; i < 16 ; i++) { sum += temp[2 * i] << 8 + temp[2 * i + 1 ]; while (sum >= 0x10000 ) { int t = sum >> 16 ; sum += t; } } this ->checksum = ~(u_short)sum; }
bool message::checkchecksum () { int sum = 0 ; u_char* temp = (u_char*)this ; for (int i = 0 ; i < 16 ; i++) { sum += temp[2 * i] << 8 + temp[2 * i + 1 ]; while (sum >= 0x10000 ) { int t = sum >> 16 ; sum += t; } } if (checksum + (u_short)sum == 65535 ) return true ; return false ; }
当server端检测到校验和错误时,不发送NAK ,只返回对当前已经确认的消息序号 ,等待client端超时重新发送 。
文件传输
client端:
读入文件至内存中
文件起始消息SF置1 ,将发送文件所需要的消息条数index 和最后一条消息msg段有效位数 填入报头中index 和filelength 相应位置,msg成员填入文件名
将内存中的消息复制到message类对象的msg段
最后一条消息EF置1 ,告知对方文件发送结束
void readfile (char * name,char content[10000 ][1024 ],int &length, int & index) { index = 0 ; length = 0 ; ifstream in (name, ifstream::binary) ; if (!in) { cout << "文件无效" << endl ; return ; } char t = in.get (); while (in) { content[index][length % 1024 ] = t; length++; if (length % 1024 == 0 ) { index++; length = 0 ; } t = in.get ; } in.close (); }
int sendfile (char * name) { message a; a.set_startfile(); int index = 0 ; int length = 0 ; readfile(name, content, length, index); a.index = index; a.filelength = length; strcpy (a.msg, name); a.msgseq = buffersize; message::copy(msgsend[buffersize++], a); if (index==0 && length == 0 ) { return 0 ; } for (int i = 0 ; i < index; i++) { for (int j = 0 ; j < 1024 ; j++) { msgsend[buffersize].msg[j] = content[i][j]; } msgsend[buffersize].msgseq = buffersize; buffersize++; } for (int i = index; i <= index; i++) { message b; for (int j = 0 ; j < length; j++) { b.msg[j] = content[i][j]; } b.msgseq = buffersize; b.set_endfile(); message::copy(msgsend[buffersize++], b); } return 1 ; }
server端:
接收到包含SF=1 的消息,读出index、filelength和文件名 ,返回ACK
循环接收对方发来的消息,使用多线程 将msg段复制到内存中 ,逐条返回ACK
DWORD WINAPI filehandler (LPVOID lparam) { filepacket* pkt = (filepacket*)(LPVOID)lparam; memset (content[pkt->index], 0 , 1024 ); for (int j=0 ;j<pkt->length;j++) { content[pkt->index][j] = pkt->a.msg[j]; } return 0 ; }
当收到的消息包含标识为EF 时,表示该条消息msg段有效位数为filelength ,将其复制到内存中。校验index与收到文件消息的条数 ,如果正确返回ACK
将内存中的数据写入文件
void outfile (char * name, char content[50000 ][1024 ], int length, int & index) { ofstream fout (name, ofstream::binary) ; for (int i = 0 ; i < index; i++) { for (int j = 0 ; j < FILE_PACKET_LENGTH; j++) fout << content[i][j]; } for (int j = 0 ; j < length; j++) fout << content[index][j]; fout.close (); }
附录 socket的创建 头文件
#include <WinSock2.h>//windows socket 编程头文件 #pragma comment(lib,"ws2_32.lib" )
WSADATA wsaData; if (WSAStartup(MAKEWORD(2 , 2 ), &wsaData) != 0 ){ cout << "socket初始化失败" << endl ; return 0 ; } sock = socket(AF_INET, SOCK_DGRAM, 0 ); if (sock == INVALID_SOCKET){ cout << "socket创建失败" ; return -1 ; } struct timeval timeout ;timeout.tv_sec = 1 ; timeout.tv_usec = 0 ; if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof (timeout)) == -1 ) { cout << "setsockopt failed:" ; } cout << "server3-1" << endl ;addrop.sin_addr.s_addr = inet_addr("192.168.89.1" ); addrop.sin_family = AF_INET; addrop.sin_port = htons(SPORT); addr.sin_addr.s_addr = inet_addr("192.168.89.1" ); addr.sin_family = AF_INET; addr.sin_port = htons(CPORT); if (bind(sock, (SOCKADDR*)&addrop, sizeof (SOCKADDR)) == -1 ){ cout << "bind error" << endl ; return -1 ; } ...... closesocket(sock); WSACleanup(); return 0 ;
消息的发送与接收
为了简化后续代码,将send和recv简化封装成函数
注意这里由于使用的是UDP ,每次发送和接收都需要指明目的或者来源IP地址
void simplesend (message& a) { a.set_exist(); a.setchecksum(); if (sendto(sock, (char *)&a, sizeof (message), 0 , (struct sockaddr*)&addr, sizeof (sockaddr)) == SOCKET_ERROR); { } } void simplerecv (message& a) { memset (a.msg, 0 , sizeof (a.msg)); recvfrom(sock, (char *)&a, sizeof (message), 0 , (struct sockaddr*)&addr, &addr_len); }
支持用户输入IP
cout << "是否使用默认IP?是1,否2 " ;int i;cin >> i;if (i == 2 ){ char s[20 ] = {}; char c[20 ] = {}; cout << "请输入服务器IP: " ; cin >> s; cout << "请输入客户端IP: " ; cin >> c; addrop.sin_addr.s_addr = inet_addr(s); addr.sin_addr.s_addr = inet_addr(c); }
message类 初始化
message::message() { memset (this , 0 , sizeof (message)); SendPort = SPORT; RecvPort = CPORT; SendIP = addr.sin_addr.s_addr; RecvIP = addrop.sin_addr.s_addr; }
标志位的获取与设置 int message::get_syn () { if (this ->flag & 0x02 ) return 1 ; else return 0 ; }
void message::set_syn () { if (get_syn() == 0 ) flag += 0x02 ; }
消息处理流程 server clockstart = clock(); while (1 ){ while (1 ) { int flag = 0 ; simplerecv(msgrecv[recvnextseq]); if (msgrecv[recvnextseq].get_exist()) { if (msgrecv[recvnextseq].msgseq == recvnextseq&&msgrecv[recvnextseq].checkchecksum()) { message b; b.set_ack(); b.ackseq = recvnextseq; simplesend(b); clockstart = clock(); if (status && msgrecv[recvnextseq].get_fin()) { status = 0 ; cout << "断开连接" << endl ; memset (msgrecv, 0 , sizeof (msgrecv)); recvnextseq = 0 ; filestatus = 0 ; fileseq = 0 ; break ; } else { if (msgrecv[recvnextseq].get_syn()) { if (!buildconnectionSer()) return 0 ; else status = 1 ; } else if (status && msgrecv[recvnextseq].get_startfile()) { cout << "接收文件" << endl ; memset (name, 0 , sizeof (name)); filestatus = 1 ; index = msgrecv[recvnextseq].index; length = msgrecv[recvnextseq].filelength; strcpy (name, msgrecv[recvnextseq].msg); } else if (status && filestatus && msgrecv[recvnextseq].get_endfile()) { filestatus = 0 ; if (fileseq != index) { cout << "出错" << endl ; return 0 ; } filepacket* packet = new filepacket; packet->a = msgrecv[recvnextseq]; packet->index = index; packet->length = length; hThread1 = ::CreateThread(NULL , NULL , filehandler, LPVOID(packet), 0 , &dwThreadId1); fileseq++; fileseq = 0 ; cout << "文件接收结束" << endl ; } else if (status && filestatus) { fileseq++; filepacket* packet = new filepacket; packet->a = msgrecv[recvnextseq]; packet->index = fileseq - 1 ; packet->length = 1024 ; hThread1 = ::CreateThread(NULL , NULL , filehandler, LPVOID(packet), 0 , &dwThreadId1); } } recvnextseq++; } else { if (flag % 3 == 0 ) { message a; a.set_ack(); a.ackseq = recvnextseq - 1 ; simplesend(a); } flag++; } } else { clockend = clock(); if (status && (clockend - clockstart) / CLOCKS_PER_SEC >= WAIT_TIME*SENT_TIMES && recvnextseq) { status=0 ; break ; } } } WaitForSingleObject(hThread1, INFINITE); outfile(name, content, length, index); int op; cout << "接收文件1,退出0" << endl ; cin >> op; if (op == 0 ) break ; }
client while (1 ){ int op; cout << "传输文件1,退出0 " ; cin >> op; if (op == 0 ) break ; char name[30 ]; cout << "请输入文件名 " ; cin >> name; buildconnectionCli(); sendfile(name); byecli(); clock_t timestart = clock(); if (base < buffersize); { hThread2 = ::CreateThread(NULL , NULL , sendhandler, LPVOID(i), 0 , &dwThreadId2); hThread1 = ::CreateThread(NULL , NULL , recvhandler, LPVOID(i), 0 , &dwThreadId1); WaitForSingleObject(hThread1,INFINITE); WaitForSingleObject(hThread2, INFINITE); if (base == buffersize) { cout << "文件发送结束" << endl ; } } clock_t timeend = clock(); double endtime = (double )(timeend - timestart) / CLOCKS_PER_SEC; cout << "Total time:" << endtime << endl ; cout << "吞吐率:" << (double )(buffersize) * sizeof (message)/endtime * 8 / 1024 / 1024 << "Mbps" << endl ; memset (name, 0 , sizeof (name)); buffersize = 0 ; base = 0 ; sendnextseq = 0 ; memset (msgsend, 0 , sizeof (msgsend)); }