java从零开发TCPIP协议:实现TCP数据的收发机制

本节我们在上一节基础上进一步完成TCP协议的收发机制。上一节我们已经实现了向服务器方发送一个字符,本节我们要实现连续发送多个字符,并且能正常接收数据功能,完成了这些功能后,我们就可以基于此去开发其他构建在TCP之上的其他协议。

为了保证数据能正确的连续收发,本节的设计思路是使用一个队列将发送的数据存储起来,然后将数据包发送,只有等待收到对方回发的ack后,我们才将数据从队列中删除,如果数据包一直没有收到ack回应,我们就启动一个timer,自动将队列中的数据包进行发送,如果发送给定次数后还没有成功,那么就通知数据发送层发送失败,接下来我们看看相应代码设计。

class SendPacketWrapper {
    //将发送的数据包封装起来存储在队列中
    private byte[] packet_to_send;
    private int seq_num;
    private int ack_num;
    private int send_count = 0;

    public SendPacketWrapper(byte[] packet, int seq_num) {
        this.packet_to_send = packet;
        this.seq_num = seq_num;
        this.ack_num = seq_num + packet.length;
    }
    
    public byte[] get_packet() {
        return this.packet_to_send;
    }
    public int get_seq_num() {
        return this.seq_num;
    }
    public int get_ack_num() {
        return this.ack_num;
    }
    public void increase_send_count() {
        this.send_count++;
    }
    public int get_send_count() {
        return this.send_count;
    }

}

class  SendPacketTask extends TimerTask {
    private TCPThreeHandShakes  task_handler;
    public SendPacketTask(TCPThreeHandShakes  handler) {
        this.task_handler = handler;
    }
    @Override
    public void run() {
        this.task_handler.sendPacketInList();
    }
}

第一个类用于负责把发送的数据封装起来,他记录了数据的缓冲区,以及发送时对应的seq号,这样当数据包需要重发时就可以再次使用这个数值进行发送,同时也记录了应对的ack号,这样当对方返回ack值时,我们才能检验该数据包是否已经被对方接收。

接下来我们在类TCPThreeHandShakes中添加相应变量和代码:

public class TCPThreeHandShakes extends Application{
    。。。。
        private int  tcp_state = CONNECTION_IDLE;
    private static int PACKET_SEND_TIMES = 3; //连续发生3次不成功则失败
    private  Timer send_timer = new Timer(); //定时将发送队列中的数据包进行发送
    private int packet_resend_time = 2000; //每过一秒就发送队列中存储的数据包
    private SendPacketTask resend_packet_task = null;
    //每次发送数据包时先将它存储在队列中,发送出去收到ack后再将它从队列中去除
    private ArrayList<SendPacketWrapper> send_packet_list = new ArrayList<SendPacketWrapper>();
 public TCPThreeHandShakes(byte[] server_ip, short server_port, ITCPHandler tcp_handler)  {
        this.dest_ip = server_ip;
        this.dest_port = server_port;
         //指定一个固定端口,以便抓包调试  
        Random rand = new Random();
        this.port = (short)rand.nextInt();
        this.tcp_handler = tcp_handler;
        resend_packet_task = new SendPacketTask(this);
        send_timer.scheduleAtFixedRate(resend_packet_task, packet_resend_time, packet_resend_time);
    }
。。。。
}

我们添加了一系列与数据包发送和检验变量和代码,特别是启动一个timer,在每两秒就去检测数据包队列,如果里面还有数据包没有接收到对应的ack,也就是上次发送时对方没有成功接收,那么timer就会将数据包再次发送,如果已经发送超过给定次数,timer就会通知上层应用数据发送失败。

接下来我们要添加把数据包在发送时存储到队列和检验队列数据包的代码:

 private void savePacketToList(byte[] packet) {
       //如果数据包没有存在队列中就加入队列
       boolean contains = false;
       for(int i = 0; i < send_packet_list.size(); i++) {
           SendPacketWrapper packet_wrapper = this.send_packet_list.get(i);
           if (packet_wrapper.get_packet() == packet) {
               contains = true;
               break;
           }
       }
       
       if (contains == false) {
           SendPacketWrapper packet_wrapper = new SendPacketWrapper(packet, this.seq_num);
           this.send_packet_list.add(packet_wrapper);
       }
   }
   
   public void sendPacketInList() {
       ArrayList<SendPacketWrapper> wrapper_list = new  ArrayList<SendPacketWrapper>();
       //将所有在队列中的数据包系数发送,如果数据包发送次数大于给定次数则报告失败
       for(int i = 0; i < this.send_packet_list.size(); i++) {
           SendPacketWrapper packet_wrapper = this.send_packet_list.get(i);
           if (packet_wrapper.get_send_count() >= PACKET_SEND_TIMES) {
               this.tcp_handler.send_notify(false, packet_wrapper.get_packet());
           }
           else {
               int old_seq_num = this.seq_num;
               this.seq_num = packet_wrapper.get_seq_num();
               try {
                    createAndSendPacket(packet_wrapper.get_packet(), "ACK");
                } catch (Exception e) {
                    e.printStackTrace();
                }
               this.seq_num = old_seq_num;
               wrapper_list.add(packet_wrapper);
           }
       }
       
      this.send_packet_list = wrapper_list;
   }
   
   private void checkSendPacketByACK(int recv_ack) {
       ArrayList<SendPacketWrapper> wrapper_list = new  ArrayList<SendPacketWrapper>();
       //所有ack值小于返回ack的数据包都已经成功发送,此时要将数据包从队列移除并通知上层
       for(int i = 0; i < this.send_packet_list.size(); i++) {
           SendPacketWrapper packet_wrapper = this.send_packet_list.get(i);
           int ack =  packet_wrapper.get_ack_num();
           System.out.println("receive ack: " + ack);
           if (packet_wrapper.get_ack_num() <= recv_ack) {
               this.seq_num = packet_wrapper.get_ack_num();
               System.out.println("next seq num: "+ this.seq_num);
               this.tcp_handler.send_notify(true, packet_wrapper.get_packet());
           }
           else {
               wrapper_list.add(packet_wrapper);
           }
       }
       
      this.send_packet_list = wrapper_list;
      this.seq_num = recv_ack;
   }

在这三个函数中,第一个负责数据包第一次发送时将其存储在队列,第二个负责轮训队列,将上次没有发送成功的数据包继续发送,如果发送超过给定次数则向上层应用报告发送失败,最后一个函数是在收到对方发来的ack包后检验队列中哪些数据包发送成功,检验标准是所有ack小于对方发来ack数值的数据包都表明成功发送,接下来在handleData函数,也就是接收对方发来数据包的函数里我们增加如下流程:

@Override
    public void handleData(HashMap<String, Object> headerInfo) {
       。。。。
        if (tcp_state == CONNECTION_SEND || tcp_state == CONNECTION_CONNECTED) {
              tcp_state = CONNECTION_CONNECTED;
              checkSendPacketByACK(ack_num);
              if (data != null && data.length > 0 && seq_num == this.ack_num) {
                   /*
                    * 这里我们简化数据的接收流程,为了提升数据发送效率,很有可能数据包的到来次序与服务器发送时不一样
                    * ,但为了让实现逻辑简单,我们每次只接收指定数据包,例如当前我们等待seq编号为1,2,3的数据包,结果
                    * 数据包抵达的次序为3,1,2,那么我们就只接收数据包1,让对方再次发送数据包2,3,显然这样子会降低效率,
                    * 但为了实现逻辑简单,我们暂时做妥协
                    */
                   this.tcp_handler.recv_notify(data);
                   createAndSendPacket(null, "ACK");
               }
          }
。。。。、
}

新添加这段代码的作用是当对方数据包到来时,我们先抽取出包中的ack值,使用该值去检验队列中哪些数据包已经成功发送,同时如果对方发来的数据包中有数据的话,我们就把数据取出,然后提交给上层应用,最后我们看看上层如何使用该tcp连接层来实现数据发送:

package Application;

import java.net.InetAddress;

import utils.ITCPHandler;

public class TCPRawDataSender implements ITCPHandler{
    private  TCPThreeHandShakes  tcp_socket = null;  
    private String[] buffer = new String[] {"h", "e", "l", "l", "o"};
    private int buffer_p = 0;
    private byte[] current_send_packet = null;
    private void send_content() throws Exception {
        if (buffer_p < buffer.length) {
            System.out.println("send content: " + buffer[buffer_p]);
            byte[] send_content = buffer[buffer_p].getBytes();
            current_send_packet = send_content;
            tcp_socket.tcp_send(send_content);
        }
    }
    @Override
    public void connect_notify(boolean connect_res) {
        if (connect_res) {
            System.out.println("connection established!");
            try {
                send_content();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        else {
            System.out.println("connection fail!");
        }
    }
    
    private void close_connection() {
        try {
            tcp_socket.tcp_close();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void send_notify(boolean send_res, byte[] packet_send) {
        if (send_res == true) {
            System.out.println("send data, buffer_p: " + buffer_p);
            if (packet_send == current_send_packet) {
                buffer_p++; 
                current_send_packet = null;
            }
        }
        
        if (buffer_p >= buffer.length || send_res == false) {
            String info = "send all data ";
            if (send_res == false) {
                info = "send fail with buffer_p: " + buffer_p;
            }
            System.out.println(info);
        } 
        else {
            try {
                send_content();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    @Override
    public void connect_close_notify(boolean close_res) {
        if (close_res == true) {
            System.out.println("connection close complete!");
        } else {
            System.out.println("connection close fail!");
        }
    }
    
    public void run() {
         try {
            InetAddress ip = InetAddress.getByName("192.168.2.127"); //220.181.43.8
            short port = 1234;
            tcp_socket = new TCPThreeHandShakes(ip.getAddress(), port, this);
            tcp_socket.tcp_connect();
            System.out.println("finish handshake!");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    @Override
    public void recv_notify(byte[] packet_recv) {
        System.out.println("receive data: " + new String(packet_recv));
        close_connection();
    }

}

该类继承了ITCPHandler接口以便接收数据发送过程的各种调用,其中buffer中存储的是要发送给对方的数据,当connect_notify被调用时,如果连接成功,他就会使用send_content函数发送缓冲区里的一个字符,如果发送成功,它的send_notify会调用,在该函数里,他检验成功发送的数据是不是自己当前正在发生的数据,如果是它就将缓冲器指针挪动一位发送下一个字符,当所有数据发送完毕后,它会等待对方向它发送数据,一旦成功接收对方发来的数据,它的recv_notify函数会被调用,此时它把对方发送来的数据显示出来后,调用close_connection关闭连接

我在iphone上安装了一款名为tcp server的免费app做实验,我是上面代码与该app创建的tcp server服务器连接,然后将数据发送给他,并接收从它发过来的数据,最后运行结果如下图:

1.jpeg

更详细的讲解和代码调试演示过程,请点击链接

更多技术信息,包括操作系统,编译器,面试算法,机器学习,人工智能,请关照我的公众号:


这里写图片描述
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容