阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

分布式缓存系统 Memcached

123次阅读
没有评论

共计 79115 个字符,预计需要花费 198 分钟才能阅读完成。

分布式缓存出于如下考虑,首先是缓存本身的水平线性扩展问题,其次是缓存大并发下的本身的性能问题,再次避免缓存的单点故障问题(多副本和副本一致性)。分布式缓存的核心技术包括首先是内存本身的管理问题,包括了内存的分配,管理和回收机制。其次是分布式管理和分布式算法,其次是缓存键值管理和路由。

什么是 Memcached

许多 Web 应用程序都将数据保存到 RDBMS 中,应用服务器从中读取数据并在浏览器中显示。但随着数据量的增大,访问的集中,就会出现 REBMS 的负担加重,数据库响应恶化,网站显示延迟等重大影响。Memcached 是高性能的分布式内存缓存服务器。一般的使用目的是通过缓存数据库查询结果,减少数据库的访问次数,以提高动态 Web 应用的速度、提高扩展性。如图:

分布式缓存系统 Memcached

Memcached 作为高速运行的分布式缓存服务器具有以下特点。

  • 协议简单:memcached 的服务器客户端通信并不使用复杂的 MXL 等格式,而是使用简单的基于文本的协议。
  • 基于 libevent 的事件处理:libevent 是个程序库,他将 Linux 的 epoll、BSD 类操作系统的 kqueue 等时间处理功能封装成统一的接口。memcached 使用这个 libevent 库,因此能在 Linux、BSD、Solaris 等操作系统上发挥其高性能。
  • 内置内存存储方式:为了提高性能,memcached 中保存的数据都存储在 memcached 内置的内存存储空间中。由于数据仅存在于内存中,因此重启 memcached,重启操作系统会导致全部数据消失。另外,内容容量达到指定的值之后 memcached 回自动删除不适用的缓存。
  • Memcached 不互通信的分布式:memcached 尽管是“分布式”缓存服务器,但服务器端并没有分布式功能。各个 memcached 不会互相通信以共享信息。他的分布式主要是通过客户端实现的。

Memcached 的内存管理

最近的 memcached 默认情况下采用了名为 Slab Allocatoion 的机制分配,管理内存。在改机制出现以前,内存的分配是通过对所有记录简单地进行 malloc 和 free 来进行的。但是这中方式会导致内存碎片,加重操作系统内存管理器的负担。

Slab Allocator 的基本原理是按照预先规定的大小,将分配的内存分割成特定长度的块,已完全解决内存碎片问题。Slab Allocation  的原理相当简单。将分配的内存分割成各种尺寸的块(chucnk),并把尺寸相同的块分成组(chucnk 的集合)如图:

分布式缓存系统 Memcached

而且 slab allocator 还有重复使用已分配内存的目的。也就是说,分配到的内存不会释放,而是重复利用。

Slab Allocation 的主要术语

  • Page : 分配给 Slab 的内存空间,默认是 1MB。分配给 Slab 之后根据 slab 的大小切分成 chunk.
  • Chunk : 用于缓存记录的内存空间。
  • Slab Class: 特定大小的 chunk 的组。

在 Slab 中缓存记录的原理

Memcached 根据收到的数据的大小,选择最合适数据大小的 Slab (图 2) memcached 中保存着 slab 内空闲 chunk 的列表,根据该列表选择 chunk, 然后将数据缓存于其中。

分布式缓存系统 Memcached

Memcached 在数据删除方面有效里利用资源

Memcached 删除数据时数据不会真正从 memcached 中消失。Memcached 不会释放已分配的内存。记录超时后,客户端就无法再看见该记录(invisible 透明),其存储空间即可重复使用。

Lazy Expriationmemcached 内部不会监视记录是否过期,而是在 get 时查看记录的时间戳,检查记录是否过期。这种技术称为 lazy expiration. 因此 memcached 不会再过期监视上耗费 CPU 时间。

对于缓存存储容量满的情况下的删除需要考虑多种机制,一方面是按队列机制,一方面应该对应缓存对象本身的优先级,根据缓存对象的优先级进行对象的删除。

LRU: 从缓存中有效删除数据的原理

Memcached 会优先使用已超时的记录空间,但即使如此,也会发生追加新纪录时空间不足的情况。此时就要使用名为 Least Recently Used (LRU)机制来分配空间。这就是删除最少使用的记录的机制。因此当 memcached 的内存空间不足时(无法从 slab class)获取到新空间时,就从最近未使用的记录中搜索,并将空间分配给新的记录。

Memcached 分布式

Memcached 虽然称为“分布式“缓存服务器,但服务器端并没有“分布式”的功能。Memcached 的分布式完全是有客户端实现的。现在我们就看一下 memcached 是怎么实现分布式缓存的。

例如下面假设 memcached 服务器有 node1~node3 三台,应用程序要保存键名为“tokyo”“kanagawa”“chiba”“saitama”“gunma”的数据。

首先向 memcached 中添加“tokyo”。将“tokyo”传给客户端程序库后,客户端实现的算法就会根据“键”来决定保存数据的 memcached 服务器。服务器选定后,即命令它保存“tokyo”及其值。

同样,“kanagawa”“chiba”“saitama”“gunma”都是先选择服务器再保存。

接下来获取保存的数据。获取时也要将要获取的键“tokyo”传递给函数库。函数库通过与数据保存时相同的算法,根据“键”选择服务器。使用的算法相同,就能选中与保存时相同的服务器,然后发送 get 命令。只要数据没有因为某些原因被删除,就能获得保存的值。

分布式缓存系统 Memcached

这样,将不同的键保存到不同的服务器上,就实现了 memcached 的分布式。memcached 服务器增多后,键就会分散,即使一台 memcached 服务器发生故障无法连接,也不会影响其他的缓存,系统依然能继续运行。

Consistent Hashing 的简单说明

Consistent Hashing 如下所示:首先求出 memcached 服务器(节点)的哈希值,并将其配置到 0~232 的圆(continuum)上。然后用同样的方法求出存储数据的键的哈希值,并映射到圆上。然后从数据映射到的位置开始顺时针查找,将数据保存到找到的第一个服务器上。如果超过 232 仍然找不到服务器,就会保存到第一台 memcached 服务器上。

分布式缓存系统 Memcached

从上图的状态中添加一台 memcached 服务器。余数分布式算法由于保存键的服务器会发生巨大变化 而影响缓存的命中率,但 Consistent Hashing 中,只有在 continuum 上增加服务器的地点逆时针方向的 第一台服务器上的键会受到影响。

分布式缓存系统 Memcached

因此,Consistent Hashing 最大限度地抑制了键的重新分布。而且,有的 Consistent Hashing 的实现方法还采用了虚拟节点的思想。使用一般的 hash 函数的话,服务器的映射地点的分布非常不均匀。因此,使用虚拟节点的思想,为每个物理节点(服务器)在 continuum 上分配 100~200 个点。这样就能抑制分布不均匀,最大限度地减小服务器增减时的缓存重新分布。

Memcached 安装及启动脚本 http://www.linuxidc.com/Linux/2013-07/87641.htm

PHP 中使用 Memcached 的性能问题 http://www.linuxidc.com/Linux/2013-06/85883.htm

Ubuntu 下安装 Memcached 及命令解释 http://www.linuxidc.com/Linux/2013-06/85832.htm

Memcached 的安装和应用 http://www.linuxidc.com/Linux/2013-08/89165.htm

使用 Nginx+Memcached 的小图片存储方案 http://www.linuxidc.com/Linux/2013-11/92390.htm

Memcached 使用入门 http://www.linuxidc.com/Linux/2011-12/49516p2.htm

缓存多副本

缓存多副本主要是用于在缓存数据存放时存储缓存数据的多个副本,以防止缓存失效。缓存失效发生在以下几种情况:

1.    缓存超时被移除(正常失效)

2.    缓存由于存储空间限制被移除(异常失效)

3.    由于缓存节点变化而导致的缓存失效(异常失效)

在缓存多副本的情况下,需要重新考虑缓存的分布式分布策略。其次缓存的多个副本实际本身是可能的多个读的节点,可以做为分布式的并行读,这是另外一个可以考虑的问题。

缓存数据的一致性问题

缓存数据尽量只读,因此缓存本身是不适合大量写和更新操作的数据场景的。对于读的情况下,如果存在数据变化,一种是同时更新缓存和数据库。一种是直接对缓存数据进行失效处理。

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2015-01/112507p2.htm

缓存数据以 item 为基本单元,以双链表形式存放在对应级别大小的 slabclass 结构的 chunk 中。同时该 item 还存放在链式 hashtable 中 bucket 中,用于提供快速查找的索引。

首先是理解缓存的基本数据单元 item 结构:

typedef struct _stritem {
    struct _stritem *next;    // 在 slab 中的双链表后向指针
    struct _stritem *prev;    // 在 slab 中的双向链表的前向指针
    struct _stritem *h_next;  // 指向 hash 表该 bucket 中的该 item 的下一项  /* hash chain next */
    rel_time_t      time;      // 最近访问时间戳 /* least recent access */
    rel_time_t      exptime;    // 过期时间 /* expire time */
    int            nbytes;    // 数据大小 /* size of data */
    unsigned short  refcount;    // 引用计数
    uint8_t        nsuffix;    /* length of flags-and-length string */
    uint8_t        it_flags;  /* ITEM_* above */
    uint8_t        slabs_clsid; // 所在的 slab,该 slab 在 slabclass 数组中的下标 /* which slab class we’re in */
    uint8_t        nkey;      //key 的长度 /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
    * the little shuffle to save space when not using CAS. */
    union {
        uint64_t cas;
        char end;
    } data[];// 真实数据
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then ” flags length\r\n” (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it’s binary!) */
} item;

item 的结构图如下:

分布式缓存系统 Memcached

item 存放的数据结构 slabclass:

//slabclass 结构
typedef struct {
    unsigned int size;    // 该 slab 的每个 chunk 的大小 /* sizes of items */
    unsigned int perslab;  // 能存放的 size 大小的 chunk 的数量 /* how many items per slab */

    void *slots;          /* 回收来的 item 链表,
        当分配出去的 item 回收时不时将空间还给 slab,
        而是直接把该 slab 从 chunk 双向链表中删除,
        挂到 slots 链表的尾部,以供循环利用,
        且在下次使用时不需要再初始化该 item 结构,
        而是直接更改其各属性值即可 list of item ptrs */
    unsigned int sl_curr;  /* 表示当前 slots 链表中
                            有多少个回收而来的空闲 item.
        total free items in list */

    unsigned int slabs;    // 已分配的当前种类 slab 的数量 /* how many slabs were allocated for this class */

    void **slab_list;      /* 初始时, memcached 为每个级别的 slabclass 分配一个 slab,
       当这个 slab 内存块使用完后,
       memcached 就分配一个新的 slab,
       所以 slabclass 可以拥有多个同一级别的 slab,
       这些 slab 就是通过 slab_list 数组来管理的,
       slab. array of slab pointers */
    unsigned int list_size; /* 表示当前 slabclass 有多少个 slab
       size of prev array */

    unsigned int killing;  /* index+1 of dying slab, or zero if none */
    size_t requested;  /* The number of requested bytes */
} slabclass_t;

static slabclass_t slabclass[MAX_NUMBER_OF_SLAB_CLASSES];//slab 数组(其中 slab 按其 chunk 从小到大排列)

(注意:同一级别的 slabclas 可能包括多个该级别的 slab,维护在指针数组 slab_list 中)

item 在 slabclass 中存放的结构示意图:

分布式缓存系统 Memcached

在 slabclass 内, 只有最后一个 slab 存在空闲的内存, 其它 slab 的 chunk 都分配出去了。

end_page_ptr:指向最后一个 slab 中的空闲内存块

end_page_free:表示最后一个 slab 中还剩下多少个空闲 chunk.  图中绿色部分的 chunk 表示空闲 chunk。

每个 slabclass 维护一个双向链表,所有分配出去的 item 按照最近访问时间依次放到该链表中,该链表也就相当于 LRU 队列。

所有 slabclass 的链表头 尾分表保存在 *heads、*tails 两个数组中:

static item *heads[LARGEST_ID];//chunk 链表头指针数组:slabclass 数组中各级别 slabclass 的 chunk 链表头 组成的数组
static item *tails[LARGEST_ID];//chunk 链表尾指针数组:slabclass 数组中各级别 slabclass 的 chunk 链表尾 组成的数组

item 空间分配策略:

** 每次需要为新的 item 分配空间时,首先根据该 item 的大小,计算出对应级别的 slabclass 的 id,然后在 slabclass 数组中找到该 slabclass。

**  定位到对应 slabclass 后,首先检查 LRU 队列的最后一个 chunk 是否过期,过期则分给用户使用;否则到 item 回收链表 slots 中查空闲的 chunk;没有回收空闲的 chunk 则从 slab 空闲(未分配过得)的 chunk 中分配;如果没有,则 LRU 算法在已分配 chunk 的双向链表中从    尾部向前查找能够释放(最久未访问)的 item,依次为新 item 取得空间。

** 当删除某 item 时,并不将该 chunk 空间归还给对应的 slab,而是从该 slab 的已分配 chunk 链表中删除该 chunk , 然后将该 chunk 挂到回收链表 slots 的头部,以供循环利用,并且该 chunk 中的 item 也不会释放,直到该 chunk 被重新利用时直接更新该 item 的各项属性值。(不用每次都初始化 item 结构,提高效率!)

链式 HsahTable:

同时,slab 的 chunk 链表中的 item 也被存放到 hashTable 中。当需要查找给定 key 的 item 时,首先在哈希表中 hash 到该 key 对应的 item,然后利用 hashtable 中的 item 信息得到该 item 在 slabclass 中索引位置。

使用了两张 hashtable,一个主表,一个“原表”。正常情况下,操作都是在主表中进行的;当正在扩容时,首先在原表中进行操作。

当表中 item 数量大于表 bucket 节点数的 1.5 倍时开始扩容为原来的 2 倍,采用逐步扩容方式,每次迁移的数量可以设置。主表与原表是动态切换的,当扩容开始的时候,把主表的类容复制到原表中,让原表替换主表暂时接受操作,而主表容量扩大为原来的两倍,然后逐步从原表中将数据 hash 到扩容后的主表中,当数据全部迁移完成,所有的操作又回到主表中进行了。

这与 Redis 中的两张 hashtable 的操作是一致的。

分布式缓存出于如下考虑,首先是缓存本身的水平线性扩展问题,其次是缓存大并发下的本身的性能问题,再次避免缓存的单点故障问题(多副本和副本一致性)。分布式缓存的核心技术包括首先是内存本身的管理问题,包括了内存的分配,管理和回收机制。其次是分布式管理和分布式算法,其次是缓存键值管理和路由。

什么是 Memcached

许多 Web 应用程序都将数据保存到 RDBMS 中,应用服务器从中读取数据并在浏览器中显示。但随着数据量的增大,访问的集中,就会出现 REBMS 的负担加重,数据库响应恶化,网站显示延迟等重大影响。Memcached 是高性能的分布式内存缓存服务器。一般的使用目的是通过缓存数据库查询结果,减少数据库的访问次数,以提高动态 Web 应用的速度、提高扩展性。如图:

分布式缓存系统 Memcached

Memcached 作为高速运行的分布式缓存服务器具有以下特点。

  • 协议简单:memcached 的服务器客户端通信并不使用复杂的 MXL 等格式,而是使用简单的基于文本的协议。
  • 基于 libevent 的事件处理:libevent 是个程序库,他将 Linux 的 epoll、BSD 类操作系统的 kqueue 等时间处理功能封装成统一的接口。memcached 使用这个 libevent 库,因此能在 Linux、BSD、Solaris 等操作系统上发挥其高性能。
  • 内置内存存储方式:为了提高性能,memcached 中保存的数据都存储在 memcached 内置的内存存储空间中。由于数据仅存在于内存中,因此重启 memcached,重启操作系统会导致全部数据消失。另外,内容容量达到指定的值之后 memcached 回自动删除不适用的缓存。
  • Memcached 不互通信的分布式:memcached 尽管是“分布式”缓存服务器,但服务器端并没有分布式功能。各个 memcached 不会互相通信以共享信息。他的分布式主要是通过客户端实现的。

Memcached 的内存管理

最近的 memcached 默认情况下采用了名为 Slab Allocatoion 的机制分配,管理内存。在改机制出现以前,内存的分配是通过对所有记录简单地进行 malloc 和 free 来进行的。但是这中方式会导致内存碎片,加重操作系统内存管理器的负担。

Slab Allocator 的基本原理是按照预先规定的大小,将分配的内存分割成特定长度的块,已完全解决内存碎片问题。Slab Allocation  的原理相当简单。将分配的内存分割成各种尺寸的块(chucnk),并把尺寸相同的块分成组(chucnk 的集合)如图:

分布式缓存系统 Memcached

而且 slab allocator 还有重复使用已分配内存的目的。也就是说,分配到的内存不会释放,而是重复利用。

Slab Allocation 的主要术语

  • Page : 分配给 Slab 的内存空间,默认是 1MB。分配给 Slab 之后根据 slab 的大小切分成 chunk.
  • Chunk : 用于缓存记录的内存空间。
  • Slab Class: 特定大小的 chunk 的组。

在 Slab 中缓存记录的原理

Memcached 根据收到的数据的大小,选择最合适数据大小的 Slab (图 2) memcached 中保存着 slab 内空闲 chunk 的列表,根据该列表选择 chunk, 然后将数据缓存于其中。

分布式缓存系统 Memcached

Memcached 在数据删除方面有效里利用资源

Memcached 删除数据时数据不会真正从 memcached 中消失。Memcached 不会释放已分配的内存。记录超时后,客户端就无法再看见该记录(invisible 透明),其存储空间即可重复使用。

Lazy Expriationmemcached 内部不会监视记录是否过期,而是在 get 时查看记录的时间戳,检查记录是否过期。这种技术称为 lazy expiration. 因此 memcached 不会再过期监视上耗费 CPU 时间。

对于缓存存储容量满的情况下的删除需要考虑多种机制,一方面是按队列机制,一方面应该对应缓存对象本身的优先级,根据缓存对象的优先级进行对象的删除。

LRU: 从缓存中有效删除数据的原理

Memcached 会优先使用已超时的记录空间,但即使如此,也会发生追加新纪录时空间不足的情况。此时就要使用名为 Least Recently Used (LRU)机制来分配空间。这就是删除最少使用的记录的机制。因此当 memcached 的内存空间不足时(无法从 slab class)获取到新空间时,就从最近未使用的记录中搜索,并将空间分配给新的记录。

Memcached 分布式

Memcached 虽然称为“分布式“缓存服务器,但服务器端并没有“分布式”的功能。Memcached 的分布式完全是有客户端实现的。现在我们就看一下 memcached 是怎么实现分布式缓存的。

例如下面假设 memcached 服务器有 node1~node3 三台,应用程序要保存键名为“tokyo”“kanagawa”“chiba”“saitama”“gunma”的数据。

首先向 memcached 中添加“tokyo”。将“tokyo”传给客户端程序库后,客户端实现的算法就会根据“键”来决定保存数据的 memcached 服务器。服务器选定后,即命令它保存“tokyo”及其值。

同样,“kanagawa”“chiba”“saitama”“gunma”都是先选择服务器再保存。

接下来获取保存的数据。获取时也要将要获取的键“tokyo”传递给函数库。函数库通过与数据保存时相同的算法,根据“键”选择服务器。使用的算法相同,就能选中与保存时相同的服务器,然后发送 get 命令。只要数据没有因为某些原因被删除,就能获得保存的值。

分布式缓存系统 Memcached

这样,将不同的键保存到不同的服务器上,就实现了 memcached 的分布式。memcached 服务器增多后,键就会分散,即使一台 memcached 服务器发生故障无法连接,也不会影响其他的缓存,系统依然能继续运行。

Consistent Hashing 的简单说明

Consistent Hashing 如下所示:首先求出 memcached 服务器(节点)的哈希值,并将其配置到 0~232 的圆(continuum)上。然后用同样的方法求出存储数据的键的哈希值,并映射到圆上。然后从数据映射到的位置开始顺时针查找,将数据保存到找到的第一个服务器上。如果超过 232 仍然找不到服务器,就会保存到第一台 memcached 服务器上。

分布式缓存系统 Memcached

从上图的状态中添加一台 memcached 服务器。余数分布式算法由于保存键的服务器会发生巨大变化 而影响缓存的命中率,但 Consistent Hashing 中,只有在 continuum 上增加服务器的地点逆时针方向的 第一台服务器上的键会受到影响。

分布式缓存系统 Memcached

因此,Consistent Hashing 最大限度地抑制了键的重新分布。而且,有的 Consistent Hashing 的实现方法还采用了虚拟节点的思想。使用一般的 hash 函数的话,服务器的映射地点的分布非常不均匀。因此,使用虚拟节点的思想,为每个物理节点(服务器)在 continuum 上分配 100~200 个点。这样就能抑制分布不均匀,最大限度地减小服务器增减时的缓存重新分布。

Memcached 安装及启动脚本 http://www.linuxidc.com/Linux/2013-07/87641.htm

PHP 中使用 Memcached 的性能问题 http://www.linuxidc.com/Linux/2013-06/85883.htm

Ubuntu 下安装 Memcached 及命令解释 http://www.linuxidc.com/Linux/2013-06/85832.htm

Memcached 的安装和应用 http://www.linuxidc.com/Linux/2013-08/89165.htm

使用 Nginx+Memcached 的小图片存储方案 http://www.linuxidc.com/Linux/2013-11/92390.htm

Memcached 使用入门 http://www.linuxidc.com/Linux/2011-12/49516p2.htm

缓存多副本

缓存多副本主要是用于在缓存数据存放时存储缓存数据的多个副本,以防止缓存失效。缓存失效发生在以下几种情况:

1.    缓存超时被移除(正常失效)

2.    缓存由于存储空间限制被移除(异常失效)

3.    由于缓存节点变化而导致的缓存失效(异常失效)

在缓存多副本的情况下,需要重新考虑缓存的分布式分布策略。其次缓存的多个副本实际本身是可能的多个读的节点,可以做为分布式的并行读,这是另外一个可以考虑的问题。

缓存数据的一致性问题

缓存数据尽量只读,因此缓存本身是不适合大量写和更新操作的数据场景的。对于读的情况下,如果存在数据变化,一种是同时更新缓存和数据库。一种是直接对缓存数据进行失效处理。

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2015-01/112507p2.htm

memcached 中有两张 hash 表,一个是“主 hash 表”(primary_hashtable),另外一个是“原 hash 表”(old_hashtable)。一般情况下都在主表中接受操作,在插入新 item 时判断是否需要进行扩;每次操作的时候,先会检测表是否正处于扩展 (expanding) 状态,如果是,则原表中进行操作,当扩容完成在转移到主表中进行操作。在扩容时,采取逐步迁移策略:即每次只从原表中迁移一个 bucket 节点的 item 到新主表中,进行逐步迁移。

总的来看,这与 Redis 中的 hash 操作几乎一致。因此不再做详细讲解,具体分析见代码注释。

分布式缓存系统 Memcached

//hash 表的初始化,参数 hashtable_init 为所设置的 hashpower 大小(阶数),默认大小为 16
void assoc_init(const int hashtable_init) {
    if (hashtable_init) {
        hashpower = hashtable_init;
    }
 // 创建主表(hashsize(hashpower):计算 bucket 节点数目 = 2 的 hashpower 次方)
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, “Failed to init hashtable.\n”);
        exit(EXIT_FAILURE);
    }
 //emcached 内部有很多全局的统计信息,用于实时获取各个资源的使用情况,
 // 对统计信息的更新都需要加锁
    STATS_LOCK();// 对全局统计信息加锁,已更新信息
    stats.hash_power_level = hashpower;
    stats.hash_bytes = hashsize(hashpower) * sizeof(void *);
    STATS_UNLOCK();// 解锁
}

 

 

// 在哈希表中查找给定 key 的 item:找到对应的哈希表, 再找对应的桶节点,最后遍历链表找到目标 key 的 item
item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
    item *it;// 桶节点
    unsigned int oldbucket;// 在原表中的桶节点索引

 // 正在扩容,且当前节点在愿表中,还未迁移到主表
 // 注意:i&(2^n-1) 结果即为 i 除以 2^n 的余数
    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower – 1))) >= expand_bucket)
    {
        it = old_hashtable[oldbucket];
    } else {// 没有扩容,或者已经迁移到主表中
        it = primary_hashtable[hv & hashmask(hashpower)];
    }

    item *ret = NULL;
    int depth = 0;// 目标节点在桶中的深度
    while (it) {// 遍历桶节点链表
        if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
            ret = it;
            break;
        }
        it = it->h_next;
        ++depth;
    }
    MEMCACHED_ASSOC_FIND(key, nkey, depth);
    return ret;
}

 

/* returns the address of the item pointer before the key.  if *item == 0,
  the item wasn’t found */
// 内部函数:返回目标 key item 的前一个 item 的指针,这样在删除目标 item 时只需要将该返回 item 指针的 next 指针指向目标 item 的 next item 即可。
static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
    item **pos;
    unsigned int oldbucket;

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower – 1))) >= expand_bucket)
    {
        pos = &old_hashtable[oldbucket];
    } else {
        pos = &primary_hashtable[hv & hashmask(hashpower)];
    }

    while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
        pos = &(*pos)->h_next;
    }
    return pos;
}

 

/* grows the hashtable to the next power of 2. */
// 哈希表扩容为原来的 2 倍(将原来的主表拷贝到久表中,对主表扩容)
static void assoc_expand(void) {
    old_hashtable = primary_hashtable;

    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
    if (primary_hashtable) {
        if (settings.verbose > 1)
            fprintf(stderr, “Hash table expansion starting\n”);
        hashpower++;
        expanding = true;
        expand_bucket = 0;
        STATS_LOCK();
        stats.hash_power_level = hashpower;
        stats.hash_bytes += hashsize(hashpower) * sizeof(void *);
        stats.hash_is_expanding = 1;
        STATS_UNLOCK();
    } else {
        primary_hashtable = old_hashtable;
        /* Bad news, but we can keep running. */
    }
}

static void assoc_start_expand(void) {
    if (started_expanding)
        return;
    started_expanding = true;
    pthread_cond_signal(&maintenance_cond);
}

 

/* Note: this isn’t an assoc_update.  The key must not already exist to call this */
// 将给定 item 插入到哈希表的桶的头部中  注意:该 item 不能已经存在于 hash 表中(hv:哈希值)
int assoc_insert(item *it, const uint32_t hv) {
    unsigned int oldbucket;

//    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn’t have duplicately named things defined */

 // 正在扩容,还未完成,则将该 item 放到原 hashtable 的对应 bucket 的单链表的头部
    if (expanding && 
        (oldbucket = (hv & hashmask(hashpower – 1))) >= expand_bucket)// 注意 hashpower 已经加倍,因此是 hashpower-1
    {
        it->h_next = old_hashtable[oldbucket];
        old_hashtable[oldbucket] = it;
    } else {// 没有正在扩容则放到主 hashtable 中
        it->h_next = primary_hashtable[hv & hashmask(hashpower)];
        primary_hashtable[hv & hashmask(hashpower)] = it;
    }

    hash_items++;
 // 是否需要开始扩容
    if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
        assoc_start_expand();
    }

    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
    return 1;
}

 

// 删除对应 item(只是将 item 从桶链表中移除)
void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
    item **before = _hashitem_before(key, nkey, hv);// 查找该 item 的前一个 item

    if (*before) {
        item *nxt;
        hash_items–;//hash 表中的 item 总数
        /* The DTrace probe cannot be triggered as the last instruction
        * due to possible tail-optimization by the compiler
        */
        MEMCACHED_ASSOC_DELETE(key, nkey, hash_items);
        nxt = (*before)->h_next;
        (*before)->h_next = 0;  /* probably pointless, but whatever. */
        *before = nxt;
        return;
    }
    /* Note:  we never actually get here.  the callers don’t delete things
      they can’t find. */
    assert(*before != 0);
}

 

// 迁移函数 start_assoc_maintenance_thread(),创建迁移线程,调用函数 assoc_maintenance_thread 进行迁移
// 线程函数:迁移 bucket 节点,默认一次迁移一个 bucket
static void *assoc_maintenance_thread(void *arg) {

    while (do_run_maintenance_thread) {
        int ii = 0;

        /* Lock the cache, and bulk move multiple buckets to the new
        * hash table. */
        item_lock_global();
        mutex_lock(&cache_lock);

        for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
            item *it, *next;
            int bucket;

            for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                next = it->h_next;

    // 计算哈希值,并计算得桶节点索引值
                bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
                it->h_next = primary_hashtable[bucket];
                primary_hashtable[bucket] = it;
            }

   // 每迁移完一个 bucket,就在久表中移除该 bucket
            old_hashtable[expand_bucket] = NULL;

            expand_bucket++;
   // 扩容结束
            if (expand_bucket == hashsize(hashpower – 1)) {
                expanding = false;
                free(old_hashtable);
                STATS_LOCK();
                stats.hash_bytes -= hashsize(hashpower – 1) * sizeof(void *);
                stats.hash_is_expanding = 0;
                STATS_UNLOCK();
                if (settings.verbose > 1)
                    fprintf(stderr, “Hash table expansion done\n”);
            }
        }

        mutex_unlock(&cache_lock);
        item_unlock_global();

        if (!expanding) {
            /* finished expanding. tell all threads to use fine-grained locks */
            switch_item_lock_type(ITEM_LOCK_GRANULAR);
            slabs_rebalancer_resume();
            /* We are done expanding.. just wait for next invocation */
            mutex_lock(&cache_lock);
            started_expanding = false;
            pthread_cond_wait(&maintenance_cond, &cache_lock);
            /* Before doing anything, tell threads to use a global lock */
            mutex_unlock(&cache_lock);
            slabs_rebalancer_pause();
            switch_item_lock_type(ITEM_LOCK_GLOBAL);
            mutex_lock(&cache_lock);
            assoc_expand();
            mutex_unlock(&cache_lock);
        }
    }
    return NULL;
}

Memcached 采用典型的 Master-Worker 模式,其核心思想是:有 Master 和 Worker 两类进程(线程)协同工作,Master 进程负责接收和分配任务,Worker 进程负责处理子任务。当各 Worker 进程将各个子任务处理完成后,将结果返回给 Master 进程,由 Master 进程做归纳和汇总。

工作示意图如下所示:

分布式缓存系统 Memcached

其中每个工作线程维护一个连接队列,以接收由主线程分配的客户连接;同时每个工作线程维护一个 Libevent 实例,以处理和主线程间的管道通信以及和客户连接间的 socket 通信事件。

工作线程的创建与初始化等工作由函数 void thread_init(int nthreads, struct event_base *main_base) 

调用相应函数来完成。

 

//Memcached 内部工作线程的封装   
typedef struct {
    pthread_t thread_id;      // 线程 ID   
    struct event_base *base;  //libevent 的不是线程安全的,每个工作线程持有一个 libevent 实例,用于 pipe 管道通信和 socket 通信   
    struct event notify_event; // 用于监听 pipe 管道的 libevent 事件   
    int notify_receive_fd;      // 接收 pipe 管道消息描述符   
    int notify_send_fd;        // 发送 pipe 管道消息描述符   
    struct thread_stats stats;  // 每个线程对应的统计信息   
    struct conn_queue *new_conn_queue; // 每个线程都有一个工作队列,主线程接受的连接,挂载到该消息队列中   
    cache_t *suffix_cache;      // 后缀 cache   
    uint8_t item_lock_type;    // 线程操作 hash 表持有的锁类型,有局部锁和全局锁   
} LIBEVENT_THREAD;

 

// 分发线程的封装   
typedef struct {
    pthread_t thread_id;        // 线程 id   
    struct event_base *base;    //libevent 实例   
} LIBEVENT_DISPATCHER_THREAD;   
   
// 工作线程绑定到 libevent 实例   
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();// 创建 libevent 实例   
    if (! me->base) {
        fprintf(stderr, “Can’t allocate event base\n”);   
        exit(1);   
    }     

// 工作线程的初始化: 对应 liibevent 实例的创建与设置,绑定到该 libevent 实例、注册事件、创建活动连接队列、创建设置线程,分别执行 libevent 事件循环:Memcached 采用了典型的 Master-Worker 的线程模式,
//Master 就是由 main 线程来充当,而 Worker 线程则是通过 Pthread 创建的。
void thread_init(int nthreads, struct event_base *main_base) {// 该函数调用其他函数来完成线程创建、初始化等工作。
    int        i; 
    int        power; 
 
    pthread_mutex_init(&cache_lock, NULL); 
    pthread_mutex_init(&stats_lock, NULL); 
 
    pthread_mutex_init(&init_lock, NULL); 
    pthread_cond_init(&init_cond, NULL); 
 
    pthread_mutex_init(&cqi_freelist_lock, NULL); 
    cqi_freelist = NULL; 
 
    /* Want a wide lock table, but don’t waste memory */ 
    if (nthreads < 3) {
        power = 10; 
    } else if (nthreads < 4) {
        power = 11; 
    } else if (nthreads < 5) {
        power = 12; 
    } else {
        /* 8192 buckets, and central locks don’t scale much past 5 threads */ 
        power = 13; 
    } 
 
    item_lock_count = hashsize(power); 
    item_lock_hashpower = power; 
 
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t)); 
    if (! item_locks) {
        perror(“Can’t allocate item locks”); 
        exit(1); 
    } 
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL); 
    } 
    pthread_key_create(&item_lock_type_key, NULL); 
    pthread_mutex_init(&item_global_lock, NULL); 
 
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); 
    if (! threads) {
        perror(“Can’t allocate thread descriptors”); 
        exit(1); 
    } 
 
    dispatcher_thread.base = main_base; 
    dispatcher_thread.thread_id = pthread_self(); 
    // 工作线程的初始化, 工作线程和主线程 (main 线程) 是通过 pipe 管道进行通信的   
    for (i = 0; i < nthreads; i++) {
        int fds[2]; 
        if (pipe(fds)) {
            perror(“Can’t create notify pipe”); 
            exit(1); 
        } 
 
        threads[i].notify_receive_fd = fds[0];// 读管道绑定到工作线程的接收消息的描述符   
        threads[i].notify_send_fd = fds[1];// 写管道绑定到工作线程的发送消息的描述符   
 
        setup_thread(&threads[i]);// 添加工作线程到 lievent 中 
        /* Reserve three fds for the libevent base, and two for the pipe */ 
        stats.reserved_fds += 5; 
    } 
 
    /* Create threads after we’ve done all the libevent setup. */ 
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);// 创建线程,执行 libevnt 事件循环 
    } 
 
    /* Wait for all the threads to set themselves up before returning. */ 
    pthread_mutex_lock(&init_lock); 
    wait_for_thread_registration(nthreads); 
    pthread_mutex_unlock(&init_lock); 

 

/*
 * Set up a thread’s information.
 */ 
// 工作线程绑定到 libevent 实例,注册该线程的通知事件,创建并初始化其消息队列(即就绪事件队列)
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init(); 
    if (! me->base) {
        fprintf(stderr, “Can’t allocate event base\n”); 
        exit(1); 
    } 
 
    /* Listen for notifications from other threads */ 
// 设置 event 对象:设置事件 ev 绑定文件描述符或信号值 fd(定时事件设为 -1);设置事件类型;设置回调函数及参数 
    event_set(&me->notify_event, me->notify_receive_fd, 
              EV_READ | EV_PERSIST, thread_libevent_process, me); 
    // 设置事件 ev 从属的 event_base,以及 ev 的优先级 
    event_base_set(me->base, &me->notify_event); 
     
// 往事件队列中注册事件处理器,并将对应事件添加到事件多路分发器上 
// 内部调用 event_add_internal 函数 
    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, “Can’t monitor libevent notify pipe\n”); 
        exit(1); 
    } 
 
 // 创建消息队列,用于接受主线程分发的连接   
    me->new_conn_queue = malloc(sizeof(struct conn_queue)); 
    if (me->new_conn_queue == NULL) {
        perror(“Failed to allocate memory for connection queue”); 
        exit(EXIT_FAILURE); 
    } 
    cq_init(me->new_conn_queue);// 初始化连接队列(单链表)
// 创建线程的后缀 cache??
    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror(“Failed to initialize mutex”); 
        exit(EXIT_FAILURE); 
    } 
 
    me->suffix_cache = cache_create(“suffix”, SUFFIX_SIZE, sizeof(char*), 
                                    NULL, NULL); 
    if (me->suffix_cache == NULL) {
        fprintf(stderr, “Failed to create suffix cache\n”); 
        exit(EXIT_FAILURE); 
    } 

 

// 创建工作线程 
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t      thread; 
    pthread_attr_t  attr; 
    int            ret; 
 
    pthread_attr_init(&attr);//Posix 线程部分,线程属性初始化   
 
    // 创建线程:外部传入的线程函数 worker_libevent 
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, “Can’t create thread: %s\n”, 
                strerror(ret)); 
        exit(1); 
    } 
}

 

// 线程处理函数:执行 libevent 事件循环(处理与主线程的通信事件和所分配到的客户连接事件的处理)
 
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg; 
 
 
    /* Any per-thread setup can happen here; thread_init() will block until
    * all threads have finished initializing.
    */ 
 
 
    /* set an indexable thread-specific memory item for the lock type.
    * this could be unnecessary if we pass the conn *c struct through
    * all item_lock calls…
    */ 
  // 默认的 hash 表的锁为局部锁   
    me->item_lock_type = ITEM_LOCK_GRANULAR;   
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);// 设定线程的属性   
    // 用于控制工作线程初始化,通过条件变量来控制   
    register_thread_initialized();   
    // 工作线程的 libevent 实例启动   
    event_base_loop(me->base, 0); 
    return NULL; 

其中 Libevent 的事件循环函数 event_baseloop:(代码来自 Libevent 框架源码),其处理流程如图:

分布式缓存系统 Memcached

具体代码如下:

// 事件处理主循环:调用 I / O 事件多路分发器监听函数,以等待事件发生,
// 当有事件发生时将其插入到活动事件队列数组对应优先级的队列中,然后调用其事件的回调函数依次处理事件。
int 
event_base_loop(struct event_base *base, int flags) 
{
    const struct eventop *evsel = base->evsel; 
    struct timeval tv; 
    struct timeval *tv_p; 
    int res, done, retval = 0; 
 
    /* Grab the lock.  We will release it inside evsel.dispatch, and again
    * as we invoke user callbacks. */ 
    EVBASE_ACQUIRE_LOCK(base, th_base_lock);// 获取对 event_base 的独占锁 
 
    // 如果事件循环已经启动,则不能再启动 
    if (base->running_loop) {
        event_warnx(“%s: reentrant invocation.  Only one event_base_loop” 
            ” can run on each event_base at once.”, __func__); 
        EVBASE_RELEASE_LOCK(base, th_base_lock);// 释放对 event_base 的独占锁 
        return -1; 
    } 
 
    base->running_loop = 1;// 标记循环已经启动 
 
    clear_time_cache(base);// 清除系统时间缓存 
 
    if (base->sig.ev_signal_added && base->sig.ev_n_signals_added) 
        evsig_set_base(base);// 设置信号处理器的 event_base 实例 
 
    done = 0; 
 
#ifndef _EVENT_DISABLE_THREAD_SUPPORT   
    base->th_owner_id = EVTHREAD_GET_ID(); // 不支持多线程时 
#endif 
    // 多线程 
    base->event_gotterm = base->event_break = 0; 
 
    while (!done) {
        base->event_continue = 0; 
 
        /* Terminate the loop if we have been asked to */ 
        if (base->event_gotterm) {
            break; 
        } 
 
        if (base->event_break) {
            break; 
        } 
 
        // 校正系统时间,如果系统支持 monotonic 时间,该时间是系统从 boot 后到现在所经过的时间,因此不需要执行校正。
        // 如果系统使用的是非 MONOTONIC 时间,用户可能会向后调整了系统时间 
        // 在 timeout_correct 函数里,比较 last wait time 和当前时间,如果当前时间 < last wait time 
        // 表明时间有问题,这是需要更新 timer_heap 中所有定时事件的超时时间。
        timeout_correct(base, &tv); 
 
        tv_p = &tv; 
        // 根据 timer heap 中事件的最小超时时间,设置系统 I /O demultiplexer 的最大等待时间 
        if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {// 没有就绪事件则设置最大等待时间 
            // 如果就绪事件为 0,且、、
            timeout_next(base, &tv_p);//// 获取时间堆中超时时间最小的事件处理器的, 计算等待时间,返回值存在 tv_p 中 
            // 超时时间作为 I / O 分发器的超时返回时间,这样当 I / O 分发器返回时即可以去处理 
            // 事件堆上就绪的定时事件 
        } else {// 有事件就绪了,就直接返回 
            /*
            * if we have active events, we just poll new events
            * without waiting.
            */ 
            // 如果有就绪事件尚未处理,则将 I / O 复用系统调用的超时时间设置为 0,
            // 这样 I / O 复用系统调用将直接返回,程序即立即处理就绪事件 
            evutil_timerclear(&tv);//(tvp)->tv_sec = (tvp)->tv_usec = 0 
        } 
 
        /* If we have no events, we just exit */ 
        // 如果 event_base 中没有注册任何事件,则直接退出 
        if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
            event_debug((“%s: no events registered.”, __func__)); 
            retval = 1; 
            goto done; 
        } 
 
        /* update last old time */ 
        gettime(base, &base->event_tv);// 更新 base->event_tv 为缓存时间(没清空时)或系统当前那时间 
 
        clear_time_cache(base);// 清除系统时间缓存 
 
        // 调用后端 i / o 复用机制的事件多路分发器的 dispath 函数等待事件,将就绪信号事件,I/ O 事件插入到活动事件队列中 
        res = evsel->dispatch(base, tv_p);// 阻塞调用:如 epoll_wait 
 
        if (res == -1) {
            event_debug((“%s: dispatch returned unsuccessfully.”, 
                __func__)); 
            retval = -1; 
            goto done; 
        } 
 
        update_time_cache(base);// 更新时间缓存为当前系统时间(缓存时间记录了本次事件就绪返回的时间)
// 检查 heap 中的 timer events,将就绪的 timer event 从 heap 上删除,并插入到激活链表中 
        timeout_process(base); 
// 调用 event_process_active()处理激活链表中的就绪 event,调用其回调函数执行事件处理 
// 该函数会寻找最高优先级(priority 值越小优先级越高)的激活事件链表,
// 然后处理链表中的所有就绪事件;
// 因此低优先级的就绪事件可能得不到及时处理;
        if (N_ACTIVE_CALLBACKS(base)) {// 有就绪事件就绪了 
            int n = event_process_active(base);// 处理就绪事件 
            if ((flags & EVLOOP_ONCE) 
                && N_ACTIVE_CALLBACKS(base) == 0 
                && n != 0) 
                done = 1; 
        } else if (flags & EVLOOP_NONBLOCK) 
            done = 1; 
    } 
    event_debug((“%s: asked to terminate loop.”, __func__)); 
 
done: 
    // 循环结束,清空时间缓存 
    clear_time_cache(base); 
    base->running_loop = 0;// 设置停止循环标志 
 
    EVBASE_RELEASE_LOCK(base, th_base_lock);// 释放锁 
 
    return (retval); 
}

 

// 阻塞工作线程   
static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);// 在条件变量 init_cond 上面阻塞,阻塞个数为 nthreads-init_count   
    }   
}   
// 唤醒工作线程   
static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);   
    init_count++;   
    pthread_cond_signal(&init_cond);   
    pthread_mutex_unlock(&init_lock);   
}   
   
// 每个线程持有的统计信息   
struct thread_stats {
    pthread_mutex_t  mutex;   
    uint64_t          get_cmds;   
    uint64_t          get_misses;   
    uint64_t          touch_cmds;   
    uint64_t          touch_misses;   
    uint64_t          delete_misses;   
    uint64_t          incr_misses;   
    uint64_t          decr_misses;   
    uint64_t          cas_misses;   
    uint64_t          bytes_read;   
    uint64_t          bytes_written;   
    uint64_t          flush_cmds;   
    uint64_t          conn_yields; /* # of yields for connections (-R option)*/   
    uint64_t          auth_cmds;   
    uint64_t          auth_errors;   
    struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];   
}; 

本节主要分析了工作线程的初始化部分,至于具体的主线程任务分发调度,以及工作线程如何具体处理连接和与主线程之间的通信等后续讲解。

在前面 slab 数据存储部分分析了 Memecached 中记录数据的具体存储机制,从中可以看到所采用的内存管理机制——slab 内存管理,这也正是 linux 所采用的内存高效管理机制,对于 Memchached 这样的内存 cache 服务器,内存高效管理是其最重要的任务之一。

Linux 所使用的 slab 分配器的基础是 Jeff Bonwick 为 SunOS 操作系统首次引入的一种算法。Jeff 的分配器是围绕对象缓存进行的。在内核中,会为有限的对象集(例如文件描述符和其他常见结构)分配大量内存。Jeff 发现对内核中普通对象进行初始化所需的时间超过了对其进行分配和释放所需的时间。因此他的结论是不应该将内存释放回一个全局的内存池,而是将内存保持为针对特定目而初始化的状态。例如,如果内存被分配给了一个互斥锁,那么只需在为互斥锁首次分配内存时执行一次互斥锁初始化函数(mutex_init)即可。后续的内存分配不需要执行这个初始化函数,因为从上次释放和调用析构之后,它已经处于所需的状态中了。

Memecached 中的 slab 分配器也正是使用这一思想来构建一个在空间和时间上都具有高效性的内存分配器。下图 给出了 slab 结构的高层组织结构。在最高层是 cache_chain,这是一个 slab 缓存的链接列表。这对于 best-fit 算法非常有用,可以用来查找最适合所需要的分配大小的缓存(遍历列表)。cache_chain 的每个元素都是一个 kmem_cache 结构的引用(称为一个 cache),它定义了一个要管理的给定大小的对象池。在需要某个特定大小的内存对象时,首先从 cache_chian 中找到最佳大小的一个 kmem_cahce,然后再在对应的 kem_cahe 中按某种算法(如首先利用空闲对象,没有则按 LRU 机制释放已用或过期对象)最终获得所需的大小的空间。具体结构如下图所示:

分布式缓存系统 Memcached

 

从该图可看出,这与前面所分析的 Memcached 的 Item 的存储结构图正是一致的。此处的 cache_chain 对应前面的 slabclass 数组(管理各种大小的 slab 集合),而 kmem_cahe 对应 slabclass 中的某个元素(slab_list 链表)(管理某个特定大小的 slab 链表)。在删除 Item 时,也不会将所对应的内存还给操作系统,而只是从对应的已分配中链表中去掉,转而加到对应的空闲链表 slots 中以供后续循环利用。

memcached 中内存分配机制主要理念:

1. 先为分配相应的大块内存,再在上面进行无缝小对象填充

2. 懒惰检测机制,Memcached 不花过多的时间在检测各个 item 对象是否超时,当 get 获取数据时,才检查 item 对象是否应该删除,你不访问,我就不处理。

 

3. 懒惰删除机制,在 memecached 中删除一个 item 对象的时候,并不是从内存中释放,而是单单的进行标记处理,再将其指针放入 slot 回收插糟,下次分配的时候直接使用。

 

Memcached 首次默认分配 64M 的内存,之后所有的数据都是在这 64M 空间进行存储,在 Memcached 启动之后,不会对这些内存执行释放操作,这些内存只有到 Memcached 进程退出之后会被系统回收。下面分析 Memcached 的内存主要操作函数,按逐级调用顺序给出。

 

/*// 内存初始化,settings.maxbytes 是 Memcached 初始启动参数指定的内存值大小,settings.factor 是内存增长因子 
slabs_init(settings.maxbytes, settings.factor, preallocate); 
 
#define POWER_SMALLEST 1  // 最小 slab 编号 
#define POWER_LARGEST  200 // 首次初始化 200 个 slab 
 
// 实现内存池管理相关的静态全局变量 
static size_t mem_limit = 0;// 总的内存大小 
static size_t mem_malloced = 0;// 初始化内存的大小,这个貌似没什么用 
static void *mem_base = NULL;// 指向总的内存的首地址 
static void *mem_current = NULL;// 当前分配到的内存地址 
static size_t mem_avail = 0;// 当前可用的内存大小 
 
static slabclass_t slabclass[MAX_NUMBER_OF_SLAB_CLASSES];// 定义 slab 结合,总共 200 个 */

 

 

/**
 * Determines the chunk sizes and initializes the slab class descriptors
 * accordingly.
 初始化整个 slabcalss 数组
 limit:Memcached 的总的内存的大小。
 factor:chunk 大小增长因子
 */
void slabs_init(const size_t limit, const double factor, const bool prealloc) {
    int i = POWER_SMALLEST – 1;
 //size 表示申请空间的大小,其值由配置的 chunk_size(指 item 中的数据部分大小)和单个 item 的大小来指定
    unsigned int size = sizeof(item) + settings.chunk_size;

    mem_limit = limit;

    if (prealloc) {// 支持预分配
        /* Allocate everything in a big chunk with malloc */
        mem_base = malloc(mem_limit);// 分配限定的空间,mem_base 为总内存起始地址
        if (mem_base != NULL) {
            mem_current = mem_base;//mem_current 为当前分配空间地址
            mem_avail = mem_limit;// 可用(总分配空间中还未分配给 Item 的部分)
        } else {
            fprintf(stderr, “Warning: Failed to allocate requested memory in”
                    ” one large chunk.\nWill allocate in smaller chunks\n”);
        }
    }
  // 置空 slabclass 数组 
    memset(slabclass, 0, sizeof(slabclass));  //sizeof(slabclass) 为整个数组大小,而非指针大小
    // 开始分配,i<200 && 单个 chunk 的 size<= 单个 item 最大大小 / 内存增长因子 
    while (++i < POWER_LARGEST && size <= settings.item_size_max / factor) {
        /* Make sure items are always n-byte aligned */
  // 确保 item 总是 8byte 对齐
        if (size % CHUNK_ALIGN_BYTES)
            size += CHUNK_ALIGN_BYTES – (size % CHUNK_ALIGN_BYTES);// 没对齐,则补齐

        slabclass[i].size = size;//slab 中 chunk 的大小设为补齐的大小
        slabclass[i].perslab = settings.item_size_max / slabclass[i].size;// 每个 slab 中的 chunk 数量
        size *= factor;// 下一个 slab 中的 chunk 扩大 factor 倍
        if (settings.verbose > 1) {// 如果有打开调试信息,则输出调试信息
            fprintf(stderr, “slab class %3d: chunk size %9u perslab %7u\n”,
                    i, slabclass[i].size, slabclass[i].perslab);
        }
    }
 
 //slab 数组中的最后一个 slab,此时 chunk 大小增加为 1M,因此只有一个 chunk
    power_largest = i;
    slabclass[power_largest].size = settings.item_size_max;
    slabclass[power_largest].perslab = 1;
    if (settings.verbose > 1) {
        fprintf(stderr, “slab class %3d: chunk size %9u perslab %7u\n”,
                i, slabclass[i].size, slabclass[i].perslab);
    }

    /* for the test suite:  faking of how much we’ve already malloc’d */
    {
        char *t_initial_malloc = getenv(“T_MEMD_INITIAL_MALLOC”);
        if (t_initial_malloc) {
            mem_malloced = (size_t)atol(t_initial_malloc);
        }

    }

    if (prealloc) {
  // 真正分配空间:分配每个 slab 的内存空间,传入最大已经初始化的最大 slab 编号
        slabs_preallocate(power_largest);
    }
}

 

 

// 分配每个 slabclass 数组元素的内存空间 
static void slabs_preallocate (const unsigned int maxslabs) {
    int i; 
    unsigned int prealloc = 0; 
 
    for (i = POWER_SMALLEST; i <= POWER_LARGEST; i++) {
        if (++prealloc > maxslabs) 
            return; 
        // 执行分配操作,对第 i 个 slabclass 执行分配操作 
        if (do_slabs_newslab(i) == 0) {
            fprintf(stderr, “Error while preallocating slab memory!\n” 
                “If using -L or other prealloc options, max memory must be ” 
                “at least %d megabytes.\n”, power_largest); 
            exit(1); 
        } 
    } 

 

// 为第 id 个 slabclass 执行分配操作 
static int do_slabs_newslab(const unsigned int id) {
    slabclass_t *p = &slabclass[id];// p 指向第 i 个 slabclass 
    int len = settings.slab_reassign ? settings.item_size_max:p->size*p->perslab;  //len 为一个 slabclass 的大小
    char *ptr; 
    //grow_slab_list 初始化 slabclass 的 slab_list,而 slab_list 中的指针指向每个 slab 
    //memory_allocate 从内存池申请 1M 的空间 
    if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0) || 
        (grow_slab_list(id) == 0) || 
        ((ptr = memory_allocate((size_t)len)) == 0)) {// 优先从 Memchahed 内存池中分配,如果内存池为空则从系统分配给定大小内存
 
        MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id); 
        return 0; 
    } 
 
    memset(ptr, 0, (size_t)len); 
    // 将申请的 1M 空间按 slabclass 的 size 进行切分 
    split_slab_page_into_freelist(ptr, id); 
 
    p->slab_list[p->slabs++] = ptr;
    mem_malloced += len;// 增加已经分配出去的内存数 
    MEMCACHED_SLABS_SLABCLASS_ALLOCATE(id); 
 
    return 1; 

 

// 将同级别 slab 的空间且分为该大小的 chunk
static void split_slab_page_into_freelist(char *ptr, const unsigned int id) {
    slabclass_t *p = &slabclass[id];
    int x;
    for (x = 0; x < p->perslab; x++) {
        do_slabs_free(ptr, 0, id);// 创建空闲 item 
        ptr += p->size;// 指针前移 item 的大小
    }
}

 

// 创建空闲 item,挂载到对应 slabclass 的空闲链表中 
static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
    slabclass_t *p; 
    item *it; 
 
    assert(((item *)ptr)->slabs_clsid == 0); 
    assert(id >= POWER_SMALLEST && id <= power_largest);// 判断 id 有效性 
    if (id < POWER_SMALLEST || id > power_largest) 
        return; 
 
    MEMCACHED_SLABS_FREE(size, id, ptr); 
    p = &slabclass[id]; 
 
    it = (item *)ptr; 
    it->it_flags |= ITEM_SLABBED; 
    it->prev = 0; 
    it->next = p->slots;// 挂载到 slabclass 的空闲链表中 
    if (it->next) it->next->prev = it; 
    p->slots = it; 
 
    p->sl_curr++;// 空闲 item 个数 +1 
    p->requested -= size;// 已经申请到的空间数量更新 
    return; 

至此,从创建 slabclass 数组,到最底层的创建空闲 item 并挂载到对应的 slabclass 的空闲链表 slots 的头部,的操作完成。

Memcached 的内存池由起始地址指针、当前地址指针、剩余可用空间等变量维护,每次内存池操作只需要相应的改变这些变量即可。

以下为内存池分配操作函数:

// 优先从 Memcached 的内存池分配 size 大小的空间 
static void *memory_allocate(size_t size) {
    void *ret; 
 
    if (mem_base == NULL) {// 如果内存池没创建,则从系统分配 
        ret = malloc(size); 
    } else {
        ret = mem_current; 
        //size 大于剩余的空间 
        if (size > mem_avail) {
            return NULL; 
        } 
 
        // 按 8 字节对齐 
        if (size % CHUNK_ALIGN_BYTES) {
            size += CHUNK_ALIGN_BYTES – (size % CHUNK_ALIGN_BYTES); 
        } 
        // 扣除 size 个空间 
        mem_current = ((char*)mem_current) + size; 
        if (size < mem_avail) {
            mem_avail -= size;// 更新剩余空间大小 
        } else {
            mem_avail = 0; 
        } 
    } 
 
    return ret; 

由此可见,内存池的实现其实是很简单的。当然这里只给出了内存池中内存分配的操作,而内存释放操作也类似。总之,内存池的操作,需要维护内存池的几个指针变量和空间指示变量即可。

上节在分析 slab 内存管理机制时分析 Memcached 整个 Item 存储系统的初始化过程 slabs_init()函数:分配 slabclass 数组空间,到最后将各 slab 划分为各种级别大小的空闲 item 并挂载到对应大小 slab 的空闲链表 slots 上。本节将继续分析对 slab 和 item 的主要操作过程。

slab 机制中所采用的 LRU 算法:

在 memcached 运行过程中,要把一个 item 调入内存,但内存已无空闲空间时,为了保证程序能正常运行,系统必须从内存中调出一部分数据,送磁盘的对换区中。但应将哪些数据调出,须根据一定的算法来确定。通常,把选择换出数据 (页面) 的算法称为页面置换算法(Page Replacement Algorithms)。Memcached 采用最近最久未使用(LRU)置换算法,是根据数据(页面)调入内存后的使用情况进行决策的。由于无法预测各页面将来的使用情况,只能利用“最近的过去”作为“最近的将来”的近似,因此,LRU 置换算法是选择最近最久未使用的页面予以淘汰。

每个 slabclass 维护一个双向链表,所有分配出去的 item 按照最近访问时间依次放到该链表中,该链表也就相当于 LRU 队列。所有 slabclass 的链表头 尾分表保存在 *heads、*tails 两个数组中。当内存不足时,memcached 会对应大小的 slabclass 维护的双向链表的尾部开始检测,即最近最久未使用的 item,往前一直寻找合适的 item 予以淘汰。所以该 LRU 算法为局部的 slabclass 淘汰机制。但是在一些特定情形也会可能引起一些不必要的麻烦,可以在运行时加入”-M”参数禁止该算法。

Slab 相关操作:

/*
 * 根据给定的大小在 slabclass 数组中找到合适的 slabclass,返回该 slabclass 的下标
 * 如果给定大小为 0,或者大于最大的 slab 的 item 大小,则返回 0
 */
unsigned int slabs_clsid(const size_t size) {
    int res = POWER_SMALLEST;

    if (size == 0)
        return 0;
    while (size > slabclass[res].size)
        if (res++ == power_largest)    /* won’t fit in the biggest slab */
            return 0;// 物件过大,不能直接存放
    return res;
}

 

// 当该 slabcalss 的 slots 空闲链表为空时为指定大小的 item 分配空间:
//size 为给定的 item 大小,id 为对应的 slabclass 编号, 返回 item 指针
static void *do_slabs_alloc(const size_t size, unsigned int id) {
    slabclass_t *p;
    void *ret = NULL;
    item *it = NULL;

    if (id < POWER_SMALLEST || id > power_largest) {//id 编号不合法
        MEMCACHED_SLABS_ALLOCATE_FAILED(size, 0);
        return NULL;
    }

    p = &slabclass[id];// 获取对应的 slabclass
    assert(p->sl_curr == 0 || ((item *)p->slots)->slabs_clsid == 0);

    /* fail unless we have space at the end of a recently allocated page,
      we have something on our freelist, or we could allocate a new page */
 // 如果空闲链表为空且 为该大小的 slab 分配一个 slabcalss(1M)失败
    if (! (p->sl_curr != 0 || do_slabs_newslab(id) != 0)) {
        /* We don’t have more memory available */
        ret = NULL;// 如果所有空间都用尽的时候,则返回 NULL 表示目前资源已经枯竭了。
    } else if (p->sl_curr != 0) {// 不管是原来的空闲 item 或者新分配的 item, 都将挂在到 slots 上
        /* return off our freelist */
        it = (item *)p->slots;// 返回空闲链表的头 item
        p->slots = it->next;
        if (it->next) it->next->prev = 0;
        p->sl_curr–;
        ret = (void *)it;
    }

    if (ret) {
        p->requested += size;
        MEMCACHED_SLABS_ALLOCATE(size, id, p->size, ret);
    } else {
        MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);
    }

    return ret;
}
// 创建空闲 item,挂载到对应 slabclass 的空闲链表中 
static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
    slabclass_t *p; 
    item *it; 
 
    assert(((item *)ptr)->slabs_clsid == 0); 
    assert(id >= POWER_SMALLEST && id <= power_largest);// 判断 id 有效性 
    if (id < POWER_SMALLEST || id > power_largest) 
        return; 
 
    MEMCACHED_SLABS_FREE(size, id, ptr); 
    p = &slabclass[id]; 
 
    it = (item *)ptr; 
    it->it_flags |= ITEM_SLABBED; 
    it->prev = 0; 
    it->next = p->slots;// 挂载到 slabclass 的空闲链表中 
    if (it->next) it->next->prev = it; 
    p->slots = it; 
 
    p->sl_curr++;// 空闲 item 个数 +1 
    p->requested -= size;// 已经申请到的空间数量更新 
    return; 

 

Item 相关操作:

主要接口函数:

do_item_alloc():

首先调用 slabs_clsid 给 item 归类,然后调用 slabs_alloc()函数分配内存,当可使用空间枯竭了的时候,就开始使用 LRU 算法了,从一个对应大小的尾部 tails[id]开始,向前不断尝试能否释放,当发现一个 item 当前没有被使用 (引用计数 refcount 为 0),且其生存时间已经为 0 或生存时间大于现在时间,就果断的把它释放掉。并再次调用 slabs_alloc(), 作者在此提到有一种非常罕见的 bug 能够使引用计数(refcount) 发生泄漏,处理方法是,当 item 连续保持锁定状态 3 小时以上,说明它已经足够陈旧了,应该果断将其释放。最后再将数据复制到分配的空间内。

 item_free():

 调用 slabs_free 将其释放。

 do_item_link():

 1. 调用 assoc_insert()将 item 指针插入 hash 表中

 2. 调用 get_cas_id()给 it 的 cas_id 赋值。

3. 调用 item_link_q(), 把该 item 插入 LRU 队列的最前面

do_item_unlink ():
1. 调用 assoc_delete()在 hash 表中删除此 item

2. 调用 item_unlink_q()从该 slab class 去除此 item 的连接,此处考虑了 item 在链表头部,尾部等各种情况。

3. 最后当引用计数为 0 的时候,调用 item_free()将其释放。

 do_item_remove ():

减少引用计数 refcount,当发现引用计数为 0 的时候,就将其释放。

 do_item_update ():

先调用 item_unlink_q(), 更新了时间以后,再调用 item_link_q()。将其重新连接到 LRU 队列之中,即让该 item 移到 LRU 队列的最前。

do_item_replace ():

 调用 do_item_unlink()解除原有 item 的连接,再调用 do_item_link()连接到新的 item。

 item_get ():

值得说明的是,memcached 的懒惰删除逻辑在这里有体现。就是当你需要 get 一个 item 的时候才考虑该 item 是否应该删除。首先调用 do_item_get_notedeleted()函数,根据关键字使用 assoc_find()查找 item, 如果没找到,返回 NULL,再判断是否达到最大的生存时间,如果是的话,就 do_item_unlink 该 item,返回 NULL。如果该 item 没有满足删除条件,将其引用计数加 1,并返回该 item 的指针。

具体分析见代码:

//key: 键 nkey:键的长度 falgs:键 falg    exptime: 过期时间(相对)cur_hv 哈希值
item *do_item_alloc(char *key, const size_t nkey, const int flags,
                    const rel_time_t exptime, const int nbytes,
                    const uint32_t cur_hv) {
    uint8_t nsuffix;//key 的后缀长度
    item *it = NULL;
    char suffix[40];// 存放 key 后缀
 // 该 item 所需的总长度
    size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
    if (settings.use_cas) {
        ntotal += sizeof(uint64_t);
    }

 // 以 item 总长来计算所属的 slabclass
    unsigned int id = slabs_clsid(ntotal);
    if (id == 0)
        return 0;

    mutex_lock(&cache_lock);
    /* do a quick check if we have any expired items in the tail.. */
    int tries = 5;// 执行 5 次尝试检查,(快速,而不是全部都检查一遍)
    /* Avoid hangs if a slab has nothing but refcounted stuff in it. */
 // 将所有过期时间旧于设定的 oldest_live 时间的所有 item 视为过期 item,以避免那些实际没用,但却被一直引用的 item
    int tries_lrutail_reflocked = 1000;
    int tried_alloc = 0;
    item *search;
    item *next_it;
    void *hold_lock = NULL;
    rel_time_t oldest_live = settings.oldest_live;// 对所有 item 设定的 item 最长生存时间

    search = tails[id];//serch 指向该 slabclass 对应的 item 链表尾部
    /* We walk up *only* for locked items. Never searching for expired.
    * Waste of CPU for almost all deployments */
    for (; tries > 0 && search != NULL; tries–, search=next_it) {
        /* we might relink search mid-loop, so search->prev isn’t reliable */
        next_it = search->prev;
        if (search->nbytes == 0 && search->nkey == 0 && search->it_flags == 1) {
            /* We are a crawler, ignore it. */
            tries++;
            continue;
        }
  // 计算哈希值,用于锁定 serch 的该 item,避免其他调用者增加该 item 的引用计数
        uint32_t hv = hash(ITEM_key(search), search->nkey);
        /* Attempt to hash item lock the “search” item. If locked, no
        * other callers can incr the refcount
        */
        /* Don’t accidentally grab ourselves, or bail if we can’t quicklock */
  // 如果该 item 正好是当前的 item, 则忽略
        if (hv == cur_hv || (hold_lock = item_trylock(hv)) == NULL)
            continue;
        /* Now see if the item is refcount locked */
        if (refcount_incr(&search->refcount) != 2) {
            /* Avoid pathological case with ref’ed items in tail */
            do_item_update_nolock(search);
            tries_lrutail_reflocked–;
            tries++;
            refcount_decr(&search->refcount);
            itemstats[id].lrutail_reflocked++;
            /* Old rare bug could cause a refcount leak. We haven’t seen
            * it in years, but we leave this code in to prevent failures
            * just in case */
            if (settings.tail_repair_time &&
                    search->time + settings.tail_repair_time < current_time) {
                itemstats[id].tailrepairs++;
                search->refcount = 1;
                do_item_unlink_nolock(search, hv);
            }
            if (hold_lock)
                item_trylock_unlock(hold_lock);

            if (tries_lrutail_reflocked < 1)
                break;

            continue;
        }

        /* Expired or flushed */
  //// 还没过期,或者已经过期但是最近访问时间小于设定的最大过期时间,并且设定的最大过期时间小于当前时间
        if ((search->exptime != 0 && search->exptime < current_time)
            || (search->time <= oldest_live && oldest_live <= current_time))
      {
            itemstats[id].reclaimed++;
            if ((search->it_flags & ITEM_FETCHED) == 0) {
                itemstats[id].expired_unfetched++;
            }
            it = search;
            slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
            do_item_unlink_nolock(it, hv);// 从哈希表  及 item 链表中删除,递减引用计数
            /* Initialize the item block: */
            it->slabs_clsid = 0;// 注意:slabs_clsid = 0 表明不属于任何 slabclass
        } else if ((it = slabs_alloc(ntotal, id)) == NULL) {
            tried_alloc = 1;
            if (settings.evict_to_free == 0) {
                itemstats[id].outofmemory++;
            } else {
                itemstats[id].evicted++;
                itemstats[id].evicted_time = current_time – search->time;
                if (search->exptime != 0)
                    itemstats[id].evicted_nonzero++;
                if ((search->it_flags & ITEM_FETCHED) == 0) {
                    itemstats[id].evicted_unfetched++;
                }
                it = search;
                slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
                do_item_unlink_nolock(it, hv);
                /* Initialize the item block: */
                it->slabs_clsid = 0;

                /* If we’ve just evicted an item, and the automover is set to
                * angry bird mode, attempt to rip memory into this slab class.
                * TODO: Move valid object detection into a function, and on a
                * “successful” memory pull, look behind and see if the next alloc
                * would be an eviction. Then kick off the slab mover before the
                * eviction happens.
                */
                if (settings.slab_automove == 2)
                    slabs_reassign(-1, id);
            }
        }

        refcount_decr(&search->refcount);
        /* If hash values were equal, we don’t grab a second lock */
        if (hold_lock)
            item_trylock_unlock(hold_lock);
        break;
    }

    if (!tried_alloc && (tries == 0 || search == NULL))
        it = slabs_alloc(ntotal, id);

    if (it == NULL) {
        itemstats[id].outofmemory++;
        mutex_unlock(&cache_lock);
        return NULL;
    }

    assert(it->slabs_clsid == 0);
    assert(it != heads[id]);

    /* Item initialization can happen outside of the lock; the item’s already
    * been removed from the slab LRU.
    */
 // 初始化 item。由于该 item 已不在 lru 队列中,因此可以无锁操作
    it->refcount = 1;    // 引用计数初始化为 1 /* the caller will have a reference */
    mutex_unlock(&cache_lock);
    it->next = it->prev = it->h_next = 0;
    it->slabs_clsid = id;// 其所属 id 已经计算好

    DEBUG_REFCNT(it, ‘*’);
    it->it_flags = settings.use_cas ? ITEM_CAS : 0;
    it->nkey = nkey;//
    it->nbytes = nbytes;//
    memcpy(ITEM_key(it), key, nkey);
    it->exptime = exptime;
    memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    it->nsuffix = nsuffix;
    return it;
}

 

// 释放一个 item,挂载到 slots 的头部
void item_free(item *it) {
    size_t ntotal = ITEM_ntotal(it);
    unsigned int clsid;
    assert((it->it_flags & ITEM_LINKED) == 0);
    assert(it != heads[it->slabs_clsid]);
    assert(it != tails[it->slabs_clsid]);
    assert(it->refcount == 0);

    /* so slab size changer can tell later if item is already free or not */
    clsid = it->slabs_clsid;
    it->slabs_clsid = 0;
    DEBUG_REFCNT(it, ‘F’);
    slabs_free(it, ntotal, clsid);
}

 

// 将 item 插入到哈希表  lru 队列
int do_item_link(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    mutex_lock(&cache_lock);
    it->it_flags |= ITEM_LINKED;
    it->time = current_time;

    STATS_LOCK();
    stats.curr_bytes += ITEM_ntotal(it);
    stats.curr_items += 1;
    stats.total_items += 1;
    STATS_UNLOCK();

    /* Allocate a new CAS ID on link. */
    ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
    assoc_insert(it, hv);// 将该 item 指针插入到 hash 表的对应 bucket 的头部(正常情况下插入到主表,扩容时插入到原表)
    item_link_q(it);// 将该 item 指针插入到对应 slab 的头部
    refcount_incr(&it->refcount);// 递增引用计数
    mutex_unlock(&cache_lock);

    return 1;
}

 

// 将该 item 从哈希表  lru 队列中删除,递减引用计数
void do_item_unlink(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
    mutex_lock(&cache_lock);
    if ((it->it_flags & ITEM_LINKED) != 0) {
        it->it_flags &= ~ITEM_LINKED;
        STATS_LOCK();
        stats.curr_bytes -= ITEM_ntotal(it);
        stats.curr_items -= 1;
        STATS_UNLOCK();
        assoc_delete(ITEM_key(it), it->nkey, hv);// 在哈希表中删除该 item
        item_unlink_q(it);// 在 slab 中删除该 item
        do_item_remove(it);// 如果 item 引用计数减到 0,则释放,挂载到 slots 头部
    }
    mutex_unlock(&cache_lock);
}

 

//item 引用计数为 0,则释放
void do_item_remove(item *it) {
    MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);
    assert(it->refcount > 0);

    if (refcount_decr(&it->refcount) == 0) {
        item_free(it);
    }
}

 

// 注意:避免过于频繁的获取该 item 带来的性能影响,设定最小更新时间 ITEM_UPDATE_INTERVAL:
// 即至少要这么久才更新一次该 item 在 lru 中的位置
// 先从 lru 链表删除,然后更新访问时间,最后在插入到 lru 的头部
void do_item_update(item *it) {
    MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);
 // 是否达到更新的要求
    if (it->time < current_time – ITEM_UPDATE_INTERVAL) {
        assert((it->it_flags & ITEM_SLABBED) == 0);

        mutex_lock(&cache_lock);
        if ((it->it_flags & ITEM_LINKED) != 0) {
            item_unlink_q(it);// 从 lru 链表删除
            it->time = current_time;// 更新最近访问时间
            item_link_q(it);// 更新位置:放到 lru 头部
        }
        mutex_unlock(&cache_lock);
    }
}

 

// 以新的 item 替换给定的 item(哈希表  lru)
int do_item_replace(item *it, item *new_it, const uint32_t hv) {
    MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nkey, it->nbytes,
                          ITEM_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);

    do_item_unlink(it, hv);// 将旧的 item 从哈希表  lru 队列去掉
    return do_item_link(new_it, hv);// 将该新的 item 加入哈希表  lru 中
}

 

// 如果该 item 过期则返回 NULL,且同时在哈希表、lru 中删除。(get 中体现的正是惰性删除机制)
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
    //mutex_lock(&cache_lock);
    item *it = assoc_find(key, nkey, hv);// 在哈希表中查找该 item,然后利用该 item 的 clsid 信息去 lru 中查找该 item
    if (it != NULL) {
        refcount_incr(&it->refcount);
        /* Optimization for slab reassignment. prevents popular items from
        * jamming in busy wait. Can only do this here to satisfy lock order
        * of item_lock, cache_lock, slabs_lock. */
        if (slab_rebalance_signal &&
            ((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
            do_item_unlink_nolock(it, hv);
            do_item_remove(it);// 递减引用计数
            it = NULL;
        }
    }
    //mutex_unlock(&cache_lock);
    int was_found = 0;

    if (settings.verbose > 2) {
        int ii;
        if (it == NULL) {
            fprintf(stderr, “> NOT FOUND “);
        } else {
            fprintf(stderr, “> FOUND KEY “);
            was_found++;
        }
        for (ii = 0; ii < nkey; ++ii) {
            fprintf(stderr, “%c”, key[ii]);
        }
    }

    if (it != NULL) {
        if (settings.oldest_live != 0 && settings.oldest_live <= current_time &&
            it->time <= settings.oldest_live) {// 超过最大生存时间,则从 lru 中删除
            do_item_unlink(it, hv);
            do_item_remove(it);
            it = NULL;
            if (was_found) {
                fprintf(stderr, ” -nuked by flush”);
            }
        } else if (it->exptime != 0 && it->exptime <= current_time) {// 过期
            do_item_unlink(it, hv);
            do_item_remove(it);
            it = NULL;
            if (was_found) {
                fprintf(stderr, ” -nuked by expire”);
            }
        } else {
            it->it_flags |= ITEM_FETCHED;
            DEBUG_REFCNT(it, ‘+’);
        }
    }

    if (settings.verbose > 2)
        fprintf(stderr, “\n”);

    return it;
}

经过前面的几节分析,到此对于 Memcached 中的内存操作,数据 item 的存储体系即基本存取操作等基础部分的剖析已完成。
接下来将主要分析 Memcached 中的核心部分:I/ O 框架库 Libevent 的具体应用,以及多线程的处理机制等。这部分也正是前段时间刚接触的内容,因此接下来将给出更为详细的讲解。

在前面工作线程初始化的分析中讲到 Memcached 采用典型的 Master_Worker 模式,也即半同步 / 半异步的高效网络并发模式。其中主线程(异步线程)负责接收客户端连接,然后分发给工作线程,具体由工作线程完成客户端的求情任务。

在 memcached 中,主线程负责监听所有 socket 上的事件,当 socket 上有可读事件发生,即新的客户连接求情到来,主线程就接受之得到新的连接 socket,并将该连接 socket 信息放入一个任务对象(CQ_ITEM)结构体中,然后选择一个工作线程,将该 CQ_ITEM 放入该工作线程的任务队列(CQ)中,并通过管道通知工作线程:“我接到一个新的客户端请求,我已经把把放入你的任务队列中了,你赶紧去处理吧!”,后面就由工作线程去完成客户端请求的任务。需要主要的是,在 memcached 中,主线程和每个工作线程都关联一个 Libevent 实例来负责与主线程的通信和处理客户端的任务事件。因此,实际上这里的工作线程也是异步的,每个工作线程处理多个客户端请求任务,这正是由 Libevent 实现的。

具体的半同步 / 半异步模式结构如下图所示:

分布式缓存系统 Memcached

I/ O 框架库 Libevent 基本结构如下图所示:

分布式缓存系统 Memcached

连接请求队列 CQ 结构:

// 连接队列(循环单链表)
// 队列的主要操作:初始化  push  pop
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;// 每个队列维持一个线程锁,因此主线程在向该队列中 push ITEM
 // 时都是要加锁的
};

CQ 队列节点 CQ_ITEM:

// 连接对象 CQ_ITEM
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int              sfd;// 连接 socket
    enum conn_states  init_state;// 所有可能状态组成的结构体
    int              event_flags;
    int              read_buffer_size;
    enum network_transport    transport;// 网络传输方式
    CQ_ITEM          *next;// 指向所在连接队列的下一 ITEM 对象
};

连接队列 CQ 的主要操作:
push 与 pop,即主线程将先到来的连接对象 ITEM push 到 CQ 队列中;工作线程从 CQ 队列中 pop 出一个 ITEM 进行处理。

// 主线程将一个 ITEM 放入到连接连接队列的尾部
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);// 需要加锁
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

 

// 工作线程读取 CQ 队列中头部的 ITEM(同时从队列中删除该 ITEM)
// 注意:读写 CQ 队列都需要加锁,以防工作线程正在读取时,主线程往队列中加入新的 ITEM
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;
    pthread_mutex_lock(&cq->lock);//
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

CQ_ITEM 内存池:

memcached 在申请一个 CQ_ITEM 结构体时,并不是直接使用 malloc 申请的。因为这样做可能会导致大量的内存碎片(作为长期运行的服务器进程 memcached 需要考虑这个问题)。为此,memcached 也为 CQ_ITEM 使用类似内存池的技术:每次在主线程在请求一个空闲 ITEM 时,检测该空闲 ITEM 链表,如果为空,则请于预分配一块内存(默认情况一次分配 64 个 ITEM),分割为多个 ITEM,将第一个返回给调用者使用,剩余的连成空闲 ITEM 链表,以备主线程使用。在工作线程中释放一个 ITEM 时,只需要将 ITEM 放回该空闲链表即可。

空闲链表:static CQ_ITEM *cqi_freelist;

具体实现如下:

// 这里采取的优化方法是,一次性分配 64 个 CQ_ITEM 大小的内存 (即预分配). 
 // 下次调用本函数的时候,直接从之前分配 64 个中要一个即可。
 // 由于是为了防止内存碎片,所以不是以链表的形式放置这 64 个 CQ_ITEM。而是数组的形式。
 // 于是,cqi_free 函数就有点特别了。它并不会真正释放. 而是像内存池那样归还 
static CQ_ITEM *cqi_new(void) {
    // 所有线程都会访问 cqi_freelist 的。所以需要加锁 
    CQ_ITEM *item = NULL; 
    pthread_mutex_lock(&cqi_freelist_lock); 
    if (cqi_freelist) {// 如果空闲链表中还有 ITEM,则之间返回一个 ITEM 给调用者
        item = cqi_freelist; 
        cqi_freelist = item->next; 
    } 
    pthread_mutex_unlock(&cqi_freelist_lock); 
 
    if (NULL == item) {// 没有多余的 CQ_ITEM 了 
        int i; 
 
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);// 该宏等于 64 
 
        //item[0] 直接返回为调用者,不用 next 指针连在一起。调用者负责将 
        //item[0].next 赋值为 NULL 
        for (i = 2; i < ITEMS_PER_ALLOC; i++)// 将这块内存分成一个个的 item 并且用 next 指针像链表一样连起来 
            item[i – 1].next = &item[i]; 
 
        pthread_mutex_lock(&cqi_freelist_lock); 
        // 因为主线程负责申请 CQ_ITEM,子线程负责释放 CQ_ITEM。所以 cqi_freelist 此刻 
        // 可能并不等于 NULL。由于使用头插法,所以无论 cqi_freeelist 是否为 NULL,都能 
        // 把链表连起来的。
        item[ITEMS_PER_ALLOC – 1].next = cqi_freelist; 
        cqi_freelist = &item[1]; 
        pthread_mutex_unlock(&cqi_freelist_lock); 
    } 
 
    return item; 

 
 
// 并非释放,而是像内存池那样归还 
static void cqi_free(CQ_ITEM *item) {
    pthread_mutex_lock(&cqi_freelist_lock); 
    item->next = cqi_freelist; 
    cqi_freelist = item;  // 头插法归还 
    pthread_mutex_unlock(&cqi_freelist_lock); 

工作线程:

全局变量:static LIBEVENT_THREAD *threads;

threads 指向所有的工程线程组成的数组,主线程通过该 threads 指针即可遍历所有的工作线程,从该数组中选出一个工作线程,然后通过管道即可实现与工作线程的通信,同时也可以将 CQ_ITEM 放入对应的 CQ 队列中。

其中 Memcached 封装了工作线程结构体,包括了 Libevent 实例对象,以及通信管道描述符等。

//LIBEVENT_THREAD 是 Memcached 内部对工作线程的一个封装
typedef struct {
    pthread_t thread_id;    // 线程 id  /* unique ID of this thread */
    struct event_base *base;  //libevent 的不是线程安全的,每个工作线程持有一个 libevent 实例,用于 pipe 管道通信和 socket 通信 
    struct event notify_event; // 线程的通知事件  /* listen event for notify pipe */
    int notify_receive_fd;    // 通知管道接收端(读)/* receiving end of notify pipe */
    int notify_send_fd;        // 通知管道写端 /* sending end of notify pipe */
    struct thread_stats stats; // 该线程的状态 /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; // 每个线程都有一个工作队列,主线程接受的连接,挂载到该消息队列中 
    cache_t *suffix_cache;      // 后缀 cache 
    uint8_t item_lock_type;    // 线程操作 hash 表持有的锁类型,有局部锁和全局锁 
} LIBEVENT_THREAD;

工作线程的创建包括由其初始化函数完成,包括:配置 Libevent 实例对象、创建 CQ 队列等过程,然后创建线程,启动事件循环。

下面给初始化函数:

// 工作线程的初始化,创建线程,分别执行 libevent 事件循环:Memcached 采用了典型的 Master-Worker 的线程模式,
//Master 就是由 main 线程来充当,而 Worker 线程则是通过 Pthread 创建的。
void thread_init(int nthreads, struct event_base *main_base) {
    int        i;
    int        power;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Want a wide lock table, but don’t waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don’t scale much past 5 threads */
        power = 13;
    }

    item_lock_count = hashsize(power);
    item_lock_hashpower = power;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror(“Can’t allocate item locks”);
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }
    pthread_key_create(&item_lock_type_key, NULL);
    pthread_mutex_init(&item_global_lock, NULL);

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));// 申请 nthreads 个工作线程空间,由全局变量 threads 维护
    if (! threads) {
        perror(“Can’t allocate thread descriptors”);
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();
 // 工作线程的初始化, 工作线程和主线程 (main 线程) 是通过 pipe 管道进行通信的
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror(“Can’t create notify pipe”);
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];// 读管道绑定到工作线程的接收消息的描述符 
        threads[i].notify_send_fd = fds[1];// 写管道绑定到工作线程的发送消息的描述符 
  // 为每个线程配置一个 Libevent 实例 和一个 CQ 队列
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we’ve done all the libevent setup. */
 
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);// 创建线程,执行 libevnt 事件循环
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}

上面初始化函数中的:

create_worker 函数中,调用工作线程函数 worker_libevent 启动 Libevent 事件循环:event_base_loop, 处理主线程的管道通知和所分配的客户端请求任务等事件。

而 setup_thread 函数主要负责两种事件的注册:

1、将读管道事件注册到 libevent 上。

2、设置其中读管道事件回调函数为 thread_libevent_process(),该函数负责对读取并解析管道的一个字节的通知信息。然后从连接队列 CQ 的头部取出该 ITEM,为该连接 socket 在该线程的 libeven 上注册事件(调用函数 conn_new 设置),连接 socket 事件的回调的函数也 event_handler(因为实际上主线程也是通过 conn_new 初始化监听 socket 的 libevent 可读事件),而其中 event_handler 函数的核心依然是 memcached 网络事件处理的最核心部分 - drive_machine。drive_machine 将留到后面分析。

并设定该线程为该连接 socket 的服务线程,然后将该 ITEM 放入到 ITEM 内存池空闲链表中。

其中管道事件回调 thread_libevent_process 函数,具体代码如下:

// 回调函数:处理主线程发来的读管道事件,将收到的连接 socket 注册到 libevent 上
//fd:读管道描述符 notify_receive_fd
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

// 首先将管道的 1 个字节通知信号读出
//(这是必须的,在水平触发模式下如果不处理该事件,则会被循环通知,直到事件被处理)
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, “Can’t read from libevent pipe\n”);

 // 解析主线程发送来的通知类容
    switch (buf[0]) {
    case ‘c’:// 从 CQ 队列取出头部 ITEM 进行处理
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
  // 创建连接结构体,并向该线程 libevent 注册该连接 socket 的事件。
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                          item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, “Can’t listen for events on UDP socket\n”);
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, “Can’t listen for events on fd %d\n”,
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;// 指定该连接的服务线程为当前线程
        }
        cqi_free(item);// 将该 ITEM 加入空闲 ITEM 链表(ITEM 内存池),以循环利用
    }
        break;
    /* we were told to flip the lock type and report in */
    case ‘l’:
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    register_thread_initialized();
        break;
    case ‘g’:
    me->item_lock_type = ITEM_LOCK_GLOBAL;
    register_thread_initialized();
        break;
    }
}

上面的 setup_thread 函数将管道事件与连接 socket 事件都注册到了 linevent 上,工作线程的基础设施已经完成。

因此线程函数 worker_libevent 则负责启动事件循环,如下:

// 线程处理函数:执行 libevent 事件循环
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
    * all threads have finished initializing.
    */

    /* set an indexable thread-specific memory item for the lock type.
    * this could be unnecessary if we pass the conn *c struct through
    * all item_lock calls…
    */
  // 默认的 hash 表的锁为局部锁 
    me->item_lock_type = ITEM_LOCK_GRANULAR; 
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);// 设定线程的属性 
    // 用于控制工作线程初始化,通过条件变量来控制 
    register_thread_initialized(); 
    // 工作线程的 libevent 实例启动 
    event_base_loop(me->base, 0);
    return NULL;
}

主线程:

上面是工作过线程的基本工作过程,而主线的的主要工作就是:

1、接受新的客户端连接请求

2、将连接 socket 分发给工作线程,并通知工作线程来处理之。

当然,主要函数也就是我们的 main 函数了。在 main 函数中,主线程创建了属于自己的 Libevent 实例,存放在全局变量 main_base 中。在 main 函数的最后,主线程调用 event_base_loop 进入事件循环中。中间的 server_sockets 函数是创建一个监听客户端的 socket,并将创建一个 event 监听该 socket 的可读事件。

main 函数具体分析如下(仅仅展示 main 函数的基本流程):

int main (int argc, char **argv) {
     
    // 检查 libevent 的版本是否足够新.1.3 即可 
    if (!sanitycheck()) {
        return EX_OSERR; 
    } 
 
    // 对 memcached 的关键设置取默认值 
    settings_init(); 
 
    …// 解析 memcached 启动参数 
  // 是否以守护进程方式运行 memcached
    if (do_daemonize) {
        if (sigignore(SIGHUP) == -1) {
            perror(“Failed to ignore SIGHUP”);
        }
        if (daemonize(maxcore, settings.verbose) == -1) {
            fprintf(stderr, “failed to daemon() in order to daemonize\n”);
            exit(EXIT_FAILURE);
        }
    }
    //main_base 是一个 struct event_base 类型的全局变量 
    main_base = event_init();// 为主线程创建一个 event_base 
  <span style=”white-space:pre”> </span>// 如果以多线程模式运行 memcached,则启动工作者线程
<span style=”white-space:pre”> </span>// 配置,创建并启动多线程模式中的每个工作线程
    thread_init(settings.num_threads, main_base);
    conn_init();// 先不管,后面会说到 
 
    // 创建 settings.num_threads 个 worker 线程,并且为每个 worker 线程创建一个 CQ 队列 
    // 并为这些 worker 申请各自的 event_base,worker 线程然后进入事件循环中   
    thread_init(settings.num_threads, main_base); 
 
    // 设置一个定时 event(也叫超时 event),定时(频率为一秒) 更新 current_time 变量 
    // 这个超时 event 是 add 到全局变量 main_base 里面的,所以主线程负责更新 current_time(这是一个很重要的全局变量) 
    clock_handler(0, 0, 0); 
 
 
    /* create the listening socket, bind it, and init */  // 创建监听套接字,绑定到端口
    if (settings.socketpath == NULL) {
        FILE *portnumber_file = NULL; 
        // 创建监听客户端的 socket 
        if (settings.port && server_sockets(settings.port, tcp_transport,//tcp_transport 是枚举类型 
                                          portnumber_file)) {
            vperror(“failed to listen on TCP port %d”, settings.port); 
            exit(EX_OSERR); 
        } 
 
        … 
    } 
 
 
   
    if (event_base_loop(main_base, 0) != 0) {// 主线程进入事件循环 
        retval = EXIT_FAILURE; 
    } 
  // 完成善后工作
    return retval; 
}

这节主要是分析了 memcached 的网络并发处理模式,且分析了主线程和工作线程协调运行的具体实现。

其中工作线程的初始化函数中主要完成了:

1、基本设施的配置,如设置 libevent 实例等

2、在 libevent 上注册管道读通知事件。回调函数为 thread_libevent_process,该函数负责根据管道通知,执行相关任务,比如下面的 3 也是在该函数中调用其他函数完成的:

3、在 libevent 上注册主线程分配的连接 socket 事件,回调函数通过 conn_new 函数设为 event_handler(其中主要是调用了 memcached 网络事件处理的最核心部分 - drive_machine),实际上主线程也是通过 conn_new 初始化监听 socket 的 libevent 可读事件

通过以上分析,对 memcached 的基本框架已经有了较为深入的理解。对于主线程,在这仅仅简单介绍了其基本执行流程,其具体设计细节将留到下节分析。

前两节中对工作线程的工作流程做了较为详细的分析,现把其主要流程总结为下图:

分布式缓存系统 Memcached

接下来本节主要分析主线程相关的函数设计,主函数 main 的基本流程如下图所示:

分布式缓存系统 Memcached

对于主线程中的工作线程的初始化到启动所有的工作线程前面已经做了分析,后面的创建监听 socket、注册监听 socket 的 libevent 事件、启动主线程的 libevent 事件循环,就是接下来的内容了。

其中主要调用的函数是 server_sockets,该函数从配置参数 setting.inner 字符串中依次提取出一个 ip 或者一个 hostname(一个 hostname 可能有多个 ip),然后传给函数 server_socket 函数处理之。server_socket 函数负责完成创建 socket, 绑定到端口,监听 socket, 并将该监听 socket 对应的 conn 结构(调用函数 conn_new,在该函数中将监听 socket 注册到主线程 libevent main_base 上,回调函数为 event_handler,其核心部分是 drive_machine,这与工作线程是一致的),然后将该 conn 放入监听队列(conn *listen_conn)中。监听 socket 上接收到的客户连接的 conn 将放到连接队列 conn **conns 中。最后在 main 函数中启动 libevent 事件循环。还是图来的直观,如下:

分布式缓存系统 Memcached

static int server_sockets(int port, enum network_transport transport, 
                          FILE *portnumber_file) {//<span style=”color: rgb(0, 130, 0); font-family: Consolas, ‘Courier New’, Courier, mono, serif; font-size: 11.8518514633179px; line-height: 18px;”>port 是默认的 11211 或者用户使用 - p 选项设置的端口号 </span>
 
    //settings.inter 里面可能有多个 IP 地址. 如果有多个那么将用逗号分隔 
    char *b; 
    int ret = 0; 
    // 复制一个字符串,避免下面的 strtok_r 函数修改 (污染) 全局变量 settings.inter 
    char *list = strdup(settings.inter); 
 
    // 这个循环主要是处理多个 IP 的情况 
    for (char *p = strtok_r(list, “;,”, &b); 
        p != NULL; // 分割出一个个的 ip, 使用分号; 作为分隔符 
        p = strtok_r(NULL, “;,”, &b)) {
        int the_port = port; 
        char *s = strchr(p, ‘:’);// 启动的可能使用 -l ip:port 参数形式 
        //ip 后面接着端口号,即指定 ip 的同时也指定了该 ip 的端口号 
        // 此时采用 ip 后面的端口号,而不是采用 - p 指定的端口号 
        if (s != NULL) {
            *s = ‘\0’;// 截断后面的端口号,使得 p 指向的字符串只是一个 ip 
            ++s; 
            if (!safe_strtol(s, &the_port)) {// 非法端口号参数值 
                return 1; 
            } 
        } 
        if (strcmp(p, “*”) == 0) {
            p = NULL; 
        } 
        // 处理其中一个 IP。有 p 指定 ip(或者 hostname) 
        ret |= server_socket(p, the_port, transport, portnumber_file); 
    } 
    free(list); 
    return ret; 

 
 
static conn *listen_conn = NULL;// 监听队列(可能要同时监听多个 IP)

 

 
 //interface 是一个 ip、hostname 或者 NULL。这个 ip 字符串后面没有端口号。端口号由参数 port 指出 
static int server_socket(const char *interface, 
                        int port, 
                        enum network_transport transport, 
                        FILE *portnumber_file) {
    int sfd; 
    struct linger ling = {0, 0}; 
    struct addrinfo *ai; 
    struct addrinfo *next; 
    struct addrinfo hints = {.ai_flags = AI_PASSIVE, 
                              .ai_family = AF_UNSPEC }; 
    char port_buf[NI_MAXSERV]; 
    int success = 0; 
    int flags =1; 
 
    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM; 
 
 
    snprintf(port_buf, sizeof(port_buf), “%d”, port); 
    getaddrinfo(interface, port_buf, &hints, &ai); 
 
    // 如果 interface 是一个 hostname 的话,那么可能就有多个 ip 
    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add; 
 
        // 创建一个套接字,然后设置为非阻塞的 
        sfd = new_socket(next);// 调用 socket 函数 
        bind(sfd, next->ai_addr, next->ai_addrlen); 
 
        success++; 
        listen(sfd, settings.backlog); 
 
  // 函数 conn_new 中将监听套接字 fd 注册到 main_base 上,并设定回调函数为 event_handler,其中核心为 drive_machine 函数,这与工作线程是一致的
        if (!(listen_conn_add = conn_new(sfd, conn_listening, 
                                        EV_READ | EV_PERSIST, 1, 
                                        transport, main_base))) {
            fprintf(stderr, “failed to create listening connection\n”); 
            exit(EXIT_FAILURE); 
        } 
 
        // 将要监听的多个 conn 放到一个监听队列里面 
        listen_conn_add->next = listen_conn; 
        listen_conn = listen_conn_add; 
 
    } 
 
    freeaddrinfo(ai); 
 
    /* Return zero iff we detected no errors in starting up connections */ 
    return success == 0; 

 
 
static int new_socket(struct addrinfo *ai) {
    int sfd; 
    int flags; 
    sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); 
    flags = fcntl(sfd, F_GETFL, 0); 
    fcntl(sfd, F_SETFL, flags | O_NONBLOCK); 
 
    return sfd; 

主线程为每一个监听 socket 和接收到的每一个客户端连接 socket 都分配一个 conn 结构体,用于管理该 socket 的各种状态信息等。需要注意的是的,memcached 并不是对每一个 socket 分别创建分配一个 conn 结构,而是在初始化时一次性分配若干(跟审定的允许的同时最大数量的客户端连接数有关)个 conn 结构的指针(注意不是 conn 结构体,因为每一个 conn 结构是比较大的,因此如果直接分配若干个 conn 结构需要占用较大空间),在都确实需要一个 conn 结构时,再从预分配的指针数组中取用一个,并实际为该指针分配空间,完成具体的初始化等。这与前面的 CQ_ITEM 内存池是一致的——按配置预分配若干,按需取用,循环利用。避免内存碎片,提高性能。

其中函数 conn_init 负责预分配设置的若干个 conn 的指针,由一个 conn** 指针维护。函数 conn_new 则在确实需要一个 conn 时从 conn** 维护的数组中取得一个 conn*, 并完成实际的空间分配等。

具体分析如下:

函数 conn_init:

conn **conns; //conn 数组指针
static void conn_init(void) {
    /* We’re unlikely to see an FD much higher than maxconns. */ 
    // 已经 dup 返回当前未使用的最小正整数,所以 next_fd 等于此刻已经消耗了的 fd 个数 
    int next_fd = dup(1);// 获取当前已经使用的 fd 的个数 
    // 预留一些文件描述符。也就是多申请一些 conn 结构体。以免有别的需要把文件描述符 
    // 给占了。导致 socket fd 的值大于这个数组长度 
    int headroom = 10;// 预留一些文件描述符  /* account for extra unexpected open FDs */ 
    struct rlimit rl; 
 
    //settings.maxconns 的默认值是 1024. 
    max_fds = settings.maxconns + headroom + next_fd; 
 
    /* But if possible, get the actual highest FD we can possibly ever see. */ 
    if (getrlimit(RLIMIT_NOFILE, &rl) == 0) {
        max_fds = rl.rlim_max; 
    } else {
        fprintf(stderr, “Failed to query maximum file descriptor; ” 
                        “falling back to maxconns\n”); 
    } 
 
    close(next_fd);//next_fd 只是用来计数的,并没有其他用途 
 
    // 注意,申请的 conn 结构体数量是比 settings.maxconns 这个客户端同时在线数 
    // 还要大的。因为 memcached 是直接用 socket fd 的值作为数组下标的。也正是 
    // 这个原因,前面需要使用 headroom 预留一些空间给突发情况 
    if ((conns = calloc(max_fds, sizeof(conn *))) == NULL) {// 注意是 conn 指针不是 conn 结构体 
        fprintf(stderr, “Failed to allocate connection structures\n”); 
        /* This is unrecoverable so bail out early. */ 
        exit(1); 
    } 
}

函数 conn_new:

// 为 sfd 分配一个 conn 结构体,并且为这个 sfd 建立一个 event,然后注册到 event_base 上。
conn *conn_new(const int sfd, enum conn_states init_state,//init_state 值为 conn_listening 
                const int event_flags, 
                const int read_buffer_size, enum network_transport transport, 
                struct event_base *base) {
    conn *c; 
 
    assert(sfd >= 0 && sfd < max_fds); 
    c = conns[sfd];// 直接使用下标 
 
    if (NULL == c) {// 之前没有哪个连接用过这个 sfd 值,需要申请一个 conn 结构体 
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
            fprintf(stderr, “Failed to allocate connection object\n”); 
            return NULL; 
        } 
     
        …// 初始化一些成员变量 
 
        c->sfd = sfd; 
        conns[sfd] = c; // 将这个结构体交由 conns 数组管理 
    } 
 
    …// 初始化另外一些成员变量 
    c->state = init_state;// 值为 conn_listening 
 
    // 等同于 event_assign,会自动关联 current_base。event 的回调函数是 event_handler 
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c); 
    event_base_set(base, &c->event); 
    c->ev_flags = event_flags; 
 
    if (event_add(&c->event, 0) == -1) {
        perror(“event_add”); 
        return NULL; 
    } 
 
    return c; 

从上可以看到,实际上所有的 conn 从预分配到实际分配,都是有 conn** 指针维护的。只需要通过判断该数组中的某元素 conn 指针是否为空:等于 NULL 即没被实际占用,处于空闲状态,反之,已经被一个实际的 socket fd 占用。

至此,主线程所有的准备工作已经就绪,接下来就是真正的客户端连接事件的处理了:

回调函数 event_handler(工作线程的注册事件的回调函数也是它):

event_handler 本身是简单的,其核心是 drive_machine 函数(一个有限状态机,负责处理所有的客户逻辑)。

 

void event_handler(const int fd, const short which, void *arg) {
    conn *c; 
 
    c = (conn *)arg; 
    assert(c != NULL); 
 
    c->which = which; 
    if (fd != c->sfd) {
        conn_close(c); 
        return; 
    } 
 
    drive_machine(c); 
    return; 

其中的 drive_machine 还是比较复杂的,接下来将分几个小节的内容,对此细细道来。

上节已经分析到了主线程中监听 socket 注册事件和工作线程中连接 socket 注册事件的回调函数都是 event_handler,且 event_handler 的核心部分都是一个有限状态机:drive_machine。因此接下来将对该状态机具体的业务处理进行深入的剖析。

memcached 将每个 socket 都封装为一个 conn 结构体,该结构体包含了比如 socket 的文件描述符 sfd、注册事件 event、连接状态结构体 conn_states,等等诸多信息字段,其中的状态结构:conn_states 中包含了该 socket 的各种状态。而状态机 drive_machine 正是通过该状态结构来判断该 socket 当前所处的具体状态,从而进行业务逻辑处理的。

其中连接状态结构体如下:

//socket 的可能状态组成的结构体
enum conn_states {
    conn_listening,  // 监听状态 /**< the socket which listens for connections */
    conn_new_cmd,    // 为下一个连接做准备 /**< Prepare connection for next command */
    conn_waiting,    // 等待读取一个数据包 /**< waiting for a readable socket */
    conn_read,      // 读取网络数据 /**< reading in a command line */
    conn_parse_cmd,  // 解析缓冲区数据 /**< try to parse a command from the input buffer */
    conn_write,      // 简单的回复数据 /**< writing out a simple response */
    conn_nread,      // 读取固定字节的网络数据 /**< reading in a fixed number of bytes */
    conn_swallow,    // 处理不需要的写缓冲区的数据 /**< swallowing unnecessary bytes w/o storing */
    conn_closing,    // 关闭连接 /**< closing this connection */
    conn_mwrite,    // 顺序写入多个 item 数据  /**< writing out many items sequentially */
    conn_closed,    // 连接已关闭 /**< connection is closed */
    conn_max_state  // 最大状态,断言使用 /**< Max state value (used for assertion) */
};

接下来看下 drive_machine 的概貌吧,其中主要就是一个 while 循环以处理各状态的业务逻辑:

// 监听套接字和 连接套接字 事件回调函数的核心部分:
// 有限状态机:根据套接字的状态 conn_sattes 执行对应的操作
static void drive_machine(conn *c) {
    bool stop = false;
    int sfd;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int nreqs = settings.reqs_per_event;
    int res;
    const char *str;

    assert(c != NULL);
 // 因为状态间存在转化或跳变等,因此需要循环,直到确定 stop 为止
    while (!stop) {

  // 对套接字的各种状态,进行对应业务处理
        switch(c->state) {
        case conn_listening:// 监听状态
            addrlen = sizeof(addr);

   //
   //
   //
    // 主线程进入状态机之后执行 accept 操作,这个操作也是非阻塞的。
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
          // 连接失败
            if (sfd == -1) {
   //
   //
             
            }
   // 连接成功,则将连接 socket 设为非阻塞
            if (!use_accept4) {
                if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
                    perror(“setting O_NONBLOCK”);
                    close(sfd);
                    break;
                }
            }

   // 如果超过最大连接数(根据全局状态结构的记录判断),则需要关闭连接
            if (settings.maxconns_fast &&
                stats.curr_conns + stats.reserved_fds >= settings.maxconns – 1) {
                //
    //
            } else {// 如果没有超载,则直接分发(UDP,不需要建立连接,直接分发)工作线程
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                    DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;

   
        case conn_waiting:

        case conn_read:
           
        case conn_parse_cmd :
           
        case conn_nread:
  // 以及其他各种状态
           
  return;
  }
}

本小节要着重分析的是第一个状态 conn_listening:

该状态是主线程监听 socket 的业务处理:监听套接字,接受,并将得到的连接 socket 分发给选中的某个工作线程。

 switch(c->state) {
        case conn_listening:// 监听状态
            addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
            if (use_accept4) {
                sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
            } else {
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            }
#else
  // 主线程进入状态机之后执行 accept 操作,这个操作也是非阻塞的。
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
            if (sfd == -1) {
                if (use_accept4 && errno == ENOSYS) {
                    use_accept4 = 0;
                    continue;
                }
                perror(use_accept4 ? “accept4()” : “accept()”);
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    /* these are transient, so don’t log anything */
                    stop = true;
                } else if (errno == EMFILE) {// 连接超载
                    if (settings.verbose > 0)
                        fprintf(stderr, “Too many open connections\n”);
                    accept_new_conns(false);
                    stop = true;
                } else {
                    perror(“accept()”);
                    stop = true;
                }
                break;
            }
   // 连接成功,则将连接 socket 设为非阻塞
            if (!use_accept4) {
                if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
                    perror(“setting O_NONBLOCK”);
                    close(sfd);
                    break;
                }
            }

   // 如果超过设置的同时在线最大连接数(默认为 1024)(根据全局状态结构的记录判断),则需要关闭连接
            if (settings.maxconns_fast &&
                stats.curr_conns + stats.reserved_fds >= settings.maxconns – 1) {
                str = “ERROR Too many open connections\r\n”;
                res = write(sfd, str, strlen(str));
                close(sfd);
                STATS_LOCK();
                stats.rejected_conns++;
                STATS_UNLOCK();
            } else {// 如果没有超载,则直接分发(UDP,不需要建立连接,直接分发)工作线程
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                    DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;
        }

其中工作线程的选择采用轮询(round-robin)方式。连接 socket 的派发函数是 dispath_conn_new:

// 主线程在监听套接字的回调函数中,当有新连接到来时,调用该函数将接受到的新连接 socket 分发给工作线程
// 注意:由于 UDP 不需要建立连接,所以直接分发给 Worker 线程
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                      int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();// 从 CQ_ITEM 资源池中取得一个空闲 ITEM
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let’s try */
        fprintf(stderr, “Failed to allocate memory for connection object\n”);
        return ;
    }

    int tid = (last_thread + 1) % settings.num_threads;// 通过 round-robin 算法选择一个线程

    LIBEVENT_THREAD *thread = threads + tid;// 缓存这次选中的线程

    last_thread = tid;// 更新最近一次选中的线程编号

 // 设置 CQ_ITEM 的各字段
    item->sfd = sfd;//sfd 是连接 socket
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

 // 主线程将 item 投递到选中的工作线程的 ITEM 连接队列中
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = ‘c’;
  // 管道通知:在 Worker 线程的 notify_send_fd 写入字符 c,表示有连接   
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror(“Writing to thread notify pipe”);
    }
}

可以看到,在该派发函数中首先从 CQ_ITEM 资源池(空闲链表)中提取一个 ITEM,并设置为该连接 socket 的各字段信息,然后以采用轮询方式选择一个工作线程,再将该 ITEM 放入该工作线程的连接任务队列 CQ 中,最后通过通知管道的写端,写入通知信息。接下来就是前面已经分析过的工作线程来负责处理该连接 socket 的所有业务了。

整个状态机的基本流程如下图所示,后续分析将按该流程来进行。

分布式缓存系统 Memcached

接上节分解,主线程将接收的连接 socket 分发给了某工作线程,然后工作线程从任务队列中取出该连接 socket 的 CQ_ITEM,开始处理该连接的所有业务逻辑。这个过程也就是上图中的第一个状态 conn_listening。而工作线程首先进入的状态就是 conn_new_cmd,即为这个新的连接做一些准备工作,如清理该连接 conn 结构的读缓冲区等。

准备状态 conn_new_cmd 具体分析如下:

{
  <span style=”font-size:18px;”>case conn_new_cmd:// 为新连接准备:各种清理重置工作
            /* Only process nreqs at a time to avoid starving other
              connections */
            –nreqs;// 记录每个 libevent 实例处理的最大事件数,通过初始启动参数配置
            if (nreqs >= 0) {// 还可以处理请求
                reset_cmd_handler(c);// 缩小缓冲区,转为解析读缓冲区数据的状态,然后转为等待读取网络数据包状态
            } else {// 拒绝请求
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.conn_yields++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rbytes > 0) {// 读缓冲区中有数据了,即表明已经读入了数据,因此不再通知读事件
                    /* We have already read in data into the input buffer,
                      so libevent will most likely not signal read events
                      on the socket (unless more data is available. As a
                      hack we should just put in a request to write data,
                      because that should be possible ;-)
                    */
                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {// 更新 event 为写事件,并重新注册到 event_bvase
                        if (settings.verbose > 0)
                            fprintf(stderr, “Couldn’t update event\n”);
                        conn_set_state(c, conn_closing);// 关闭连接
                        break;
                    }
                }
                stop = true;
            }
            break;</span>

}

其中整理缓冲区函数 reset_cmd_handler 函数:首先调用函数 conn_shrink 缩小该 conn 的各种缓冲区,然后进入解析状态,解析读缓冲区中未解析的字节,进而转为等待读数据状态(当读缓冲区中没有数据处理时,即进入等待状态)。

具体分析如下:

static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    c->substate = bin_no_state;
    if(c->item != NULL) {
        item_remove(c->item);// 删除 item
        c->item = NULL;
    }
    conn_shrink(c);// 整理缓冲区
    if (c->rbytes > 0) {// 缓冲区还有字节未解析
        conn_set_state(c, conn_parse_cmd);// 转换为解析状态
    } else {// 缓冲区没有数据
        conn_set_state(c, conn_waiting);// 转为 等待读取一个数据包 状态,状态机没有数据要处理,就进入等待状态 
    }
}

其中调用函数 conn_shrink,来缩小各缓冲区:

static void conn_shrink(conn *c) {
    assert(c != NULL);

    if (IS_UDP(c->transport))// 如果是 UDP 协议,不牵涉缓冲区管理
        return;
 // 读缓冲区空间大小 >READ_BUFFER_HIGHWAT && 还没解析的数据小于 DATA_BUFFER_SIZE
    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
        char *newbuf;

        if (c->rcurr != c->rbuf)// 如果已经解析了部分数据
            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);// 把读缓冲区中的未解析数据向前移动,已覆盖掉已解析的内容

  // 重新分配 DATA_BUFFER_SIZE 大小的空间作为读缓冲区
        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);

        if (newbuf) {
            c->rbuf = newbuf;
            c->rsize = DATA_BUFFER_SIZE;
        }
        /* TODO check other branch… */
        c->rcurr = c->rbuf;// 以解析数据被覆盖,因此剩下的全部未解析
    }

 //// 需要写出(发往客户端)的 item 的数量
    if (c->isize > ITEM_LIST_HIGHWAT) {
        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
        if (newbuf) {
            c->ilist = newbuf;
            c->isize = ITEM_LIST_INITIAL;
        }
    /* TODO check error condition? */
    }
 // 待发送的消息个数,memcached 发送消息是通过 sendmsg 批量发送
    if (c->msgsize > MSG_LIST_HIGHWAT) {
        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
        if (newbuf) {
            c->msglist = newbuf;
            c->msgsize = MSG_LIST_INITIAL;
        }
    /* TODO check error condition? */
    }
 // 一次性顺序写多个 item??
    if (c->iovsize > IOV_LIST_HIGHWAT) {
        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
        if (newbuf) {
            c->iov = newbuf;
            c->iovsize = IOV_LIST_INITIAL;
        }
    /* TODO check return value */
    }
}

由准备状态 conn_new_cmd,如果读缓冲区还有未解析数据,则进入解析状态 conn_parse_cmd,按协议解析读取到的网络数据。如果没有待处理数据,则进入等待状态 conn_waiting。

memcached 采用二进制协议和文本协议两种网络协议,解析时,根据具体的协议解析,然后进入具体命令状态,执行相应具体的操作如:SET GET 等待。

解析状态 conn_parse_cmd:

// 解析读缓冲区中的数据
        case conn_parse_cmd :
   // 如果缓冲区中有完整的命令行,则读取之,否则继续转为等待状态
            if (try_read_command(c) == 0) {// 缓冲区中没有一条完成的命令行,则需要更多的数据,因此继续等待客户端发来数据
                /* wee need more data! */
                conn_set_state(c, conn_waiting);// 继续进入等待状态
            }

            break;

解析缓冲区中的一条完整命令:

// 解析缓冲区数据
static int try_read_command(conn *c) {
    assert(c != NULL);
    assert(c->rcurr <= (c->rbuf + c->rsize));
    assert(c->rbytes > 0);

    if (c->protocol == negotiating_prot || c->transport == udp_transport)  {
        if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
            c->protocol = binary_prot;// 二进制协议
        } else {
            c->protocol = ascii_prot;// 文本协议
        }

        if (settings.verbose > 1) {
            fprintf(stderr, “%d: Client using the %s protocol\n”, c->sfd,
                    prot_text(c->protocol));
        }
    }
 // 采用二进制协议
    if (c->protocol == binary_prot) {
      // 如果二进制协议读取的数据小于二进制协议头部长度,则需要继续读取数据
        if (c->rbytes < sizeof(c->binary_header)) {
            /* need more data! */
            return 0;
        } else {
#ifdef NEED_ALIGN
   // 则按 8 字节对齐,提高 CPU 读取的效率
            if (((long)(c->rcurr)) % 8 != 0) {
              // 调整缓冲区
                memmove(c->rbuf, c->rcurr, c->rbytes);
                c->rcurr = c->rbuf;
                if (settings.verbose > 1) {
                    fprintf(stderr, “%d: Realign input buffer\n”, c->sfd);
                }
            }
#endif
            protocol_binary_request_header* req;// 二进制协议头部
            req = (protocol_binary_request_header*)c->rcurr;

          //…
  //….
 
        // 解析二进制协议数据,根据解析结果进行具体操作,如 GET SET 等
            dispatch_bin_command(c);

            c->rbytes -= sizeof(c->binary_header);// 更新已读取到的字节数
            c->rcurr += sizeof(c->binary_header);// 更新缓冲区的路标信息
        }
    } else {// 文本协议
 
        //…
 //….
 
  // 根据文本协议解析结果,执行具体操作如 SET GET。
        process_command(c, c->rcurr);

     
    }

    return 1;
}

二进制协议处理函数:dispatch_bin_command。根据二进制协议解析的结果,处理具体的(比如 get,set 等)操作(进入相应的操作命令状态)。文本协议操作类似。
具体的命令如 SET,GET,DELETE 等操作放到后面讲解。

/ 根据二进制协议解析的结果,处理具体的(比如 get,set 等)操作
static void dispatch_bin_command(conn *c) {
  //…
  //…
 
    switch (c->cmd) {
    case PROTOCOL_BINARY_CMD_SETQ: //SET 命令
        c->cmd = PROTOCOL_BINARY_CMD_SET;
        break;
    case PROTOCOL_BINARY_CMD_ADDQ: //ADD 命令
        c->cmd = PROTOCOL_BINARY_CMD_ADD;
        break;
    //…
    case PROTOCOL_BINARY_CMD_DELETEQ:  //DELETE 命令
        c->cmd = PROTOCOL_BINARY_CMD_DELETE;
        break;
  //…
  //…
    }

    switch (c->cmd) {
     
    //…
    //….
      case PROTOCOL_BINARY_CMD_GETQ:  /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GET:  /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GETK:
            if (extlen == 0 && bodylen == keylen && keylen > 0) {
                bin_read_key(c, bin_reading_get_key, 0);  // 在该函数中:conn_set_state(c, conn_nread),进入读状态,读取指定数目的数据
            } else {
                protocol_error = 1;
            }
            break;
     
    if (protocol_error)
        handle_binary_protocol_error(c);
 }
}

解析完完成后,就该进入具体的命令操作了,如 SET  GET  等待。具体就后续分解

case bin_read_set_value: 
        complete_update_bin(c);// 执行 Update 操作 
        break; 
case bin_reading_get_key: 
        process_bin_get(c);// 执行 get 操作 
        break;

当缓冲区中没有可解析的数据时,则进入等待状态。

等待状态 conn_waiting:

{
// 进入等待状态
        case conn_waiting:
   // 更新 libevent 中对该连接 socket 注册的事件为读事件,再重新注册。以等待客户端发数据到读缓冲区
            if (!update_event(c, EV_READ | EV_PERSIST)) {// 注册为永久事件,直到下次更新该 event 事件
                if (settings.verbose > 0)
                    fprintf(stderr, “Couldn’t update event\n”);
                conn_set_state(c, conn_closing);
                break;
            }

            conn_set_state(c, conn_read);// 转为读状态
            stop = true;
            break;
}

其中函数 update_event:注意,每次转为读状态,或写状态时,都要更新该连接 socket 在该工作线程 libevent 实例中注册的事件 event,然后再从新注册回 libevent。

且每次都注册为 EV_PERSIST 持久事件,直到下次更新该 event。

具体更新过程如下:

// 更新 event,再重新注册到 event_base 中
static bool update_event(conn *c, const int new_flags) {
    assert(c != NULL);

    struct event_base *base = c->event.ev_base;
    if (c->ev_flags == new_flags)
        return true;
    if (event_del(&c->event) == -1) return false;
    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = new_flags;
    if (event_add(&c->event, 0) == -1) return false;
    return true;
}

切换 conn 状态的函数:

// 切换状态:将 conn 的状态设为 state
static void conn_set_state(conn *c, enum conn_states state) {
    assert(c != NULL);
    assert(state >= conn_listening && state < conn_max_state);// 检验状态合法性

    if (state != c->state) {
        if (settings.verbose > 2) {
            fprintf(stderr, “%d: going from %s to %s\n”,
                    c->sfd, state_text(c->state),
                    state_text(state));
        }

        if (state == conn_write || state == conn_mwrite) {
            MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
        }
        c->state = state;// 设置为新的状态
    }
}

当连接 socket 读事件就绪时,就进入读状态,读取网络数据,存入读缓冲区。
读状态 conn_read:

case conn_read: 
        res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);// 判断采用 UDP 协议还是 TCP 协议 
 
        switch (res) 
        {
        case READ_NO_DATA_RECEIVED:// 未读取到数据 
            conn_set_state(c, conn_waiting);// 继续等待 
            break; 
        case READ_DATA_RECEIVED:// 读取数据 
            conn_set_state(c, conn_parse_cmd);// 开始解析数据 
            break; 
        case READ_ERROR:// 读取发生错误 
            conn_set_state(c, conn_closing);// 关闭连接 
            break; 
        case READ_MEMORY_ERROR: // 申请内存空间错误,继续尝试 
            break; 
        } 
        break; 

若采用 TCP 协议,从网络读取数据,其中调用函数 read():

static enum try_read_result try_read_network(conn *c) {
    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
    int res;
    int num_allocs = 0;// 记录从新分配缓冲区空间的次数,每次空间增倍
    assert(c != NULL);

 // 如果原缓冲区中有部分数据已解析,则用未解析数据覆盖以解析部分
    if (c->rcurr != c->rbuf) {
        if (c->rbytes != 0) /* otherwise there’s nothing to copy */
            memmove(c->rbuf, c->rcurr, c->rbytes);
        c->rcurr = c->rbuf;
    }

    while (1) {// 循环读取数据
        if (c->rbytes >= c->rsize) {
            if (num_allocs == 4) {// 如果分配了四次,缓冲区空间还是不够,则返回
                return gotdata;
            }
            ++num_allocs;
            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);// 重分配 2 倍空间
            if (!new_rbuf) {// 分配空间失败,则进入关闭连接状态
                STATS_LOCK();
                stats.malloc_fails++;// 全局状态
                STATS_UNLOCK();
                if (settings.verbose > 0) {
                    fprintf(stderr, “Couldn’t realloc input buffer\n”);
                }
                c->rbytes = 0; /* ignore what we read */
                out_of_memory(c, “SERVER_ERROR out of memory reading request”);
                c->write_and_go = conn_closing;
                return READ_MEMORY_ERROR;
            }
            c->rcurr = c->rbuf = new_rbuf;
            c->rsize *= 2;
        }

        int avail = c->rsize – c->rbytes;// 可用空间大小 = 总空间 - 未解析空间
  // 执行网络读取,这个是非阻塞的读
        res = read(c->sfd, c->rbuf + c->rbytes, avail);// 从套接字中读取数据,存入读缓冲区中,存放在原来未解析数据的后面
        if (res > 0) {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.bytes_read += res;// 更新线程状态
            pthread_mutex_unlock(&c->thread->stats.mutex);
            gotdata = READ_DATA_RECEIVED; // 已读取到数据
            c->rbytes += res;
            if (res == avail) {// 最多读取到 avail 个,如果已经读到了,则可以尝试继续读取
                continue;
            } else {
                break;
            }
        }
        if (res == 0) {// 表示已经断开网络连接了 
            return READ_ERROR;
        }
        if (res == -1) {// 因为是非阻塞的,所以会返回下面的两个错误码
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;
            }// 如果返回为负数,且不是上面两个数,则表示发生了其他错误,返回 READ_ERROR 
            return READ_ERROR;
        }
    }
    return gotdata;// 返回读取结果
}

采用 UDP 是数据报的形式时,每次读取到的都是一个完整的数据报形式。

函数 try_read_udp:

//UDP 读取网络数据 
static enum try_read_result try_read_udp(conn *c) 
{
int res; 
 
assert(c != NULL); 
 
c->request_addr_size = sizeof(c->request_addr); 
res = recvfrom(c->sfd, c->rbuf, c->rsize, 0, &c->request_addr, 
        &c->request_addr_size);// 执行 UDP 的网络读取 
if (res > 8)//UDP 数据包大小大于 8,已经有可能是业务数据包 
{
    unsigned char *buf = (unsigned char *) c->rbuf; 
    pthread_mutex_lock(&c->thread->stats.mutex); 
    c->thread->stats.bytes_read += res;// 更新每个线程的统计数据 
    pthread_mutex_unlock(&c->thread->stats.mutex); 
 
    /* Beginning of UDP packet is the request ID; save it. */ 
    c->request_id = buf[0] * 256 + buf[1];//UDP 为了防止丢包,增加了确认字段 
 
    /* If this is a multi-packet request, drop it. */ 
    if (buf[4] != 0 || buf[5] != 1)// 一些业务的特征信息判断 
    {
        out_string(c, “SERVER_ERROR multi-packet request not supported”); 
        return READ_NO_DATA_RECEIVED; 
    } 
 
    /* Don’t care about any of the rest of the header. */ 
    res -= 8; 
    memmove(c->rbuf, c->rbuf + 8, res);// 调整缓冲区 
 
    c->rbytes = res;// 更新信息 
    c->rcurr = c->rbuf; 
    return READ_DATA_RECEIVED; 

return READ_NO_DATA_RECEIVED; 

到此状态机中的主要状态就分析得差不多了,剩下的其他状态主要是一系列具体命令操作,如 SET、GET、DELETE 等,这些正是根据对客户端数据解析的结果所进入的状态,后面将继续分析这些命令的执行过程。

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计79115字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中