当前位置: 代码迷 >> 综合 >> SpringBoot ZeroMQ
  详细解决方案

SpringBoot ZeroMQ

热度:52   发布时间:2023-12-18 09:41:47.0

首先导入依赖:

<dependency><groupId>org.zeromq</groupId><artifactId>jeromq</artifactId><version>0.3.1</version>
</dependency>

ZMQ发布端:

/*** ZMQ发布数据*/
public class ZmqPubClient {/*** ZMQ启动线程数*/private static int ZMQThreadCount = Integer.parseInt("1");/*** ZMQ数据广播端口*/private static int ZMQSendPort = Integer.parseInt("7111");private static ZMQ.Context context = null;private static ZMQ.Socket pubSock = null;/*** 初始化ZMQ对象*/private static void initZMQ() {if (context == null) {context = ZMQ.context(ZMQThreadCount);}if (ZMQSendPort != 0) {pubSock = context.socket(ZMQ.PUB);String bindUri = "tcp://*:" + ZMQSendPort;pubSock.bind(bindUri);} else {throw new RuntimeException("Error!");}}private static ZMQ.Socket getPubSock() {if (pubSock == null) {initZMQ();}return pubSock;}private static ZMQ.Socket getPubSock(int port) {if (context == null) {context = ZMQ.context(ZMQThreadCount);}ZMQ.Socket socket = context.socket(ZMQ.PUB);String binUri = "tcp://*" + port;socket.bind(binUri);return socket;}public static void sendData(String msg) {getPubSock().send(msg, ZMQ.NOBLOCK);}public static void sendData(byte[] msg) {getPubSock().send(msg);}public static void sendData(int port, String msg) {ZMQ.Socket socket = getPubSock(port);socket.send(msg);socket.close();}public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 100; i++) {sendData("测试测试".getBytes());System.out.println("发送测试");Thread.sleep(1000);}}
}

接收线程:

/*** ZMQ接收线程*/
public abstract class ZmqSubThread implements Runnable {/*** ZMQ启动线程数*/private int ZMQThreadCount = Integer.parseInt("1");/*** ZMQ接收端口*/private int ZMQRecvPort;/*** ZMQ监听接收端口*/private String ZMQRecvIP;private ZMQ.Context context = null;private ZMQ.Socket subSock = null;public ZmqSubThread() {initZMQ();}public ZmqSubThread(String ZMQRecvIP, int ZMQRecvPort) {this.ZMQRecvIP = ZMQRecvIP;this.ZMQRecvPort = ZMQRecvPort;initZMQ();}/*** 初始化ZMQ对象*/public void initZMQ() {if (ZMQRecvIP == null || "".equals(ZMQRecvIP)) {throw new RuntimeException("IP Error!");}if (ZMQRecvPort == 0) {throw new RuntimeException("Port Error!");}context = ZMQ.context(ZMQThreadCount);subSock = context.socket(ZMQ.SUB);String ConUri = "tcp://" + ZMQRecvIP + ":" + ZMQRecvPort;subSock.connect(ConUri);subSock.subscribe("".getBytes());}@Overridepublic void run() {while (true) {try {byte[] recvBuf = subSock.recv(ZMQ.SUB);if (recvBuf == null) {continue;}dealWith(recvBuf);} catch (Exception e) {e.printStackTrace();}}}/*** 处理接收到数据的抽象方法*/public abstract void dealWith(byte[] data);
}

接收测试:

public class Test {public static void main(String[] args) {ZmqSubThread zmqSubThread = new ZmqSubThread("127.0.0.1", 7111) {@Overridepublic void dealWith(byte[] data) {System.out.println(new String(data));}};Thread thread = new Thread(zmqSubThread);thread.start();}
}
  相关解决方案