当前位置: 代码迷 >> 综合 >> 【JAVA 网络编程系列】Netty -- Netty 关闭流程
  详细解决方案

【JAVA 网络编程系列】Netty -- Netty 关闭流程

热度:27   发布时间:2024-02-24 16:57:31.0

【JAVA 网络编程系列】Netty -- Netty 关闭流程

【1】Netty 关闭方法 -- shutdownGracefully()

public final class EchoServer {static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();EchoServerHandler echoServerHandler = new EchoServerHandler();try {...} finally {// 优雅地关闭两个线程池bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

【2】Netty 关闭流程

【2.1】Netty 关闭流程 -- shutdownGracefully()

public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {@Overridepublic Future<?> shutdownGracefully() {// 调用重载方法// 第一个参数为静默周期,默认2秒// 第二个参数为超时时间,默认15秒return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);}
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {@Overridepublic Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {for (EventExecutor l: children) {// 调用孩子的shutdownGracefully()// 这里的EventExecutor就是NioEventLoopl.shutdownGracefully(quietPeriod, timeout, unit);}// 返回的是NioEventLoopGroup的terminationFuturereturn terminationFuture();}
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {// 对应于 NioEventLoop 的 shutdownGracefully()@Overridepublic Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {// 参数检查if (quietPeriod < 0) {throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");}if (timeout < quietPeriod) {throw new IllegalArgumentException("timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");}if (unit == null) {throw new NullPointerException("unit");}// 其它线程正在执行关闭,直接返回if (isShuttingDown()) {return terminationFuture();}// 判断是否处于当前线程boolean inEventLoop = inEventLoop();boolean wakeup;int oldState;for (; ; ) {// 再次检查其它线程正在执行关闭,直接返回if (isShuttingDown()) {return terminationFuture();}int newState;wakeup = true;// 缓存当前状态oldState = state;// 更新状态if (inEventLoop) {newState = ST_SHUTTING_DOWN;} else {switch (oldState) {//五种状态//private static final int ST_NOT_STARTED = 1;//private static final int ST_STARTED = 2;//private static final int ST_SHUTTING_DOWN = 3;//private static final int ST_SHUTDOWN = 4;//private static final int ST_TERMINATED = 5;case ST_NOT_STARTED:case ST_STARTED:newState = ST_SHUTTING_DOWN;break;default:// 此时已经处于关闭相关的状态newState = oldState;// 无需再唤醒selectorwakeup = false;}}// 更新状态成功,退出循环if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {break;}}// 修改NioEventLoop的属性标识gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);gracefulShutdownTimeout = unit.toNanos(timeout);// 若线程没有启动则启动线程// 真正的关闭逻辑将在主循环中处理if (oldState == ST_NOT_STARTED) {doStartThread();}// 添加一个空任务,唤醒EventLoopif (wakeup) {//protected void wakeup(boolean inEventLoop) 在NioEventLoop中被覆写如下//@Override//protected void wakeup(boolean inEventLoop) {//    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {//        唤醒selector//        selector.wakeup();//    }//}wakeup(inEventLoop);}// 返回NioEventLoop的terminationFuturereturn terminationFuture();}
}

【2.2】Netty 关闭流程 -- NioEventLoop.protected void run()

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {private void doStartThread() {assert thread == null;// 真正启动线程的地方executor.execute(() -> {...try {// protected abstract void run();// 该方法由子类覆写// 比如NioEventLoop中对run()方法的覆写// 在NioEventLoop类的run()方法中开启了一个主循环SingleThreadEventExecutor.this.run();// 标记启动成功success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {...}});}
}

在 NioEventLoop.protected void run() 方法中真正处理 Netty 的关闭逻辑;

public final class NioEventLoop extends SingleThreadEventLoop {@Overrideprotected void run() {for (;;) {...// 主循环中一直在处理关闭逻辑try {// 判断是否处于关闭中if (isShuttingDown()) {// 关闭closeAll();// 确定关闭// 若confirmShutdown()返回true则跳出循环,run方法执行完毕// 若confirmShutdown()返回false则继续循环直到所有任务执行完毕if (confirmShutdown()) {return;}}} catch (Throwable t) {// 处理异常handleLoopException(t);}}}private void closeAll() {// 再次调用selectNow()方法selectAgain();// 获取selector中所有的SelectionKeySet<SelectionKey> keys = selector.keys();Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());for (SelectionKey k: keys) {// 此处获取的附件就是NioServerSocketChannelObject a = k.attachment();if (a instanceof AbstractNioChannel) {// 把要关闭的Channel加到集合中channels.add((AbstractNioChannel) a);} else {k.cancel();@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;invokeChannelUnregistered(task, k, null);}}// 遍历集合for (AbstractNioChannel ch: channels) {// 调用Channel的unsafe进行关闭ch.unsafe().close(ch.unsafe().voidPromise());}}
}

【2.3】Netty 关闭流程 -- unsafe.close()

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {protected abstract class AbstractUnsafe implements Unsafe {@Overridepublic final void close(final ChannelPromise promise) {assertEventLoop();// 调用重载方法close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);}private void close(final ChannelPromise promise, final Throwable cause,final ClosedChannelException closeCause, final boolean notify) {// 设置promise不可取消if (!promise.setUncancellable()) {return;}// 使用closeInitiated防止重复关闭if (closeInitiated) {// 若已经开启了关闭处理,则为closeFuture设置处理监听器if (closeFuture.isDone()) {// 已经关闭了设置promise的状态为成功safeSetSuccess(promise);} else if (!(promise instanceof VoidChannelPromise)) {// 尚未完成关闭则添加监听器closeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {promise.setSuccess();}});}return;}// 下面的逻辑只会执行一次closeInitiated = true;// 判断Channel是否处于激活状态final boolean wasActive = isActive();// 获取写出数据时的缓存final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;// 置为空表示不允许再写出数据了this.outboundBuffer = null;// 关闭Channel的准备工作// prepareToClose()// 对于NioServerSocketChannel,默认为空// 对于NioSocketChannel覆写了该方法Executor closeExecutor = prepareToClose();if (closeExecutor != null) {// 若返回的closeExecutor不为null则将任务加入该线程池的任务队列中排队处理closeExecutor.execute(new Runnable() {@Overridepublic void run() {try {// Execute the close.// 关闭 Channel 并将所有在消息队列中的消息的状态置为 faildoClose0(promise);} finally {// Call invokeLater so closeAndDeregister is executed in the EventLoop again!invokeLater(new Runnable() {@Overridepublic void run() {if (outboundBuffer != null) {// Fail all the queued messages// 未发送的数据标记为 failoutboundBuffer.failFlushed(cause, notify);outboundBuffer.close(closeCause);}// 触发channelInactive()和channelDeregister()方法fireChannelInactiveAndDeregister(wasActive);}});}}});} else {try {// Close the channel and fail the queued messages in all cases.// 关闭 Channel 并将所有在消息队列中的消息的状态置为 faildoClose0(promise);} finally {if (outboundBuffer != null) {// Fail all the queued messages.// 未发送的数据标记为 failoutboundBuffer.failFlushed(cause, notify);outboundBuffer.close(closeCause);}}if (inFlush0) {invokeLater(new Runnable() {@Overridepublic void run() {// 触发channelInactive()和channelDeregister()方法fireChannelInactiveAndDeregister(wasActive);}});} else {// 触发channelInactive()和channelDeregister()方法fireChannelInactiveAndDeregister(wasActive);}}}private void doClose0(ChannelPromise promise) {try {// 关闭 Channel// 该方法由子类覆写doClose();// 将closeFuture设置为已关闭closeFuture.setClosed();// 将promise设置为已成功safeSetSuccess(promise);} catch (Throwable t) {// 处理异常closeFuture.setClosed();safeSetFailure(promise, t);}}private void fireChannelInactiveAndDeregister(final boolean wasActive) {deregister(voidPromise(), wasActive && !isActive());}private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {// 设置promise不可取消if (!promise.setUncancellable()) {return;}// 若尚未注册则设置promise成功并直接返回if (!registered) {safeSetSuccess(promise);return;}// 加入到执行队列invokeLater(new Runnable() {@Overridepublic void run() {try {// 执行注销处理doDeregister();} catch (Throwable t) {// 处理异常logger.warn("Unexpected exception occurred while deregistering a channel.", t);} finally {if (fireChannelInactive) {// 触发ChannelHandler的channelInactive方法pipeline.fireChannelInactive();}if (registered) {registered = false;// 触发ChannelHandler的channelUnregistered方法pipeline.fireChannelUnregistered();}safeSetSuccess(promise);}}});}}
}
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {private final class NioSocketChannelUnsafe extends NioByteUnsafe {@Overrideprotected Executor prepareToClose() {try {if (javaChannel().isOpen() && config().getSoLinger() > 0) {doDeregister();return GlobalEventExecutor.INSTANCE;}} catch (Throwable ignore) {}return null;}}@Overrideprotected void doClose() throws Exception {super.doClose();// 关闭Java原生的Channel,此处为SocketChanneljavaChannel().close();}
}public class NioServerSocketChannel extends AbstractNioMessageChannelimplements io.netty.channel.socket.ServerSocketChannel {@Overrideprotected void doClose() throws Exception {// 关闭Java原生的Channel,此处为ServerSocketChanneljavaChannel().close();}
}public abstract class AbstractNioChannel extends AbstractChannel {@Overrideprotected void doDeregister() throws Exception {// 取消SelectionKeyeventLoop().cancel(selectionKey());}
}

【2.4】Netty 关闭流程 -- SingleThreadEventExecutor.confirmShutdown()

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {protected boolean confirmShutdown() {// 不是正在关闭,返回falseif (!isShuttingDown()) {return false;}if (!inEventLoop()) {throw new IllegalStateException("must be invoked from an event loop");}// 取消定时任务cancelScheduledTasks();// 设置优雅关闭服务的开始时间if (gracefulShutdownStartTime == 0) {gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();}// 运行所有任务和所有shudown的钩子任务if (runAllTasks() || runShutdownHooks()) {// 当任务队列中存在任务// 并且所有任务执行完毕进入该分支if (isShutdown()) {// Executor shut down - no new tasks anymore.return true;}// 如果静默周期为0,返回trueif (gracefulShutdownQuietPeriod == 0) {return true;}// 否则添加一个空任务,返回falsewakeup(true);return false;}// 运行到此表明没有任何任务在运行final long nanoTime = ScheduledFutureTask.nanoTime();// 如果已经关闭,或者超时了,返回trueif (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {return true;}// 如果当前时间减去上一次运行的时间在静默周期以内if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {// 添加一个空任务,并休眠100mswakeup(true);try {Thread.sleep(100);} catch (InterruptedException e) {// Ignore}return false;}// 超过了静默周期返回truereturn true;}
}

【2.5】Netty 关闭流程 -- 无任务运行/超过静默周期后的处理

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {private void doStartThread() {...executor.execute(() -> {...try {...} catch (Throwable t) {...} finally {// 修改状态为ST_SHUTDOWN,之后不能再添加任何任务for (; ; ) {int oldState = state;if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// 再次执行confirmShutdown()直到没有任务或者超时for (; ; ) {if (confirmShutdown()) {break;}}} finally {try {// 执行 cleanup() 方法cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);// threadLock标识减一,将触发某些事件threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}// NioEventLoop的terminationFuture已成功terminationFuture.setSuccess(null);}}}});}
}

 【2.5.1】protected void cleanup() 分析

public final class NioEventLoop extends SingleThreadEventLoop {// 关闭 selector@Overrideprotected void cleanup() {try {selector.close();} catch (IOException e) {logger.warn("Failed to close a selector.", e);}}
}

【2.5.2】threadLock.release() 分析

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {@Overridepublic boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {if (unit == null) {throw new NullPointerException("unit");}if (inEventLoop()) {throw new IllegalStateException("cannot await termination of the current thread");}// tryAcquire()的作用是尝试的获得1个许可,如果获取不到则返回falseif (threadLock.tryAcquire(timeout, unit)) {// 释放一个许可threadLock.release();}return isTerminated();}
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {@Overridepublic boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {long deadline = System.nanoTime() + unit.toNanos(timeout);// 循环每一个NioEventLoop,等待它们终止loop: for (EventExecutor l: children) {for (;;) {long timeLeft = deadline - System.nanoTime();if (timeLeft <= 0) {break loop;}if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {break;}}}return isTerminated();}
}

【2.6】NioEventLoopGroup/NioEventLoop 中 terminationFuture 之间的关联建立

NioEventLoopGroup 的 terminationFuture 是在其 shutdownGracefully() 方法中返回;NioEventLoop 的 terminationFuture 是在其 shutdownGracefully() 方法中返回;

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {...// 创建一个监听器final FutureListener<Object> terminationListener = future -> {// 每个孩子完成时,terminatedChildren加一// 如果等于孩子数量,说明全部完成了if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);}};// 给每个孩子都添加上这个监听器for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}}
}

参考致谢

本博客为博主学习笔记,同时参考了网上众博主的博文以及相关专业书籍,在此表示感谢,本文若存在不足之处,请批评指正。

【1】慕课专栏,网络编程之Netty一站式精讲

【2】极客时间,Netty源码剖析与实战