当前位置: 代码迷 >> 综合 >> rocketMq-nameServer篇
  详细解决方案

rocketMq-nameServer篇

热度:41   发布时间:2023-11-14 22:24:21.0

启动流程

/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.namesrv;import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.Callable;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.srvutil.ShutdownHookThread;
import org.slf4j.LoggerFactory;
public class NamesrvStartup {
    private static InternalLogger log;private static Properties properties = null;private static CommandLine commandLine = null;public static void main(String[] args) {
    main0(args);}public static NamesrvController main0(String[] args) {
    try {
    NamesrvController controller = createNamesrvController(args);start(controller);String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {
    e.printStackTrace();System.exit(-1);}return null;}public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());if (null == commandLine) {
    System.exit(-1);return null;}final NamesrvConfig namesrvConfig = new NamesrvConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();nettyServerConfig.setListenPort(9876);if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');if (file != null) {
    InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}if (commandLine.hasOption('p')) {
    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);MixAll.printObjectProperties(console, namesrvConfig);MixAll.printObjectProperties(console, nettyServerConfig);System.exit(0);}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);if (null == namesrvConfig.getRocketmqHome()) {
    System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/my-loback-namerv.xml");log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);return controller;}public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
    throw new IllegalArgumentException("NamesrvController is null");}boolean initResult = controller.initialize();if (!initResult) {
    controller.shutdown();System.exit(-3);}Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Overridepublic Void call() throws Exception {
    controller.shutdown();return null;}}));controller.start();return controller;}public static void shutdown(final NamesrvController controller) {
    controller.shutdown();}public static Options buildCommandlineOptions(final Options options) {
    Option opt = new Option("c", "configFile", true, "Name server config properties file");opt.setRequired(false);options.addOption(opt);opt = new Option("p", "printConfigItem", false, "Print all config item");opt.setRequired(false);options.addOption(opt);return options;}public static Properties getProperties() {
    return properties;}
}

NamesrvController

NamesrvController就是

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    this.namesrvConfig = namesrvConfig;this.nettyServerConfig = nettyServerConfig;this.kvConfigManager = new KVConfigManager(this);this.routeInfoManager = new RouteInfoManager();this.brokerHousekeepingService = new BrokerHousekeepingService(this);this.configuration = new Configuration(log,this.namesrvConfig, this.nettyServerConfig);this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");}

KVConfigManager

KVConfigManager作用
字段lock 读写锁configTable 内存记录的配置
方法构造函数load:加载配置文件,读取到内存的configTable中putKVConfig:添加一条记录deleteKVConfig:删除一条记录persist:将内存记录的configTable持久化到配置文件getKVListByNamespace:拿到configTable对应namespace的所有记录getKVConfig:获取configTable中namespace,key对应的一条记录printAllPeriodically:打印configTable所有配置,被周期性的调用
思考
refer

加载namesrvController指定的kvConfig配置文件(常为xxx/kvConfig.json)到内存
读取或增加,删除kvConfig记录
将内存记录的配置,持久化到文件
打印所有kvConfig配置
?

字段

    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);private final NamesrvController namesrvController;//NameServer控制类private final ReadWriteLock lock = new ReentrantReadWriteLock();//读写锁private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =new HashMap<String, HashMap<String, String>>();

主要注意
lock 是一个读写锁,用来控制并发
configTable 就是在内存中记录住的kv配置,第一级key为NameSpace(暂时什么用还不清楚)

RouteInfoManager

看名称是路由管理
处理topic注册和broker管理

   private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

BrokerHousekeepingService

/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.namesrv.routeinfo;import io.netty.channel.Channel;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.ChannelEventListener;public class BrokerHousekeepingService implements ChannelEventListener {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);private final NamesrvController namesrvController;public BrokerHousekeepingService(NamesrvController namesrvController) {
    this.namesrvController = namesrvController;}@Overridepublic void onChannelConnect(String remoteAddr, Channel channel) {
    }@Overridepublic void onChannelClose(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);}@Overridepublic void onChannelException(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);}@Overridepublic void onChannelIdle(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);}
}

监听连接情况的事件监听器

来到org.apache.rocketmq.namesrv.NamesrvStartup#start

public static NamesrvController start(final NamesrvController controller) throws Exception {
    
{
    if (null == controller) {
    throw new IllegalArgumentException("NamesrvController is null");}boolean initResult = controller.initialize();if (!initResult) {
    controller.shutdown();System.exit(-3);}Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Overridepublic Void call() throws Exception {
    controller.shutdown();return null;}}));controller.start();return controller;}

org.apache.rocketmq.namesrv.NamesrvController#initialize

    public boolean initialize() {
    this.kvConfigManager.load();this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));this.registerProcessor();this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Overridepublic void run() {
    NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Overridepublic void run() {
    NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
    // Register a listener to reload SslContexttry {
    fileWatchService = new FileWatchService(new String[] {
    TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {
    boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {
    if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
    log.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
    certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
    keyChanged = true;}if (certChanged && keyChanged) {
    log.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {
    ((NettyRemotingServer) remotingServer).loadSslContext();}});} catch (Exception e) {
    log.warn("FileWatchService created error, can't load the certificate dynamically");}}return true;}

构造NettyRemotingServer

NettyRemotingServer

   public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {
    publicThreadNums = 4;}this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {
    return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});if (useEpoll()) {
    this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {
    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {
    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {
    this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {
    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {
    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}loadSslContext();}

oneWay和async的samphore默认分别为128和64
初始化netty,boss线程数1,work线程默认为机器线程
?

回到initialize
org.apache.rocketmq.namesrv.NamesrvController#registerProcessor

  private void registerProcessor() {
    if (namesrvConfig.isClusterTest()) {
    this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),this.remotingExecutor);} else {
    this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);}}

注册了一个默认的DefaultRequestProcessor,并使用remotingExecutor线程池处理processor
?

然后开启定时任务

 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Overridepublic void run() {
    NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);

根据心跳判断,两分钟无心跳则认为不再活跃

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {
    Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
    RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);this.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}}

?

?

  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Overridepublic void run() {
    NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);
    public void printAllPeriodically() {
    try {
    this.lock.readLock().lockInterruptibly();try {
    log.info("--------------------------------------------------------");{
    log.info("configTable SIZE: {}", this.configTable.size());Iterator<Entry<String, HashMap<String, String>>> it =this.configTable.entrySet().iterator();while (it.hasNext()) {
    Entry<String, HashMap<String, String>> next = it.next();Iterator<Entry<String, String>> itSub = next.getValue().entrySet().iterator();while (itSub.hasNext()) {
    Entry<String, String> nextSub = itSub.next();log.info("configTable NS: {} Key: {} Value: {}", next.getKey(), nextSub.getKey(),nextSub.getValue());}}}} finally {
    this.lock.readLock().unlock();}} catch (InterruptedException e) {
    log.error("printAllPeriodically InterruptedException", e);}}

定期打印配置数据
继续看

    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Overridepublic Void call() throws Exception {
    controller.shutdown();return null;}}));

在jvm关闭时执行的钩子函数执行controller.shutdown();
进入start方法

public void start() throws Exception {
    this.remotingServer.start();if (this.fileWatchService != null) {
    this.fileWatchService.start();}}

进入org.apache.rocketmq.remoting.netty.NettyRemotingServer#start

   this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {
    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});

耗时任务handler线程池

  prepareSharableHandlers();ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {
    @Overridepublic void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
    childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {
    ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {
    throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {
    this.nettyEventExecutor.start();}

nettyEventExecutor是处理事件的线程

NettyEventExecutor

class NettyEventExecutor extends ServiceThread {
    private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();private final int maxSize = 10000;public void putNettyEvent(final NettyEvent event) {
    if (this.eventQueue.size() <= maxSize) {
    this.eventQueue.add(event);} else {
    log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());}}@Overridepublic void run() {
    log.info(this.getServiceName() + " service started");final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();while (!this.isStopped()) {
    try {
    NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);if (event != null && listener != null) {
    switch (event.getType()) {
    case IDLE:listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());break;case CLOSE:listener.onChannelClose(event.getRemoteAddr(), event.getChannel());break;case CONNECT:listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());break;case EXCEPTION:listener.onChannelException(event.getRemoteAddr(), event.getChannel());break;default:break;}}} catch (Exception e) {
    log.warn(this.getServiceName() + " service has exception. ", e);}}log.info(this.getServiceName() + " service end");}@Overridepublic String getServiceName() {
    return NettyEventExecutor.class.getSimpleName();}}

在这里插入图片描述

同样,把任务交给线程,而不是使用work线程去做
?

  this.timer.scheduleAtFixedRate(new TimerTask() {
    @Overridepublic void run() {
    try {
    NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {
    log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#scanResponseTable

public void scanResponseTable() {
    final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();while (it.hasNext()) {
    Entry<Integer, ResponseFuture> next = it.next();ResponseFuture rep = next.getValue();if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
    rep.release();it.remove();rfList.add(rep);log.warn("remove timeout request, " + rep);}}for (ResponseFuture rf : rfList) {
    try {
    executeInvokeCallback(rf);} catch (Throwable e) {
    log.warn("scanResponseTable, operationComplete Exception", e);}}}

定期扫描,如果发送的消息,超时未返回,则需要移除responseTable,并且释放countDownLatch

broker注册与维护

跟生产者接收消息相同
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived

  public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;if (cmd != null) {
    switch (cmd.getType()) {
    case REQUEST_COMMAND:processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:processResponseCommand(ctx, cmd);break;default:break;}}}

同样,根据code,取出何时的pair和相应线程池进行处理
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand

 public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();//定义了,但是没有运行if (pair != null) {
    Runnable run = new Runnable() {
    @Overridepublic void run() {
    try {
    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);final RemotingResponseCallback callback = new RemotingResponseCallback() {
    @Overridepublic void callback(RemotingCommand response) {
    doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);if (!cmd.isOnewayRPC()) {
    if (response != null) {
    response.setOpaque(opaque);response.markResponseType();try {
    ctx.writeAndFlush(response);} catch (Throwable e) {
    log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {
    }}}};if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
    AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {
    NettyRequestProcessor processor = pair.getObject1();RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {
    log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};if (pair.getObject1().rejectRequest()) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {
    final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {
    if ((System.currentTimeMillis() % 10000) == 0) {
    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}if (!cmd.isOnewayRPC()) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {
    String error = " request type " + cmd.getCode() + " not supported";final RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}}

又进入org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor#asyncProcessRequest

public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
    RemotingCommand response = processRequest(ctx, request);responseCallback.callback(response);}

再进入
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest

 @Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {
    if (ctx != null) {
    log.debug("receive request, {} {} {}",request.getCode(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),request);}switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:return this.putKVConfig(ctx, request);case RequestCode.GET_KV_CONFIG:return this.getKVConfig(ctx, request);case RequestCode.DELETE_KV_CONFIG:return this.deleteKVConfig(ctx, request);case RequestCode.QUERY_DATA_VERSION:return queryBrokerTopicConfig(ctx, request);case RequestCode.REGISTER_BROKER:Version brokerVersion = MQVersion.value2Version(request.getVersion());if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
    return this.registerBrokerWithFilterServer(ctx, request);} else {
    return this.registerBroker(ctx, request);}case RequestCode.UNREGISTER_BROKER:return this.unregisterBroker(ctx, request);case RequestCode.GET_ROUTEINFO_BY_TOPIC:return this.getRouteInfoByTopic(ctx, request);case RequestCode.GET_BROKER_CLUSTER_INFO:return this.getBrokerClusterInfo(ctx, request);case RequestCode.WIPE_WRITE_PERM_OF_BROKER:return this.wipeWritePermOfBroker(ctx, request);case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:return getAllTopicListFromNameserver(ctx, request);case RequestCode.DELETE_TOPIC_IN_NAMESRV:return deleteTopicInNamesrv(ctx, request);case RequestCode.GET_KVLIST_BY_NAMESPACE:return this.getKVListByNamespace(ctx, request);case RequestCode.GET_TOPICS_BY_CLUSTER:return this.getTopicsByCluster(ctx, request);case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:return this.getSystemTopicListFromNs(ctx, request);case RequestCode.GET_UNIT_TOPIC_LIST:return this.getUnitTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:return this.getHasUnitSubTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:return this.getHasUnitSubUnUnitTopicList(ctx, request);case RequestCode.UPDATE_NAMESRV_CONFIG:return this.updateConfig(ctx, request);case RequestCode.GET_NAMESRV_CONFIG:return this.getConfig(ctx, request);default:break;}return null;}

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBrokerWithFilterServer

  public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();final RegisterBrokerRequestHeader requestHeader =(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);if (!checksum(ctx, request, requestHeader)) {
    response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("crc32 not match");return response;}RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();if (request.getBody() != null) {
    try {
    registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());} catch (Exception e) {
    throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);}} else {
    registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);}RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(requestHeader.getClusterName(),requestHeader.getBrokerAddr(),requestHeader.getBrokerName(),requestHeader.getBrokerId(),requestHeader.getHaServerAddr(),registerBrokerBody.getTopicConfigSerializeWrapper(),registerBrokerBody.getFilterServerList(),ctx.channel());responseHeader.setHaServerAddr(result.getHaServerAddr());responseHeader.setMasterAddr(result.getMasterAddr());byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);response.setBody(jsonValue);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker

   public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();try {
    try {
    this.lock.writeLock().lockInterruptibly();//为创建集群的时候就是默认集群Set<String> brokerNames = this.clusterAddrTable.get(clusterName);if (null == brokerNames) {
    brokerNames = new HashSet<String>();this.clusterAddrTable.put(clusterName, brokerNames);}brokerNames.add(brokerName);boolean registerFirst = false;BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {
    registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());this.brokerAddrTable.put(brokerName, brokerData);}Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTableIterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {
    Entry<Long, String> item = it.next();if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
    it.remove();}}String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);registerFirst = registerFirst || (null == oldAddr);if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) {
    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {
    ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {
    for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
    this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),//broker到nameservert没有专门的心跳包,在注册topic 时候更新信息topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {
    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}if (filterServerList != null) {
    if (filterServerList.isEmpty()) {
    this.filterServerTable.remove(brokerAddr);} else {
    this.filterServerTable.put(brokerAddr, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {
    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {
    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {
    result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {
    this.lock.writeLock().unlock();}} catch (Exception e) {
    log.error("registerBroker Exception", e);}return result;}

更新brokerAddrTable中的数据,把ip地址更新进去,根据brokerName,和brokerId,因此同集群的brokerName需要相同

    BrokerData brokerData = this.brokerAddrTable.get(brokerName);  
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);

更新brokerLiveTable中的心跳时间
如果事master注册,更新topicQueueTable中的值

  if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) 
 private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    QueueData queueData = new QueueData();queueData.setBrokerName(brokerName);queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());queueData.setReadQueueNums(topicConfig.getReadQueueNums());queueData.setPerm(topicConfig.getPerm());queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());if (null == queueDataList) {
    queueDataList = new LinkedList<QueueData>();queueDataList.add(queueData);this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);} else {
    boolean addNewOne = true;Iterator<QueueData> it = queueDataList.iterator();while (it.hasNext()) {
    QueueData qd = it.next();if (qd.getBrokerName().equals(brokerName)) {
    if (qd.equals(queueData)) {
    addNewOne = false;} else {
    log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,queueData);it.remove();}}}if (addNewOne) {
    queueDataList.add(queueData);}}}

如果不是master点

 if (MixAll.MASTER_ID != brokerId) {
    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {
    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {
    result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}

返回从节点,主节点地址,从节点会去请求连接主节点
?

topic信息数据

回到org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest

switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:return this.putKVConfig(ctx, request);case RequestCode.GET_KV_CONFIG:return this.getKVConfig(ctx, request);case RequestCode.DELETE_KV_CONFIG:return this.deleteKVConfig(ctx, request);case RequestCode.QUERY_DATA_VERSION:return queryBrokerTopicConfig(ctx, request);case RequestCode.REGISTER_BROKER:Version brokerVersion = MQVersion.value2Version(request.getVersion());if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
    return this.registerBrokerWithFilterServer(ctx, request);} else {
    return this.registerBroker(ctx, request);}case RequestCode.UNREGISTER_BROKER:return this.unregisterBroker(ctx, request);case RequestCode.GET_ROUTEINFO_BY_TOPIC:return this.getRouteInfoByTopic(ctx, request);case RequestCode.GET_BROKER_CLUSTER_INFO:return this.getBrokerClusterInfo(ctx, request);case RequestCode.WIPE_WRITE_PERM_OF_BROKER:return this.wipeWritePermOfBroker(ctx, request);case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:return getAllTopicListFromNameserver(ctx, request);case RequestCode.DELETE_TOPIC_IN_NAMESRV:return deleteTopicInNamesrv(ctx, request);case RequestCode.GET_KVLIST_BY_NAMESPACE:return this.getKVListByNamespace(ctx, request);case RequestCode.GET_TOPICS_BY_CLUSTER:return this.getTopicsByCluster(ctx, request);case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:return this.getSystemTopicListFromNs(ctx, request);case RequestCode.GET_UNIT_TOPIC_LIST:return this.getUnitTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:return this.getHasUnitSubTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:return this.getHasUnitSubUnUnitTopicList(ctx, request);case RequestCode.UPDATE_NAMESRV_CONFIG:return this.updateConfig(ctx, request);case RequestCode.GET_NAMESRV_CONFIG:return this.getConfig(ctx, request);default:break;}

看下生产者的获取topic信息的接口_GET_ROUTEINFO_BY_TOPIC_
?

  public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);final GetRouteInfoRequestHeader requestHeader =(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());if (topicRouteData != null) {
    if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
    String orderTopicConf =this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,requestHeader.getTopic());topicRouteData.setOrderTopicConf(orderTopicConf);}byte[] content = topicRouteData.encode();response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;}

处理逻辑也十分简单直接从维护的topic列表中取出数据返回
?

总结

在rokcetMq的几个模块中,nameserver是功能和逻辑都比较简单的,主要功能是维护,broker信息,地址,每个broker的队列信息,以及所有topic以及对应的broker信息。