当前位置: 代码迷 >> 综合 >> 详解dubbo中的超时机制(2)
  详细解决方案

详解dubbo中的超时机制(2)

热度:9   发布时间:2024-03-06 20:09:27.0

这篇文章很久之前就写好了,忘了发,一直在草稿里。。。。

2.7.3 中的超时机制

在2.7.3中,超时机制有很大的改动,下面直接进入源码。

2.7.3 org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker

protected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;final String methodName = RpcUtils.getMethodName(invocation);inv.setAttachment(PATH_KEY, getUrl().getPath());inv.setAttachment(VERSION_KEY, version);ExchangeClient currentClient;if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}try {boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);return AsyncRpcResult.newDefaultAsyncResult(invocation);} else {AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);asyncRpcResult.subscribeTo(responseFuture);// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapterFutureContext.getContext().setCompatibleFuture(responseFuture);return asyncRpcResult;}} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}}

DubboInvoker相比2.5.x中不再判断是否异步请求,统一返回了一个CompletableFuture

 CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);

 2.7.3 org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient

public class HeaderExchangeClient implements ExchangeClient {......@Overridepublic CompletableFuture<Object> request(Object request) throws RemotingException {return channel.request(request);}......
}

 2.7.3 org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel

    @Overridepublic CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request.Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setData(request);DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);try {channel.send(req);} catch (RemotingException e) {future.cancel();throw e;}return future;}

这里HeaderExchangeClient和HeaderExchangeChannel的相比2.5.x,返回都用CompletableFuture替换原来的ResponseFuture

org.apache.dubbo.remoting.exchange.support.DefaultFuture

//dubbo 2.7.3 DefaultFuturepublic class DefaultFuture extends CompletableFuture<Object> {.......public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-future-timeout", true),30,TimeUnit.MILLISECONDS);....../*** check time out of the future*/private static void timeoutCheck(DefaultFuture future) {TimeoutCheckTask task = new TimeoutCheckTask(future.getId());future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);}....../*** init a DefaultFuture* 1.init a DefaultFuture* 2.timeout check** @param channel channel* @param request the request* @param timeout timeout* @return a new DefaultFuture*/public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {final DefaultFuture future = new DefaultFuture(channel, request, timeout);// timeout checktimeoutCheck(future);return future;}

2.7.3中用到了一个新的类HashedWheelTimer,来处理超时请求,这是一个基于时间轮算法的延迟任务队列。

    //dubbo 2.5.x DefaultFuturestatic {Thread th = new Thread(new RemotingInvocationTimeoutScan(),"DubboResponseTimeoutScanTimer");th.setDaemon(true);th.start();}private static class RemotingInvocationTimeoutScan implements Runnable {public void run() {while (true) {try {for (DefaultFuture future : FUTURES.values()) {if (future == null || future.isDone()) {continue;}if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {// create exception response.Response timeoutResponse = new Response(future.getId());// set timeout status.timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));// handle response.DefaultFuture.received(future.getChannel(), timeoutResponse);}}Thread.sleep(30);} catch (Throwable e) {logger.error("Exception when scan the timeout invocation of remoting.", e);}}}}

2.5.x中的超时设计,是开启了一个守护线程不停轮询所有请求,每次轮询sleep 30ms,这和2.7.3中的HashedWheelTimer相比,轮询的时候会多一些的超时判断,个人认为这里2.5.x的实现会比HashedWheelTimer消耗更多的cpu。这也是为什么使用HashedWheelTimer替换2.5.x中做法的原因。