当前位置: 代码迷 >> 综合 >> 【JAVA 网络编程系列】Netty -- Netty 的启动过程分析
  详细解决方案

【JAVA 网络编程系列】Netty -- Netty 的启动过程分析

热度:96   发布时间:2024-02-24 16:49:05.0

【JAVA 网络编程系列】Netty -- Netty 服务器端的启动过程分析

【1】Netty 典型的服务器代码

public final class EchoServer {static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));public static void main(String[] args) throws Exception {// 1. 声明线程池EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();EchoServerHandler echoServerHandler = new EchoServerHandler();try {// 2. 服务端引导器ServerBootstrap serverBootstrap = new ServerBootstrap();// 3. 设置线程池serverBootstrap.group(bossGroup, workerGroup)// 4. 设置ServerSocketChannel的类型.channel(NioServerSocketChannel.class)// 5. 设置参数.option(ChannelOption.SO_BACKLOG, 100)// 6. 设置ServerSocketChannel对应的Handler,只能设置一个.handler(new LoggingHandler(LogLevel.INFO))// 7. 设置SocketChannel对应的Handler.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();// 可以添加多个子Handlerp.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(echoServerHandler);}});// 8. 绑定端口ChannelFuture f = serverBootstrap.bind(PORT).sync();// 9. 等待服务端监听端口关闭,这里会阻塞主线程f.channel().closeFuture().sync();} finally {// 10. 优雅地关闭两个线程池bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

【2】Netty 服务器启动入口

ChannelFuture f = serverBootstrap.bind(PORT).sync();

【3】AbstractBootstrap 类 public ChannelFuture bind(int inetPort) 方法分析

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {// 该方法入参传入一个端口号// 调用 InetSocketAddress 方法构造一个 InetSocketAddress 类地址// 默认情况下生成一个 0.0.0.0:8007 的地址public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}public ChannelFuture bind(SocketAddress localAddress) {// 验证 group 与 channelFactory 是否为 null// 只要有一个为 null 则报异常validate();// 判断地址的有效应if (localAddress == null) {throw new NullPointerException("localAddress");}// 调用 doBind 绑定地址return doBind(localAddress);}}
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {/*** 功能* 1. 执行initAndRegister方法* 2. 执行doBind0方法*/private ChannelFuture doBind(final SocketAddress localAddress) {// 异步过程final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}// 不一定能够完成,register 进入 nio event loop 中执行了if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();// 注册完成则执行 doBind0 方法// 取决于initAndRegister()中异步执行的快慢,所以不一定到这里doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.// 异步注册操作一般会立即完成,此处是防止异步注册操作没有完成//// PendingRegistrationPromise 类的 protected EventExecutor executor() 方法中// 会判断 ChannelFuture 是否已经注册完毕放入不同的线程池中执行final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 等待 register 完成再执行 bind 操作// 此处添加监听器,当注册完成了便执行 doBind0 执行绑定操作regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// 发生异常则设置失败标记promise.setFailure(cause);} else {// 标记已注册标志promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}}

【3.1】AbstractBootstrap 类 final ChannelFuture initAndRegister() 方法分析

详见 【JAVA 网络编程系列】Netty -- AbstractBootstrap 类 final ChannelFuture initAndRegister() 方法分析(针对服务器端)

【3.2】AbstractBootstrap 类 private static void doBind0 方法分析

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// 异步执行channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {// 调用Channel的bind()方法// io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {@Overridepublic ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {// 调用的是pipeline的bind()方法// io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)return pipeline.bind(localAddress, promise);}protected abstract class AbstractUnsafe implements Unsafe {@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {boolean wasActive = isActive();try {// 绑定地址// 由子类覆写,此处调用的是NioServerSocketChannel中的doBind方法doBind(localAddress);} catch (Throwable t) {// 处理异常safeSetFailure(promise, t);closeIfClosed();return;}// 绑定后开始激活// 成功激活,调用pipeline.fireChannelActive()方法if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {// 最终触发各个ChannelHandler的channelActive方法pipeline.fireChannelActive();}});}// 设置promise为成功状态safeSetSuccess(promise);}}}
public class DefaultChannelPipeline implements ChannelPipeline {@Overridepublic final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {// 从尾开始调用// 此时pipeline中的Handler为// head<=>LoggingHandler<=>ServerBootstrapAcceptor<=>tail,// 出站的pineple实际为tail=>LoggingHandler=>head//// 最终调用到 HeadContext 中的 bind 方法return tail.bind(localAddress, promise);}final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)throws Exception {// 调用HeadContext该Handler中unsafe的bind()方法unsafe.bind(localAddress, promise);}}}
public class NioServerSocketChannel extends AbstractNioMessageChannelimplements io.netty.channel.socket.ServerSocketChannel {@Overrideprotected void doBind(SocketAddress localAddress) throws Exception {// 根据不同的JDK版本调用不同的方法// 通过 Java 原生 Channel 的 bind () 方法if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}}

参考致谢

本博客为博主学习笔记,同时参考了网上众博主的博文以及相关专业书籍,在此表示感谢,本文若存在不足之处,请批评指正。

【1】慕课专栏,网络编程之Netty一站式精讲

【2】极客时间,Netty源码剖析与实战