阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

通过Tomcat的Http11NioProtocol源码学习Java NIO设计

115次阅读
没有评论

共计 16531 个字符,预计需要花费 42 分钟才能阅读完成。

Tomcat 的 Http11NioProtocol 协议使用 Java NIO 技术实现高性能 Web 服务器。本文通过分析 Http11NioProtocol 源码来学习 Java NIO 的使用。从中可以了解到阻塞 IO 和非阻塞 IO 的配合,NIO 的读写操作以及 Selector.wakeup 的使用。

1. 初始化阶段
Java NIO 服务器端实现的第一步是开启一个新的 ServerSocketChannel 对象。Http11NioProtocol 的实现也不例外, 在 NioEndPoint 类的 init 方法可以看到这段代码。

代码 1:NioEndPoint.init() 方法

public void init()
    throws Exception {

    if (initialized)
        return;
    // 开启一个新的 ServerSocketChannel
    serverSock = ServerSocketChannel.open();
    // 设置 socket 的性能偏好
    serverSock.socket().setPerformancePreferences(socketProperties .getPerformanceConnectionTime(),
                                                  socketProperties.getPerformanceLatency(),
                                                  socketProperties.getPerformanceBandwidth());
    InetSocketAddress addr = (address!=null ?new InetSocketAddress(address ,port):new InetSocketAddress(port));
    // 绑定端口号,并设置 backlog
    serverSock.socket().bind(addr,backlog);
    // 将 serverSock 设置成阻塞 IO
    serverSock.configureBlocking(true); //mimic APR behavior

    // 初始化 acceptor 线程数
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn’t seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    // 初始化 poller 线程数
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }

    // 根据需要,初始化 SSL
    // 因为主要关注 Java NIO, 所以这一块代码就省略掉了
    if (isSSLEnabled()) {
      ……
    }
    //OutOfMemoryError 策略
    if (oomParachute >0) reclaimParachute(true);

    // 开启 NioSelectorPool
    selectorPool.open();
    initialized = true ;
}

 

在 NioEndPoint.init 方法中,可以看到 ServerSocketChannel 被设置成阻塞 IO,并且没有注册任何就绪事件。这样可以和阻塞 ServerSocket 一样方便地使用阻塞 accept 方法来接收客户端新来的连接。但不同的是当 NioEndPoint.Accept 线程通过 accept 方法获得一个新的 SocketChannel 后会构建一个 OP_REGISTER 类型的 PollerEvent 事件并放到 Poller.events 队列中。而我们使用 ServerSocket 实现服务器的时候,在接收到新连接后,一般是从线程池中取出一个线程来处理这个连接。

在 NioEndPoint.Accept 的 setSocketOptions 方法中可以看到获得 SocketChannel 后的处理过程。步骤如下:

1)将 SocketChannel 设置成非阻塞;

2)构建 OP_REGISTER 类型的 PollerEvent 对象,并放入到 Poller.events 队列中。

代码 2:NioEndPoint.Accept 类的 setSocketOptions 方法

protected boolean setSocketOptions(SocketChannel socket) {
    try {
      // 将客户端 Socket 设置为非阻塞,APR 风格
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
        // 从缓存中取 NioChannel 对象,如果取不到直接构建一个
        NioChannel channel = nioChannels.poll();
        if (channel == null) {
            // 如果 sslContext 不等于 null, 需要启动 ssl
            if (sslContext != null) {
                ….
            }
            // 正常 tcp 启动
            else {
                // 构建 NioBufferHandler 对象
                NioBufferHandler bufhandler = new NioBufferHandler(socketProperties .getAppReadBufSize(),
                                                                  socketProperties.getAppWriteBufSize(),
                                                                  socketProperties.getDirectBuffer());
                // 构建 NioChannel 对象
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            // 从缓存中取的 NioChannel 对象,将客户端 socket 设置进去
            channel.setIOChannel(socket);
            if (channel instanceof SecureNioChannel) {
                SSLEngine engine = createSSLEngine();
                ((SecureNioChannel)channel).reset(engine);
            } else {
                channel.reset();
            }
        }
        // 注册 NioChannel 对象
        getPoller0().register(channel);
    } catch (Throwable t) {
        try {
            log.error(“” ,t);
        } catch (Throwable tt){}
        // Tell to close the socket
        return false ;
    }
    return true ;
}

Poller 线程会从 Poller.events 队列中取出 PollerEvent 对象,并运行 PollerEvent.run() 方法。在 PollerEvent.run() 方法中发现是 OP_REGISTER 事件,则会在 Poller.selector 上注册 SocketChannel 对象的 OP_READ 就绪事件。

代码 3:PollerEvent.run() 方法代码片段

public void run() {
  if (interestOps == OP_REGISTER) {
      try {
          // 在 Poller.selector 上注册 OP_READ 就绪事件
          socket.getIOChannel().register(socket .getPoller().getSelector(), SelectionKey.OP_READ , key);
      } catch (Exception x) {
          log.error(“” , x);
      }
  }
  ……
}

至此,一个客户端连接准备工作就已经完成了。我们获得了一个客户端的 SocketChannel, 并注册 OP_READ 就绪事件到 Poller.selector 上(如图 1)。下面就可以进行数据读写了。

图 1:ServerSocketChannel 和 SocketChannel 的初始化状态

通过 Tomcat 的 Http11NioProtocol 源码学习 Java NIO 设计

2. Poller.selector 的 wakeup 方法
Poller 线程会做如下工作:

1) 通过 selection 操作获取已经选中的 SelectionKey 数量;

2) 执行 Poller.events 队列中的 PollerEvent;

3) 处理已经选中的 SelectionKey。

当有新 PollerEvent 对象加入 Poller.events 队列中,需要尽快执行第二步,而不应该阻塞的 selection 操作中。所以就需要配合 Selector.wakeup() 方法来实现这个需求。Tomcat 使用信号量 wakeupCounter 来控制 Selector.wakeup() 方法,阻塞 Selector.select() 方法和非阻塞 Selector.selectNow() 方法的使用。

当有新 PollerEvent 对象加入 Poller.events 队列中,会按照如下条件执行 Selector.wakeup() 方法。

当 wakeupCounter 加 1 后等于 0,说明 Poller.selector 阻塞在 selection 操作,这时才需要调用 Selector.wakeup() 方法。
当 wakeupCounter 加 1 后不等于 0,说明 Poller.selector 没有阻塞在 selection 操作,则不需要调用 Selector.wakeup() 方法。并且为了尽快执行第二步,Poller 线程在下一次直接调用非阻塞方法 Selector.selectNow()。
代码 4:Poller.addEvent() 方法,实现将 PollerEvent 对象加入 Poller.events 队列中。

public void addEvent(Runnable event) {
  events.offer(event);
  // 如果 wakeupCount 加 1 后等于 0,则调用 wakeup 方法
  if (wakeupCounter .incrementAndGet() == 0 ) selector.wakeup();
}

代码 5: Poller 线程的 selection 操作代码
if (wakeupCounter .get()>0) {
  keyCount = selector.selectNow();
 else {
  wakeupCounter.set(-1);
  keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);

这样的设计因为 Java NIO 的 wakeup 有如下的特性:
在 Selector 对象上调用 wakeup() 方法将会导致第一个没有返回的 selection 操作立即返回。如果当前没有进行的 selection 操作,那么下一次的 select() 方法的调用将立即返回。而这个将 wakeup 行为延迟到下一个 select() 方法经常不是我们想要的(当然也不是 Tomcat 想要的)。我们一般只是想从 sleeping 的线程 wakeup,但允许接下来的 selection 操作正常处理。
所以,Tomcat 通过 wakeupCounter 信号量的变化来控制只有阻塞在 selection 操作的时候才调用 Selector.wakeup() 方法。当有新 PollerEvent 对象加入 Poller.events 队列中,并且没有处于阻塞在 selection 操作中,则直接调用非阻塞方法 Selector.selectNow()。

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2015-02/113900p2.htm

3. 读(写)数据

Poller 线程会调用 Poller.processKey() 方法处理已经选中的 SelectionKey。

该方法会完成下面工作:

1)取消在 Poller.selector 上注册的 OP_READ 就绪事件;

2)启动工作线程来处理网络请求;

      2-1)读取和解析 http 请求数据

      2-2)如果是动态内容,则会调用用户自定义的 Servlet 类处理并返回结果给浏览器;如果是静态内容,则会直接返回静态资源数据给浏览器。

我们在这就不详细讨论 http 协议的实现以及 Servlet 的使用,直接跳到网络 IO 读写实现类 NioSelectorPool。

NioSelectorPool 类也提供了产生 Selector 对象的功能,通过 NioSelectorPool.get() 方法就可以获得一个 Selector 对象。

根据命令行参数 -Dorg.apache.tomcat.util.net.NioSelectorShared 的设置决定是否在 SocketChannel 中共享 Selector。

  • 若会 True(默认), 则所有的 SocketChannel 共享一个 Selector;
  • 若为 False,  则每一个 SocketChannel 使用不同的 Selector(开启的 Selector 对象最多不超过 NioSelectorPool.maxSelectors)。

从 NioSelectorPool 类中获得的 Selector 对象会传入到 NioSelectorPool 的 read 和 write 方法,并在网络 IO 读写时候使用。

NioSelectorPool 类的读写方法提供了两种模式。通过方法的最后一个入参 block 控制。

1)读方法 read():

  • block 为 False, 则是非阻塞模式。如果读不到数据,则直接返回了;如果读到数据则继续读。
  • block 为 True, 则是阻塞模式。如果第一次读取不到数据,会在 NioSelectorPool 提供的 Selector 对象上注册 OP_READ 就绪事件,并循环调用 Selector.select(long) 方法,超时等待 OP_READ 就绪事件。如果 OP_READ 事件已经就绪,并且接下来读到数据,则会继续读。read() 方法整体会根据 readTimeout 设置进行超时控制。若超时,则会抛出 SocketTimeoutException 异常。

2)写方法 write():

  • block 为 False, 则是非阻塞模式。写数据之前不会监听 OP_WRITE 事件。如果没有成功,则直接返回。
  • block 为 True, 则是阻塞模式。第一次写数据之前不会监听 OP_WRITE 就绪事件。如果没有写成功,则会在 NioSelectorPool 提供的 selector 注册 OP_WRITE 事件。并循环调用 Selector.select(long) 方法,超时等待 OP_WRITE 就绪事件。如果 OP_WRITE 事件已经就绪,并且接下来写数据成功,则会继续写数据。write 方法整体会根据 writeTimeout 设置进行超时控制。如超时,则会抛出 SocketTimeoutException 异常。

另外如果是共享 Selector(NioSelectorShared=true) 并且阻塞模式 (block=true),则会使用 NioBlockingSelector 类实现读写数据。该类与 NioSelectorPool 使用 Java NIO 的策略是类似的,但实现略有不同,本文就不详细分析了。

图 2:事件注册在读写时候发生的变化

通过 Tomcat 的 Http11NioProtocol 源码学习 Java NIO 设计

下面是 NioSelectorPool 的 read 方法,实现从网络 IO 中读取数据。该方法有 5 个参数:

  • buf 保存从网络 IO 中读取到的数据;
  • socket NioChannel 对象,其中封装了 SocketChannel;
  • selector 为 block 模式使用的 Selector 对象,在实际调用的时候,会将 NioSelectorPool 类提供的 selector 对象传进去;
  • readTimout 读超时时间;
  • block 是否是阻塞模式,上面已经说明阻塞和非阻塞模式的区别。

代码 6:NioSelectorPool 的 read() 方法

public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException {// 如果是共享 Selector 和阻塞模式,则使用 NioBlockingSelector 实现数据读取 
    if (SHARED && block) {return blockingSelector .read(buf,socket,readTimeout);
    }
    SelectionKey key = null;
    int read = 0;
    boolean timedout = false;
    // 一开始我们认为是可以读的 
    int keycount = 1; //assume we can read
    // 开始时间 
    long time = System.currentTimeMillis(); //start the timeout timer
    try {// 当没有超时,则继续读数据 
        while ((!timedout) ) {int cnt = 0;
            if (keycount > 0) {//only read if we were registered for a read
                cnt = socket.read(buf);
                if (cnt == -1) throw new EOFException();
                read += cnt;
                // 如果读取到数据,则继续读 
                if (cnt > 0) continue; //read some more
                // 如果没有读取到数据,并且不是 block 模式,则直接 break
                if (cnt==0 && (read>0 || (!block) ) ) break; //we are done reading
            }
            if (selector != null) {//perform a blocking read
                // 在 NioSelectionPool 提供的 selector 上注册 OP_READ 事件 
                if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_READ);
                else key.interestOps(SelectionKey.OP_READ);
                // 调用 Selector.select 方法 
                keycount = selector.select(readTimeout);
            }
            // 计算是否超时 
            if (readTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=readTimeout;
        } //while
          // 如果超时,抛出 SocketTimeoutException 异常 
        if (timedout) throw new SocketTimeoutException();} finally {// 在返回前,取消 SelectionKey,并将所有的 key 从 selector 中删掉 
        if (key != null) {key.cancel();
            if (selector != null) selector.selectNow();//removes the key from this selector
        }
    }
    return read;
}
下面是 NioSelectorPool 的写方法,实现向网络 IO 中写数据。该方法有 5 个参数:
  • buf 保存需要写入的数据;
  • socket NioChannel 对象,其中封装了 SocketChannel;
  • selector 为 block 模式使用的 Selector 对象,在实际调用的时候,会将 NioSelectorPool 类提供的 selector 对象传进去;
  • writeTimeout 写超时时间;
  • block 是否是阻塞模式,上面已经说明阻塞和非阻塞模式的区别;
  • lastWrite 最近写入数据的 byte 数量。

代码 7:NioSelectorPool.write() 方法

public int write(ByteBuffer buf, NioChannel socket, Selector selector,
                 long writeTimeout, boolean block,MutableInteger lastWrite) throws IOException {// 如果是共享 Selector 和阻塞模式,则使用 NioBlockingSelector 实现写数据 
    if (SHARED && block) {return blockingSelector.write(buf,socket,writeTimeout,lastWrite);
    }
    SelectionKey key = null;
    int written = 0;
    boolean timedout = false;
   // 一开始我们认为是可以读的 
   int keycount = 1; //assume we can write
   // 记录开始时间 
    long time = System.currentTimeMillis(); //start the timeout timer
    try {while ((!timedout) && buf.hasRemaining()) {int cnt = 0;
            if (keycount > 0) {//only write if we were registered for a write
                cnt = socket.write(buf); //write the data
                if (lastWrite!=null) lastWrite.set(cnt);
                if (cnt == -1) throw new EOFException();
               
                written += cnt;
                 // 如果写数据成功,重新记录超时开始时间,并继续读 
                if (cnt > 0) {time = System. currentTimeMillis(); //reset our timeout timer
                    continue; //we successfully wrote, try again without a selector
                }
                // 如果写入数据为 0,并且是非阻塞模式,则直接退出 
                if (cnt==0 && (!block)) break; //don't block
            }
            if (selector != null) {// 在 NioSelectorPool 的 selector 注册 OP_WRITE 事件 
                if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
                else key.interestOps(SelectionKey.OP_WRITE);
                keycount = selector.select(writeTimeout);
            }
            // 是否超时 
            if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
        } //while
          // 如果超时,则直接抛出 SocketTimeoutException 异常 
        if (timedout) throw new SocketTimeoutException();} finally {// 在返回前,取消 SelectionKey,并将所有的 key 从 selector 中删掉 
        if (key != null) {key.cancel();
            if (selector != null) selector.selectNow();//removes the key from this selector
        }
    }
    return written;
}

4. 总结

Tomcat 在使用 Java NIO 的时候,将 ServerSocketChannel 配置成阻塞模式,这样可以方便地对 ServerSocketChannel 编写程序。当 accept 方法获得一个 SocketChannel,并没有立即从线程池中取出一个线程来处理这个 SocketChannel,而是构建一个 OP_REGISTER 类型的 PollerEvent,并放到 Poller.events 队列中。Poller 线程会处理这个 PollerEvent,发现是 OP_REGISTER 类型,会在 Poller.selector 上注册一个这个 SocketChannel 的 OP_READ 就绪事件。如图 1 所示。

因为 Java NIO 的 wakeup 特性,使用 wakeupCount 信号量控制 Selector.wakeup() 方法,非阻塞方法 Selector.selectNow() 和阻塞方法 Selector.select() 的调用。我们在编写 Java NIO 程序时候也可以参考这种方式。

在 SocketChannel 上读的时候,分成非阻塞模式和阻塞模式。

  • 非阻塞模式,如果读不到数据,则直接返回了;如果读到数据则继续读。
  • 阻塞模式。如果第一次读取不到数据,会在 NioSelectorPool 提供的 Selector 对象上注册 OP_READ 就绪事件,并循环调用 Selector.select(long) 方法,超时等待 OP_READ 就绪事件。如果 OP_READ 事件已经就绪,并且接下来读到数据,则会继续读。read() 方法整体会根据 readTimeout 设置进行超时控制。若超时,则会抛出 SocketTimeoutException 异常。

在 SocketChannel 上写的时候也分成非阻塞模式和阻塞模式。

  • 非阻塞模式,写数据之前不会监听 OP_WRITE 事件。如果没有成功,则直接返回。
  • 阻塞模式。第一次写数据之前不会监听 OP_WRITE 就绪事件。如果没有写成功,则会在 NioSelectorPool 提供的 selector 注册 OP_WRITE 事件。并循环调用 Selector.select(long) 方法,超时等待 OP_WRITE 就绪事件。如果 OP_WRITE 事件已经就绪,并且接下来写数据成功,则会继续写数据。write 方法整体会根据 writeTimeout 设置进行超时控制。如超时,则会抛出 SocketTimeoutException 异常。

在写数据的时候,开始没有监听 OP_WRITE 就绪事件,直接调用 write() 方法。这是一个乐观设计,估计网络大部分情况都是正常的,不会拥塞。如果第一次写没有成功,则说明网络可能拥塞,那么再等待 OP_WRITE 就绪事件。

阻塞模式的读写方法没有在原有的 Poller.selector 上注册就绪事件,而是使用 NioSelectorPool 类提供的 Selector 对象注册就绪事件。这样的设计可以将各个 Channel 的就绪事件分散注册到不同的 Selector 对象中,避免大量 Channel 集中注册就绪事件到一个 Selector 对象,影响性能。

5. 参考资料

1)Tomcat6.0.18 源码

2)Ron Hitchens 的 Java NIO

Tomcat 的 Http11NioProtocol 协议使用 Java NIO 技术实现高性能 Web 服务器。本文通过分析 Http11NioProtocol 源码来学习 Java NIO 的使用。从中可以了解到阻塞 IO 和非阻塞 IO 的配合,NIO 的读写操作以及 Selector.wakeup 的使用。

1. 初始化阶段
Java NIO 服务器端实现的第一步是开启一个新的 ServerSocketChannel 对象。Http11NioProtocol 的实现也不例外, 在 NioEndPoint 类的 init 方法可以看到这段代码。

代码 1:NioEndPoint.init() 方法

public void init()
    throws Exception {

    if (initialized)
        return;
    // 开启一个新的 ServerSocketChannel
    serverSock = ServerSocketChannel.open();
    // 设置 socket 的性能偏好
    serverSock.socket().setPerformancePreferences(socketProperties .getPerformanceConnectionTime(),
                                                  socketProperties.getPerformanceLatency(),
                                                  socketProperties.getPerformanceBandwidth());
    InetSocketAddress addr = (address!=null ?new InetSocketAddress(address ,port):new InetSocketAddress(port));
    // 绑定端口号,并设置 backlog
    serverSock.socket().bind(addr,backlog);
    // 将 serverSock 设置成阻塞 IO
    serverSock.configureBlocking(true); //mimic APR behavior

    // 初始化 acceptor 线程数
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn’t seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    // 初始化 poller 线程数
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }

    // 根据需要,初始化 SSL
    // 因为主要关注 Java NIO, 所以这一块代码就省略掉了
    if (isSSLEnabled()) {
      ……
    }
    //OutOfMemoryError 策略
    if (oomParachute >0) reclaimParachute(true);

    // 开启 NioSelectorPool
    selectorPool.open();
    initialized = true ;
}

 

在 NioEndPoint.init 方法中,可以看到 ServerSocketChannel 被设置成阻塞 IO,并且没有注册任何就绪事件。这样可以和阻塞 ServerSocket 一样方便地使用阻塞 accept 方法来接收客户端新来的连接。但不同的是当 NioEndPoint.Accept 线程通过 accept 方法获得一个新的 SocketChannel 后会构建一个 OP_REGISTER 类型的 PollerEvent 事件并放到 Poller.events 队列中。而我们使用 ServerSocket 实现服务器的时候,在接收到新连接后,一般是从线程池中取出一个线程来处理这个连接。

在 NioEndPoint.Accept 的 setSocketOptions 方法中可以看到获得 SocketChannel 后的处理过程。步骤如下:

1)将 SocketChannel 设置成非阻塞;

2)构建 OP_REGISTER 类型的 PollerEvent 对象,并放入到 Poller.events 队列中。

代码 2:NioEndPoint.Accept 类的 setSocketOptions 方法

protected boolean setSocketOptions(SocketChannel socket) {
    try {
      // 将客户端 Socket 设置为非阻塞,APR 风格
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
        // 从缓存中取 NioChannel 对象,如果取不到直接构建一个
        NioChannel channel = nioChannels.poll();
        if (channel == null) {
            // 如果 sslContext 不等于 null, 需要启动 ssl
            if (sslContext != null) {
                ….
            }
            // 正常 tcp 启动
            else {
                // 构建 NioBufferHandler 对象
                NioBufferHandler bufhandler = new NioBufferHandler(socketProperties .getAppReadBufSize(),
                                                                  socketProperties.getAppWriteBufSize(),
                                                                  socketProperties.getDirectBuffer());
                // 构建 NioChannel 对象
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            // 从缓存中取的 NioChannel 对象,将客户端 socket 设置进去
            channel.setIOChannel(socket);
            if (channel instanceof SecureNioChannel) {
                SSLEngine engine = createSSLEngine();
                ((SecureNioChannel)channel).reset(engine);
            } else {
                channel.reset();
            }
        }
        // 注册 NioChannel 对象
        getPoller0().register(channel);
    } catch (Throwable t) {
        try {
            log.error(“” ,t);
        } catch (Throwable tt){}
        // Tell to close the socket
        return false ;
    }
    return true ;
}

Poller 线程会从 Poller.events 队列中取出 PollerEvent 对象,并运行 PollerEvent.run() 方法。在 PollerEvent.run() 方法中发现是 OP_REGISTER 事件,则会在 Poller.selector 上注册 SocketChannel 对象的 OP_READ 就绪事件。

代码 3:PollerEvent.run() 方法代码片段

public void run() {
  if (interestOps == OP_REGISTER) {
      try {
          // 在 Poller.selector 上注册 OP_READ 就绪事件
          socket.getIOChannel().register(socket .getPoller().getSelector(), SelectionKey.OP_READ , key);
      } catch (Exception x) {
          log.error(“” , x);
      }
  }
  ……
}

至此,一个客户端连接准备工作就已经完成了。我们获得了一个客户端的 SocketChannel, 并注册 OP_READ 就绪事件到 Poller.selector 上(如图 1)。下面就可以进行数据读写了。

图 1:ServerSocketChannel 和 SocketChannel 的初始化状态

通过 Tomcat 的 Http11NioProtocol 源码学习 Java NIO 设计

2. Poller.selector 的 wakeup 方法
Poller 线程会做如下工作:

1) 通过 selection 操作获取已经选中的 SelectionKey 数量;

2) 执行 Poller.events 队列中的 PollerEvent;

3) 处理已经选中的 SelectionKey。

当有新 PollerEvent 对象加入 Poller.events 队列中,需要尽快执行第二步,而不应该阻塞的 selection 操作中。所以就需要配合 Selector.wakeup() 方法来实现这个需求。Tomcat 使用信号量 wakeupCounter 来控制 Selector.wakeup() 方法,阻塞 Selector.select() 方法和非阻塞 Selector.selectNow() 方法的使用。

当有新 PollerEvent 对象加入 Poller.events 队列中,会按照如下条件执行 Selector.wakeup() 方法。

当 wakeupCounter 加 1 后等于 0,说明 Poller.selector 阻塞在 selection 操作,这时才需要调用 Selector.wakeup() 方法。
当 wakeupCounter 加 1 后不等于 0,说明 Poller.selector 没有阻塞在 selection 操作,则不需要调用 Selector.wakeup() 方法。并且为了尽快执行第二步,Poller 线程在下一次直接调用非阻塞方法 Selector.selectNow()。
代码 4:Poller.addEvent() 方法,实现将 PollerEvent 对象加入 Poller.events 队列中。

public void addEvent(Runnable event) {
  events.offer(event);
  // 如果 wakeupCount 加 1 后等于 0,则调用 wakeup 方法
  if (wakeupCounter .incrementAndGet() == 0 ) selector.wakeup();
}

代码 5: Poller 线程的 selection 操作代码
if (wakeupCounter .get()>0) {
  keyCount = selector.selectNow();
 else {
  wakeupCounter.set(-1);
  keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);

这样的设计因为 Java NIO 的 wakeup 有如下的特性:
在 Selector 对象上调用 wakeup() 方法将会导致第一个没有返回的 selection 操作立即返回。如果当前没有进行的 selection 操作,那么下一次的 select() 方法的调用将立即返回。而这个将 wakeup 行为延迟到下一个 select() 方法经常不是我们想要的(当然也不是 Tomcat 想要的)。我们一般只是想从 sleeping 的线程 wakeup,但允许接下来的 selection 操作正常处理。
所以,Tomcat 通过 wakeupCounter 信号量的变化来控制只有阻塞在 selection 操作的时候才调用 Selector.wakeup() 方法。当有新 PollerEvent 对象加入 Poller.events 队列中,并且没有处于阻塞在 selection 操作中,则直接调用非阻塞方法 Selector.selectNow()。

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2015-02/113900p2.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计16531字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中