当前位置: 代码迷 >> 综合 >> 深入分析 Watcher 机制的实现原理(三)客户端接收服务端处理完成的响应及事件触发
  详细解决方案

深入分析 Watcher 机制的实现原理(三)客户端接收服务端处理完成的响应及事件触发

热度:66   发布时间:2024-01-12 17:16:54.0

客户端接收服务端处理完成的响应

ClientCnxnSocketNetty.messageReceived

服 务 端 处 理 完 成 以 后 , 会 通 过NettyServerCnxn.sendResponse 发送返回的响应信息,
客户端会在 ClientCnxnSocketNetty.messageReceived 接收服务端的返回

@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
    updateNow();while (buf.isReadable()) {
    if (incomingBuffer.remaining() > buf.readableBytes()) {
    int newLimit = incomingBuffer.position() + buf.readableBytes();incomingBuffer.limit(newLimit);}buf.readBytes(incomingBuffer);incomingBuffer.limit(incomingBuffer.capacity());if (!incomingBuffer.hasRemaining()) {
    incomingBuffer.flip();if (incomingBuffer == lenBuffer) {
    recvCount.getAndIncrement();readLength();} else if (!initialized) {
    readConnectResult();lenBuffer.clear();incomingBuffer = lenBuffer;initialized = true;updateLastHeard();} else {
    //收到消息以后触发 SendThread.readResponse 方法sendThread.readResponse(incomingBuffer);lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();}}}wakeupCnxn();// Note: SimpleChannelInboundHandler releases the ByteBuf for us// so we don't need to do it.}

SendThread. . readResponse
这个方法里面主要的流程如下
首先读取 header,如果其 xid == -2,表明是一个 ping 的response,return
如果 xid 是 -4 ,表明是一个 AuthPacket 的 response return
如果 xid 是 -1,表明是一个 notification,此时要继续读取并构造一个 enent,通过 EventThread.queueEvent 发送,return
其它情况下:从 pendingQueue 拿出一个 Packet,校验后更新 packet信息

void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();//反序列化 headerreplyHdr.deserialize(bbia, "header");switch (replyHdr.getXid()) {
    case PING_XID:LOG.debug("Got ping response for session id: 0x{} after {}ms.",Long.toHexString(sessionId),((System.nanoTime() - lastPingSentNs) / 1000000));return;case AUTHPACKET_XID:LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
    changeZkState(States.AUTH_FAILED);eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,Watcher.Event.KeeperState.AuthFailed, null));eventThread.queueEventOfDeath();}return;case NOTIFICATION_XID://表示当前的消息类型为一个 notification(意味着是服务端的一个响应事件)LOG.debug("Got notification session id: 0x{}",Long.toHexString(sessionId));WatcherEvent event = new WatcherEvent();//反序列化响应信息event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {
    String serverPath = event.getPath();if (serverPath.compareTo(chrootPath) == 0) {
    event.setPath("/");} else if (serverPath.length() > chrootPath.length()) {
    event.setPath(serverPath.substring(chrootPath.length()));} else {
    LOG.warn("Got server path {} which is too short for chroot path {}.",event.getPath(), chrootPath);}}WatchedEvent we = new WatchedEvent(event);LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));eventThread.queueEvent(we);return;default:break;}// If SASL authentication is currently in progress, construct and// send a response packet immediately, rather than queuing a// response as with other packets.if (tunnelAuthInProgress()) {
    GetSASLRequest request = new GetSASLRequest();request.deserialize(bbia, "token");zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);return;}Packet packet;synchronized (pendingQueue) {
    if (pendingQueue.size() == 0) {
    throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());}因为当前这个数据包已经收到了响应,所以讲它从pendingQueued 中移除packet = pendingQueue.remove();}/** Since requests are processed in order, we better get a response* to the first request!*/try {
    //校验数据包信息,校验成功后讲数据包信息进行更新(替换为服务端的信息)if (packet.requestHeader.getXid() != replyHdr.getXid()) {
    packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()+ " with err " + replyHdr.getErr()+ " expected Xid " + packet.requestHeader.getXid()+ " for a packet with details: " + packet);}packet.replyHeader.setXid(replyHdr.getXid());packet.replyHeader.setErr(replyHdr.getErr());packet.replyHeader.setZxid(replyHdr.getZxid());if (replyHdr.getZxid() > 0) {
    lastZxid = replyHdr.getZxid();}if (packet.response != null && replyHdr.getErr() == 0) {
    //获得服 务端的响应,反序列化以后设置到 packet.response 属性 中。// 所以我们可以在 exists 方法的最后一行通过packet.response 拿到改请求的返回结果packet.response.deserialize(bbia, "response");}LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);} finally {
    //最后调用finishPacket 方法完成处理finishPacket(packet);}}

finishPacket方法:

// 主要功能是把从 Packet 中取出对应的 Watcher 并注册到 ZKWatchManager 中去protected void finishPacket(Packet p) {
    int err = p.replyHeader.getErr();if (p.watchRegistration != null) {
    //注册事件p.watchRegistration.register(err);}// Add all the removed watch events to the event queue, so that the// clients will be notified with 'Data/Child WatchRemoved' event type.将所有移除的监视事件添加到事件队列, 这样客户端能收到 “data/child 事件被移除”的事件类型if (p.watchDeregistration != null) {
    Map<EventType, Set<Watcher>> materializedWatchers = null;try {
    materializedWatchers = p.watchDeregistration.unregister(err);for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {
    Set<Watcher> watchers = entry.getValue();if (watchers.size() > 0) {
    queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());// ignore connectionloss when removing from local// sessionp.replyHeader.setErr(Code.OK.intValue());}}} catch (KeeperException.NoWatcherException nwe) {
    p.replyHeader.setErr(nwe.code().intValue());} catch (KeeperException ke) {
    p.replyHeader.setErr(ke.code().intValue());}}cb 就是 AsnycCallback,如果为 null,表明是同步调用的接口,不需要异步回掉,因此,直接 notifyAll即可。if (p.cb == null) {
    synchronized (p) {
    p.finished = true;p.notifyAll();}} else {
    p.finished = true;eventThread.queuePacket(p);}}

register方法

public void register(int rc) {
    if (shouldAddWatch(rc)) {
    通过子类的实现取得ZKWatchManager 中的 existsWatchesMap<String, Set<Watcher>> watches = getWatches(rc);synchronized (watches) {
    Set<Watcher> watchers = watches.get(clientPath);if (watchers == null) {
    watchers = new HashSet<Watcher>();watches.put(clientPath, watchers);}// 将Watcher对象放到ZKWatchManager中 的existsWatches 里面watchers.add(watcher);}}}

getWatches方法

class ExistsWatchRegistration extends WatchRegistration {
    public ExistsWatchRegistration(Watcher watcher, String clientPath) {
    super(watcher, clientPath);}@Overrideprotected Map<String, Set<Watcher>> getWatches(int rc) {
    return rc == 0 ? getWatchManager().getDataWatches() : getWatchManager().getExistWatches();}@Overrideprotected boolean shouldAddWatch(int rc) {
    return rc == 0 || rc == KeeperException.Code.NONODE.intValue();}}

客户端存储 watcher 的几个 map 集合,分别对应三种注册监听事件

class ZKWatchManager implements ClientWatchManager {
    private static final Logger LOG = LoggerFactory.getLogger(ZKWatchManager.class);private final Map<String, Set<Watcher>> dataWatches = new HashMap<>();private final Map<String, Set<Watcher>> existWatches = new HashMap<>();private final Map<String, Set<Watcher>> childWatches = new HashMap<>();private final Map<String, Set<Watcher>> persistentWatches = new HashMap<>();private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<>();

总 的来说, 当使 用 ZooKeeper 构造方法或 者使用getData 、 exists 和 getChildren 三 个 接 口 来 向ZooKeeper 服务器注册 Watcher 的时候,首先将此消息传递给服务端,传递成功后,服务端会通知客户端,然后客户端将该路径和 Watcher 对应关系存储起来备用。

queuePacket方法

//将当前的数据包添加到等待事件通知的队列中@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")public void queuePacket(Packet packet) {
    if (wasKilled) {
    synchronized (waitingEvents) {
    if (isRunning) {
    waitingEvents.add(packet);} else {
    processEvent(packet);}}} else {
    waitingEvents.add(packet);}}

客户端接收服务端流程图:

事件触发

zookeeper.setData(“/mic”, “1”.getByte(),-1) ; //修改节点的值触发监听

服务端事件响应
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();DataNode n = nodes.get(path);if (n == null) {
    throw new KeeperException.NoNodeException();}byte[] lastdata = null;synchronized (n) {
    lastdata = n.data;nodes.preChange(path, n);n.data = data;n.stat.setMtime(time);n.stat.setMzxid(zxid);n.stat.setVersion(version);n.copyStat(s);nodes.postChange(path, n);}// now update if the path is in a quota subtree.String lastPrefix = getMaxPrefixWithQuota(path);long dataBytes = data == null ? 0 : data.length;if (lastPrefix != null) {
    this.updateCountBytes(lastPrefix, dataBytes - (lastdata == null ? 0 : lastdata.length), 0);}nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata));updateWriteStat(path, dataBytes);// 触 发 对 应 节 点 的NodeDataChanged 事件dataWatches.triggerWatch(path, EventType.NodeDataChanged);return s;}

WatchManager类的triggerWatch方法

@Overridepublic WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
    //根据事件类型、连接状态、节点路径创建 WatchedEventWatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);Set<Watcher> watchers = new HashSet<>();PathParentIterator pathParentIterator = getPathParentIterator(path);synchronized (this) {
    for (String localPath : pathParentIterator.asIterable()) {
    Set<Watcher> thisWatchers = watchTable.get(localPath);if (thisWatchers == null || thisWatchers.isEmpty()) {
    continue;}Iterator<Watcher> iterator = thisWatchers.iterator();while (iterator.hasNext()) {
    Watcher watcher = iterator.next();WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);if (watcherMode.isRecursive()) {
    if (type != EventType.NodeChildrenChanged) {
    watchers.add(watcher);}} else if (!pathParentIterator.atParentPath()) {
    watchers.add(watcher);if (!watcherMode.isPersistent()) {
    iterator.remove();//根据 watcher 从 watcher 表中取出路径集合Set<String> paths = watch2Paths.get(watcher);if (paths != null) {
    //移除路径paths.remove(localPath);}}}}if (thisWatchers.isEmpty()) {
    watchTable.remove(localPath);}}}if (watchers.isEmpty()) {
    if (LOG.isTraceEnabled()) {
    ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);}return null;}for (Watcher w : watchers) {
    if (supress != null && supress.contains(w)) {
    continue;}//w.process(e);}switch (type) {
    case NodeCreated:ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());break;case NodeDeleted:ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());break;case NodeDataChanged:ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());break;case NodeChildrenChanged:ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());break;default:// Other types not logged.break;}return new WatcherOrBitSet(watchers);}

NettyServerCnxn 这个类的 process 方法看看

@Override
public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);if (LOG.isTraceEnabled()) {
    ZooTrace.logTraceMessage(LOG,ZooTrace.EVENT_DELIVERY_TRACE_MASK,"Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);}// Convert WatchedEvent to a type that can be sent over the wireWatcherEvent e = event.getWrapper();try {
    //发送响应sendResponse(h, e, "notification");} catch (IOException e1) {
    LOG.debug("Problem sending to {}", getRemoteSocketAddress(), e1);close();}
}

那 接 下 里 , 客 户 端 会 收 到 这 个 response , 触 发SendThread.readResponse 方法

客户端处理事件响应

SendThread.readResponse

void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();//反序列化 headerreplyHdr.deserialize(bbia, "header");switch (replyHdr.getXid()) {
    case PING_XID:LOG.debug("Got ping response for session id: 0x{} after {}ms.",Long.toHexString(sessionId),((System.nanoTime() - lastPingSentNs) / 1000000));return;case AUTHPACKET_XID:LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
    changeZkState(States.AUTH_FAILED);eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,Watcher.Event.KeeperState.AuthFailed, null));eventThread.queueEventOfDeath();}return;case NOTIFICATION_XID://表示当前的消息类型为一个 notification(意味着是服务端的一个响应事件)LOG.debug("Got notification session id: 0x{}",Long.toHexString(sessionId));WatcherEvent event = new WatcherEvent();//反序列化响应信息event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {
    String serverPath = event.getPath();if (serverPath.compareTo(chrootPath) == 0) {
    event.setPath("/");} else if (serverPath.length() > chrootPath.length()) {
    event.setPath(serverPath.substring(chrootPath.length()));} else {
    LOG.warn("Got server path {} which is too short for chroot path {}.",event.getPath(), chrootPath);}}WatchedEvent we = new WatchedEvent(event);LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));//根据该通知事件,从ZKWatchManager 中取出所有相关的 Watcher,如果获取到相应的 Watcher,就会让 Watcher 移除失效eventThread.queueEvent(we);return;default:break;}// If SASL authentication is currently in progress, construct and// send a response packet immediately, rather than queuing a// response as with other packets.if (tunnelAuthInProgress()) {
    GetSASLRequest request = new GetSASLRequest();request.deserialize(bbia, "token");zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);return;}Packet packet;synchronized (pendingQueue) {
    if (pendingQueue.size() == 0) {
    throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());}因为当前这个数据包已经收到了响应,所以讲它从pendingQueued 中移除packet = pendingQueue.remove();}/** Since requests are processed in order, we better get a response* to the first request!*/try {
    //校验数据包信息,校验成功后讲数据包信息进行更新(替换为服务端的信息)if (packet.requestHeader.getXid() != replyHdr.getXid()) {
    packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()+ " with err " + replyHdr.getErr()+ " expected Xid " + packet.requestHeader.getXid()+ " for a packet with details: " + packet);}packet.replyHeader.setXid(replyHdr.getXid());packet.replyHeader.setErr(replyHdr.getErr());packet.replyHeader.setZxid(replyHdr.getZxid());if (replyHdr.getZxid() > 0) {
    lastZxid = replyHdr.getZxid();}if (packet.response != null && replyHdr.getErr() == 0) {
    //获得服 务端的响应,反序列化以后设置到 packet.response 属性 中。// 所以我们可以在 exists 方法的最后一行通过packet.response 拿到改请求的返回结果packet.response.deserialize(bbia, "response");}LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);} finally {
    //最后调用finishPacket 方法完成处理finishPacket(packet);}}

queueEvent方法

SendThread 接收到服务端的通知事件后,会通过调用EventThread 类 的 queueEvent 方 法 将 事 件 传 给EventThread 线程,queueEvent 方法根据该通知事件,从ZKWatchManager 中取出所有相关的 Watcher,如果获取到相应的 Watcher,就会让 Watcher 移除失效。

private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
    return;}sessionState = event.getState();final Set<Watcher> watchers;if (materializedWatchers == null) {
    // materialize the watchers based on the event//materializewatchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());} else {
    watchers = new HashSet<>(materializedWatchers);}//封装 WatcherSetEventPair 对象,添加到waitngEvents 队列中WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);// queue the pair (watch set & event) for later processingwaitingEvents.add(pair);}

materialize方法:

通过 dataWatches 或者 existWatches 或者 childWatches的 remove 取出对应的 watch,表明客户端 watch 也是注册一次就移除
同时需要根据 keeperState、eventType 和 path 返回应该被通知的 Watcher 集合

waitingEvents.add(pair);waitingEvents 是 EventThread 这个线程中的阻塞队列,很明显,又是在我们第一步操作的时候实例化的一个线程。从名字可以指导,waitingEvents 是一个待处理 Watcher的队列,EventThread 的 run() 方法会不断从队列中取数据,交由 processEvent 方法处理:

EventThread 的 run() 方法从waitingEvents

@Override@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")public void run() {
    try {
    isRunning = true;while (true) {
    Object event = waitingEvents.take();if (event == eventOfDeath) {
    wasKilled = true;} else {
    //处理事件processEvent(event);}if (wasKilled) {
    synchronized (waitingEvents) {
    if (waitingEvents.isEmpty()) {
    isRunning = false;break;}}}}} catch (InterruptedException e) {
    LOG.error("Event thread exiting due to interruption", e);}LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));}

processEvent方法

private void processEvent(Object event) {
    try {
    //判断事件类型if (event instanceof WatcherSetEventPair) {
    // each watcher will process the event//得到WatcherSetEventPairWatcherSetEventPair pair = (WatcherSetEventPair) event;//拿到符合触发机制的所有 watcher 列表,循环进行调用for (Watcher watcher : pair.watchers) {
    try {
    // 调 用 客 户 端 的 回 调processwatcher.process(pair.event);} catch (Throwable t) {
    LOG.error("Error while calling watcher.", t);}}

头疼。。。