1. ChannelEventRunnable类
通道消息线程
所有的消息都在此处理,包含消息的收,发,断开连接,异常等
public class ChannelEventRunnable implements Runnable {private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);private final ChannelHandler handler;private final Channel channel;private final ChannelState state;private final Throwable exception;private final Object message;public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) {this(channel, handler, state, null);}public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) {this(channel, handler, state, message, null);}public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) {this(channel, handler, state, null , t);}public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {this.channel = channel;this.handler = handler;this.state = state;this.message = message;this.exception = exception;}public void run() { // 收发消息处理switch (state) {case CONNECTED:try{handler.connected(channel);}catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case DISCONNECTED:try{handler.disconnected(channel);}catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case SENT:try{handler.sent(channel,message);}catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is "+ message,e);}break;case RECEIVED:try{handler.received(channel, message);}catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is "+ message,e);}break;case CAUGHT:try{handler.caught(channel, exception);}catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is "+ channel+ ", message is: " + message + ", exception is " + exception,e);}break;default:logger.warn("unknown state: " + state + ", message is " + message);}}}
2. DefaultFuture类 (在这里打断点,找到收发消息的源头)
public class DefaultFuture implements ResponseFuture
通过每个channel[有channle ID]发消息后,然后用DefaultFuture来接收返回的结果(同步返回一个空结果)。具体的结果等到需要用时,直接调DefaultFuture的get()方法,可以同步获取调用结果
public Object get() throws RemotingException {return get(timeout);}// 获取真正的调用结果public Object get(int timeout) throws RemotingException { if (timeout <= 0) {timeout = Constants.DEFAULT_TIMEOUT;}if (! isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (! isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}if (! isDone()) {throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));}}return returnFromResponse();}
3. 具体通道里的消息收发
public class HeaderExchangeHandler implements ChannelHandlerDelegate {protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class);public static String KEY_READ_TIMESTAMP = HeartbeatHandler.KEY_READ_TIMESTAMP;public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP;private final ExchangeHandler handler;public HeaderExchangeHandler(ExchangeHandler handler){if (handler == null) {throw new IllegalArgumentException("handler == null");}this.handler = handler;}void handlerEvent(Channel channel, Request req) throws RemotingException {if (req.getData() != null && req.getData().equals(Request.READONLY_EVENT)) {channel.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);}}// Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {Response res = new Response(req.getId(), req.getVersion());if (req.isBroken()) {Object data = req.getData();String msg;if (data == null) msg = null;else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);else msg = data.toString();res.setErrorMessage("Fail to decode request due to: " + msg);res.setStatus(Response.BAD_REQUEST);return res;}// find handler by message class.Object msg = req.getData();try {// handle data.Object result = handler.reply(channel, msg);res.setStatus(Response.OK);res.setResult(result);} catch (Throwable e) {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));}return res;}static void handleResponse(Channel channel, Response response) throws RemotingException {if (response != null && !response.isHeartbeat()) {DefaultFuture.received(channel, response);}}public void connected(Channel channel) throws RemotingException {channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);try {handler.connected(exchangeChannel);} finally {HeaderExchangeChannel.removeChannelIfDisconnected(channel);}}public void disconnected(Channel channel) throws RemotingException {channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);try {handler.disconnected(exchangeChannel);} finally {HeaderExchangeChannel.removeChannelIfDisconnected(channel);}}public void sent(Channel channel, Object message) throws RemotingException {Throwable exception = null;try {channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);try {handler.sent(exchangeChannel, message);} finally {HeaderExchangeChannel.removeChannelIfDisconnected(channel);}} catch (Throwable t) {exception = t;}if (message instanceof Request) {Request request = (Request) message;DefaultFuture.sent(channel, request);}if (exception != null) {if (exception instanceof RuntimeException) {throw (RuntimeException) exception;} else if (exception instanceof RemotingException) {throw (RemotingException) exception;} else {throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),exception.getMessage(), exception);}}}private static boolean isClientSide(Channel channel) {InetSocketAddress address = channel.getRemoteAddress();URL url = channel.getUrl();return url.getPort() == address.getPort() && NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));}public void received(Channel channel, Object message) throws RemotingException {channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);try {if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {handlerEvent(channel, request);} else {if (request.isTwoWay()) {Response response = handleRequest(exchangeChannel, request);channel.send(response);} else {handler.received(exchangeChannel, request.getData());}}} else if (message instanceof Response) {handleResponse(channel, (Response) message);} else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (echo != null && echo.length() > 0) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}} finally {HeaderExchangeChannel.removeChannelIfDisconnected(channel);}}public void caught(Channel channel, Throwable exception) throws RemotingException {if (exception instanceof ExecutionException) {ExecutionException e = (ExecutionException) exception;Object msg = e.getRequest();if (msg instanceof Request) {Request req = (Request) msg;if (req.isTwoWay() && ! req.isHeartbeat()) {Response res = new Response(req.getId(), req.getVersion());res.setStatus(Response.SERVER_ERROR);res.setErrorMessage(StringUtils.toString(e));channel.send(res);return;}}}ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);try {handler.caught(exchangeChannel, exception);} finally {HeaderExchangeChannel.removeChannelIfDisconnected(channel);}}public ChannelHandler getHandler() {if (handler instanceof ChannelHandlerDelegate) {return ((ChannelHandlerDelegate) handler).getHandler();} else {return handler;}}
}
4. com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler 处理消息收发
(1) 发送dubbo请求消息
// 发送消息public void sent(Channel channel, Object message) throws RemotingException {Throwable exception = null;try {channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);try {handler.sent(exchangeChannel, message);} finally {HeaderExchangeChannel.removeChannelIfDisconnected(channel);}} catch (Throwable t) {exception = t;}if (message instanceof Request) {Request request = (Request) message;DefaultFuture.sent(channel, request);}if (exception != null) {if (exception instanceof RuntimeException) {throw (RuntimeException) exception;} else if (exception instanceof RemotingException) {throw (RemotingException) exception;} else {throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),exception.getMessage(), exception);}}}
(2) 接收dubbo响应消息
// 接收消息 public void received(Channel channel, Object message) throws RemotingException {channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);try {if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {handlerEvent(channel, request);} else {if (request.isTwoWay()) {Response response = handleRequest(exchangeChannel, request);channel.send(response);} else {handler.received(exchangeChannel, request.getData());}}} else if (message instanceof Response) {handleResponse(channel, (Response) message);} else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (echo != null && echo.length() > 0) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}} finally {HeaderExchangeChannel.removeChannelIfDisconnected(channel);}}