阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

集成RabbitMQ

276次阅读
没有评论

共计 5673 个字符,预计需要花费 15 分钟才能阅读完成。

前面我们讲了 ActiveMQ Artemis,它实现了 JMS 的消息服务协议。JMS 是 JavaEE 的消息服务标准接口,但是,如果 Java 程序要和另一种语言编写的程序通过消息服务器进行通信,那么 JMS 就不太适合了。

AMQP 是一种使用广泛的独立于语言的消息协议,它的全称是 Advanced Message Queuing Protocol,即高级消息队列协议,它定义了一种二进制格式的消息流,任何编程语言都可以实现该协议。实际上,Artemis 也支持 AMQP,但实际应用最广泛的 AMQP 服务器是使用 Erlang 编写的 RabbitMQ。

安装 RabbitMQ

我们先从 RabbitMQ 的官网下载并安装 RabbitMQ,安装和启动 RabbitMQ 请参考官方文档。要验证启动是否成功,可以访问 RabbitMQ 的管理后台 http://localhost:15672,如能看到登录界面表示 RabbitMQ 启动成功:

集成 RabbitMQ

RabbitMQ 后台管理的默认用户名和口令均为guest

AMQP 协议

AMQP 协议和前面我们介绍的 JMS 协议有所不同。在 JMS 中,有两种类型的消息通道:

  1. 点对点的 Queue,即 Producer 发送消息到指定的 Queue,接收方从 Queue 收取消息;
  2. 一对多的 Topic,即 Producer 发送消息到指定的 Topic,任意多个在线的接收方均可从 Topic 获得一份完整的消息副本。

但是 AMQP 协议比 JMS 要复杂一点,它只有 Queue,没有 Topic,并且引入了 Exchange 的概念。当 Producer 想要发送消息的时候,它将消息发送给 Exchange,由 Exchange 将消息根据各种规则投递到一个或多个 Queue:

                                    ┌───────┐
                                ┌──▶│Queue-1│
                  ┌──────────┐  │   └───────┘
              ┌──▶│Exchange-1│──┤
┌──────────┐  │   └──────────┘  │   ┌───────┐
│Producer-1│──┤                 ├──▶│Queue-2│
└──────────┘  │   ┌──────────┐  │   └───────┘
              └──▶│Exchange-2│──┤
                  └──────────┘  │   ┌───────┐
                                └──▶│Queue-3│
                                    └───────┘

如果某个 Exchange 总是把消息发送到固定的 Queue,那么这个消息通道就相当于 JMS 的 Queue。如果某个 Exchange 把消息发送到多个 Queue,那么这个消息通道就相当于 JMS 的 Topic。和 JMS 的 Topic 相比,Exchange 的投递规则更灵活,比如一个“登录成功”的消息被投递到 Queue- 1 和 Queue-2,而“登录失败”的消息则被投递到 Queue-3。这些路由规则称之为 Binding,通常都在 RabbitMQ 的管理后台设置。

我们以具体的业务为例子,在 RabbitMQ 中,首先创建 3 个 Queue,分别用于发送邮件、短信和 App 通知:

集成 RabbitMQ

创建 Queue 时注意到可配置为持久化(Durable)和非持久化(Transient),当 Consumer 不在线时,持久化的 Queue 会暂存消息,非持久化的 Queue 会丢弃消息。

紧接着,我们在 Exchanges 中创建一个 Direct 类型的 Exchange,命名为registration,并添加如下两个 Binding:

集成 RabbitMQ

上述 Binding 的规则就是:凡是发送到 registration 这个 Exchange 的消息,均被发送到 q_mailq_sms这两个 Queue。

我们再创建一个 Direct 类型的 Exchange,命名为login,并添加如下 Binding:

集成 RabbitMQ

上述 Binding 的规则稍微复杂一点,当发送消息给 login 这个 Exchange 时,如果消息没有指定 Routing Key,则被投递到 q_appq_mail,如果消息指定了 Routing Key=”login_failed”,那么消息被投递到q_sms

配置好 RabbitMQ 后,我们就可以基于 Spring Boot 开发 AMQP 程序。

使用 RabbitMQ

我们首先创建 Spring Boot 工程springboot-rabbitmq,并添加如下依赖引入 RabbitMQ:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

然后在 application.yml 中添加 RabbitMQ 相关配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

我们还需要在 Application 中添加一个MessageConverter

import org.springframework.amqp.support.converter.MessageConverter;

@SpringBootApplication
public class Application {
    ...

    @Bean
    MessageConverter createMessageConverter() {return new Jackson2JsonMessageConverter();}
}

MessageConverter用于将 Java 对象转换为 RabbitMQ 的消息。默认情况下,Spring Boot 使用 SimpleMessageConverter,只能发送Stringbyte[]类型的消息,不太方便。使用Jackson2JsonMessageConverter,我们就可以发送 JavaBean 对象,由 Spring Boot 自动序列化为 JSON 并以文本消息传递。

因为引入了 starter,所有 RabbitMQ 相关的 Bean 均自动装配,我们需要在 Producer 注入的是RabbitTemplate

@Component
public class MessagingService {@Autowired
    RabbitTemplate rabbitTemplate;

    public void sendRegistrationMessage(RegistrationMessage msg) {rabbitTemplate.convertAndSend("registration", "", msg);
    }

    public void sendLoginMessage(LoginMessage msg) {String routingKey = msg.success ? "" : "login_failed";
        rabbitTemplate.convertAndSend("login", routingKey, msg);
    }
}

发送消息时,使用 convertAndSend(exchange, routingKey, message) 可以指定 Exchange、Routing Key 以及消息本身。这里传入 JavaBean 后会自动序列化为 JSON 文本。上述代码将 RegistrationMessage 发送到 registration,将LoginMessage 发送到login,并根据登录是否成功来指定 Routing Key。

接收消息时,需要在消息处理的方法上标注@RabbitListener

@Component
public class QueueMessageListener {final Logger logger = LoggerFactory.getLogger(getClass());

    static final String QUEUE_MAIL = "q_mail";
    static final String QUEUE_SMS = "q_sms";
    static final String QUEUE_APP = "q_app";

    @RabbitListener(queues = QUEUE_MAIL)
    public void onRegistrationMessageFromMailQueue(RegistrationMessage message) throws Exception {logger.info("queue {} received registration message: {}", QUEUE_MAIL, message);
    }

    @RabbitListener(queues = QUEUE_SMS)
    public void onRegistrationMessageFromSmsQueue(RegistrationMessage message) throws Exception {logger.info("queue {} received registration message: {}", QUEUE_SMS, message);
    }

    @RabbitListener(queues = QUEUE_MAIL)
    public void onLoginMessageFromMailQueue(LoginMessage message) throws Exception {logger.info("queue {} received message: {}", QUEUE_MAIL, message);
    }

    @RabbitListener(queues = QUEUE_SMS)
    public void onLoginMessageFromSmsQueue(LoginMessage message) throws Exception {logger.info("queue {} received message: {}", QUEUE_SMS, message);
    }

    @RabbitListener(queues = QUEUE_APP)
    public void onLoginMessageFromAppQueue(LoginMessage message) throws Exception {logger.info("queue {} received message: {}", QUEUE_APP, message);
    }
}

上述代码一共定义了 5 个 Consumer,监听 3 个 Queue。

启动应用程序,我们注册一个新用户,然后发送一条 RegistrationMessage 消息。此时,根据 registration 这个 Exchange 的设定,我们会在两个 Queue 收到消息:

... c.i.learnjava.service.UserService        : try register by [email protected]...
... c.i.learnjava.web.UserController         : user registered: [email protected]
... c.i.l.service.QueueMessageListener       : queue q_mail received registration message: [RegistrationMessage: [email protected], name=Bob, timestamp=1594559871495]
... c.i.l.service.QueueMessageListener       : queue q_sms received registration message: [RegistrationMessage: [email protected], name=Bob, timestamp=1594559871495]

当我们登录失败时,发送 LoginMessage 并设定 Routing Key 为 login_failed,此时,只有q_sms 会收到消息:

... c.i.learnjava.service.UserService        : try login by [email protected]...
... c.i.l.service.QueueMessageListener       : queue q_sms received message: [LoginMessage: [email protected], name=(unknown), success=false, timestamp=1594559886722]

登录成功后,发送 LoginMessage,此时,q_mailq_app将收到消息:

... c.i.learnjava.service.UserService        : try login by [email protected]...
... c.i.l.service.QueueMessageListener       : queue q_mail received message: [LoginMessage: [email protected], name=Bob, success=true, timestamp=1594559895251]
... c.i.l.service.QueueMessageListener       : queue q_app received message: [LoginMessage: [email protected], name=Bob, success=true, timestamp=1594559895251]

RabbitMQ 还提供了使用 Topic 的 Exchange(此 Topic 指消息的标签,并非 JMS 的 Topic 概念),可以使用 * 进行匹配并路由。可见,掌握 RabbitMQ 的核心是理解其消息的路由规则。

直接指定一个 Queue 并投递消息也是可以的,此时指定 Routing Key 为 Queue 的名称即可,因为 RabbitMQ 提供了一个 default exchange 用于根据 Routing Key 查找 Queue 并直接投递消息到指定的 Queue。但是要实现一对多的投递就必须自己配置 Exchange。

练习

在 Spring Boot 中使用 RabbitMQ。

下载练习

小结

Spring Boot 提供了 AMQP 的集成,默认使用 RabbitMQ 作为 AMQP 消息服务器。

使用 RabbitMQ 发送消息时,理解 Exchange 如何路由至一个或多个 Queue 至关重要。

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2024-08-05发表,共计5673字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19350
评论数
4
阅读量
7898148
文章搜索
热门文章
开发者必备神器:阿里云 Qoder CLI 全面解析与上手指南

开发者必备神器:阿里云 Qoder CLI 全面解析与上手指南

开发者必备神器:阿里云 Qoder CLI 全面解析与上手指南 大家好,我是星哥。之前介绍了腾讯云的 Code...
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
我用AI做了一个1978年至2019年中国大陆企业注册的查询网站

我用AI做了一个1978年至2019年中国大陆企业注册的查询网站

我用 AI 做了一个 1978 年至 2019 年中国大陆企业注册的查询网站 最近星哥在 GitHub 上偶然...
星哥带你玩飞牛NAS-5:飞牛NAS中的Docker功能介绍

星哥带你玩飞牛NAS-5:飞牛NAS中的Docker功能介绍

星哥带你玩飞牛 NAS-5:飞牛 NAS 中的 Docker 功能介绍 大家好,我是星哥,今天给大家带来如何在...
浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍

浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍

浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍 前言 在 AI 自动化快速发展的当下,浏览器早已不再只是...
星哥带你玩飞牛NAS-14:解锁公网自由!Lucky功能工具安装使用保姆级教程

星哥带你玩飞牛NAS-14:解锁公网自由!Lucky功能工具安装使用保姆级教程

星哥带你玩飞牛 NAS-14:解锁公网自由!Lucky 功能工具安装使用保姆级教程 作为 NAS 玩家,咱们最...
颠覆 AI 开发效率!开源工具一站式管控 30+大模型ApiKey,秘钥付费+负载均衡全搞定

颠覆 AI 开发效率!开源工具一站式管控 30+大模型ApiKey,秘钥付费+负载均衡全搞定

  颠覆 AI 开发效率!开源工具一站式管控 30+ 大模型 ApiKey,秘钥付费 + 负载均衡全...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
星哥带你玩飞牛 NAS-10:备份微信聊天记录、数据到你的NAS中!

星哥带你玩飞牛 NAS-10:备份微信聊天记录、数据到你的NAS中!

星哥带你玩飞牛 NAS-10:备份微信聊天记录、数据到你的 NAS 中! 大家对「数据安全感」的需求越来越高 ...
支付宝、淘宝、闲鱼又双叕崩了,Cloudflare也瘫了连监控都挂,根因藏在哪?

支付宝、淘宝、闲鱼又双叕崩了,Cloudflare也瘫了连监控都挂,根因藏在哪?

支付宝、淘宝、闲鱼又双叕崩了,Cloudflare 也瘫了连监控都挂,根因藏在哪? 最近两天的互联网堪称“故障...
星哥带你玩飞牛NAS-11:咪咕视频订阅部署全攻略

星哥带你玩飞牛NAS-11:咪咕视频订阅部署全攻略

星哥带你玩飞牛 NAS-11:咪咕视频订阅部署全攻略 前言 在家庭影音系统里,NAS 不仅是存储中心,更是内容...
星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定!

星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定!

星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定! 前言 作为 NAS 玩家,你是否总被这些...
安装Black群晖DSM7.2系统安装教程(在Vmware虚拟机中、实体机均可)!

安装Black群晖DSM7.2系统安装教程(在Vmware虚拟机中、实体机均可)!

安装 Black 群晖 DSM7.2 系统安装教程(在 Vmware 虚拟机中、实体机均可)! 前言 大家好,...