当前位置: 代码迷 >> 综合 >> zookeeper(6)—— zk服务端集群源码
  详细解决方案

zookeeper(6)—— zk服务端集群源码

热度:35   发布时间:2023-12-12 12:02:33.0

服务端集群源码,同样从QuorumPeerMain的main方法入手

    public static void main(String[] args) {QuorumPeerMain main = new QuorumPeerMain();try {main.initializeAndRun(args);} catch (IllegalArgumentException e) {
   

81 main.initializeAndRun(args); //跟进去

    protected void initializeAndRun(String[] args) throws ConfigException, IOException{QuorumPeerConfig config = new QuorumPeerConfig();if (args.length == 1) {config.parse(args[0]);}// Start and schedule the the purge taskDatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());purgeMgr.start();if (args.length == 1 && config.servers.size() > 0) {runFromConfig(config);} else {LOG.warn("Either no config or no quorum defined in config, running "+ " in standalone mode");// there is only server in the quorum -- run as standaloneZooKeeperServerMain.main(args);}}

114 runFromConfig(config); //跟进去,集群模式

主要创建了两个对象,ServerCnxnFactory、QuorumPeer

根据配置信息,对QuorumPeer对象进行赋值

   public void runFromConfig(QuorumPeerConfig config) throws IOException {try {ManagedUtil.registerLog4jMBeans();} catch (JMException e) {LOG.warn("Unable to register log4j JMX control", e);}LOG.info("Starting quorum peer");try {ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();cnxnFactory.configure(config.getClientPortAddress(),config.getMaxClientCnxns());quorumPeer = getQuorumPeer();quorumPeer.setQuorumPeers(config.getServers());quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(config.getDataLogDir()),new File(config.getDataDir())));quorumPeer.setElectionType(config.getElectionAlg());quorumPeer.setMyid(config.getServerId());quorumPeer.setTickTime(config.getTickTime());quorumPeer.setInitLimit(config.getInitLimit());quorumPeer.setSyncLimit(config.getSyncLimit());quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());quorumPeer.setCnxnFactory(cnxnFactory);quorumPeer.setQuorumVerifier(config.getQuorumVerifier());quorumPeer.setClientPortAddress(config.getClientPortAddress());quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));quorumPeer.setLearnerType(config.getPeerType());quorumPeer.setSyncEnabled(config.getSyncEnabled());// sets quorum sasl authentication configurationsquorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);if(quorumPeer.isQuorumSaslAuthEnabled()){quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);}quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);quorumPeer.initialize();quorumPeer.start();quorumPeer.join();} catch (InterruptedException e) {// warn, but 
generally this is okLOG.warn("Quorum Peer interrupted", e);}}

170 quorumPeer.start(); //跟进去

    public synchronized void start() {loadDataBase();  //从日志或快照将数据加载到内存cnxnFactory.start(); // 单机版里说过,开启一个socket监听,并处理请求  startLeaderElection(); //领导选举(等会再说)super.start(); //调用QuorumPeerMain自己的run方法,根据服务器自己的状态,进行初始化}

接下来详细介绍super.start(); //调用QuorumPeer自己的run方法

注意此时已经领导选举了(但是可能还未成功)

根据服务器状态(LEADING、FOLLOWING、OBSERVING,或者LOOKING——表示领导还未选出来),进行初始化

1、OBSERVING

case OBSERVING:
    // Observer branch: build the observer, follow the leader, and always fall
    // back to LOOKING once observing ends.
    try {
        LOG.info("OBSERVING");
        setObserver(makeObserver(logFactory));
        observer.observeLeader();
    } catch (Exception e) {
        LOG.warn("Unexpected exception",e );
    } finally {
        observer.shutdown();
        setObserver(null);
        setPeerState(ServerState.LOOKING);
    }
    break;

967 setObserver(makeObserver(logFactory));

先看makeObserver,注意这里创建了一个ObserverZooKeeperServer对象

/**
 * Creates the Observer for this peer, backed by a newly constructed
 * ObserverZooKeeperServer.
 *
 * @param logFactory the transaction/snapshot log passed to the ObserverZooKeeperServer
 * @return a new Observer wrapping this peer and the new server object
 * @throws IOException declared by the signature
 */
protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
    ObserverZooKeeperServer observerServer = new ObserverZooKeeperServer(
            logFactory,
            this,
            new ZooKeeperServer.BasicDataTreeBuilder(),
            this.zkDb);
    return new Observer(this, observerServer);
}

再看observer.observeLeader();

968 observer.observeLeader();
=>64 QuorumServer leaderServer = findLeader();  //找到leader服务器
=>67 connectToLeader(leaderServer.addr, leaderServer.hostname);//连接leader
=>68 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);  //获取leader的日志编号
=>70 syncWithLeader(newLeaderZxid); //和leader的日志同步(可能比leader大,可能比leader小,可能没有日志要全量同步)
==>469 zk.startup();  //单机版介绍过了,但集群版有些不一样

主要看zk.startup();

 public synchronized void startup() {if (sessionTracker == null) {createSessionTracker();}startSessionTracker();//判断session是否过期setupRequestProcessors(); //注意这里使用模板模式,具体ObserverZooKeeperServer的setupRequestProcessorsregisterJMX();setState(State.RUNNING);notifyAll();}

注意:setupRequestProcessors();方法调用的是ObserverZooKeeperServer对象的方法

// Observer request-processor chain, constructed back to front.
// Last stage: the final request processor.
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
// Middle stage: the commit processor, constructed with finalProcessor as its next stage.
commitProcessor = new CommitProcessor(finalProcessor,Long.toString(getServerId()), true,getZooKeeperServerListener());
commitProcessor.start();
// First stage: the observer request processor, constructed with commitProcessor as its next stage.
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();

创建了三个对象:ObserverRequestProcessor、CommitProcessor、FinalRequestProcessor

也是链式调用,作用分别如下:

ObserverRequestProcessor:处理客户端请求,如果是写请求,转发给Leader,如果是读请求,进入commitProcessor 处理

68 Request request = queuedRequests.take();//获取客户端请求
79 nextProcessor.processRequest(request); //调用commitProcessor
86-100 处理写请求,转发给leader

commitProcessor (难点)

79 nextProcessor.processRequest(request);  //调用下一个处理器FinalRequestProcessor
83 wait(); //如果是事务性请求,等待领导投票(156commit中会唤醒,投票完成后由leader进行调用)
137-139 如果是非事务性请求,自己解决

最后FinalRequestProcessor

和单机版的一样,记录内存,返回响应

2、FOLLOWING

和Observer非常类似,不再重复

3、LEADING

case LEADING:
    // Leader branch: build the leader and lead; whatever happens, shut down any
    // leftover leader and fall back to LOOKING.
    LOG.info("LEADING");
    try {
        setLeader(makeLeader(logFactory));
        leader.lead();
        setLeader(null);
    } catch (Exception e) {
        LOG.warn("Unexpected exception",e);
    } finally {
        if (leader != null) {
            leader.shutdown("Forcing shutdown");
            setLeader(null);
        }
        setPeerState(ServerState.LOOKING);
    }
    break;
} // closes the enclosing switch, which opens outside this excerpt

993 setLeader(makeLeader(logFactory));//创建LeaderZooKeeperServer对象

994 leader.lead();

=>380 zk.loadData(); //加载数据
=>386 cnxAcceptor = new LearnerCnxAcceptor();
=>387 cnxAcceptor.start();

看下cnxAcceptor.start();

320 Socket s = ss.accept(); //leader接收Follower或Observer的连接请求
328 new LearnerHandler(s, is, Leader.this); //把小弟封装到LearnerHandler,等待小弟的消息
329 fh.start(); //再调用线程

看下fh.start();

=>431 startZkServer();//每个角色都用这个方法
==>966 zk.startup();

又是zk.startup();

    /**
     * Brings the server up: ensures a session tracker exists and starts it, sets up
     * the request-processor chain, registers JMX, marks the state RUNNING, and wakes
     * any threads waiting on this object.
     */
    public synchronized void startup() {
        // Create the session tracker lazily, only when none has been set yet.
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();

        // The processor chain that gets built depends on the concrete server class.
        setupRequestProcessors();

        registerJMX();

        setState(State.RUNNING);
        notifyAll();
    }

重点在setupRequestProcessors();
 

    /**
     * Builds the leader's request-processor chain, back to front:
     * PrepRequestProcessor, ProposalRequestProcessor, CommitProcessor,
     * ToBeAppliedRequestProcessor, FinalRequestProcessor.
     */
    protected void setupRequestProcessors() {
        // Last stage.
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);

        // Constructed with the leader's toBeApplied collection and finalProcessor as its next stage.
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                finalProcessor, getLeader().toBeApplied);

        commitProcessor = new CommitProcessor(
                toBeAppliedProcessor,
                Long.toString(getServerId()),
                false,
                getZooKeeperServerListener());
        commitProcessor.start();

        ProposalRequestProcessor proposalProcessor =
                new ProposalRequestProcessor(this, commitProcessor);
        proposalProcessor.initialize();

        // First stage: the entry point of the chain.
        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
        ((PrepRequestProcessor) firstProcessor).start();
    }

这里创建了五个对象,作用分别如下:

PrepRequestProcessor 处理客户端的请求

从submittedRequests队列中获取请求

根据请求类型的不同,分别进行处理pRequest2Txn

将处理过的请求封装到outstandingChanges

ProposalRequestProcessor 处理投票

如果是同步请求

72 zks.getLeader().processSync((LearnerSyncRequest)request);

直接将同步请求封装成packet,并加入到queuedPackets队列中

如果是其他请求

74 nextProcessor.processRequest(request);

  实际调用CommitProcessor.processRequest

如果是非事务请求,直接交给ToBeAppliedRequestProcessor

如果是事务请求,等待leader投票完成并发送commit请求后被唤醒,再把允许提交的请求交给ToBeAppliedRequestProcessor处理

75-83  zks.getLeader().propose(request); //处理真正投票

=> 779 new QuorumPacket //准备一张票,投给自己

=> 792 sendPacket(pp); //发给小弟

到这里结束了,此时要跳转到小弟org.apache.zookeeper.server.quorum.Learner的syncWithLeader(long newLeaderZxid)

382 case Leader.PROPOSAL:

   拿到leader的投票,写进硬盘,写入成功后

467 writePacket(ack, true); //再给leader一个ack响应

到这里结束了,之前说过leader与小弟之间的socket都封装到LearnerHandler中

577 case Leader.ACK

583 syncLimitCheck.updateAck(qp.getZxid()); //更新

589 leader.processAck

=> 598 p.ackSet.add(sid); //leader收集ack

=> 603 if (self.getQuorumVerifier().containsQuorum(p.ackSet)) //超过半数,即通过

=> 617 commit(zxid); //提交,commit会唤醒等待中的小弟

ToBeAppliedRequestProcessor

667 toBeApplied.remove(); //此时投票已结束,将投票信息删除

然后继续下一个处理FinalRequestProcessor

FinalRequestProcessor

和单机一样

此外,上面领导选举的过程并没有介绍

QuorumPeer的start方法

public synchronized void start() {loadDataBase();cnxnFactory.start();        startLeaderElection(); //领导选举super.start();}

startLeaderElection(); //领导选举

705 new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); //创建投票,投给自己
729 createElectionAlgorithm(electionType);  //领导选举的算法,electionType在配置文件中electionAlg默认值为3
case 3:
    // Election type 3: create the connection manager and, if it has a listener,
    // start the listener and use FastLeaderElection.
    qcm = createCnxnManager();
    QuorumCnxManager.Listener listener = qcm.listener;
    if (listener != null) {
        listener.start();
        le = new FastLeaderElection(this, qcm);
    } else {
        LOG.error("Null listener when initializing cnx manager");
    }
    break;

824  qcm = createCnxnManager(); //创建QuorumCnxManager,该对象中保存了以下几个队列

senderWorkerMap :每台服务器(sid)对应的senderWorker

recvQueue:本台服务器接收到的消息

queueSendMap:需要发送给每个服务器(sid)的消息队列

lastMessageSent:发送给每台服务器最近的消息

828 new FastLeaderElection(this, qcm); //领导选举策略。进行投票,创建两个线程,一个发送,一个接收

继续看looking

944 setCurrentVote(makeLEStrategy().lookForLeader()); //设置当前的投票

进入lookForLeader() 【FastLeaderElection类】

810 updateProposal //先给自己投一票

815 sendNotifications();  //将发送信息放入sendqueue队列中去

821  while 循环

855 case LOOKING:

   如果服务器届号大于本机

       If 对比zxid等

         更新为服务器

       Else

         更新本机

       将发送信息放入sendqueue队列中去

   如果服务器届号小于本机

        什么都不做

如果服务器届号等于本机

    直接对比

    将发送信息放入sendqueue队列中去

  889-903 收集响应。超过半数即可确认leader

909  改变服务器状态

922 case OBSERVING:

    如果是ob,则不需要投票

925 case FOLLOWING:

926 case LEADING:

     如果是FOLLOWING或LEADING,则表示已经有领导了,与领导同步

  相关解决方案