NIO selector通信

server端实现

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        // 1. 创建 selector,管理多个channel
        Selector selector = Selector.open();

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false); // 非阻塞模式

        // 2. 建立 selector 和 channel 的联系
        /**
         * SelectionKey 就是事件发生后,通过它可以知道哪个channel发生的事件
         * accept 在有连接请求时触发
         * connect 连接建立后触发
         * read 可读事件
         * write 可写事件
         */
        SelectionKey sscKey = ssc.register(selector, 0, null);
        // key只关注accept事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);

        ssc.bind(new InetSocketAddress(8901));
        while (true) {
            // 3. select 方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行
            // select 在事件未处理时,不会阻塞,事件发生后要么处理,要么取消
            selector.select();
            // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                /**
                 * selector会在发生事件后向selectedKeys中加入key,但不会自动删除,所以处理key 时,
                 * 要从selectedKeys 集合中删除,否则已处理过的channel accept会返回空
                 */
                iter.remove();
                log.info("key: {}", key);
                // 5. 区分事件类型
                if (key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    SelectionKey scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.info("{}", sc);
                } else if (key.isReadable()) {
                    try {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(8);
                        int read = channel.read(buffer); // 客户端正常断开,返回值-1
                        if (read == -1) {
                            key.cancel();
                        } else {
                            buffer.flip();
                            System.out.println(Charset.defaultCharset().decode(buffer));
                        }
                    } catch (IOException e) {
                        // 当客户端异常断开时,会触发read事件,处理报错
                        e.printStackTrace();
                        // 从selector的key集合中删除
                        key.cancel();
                    }
                }

            }
        }
    }
}

client端实现比较简单

    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8901));
        System.out.println("waiting...");
        sc.write(Charset.defaultCharset().encode("你好红红火火恍恍惚惚"));
    }

这样通信是没问题了,但是一个中文占3个字节,这里会有一个消息边界处理的问题,所以控制台输出结果会看到乱码。
WX20220425-092017
解决方法有如下几种:

  • 固定消息长度,服务器安预订长度读取,缺点是浪费带宽
  • 按分隔符拆分,缺点是效率低
  • TLV 格式,即Type类型、Length长度、Value数据,类型和长度已知的情况下,可以获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大,影响server吞吐量,http用的也是这种方式
    • Http 1.1 是TLV
    • Http 2 是LTV

WX20220425-103332

ByteBuffer大小分配

  • 每个channel都需要记录可能被切分的消息,因为ByteBuffer不能被多个channel共同使用,因此需要为每个channel维护一个独立的ByteBuffer
  • ByteBuffer不能太大,比如一个ByteBuffer 1M,要支持百万连接就要1T内存,因此需要设计大小可变的ByteBuffer
    • 一个思路是首先分配一个较小的buffer,例如4k,发现不够,再分配8k,将4k的内容拷贝到8k的buffer。这样的缺点是拷贝耗费性能。
    • 另一个思路是用多个数组组成buffer,一个数组不够,把多出来的写入新的数组。缺点是消息存储不连续,读取复杂。