服务端集群源码,同样从QuorumPeerMain的main方法入手
public static void main(String[] args) {QuorumPeerMain main = new QuorumPeerMain();try {main.initializeAndRun(args);} catch (IllegalArgumentException e) {
81 main.initializeAndRun(args); //跟进去
protected void initializeAndRun(String[] args) throws ConfigException, IOException{QuorumPeerConfig config = new QuorumPeerConfig();if (args.length == 1) {config.parse(args[0]);}// Start and schedule the the purge taskDatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());purgeMgr.start();if (args.length == 1 && config.servers.size() > 0) {runFromConfig(config);} else {LOG.warn("Either no config or no quorum defined in config, running "+ " in standalone mode");// there is only server in the quorum -- run as standaloneZooKeeperServerMain.main(args);}}
114 runFromConfig(config); //跟进去,集群模式
主要创建了两个对象,ServerCnxnFactory、QuorumPeer
根据配置信息,对QuorumPeer对象进行赋值
public void runFromConfig(QuorumPeerConfig config) throws IOException {try {ManagedUtil.registerLog4jMBeans();} catch (JMException e) {LOG.warn("Unable to register log4j JMX control", e);}LOG.info("Starting quorum peer");try {ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();cnxnFactory.configure(config.getClientPortAddress(),config.getMaxClientCnxns());quorumPeer = getQuorumPeer();quorumPeer.setQuorumPeers(config.getServers());quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(config.getDataLogDir()),new File(config.getDataDir())));quorumPeer.setElectionType(config.getElectionAlg());quorumPeer.setMyid(config.getServerId());quorumPeer.setTickTime(config.getTickTime());quorumPeer.setInitLimit(config.getInitLimit());quorumPeer.setSyncLimit(config.getSyncLimit());quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());quorumPeer.setCnxnFactory(cnxnFactory);quorumPeer.setQuorumVerifier(config.getQuorumVerifier());quorumPeer.setClientPortAddress(config.getClientPortAddress());quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));quorumPeer.setLearnerType(config.getPeerType());quorumPeer.setSyncEnabled(config.getSyncEnabled());// sets quorum sasl authentication configurationsquorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);if(quorumPeer.isQuorumSaslAuthEnabled()){quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);}quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);quorumPeer.initialize();quorumPeer.start();quorumPeer.join();} catch (InterruptedException e) {// warn, but 
generally this is okLOG.warn("Quorum Peer interrupted", e);}}
170 quorumPeer.start(); //跟进去
public synchronized void start() {loadDataBase(); //从日志或快照将数据加载到内存cnxnFactory.start(); // 单机版里说过,开启一个socket监听,并处理请求 startLeaderElection(); //领导选举(等会再说)super.start(); //调用QuorumPeerMain自己的run方法,根据服务器自己的状态,进行初始化}
接下来详细介绍super.start(); //启动线程,最终执行QuorumPeer自己的run方法
注意此时已经开始领导选举了(但可能还没有选出结果)
根据服务器状态(LEADING、FOLLOWING、OBSERVING,或者LOOKING——表示领导还未选出来),进行初始化
1、OBSERVING
// Observer branch: create an Observer and follow the leader via observeLeader().
case OBSERVING:
    try {
        LOG.info("OBSERVING");
        setObserver(makeObserver(logFactory));
        observer.observeLeader();
    } catch (Exception e) {
        LOG.warn("Unexpected exception",e );
    } finally {
        // However observeLeader() ended, shut the observer down, clear the
        // reference, and fall back to LOOKING.
        observer.shutdown();
        setObserver(null);
        setPeerState(ServerState.LOOKING);
    }
    break;
967 setObserver(makeObserver(logFactory));
先看makeObserver,注意这里创建了一个ObserverZooKeeperServer对象
/**
 * Creates the Observer for this peer, backed by a freshly constructed
 * ObserverZooKeeperServer that shares this peer's ZKDatabase.
 *
 * @param logFactory transaction/snapshot log handed to the ObserverZooKeeperServer
 * @return a new Observer bound to this peer
 * @throws IOException declared by the signature
 */
protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
    return new Observer(this, new ObserverZooKeeperServer(logFactory,this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
再看observer.observeLeader();
968 observer.observeLeader();
=>64 QuorumServer leaderServer = findLeader(); //找到leader服务器
=>67 connectToLeader(leaderServer.addr, leaderServer.hostname);//连接leader
=>68 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO); //获取leader的日志编号
=>70 syncWithLeader(newLeaderZxid); //和leader的日志同步(可能比leader大,可能比leader小,可能没有日志要全量同步)
==>469 zk.startup(); //单机版介绍过了,但集群版有些不一样
主要看zk.startup();
public synchronized void startup() {if (sessionTracker == null) {createSessionTracker();}startSessionTracker();//判断session是否过期setupRequestProcessors(); //注意这里使用模板模式,具体ObserverZooKeeperServer的setupRequestProcessorsregisterJMX();setState(State.RUNNING);notifyAll();}
注意:setupRequestProcessors();方法调用的是ObserverZooKeeperServer对象的方法
// The chain is built back to front: each processor receives its successor.
// Resulting order: ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor.
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,Long.toString(getServerId()), true,getZooKeeperServerListener());
commitProcessor.start();
// Entry point of the chain for this server.
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();
创建了三个对象:ObserverRequestProcessor、CommitProcessor、FinalRequestProcessor
也是链式调用,作用分别如下:
ObserverRequestProcessor:处理客户端请求,如果是写请求,转发给Leader,如果是读请求,进入commitProcessor 处理
68 Request request = queuedRequests.take();//获取客户端请求
79 nextProcessor.processRequest(request); //调用commitProcessor
86-100 处理写请求,转发给leader
commitProcessor (难点)
79 nextProcessor.processRequest(request); //调用commitProcessor
83 wait(); //如果是事务性请求,等待领导投票(156commit中会唤醒,投票完成后由leader进行调用)
137-139 如果是非事务性请求,自己解决
最后FinalRequestProcessor
和单机版的一样,记录内存,返回响应
2、FOLLOWING
和Observer非常类似,不再重复
3、LEADING
// Leader branch: create the Leader and run lead() until it returns or throws.
case LEADING:
    LOG.info("LEADING");
    try {
        setLeader(makeLeader(logFactory));
        leader.lead();
        setLeader(null);
    } catch (Exception e) {
        LOG.warn("Unexpected exception",e);
    } finally {
        // leader is still non-null only if lead() did not return normally;
        // in that case force a shutdown before clearing the reference.
        if (leader != null) {
            leader.shutdown("Forcing shutdown");
            setLeader(null);
        }
        // Either way, go back to LOOKING.
        setPeerState(ServerState.LOOKING);
    }
    break;
}
993 setLeader(makeLeader(logFactory));//创建LeaderZooKeeperServer对象
994 leader.lead();
=>380 zk.loadData(); //加载数据
=>386 cnxAcceptor = new LearnerCnxAcceptor();
=>387 cnxAcceptor.start();
看下cnxAcceptor.start();
320 Socket s = ss.accept(); //leader接收Follower或Observer的连接请求
328 LearnerHandler fh = new LearnerHandler(s, is, Leader.this); //把小弟封装到LearnerHandler,等待小弟的消息
329 fh.start(); //再调用线程
看下fh.start();
=>431 startZkServer();//每个角色都用这个方法
==>966 zk.startup();
又是zk.startup();
/**
 * Brings the server to the RUNNING state: ensures a session tracker exists
 * and is started, builds the request-processor chain, registers JMX, and
 * wakes any threads waiting on this object.
 */
public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    // Overridable hook; the leader's version is the focus of the next section of these notes.
    setupRequestProcessors();
    registerJMX();
    setState(State.RUNNING);
    notifyAll();
}
重点在setupRequestProcessors();
/**
 * Builds the leader's request-processor chain. Processors are constructed
 * back to front because each one receives its successor, giving the order:
 * PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor
 * -> ToBeAppliedRequestProcessor -> FinalRequestProcessor.
 */
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    // Shares the leader's toBeApplied collection.
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader().toBeApplied);
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,Long.toString(getServerId()), false,getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,commitProcessor);
    proposalProcessor.initialize();
    // Entry point of the chain for this server.
    firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}
这里创建了五个对象,作用分别如下:
PrepRequestProcessor 处理客户端的请求
从submittedRequests队列中获取请求
根据请求类型的不同,分别进行处理pRequest2Txn
将处理过的请求封装到outstandingChanges
ProposalRequestProcessor 处理投票
如果是同步请求
72 zks.getLeader().processSync((LearnerSyncRequest)request);
直接将同步请求封装成packet,并加入到queuedPackets队列中
如果是其他请求
74 nextProcessor.processRequest(request);
实际调用CommitProcessor.processRequest
如果是非事务请求,直接交给ToBeAppliedRequestProcessor
如果是事务请求,等待leader投票通过后发送commit请求将其唤醒,再把允许提交的请求交给ToBeAppliedRequestProcessor处理
75-83 zks.getLeader().propose(request); //处理真正投票
=> 779 new QuorumPacket //准备一张票,投给自己
=> 792 sendPacket(pp); //发给小弟
到这里结束了,此时要跳转到小弟org.apache.zookeeper.server.quorum.Learner的syncWithLeader(long newLeaderZxid)
382 case Leader.PROPOSAL:
拿到leader的投票,写进硬盘,写入成功后
467 writePacket(ack, true); //再给leader一个ack响应
到这里结束了,之前说过leader与小弟之间的socket都封装到LearnerHandler中
577 case Leader.ACK
583 syncLimitCheck.updateAck(qp.getZxid()); //更新
589 leader.processAck
=> 598 p.ackSet.add(sid); //leader收集ack
=> 603 if (self.getQuorumVerifier().containsQuorum(p.ackSet)) //超过半数,即通过
=> 617 commit(zxid); //提交,commit会唤醒等待中的小弟
ToBeAppliedRequestProcessor
667 toBeApplied.remove(); //此时投票已结束,将投票信息删除
然后继续下一个处理FinalRequestProcessor
FinalRequestProcessor
和单机一样
此外,上面领导选举的过程并没有介绍
QuorumPeer的start方法
public synchronized void start() {loadDataBase();cnxnFactory.start(); startLeaderElection(); //领导选举super.start();}
startLeaderElection(); //领导选举
705 new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); //创建投票,投给自己
729 createElectionAlgorithm(electionType); //领导选举的算法,electionType在配置文件中electionAlg默认值为3
// Election algorithm 3: create the connection manager, start its listener,
// and use FastLeaderElection on top of it.
case 3:
    qcm = createCnxnManager();
    QuorumCnxManager.Listener listener = qcm.listener;
    if(listener != null){
        listener.start();
        le = new FastLeaderElection(this, qcm);
    } else {
        // Without a listener no election object is created; only an error is logged.
        LOG.error("Null listener when initializing cnx manager");
    }
    break;
824 qcm = createCnxnManager(); //创建QuorumCnxManager,该对象中保存了以下几个队列
senderWorkerMap :每台服务器(sid)对应的senderWorker
recvQueue:本台服务器接收到的消息
queueSendMap:需要发送给每个服务器(sid)的消息队列
lastMessageSent:发送给每台服务器最近的消息
828 new FastLeaderElection(this, qcm); //领导选举策略。进行投票,创建两个线程,一个发送,一个接收
继续看looking
944 setCurrentVote(makeLEStrategy().lookForLeader()); //设置当前的投票
进入lookForLeader() 【FastLeaderElection类】
810 updateProposal //先给自己投一票
815 sendNotifications(); //将发送信息放入sendqueue队列中去
821 while 循环
855 case LOOKING:
如果对方服务器的届号(选举轮次)大于本机
If 对比zxid等
更新为服务器
Else
更新本机
将发送信息放入sendqueue队列中去
如果对方服务器的届号(选举轮次)小于本机
什么都不做
如果对方服务器的届号(选举轮次)等于本机
直接对比
将发送信息放入sendqueue队列中去
889-903 收集响应。超过半数即可确认leader
909 改变服务器状态
922 case OBSERVING:
如果是ob,则不需要投票
925 case FOLLOWING:
926 case LEADING:
如果是FOLLOWING或LEADING,则表示已经有领导了,与领导同步