Netty权威指南(一)

1. Java的IO演进之路

1.1 I/O

1.1.1 LINUX网络I/O模型

Linux的内核将所有外部设备都看作一个文件来操作,对一个文件对读写操作会调用内核提供好的系统命令,返回一个file descriotor(fd,文件描述符)。而对一个socket的读写也会有相应的描述符,称为socketfd,描述符就是一个数字,它指向内核中的一个结构体(文件路径,数据去等一些属性)。

UNIX中,I/O模型种类有如下:

  1. 阻塞I/O模型:为最常用的io模型,缺省情况下所有文件操作都是阻塞的。
    以套接字接口为例:在进程空间中调用Recvfrom,其系统调用直到数据包到达且被复制到应用进程的缓冲区中或者发生错误时才返回,在此期间一直会等待,进程在从调用recvfrom开始到它返回到整段时间内都是被阻塞到,因此被称为阻塞I/O模型。
  1. 非阻塞I/O模型:recvfrom从应用层到内核到时候,如果该缓冲区没有数据到话,就直接返回一个EWOULDBLOCK错误,一般都是对非阻塞I/O模型进行轮询检查这个状态,看内核是不是有数据到来。
  1. I/O复用模型:Linux提供select/poll,进程通过将一个或多个fd传递给select或poll系统调用,阻塞在select操作上,这是select/poll可以帮我们侦测多个fd是否处于就绪状态。select/poll上顺序扫描fd是否就绪,而且支持等fd数量有限,因此它的使用受到了一些制约。Linux还提供了一个epoll系统调用,epoll使用基于事件驱动方式代替顺序扫描,性能更高。当有fd就绪时,立即回调函数rollbock。
  1. 信号驱动I/O模型:首先开启套接口信号驱动I/O功能,并通过系统调用sigaction执行一个信号处理函数。当数据准备就绪时,就为该进程生产一个SIGIO信号,通过信号回调通知应用程序调用recvfrom来读取数据,并通知主循环函数处理数据。
  1. 异步I/O:告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核复制到用户直接到缓冲区)通知我们。
    这种模型与信号驱动模型到主要区别是:信号驱动I/O由内核通知我们何时开始下一个I/O操作;异步I/O模型由内核通知我们I/O操作何时已经完成。

1.1.2 I/O多路复用技术

在I/O编程过程中,当需要处理多个客户端接入请求时,可以利用多线程或者I/O多路复用技术处理。

I/O多路复用通过把多个I/O的阻塞复用到同一个select到阻塞上,从而可以在单线程的情况下可以同时处理多个客户端请求。I/O多路复用最大的优势就是系统开销小,系统不需要创建新的额外进程或者线程,也不需要维护这些进程和线程的运行,降低系统的维护工作量,节省了系统资源,I/O多路复用主要应用场景为:

- 服务器需要同时处理多个处于监听状态或者多个连接状态的套接字。
- 服务器需要同时处理多种网络协议的套接字。

目前支持I/O多路复用的系统调用有select、pselect、poll、epoll

其中epoll为select的替代,相较于select,epoll做了很多改进,如下:

  1. 支持一个进程打开的socket描述符不受限制(仅受限与操作系统的最大文件句柄数)。
  2. I/O效率不会随着FD树木的增加而线性下降。
  3. 使用mmap加速内核与用户空间的消息传递。
  4. epoll的api更加简单

2. NIO

2.1 传统的BIO编程

网络编程的基本模型是Client/Server模型,也就行两个进程之间进行相互通信,其中服务端提供位置信息(ip&port),客户端通过连接操作向服务端监听的地址发起连接请求,通过3次握手建立连接,双方就可以通过网络套接字(socket)进行通信。

2.1.1 BIO通信模型

BIO通信的服务端通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成后通过输出流返回应答给客户端,线程销毁。

该模型最大的问题是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈现1:1的正比关系,导致系统的性能急剧下降。

2.2 伪异步I/O编程

为了解决同步阻塞I/O中一个链路一个线程的问题,通过一个线程池或者任务队列来处理多个客户端的请求接入,形成客户端数M:线程池最大线程数N的比例关系,其中M可以远远大于N。通过线程池可以灵活的调配线程资源,设置线程的最大值,防止线程耗尽。

2.2.1 伪异步I/O模型图

当有新的客户端接入时,将客户端的socket封装称一个Task投递到后端到线程池中进行处理。因为线程池到资源占用是可控的,无论多少个客户端并发访问们都不会导致资源的耗尽和宕机。

但是在同步I/O中,对当对Socket对输入流进行读取操作的时候,它会一直阻塞下去,直到发生如下三种事情:

  • 1.有数据可读
  • 2.可用数据已经读取完毕
  • 3.发生异常

这意味着当对方发送请求或者应答消息比较缓慢,或者网络传输较慢时,读取输入流一方当通信线程将被长时间阻塞,而且在此期间,其他接入消息只能在消息队列中排队。

2.2.2 同步I/O的问题

  1. 服务器处理缓慢,返回应答消息耗时也相应变慢。
  2. 采用伪异步I/O的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,那该线程也同步阻塞。
  3. 加入所有的可用线程都被故障服务器阻塞,那后续所有的I/O消息都将在队列中排队。
  4. 由于线程池采用阻塞队列实现,当队列积满以后,后续入队列的操作将被阻塞。
  5. 由于只有一个Accptor线程接收客户端请求,它被阻塞在线程池的同步阻塞队列之后,新的客户端请求消息将被拒绝,客户端会发生大量的连接超时。
  6. 由于几乎所有的连接都超时,调用者会认为系统已经崩溃,无法接受新的消息。

这几乎是一个链式的级联故障,那该怎么解决呢?

答案是NIO。

2.3 NIO(Non-block I/O)编程

2.3.1 NIO类库简介

首先介绍一下NIO的一些概念与功能:

1. 缓冲区Buffer

Buffer是一个对象,它包含一些要写入或者要读出的数据。在面向流的I/O中,可以将数据直接写入或者将数据直接读到Stream对象中。

在NIO库中,所有数据都是用缓冲区处理的。在读取数据时是直接渠道缓冲区中的;在写入数据时也是写入到缓冲区中。

缓冲区实质上似一个数组,通常是一个字节数组-ByteBuffer,也可以使用其他种类的数组。但是一个缓冲区不及您是一个数组,缓冲区提供了对数据对结构化访问以及维护读写位置(limit)等信息。

最常用对缓冲区是BtyeBuffer,下面是Java提供对根据数据类型区分的缓冲区:

  • ByteBuffer:字节缓冲区
  • CharBuffer:字符缓冲区
  • ShortBuffer:短整型缓冲区
  • IntBuffer:整形缓冲区
  • LongBuffer:长整形缓冲区
  • FloatBuffer:浮点型缓冲区
  • DoubleBuffer:双精度浮点型缓冲区

每一个Buffer类都是Buffer接口的一个子实例,除ByteBuffer外每一个Buffer类都有完全一样的操作,只是所处理的数据类型不一样。因为大多数标准I/O操作都使用BtyeBuffer,所以它具有一般缓冲区的操作之外还提供了一些特有的操作,以方便网络读写。

2. 通道 Channel

Channel是一个通道,网络数据通过Channel读取和写入。通道与流的不同之处在于通道是双向的,流只是在一个方向上移动,而通过可以用于读、写或者二者同时进行。

因为Channel是全双工的,所以它可以比流更好地映射底层操作系统的API。

3. 多路复用器 Selector

多路复用器提供选择已经就绪的任务的能力,简单来说就是Selector会不断地轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

一个多路复用器Selector可以同时轮询多个Channel,而且因为Jdk使用的是epoll()代替select,并没有最大连接句柄的限制。

2.3.2 NIO服务端

NIO服务端时序图

Step 1:打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道。
Step 2:绑定监听端口,设置连接为非阻塞模式。
Step 3:创建Reactor线程,创建多路复用器并启动线程。
Step 4:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,检查ACCEPT事件。
Step 5:多路复用器在线程run方法的无限循环体内轮询准备就绪的key
Step 6:多路复用器监听到有新到客户端接入,处理新到接入请求,完成TCP三次握手,建立物理链路。
Step 7:设置客户端链路为非阻塞模式
Step 8:将新接入到库护短注册到Reactor线程到多路复用器上,监听读操作,读取客户端发送到网络消息。
Step 9:异步读取客户端请求消息到缓冲区。
Step 10:对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续对豹纹,将解码成功对消息封装成task,投递到业务线程池中进行业务逻辑。
Step 11:将POJO对象encode成ByteBuffer,调用SocketChannel到异步write接口,将消息异步发送给客户端。

⚠️注意:如果发送区Tcp缓冲区满,会导致写半包,此时需要注册监听写操作位,循环写直到整包消息写入缓冲区。

2.3.3 NIO客户端

NIO客户端

Step 1:打开ServerSocketChannel,绑定客户端本地地址。
Step 2:设置SocketChannel为非阻塞模式,同时设置客户端连接的Tcp参数。
Step 3:异步连接服务端。
Step 4:判断是否连接成功,如果成功则直接注册读状态到多路复用器中。
Step 5:向Reactor线程的多路复用器注册OP_CONNECT状态位,监听服务端的TCP ACk应答。
Step 6:创建Reactor线程,创建多路复用器并启动线程。
Step 7:多路复用器在线程run方法的无限循环体内轮询准备就绪的key。
Step 8:接收connect事件进行处理。
Step 9:判断连接结果,如果成功注册读时间到多路复用器。
Step 10:注册读事件到多路复用器。
Step 11:异步读客户端请求到缓冲区。
Step 10:对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续对豹纹,将解码成功对消息封装成task,投递到业务线程池中进行业务逻辑。
Step 11:将POJO对象encode成ByteBuffer,调用SocketChannel到异步write接口,将消息异步发送给客户端。

总结:
1)客户端发起的连接操作是异步的,可以通过多路复用器注册OP_CONNECT等带后续结果,不需要像之前一样被同步阻塞。
2)SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回。这样I/O通信线程就可以处理其他的链路无需同步等待。
3)线程模型的优化:没有连接句柄数

2.4 AIO(新异步非阻塞IO)编程

AIO为UNIX网络编程中的事件驱动I/O,它不需要多路复用器对注册对通道进行轮询操作即可实现异步读写,从而简化NIO对编程模型。

2.5 4种I/O的对比

2.5.1 不同I/O模型对比

- 同步阻塞I/O(BIO) 伪异步I/O 非阻塞I/O(NIO) 异步I/O(AIO)
客户端个数:I/O线程 1:1 M:N M:1 M:0(不需要启动额外的I/O线程,被动回调)
I/O类型 阻塞I/O 阻塞I/O 非阻塞I/O 非阻塞I/O
I/O类型(同步) 同步I/O 同步I/O 同步I/O(I/O多路复用) 异步I/O
可靠性 非常差
吞吐量

3 Netty入门

3.1 Netty服务端开发

🏠例子:

服务端

package com.likecat.netty.book;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 服务端
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 10:21
 */
public class TimeServer {

    public void bind(int port){
        //配置服务端nio线程组,处理网络事件
        EventLoopGroup boosGroup = new NioEventLoopGroup();//用于服务端接受客户端连接
        EventLoopGroup workerGroup = new NioEventLoopGroup();//用于进行SocketChannel网络读写

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boosGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            //绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            //等待服务端监听端口关闭
            f.channel().closeFuture().sync();
            //退出
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new TimeServerHandler());
        }
    }

    public static void main(String[] args) {
        int port = 8080;

        if(args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }

        new TimeServer().bind(port);
    }
}

服务端具体操作类

package com.likecat.netty.book;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.UnsupportedEncodingException;
import java.util.Date;

/**
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 10:41
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;

        byte[] req = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(req);

        String body = new String(req, "UTF-8");
        System.out.println("The time server receive order :" + body);

        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());

        //为了效率,write并不直接将消息写入到SocketChannel中,调用write方法只是把待发送的消息
        //放到发送缓冲数组中,再调用flush发送
        ctx.write(resp);
        System.out.println("写入完成");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx){
        //将消息发送队列中的消息写到SocketChannel中发送给对方
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        ctx.close();
    }
}

3.2 Netty客户端开发

客户端

package com.likecat.netty.book;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * 客户端
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 11:06
 */
public class TimeClient {

    public void connect(int port ,String host) throws Exception {

        //配置nio线程组
        EventLoopGroup group = new NioEventLoopGroup();

        try{
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            //发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            //等待客户端链路关闭
            f.channel().closeFuture().sync();
            //优雅退出,释放NIO线程组
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;

        if(args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }

        new TimeClient().connect(port,"127.0.0.1");
    }
}

客户端具体操作类

package com.likecat.netty.book;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;


/**
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 11:09
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    public TimeClientHandler() {
        byte[] req = "QUERY TIME ORDER".getBytes();
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        ByteBuf byteBuf = (ByteBuf) msg;

        byte[] req = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(req);

        String body = new String(req, "UTF-8");
        System.out.println("Now is :" + body);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
        System.out.println("释放资源");
        ctx.close();
    }
}

当客户端和服务端建立TCP链路成功后,Netty端NIO线程会调用channelActive方法,发送查询时间端指令给服务端,调用writeAndFlush方法将请求消息发送给服务端。

当服务端返回应答消息时,channelRead方法被调用。

3.3 运行

执行结果 -> 服务端

The time server receive order :QUERY TIME ORDER
写入完成

执行结果 -> 客户端

Now is :Mon Oct 17 11:25:40 CST 2022

4 TCP粘包/拆包问题

4.1 TCP粘包/拆包

TCP是个“流‘协议,所谓流,就是没有界限的一串数据。一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送。

4.1.1 TCP粘包/拆包问题说明

TCP粘包/拆包

TCP粘包/拆包具体分析如上图,假设客户端分别发送了2个数据包D1和D2给服务端,由于服务端一次读取到到字节数是不确定的,故可能存在以下4种情况。

  1. 服务端分2此读取到了2个独立的数据包,分别是D1和D2,没有粘包,拆包。
  2. 服务点一次接收到了2个数据包,D1和D2粘合在一起,被称为TCP粘包。
  3. 服务点分2次读取到了2个数据包,一次读取到了完整的D1和D2包的一部分内容,第二次读取到了D2包到剩余内容,被称为TCP拆包。
  4. 服务端分2次读取道理2个数据包,第一次读取到了D1包的部分内容D1-1,第二次读取到了D1包的剩余内容D1-2和D2包的整包。

如果此时服务端TCP接受滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次次才能将D1和D2包接收完全,期间发送多次拆包。

4.1.2 TCP粘包/拆包发生的原因

  1. 应用程序write写入的字节大小大于套接口发送缓冲区大小
  2. 进行MSS大小的TCP分段
  3. 以太网帧的payload大于MTU进行IP分片

4.1.3 粘包问题的解决策略

TCP无法理解上层业务逻辑,只能通过上层应用协议栈设计来解决,如下:

  1. 消息定长
  2. 在包尾增加回车换行符进行分割
  3. 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段
  4. 更富在的应用层协议

接下来看Netty如果解决TCP粘包/拆包问题

4.2 未考虑TCP粘包案例

4.2.1 TimeServer改造

我们用上面的例子复刻一下粘包场景。

TimeServerHandler改造

package com.likecat.netty.book;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

/**
 * 粘包
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 10:41
 */
public class TimeServerApHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;

        byte[] req = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(req);

        String body = new String(req, "UTF-8").substring(0,req.length - System.getProperty("line.separator").length());
        //每读到一条消息后就记一次数,然后发送消息给客户端
        //按照设计,服务端接收到的消息总数应该于客户端发送的消息总数相同
        System.out.println("The time server receive order :" + body + "; the counter is : " + ++counter);

        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";

        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());

        //为了效率,write并不直接将消息写入到SocketChannel中,调用write方法只是把待发送的消息
        //放到发送缓冲数组中,再调用flush发送
        ctx.write(resp);
        System.out.println("写入完成");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx){
        //将消息发送队列中的消息写到SocketChannel中发送给对方
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        ctx.close();
    }
}

4.2.2 TimeClientHandler改造

TimeClientHandler改造

package com.likecat.netty.book;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;


/**
 * 客户端粘包
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 11:09
 */
public class TimeClientApHandler extends ChannelInboundHandlerAdapter {

//    private final ByteBuf firstMessage;
    private int counter;
    private byte[] req;

    public TimeClientApHandler() {
       req = ( "QUERY TIME ORDER" + System.getProperty("line.separator") ).getBytes();
//        firstMessage = Unpooled.buffer(req.length);
//        firstMessage.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx){
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        ByteBuf byteBuf = (ByteBuf) msg;

        byte[] req = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(req);

        String body = new String(req, "UTF-8");
        System.out.println("Now is :" + body + "; the counter is :" + ++counter);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
        System.out.println("释放资源");
        ctx.close();
    }
}

4.2.3 运行

执行结果

QUERY TIME ORDER; the counter is : 18

Now is :BAD ORDER
; the counter is :1

服务端运行结果表明他没有接收到100条消息,说明发生了TCP粘包。

客户端应该接收到100条当前时间到消息,但实际上只接收到了1条,说明服务端返回应答消息也发生了粘包。

当TCP粘包时,我们到程序就不能正常工作。

4.3 利用LineBasedFrameDecoder解决TCP粘包问题

4.3.1 支持TCP粘包的TimeServer

Netty默认提供了多种编解码器用于处理半包问题。

TimeServer

package com.likecat.netty.book;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * 服务端(支持粘包)
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 10:21
 */
public class TimeApServer {

    public void bind(int port){
        //配置服务端nio线程组,处理网络事件
        EventLoopGroup boosGroup = new NioEventLoopGroup();//用于服务端接受客户端连接
        EventLoopGroup workerGroup = new NioEventLoopGroup();//用于进行SocketChannel网络读写

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boosGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            //绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            //等待服务端监听端口关闭
            f.channel().closeFuture().sync();
            //退出
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
            socketChannel.pipeline().addLast(new StringDecoder());
            socketChannel.pipeline().addLast(new TimeServerApHandler());
        }
    }

    public static void main(String[] args) {
        int port = 8080;

        if(args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }

        new TimeApServer().bind(port);
    }
}

TimeServerApHandler

package com.likecat.netty.book;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

/**
 * 粘包
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 10:41
 */
public class TimeServerApHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
        String body = (String) msg;
        //每读到一条消息后就记一次数,然后发送消息给客户端
        //按照设计,服务端接收到的消息总数应该于客户端发送的消息总数相同
        System.out.println("The time server receive order :" + body + "; the counter is : " + ++counter);

        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";

        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());

        //为了效率,write并不直接将消息写入到SocketChannel中,调用write方法只是把待发送的消息
        //放到发送缓冲数组中,再调用flush发送
        ctx.write(resp);
        System.out.println("写入完成");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx){
        //将消息发送队列中的消息写到SocketChannel中发送给对方
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        ctx.close();
    }
}

4.3.2 支持TCP粘包的TcpClient

TimeApClient

package com.likecat.netty.book;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * 粘包客户端
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 11:06
 */
public class TimeApClient {

    public void connect(int port ,String host) throws Exception {

        //配置nio线程组
        EventLoopGroup group = new NioEventLoopGroup();

        try{
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new TimeClientApHandler());
                        }
                    });

            //发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            //等待客户端链路关闭
            f.channel().closeFuture().sync();
            //优雅退出,释放NIO线程组
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;

        if(args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }

        new TimeApClient().connect(port,"127.0.0.1");
    }
}

TimeClientApHandler

package com.likecat.netty.book;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;


/**
 * 客户端粘包
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 11:09
 */
public class TimeClientApHandler extends ChannelInboundHandlerAdapter {

//    private final ByteBuf firstMessage;
    private int counter;
    private byte[] req;

    public TimeClientApHandler() {
       req = ( "QUERY TIME ORDER" + System.getProperty("line.separator") ).getBytes();
//        firstMessage = Unpooled.buffer(req.length);
//        firstMessage.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx){
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        String body = (String) msg;
        System.out.println("Now is :" + body + "; the counter is :" + ++counter);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
        System.out.println("释放资源");
        ctx.close();
    }
}

4.3.3 运行

服务端

The time server receive order :QUERY TIME ORDER; the counter is : 100
写入完成

客户端

Now is :Mon Oct 17 15:41:57 CST 2022; the counter is :98
Now is :Mon Oct 17 15:41:57 CST 2022; the counter is :99
Now is :Mon Oct 17 15:41:57 CST 2022; the counter is :100

4.3.4 LineBasedFrameDecoder 和 StringDecoder原理分析

LineBasedFrameDecoder的工作原理是它一次遍历ByteBuf的可读字节,判断看是否有\n或者\r\n,如果有就以次位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。

它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符2种解码方式,同时支持配置但行最大长度。如果连续读取到最大长度后仍然没有出现换行符,就会抛出异常。

StringDecoder的功能非常简单,就是将接收到到对象转换成字符串,然后继续调用后续到Handler。

5 分隔符和定长解码器到应用

TCP以流的方式进行数据传输,上层的应用协议为了对消息进行区分,一般采用如下方式:

  1. 消息长度固定,累计读取到长度总和为定长LEN的报文后,就认为读取到了一个完整到消息,然后将计数器置位,重新开始读取下一个数据报。
  2. 将回车换行符作为消息结束符
  3. 将特殊到分隔符作为消息到结束标志
  4. 通过在消息头中定义长度字段来表示消息到总长度

Netty对上面4种做了统一抽象,提供了4种解码器来解决对应对问题。上述使用LineBasedFrameDecoder 就是其中之一,还有DelimiterBasedFrameDecoder用于自动完成以分隔符做结束标志对解码,FixedLengthFrameDecoder用于自动完成对定长消息对解码。

5.1 DelimiterBasedFrameDecoder应用开发

🏠例子:本例将以$_作为分隔符

5.1.1 DelimiterBasedFrameDecoder服务端开发

EchoServer

package com.likecat.netty.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * ECHO服务端
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 10:21
 */
public class EchoServer {

    public void bind(int port){
        //配置服务端nio线程组,处理网络事件
        EventLoopGroup boosGroup = new NioEventLoopGroup();//用于服务端接受客户端连接
        EventLoopGroup workerGroup = new NioEventLoopGroup();//用于进行SocketChannel网络读写

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boosGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            //单体消息最大长度1024,当达到该长度后仍然没有查找到分隔符就抛出异常
                            channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            channel.pipeline().addLast(new StringDecoder());//将ByteBuf解码成字符串对象
                            channel.pipeline().addLast(new EchoServerHandler());//接收消息
                        }
                    });

            //绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            //等待服务端监听端口关闭
            f.channel().closeFuture().sync();
            //退出
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;

        if(args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }

        new EchoServer().bind(port);
    }
}

EchoServerHandler

package com.likecat.netty.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

/**
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 10:41
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    private int counter = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("This is" + ++counter + "The time server receive client :" + body);

        body += "$_";
        ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
        ctx.write(echo);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx){
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        ctx.close();
    }
}

5.1.2 DelimiterBasedFrameDecoder客户端开发

EchoClient

package com.likecat.netty.echo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * ECHO客户端
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 11:06
 */
public class EchoClient {

    public void connect(int port ,String host) throws Exception {

        //配置nio线程组
        EventLoopGroup group = new NioEventLoopGroup();

        try{
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            //单体消息最大长度1024,当达到该长度后仍然没有查找到分隔符就抛出异常
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());//将ByteBuf解码成字符串对象
                            socketChannel.pipeline().addLast(new EchoClientHandler());//接收消息
                        }
                    });

            //发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
            //等待客户端链路关闭
            f.channel().closeFuture().sync();
            //优雅退出,释放NIO线程组
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;

        if(args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }

        new EchoClient().connect(port,"127.0.0.1");
    }
}

EchoClientHandler

package com.likecat.netty.echo;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;


/**
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 11:09
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private int counter;
    static final String ECHO_REQ = "Hi LIKECAT,Welcome to my home.$_";

    public EchoClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx){
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        System.out.println("This is" + ++counter + "The times  receive server : [" + msg + "]");

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx){
        ctx.flush();
    }
}

5.1.3 DelimiterBasedFrameDecoder运行

执行结果

This is1The time server receive client :Hi LIKECAT,Welcome to my home.
This is2The time server receive client :Hi LIKECAT,Welcome to my home.
This is3The time server receive client :Hi LIKECAT,Welcome to my home.
This is4The time server receive client :Hi LIKECAT,Welcome to my home.
This is5The time server receive client :Hi LIKECAT,Welcome to my home.
This is6The time server receive client :Hi LIKECAT,Welcome to my home.
This is7The time server receive client :Hi LIKECAT,Welcome to my home.
This is8The time server receive client :Hi LIKECAT,Welcome to my home.
This is9The time server receive client :Hi LIKECAT,Welcome to my home.
This is10The time server receive client :Hi LIKECAT,Welcome to my home.

如果没有使用DelimiterBasedFrameDecoder则会出现下面的情况,因为服务端一次读取了客户端发送的所有消息,导致TCP粘包拆包问题的出现。

执行结果

This is1The time server receive client :Hi LIKECAT,Welcome to my home.$_Hi LIKECAT,Welcome to my home.$_Hi LIKECAT,Welcome to my home.$_Hi LIKECAT,Welcome to my home.$_Hi LIKECAT,Welcome to my home.$_Hi LIKECAT,Welcome to my home.$_Hi LIKECAT,Welcome to my home.$_Hi LIKECAT,Welcome to my home.$_Hi LIKECAT,Welcome to my home.$_Hi LIKECAT,Welcome to my home.$_

5.2 FixedLengthFrameDecoder应用开发

FixedLengthFrameDecoder是固定长度解码器。无论一次接收到多少数据包,它都会按照构造函数中设置都固定长度进行解码,如果是半包消息,则会缓存半包消息并等待下个包到达后进行拼包,知道读取一个完整的包。

5.2.1 FixedLengthFrameDecoder服务端开发

在服务端中新增FixedLengthFrameDecoder,长度设置为20。

FixedServer

package com.likecat.netty.fixed;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * 定长解码服务端
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 10:21
 */
public class FixedServer {

    public void bind(int port){
        //配置服务端nio线程组,处理网络事件
        EventLoopGroup boosGroup = new NioEventLoopGroup();//用于服务端接受客户端连接
        EventLoopGroup workerGroup = new NioEventLoopGroup();//用于进行SocketChannel网络读写

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boosGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new FixedLengthFrameDecoder(20));
                            channel.pipeline().addLast(new StringDecoder());//将ByteBuf解码成字符串对象
                            channel.pipeline().addLast(new FixedServerHandler());//接收消息
                        }
                    });

            //绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            //等待服务端监听端口关闭
            f.channel().closeFuture().sync();
            //退出
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;

        if(args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }

        new FixedServer().bind(port);
    }
}

FixedServerHandler

package com.likecat.netty.fixed;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author likecat
 * @version 1.0
 * @date 2022/10/17 10:41
 */
public class FixedServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
//        String body = (String) msg;
        System.out.println("The receive client :" + msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx){
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        ctx.close();
    }
}

5.2.2 FixedLengthFrameDecoder运行

这次我们通过命令的形式运行,执行下面命令,即可于服务端通信:

telnet localhost 8080

一条小咸鱼