[ZeroMQ] Getting Started with ZeroMQ

Published: 2023-11-22 18:17:35

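      • ZeroMQ Basics and Advanced Topics
        • Downloading and Building ZeroMQ
        • ZeroMQ Documentation
      • Problems of Traditional Network Programming That ZeroMQ Solves
      • ZeroMQ Messaging Patterns
        • [1] REQ/REP Request-Reply Pattern
        • [2] PUB/SUB Publish-Subscribe Pattern
        • [3] PUSH/PULL Pipeline Pattern
        • [4] ROUTER/DEALER Pattern
        • Differences Between PUB/SUB and PUSH/PULL
      • ZeroMQ Examples
        • [1] REQ/REP Request-Reply Pattern
        • [2] PUB/SUB Publish-Subscribe Pattern
        • [3] PUSH/PULL Pipeline Pattern
        • [4] ROUTER/DEALER Pattern
      • Limitations of ZeroMQ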

ZeroMQ Basics and Advanced Topics

Downloading and Building ZeroMQ

Source download: http://download.zeromq.org/

Install the required packages on Ubuntu:

sudo apt-get install libtool 
sudo apt-get install pkg-config 
sudo apt-get install build-essential 
sudo apt-get install autoconf 
sudo apt-get install automake

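Install the crypto library

Sodium is an easy-to-use library that provides encryption, decryption, signatures, password hashing, and more. Besides its own strong feature set, it offers an API compatible with NaCl and an extended API that further improves usability. Sodium's goal is to provide all the core operations needed to build higher-level cryptographic tools.

If it cannot be installed from the command line, download the source manually from the GitHub repository below and build it:

git clone https://github.com/jedisct1/libsodium.git 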
cd libsodium 
./autogen.sh -s 
./configure && make check 
sudo make install 
sudo ldconfig 
cd ..

Download, build, and install libzmq

# Download 
git clone https://github.com/zeromq/libzmq.git 
cd libzmq 
# List the tags 
git tag 
# Check out a specific release; avoid the master branch, which may contain bugs 
git checkout v4.3.4 
./autogen.sh 
./configure && make check 
sudo make install 
sudo ldconfig 
cd ..

For a debug build, use ./configure --enable-debug

The output of sudo make install shows where the libraries (.so and .a) are installed:
libtool: install: /usr/bin/install -c src/.libs/libzmq.lai /usr/local/lib/libzmq.la
libtool: install: /usr/bin/install -c src/.libs/libzmq.a /usr/local/lib/libzmq.a
libtool: install: chmod 644 /usr/local/lib/libzmq.a
libtool: install: ranlib /usr/local/lib/libzmq.a
libtool: finish: PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin:/sbin" ldconfig -n /usr/local/lib
When compiling your own programs, link against libzmq, for example: gcc -o bin file.c -lzmq

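ZeroMQ Documentation

Official API reference: http://api.zeromq.org/

English guide: http://zguide.zeromq.org/page:all

Chinese guide: https://github.com/anjuke/zguide-cn (some of the code in this guide is outdated; refer to the code in the GitHub repository it links to)

Performance testing: http://wiki.zeromq.org/results:perf-howto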

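Problems of Traditional Network Programming That ZeroMQ Solves

  • Many different socket API calls are needed;
  • A TCP connection is one-to-one; serving many peers requires something like the reactor pattern;
  • The programmer has to deal with many low-level socket details;
  • Socket code is not portable across platforms;
  • Splitting data into packets and reassembling them has to be done by hand;
  • Stream transport requires handling "sticky" packets (several messages in one read) and half packets (a message split across reads);
  • Network failures, such as dropped connections and reconnecting, have to be handled by hand;
  • The server has to be started before the client;
  • The I/O model has to be implemented by hand;
  • Message buffering (high-water marks) has to be implemented by hand;
  • Message encryption has to be implemented by hand.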

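ZeroMQ Messaging Patterns

[1] REQ/REP Request-Reply Pattern

[Figure: REQ/REP request-reply pattern]

[2] PUB/SUB Publish-Subscribe Pattern

[Figure: PUB/SUB publish-subscribe pattern]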

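[3] PUSH/PULL Pipeline Pattern

[Figure: Ventilator, Workers, and Sink connected by PUSH/PULL sockets]
As the figure shows, a PUSH or PULL socket can sit on either side of a connection: it can be the client (connecting) end or the server (binding) end.

  • The Ventilator PUSHes tasks out.
  • Several Workers PULL tasks, process them, and PUSH their results to the Sink.
  • The Sink PULLs the results and consumes them.

This pattern resembles several threads competing to take messages off a shared global queue: each task is delivered to exactly one worker.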

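[4] ROUTER/DEALER Pattern

[Figure: ROUTER/DEALER pattern]

Differences Between PUB/SUB and PUSH/PULL

[Figure: PUB/SUB compared with PUSH/PULL]

  • In PUB/SUB, every message the sender publishes is delivered to all receivers subscribed to that topic, so each receiver consumes every message.
  • In PUSH/PULL, each message is delivered to exactly one receiver, so different receivers consume different messages. This is similar to several threads taking messages from a shared global queue.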

ZeroMQ Examples

[1] REQ/REP Request-Reply Pattern

hwclient.c

#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
// Build: gcc -o hwclient hwclient.c -lzmq
int main (void)
{
    printf ("Connecting to hello world server...\n");
    void *context = zmq_ctx_new ();
    // Socket that connects to the server
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int request_nbr;
    int ret = 0;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        char buffer [10];
        printf ("Sending 1 Hello %d...\n", request_nbr);
        ret = zmq_send (requester, "Hello", 5, 0);
        printf ("zmq_send1 ret:%d\n", ret);
        // Deliberate second send without a recv in between:
        // a REQ socket rejects it, so ret is -1 (errno EFSM)
        printf ("Sending 2 Hello %d...\n", request_nbr);
        ret = zmq_send (requester, "Hello", 5, 0);
        printf ("zmq_send2 ret:%d\n", ret);
        zmq_recv (requester, buffer, 6, 0);   // a reply must be received before sending again
        zmq_recv (requester, buffer, 6, 0);   // a second recv without a send also fails (EFSM)
        printf ("Received World %d\n", request_nbr);
    }
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}

hwserver.c

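#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
// Build: gcc -o hwserver hwserver.c -lzmq
int main (void)
{
    void *context = zmq_ctx_new ();
    // Socket to talk to clients
    void *responder = zmq_socket (context, ZMQ_REP);
    int rc = zmq_bind (responder, "tcp://*:5555");  // the server side binds
    assert (rc == 0);

    while (1) {
        // Wait for the next client request
        char buffer [10];
        // Leave room for the terminator; zmq_recv returns the full message
        // size even when the message was truncated to fit the buffer
        int size = zmq_recv (responder, buffer, sizeof (buffer) - 1, 0);
        if (size == -1)
            break;
        if (size > (int) sizeof (buffer) - 1)
            size = sizeof (buffer) - 1;
        buffer [size] = '\0';
        printf ("Received %s\n", buffer);
        sleep (1);          // Do some 'work'
        // Send the reply
        zmq_send (responder, "World", 5, 0);
    }
    zmq_close (responder);
    zmq_ctx_destroy (context);
    return 0;
}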

Start hwserver

# ./hwserver

Debug hwclient

# gdb hwclient	// start debugging with gdb
(gdb) b main	// set a breakpoint at main
(gdb) r	// run the program (restarts it if it is already running)
(gdb) c	// continue execution
# ctrl+c	// interrupt the running program
(gdb) info threads	// list the threads of the hwclient process; there are 3, as shown below
  Id   Target Id         Frame 
  3    Thread 0x7ffff65c7700 (LWP 29297) "ZMQbg/IO/0" 0x00007ffff7b0c0e3 in epoll_wait () from /lib64/libc.so.6
  2    Thread 0x7ffff6dc8700 (LWP 29296) "ZMQbg/Reaper" 0x00007ffff7b0c0e3 in epoll_wait () from /lib64/libc.so.6
* 1    Thread 0x7ffff7ef0740 (LWP 29292) "hwclient" 0x00007ffff7b00ddd in poll () from /lib64/libc.so.6
(gdb) thread 3	// switch to thread 3
[Switching to thread 3 (Thread 0x7ffff65c7700 (LWP 29297))]
#0  0x00007ffff7b0c0e3 in epoll_wait () from /lib64/libc.so.6
(gdb) bt	// print the backtrace; thread 3 is using epoll
#0  0x00007ffff7b0c0e3 in epoll_wait () from /lib64/libc.so.6
#1  0x00007ffff7f2e22b in zmq::epoll_t::loop (this=0x4095b0) at src/epoll.cpp:184
#2  0x00007ffff7f4f2d9 in zmq::worker_poller_base_t::worker_routine (arg_=0x4095b0) at src/poller_base.cpp:146
#3  0x00007ffff7f7a060 in thread_routine (arg_=0x409608) at src/thread.cpp:257
#4  0x00007ffff75f0ea5 in start_thread () from /lib64/libpthread.so.0
#5  0x00007ffff7b0bb0d in clone () from /lib64/libc.so.6
(gdb) q	// quit gdb

[2] PUB/SUB Publish-Subscribe Pattern

wuclient.c

// zhelpers.h comes with the zguide C examples and provides s_send, s_recv and randof
#include "zhelpers.h"
// Build: gcc -o wuclient wuclient.c -lzmq
int main (int argc, char *argv [])
{
    // Socket to talk to server
    printf ("Collecting updates from weather server...\n");
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    printf ("zmq_connect\n");
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    assert (rc == 0);

    // Subscribe to zipcode, default is NYC, 10001
    const char *filter = (argc > 1)? argv [1]: "10001 ";
    printf ("ZMQ_SUBSCRIBE\n");
    // The filter is matched against the leading bytes of each message
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                         filter, strlen (filter));
    assert (rc == 0);

    // Process 100 updates
    int update_nbr;
    long total_temp = 0;
    printf ("into for\n");
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        char *string = s_recv (subscriber);
        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d",
                &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        printf ("zipcode = %d, temperature:%d\n", zipcode, temperature);
        free (string);
    }
    printf ("Average temperature for zipcode '%s' was %dF\n",
            filter, (int) (total_temp / update_nbr));
    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

wuserver.c

#include "zhelpers.h"
#include <limits.h>
int main (void)
{
    // Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);

    // Initialize random number generator
    srandom ((unsigned) time (NULL));
    int zipcode, temperature, relhumidity;
    temperature = 0;
    while (1) {
        // Get values that will fool the boss
        zipcode = randof (100000);
        // zipcode = 10001;
        // temperature is a running counter here; wrap to 0 instead of overflowing
        if (temperature == INT_MAX)
            temperature = 0;
        else
            temperature++;
        relhumidity = randof (50) + 10;

        // Send message to all subscribers
        char update [32];
        snprintf (update, sizeof (update), "%05d %d %d", zipcode, temperature, relhumidity);
        printf ("pub %05d %d %d\n", zipcode, temperature, relhumidity);
        s_send (publisher, update);     // publishes whether or not anyone is subscribed
        // sleep(1);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}

Start the server

# ./wuserver

Client 1 subscribes to topic 10001

# ./wuclient 10001
zmq_connect
ZMQ_SUBSCRIBE
into for
zipcode = 10001, temperature:155055
zipcode = 10001, temperature:291385
zipcode = 10001, temperature:343617
zipcode = 10001, temperature:486772
zipcode = 10001, temperature:554440
zipcode = 10001, temperature:668110
...

Client 2 subscribes to topic 10002

# ./wuclient 10002
zmq_connect
ZMQ_SUBSCRIBE
into for
zipcode = 10002, temperature:38414
zipcode = 10002, temperature:100755
zipcode = 10002, temperature:125598
zipcode = 10002, temperature:196114
...

[3] PUSH/PULL Pipeline Pattern

taskvent.c

#include "zhelpers.h"
int main (void) 
{
    void *context = zmq_ctx_new ();
    // Socket to send messages on
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_bind (sender, "tcp://*:5557");
    // Socket to send start of batch message on
    void *sink = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sink, "tcp://localhost:5558");

    printf ("Press Enter when the workers are ready: ");
    getchar ();
    printf ("Sending tasks to workers...\n");

    // The first message is "0" and signals start of batch
    s_send (sink, "0");

    // Initialize random number generator
    srandom ((unsigned) time (NULL));

    // Send 100 tasks
    int task_nbr;
    int total_msec = 0;     // Total expected cost in msecs
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        int workload;
        // Random workload from 1 to 100msecs
        workload = randof (100) + 1;
        total_msec += workload;
        char string [10];
        sprintf (string, "%d", workload);
        s_send (sender, string);    // the application does not care which PULL socket gets it
    }
    printf ("Total expected cost: %d msec\n", total_msec);
    zmq_close (sink);
    zmq_close (sender);
    zmq_ctx_destroy (context);
    return 0;
}

taskwork.c

#include "zhelpers.h"
int main (void) 
{
    // Socket to receive messages on
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");
    // Socket to send messages to
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sender, "tcp://localhost:5558");

    // Process tasks forever
    while (1) {
        char *string = s_recv (receiver);
        printf ("%s.", string);     // Show progress
        fflush (stdout);
        s_sleep (atoi (string));    // Do the work
        free (string);
        s_send (sender, "");        // Send results to sink
    }
    zmq_close (receiver);
    zmq_close (sender);
    zmq_ctx_destroy (context);
    return 0;
}

tasksink.c

#include "zhelpers.h"
int main (void) 
{
    // Prepare our context and socket
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5558");

    // Wait for start of batch
    char *string = s_recv (receiver);
    free (string);

    // Start our clock now
    int64_t start_time = s_clock ();

    // Process 100 confirmations
    int task_nbr;
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        char *string = s_recv (receiver);
        free (string);
        if (task_nbr % 10 == 0)
            printf (":");
        else
            printf (".");
        fflush (stdout);
    }
    // Calculate and report duration of batch
    printf ("Total elapsed time: %d msec\n",
            (int) (s_clock () - start_time));
    zmq_close (receiver);
    zmq_ctx_destroy (context);
    return 0;
}

Start taskvent

# ./taskvent 
Press Enter when the workers are ready: 
Sending tasks to workers...
Total expected cost: 4830 msec

Start tasksink

# ./tasksink 
:.........:.........:.........:.........:.........:.........:.........:.........:.........:.........Total elapsed time: 2640 msec

Start the first taskwork

# ./taskwork 
69.72.7.85.100.89.19.16.75.52.20.97.78.32.57.74.55.77.83.3.95.37.2.67.83.33.19.6.8.97.6.90.15.22.28.79.14.36.69.91.20.54.53.34.97.25.100.62.80.11.

Start the second taskwork

# ./taskwork 
87.52.38.41.34.55.11.54.31.58.100.4.79.2.69.26.45.62.62.82.91.13.53.76.19.3.30.96.38.65.70.60.67.97.24.23.16.16.2.51.24.38.60.61.30.2.14.15.83.8.

[4] ROUTER/DEALER Pattern

rrworker.c

#include "zhelpers.h"
#include <unistd.h>
int main (void) 
{
    void *context = zmq_ctx_new ();
    // Socket to talk to clients
    void *responder = zmq_socket (context, ZMQ_REP);
    zmq_connect (responder, "tcp://localhost:5560");

    while (1) {
        // Wait for next request from client
        char *string = s_recv (responder);
        printf ("Received request: [%s]\n", string);
        free (string);
        // Do some 'work'
        sleep (1);
        // Send reply back to client
        s_send (responder, "World");
    }
    // We never get here, but clean up anyhow
    zmq_close (responder);
    zmq_ctx_destroy (context);
    return 0;
}

rrworker2.c

#include "zhelpers.h"
#include <unistd.h>
int main (void) 
{
    void *context = zmq_ctx_new ();
    // Socket to talk to clients
    void *responder = zmq_socket (context, ZMQ_REP);
    zmq_connect (responder, "tcp://localhost:5560");

    while (1) {
        // Wait for next request from client
        char *string = s_recv (responder);
        printf ("Received request: [%s]\n", string);
        free (string);
        // Do some 'work'
        sleep (1);
        // Send reply back to client
        s_send (responder, "World2");
    }
    // We never get here, but clean up anyhow
    zmq_close (responder);
    zmq_ctx_destroy (context);
    return 0;
}

rrbroker.c

// Simple request-reply broker
#include "zhelpers.h"

int main (void) 
{
    // Prepare our context and sockets
    void *context = zmq_ctx_new ();
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    void *backend  = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (frontend, "tcp://*:5559");
    zmq_bind (backend,  "tcp://*:5560");

    // Initialize poll set
    zmq_pollitem_t items [] = {
        { frontend, 0, ZMQ_POLLIN, 0 },
        { backend,  0, ZMQ_POLLIN, 0 }
    };
    // Switch messages between sockets
    while (1) {
        zmq_msg_t message;
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            while (1) {
                // Process all parts of the message
                zmq_msg_init (&message);
                zmq_msg_recv (&message, frontend, 0);
                int more = zmq_msg_more (&message);
                zmq_msg_send (&message, backend, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      // Last message part
            }
        }
        if (items [1].revents & ZMQ_POLLIN) {
            while (1) {
                // Process all parts of the message
                zmq_msg_init (&message);
                zmq_msg_recv (&message, backend, 0);
                int more = zmq_msg_more (&message);
                zmq_msg_send (&message, frontend, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      // Last message part
            }
        }
    }
    // We never get here, but clean up anyhow
    zmq_close (frontend);
    zmq_close (backend);
    zmq_ctx_destroy (context);
    return 0;
}

rrclient.c

#include "zhelpers.h"
int main (void) 
{
    void *context = zmq_ctx_new ();
    // Socket to talk to server
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5559");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        s_send (requester, "Hello");
        char *string = s_recv (requester);
        printf ("Received reply %d [%s]\n", request_nbr, string);
        free (string);
    }
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}

Start rrworker

# ./rrworker
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]

Start rrworker2

# ./rrworker2
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]

Start rrbroker

# ./rrbroker

Start the first rrclient

# ./rrclient 
Received reply 0 [World2]
Received reply 1 [World2]
Received reply 2 [World2]
Received reply 3 [World2]
Received reply 4 [World2]
Received reply 5 [World2]
Received reply 6 [World2]
Received reply 7 [World2]
Received reply 8 [World]
Received reply 9 [World2]

Start the second rrclient

# ./rrclient 
Received reply 0 [World]
Received reply 1 [World2]
Received reply 2 [World]
Received reply 3 [World]
Received reply 4 [World]
Received reply 5 [World]
Received reply 6 [World]
Received reply 7 [World]
Received reply 8 [World]
Received reply 9 [World]

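Limitations of ZeroMQ

The main reasons ZeroMQ is less widely used than Kafka or RocketMQ:

  1. Coupling. The application has to integrate the ZeroMQ library itself.
  2. No clustering.
  3. No replication.
  4. No persistence.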