当前位置: 代码迷 >> 综合 >> RabbiMQ学习文档
  详细解决方案

RabbiMQ学习文档

热度:96   发布时间:2024-03-10 01:43:48.0

RabbiMQ学习文档

rabbitMQ是遵循amqp协议的一个erlang实现。

amqp协议:高级消息队列协议

http协议:request、response

telnet协议:经常用来查看某一台ip上的指定端口是否是ping通的【远程登陆协议】

AMQP 0-9-1 complete Reference Guide rabbitmq 实现的amqp协议的版本号

  • connection =>open , use , close [open-ok , close , une-ok]
  • channel =>open, flow , close , [构建在connection之上,在amqp中常作为长连接]
  • exchange =>
  • queue
  • basic =>发布和获取 message中的一些设置
  • tx =>事务处理
  • confirm =>发布确认机制

详细设计书一样

在这里插入图片描述
在这里插入图片描述

延时处理,拉长时间

以更长的时间来换取堆积的业务逻辑

异步处理:响应很快,增加服务器承载能力

流量削峰:

扩展性:UI和业务的解耦,可以独立演化

高可用:处理器发生故障以后,不会影响可用性

缺陷:

即时性降低,降低了用户体验—无法避免;业务上来屈服;

复杂性提高

vhost:避免命名冲突

exchange:

  • direct
  • headers
  • topic
  • fanout

在这里插入图片描述

在windows上是一个服务

UI工具可以查看rabbitmq的事实状况,http api的方式也可以查看

http://127.0.0.1:15672/#/

在这里插入图片描述

ui和命令行的区别:ui只是命令行的子集

在这里插入图片描述

  • 应用层序和集群的管理 application and Cluster Management

    • stop
    • stop_app
    • start_app
    • reset [格式化的功能]
    • force reset [无条件设置]
  • 集群配置 Cluster management:ram和disk

    • purge_queue {queue名} 用于清空某一个队列
  • 用户管理 userManagement

    • add_user {username} {password} [设置users的角色]
    • set_user_tags {username} {tag …} [设置用户的角色]
    • authenticate_user {username} {password} 验证用户名,密码是否正确
    • list_users 展示所有users
  • 访问控制 access Control

    • set_permissions [-p vhost] {user} {conf} {write} {read}
    • clear_permissions {user}
  • 参数管理 parameter management 【第三方插件比较多】

  • 政策管理 policy management 对queue 的全局设置用得上这个政策管理,队列镜像也是用的这个命令

  • 服务器状态

    • list_queues {name} {pid} {durable}
    • list_exchanges [-p vhost] [exchangeinfoitem]
    • list_bindings [-p vhost] [bindinginfoitem]
    • list_connections [connectioninfoitem]
    • list_channels [channelinfoitem]

    connections =>channels 建立connections是长连接,channels是挂在connection之上的

    • status
    • environment
    • Miscellaneous 混合命令

erlang和rabbitmq的区别

一个是语言环境,一个是应用程序

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

环境配置文件:measia【erlang的分布式数据库】, config配置文件,log日志的存放路径的设置

config文件。。mongodb , redis

在这里插入图片描述

知道端口:5673

log信息 默认打出来的是info格式

{log_levels,[{connection,info},{channel,info}]}

这样日志特别大,对磁盘的压力就很大

{vm_memory_high_watermark,0.4}

指定ram占内存百分比上限,connection达到阀值会阻塞blocked【报警】 让管道流变小

{disk_free_limit,5000000} 也是会触发报警机制,让管道流变小

使用C#连接rabbitmq

.Net Client

1.api文档 类似msdn

2.下载方式有两种: 1)通过官网下载 2)通过nuget下载

  • connectiong to a Broker

服务器配置,guest不可以被外网访问,在生产环境中,默认都是新增用户的

这是生产者代码:

using System;
using System.Net.Http;
using System.Text;
using RabbitMQ.Client;namespace RabbitmqDemo
{class Program{static void Main(string[] args){ConnectionFactory factory = new ConnectionFactory(){HostName = "127.0.0.1",UserName = "datamip",Password = "123456"};//第一步:创建connectionvar connection = factory.CreateConnection();//第二步:创建channelvar channel = connection.CreateModel();//第三步:声明交换机(因为rabbitmq已经有了自定义的amqp default exchange ,所以这里不声明也能自动创建)//第四步:创建一个队列(queue)channel.QueueDeclare("mytest", true, false, false);var msg = Encoding.UTF8.GetBytes("你好"); //第五步:发布消息channel.BasicPublish(string.Empty,"mytest",basicProperties:null,body:msg);// using...// connection.Dispose();// channel.Dispose();Console.WriteLine("Hello World!");}}
}

这是消费者代码:

using System;
using System.Text;
using RabbitMQ.Client;namespace RabbitmqConsumer
{class Program{static void Main(string[] args){ConnectionFactory factory = new ConnectionFactory(){HostName = "127.0.0.1",UserName = "datamip",Password = "123456"};//第一步:创建connectionvar connection = factory.CreateConnection();//第二步:创建channelvar channel = connection.CreateModel();//第三步:声明交换机(因为rabbitmq已经有了自定义的amqp default exchange ,所以这里不声明也能自动创建)//第四步:获取消息BasicGetResult basicGetResult = channel.BasicGet("mytest", true);ReadOnlyMemory<byte> msg = basicGetResult.Body;Console.WriteLine("Hello World!");}}
}
交换机机制:exchange:direct/fanout/headers/topic

在这里插入图片描述

Demo中用到的BasicGet是主动的去拉取,subscribe和publish是发布订阅的模式

workqueue方式

EventingBasicConsumer:多个consumer可以分摊我们的cpu计算压力

如果申明自定义交换机,一定要手动绑定

//第三步:声明交换机(因为rabbitmq已经有了自定义的amqp default exchange ,所以这里不声明也能自动创建)可选channel.ExchangeDeclare("myexchange",ExchangeType.Direct,true,false,null);//第四步:创建一个队列(queue)channel.QueueDeclare("mytest", true, false, false);channel.QueueBind("mytest", "myexchange", "mytest", null);for (int i = 0; i < 100; i++){var msg = Encoding.UTF8.GetBytes(string.Format("{0}:{1}",i,"你好"));//第五步:发布消息// channel.BasicPublish(string.Empty, "mytest", basicProperties: null, body: msg);channel.BasicPublish("myexchange", "mytest", basicProperties: null, body: msg);}

在这里插入图片描述

Routing模式

做一个日志处理分发,将日志级别不同的存到不同的队列里去,info/debug/warn存到log_else队列中,等待消费者消费;将error存到log_error队列中,等待消费者消费;在这里注意,我们可以在生产者/消费者任意一端构建交换机、队列,因此最合适的方式是,将不同级别队列的构建放在消费者一端,在生产者端,将消息的存进不同的路由中(routekey中)
在这里插入图片描述

生产者:
在这里插入图片描述

在这里插入图片描述

消费者:

在这里插入图片描述

fanout:exchange多播的现象

应用场景相当多

1.下订单流程:如果订单提交,同时发送短信和推送

2.cs软件弹出消息,普通情况下,我们是轮询的方式,我们在cs中绑定fanout exchange,这时候服务器有消息的话,可以及时推送

3.客户关怀千人千面

淘宝=>催付、付款后提醒、发货提醒、签收提醒

VjDN-1603794767791)]

消费者:

[外链图片转存中…(img-NEHpO6ST-1603794767791)]

[外链图片转存中…(img-I3GOlfRN-1603794767792)]

fanout:exchange多播的现象

应用场景相当多

1.下订单流程:如果订单提交,同时发送短信和推送

2.cs软件弹出消息,普通情况下,我们是轮询的方式,我们在cs中绑定fanout exchange,这时候服务器有消息的话,可以及时推送

3.客户关怀千人千面

淘宝=>催付、付款后提醒、发货提醒、签收提醒

? =>给用户关联推荐、彩信和邮件、短信给用户推荐关联推荐

  相关解决方案