阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

集成RabbitMQ

280次阅读
没有评论

共计 5673 个字符,预计需要花费 15 分钟才能阅读完成。

前面我们讲了 ActiveMQ Artemis,它实现了 JMS 的消息服务协议。JMS 是 JavaEE 的消息服务标准接口,但是,如果 Java 程序要和另一种语言编写的程序通过消息服务器进行通信,那么 JMS 就不太适合了。

AMQP 是一种使用广泛的独立于语言的消息协议,它的全称是 Advanced Message Queuing Protocol,即高级消息队列协议,它定义了一种二进制格式的消息流,任何编程语言都可以实现该协议。实际上,Artemis 也支持 AMQP,但实际应用最广泛的 AMQP 服务器是使用 Erlang 编写的 RabbitMQ。

安装 RabbitMQ

我们先从 RabbitMQ 的官网下载并安装 RabbitMQ,安装和启动 RabbitMQ 请参考官方文档。要验证启动是否成功,可以访问 RabbitMQ 的管理后台 http://localhost:15672,如能看到登录界面表示 RabbitMQ 启动成功:

集成 RabbitMQ

RabbitMQ 后台管理的默认用户名和口令均为guest

AMQP 协议

AMQP 协议和前面我们介绍的 JMS 协议有所不同。在 JMS 中,有两种类型的消息通道:

  1. 点对点的 Queue,即 Producer 发送消息到指定的 Queue,接收方从 Queue 收取消息;
  2. 一对多的 Topic,即 Producer 发送消息到指定的 Topic,任意多个在线的接收方均可从 Topic 获得一份完整的消息副本。

但是 AMQP 协议比 JMS 要复杂一点,它只有 Queue,没有 Topic,并且引入了 Exchange 的概念。当 Producer 想要发送消息的时候,它将消息发送给 Exchange,由 Exchange 将消息根据各种规则投递到一个或多个 Queue:

                                    ┌───────┐
                                ┌──▶│Queue-1│
                  ┌──────────┐  │   └───────┘
              ┌──▶│Exchange-1│──┤
┌──────────┐  │   └──────────┘  │   ┌───────┐
│Producer-1│──┤                 ├──▶│Queue-2│
└──────────┘  │   ┌──────────┐  │   └───────┘
              └──▶│Exchange-2│──┤
                  └──────────┘  │   ┌───────┐
                                └──▶│Queue-3│
                                    └───────┘

如果某个 Exchange 总是把消息发送到固定的 Queue,那么这个消息通道就相当于 JMS 的 Queue。如果某个 Exchange 把消息发送到多个 Queue,那么这个消息通道就相当于 JMS 的 Topic。和 JMS 的 Topic 相比,Exchange 的投递规则更灵活,比如一个“登录成功”的消息被投递到 Queue- 1 和 Queue-2,而“登录失败”的消息则被投递到 Queue-3。这些路由规则称之为 Binding,通常都在 RabbitMQ 的管理后台设置。

我们以具体的业务为例子,在 RabbitMQ 中,首先创建 3 个 Queue,分别用于发送邮件、短信和 App 通知:

集成 RabbitMQ

创建 Queue 时注意到可配置为持久化(Durable)和非持久化(Transient),当 Consumer 不在线时,持久化的 Queue 会暂存消息,非持久化的 Queue 会丢弃消息。

紧接着,我们在 Exchanges 中创建一个 Direct 类型的 Exchange,命名为registration,并添加如下两个 Binding:

集成 RabbitMQ

上述 Binding 的规则就是:凡是发送到 registration 这个 Exchange 的消息,均被发送到 q_mailq_sms这两个 Queue。

我们再创建一个 Direct 类型的 Exchange,命名为login,并添加如下 Binding:

集成 RabbitMQ

上述 Binding 的规则稍微复杂一点,当发送消息给 login 这个 Exchange 时,如果消息没有指定 Routing Key,则被投递到 q_appq_mail,如果消息指定了 Routing Key=”login_failed”,那么消息被投递到q_sms

配置好 RabbitMQ 后,我们就可以基于 Spring Boot 开发 AMQP 程序。

使用 RabbitMQ

我们首先创建 Spring Boot 工程springboot-rabbitmq,并添加如下依赖引入 RabbitMQ:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

然后在 application.yml 中添加 RabbitMQ 相关配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

我们还需要在 Application 中添加一个MessageConverter

import org.springframework.amqp.support.converter.MessageConverter;

@SpringBootApplication
public class Application {
    ...

    @Bean
    MessageConverter createMessageConverter() {return new Jackson2JsonMessageConverter();}
}

MessageConverter用于将 Java 对象转换为 RabbitMQ 的消息。默认情况下,Spring Boot 使用 SimpleMessageConverter,只能发送Stringbyte[]类型的消息,不太方便。使用Jackson2JsonMessageConverter,我们就可以发送 JavaBean 对象,由 Spring Boot 自动序列化为 JSON 并以文本消息传递。

因为引入了 starter,所有 RabbitMQ 相关的 Bean 均自动装配,我们需要在 Producer 注入的是RabbitTemplate

@Component
public class MessagingService {@Autowired
    RabbitTemplate rabbitTemplate;

    public void sendRegistrationMessage(RegistrationMessage msg) {rabbitTemplate.convertAndSend("registration", "", msg);
    }

    public void sendLoginMessage(LoginMessage msg) {String routingKey = msg.success ? "" : "login_failed";
        rabbitTemplate.convertAndSend("login", routingKey, msg);
    }
}

发送消息时,使用 convertAndSend(exchange, routingKey, message) 可以指定 Exchange、Routing Key 以及消息本身。这里传入 JavaBean 后会自动序列化为 JSON 文本。上述代码将 RegistrationMessage 发送到 registration,将LoginMessage 发送到login,并根据登录是否成功来指定 Routing Key。

接收消息时,需要在消息处理的方法上标注@RabbitListener

@Component
public class QueueMessageListener {final Logger logger = LoggerFactory.getLogger(getClass());

    static final String QUEUE_MAIL = "q_mail";
    static final String QUEUE_SMS = "q_sms";
    static final String QUEUE_APP = "q_app";

    @RabbitListener(queues = QUEUE_MAIL)
    public void onRegistrationMessageFromMailQueue(RegistrationMessage message) throws Exception {logger.info("queue {} received registration message: {}", QUEUE_MAIL, message);
    }

    @RabbitListener(queues = QUEUE_SMS)
    public void onRegistrationMessageFromSmsQueue(RegistrationMessage message) throws Exception {logger.info("queue {} received registration message: {}", QUEUE_SMS, message);
    }

    @RabbitListener(queues = QUEUE_MAIL)
    public void onLoginMessageFromMailQueue(LoginMessage message) throws Exception {logger.info("queue {} received message: {}", QUEUE_MAIL, message);
    }

    @RabbitListener(queues = QUEUE_SMS)
    public void onLoginMessageFromSmsQueue(LoginMessage message) throws Exception {logger.info("queue {} received message: {}", QUEUE_SMS, message);
    }

    @RabbitListener(queues = QUEUE_APP)
    public void onLoginMessageFromAppQueue(LoginMessage message) throws Exception {logger.info("queue {} received message: {}", QUEUE_APP, message);
    }
}

上述代码一共定义了 5 个 Consumer,监听 3 个 Queue。

启动应用程序,我们注册一个新用户,然后发送一条 RegistrationMessage 消息。此时,根据 registration 这个 Exchange 的设定,我们会在两个 Queue 收到消息:

... c.i.learnjava.service.UserService        : try register by [email protected]...
... c.i.learnjava.web.UserController         : user registered: [email protected]
... c.i.l.service.QueueMessageListener       : queue q_mail received registration message: [RegistrationMessage: [email protected], name=Bob, timestamp=1594559871495]
... c.i.l.service.QueueMessageListener       : queue q_sms received registration message: [RegistrationMessage: [email protected], name=Bob, timestamp=1594559871495]

当我们登录失败时,发送 LoginMessage 并设定 Routing Key 为 login_failed,此时,只有q_sms 会收到消息:

... c.i.learnjava.service.UserService        : try login by [email protected]...
... c.i.l.service.QueueMessageListener       : queue q_sms received message: [LoginMessage: [email protected], name=(unknown), success=false, timestamp=1594559886722]

登录成功后,发送 LoginMessage,此时,q_mailq_app将收到消息:

... c.i.learnjava.service.UserService        : try login by [email protected]...
... c.i.l.service.QueueMessageListener       : queue q_mail received message: [LoginMessage: [email protected], name=Bob, success=true, timestamp=1594559895251]
... c.i.l.service.QueueMessageListener       : queue q_app received message: [LoginMessage: [email protected], name=Bob, success=true, timestamp=1594559895251]

RabbitMQ 还提供了使用 Topic 的 Exchange(此 Topic 指消息的标签,并非 JMS 的 Topic 概念),可以使用 * 进行匹配并路由。可见,掌握 RabbitMQ 的核心是理解其消息的路由规则。

直接指定一个 Queue 并投递消息也是可以的,此时指定 Routing Key 为 Queue 的名称即可,因为 RabbitMQ 提供了一个 default exchange 用于根据 Routing Key 查找 Queue 并直接投递消息到指定的 Queue。但是要实现一对多的投递就必须自己配置 Exchange。

练习

在 Spring Boot 中使用 RabbitMQ。

下载练习

小结

Spring Boot 提供了 AMQP 的集成,默认使用 RabbitMQ 作为 AMQP 消息服务器。

使用 RabbitMQ 发送消息时,理解 Exchange 如何路由至一个或多个 Queue 至关重要。

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2024-08-05发表,共计5673字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19351
评论数
4
阅读量
7982472
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
150元打造低成本NAS小钢炮,捡一块3865U工控板

150元打造低成本NAS小钢炮,捡一块3865U工控板

150 元打造低成本 NAS 小钢炮,捡一块 3865U 工控板 一块二手的熊猫 B3 工控板 3865U,搭...
告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

  告别 Notion 焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁” 引言 在数字笔记工...
【1024程序员】我劝你赶紧去免费领一个AWS、华为云等的主机

【1024程序员】我劝你赶紧去免费领一个AWS、华为云等的主机

【1024 程序员】我劝你赶紧去免费领一个 AWS、华为云等的主机 每年 10 月 24 日,程序员们都会迎来...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
星哥带你玩飞牛NAS-8:有了NAS你可以干什么?软件汇总篇

星哥带你玩飞牛NAS-8:有了NAS你可以干什么?软件汇总篇

星哥带你玩飞牛 NAS-8:有了 NAS 你可以干什么?软件汇总篇 前言 哈喽各位玩友!我是是星哥,不少朋友私...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地

把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地

把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地 大家好,我是星哥,今天教大家在飞牛 NA...
国产开源公众号AI知识库 Agent:突破未认证号限制,一键搞定自动回复,重构运营效率

国产开源公众号AI知识库 Agent:突破未认证号限制,一键搞定自动回复,重构运营效率

国产开源公众号 AI 知识库 Agent:突破未认证号限制,一键搞定自动回复,重构运营效率 大家好,我是星哥,...
星哥带你玩飞牛NAS-14:解锁公网自由!Lucky功能工具安装使用保姆级教程

星哥带你玩飞牛NAS-14:解锁公网自由!Lucky功能工具安装使用保姆级教程

星哥带你玩飞牛 NAS-14:解锁公网自由!Lucky 功能工具安装使用保姆级教程 作为 NAS 玩家,咱们最...
多服务器管理神器 Nexterm 横空出世!NAS/Win/Linux 通吃,SSH/VNC/RDP 一站式搞定

多服务器管理神器 Nexterm 横空出世!NAS/Win/Linux 通吃,SSH/VNC/RDP 一站式搞定

多服务器管理神器 Nexterm 横空出世!NAS/Win/Linux 通吃,SSH/VNC/RDP 一站式搞...
小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比

小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比

小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比 星哥玩云,带你从小白到上云高手。今天咱们就来聊聊——什...