阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

RabbitMQ 内部实现

413次阅读
没有评论

共计 5854 个字符,预计需要花费 15 分钟才能阅读完成。

RabbitMQ 的通讯协议

发送消息流程:

< AMQP
> 10,10: Connection.start
< 10,11: Connection.start_ok
> 10,30: Connection.tune
< 10,31: Connection.tune_ok
< 10,40: Connection.open
> 10,41: Connection.open_ok
< 20,10: Channel.open
> 20,11: Channel.open_ok
< 85,10: Confirm.select
> 85,11: Confirm.select_ok
< 60,40: Basic.publish
< Message
> 60,80: Basic.ack
< 20,40: Channel.close
> 20,41: Channel.close_ok
< 10,50: Connection.close
> 10,51: Connection.close_ok

接收消息流程:

< AMQP
> 10,10: Connection.start
< 10,11: Connection.start_ok
> 10,30: Connection.tune
< 10,31: Connection.tune_ok
< 10,40: Connection.open
> 10,41: Connection.open_ok
< 20,10: Channel.open
> 20,11: Channel.open_ok
< 60,10: Basic.qos
> 60,11: Basic.qos_ok
< 60,20: Basic.consume
> 60,21: Basic.consume_ok
> 60,60: Basic.deliver
> Message
< 60,80: Basic.ack

RabbitMQ 中的保证组播的实现方式

RabbitMQ 的保证组播 Guaranteed Multicast 实现在 gm.erl 文件中。
保证组播是指:进程组中的进程可以动态添加和删除,发送到进程组的消息在消息的生命周期中,保证到达进程组中的每个进程。消息的生命周期从消息发送时算起,直到消息发送者了解到消息已经抵达了进程组中的所有进程为止。
这种保证指:

  1. 如果进程组中包含进程,消息就会发送到其中的每个进程;
  2. 如果 P 先后发送了 m 和 m’,对于组中所有先于 m 消息发送时加入进程组的进程 P’,如果它收到了消息 m’, 则它必须要收到 m;
  3. 消息顺序保证,如果 P 先后发送了 m 和 m’,所有的成员要先后收到 m 和 m’; 但是因果顺序并不强制要求,比如 P 收到 m 然后发送 m’,则其他成员不保证先收到 m,后收到 m’。

保证组播最简单的实现方式是由发送者发给组中每个成员,这需要组中所有成员都相互连接。存在的问题是假如发送者在消息发送中失效,谁负责消息发送成功的确认。并且由发送者发送消息给每个成员,对于发送者 CPU 和网络压力也会很大。
RabbitMQ 并不是这样实现的,它将所有成员组成一个链表,顺着链表发送。这样不需要所有成员之间建立连接,假如发送者失败,后继者可以接替它的地位,并且发送消息对于每个成员的开销接近一致,不会发生某个成员压力过大的情况。
消息的异步发送可以提高整个发送过程的性能,在链表 A -> B -> C -> D 中,如果 A 发送消息,它不需要和 C、D 建立连接,当 D 确认收到消息之后 A 才能确认消息已经发送到每个成员。

Backing Queue 的实现

Backing queue 即消息存储的具体形式和引擎。RabbitMQ 的 backing queue 默认为 rabbit_variable_queue。为了提高内存和磁盘的利用率 rabbit_variable_queue 根据具体的情形将消息存储在内存或者磁盘中。rabbit_variable_queue 中消息的 4 种状态:

  • alpha 消息的内容和位置都在内存;
  • beta 消息内容在磁盘,位置在内存;
  • gamma 消息内容在磁盘,位置在内存和磁盘;
  • delta 消息内容和位置都在磁盘,表现为一组消息。

对于持久化消息 (delivery_mode = 2,并且 exchange、queue 都是持久化),消息和位置都存在磁盘,并且状态属于以上 4 状态之一。
大部分情况下,消息移动路线为 q1 -> q2 -> delta -> q3 -> q4, 4 个状态大部分都被跳过。q1, q4 只包含 alpha 状态消息;q2,q3 包含 beta 和 gamma 状态。当新消息到来的时候,会确定归属于哪个状态,可能会跳过 q1 或者 其他 q。当消费者读取消息的时候,首先读取 q4,q4 为空的话则读取 q3,q3 为空的话 delta 中的下一组消息被读入 q3,并减小 delta 的大小。对于持久化的消息发送到持久化的队列,消息会立刻写入 msg_store 和 queue_index。

RabbitMQ 中的消息索引 rabbit_queue_index

rabbit_queue_index 记录消息在磁盘文件系统的位置,启动的时候 RabbitMQ 会加载 index 文件, crush 后会进行恢复。
index 分为很多有序 index segment,index segment 从 0 开始递增,每个 index segment 文件 默认包含 16834 个 publish、develiver、ack。
所以 index segment 0 包含 id 为 0 – 16834-1 的消息,index segment 1 包含 id 为 16834 – 16834*2-1 的消息。message seq id % 16834 = 消息在 segment 中的位置。当消费速度很快,消息可能会不写入磁盘。当 segment 文件中的 publish num = ack number 时,就会删除这个 segment。

Journal 文件

由于消息可能在生产时就被立即消费,为了避免多余的重复 IO 操作,还有一个固定长度的 journal 文件顺序记录每个动作。当 journal 文件满的时候,其中的操作再附加到对应的 segment 文件上,并把 journal 文件清空。journal 文件中的 seq id 是绝对 id,segment 中的 id 为相对 id。journal 文件也完全保存在内存中,其中还保存了 segment id 和 对应文件状态的映射。所有操作都附加到文件状态上,当 journal 文件 flush 到 segment 文件中时,发现 publish 数量和 ack 数量一致就不需要写操作了。当需要 sync 一致性的时候,可以 sync journal 文件。
消息在 Journal 文件中状态是这样的:{(‘no_pub’|{MsgId, MsgProps, IsPersistent}),(‘del’|’no_del’), (‘ack’|’no_ack’)}
Journal 文件名为 journal.jif
qistate 记录 dirty count (日志中累积的数量) 和 segment 缓存信息和文件目录,每个动作发生 dirty count+1, segment 缓存信息则包含了 journal 中的消息信息 JEntries 和 Unack 的数量。
当新动作 append 到日志中时,JEntries 以 Array 的形式保存在内存中,RelSeq: {…},同时记录 Unack 的数量,新消息 +1,ack-1。
新消息 publish:
假如不存在则保存 {MsgId, MsgProps, IsPersistent}
RelSeq: {{MsgId, MsgProps, IsPersistent}, no_del, no_ack}
消息删除:
原来为 {{MsgId, MsgProps, IsPersistent}, no_del, no_ack} 并且 action 为 del
RelSeq: {{MsgId, MsgProps, IsPersistent}, del, no_ack}
消息 ack:
原来为 {no_pub, del, no_ack} 并且 action 为 ack
RelSeq: {no_pub, no_del, ack}
消息 ack:
原来为 {{MsgId, MsgProps, IsPersistent}, del, no_ack} 并且 action 为 ack
则真正删除。
当 dirty count 大于日志最大限度的时候就开始将数据 flush 到 segment 中。如果 unack = 0,说明完全被消费,删除对应的 segment 文件,否则 append 到对应 segment 文件上,过程中需要跳过被 del 和 ack 以及 no_pub 的消息。

Segment 文件及其他目录文件

Segment 文件后缀 .idx。结构为 <<REL_SQL_ONLY_PREFIX, IsPersistent, RelSeq, Body, [DEL], [ACK]>> 其中 DEL 和 ACK 为 <<REL_SQL_ONLY_PREFIX, RelSeq>>。
RabbitMQ 会为在 queues 目录下每个 queue 创建一个 MD5 映射的文件夹,当正常停机的时候会在文件夹下创建 clean.dot 文件, 并保存内存中的数据;非正常停机重启时,会从 journal 中恢复数据。journal 文件在 queue 空闲的时候也会 flush 到 segment 中。当加载 segment 的时候会采用 read ahead 方式读文件。

文件句柄缓存 file_handle_cache

这是 RabbitMQ 对 Erlang file API 的一个封装,无论是 index 还是消息体的存储都使用这些 API。
特点:

  • 每个文件支持一写多读
  • 写都是 append
  • 有 write buffer,没有 read buffer
  • 支持手动 sync
  • 都是调用 prim_file:* 函数

消息存储 rabbit_msg_store

Index,存储 MsgId 到消息位置的映射 {MsgId, RefCount, File, Offset, TotalSize}
FileSummary,ets 表存储文件到摘要的映射 {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers}
所有队列的消息存储在 1 个文件中,文件也是 append 模式,增大后分割新文件。当某些消息被消费的时候,文件就会产生空洞,ValidTotalSize 记录了真实空间占用,这个值用来计算是否执行文件 GC。文件 GC 通过将文件和其他文件合并实现,提高存储利用率和执行效率。非正常停机情况下,通过扫描 GC 过程影响的文件,重构 index 和 filesummary。默认情况下,一个文件中的空洞超过 50% 就会执行 GC。GC 过程为将右边的文件向左合并:左边的文件会重新写到临时文件构成没有空洞的文件,然后写回。右边文件中的有效数据 append 到左边文件中。然后更新 index 和 filesummary。消息体是引用计数的,同一个 msgid 的消息只存储一次,并记录写的次数,当删除同样次数的时候才真正将消息删除。引用数并不存储在消息体中。读消息的时候只有引用数大于 1 的消息才会会读到 cache 中,减少了内存占用,提高了性能。删除消息的时候,即使引用计数等于 0 也不把消息真正删除,为了防止消息存储到多队列,一个队列写入删除完成而另一个队列写入还未开始。当文件 GC 的时候会锁定文件。将操作排队处理。文件有 write back cache,一个 client 写入的内容可以立刻被其他 client 读取,这消除了 sync 时对读取的阻塞。当写进程非常繁忙的时候 读写也不会有延迟。由于 msg_store 有一个缓冲区,有很多写和删除事件在排队。flying_ets 用来处理 publish、remove 到来在真正写入之前的情况,通过 +1、- 1 计数抵消,这样可以避免一些写入。

rabbit_amqqueue

处理 queue 的声明和状态信息、高可用策略、权限验证、统计信息、以及提供 queue 操作接口。这些信息是存储在 mnesia #amqqueue 中。

rabbit_amqqueue_process

queue 操作处理进程。

去除 per-connection 流控

只需要修改:

%case {CS, (Throttle#throttle.conserve_resources orelse
%           credit_flow:blocked())} of

为:

case {CS, Throttle#throttle.conserve_resources} of

消费者负载均衡

rabbit_amqueue_process:deliver_msgs_to_consumers 中 queue:out 和 queue:in 实现了 RR 负载均衡。

根据消息 ID 从持久化文件中读取消息体

Ref = rabbit_guid:gen(),
MSCState = rabbit_msg_store:client_init(
msg_store_persistent, Ref, undefined, undefined),
MsgId = <<245,82,73,201,192,75,136,167,88,84,149,197,104,141,81,33>>,
MSCState1 = rabbit_msg_store:read(MsgId, MSCState).

CentOS 5.6 安装 RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm

RabbitMQ 客户端 C ++ 安装详细记录 http://www.linuxidc.com/Linux/2012-02/53521.htm

用 Python 尝试 RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm

RabbitMQ 集群环境生产实例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm

Ubuntu 下 PHP + RabbitMQ 使用 http://www.linuxidc.com/Linux/2010-07/27309.htm

在 CentOS 上安装 RabbitMQ 流程 http://www.linuxidc.com/Linux/2011-12/49610.htm

RabbitMQ 概念及环境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm

RabbitMQ 入门教程  http://www.linuxidc.com/Linux/2015-02/113983.htm

RabbitMQ 的详细介绍 :请点这里
RabbitMQ 的下载地址 :请点这里

本文永久更新链接地址 :http://www.linuxidc.com/Linux/2015-08/120928.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计5854字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19351
评论数
4
阅读量
7998222
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
安装并使用谷歌AI编程工具Antigravity(亲测有效)

安装并使用谷歌AI编程工具Antigravity(亲测有效)

  安装并使用谷歌 AI 编程工具 Antigravity(亲测有效) 引言 Antigravity...
飞牛NAS玩转Frpc并且配置,随时随地直连你的私有云

飞牛NAS玩转Frpc并且配置,随时随地直连你的私有云

飞牛 NAS 玩转 Frpc 并且配置,随时随地直连你的私有云 大家好,我是星哥,最近在玩飞牛 NAS。 在数...
Prometheus:监控系统的部署与指标收集

Prometheus:监控系统的部署与指标收集

Prometheus:监控系统的部署与指标收集 在云原生体系中,Prometheus 已成为最主流的监控与报警...
星哥带你玩飞牛NAS-5:飞牛NAS中的Docker功能介绍

星哥带你玩飞牛NAS-5:飞牛NAS中的Docker功能介绍

星哥带你玩飞牛 NAS-5:飞牛 NAS 中的 Docker 功能介绍 大家好,我是星哥,今天给大家带来如何在...
星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛 NAS 硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话? 前言 在选择 NAS 用预...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
零成本上线!用 Hugging Face免费服务器+Docker 快速部署HertzBeat 监控平台

零成本上线!用 Hugging Face免费服务器+Docker 快速部署HertzBeat 监控平台

零成本上线!用 Hugging Face 免费服务器 +Docker 快速部署 HertzBeat 监控平台 ...
一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸

一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸

一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸 前言 作为天天跟架构图、拓扑图死磕的...
300元就能买到的”小钢炮”?惠普7L四盘位小主机解析

300元就能买到的”小钢炮”?惠普7L四盘位小主机解析

  300 元就能买到的 ” 小钢炮 ”?惠普 7L 四盘位小主机解析 最近...
你的云服务器到底有多强?宝塔跑分告诉你

你的云服务器到底有多强?宝塔跑分告诉你

你的云服务器到底有多强?宝塔跑分告诉你 为什么要用宝塔跑分? 宝塔跑分其实就是对 CPU、内存、磁盘、IO 做...
星哥带你玩飞牛NAS-16:飞牛云NAS换桌面,fndesk图标管理神器上线!

星哥带你玩飞牛NAS-16:飞牛云NAS换桌面,fndesk图标管理神器上线!

  星哥带你玩飞牛 NAS-16:飞牛云 NAS 换桌面,fndesk 图标管理神器上线! 引言 哈...