当前位置: 代码迷 >> Web前端 >> RMI/WebService冗余服务器的通用客户端
  详细解决方案

RMI/WebService冗余服务器的通用客户端

热度:804   发布时间:2012-10-24 14:15:58.0
RMI/WebService冗余服务器的通用客户端。
需求:
企业应用要有redundancy, 两台或者多台服务器提供HA(High Availability)服务,提供热备用。服务一般是EJB/RMI/Web Service等服务, 在一台发生服务故障后,客户端一般自动切换到其它可用服务器, 所有服务器都依次fail后才报错。

EJB也应该适用,但是一般EJB容器都提供了更完善的HA机制和策略。这里不cover.

不足和限制:
1. 暂时没有考虑内网/外网优先顺序。
2. 使用接口+java proxy(动态代理)实现拦截,因此服务必须定义接口。
3. web service使用动态绑定。
4. 一台失败后,遍历所有服务器,故障服务器没有排除
5. java 6以上测试通过
6. 通信失败的检测不见得完善
7. Web Service采用POJO发布,要求Port和Service名称和接口名称相同,便于简化查找。建议定义为常量。

测试服务代码(同时提供rmi/ws服务):
================================================
package org.steeven.remote;

import java.rmi.Remote;
import java.rmi.RemoteException;

import javax.jws.WebService;
import javax.jws.soap.SOAPBinding;

/**
 * Remote contract of the test service. It extends {@link Remote} so it can be
 * used over RMI and carries the JAX-WS annotations needed for SOAP publishing.
 */
@WebService(targetNamespace = Hello.WS_QNAME)
@SOAPBinding(style = SOAPBinding.Style.RPC)
public interface Hello extends Remote {
	/** Target namespace used in the {@code @WebService} annotation. */
	String WS_QNAME = "http://steeeven.org/client";

	/**
	 * Combines two strings on the server.
	 *
	 * @param a first operand
	 * @param b second operand
	 * @return the result computed by the implementation (the bundled
	 *         {@code ServerImpl} returns {@code a + b})
	 * @throws RemoteException if the remote call fails
	 */
	String add(String a, String b) throws RemoteException;
}

package org.steeven.remote;

import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;

import javax.jws.WebService;
import javax.jws.soap.SOAPBinding;
import javax.xml.ws.Endpoint;

@WebService(targetNamespace = Hello.WS_QNAME, portName = "Hello", serviceName = "Hello")
@SOAPBinding(style = SOAPBinding.Style.RPC)
public class ServerImpl extends UnicastRemoteObject implements Hello {

	private static final long serialVersionUID = 1L;

	protected ServerImpl() throws RemoteException {
		super();
	}

	public String add(String a, String b) throws RemoteException {
		if ("steeven".equals(b))
			throw new RuntimeException("SH IT");
		return a + b;
	}

	public static void main(String[] args) throws Exception {
		// 发布为RMI服务
		Registry registry = LocateRegistry.createRegistry(18888);
		registry.bind("server", new ServerImpl());
		// 发布为web service
		Endpoint.publish("http://0.0.0.0:18889/server", new ServerImpl());
		System.err.println("Server ready");
	}
}



实现和测试代码:
===========================================
package org.steeven.client;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.rmi.ConnectException;
import java.util.LinkedList;

/**
 * Creates a dynamic proxy for a service interface that fails over between
 * several redundant servers.
 *
 * <p>
 * Server URLs are kept in two lists: servers on the local host ("preferred")
 * and all others. The first server whose lookup succeeds is cached and reused
 * until an invocation on it fails with an error that the
 * {@link ProtocolHandler} classifies as "service out"; the lists are then
 * scanned again and the call is repeated once.
 *
 * @param <T> the service interface implemented by the proxy
 */
public class HaClient<T> {
	/** URLs of servers running on this host; always tried first. */
	private LinkedList<String> preferredUrls = new LinkedList<String>();
	/** URLs of the remaining servers; tried when no preferred one answers. */
	private LinkedList<String> otherUrls = new LinkedList<String>();
	/** Service returned by the last successful lookup, or null if none. */
	private T currentService;
	private Class<T> type;
	private ProtocolHandler protocalHandler;

	/**
	 * Forwards every proxy call to the current server and repeats it once on a
	 * freshly looked-up server when the call fails with a "service out" error.
	 */
	InvocationHandler invokeHandler = new InvocationHandler() {
		@Override
		public Object invoke(Object proxy, Method method, Object[] args)
				throws Throwable {
			System.out.println("invoke: " + method);
			// Anything thrown here comes from a complete scan of all servers,
			// so it is propagated as is: rescanning right away would only
			// repeat the same connection attempts.
			T server = getServer(false);
			try {
				return method.invoke(server, args);
			} catch (Throwable e) {
				Throwable failure = e instanceof InvocationTargetException ? e
						.getCause() : e;
				if (!protocalHandler.isServiceOut(failure)) {
					throw failure;
				}
			}
			System.out.println("Prefered server failed");
			try {
				return method.invoke(getServer(true), args);
			} catch (InvocationTargetException ex) {
				// Callers must see the service's own exception, not the
				// reflection wrapper.
				throw ex.getCause();
			}
		}
	};

	/**
	 * Builds the URL lists from socket addresses. Addresses that belong to the
	 * local host go to the preferred list, all others to the secondary list.
	 *
	 * @param protocalHandler protocol specific lookup and error classification
	 * @param type            service interface
	 * @param service         service path appended to every server URL
	 * @param nameServers     resolved addresses of the redundant servers
	 * @throws IllegalArgumentException if an address is null or unresolved
	 */
	protected HaClient(ProtocolHandler protocalHandler, Class<T> type,
			String service, InetSocketAddress... nameServers) {
		this.protocalHandler = protocalHandler;
		this.type = type;
		for (InetSocketAddress address : nameServers) {
			// An unresolved address has no InetAddress; reject it up front
			// instead of failing later with a NullPointerException.
			if (address == null || address.getAddress() == null) {
				throw new IllegalArgumentException("Invalid host: " + address);
			}
			String url = protocalHandler.buildServiceUrl(service, address);
			if (isLocalHost(address.getAddress())) {
				preferredUrls.add(url);
			} else {
				otherUrls.add(url);
			}
		}
	}

	/**
	 * Uses ready-made URL lists. The lists are stored as passed in (not copied)
	 * and are reordered when servers fail.
	 */
	protected HaClient(ProtocolHandler protocalHandler, Class<T> type,
			LinkedList<String> preferredUrls, LinkedList<String> otherUrls) {
		this.type = type;
		this.protocalHandler = protocalHandler;
		this.preferredUrls = preferredUrls;
		this.otherUrls = otherUrls;
	}

	/**
	 * Returns the service to call. The preferred (local host) list is checked
	 * first, then the list of other servers.
	 *
	 * @param retry false to reuse the cached service when there is one, true to
	 *              drop it and look up a server again
	 * @return the service object of the first server whose lookup succeeded
	 * @throws ConnectException if no server in either list could be looked up
	 * @throws Exception        any lookup error that is not a "service out"
	 *                          error
	 */
	protected synchronized T getServer(boolean retry) throws Exception {
		if (!retry && currentService != null) {
			System.out.println("user last server");
			return currentService;
		}
		currentService = null;
		tryServers(preferredUrls); // try preferred
		if (currentService == null) {
			tryServers(otherUrls); // try others
		}
		if (currentService == null) {
			throw new ConnectException("No server available now");
		}
		return currentService;
	}

	/**
	 * Looks up the servers of one list in order until one succeeds and stores
	 * the result in {@code currentService}. The URL that worked stays at the
	 * head of the list; URLs that failed with a "service out" error are moved
	 * to the end. Any other lookup error is rethrown.
	 */
	@SuppressWarnings("unchecked")
	private void tryServers(LinkedList<String> list) throws Exception {
		LinkedList<String> badList = new LinkedList<String>();
		try {
			while (!list.isEmpty()) {
				String url = list.peek();
				try {
					System.out.println("trying: " + url);
					// TODO URI InetAddress.isReachable(n) to ping first
					currentService = (T) protocalHandler.lookupService(url,
							type);
					return;
				} catch (Exception e) {
					if (!protocalHandler.isServiceOut(e)) {
						throw e;
					}
					System.out.println("failed to try: " + url);
					list.pop();
					badList.add(url);
				}
			}
		} finally {
			// Failed servers are kept, but at the end of the list.
			list.addAll(badList);
		}
	}

	/**
	 * Creates the failover proxy implementing the service interface.
	 */
	@SuppressWarnings("unchecked")
	public T newInstance() {
		return (T) Proxy.newProxyInstance(type.getClassLoader(),
				new Class<?>[] { type }, invokeHandler);
	}

	/**
	 * Tells whether the address is a loopback or wildcard address, or is bound
	 * to a network interface of this machine.
	 */
	private boolean isLocalHost(InetAddress address) {
		boolean local = address.isLoopbackAddress()
				|| address.isAnyLocalAddress();
		if (!local) {
			try {
				local = (NetworkInterface.getByInetAddress(address) != null);
			} catch (SocketException ignored) {
				// Interfaces cannot be inspected: treat the address as remote,
				// which only affects the order in which servers are tried.
			}
		}
		return local;
	}

	/**
	 * Creates a failover proxy for servers given as socket addresses.
	 *
	 * @throws IllegalArgumentException if an address is null or unresolved
	 */
	public static <T> T create(ProtocolHandler protocol, Class<T> type,
			String service, InetSocketAddress... addresses) {
		return new HaClient<T>(protocol, type, service, addresses)
				.newInstance();
	}

	/**
	 * Creates a failover proxy for servers that all listen on the same port.
	 *
	 * @throws IllegalArgumentException if a host name cannot be resolved
	 */
	public static <T> T create(ProtocolHandler protocol, Class<T> type,
			String service, int port, String... hosts) {
		InetSocketAddress[] addresses = new InetSocketAddress[hosts.length];
		for (int i = 0; i < addresses.length; i++) {
			addresses[i] = new InetSocketAddress(hosts[i], port);
			if (addresses[i].getAddress() == null) {
				throw new IllegalArgumentException("Invalid host: " + hosts[i]);
			}
		}
		return new HaClient<T>(protocol, type, service, addresses)
				.newInstance();
	}

	/**
	 * Protocol specific part of the client: URL layout, service lookup and the
	 * decision whether an error means that a server is out of service.
	 */
	public abstract static class ProtocolHandler {

		/**
		 * Builds {@code <protocol>://<ip>:<port><service>} for one server.
		 */
		public String buildServiceUrl(String service, InetSocketAddress address) {
			return getProtocol() + "://"
					+ address.getAddress().getHostAddress() + ":"
					+ address.getPort() + service;
		}

		/** Returns the URL scheme, without the {@code ://} separator. */
		public abstract String getProtocol();

		/**
		 * Returns true when the error means the server cannot be reached, so
		 * that another server should be tried.
		 */
		public abstract boolean isServiceOut(Throwable e);

		/**
		 * Looks up the service object behind the URL.
		 *
		 * @param url  URL produced by {@link #buildServiceUrl}
		 * @param type service interface the result is used as
		 */
		public abstract Object lookupService(String url, Class<?> type)
				throws Exception;

	}
}



package org.steeven.client;

import java.net.ConnectException;
import java.net.URL;

import javax.xml.namespace.QName;
import javax.xml.ws.Service;
import javax.xml.ws.WebServiceException;

import org.steeven.remote.Hello;

/**
 * {@link HaClient.ProtocolHandler} for JAX-WS web services reached over HTTP.
 * The service and its port are both expected to be named after the simple
 * name of the service interface, inside the namespace given to the
 * constructor.
 *
 * @author xli
 */
public class HttpProtocolHandler extends HaClient.ProtocolHandler {

	/** Target namespace of the service and port QNames. */
	private final String qname;

	/**
	 * @param qname target namespace of the published web service
	 */
	public HttpProtocolHandler(String qname) {
		this.qname = qname;
	}

	@Override
	public String getProtocol() {
		return "http";
	}

	/**
	 * A server counts as out of service when the call failed with a
	 * {@link WebServiceException} directly caused by a
	 * {@link java.net.ConnectException}.
	 */
	@Override
	public boolean isServiceOut(Throwable e) {
		return e instanceof WebServiceException
				&& e.getCause() instanceof ConnectException;
	}

	/**
	 * Reads the WSDL at {@code url + "?wsdl"} and returns the port for
	 * {@code type}.
	 *
	 * @param url  endpoint URL without the {@code ?wsdl} suffix
	 * @param type service endpoint interface; its simple name is used as the
	 *             local part of both the service and the port name
	 */
	@Override
	public Object lookupService(String url, Class<?> type) throws Exception {
		String localName = type.getSimpleName();
		Service service = Service.create(new URL(url + "?wsdl"), new QName(
				qname, localName));
		// Use the configured namespace for the port as well, so the handler
		// is not tied to the Hello test interface.
		return service.getPort(new QName(qname, localName), type);
	}

	/**
	 * Demo client: calls the test service every three seconds through a
	 * failover proxy over three hosts.
	 */
	public static void main(String args[]) throws Exception {
		Hello hello = HaClient.create(new HttpProtocolHandler(Hello.WS_QNAME),
				Hello.class, "/server", 18889, "10.80.1.113", "10.80.2.184",
				"10.80.1.117");
		while (true) {
			try {
				System.out.println("------------------------new-----------------");
				System.out.println(hello.add("hello ", "steeven1"));
			} catch (Exception e) {
				e.printStackTrace();
			}
			Thread.sleep(3000);
		}
	}
}



package org.steeven.client;

import java.rmi.ConnectException;
import java.rmi.Naming;

import org.steeven.remote.Hello;

/**
 * {@link HaClient.ProtocolHandler} for RMI: services are looked up by
 * {@code rmi://} URL through {@link Naming}.
 *
 * @author xli
 */
public class RmiProtocolHandler extends HaClient.ProtocolHandler {

	@Override
	public String getProtocol() {
		return "rmi";
	}

	/**
	 * Only a {@link java.rmi.ConnectException} marks a server as out of
	 * service.
	 */
	@Override
	public boolean isServiceOut(Throwable e) {
		return ConnectException.class.isInstance(e);
	}

	/**
	 * Returns the remote object registered under the URL; {@code type} is not
	 * used.
	 */
	@Override
	public Object lookupService(String url, Class<?> type) throws Exception {
		Object stub = Naming.lookup(url);
		return stub;
	}

	/**
	 * Demo client: calls the test service every three seconds through a
	 * failover proxy over three hosts.
	 */
	public static void main(String args[]) throws Exception {
		String[] hosts = { "10.80.1.113", "10.80.2.184", "10.80.1.117" };
		Hello hello = HaClient.create(new RmiProtocolHandler(), Hello.class,
				"/server", 18888, hosts);
		for (;;) {
			try {
				System.out.println("------------------------new-----------------");
				String answer = hello.add("hello ", "steeven1");
				System.out.println(answer);
			} catch (Exception e) {
				e.printStackTrace();
			}
			Thread.sleep(3000);
		}
	}
}
1 楼 steeven 2010-01-17  
还有几个问题:
1. 没有考虑到一个端口上的多个service, 没有整体检测, 替换.
2. 没有采用ping机制,效率不高. 有的server可能会不支持.
3. 没有包装成URL对象.
4. WebService相关参数应该从接口反射本身取得
2 楼 C_J 2010-01-17  
好像挺不错,学习了。

想请教几个问题:

1.一个端口会有多个service?

2.ping机制?你是说ICMP协议的ping么?如果机器禁止了怎么办呢?
3 楼 steeven 2010-01-19  
C_J 写道
好像挺不错,学习了。

想请教几个问题:

1.一个端口会有多个service?

2.ping机制?你是说ICMP协议的ping么?如果机器禁止了怎么办呢?


这里的端口就是JNDI或者HTTP的端口,里面可能注册了多个service.
ping,对,经常有机器关掉。但是自己的项目里面还是可控的
  相关解决方案