阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Redis源码分析之发布订阅(pub/sub)

451次阅读
没有评论

共计 7731 个字符,预计需要花费 20 分钟才能阅读完成。

Redis 算是缓存界的老大哥了,最近做的事情对 Redis 依赖较多,使用了里面的发布订阅功能,事务功能以及 SortedSet 等数据结构,后面准备好好学习总结一下 Redis 的一些知识点。

先看下 redis 发布订阅的结构:

redis 发布订阅结构

redis 发布订阅结构

其中发布者跟订阅者之间通过 channel 进行交互,channel 分为两种模式。

一、redis 发布订阅命令简介

redis 中为发布订阅(pub/sub)功能提供了六个命令,分为两种模式。

  1. 由 subscribe,unsubscribe 组成,它们是负责订阅有确定名称的 channel,例如 subscribe test 表示订阅名字为 test 的 channel。
  2. 由 psubscribe,punsubscribe 组成,是负责订阅模糊名字的 channel,例如 psubscribe test* 表示订阅所有以 test 开头的 channel。

最后再加上发布命令 publish 以及查看订阅相关信息的 pubsub 命令组成。

二、redis 发布订阅源码分析

redis 所有的命令及其处理函数都放在了 server.c 文件的开头,从其中找出发布订阅功能相关的命令信息。

    {"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
    {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
    {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},

这里可以看出创建一条命令需要很多参数,我们这里只需要关注前两个参数,第一个参数表示命令的内容,第二个表示该命令对应的处理函数。

普通模式订阅 subscribe 函数:
该命令支持多个参数,即 subscribe channel1,channel2…

void subscribeCommand(client *c) {
    int j;
    // 这里挨个处理 subscribe 的参数,因为命令本身被作为参数 0 所以从 1 开始处理后面的参数
    for (j = 1; j < c->argc; j++)
        // 订阅每个频道
        pubsubSubscribeChannel(c,c->argv[j]);
    // 这里设置客户端的状态,下面会解释这个状态的作用
    c->flags |= CLIENT_PUBSUB;
}

在 server.c 文件中,processCommand 函数是在调用具体命令函数之前的判断逻辑,其中有一段:

/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

这里注释也写的很清楚,就是当 client 处于 pub/sub 上下文时,只接收订阅相关命令以及一个 ping 命令,这就解释了上面 subscribeCommand 函数中为什么要设置客户端 flag 字段。

接下来看下订阅的具体逻辑:

int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    // 把指定 channel 加入到 client 的 pubsub_channels 哈希表中
    // 不成功说明已经订阅了该频道
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        // 这里是把该 channel 加入到 client 的哈希表中,引用加 1
        incrRefCount(channel);
        // 在 server 的发布订阅哈希表中查找指定 channel
        de = dictFind(server.pubsub_channels,channel);
        // 如果该 channel 还不存在,则创建
        if (de == NULL) {
            // 创建一个空 list
            clients = listCreate();
            // 把 channel 加入到 server 的哈希表中,value 就是该 channel 的所有订阅者
            dictAdd(server.pubsub_channels,channel,clients);
            // 该 channel 引用加 1
            incrRefCount(channel);
        } else {clients = dictGetVal(de);
        }
        // 把 client 加入到该 channel 的订阅列表中
        listAddNodeTail(clients,c);
    }
    // 一系列通知客户端的操作
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

总结一下,订阅其实就是把指定 channel 分别加入到 client 跟 server 的 pub/sub 哈希表中,然后在 server 端保存订阅了该 channle 的所有 client 列表,如下图:

普通模式发布订阅数据结构

普通模式发布订阅数据结构

下面看一下 publish 发布命令:
例如:publish channelName msg

void publishCommand(client *c) {
    // 发布逻辑
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    // 这里是关于集群或者 AOF 的操作
    if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,PROPAGATE_REPL);
    // 返回给 client 通知了的订阅者数
    addReplyLongLong(c,receivers);
}

重点看下发布函数的源码:

int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    // 根据上面的订阅源码,这里就是取出订阅该 channel 的所有 clients
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        // 获取 client 的链表
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;
        // 由 client 链表创建它的迭代器,c++ 代码真是无力吐槽
        listRewind(list,&li);
        // 遍历所有 client 并发送消息
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;

            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    
    // 开始模糊匹配的逻辑处理,模糊模式使用的是链表而不是哈希表,后面会讲
    if (listLength(server.pubsub_patterns)) {
        // 创建模糊规则的迭代器 li
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        // 遍历所有的模糊模式,如果匹配成功则发送消息
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
            // 判断当前 channel 是否可以匹配模糊规则
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

从上面的 publish 处理函数可以看出每次进行消息发布的时候,都会向普通模式跟模糊模式发布消息,同时也能看出 普通模式跟模糊模式使用的是两种不同的数据结构,下面看下模糊订阅模式。

模糊模式订阅 psubscribe 函数:

//psubscribe 命令对应的处理函数
void psubscribeCommand(client *c) {
    int j;
    // 挨个订阅 client 指定的 pattern
    for (j = 1; j < c->argc; j++)
        pubsubSubscribePattern(c,c->argv[j]);
    // 修改 client 状态
    c->flags |= CLIENT_PUBSUB;
}

int pubsubSubscribePattern(client *c, robj *pattern) {
    int retval = 0;
    // 判断 client 是否已经订阅该 pattern,这里与普通模式不同,是个链表
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        // 把指定 pattern 加入到 client 的 pattern 链表中
        listAddNodeTail(c->pubsub_patterns,pattern);
        // 引用计数 +1
        incrRefCount(pattern);
        // 这里是创建一个 pattern 对象,并指向该 client,加入到 server 的 pattern 链表中
        // 从这里可以看出,多个 client 订阅同一个 pattern 会创建多个 patter 对象,与普通模式不同
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    // 通知客户端
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.psubscribebulk);
    addReplyBulk(c,pattern);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

通过分析上面的源码可以总结一下模糊订阅中的数据结构,如下图:

模糊发布订阅模式数据结构

模糊发布订阅模式数据结构

注:正如上面提到的,模糊模式中,一个 pat 对象中包含一个 pattern 规则跟一个 client 指针,也就是说当多个 client 模糊订阅同一个 pattern 时同样会为每个 client 都创建一个节点。

普通模式取消订阅 unsubscribe 函数:
取消就相对简单了,说白了就是把上面锁保存在 server 跟 client 端的数据删除。

取消订阅入口
void unsubscribeCommand(client *c) {
    // 如果该命令没有参数,则把 channel 全部取消
    if (c->argc == 1) {pubsubUnsubscribeAllChannels(c,1);
    } else {
        int j;
        // 迭代取消置顶 channel
        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
    // 如果 channel 被全部取消,则修改 client 状态,这样 client 就可以发送其他命令了
    if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}

// 一次性取消订阅所有 channel
int pubsubUnsubscribeAllChannels(client *c, int notify) {
     // 取出 client 端所有的 channel
    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
    dictEntry *de;
    int count = 0;

    while((de = dictNext(di)) != NULL) {robj *channel = dictGetKey(de);
        // 最终也是挨个取消 channel
        count += pubsubUnsubscribeChannel(c,channel,notify);
    }
    
    // 如果 client���面都没有订阅,依然返回响应
    if (notify && count == 0) {addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReply(c,shared.nullbulk);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }
    // 释放空间
    dictReleaseIterator(di);
    return count;
}

// 取消订阅指定 channel
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
    dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;
    // 从 client 中删除指定 channel
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        retval = 1;
        // 删除服务端该 channel 中的指定 client
        de = dictFind(server.pubsub_channels,channel);
        serverAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        ln = listSearchKey(clients,c);
        serverAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);
        if (listLength(clients) == 0) {
            // 如果删除完以后 channel 没有了订阅者,则把 channel 也删除
            dictDelete(server.pubsub_channels,channel);
        }
    }
    // 返回 client 响应
    if (notify) {addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));

    }
    // 引用计数 -1
    decrRefCount(channel); 
    return retval;
}

由于模糊模式的取消订阅与普通模式类似,这里就不再贴代码了。

三、redis 发布订阅总结

整个发布订阅的代码比较简单清晰,一个值得思考的问题时普通模式跟模糊模式中分别使用了哈希表跟链表两种结构进行处理,而不是统一的,原因在于模糊模式不能精确匹配,需要遍历挨个判断,而哈希表的优势在于快速定位查找,在需要遍历跟模糊匹配的场景中并不适用。

下面关于 Redis 的文章您也可能喜欢,不妨参考下:

Ubuntu 14.04 下 Redis 安装及简单测试 http://www.linuxidc.com/Linux/2014-05/101544.htm

Redis 主从复制基本配置 http://www.linuxidc.com/Linux/2015-03/115610.htm

CentOS 7 下 Redis 的安装与配置 http://www.linuxidc.com/Linux/2017-02/140363.htm

Ubuntu 14.04 安装 Redis 与简单配置 http://www.linuxidc.com/Linux/2017-01/139075.htm

Ubuntu 16.04 环境中安装 PHP7.0 Redis 扩展 http://www.linuxidc.com/Linux/2016-09/135631.htm

Redis 单机 & 集群离线安装部署 http://www.linuxidc.com/Linux/2017-03/141403.htm

CentOS 7.0 安装 Redis 3.2.1 详细过程和使用常见问题 http://www.linuxidc.com/Linux/2016-09/135071.htm

Ubuntu 16.04 环境中安装 PHP7.0 Redis 扩展 http://www.linuxidc.com/Linux/2016-09/135631.htm

Ubuntu 15.10 下 Redis 集群部署文档 http://www.linuxidc.com/Linux/2016-06/132340.htm

Redis 实战 中文 PDF http://www.linuxidc.com/Linux/2016-04/129932.htm

本文永久更新链接地址:http://www.linuxidc.com/Linux/2017-11/148307.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-22发表,共计7731字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19350
评论数
4
阅读量
7964871
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
星哥带你玩飞牛NAS-16:飞牛云NAS换桌面,fndesk图标管理神器上线!

星哥带你玩飞牛NAS-16:飞牛云NAS换桌面,fndesk图标管理神器上线!

  星哥带你玩飞牛 NAS-16:飞牛云 NAS 换桌面,fndesk 图标管理神器上线! 引言 哈...
告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

  告别 Notion 焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁” 引言 在数字笔记工...
CSDN,你是老太太喝粥——无齿下流!

CSDN,你是老太太喝粥——无齿下流!

CSDN,你是老太太喝粥——无齿下流! 大家好,我是星哥,今天才思枯竭,不写技术文章了!来吐槽一下 CSDN。...
还在找免费服务器?无广告免费主机,新手也能轻松上手!

还在找免费服务器?无广告免费主机,新手也能轻松上手!

还在找免费服务器?无广告免费主机,新手也能轻松上手! 前言 对于个人开发者、建站新手或是想搭建测试站点的从业者...
如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的 Nano Banana Pro?附赠邪修的用法 前言 大家好,我是星哥,今天来介绍谷歌的 ...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
12.2K Star 爆火!开源免费的 FileConverter:右键一键搞定音视频 / 图片 / 文档转换,告别多工具切换

12.2K Star 爆火!开源免费的 FileConverter:右键一键搞定音视频 / 图片 / 文档转换,告别多工具切换

12.2K Star 爆火!开源免费的 FileConverter:右键一键搞定音视频 / 图片 / 文档转换...
还在找免费服务器?无广告免费主机,新手也能轻松上手!

还在找免费服务器?无广告免费主机,新手也能轻松上手!

还在找免费服务器?无广告免费主机,新手也能轻松上手! 前言 对于个人开发者、建站新手或是想搭建测试站点的从业者...
仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

还在忍受动辄数百兆的“全家桶”监控软件?后台偷占资源、界面杂乱冗余,想查个 CPU 温度都要层层点选? 今天给...
星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

  星哥带你玩飞牛 NAS-16:不再错过公众号更新,飞牛 NAS 搭建 RSS 对于经常关注多个微...
星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定!

星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定!

星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定! 前言 作为 NAS 玩家,你是否总被这些...