阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Apache kafka原理与特性(0.8V)

112次阅读
没有评论

共计 28870 个字符,预计需要花费 73 分钟才能阅读完成。

前言: Kafka 是一个轻量级的 / 分布式的 / 具备 replication 能力的日志采集组件, 通常被集成到应用系统中, 收集 ” 用户行为日志 ” 等, 并可以使用各种消费终端 (consumer) 将消息转存到 HDFS 等其他结构化数据存储系统中. 因为日志消息通常为文本数据, 尺寸较小, 且对实时性以及数据可靠性要求不严格, 但是需要日志存储端具备较高的数据吞吐能力, 这种 ” 宽松 ” 的设计要求, 非常适合使用 kafka。

一. 入门

1.1 简介

Kafka 是一个 ” 分布式的 ”/” 可分区的(partitioned)”/” 基于备份的(replicated)”/” 基于 commit-log 存储 ” 的服务. 它提供了类似于 JMS 的特性, 但是在设计实现上完全不同, 此外它并不是 JMS 规范的实现.

kafka 消息是根据 Topic 进行归类, 发送消息者成为 Producer, 消息接收者成为 Consumer; 此外 kafka 集群有多个 kafka 实例组成, 每个实例 (server) 称为 broker.

无论是 kafka 集群, 还是 producer 和 consumer 都依赖于 zookeeper 来保证系统可用性以及保存一些 meta 信息.

Apache kafka 原理与特性(0.8V)

(摘自官网)

其中 client 与 server 的通讯, 都是基于 TCP, 而且消息协议非常轻量级.

Topics/logs

一个 Topic 可以认为是一类消息, 每个 topic 将被分成多个 partition(区), 每个 partition 在存储层面是 append log 文件. 任何发布到此 partition 的消息都会直接追加到 log 文件的尾部, 每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型数字, 它唯一的标记一条消息.kafka 并没有提供其他额外的索引机制来存储 offset, 因为在 kafka 中几乎不允许对消息进行 ” 随机读 - 写 ”, 一旦消息写入 log 日志之后, 将不能被修改.

Apache kafka 原理与特性(0.8V)

(摘自官网)

kafka 和 JMS 实现 (activeMQ) 不同的是: 即使消息被消费, 消息仍然不会被立即删除. 日志文件将会根据 broker 中的配置要求, 保留一定的时间之后删除; 比如 log 文件保留 2 天, 那么两天后, 文件会被清除, 无论其中的消息是否被消费.kafka 通过这种简单的手段, 来释放磁盘空间. 此外,kafka 的性能并不会因为日志文件的太多而低下, 所以即使保留较多的 log 文件, 也不不会有问题.

对于 consumer 而言, 它需要保存消费消息的 offset, 对于 offset 的保存和使用, 有 consumer 来控制; 当 consumer 正常消费消息时,offset 将会 ” 线性 ” 的向前驱动, 即消息将依次顺序被消费. 事实上 consumer 可以使用任意顺序消费消息, 它只需要将 offset 重置为任意值..(offset 将会保存在 zookeeper 中, 参见下文)

kafka 集群几乎不需要维护任何 consumer 和 producer 状态信息, 这些信息有 zookeeper 保存; 因此 producer 和 consumer 的客户端实现非常轻量级, 它们可以随意离开, 而不会对集群造成额外的影响.

partitions 的设计目的有多个. 最根本原因是 kafka 基于文件存储. 通过分区, 可以将日志内容分散到多个 server 上, 来避免文件尺寸达到单机磁盘的上限, 每个 partiton 都会被当前 server(kafka 实例)保存; 可以将一个 topic 切分多任意多个 partitions(备注: 基于 sharding), 来消息保存 / 消费的效率. 此外越多的 partitions 意味着可以容纳更多的 consumer, 有效提升并发消费的能力.(具体原理参见下文).

Distribution

一个 Topic 的多个 partitions, 被分布在 kafka 集群中的多个 server 上; 每个 server(kafka 实例)负责 partitions 中消息的读写操作; 此外 kafka 还可以配置每个 partition 需要备份的个数(replicas), 每个 partition 将会被备份到多台机器上, 以提高可用性.[replicas 特性在 0.8V 才支持]

基于 replicated 方案, 那么就意味着需要对多个备份进行调度; 一个 partition 可以在多个 server 上备份, 那么其中一个 server 作为此 partiton 的 leader;leader 负责此 partition 所有的读写操作, 如果 leader 失效, 那么将会有其他 follower 来接管(成为新的 leader);follower 只是单调的和 leader 跟进, 同步消息即可.. 由此可见作为 leader 的 server 承载了全部的请求压力, 因此从集群的整体考虑, 有多少个 partitions 就意味着有多少个 ”leader”,kafka 会将 ”leader” 均衡的分散在每个实例上, 来确保整体的性能稳定.[备注:kafka 中将 leader 角色权限下放到 partition 这个层级]

Apache kafka 原理与特性(0.8V)

kafka-cluster

Producers

Producer 将消息发布到指定的 Topic 中, 同时 Producer 也能决定将此消息发送到哪个 partition; 如果一个 Topic 有多个 partitions 时, 你需要选择 partition 是算法, 比如基于 ”round-robin” 方式或者通过其他的一些算法等. 无论如何选择 partition 路由算法, 我们最直接的目的就是希望消息能够均匀的发送给每个 partition, 这样可以让 consumer 消费的消息量也能 ” 均衡 ”.

Consumers

本质上 kafka 只支持 Topic. 每个 consumer 属于一个 consumer group; 反过来说, 每个 group 中可以有多个 consumer. 对于 Topic 中的一条特定的消息, 只会被订阅此 Topic 的每个 group 中的一个 consumer 消费, 此消息不会发送给一个 group 的多个 consumer; 那么一个 group 中所有的 consumer 将会交错的消费整个 Topic.

如果所有的 consumer 都具有相同的 group, 这种情况和 JMS queue 模式很像; 消息将会在 consumers 之间负载均衡.

如果所有的 consumer 都具有不同的 group, 那这就是 ” 发布 - 订阅 ”; 消息将会广播给所有的消费者.

Apache kafka 原理与特性(0.8V)

(摘自官网)

在 kafka 中, 一个 partition 中的消息只会被 group 中的一个 consumer 消费(同一时刻); 每个 group 中 consumer 消息消费互相独立; 我们可以认为一个 group 是一个 ” 订阅 ” 者, 一个 Topic 中的每个 partions, 只会被一个 ” 订阅者 ” 中的一个 consumer 消费, 不过一个 consumer 可以同时消费多个 partitions 中的消息.kafka 只能保证一个 partition 中的消息被某个 consumer 消费时是顺序的. 事实上, 从 Topic 角度来说, 当有多个 partitions 时, 消息仍不是全局有序的.

通常情况下, 一个 group 中会包含多个 consumer, 这样不仅可以提高 topic 中消息的并发消费能力, 而且还能提高 ” 故障容错 ” 性, 如果 group 中的某个 consumer 失效, 那么其消费的 partitions 将会有其他 consumer 自动接管.

kafka 的设计原理决定, 对于一个 topic, 同一个 group 中不能有多于 partitions 个数的 consumer 同时消费, 否则将意味着某些 consumer 将无法得到消息.

Guarantees

1) 发送到 partitions 中的消息将会按照它接收的顺序追加到日志中, 无论一个 partition 由多少个 log 文件构成, 那么它发送给 consumer 的顺序是一定的.

2) 对于消费者而言, 它们消费消息的顺序和日志中消息顺序一致.

3) 如果 Topic 的 ”replication factor” 为 N, 那么允许 N - 1 个 kafka 实例失效. 只要有一个 replication 存活, 那么此 partition 的读写操作都不会中断.

1.2 Use cases

Messaging

和一些常规的消息系统相比,kafka 仍然是个不错的选择; 它具备 partitons/replication 和容错, 可以使 kafka 具有良好的扩展性和性能优势. 不过到目前为止, 我们应该很清楚认识到,kafka 并没有提供 JMS 中的 ” 事务性 ”” 消息传输担保(消息确认机制)”” 消息分组 ” 等企业级特性;kafka 只能使用作为 ” 常规 ” 的消息系统, 在一定程度上, 尚未确保消息的发送与接收绝对可靠(比如, 消息重发, 消息发送丢失等)

Websit activity tracking

kafka 可以作为 ” 网站活性跟踪 ” 的最佳工具; 可以将网页 / 用户操作等信息发送到 kafka 中. 并实时监控, 或者离线统计分析等.

Log Aggregation

kafka 的特性决定它非常适合作为 ” 日志收集中心 ”;application 可以将操作日志 ” 批量 ”” 异步 ” 的发送到 kafka 集群中, 而不是保存在本地或者 DB 中;kafka 可以批量提交消息 / 压缩消息等, 这对 producer 端而言, 几乎感觉不到性能的开支. 此时 consumer 端可以使 Hadoop 等其他系统化的存储和分析系统.

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Kafka 使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm

二. 设计原理

kafka 的设计初衷是希望做为一个统一的信息收集平台, 能够实时的收集反馈信息, 并需要能够支撑较大的数据量, 且具备良好的容错能力.

1.Persistence

kafka 使用文件存储消息 (append only log), 这就直接决定 kafka 在性能上严重依赖文件系统的本身特性. 且无论任何 OS 下, 对文件系统本身的优化是非常艰难的. 文件缓存 / 直接内存映射等是常用的手段. 因为 kafka 是对日志文件进行 append 操作, 因此磁盘检索的开支是较小的; 同时为了减少磁盘写入的次数,broker 会将消息暂时 buffer 起来, 当消息的个数(或尺寸) 达到一定阀值时, 再 flush 到磁盘, 这样减少了磁盘 IO 调用的次数. 对于 kafka 而言, 较高性能的磁盘, 将会带来更加直接的性能提升.

2.Efficiency

需要考虑的影响性能点很多, 除磁盘 IO 之外, 我们还需要考虑网络 IO, 这直接关系到 kafka 的吞吐量问题.kafka 并没有提供太多高超的技巧; 对于 producer 端, 可以将消息 buffer 起来, 当消息的条数达到一定阀值时, 批量发送给 broker; 对于 consumer 端也是一样, 批量 fetch 多条消息. 不过消息量的大小可以通过配置文件来指定. 对于 kafka broker 端, 似乎有个 sendfile 系统调用可以潜在的提升网络 IO 的性能: 将文件的数据映射到系统内存中,socket 直接读取相应的内存区域即可, 而无需进程再次 copy 和交换(这里涉及到 ” 磁盘 IO 数据 ”/” 内核内存 ”/” 进程内存 ”/” 网络缓冲区 ”, 多者之间的数据 copy).

其实对于 producer/consumer/broker 三者而言,CPU 的开支应该都不大, 因此启用消息压缩机制是一个良好的策略; 压缩需要消耗少量的 CPU 资源, 不过对于 kafka 而言, 网络 IO 更应该需要考虑. 可以将任何在网络上传输的消息都经过压缩.kafka 支持 gzip/snappy 等多种压缩方式.

3. Producer

Load balancing

kafka 集群中的任何一个 broker, 都可以向 producer 提供 metadata 信息, 这些 metadata 中包含 ” 集群中存活的 servers 列表 ”/”partitions leader 列表 ” 等信息(请参看 zookeeper 中的节点信息). 当 producer 获取到 metadata 信心之后, producer 将会和 Topic 下所有 partition leader 保持 socket 连接; 消息由 producer 直接通过 socket 发送到 broker, 中间不会经过任何 ” 路由层 ”. 事实上, 消息被路由到哪个 partition 上, 有 producer 客户端决定. 比如可以采用 ”random””key-hash”” 轮询 ” 等, 如果一个 topic 中有多个 partitions, 那么在 producer 端实现 ” 消息均衡分发 ” 是必要的. 在 producer 端的配置文件中, 开发者可以指定 partition 路由的方式.

Asynchronous send

将多条消息暂且在客户端 buffer 起来, 并将他们批量发送到 broker; 小数据 IO 太多, 会拖慢整体的网络延迟, 批量延迟发送事实上提升了网络效率; 不过这也有一定的隐患, 比如当 producer 失效时, 那些尚未发送的消息将会丢失.

4.Consumer

consumer 端向 broker 发送 ”fetch” 请求, 并告知其获取消息的 offset; 此后 consumer 将会获得一定条数的消息;consumer 端也可以重置 offset 来重新消费消息.[备注:offset, 消息偏移量,integer 值,broker 可以根据 offset 来决定消息的起始位置]

在 JMS 实现中,Topic 模型基于 push 方式, 即 broker 将消息推送给 consumer 端. 不过在 kafka 中, 采用了 pull 方式, 即 consumer 在和 broker 建立连接之后, 主动去 pull(或者说 fetch)消息; 这中模式有些优点, 首先 consumer 端可以根据自己的消费能力适时的去 fetch 消息并处理, 且可以控制消息消费的进度(offset); 此外, 消费者可以良好的控制消息消费的数量,batch fetch.

其他 JMS 实现, 消息消费的位置是有 prodiver 保留, 以便避免重复发送消息或者将没有消费成功的消息重发等, 同时还要控制消息的状态. 这就要求 JMS broker 需要太多额外的工作. 在 kafka 中,partition 中的消息只有一个 consumer 在消费, 且不存在消息状态的控制, 也没有复杂的消息确认机制, 可见 kafka broker 端是相当轻量级的. 当消息被 consumer 接收之后,consumer 可以在本地保存最后消息的 offset, 并间歇性的向 zookeeper 注册 offset. 由此可见,consumer 客户端也很轻量级.

这就意味着,kafka 中 consumer 负责维护消息的消费记录, 而 broker 则不关心这些, 这种设计不仅提高了 consumer 端的灵活性, 也适度的减轻了 broker 端设计的复杂度; 这是和众多 JMS prodiver 的区别. 此外,kafka 中消息 ACK 的设计也和 JMS 有很大不同,kafka 中的消息时批量 (通常以消息的条数或者 chunk 的尺寸为单位) 发送给 consumer, 当消息消费成功后, 向 zookeeper 提交消息的 offset, 而不会向 broker 交付 ACK. 或许你已经意识到, 这种 ” 宽松 ” 的设计, 将会有 ” 丢失 ” 消息 /” 消息重发 ” 的危险.

5.Message Delivery Semantics

对于 JMS 实现, 消息传输担保非常直接: 有且只有一次(exactly once). 在 kafka 中稍有不同, 对于 consumer 而言:

1) at most once: 最多一次, 这个和 JMS 中 ” 非持久化 ” 消息类似. 发送一次, 无论成败, 将不会重发.

2) at least once: 消息至少发送一次, 如果消息未能接受成功, 可能会重发, 直到接收成功.

3) exactly once: 消息只会发送一次.

at most once: 消费者 fetch 消息, 然后保存 offset, 然后处理消息; 当 client 保存 offset 之后, 但是在消息处理过程中 consumer 进程失效(crash), 导致部分消息未能继续处理. 那么此后可能其他 consumer 会接管, 但是因为 offset 已经提前保存, 那么新的 consumer 将不能 fetch 到 offset 之前的消息(尽管它们尚没有被处理), 这就是 ”at most once”.

at least once: 消费者 fetch 消息, 然后处理消息, 然后保存 offset. 如果消息处理成功之后, 但是在保存 offset 阶段 zookeeper 异常或者 consumer 失效, 导致保存 offset 操作未能执行成功, 这就导致接下来再次 fetch 时可能获得上次已经处理过的消息, 这就是 ”at least once”.

exactly once: kafka 中并没有严格的去实现(基于 2 阶段提交, 事务), 我们认为这种策略在 kafka 中是没有必要的.

因为 ” 消息消费 ” 和 ” 保存 offset” 这两个操作的先后时机不同, 导致了上述 3 种情况, 通常情况下 ”at-least-once” 是我们搜选.(相比 at most once 而言, 重复接收数据总比丢失数据要好).

Apache kafka 原理与特性(0.8V)

6. Replication

kafka 中,replication 策略是基于 partition, 而不是 topic;kafka 将每个 partition 数据复制到多个 server 上, 任何一个 partition 有一个 leader 和多个 follower(可以没有); 备份的个数可以通过 broker 配置文件来设定.leader 处理所有的 read-write 请求,follower 需要和 leader 保持同步.Follower 就像一个 ”consumer”, 消费消息并保存在本地日志中;leader 负责跟踪所有的 follower 状态, 如果 follower” 落后 ” 太多或者失效,leader 将会把它从 replicas 同步列表中删除. 当所有的 follower 都将一条消息保存成功, 此消息才被认为是 ”committed”, 那么此时 consumer 才能消费它, 这种同步策略, 就要求 follower 和 leader 之间必须具有良好的网络环境. 即使只有一个 replicas 实例存活, 仍然可以保证消息的正常发送和接收, 只要 zookeeper 集群存活即可.(备注: 不同于其他分布式存储, 比如 hbase 需要 ” 多数派 ” 存活才行)

kafka 判定一个 follower 存活与否的条件有 2 个:1) follower 需要和 zookeeper 保持良好的链接 2) 它必须能够及时的跟进 leader, 不能落后太多. 如果同时满足上述 2 个条件, 那么 leader 就认为此 follower 是 ” 活跃的 ”. 如果一个 follower 失效 (server 失效) 或者落后太多,leader 将会把它从同步列表中移除[备注: 如果此 replicas 落后太多, 它将会继续从 leader 中 fetch 数据, 直到足够 up-to-date, 然后再次加入到同步列表中;kafka 不会更换 replicas 宿主! 因为 ” 同步列表 ” 中 replicas 需要足够快, 这样才能保证 producer 发布消息时接受到 ACK 的延迟较小].

当 leader 失效时, 需在 followers 中选取出新的 leader, 可能此时 follower 落后于 leader, 因此需要选择一个 ”up-to-date” 的 follower.kafka 中 leader 选举并没有采用 ” 投票多数派 ” 的算法, 因为这种算法对于 ” 网络稳定性 ”/” 投票参与者数量 ” 等条件有较高的要求, 而且 kafka 集群的设计, 还需要容忍 N - 1 个 replicas 失效. 对于 kafka 而言, 每个 partition 中所有的 replicas 信息都可以在 zookeeper 中获得, 那么选举 leader 将是一件非常简单的事情. 选择 follower 时需要兼顾一个问题, 就是新 leader server 上所已经承载的 partition leader 的个数, 如果一个 server 上有过多的 partition leader, 意味着此 server 将承受着更多的 IO 压力. 在选举新 leader, 需要考虑到 ” 负载均衡 ”,partition leader 较少的 broker 将会更有可能成为新的 leader.

在整几个集群中, 只要有一个 replicas 存活, 那么此 partition 都可以继续接受读写操作.

7.Log

如果一个 topic 的名称为 ”my_topic”, 它有 2 个 partitions, 那么日志将会保存在 my_topic_0 和 my_topic_1 两个目录中; 日志文件中保存了一序列 ”log entries”(日志条目), 每个 log entry 格式为 ”4 个字节的数字 N 表示消息的长度 ” + “N 个字节的消息内容 ”; 每个日志都有一个 offset 来唯一的标记一条消息,offset 的值为 8 个字节的数字, 表示此消息在此 partition 中所处的起始位置.. 每个 partition 在物理存储层面, 有多个 log file 组成(称为 segment).segment file 的命名为 ” 最小 offset”.kafka. 例如 ”00000000000.kafka”; 其中 ” 最小 offset” 表示此 segment 中起始消息的 offset.

Apache kafka 原理与特性(0.8V)

(摘自官网)

其中每个 partiton 中所持有的 segments 列表信息会存储在 zookeeper 中.

当 segment 文件尺寸达到一定阀值时(可以通过配置文件设定, 默认 1G), 将会创建一个新的文件; 当 buffer 中消息的条数达到阀值时将会触发日志信息 flush 到日志文件中, 同时如果 ” 距离最近一次 flush 的时间差 ” 达到阀值时, 也会触发 flush 到日志文件. 如果 broker 失效, 极有可能会丢失那些尚未 flush 到文件的消息. 因为 server 意外失效, 仍然会导致 log 文件格式的破坏(文件尾部), 那么就要求当 server 启东是需要检测最后一个 segment 的文件结构是否合法并进行必要的修复.

获取消息时, 需要指定 offset 和最大 chunk 尺寸,offset 用来表示消息的起始位置,chunk size 用来表示最大获取消息的总长度(间接的表示消息的条数). 根据 offset, 可以找到此消息所在 segment 文件, 然后根据 segment 的最小 offset 取差值, 得到它在 file 中的相对位置, 直接读取输出即可.

日志文件的删除策略非常简单: 启动一个后台线程定期扫描 log file 列表, 把保存时间超过阀值的文件直接删除(根据文件的创建时间). 为了避免删除文件时仍然有 read 操作(consumer 消费), 采取 copy-on-write 方式.

8.Distribution

kafka 使用 zookeeper 来存储一些 meta 信息, 并使用了 zookeeper watch 机制来发现 meta 信息的变更并作出相应的动作(比如 consumer 失效, 触发负载均衡等)

1) Broker node registry: 当一个 kafka broker 启动后, 首先会向 zookeeper 注册自己的节点信息(临时 znode), 同时当 broker 和 zookeeper 断开连接时, 此 znode 也会被删除.

格式: /broker/ids/[0…N]  –>host:port; 其中 [0..N] 表示 broker id, 每个 broker 的配置文件中都需要指定一个数字类型的 id(全局不可重复),znode 的值为此 broker 的 host:port 信息.

2) Broker Topic Registry: 当一个 broker 启动时, 会向 zookeeper 注册自己持有的 topic 和 partitions 信息, 仍然是一个临时 znode.

格式: /broker/topics/[topic]/[0…N]  其中 [0..N] 表示 partition 索引号.

3) Consumer and Consumer group: 每个 consumer 客户端被创建时, 会向 zookeeper 注册自己的信息; 此作用主要是为了 ” 负载均衡 ”.

一个 group 中的多个 consumer 可以交错的消费一个 topic 的所有 partitions; 简而言之, 保证此 topic 的所有 partitions 都能被此 group 所消费, 且消费时为了性能考虑, 让 partition 相对均衡的分散到每个 consumer 上.

4) Consumer id Registry: 每个 consumer 都有一个唯一的 ID(host:uuid, 可以通过配置文件指定, 也可以由系统生成), 此 id 用来标记消费者信息.

格式: /consumers/[group_id]/ids/[consumer_id]

仍然是一个临时的 znode, 此节点的值为{“topic_name”:#streams…}, 即表示此 consumer 目前所消费的 topic + partitions 列表.

5) Consumer offset Tracking: 用来跟踪每个 consumer 目前所消费的 partition 中最大的 offset.

格式: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]  –>offset_value

此 znode 为持久节点, 可以看出 offset 跟 group_id 有关, 以表明当 group 中一个消费者失效, 其他 consumer 可以继续消费.

6) Partition Owner registry: 用来标记 partition 正在被哪个 consumer 消费. 临时 znode

格式: /consumers/[group_id]/owners/[topic]/[broker_id-partition_id]  –>consumer_node_id

此节点表达了 ” 一个 partition” 只能被 group 下一个 consumer 消费, 同时当 group 下某个 consumer 失效, 那么将会触发负载均衡(即: 让 partitions 在多个 consumer 间均衡消费, 接管那些 ” 游离 ” 的 partitions)

当 consumer 启动时, 所触发的操作:

A) 首先进行 ”Consumer id Registry”;

B) 然后在 ”Consumer id Registry” 节点下注册一个 watch 用来监听当前 group 中其他 consumer 的 ”leave” 和 ”join”; 只要此 znode path 下节点列表变更, 都会触发此 group 下 consumer 的负载均衡.(比如一个 consumer 失效, 那么其他 consumer 接管 partitions).

C) 在 ”Broker id registry” 节点下, 注册一个 watch 用来监听 broker 的存活情况; 如果 broker 列表变更, 将会触发所有的 groups 下的 consumer 重新 balance.

Consumer 均衡算法

当一个 group 中, 有 consumer 加入或者离开时, 会触发 partitions 均衡. 均衡的最终目的, 是提升 topic 的并发消费能力.

1) 假如 topic1, 具有如下 partitions: P0,P1,P2,P3

2) 加入 group 中, 有如下 consumer: C0,C1

3) 首先根据 partition 索引号对 partitions 排序: P0,P1,P2,P3

4) 根据 consumer.id 排序: C0,C1

5) 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size, 本例值 M =2(向上取整)

6) 然后依次分配 partitions: C0 = [P0,P1],C1=[P2,P3], 即 Ci = [P(i * M),P((i + 1) * M -1)]

总结:

1) Producer 端使用 zookeeper 用来 ” 发现 ”broker 列表, 以及和 Topic 下每个 partition leader 建立 socket 连接并发送消息.

2) Broker 端使用 zookeeper 用来注册 broker 信息, 已经监测 partition leader 存活性.

3) Consumer 端使用 zookeeper 用来注册 consumer 信息, 其中包括 consumer 消费的 partition 列表等, 同时也用来发现 broker 列表, 并和 partition leader 建立 socket 连接, 并获取消息.

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2014-09/107388p2.htm

三. 主要配置

1.Broker 主要配置

##broker 标识,cluster 中, 此 ID 必须唯一
broker.id=0
## 接受 consumer/producer 的链接端口
port=9092
## 用来维护集群状态, 以及 consumer 消费记录
##consumer 和 broker 必须接入到同一个 zk 环境中.
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=30000
##broker 所能接受的消息的最大尺寸
##producer 不能发布更大尺寸的 message
messages.max.bytes=1000000
##broker 在处理 client 请求是, 允许开启的线程个数. 默认为 3.
num.network.threads=3
## 用于磁盘 IO 操作的线程的个数, 默认为 8, 建议和磁盘的个数保持一致
num.io.threads=8
## 允许入队的最大请求数,” 数据操作请求 ” 首先加入队列, 等待 IO 线程
## 进行磁盘操作获取数据, 数据操作结束后, 请求被移除队列并由 network
## 线程响应给 client 端. 此参数用于控制 ” 等待 IO 处理的请求数 ”.
queued.max.requests=500
#socket 调优参数: sendBuffer (SO_SNDBUF)
socket.send.buffer.bytes=1048576
##socket 调优参数:receiveBuffer (SO_RCVBUFF)
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
#################Log##########
log.dirs=/tmp/kafka-logs
## 每个 topic 的分区数.
##kafka 的特点就在于 ” 分区 ”, 每个 Topic 被拆分成多个 partitions
##partitions 可以被 sharding 到多个 broker 上, 以提高并发能力和 ” 可用性 ”
num.partitions=2
##log 文件片段的最大尺寸, 每个 partition(逻辑上) 的数据都会被写入到磁盘的
##log 文件中(append only), 此参数用于控制单个文件的大小.
## 1024*1024*1024,1G
##log.segment.bytes=

##log 文件 ”sync” 到磁盘之前累积的消息条数
## 因为磁盘 IO 操作是一个慢操作, 但又是一个 ” 数据可靠性 ” 的必要手段
## 所以此参数的设置, 需要在 ” 数据可靠性 ” 与 ” 性能 ” 之间做必要的权衡.
## 如果此值过大, 将会导致每次 ”fsync” 的时间较长(IO 阻塞)
## 如果此值过小, 将会导致 ”fsync” 的次数较多, 这也意味着整体的 client 请求有一定的延迟.
## 物理 server 故障, 将会导致没有 fsync 的消息丢失.
## 默认值为 10000
log.flush.interval.messages=10000
## 仅仅通过 interval 来控制消息的磁盘写入时机, 是不足的.
## 此参数用于控制 ”fsync” 的时间间隔, 如果消息量始终没有达到阀值, 但是离上一次磁盘同步的时间间隔
## 达到阀值, 也将触发.
log.flush.interval.ms=1000
# 对某些特定的 topic 而言, 重写 log.flush.interval.messages 属性
##log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000

######################
## 是否自动创建 topic
## 如果 broker 中没有 topic 的信息, 当 producer/consumer 操作 topic 时, 是否自动创建.
## 如果为 false, 则只能通过 API 或者 command 创建 topic
auto.create.topics.enable=true
##partition leader 与 replicas 之间通讯时,socket 的超时时间
controller.socket.timeout.ms=30000
##partition leader 与 replicas 数据同步时, 消息的队列尺寸.
controller.message.queue.size=10
##partitions 的 ”replicas” 个数, 不得大于集群中 broker 的个数
default.replication.factor=1
##partition Leader 和 follower 通讯时, 如果在此时间内, 没有收到 follower 的 ”fetch 请求 ”
##leader 将会认为 follower” 失效 ”, 将不会与其同步消息.[follower 主动跟随 leader, 并请求同步消息]
replica.lag.time.max.ms=10000
## 如果 follower 落后与 leader 太多, 将会认为此 follower[或者说 partition relicas] 已经失效
## 通常, 在 follower 与 leader 通讯时, 因为网络延迟或者链接断开, 总会导致 replicas 中消息同步滞后
## 如果消息之后太多,leader 将认为此 follower 网络延迟较大或者消息吞吐能力有限, 将会把此 replicas 迁移
## 到其他 follower 中.
## 在 broker 数量较少, 或者网络不足的环境中, 建议提高此值.
replica.lag.max.messages=4000
##follower 与 leader 之间的 socket 超时时间
replica.socket.timeout.ms=30000
##1024*1024,follower 每次 fetch 数据的最大尺寸
## 没有意义的参数
replica.fetch.max.bytes=1048576
## 当 follower 的 fetch 请求发出后, 等待 leader 发送数据的时间.
## 超时后, 将会重新 fetch.
replica.fetch.wait.max.ms=500
##fetch 的最小数据尺寸, 如果 leader 中尚未同步的数据不足此值, 将会阻塞, 直到满足条件
replica.fetch.min.bytes=1
##follower 中开启的 fetcher 线程数, 增加此值可以提高数据同步到速度, 但也额外的增加了 leader 的 IO 负荷.
num.replica.fetchers=1
###########################
## 检测 log 文件的时间间隔
log.cleanup.interval.mins=1
##log 文件被保留的时长, 如果超过此时长, 将会被清除, 无论 log 中的消息是否被消费过.
log.retention.hours=168

2.Consumer 主要配置

## 当前消费者的 group 名称, 需要指定
## 消息的消费进度, 是根据 group 来划定的
group.id=
##consumer 作为 zookeeper client, 需要通过 zk 保存一些 meta 信息,
## 比如 consumer 消费的消息 offset 等.
## 必须和 broker 使用同样的 zk 配置
zookeeper.connect=hostname1:port,hostname2:port2
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
## 当前 consumer 的标识, 可以设定, 也可以有系统生成.
## 主要用来跟踪消息消费情况, 便于观察
conusmer.id=
## 获取消息的最大尺寸,broker 不会像 consumer 输出大于此值的消息 chunk
## 每次 feth 将得到多条消息, 此值为总大小
## 提升此值, 将会消耗更多的 consumer 端内存
fetch.messages.max.bytes=1048576
##broker 发送给 consumer 的最小数据尺寸, 如果消息尺寸不足, 将会等待, 直到满足.
fetch.min.bytes=1
## 当消息的尺寸不足时,server 阻塞的时间, 如果超时, 消息将立即发送给 consumer.
fetch.wait.max.ms=100
queued.max.message.chunks=10
## 当有新的 consumer 加入到 group 时, 将会 reblance, 此后将会有 partitions 的消费端迁移到新
## 的 consumer 上, 如果一个 consumer 获得了某个 partition 的消费权限, 那么它将会向 zk 注册
##”Partition Owner registry” 节点信息, 但是有可能此时旧的 consumer 尚没有释放此节点,
## 此值用于控制, 注册节点的重试次数.
rebalance.max.retries=4
## 当 consumer 消费一定量的消息之后, 将会自动向 zookeeper 提交 offset 信息
## 注意 offset 信息并不是每消费一次消息就向 zk 提交一次, 而是现在本地保存(内存), 并定期提交
auto.commit.enable=true
## 自动提交的时间间隔, 默认为 1 分钟.
auto.commit.interval.ms=60*1000

3.Producer 主要配置

## 对于开发者而言, 需要通过 broker.list 指定当前 producer 需要关注的 broker 列表
##producer 通过和每个 broker 链接, 并获取 partitions,
## 如果某个 broker 链接失败, 将导致此上的 partitons 无法继续发布消息
## 格式:host1:port,host2:port2, 其中 host:port 需要参考 broker 配置文件.
## 对于 producer 而言没有使用 zookeeper 自动发现 broker 列表,非常奇怪。(0.8V 和 0.7 有区别)
metadata.broker.list=
##producer 接收消息 ack 的时机. 默认为 0.
##0: producer 不会等待 broker 发送 ack
##1: 当 leader 接收到消息之后发送 ack
##2: 当所有的 follower 都同步消息成功后发送 ack.
request.required.acks=0
## 在向 producer 发送 ack 之前,broker 允许等待的最大时间
## 如果超时,broker 将会向 producer 发送一个 error ACK. 意味着上一次消息因为某种
## 原因未能成功(比如 follower 未能同步成功)
request.timeout.ms=10000
##producer 消息发送的模式, 同步或异步.
## 异步意味着消息将会在本地 buffer, 并适时批量发送
## 默认为 sync, 建议 async
producer.type=sync
## 消息序列化类, 将消息实体转换成 byte[]
serializer.class=kafka.serializer.DefaultEncoder
key.serializer.class=${serializer.class}
##partitions 路由类, 消息在发送时将根据此实例的方法获得 partition 索引号.
## 默认为消息的 hashcode % partitions 个数
partitioner.class=kafka.producer.DefaultPartitioner

## 消息压缩算法,none,gzip,snappy
compression.codec=none
## 消息在 producer 端 buffer 的条数. 仅在 producer.type=async 下有效
batch.num.messages=200
## 在 async 模式下, 当 message 被缓存的时间超过此值后, 将会批量发送给 broker
## 此值和 batch.num.messages 协同工作.
queue.buffering.max.ms=5000
## 在 async 模式下,producer 端允许 buffer 的最大消息量
## 无论如何,producer 都无法尽快的将消息发送给 broker, 从而导致消息在 producer 端大量沉积
## 此时, 如果消息的条数达到阀值, 将会导致 producer 端阻塞或者消息被抛弃.
queue.buffering.max.messages=10000
## 当消息在 producer 端沉积的条数达到 ”queue.buffering.max.meesages” 后
## 阻塞一定时间后, 队列仍然没有 enqueue(producer 仍然没有发送出任何消息)
## 此时 producer 可以继续阻塞或者将消息抛弃, 此 timeout 值用于控制 ” 阻塞 ” 的时间
##-1: 无阻塞超时限制, 消息不会被抛弃
##0: 立即清空队列, 消息被抛弃
queue.enqueue.timeout.ms=-1
## 当 producer 接收到 error ACK, 或者没有接收到 ACK 时, 允许消息重发的次数
## 因为 broker 并没有完整的机制来避免消息重复, 所以当网络异常时(比如 ACK 丢失)
## 有可能导致 broker 接收到重复的消息.
message.send.max.retries=3
##producer 刷新 topic metada 的时间间隔
##producer 需要知道 partition leader 的位置, 以及当前 topic 的情况
## 因此 producer 需要一个机制来获取最新的 metadata, 当 producer 遇到特定错误时, 将会立即刷新
##(比如 topic 失效,partition 丢失,leader 失效等), 此外也可以通过此参数来配置额外的刷新机制
topic.metadata.refresh.interval.ms=600000

 broker 配置文件请参考: kafka.server.KafkaConfig
consumer 配置文件请参考: kafka.consumer.ConsumerConfig
producer 配置文件请参考: kafka.producer.ProducerConfig

【kafka 部署与实践】http://www.linuxidc.com/Linux/2014-09/107387.htm

Kafka 的详细介绍:请点这里
Kafka 的下载地址:请点这里

前言: Kafka 是一个轻量级的 / 分布式的 / 具备 replication 能力的日志采集组件, 通常被集成到应用系统中, 收集 ” 用户行为日志 ” 等, 并可以使用各种消费终端 (consumer) 将消息转存到 HDFS 等其他结构化数据存储系统中. 因为日志消息通常为文本数据, 尺寸较小, 且对实时性以及数据可靠性要求不严格, 但是需要日志存储端具备较高的数据吞吐能力, 这种 ” 宽松 ” 的设计要求, 非常适合使用 kafka。

一. 入门

1.1 简介

Kafka 是一个 ” 分布式的 ”/” 可分区的(partitioned)”/” 基于备份的(replicated)”/” 基于 commit-log 存储 ” 的服务. 它提供了类似于 JMS 的特性, 但是在设计实现上完全不同, 此外它并不是 JMS 规范的实现.

kafka 消息是根据 Topic 进行归类, 发送消息者成为 Producer, 消息接收者成为 Consumer; 此外 kafka 集群有多个 kafka 实例组成, 每个实例 (server) 称为 broker.

无论是 kafka 集群, 还是 producer 和 consumer 都依赖于 zookeeper 来保证系统可用性以及保存一些 meta 信息.

Apache kafka 原理与特性(0.8V)

(摘自官网)

其中 client 与 server 的通讯, 都是基于 TCP, 而且消息协议非常轻量级.

Topics/logs

一个 Topic 可以认为是一类消息, 每个 topic 将被分成多个 partition(区), 每个 partition 在存储层面是 append log 文件. 任何发布到此 partition 的消息都会直接追加到 log 文件的尾部, 每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型数字, 它唯一的标记一条消息.kafka 并没有提供其他额外的索引机制来存储 offset, 因为在 kafka 中几乎不允许对消息进行 ” 随机读 - 写 ”, 一旦消息写入 log 日志之后, 将不能被修改.

Apache kafka 原理与特性(0.8V)

(摘自官网)

kafka 和 JMS 实现 (activeMQ) 不同的是: 即使消息被消费, 消息仍然不会被立即删除. 日志文件将会根据 broker 中的配置要求, 保留一定的时间之后删除; 比如 log 文件保留 2 天, 那么两天后, 文件会被清除, 无论其中的消息是否被消费.kafka 通过这种简单的手段, 来释放磁盘空间. 此外,kafka 的性能并不会因为日志文件的太多而低下, 所以即使保留较多的 log 文件, 也不不会有问题.

对于 consumer 而言, 它需要保存消费消息的 offset, 对于 offset 的保存和使用, 有 consumer 来控制; 当 consumer 正常消费消息时,offset 将会 ” 线性 ” 的向前驱动, 即消息将依次顺序被消费. 事实上 consumer 可以使用任意顺序消费消息, 它只需要将 offset 重置为任意值..(offset 将会保存在 zookeeper 中, 参见下文)

kafka 集群几乎不需要维护任何 consumer 和 producer 状态信息, 这些信息有 zookeeper 保存; 因此 producer 和 consumer 的客户端实现非常轻量级, 它们可以随意离开, 而不会对集群造成额外的影响.

partitions 的设计目的有多个. 最根本原因是 kafka 基于文件存储. 通过分区, 可以将日志内容分散到多个 server 上, 来避免文件尺寸达到单机磁盘的上限, 每个 partiton 都会被当前 server(kafka 实例)保存; 可以将一个 topic 切分多任意多个 partitions(备注: 基于 sharding), 来消息保存 / 消费的效率. 此外越多的 partitions 意味着可以容纳更多的 consumer, 有效提升并发消费的能力.(具体原理参见下文).

Distribution

一个 Topic 的多个 partitions, 被分布在 kafka 集群中的多个 server 上; 每个 server(kafka 实例)负责 partitions 中消息的读写操作; 此外 kafka 还可以配置每个 partition 需要备份的个数(replicas), 每个 partition 将会被备份到多台机器上, 以提高可用性.[replicas 特性在 0.8V 才支持]

基于 replicated 方案, 那么就意味着需要对多个备份进行调度; 一个 partition 可以在多个 server 上备份, 那么其中一个 server 作为此 partiton 的 leader;leader 负责此 partition 所有的读写操作, 如果 leader 失效, 那么将会有其他 follower 来接管(成为新的 leader);follower 只是单调的和 leader 跟进, 同步消息即可.. 由此可见作为 leader 的 server 承载了全部的请求压力, 因此从集群的整体考虑, 有多少个 partitions 就意味着有多少个 ”leader”,kafka 会将 ”leader” 均衡的分散在每个实例上, 来确保整体的性能稳定.[备注:kafka 中将 leader 角色权限下放到 partition 这个层级]

Apache kafka 原理与特性(0.8V)

kafka-cluster

Producers

Producer 将消息发布到指定的 Topic 中, 同时 Producer 也能决定将此消息发送到哪个 partition; 如果一个 Topic 有多个 partitions 时, 你需要选择 partition 是算法, 比如基于 ”round-robin” 方式或者通过其他的一些算法等. 无论如何选择 partition 路由算法, 我们最直接的目的就是希望消息能够均匀的发送给每个 partition, 这样可以让 consumer 消费的消息量也能 ” 均衡 ”.

Consumers

本质上 kafka 只支持 Topic. 每个 consumer 属于一个 consumer group; 反过来说, 每个 group 中可以有多个 consumer. 对于 Topic 中的一条特定的消息, 只会被订阅此 Topic 的每个 group 中的一个 consumer 消费, 此消息不会发送给一个 group 的多个 consumer; 那么一个 group 中所有的 consumer 将会交错的消费整个 Topic.

如果所有的 consumer 都具有相同的 group, 这种情况和 JMS queue 模式很像; 消息将会在 consumers 之间负载均衡.

如果所有的 consumer 都具有不同的 group, 那这就是 ” 发布 - 订阅 ”; 消息将会广播给所有的消费者.

Apache kafka 原理与特性(0.8V)

(摘自官网)

在 kafka 中, 一个 partition 中的消息只会被 group 中的一个 consumer 消费(同一时刻); 每个 group 中 consumer 消息消费互相独立; 我们可以认为一个 group 是一个 ” 订阅 ” 者, 一个 Topic 中的每个 partions, 只会被一个 ” 订阅者 ” 中的一个 consumer 消费, 不过一个 consumer 可以同时消费多个 partitions 中的消息.kafka 只能保证一个 partition 中的消息被某个 consumer 消费时是顺序的. 事实上, 从 Topic 角度来说, 当有多个 partitions 时, 消息仍不是全局有序的.

通常情况下, 一个 group 中会包含多个 consumer, 这样不仅可以提高 topic 中消息的并发消费能力, 而且还能提高 ” 故障容错 ” 性, 如果 group 中的某个 consumer 失效, 那么其消费的 partitions 将会有其他 consumer 自动接管.

kafka 的设计原理决定, 对于一个 topic, 同一个 group 中不能有多于 partitions 个数的 consumer 同时消费, 否则将意味着某些 consumer 将无法得到消息.

Guarantees

1) 发送到 partitions 中的消息将会按照它接收的顺序追加到日志中, 无论一个 partition 由多少个 log 文件构成, 那么它发送给 consumer 的顺序是一定的.

2) 对于消费者而言, 它们消费消息的顺序和日志中消息顺序一致.

3) 如果 Topic 的 ”replication factor” 为 N, 那么允许 N - 1 个 kafka 实例失效. 只要有一个 replication 存活, 那么此 partition 的读写操作都不会中断.

1.2 Use cases

Messaging

和一些常规的消息系统相比,kafka 仍然是个不错的选择; 它具备 partitons/replication 和容错, 可以使 kafka 具有良好的扩展性和性能优势. 不过到目前为止, 我们应该很清楚认识到,kafka 并没有提供 JMS 中的 ” 事务性 ”” 消息传输担保(消息确认机制)”” 消息分组 ” 等企业级特性;kafka 只能使用作为 ” 常规 ” 的消息系统, 在一定程度上, 尚未确保消息的发送与接收绝对可靠(比如, 消息重发, 消息发送丢失等)

Websit activity tracking

kafka 可以作为 ” 网站活性跟踪 ” 的最佳工具; 可以将网页 / 用户操作等信息发送到 kafka 中. 并实时监控, 或者离线统计分析等.

Log Aggregation

kafka 的特性决定它非常适合作为 ” 日志收集中心 ”;application 可以将操作日志 ” 批量 ”” 异步 ” 的发送到 kafka 集群中, 而不是保存在本地或者 DB 中;kafka 可以批量提交消息 / 压缩消息等, 这对 producer 端而言, 几乎感觉不到性能的开支. 此时 consumer 端可以使 Hadoop 等其他系统化的存储和分析系统.

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Kafka 使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm

二. 设计原理

kafka 的设计初衷是希望做为一个统一的信息收集平台, 能够实时的收集反馈信息, 并需要能够支撑较大的数据量, 且具备良好的容错能力.

1.Persistence

kafka 使用文件存储消息 (append only log), 这就直接决定 kafka 在性能上严重依赖文件系统的本身特性. 且无论任何 OS 下, 对文件系统本身的优化是非常艰难的. 文件缓存 / 直接内存映射等是常用的手段. 因为 kafka 是对日志文件进行 append 操作, 因此磁盘检索的开支是较小的; 同时为了减少磁盘写入的次数,broker 会将消息暂时 buffer 起来, 当消息的个数(或尺寸) 达到一定阀值时, 再 flush 到磁盘, 这样减少了磁盘 IO 调用的次数. 对于 kafka 而言, 较高性能的磁盘, 将会带来更加直接的性能提升.

2.Efficiency

需要考虑的影响性能点很多, 除磁盘 IO 之外, 我们还需要考虑网络 IO, 这直接关系到 kafka 的吞吐量问题.kafka 并没有提供太多高超的技巧; 对于 producer 端, 可以将消息 buffer 起来, 当消息的条数达到一定阀值时, 批量发送给 broker; 对于 consumer 端也是一样, 批量 fetch 多条消息. 不过消息量的大小可以通过配置文件来指定. 对于 kafka broker 端, 似乎有个 sendfile 系统调用可以潜在的提升网络 IO 的性能: 将文件的数据映射到系统内存中,socket 直接读取相应的内存区域即可, 而无需进程再次 copy 和交换(这里涉及到 ” 磁盘 IO 数据 ”/” 内核内存 ”/” 进程内存 ”/” 网络缓冲区 ”, 多者之间的数据 copy).

其实对于 producer/consumer/broker 三者而言,CPU 的开支应该都不大, 因此启用消息压缩机制是一个良好的策略; 压缩需要消耗少量的 CPU 资源, 不过对于 kafka 而言, 网络 IO 更应该需要考虑. 可以将任何在网络上传输的消息都经过压缩.kafka 支持 gzip/snappy 等多种压缩方式.

3. Producer

Load balancing

kafka 集群中的任何一个 broker, 都可以向 producer 提供 metadata 信息, 这些 metadata 中包含 ” 集群中存活的 servers 列表 ”/”partitions leader 列表 ” 等信息(请参看 zookeeper 中的节点信息). 当 producer 获取到 metadata 信心之后, producer 将会和 Topic 下所有 partition leader 保持 socket 连接; 消息由 producer 直接通过 socket 发送到 broker, 中间不会经过任何 ” 路由层 ”. 事实上, 消息被路由到哪个 partition 上, 有 producer 客户端决定. 比如可以采用 ”random””key-hash”” 轮询 ” 等, 如果一个 topic 中有多个 partitions, 那么在 producer 端实现 ” 消息均衡分发 ” 是必要的. 在 producer 端的配置文件中, 开发者可以指定 partition 路由的方式.

Asynchronous send

将多条消息暂且在客户端 buffer 起来, 并将他们批量发送到 broker; 小数据 IO 太多, 会拖慢整体的网络延迟, 批量延迟发送事实上提升了网络效率; 不过这也有一定的隐患, 比如当 producer 失效时, 那些尚未发送的消息将会丢失.

4.Consumer

consumer 端向 broker 发送 ”fetch” 请求, 并告知其获取消息的 offset; 此后 consumer 将会获得一定条数的消息;consumer 端也可以重置 offset 来重新消费消息.[备注:offset, 消息偏移量,integer 值,broker 可以根据 offset 来决定消息的起始位置]

在 JMS 实现中,Topic 模型基于 push 方式, 即 broker 将消息推送给 consumer 端. 不过在 kafka 中, 采用了 pull 方式, 即 consumer 在和 broker 建立连接之后, 主动去 pull(或者说 fetch)消息; 这中模式有些优点, 首先 consumer 端可以根据自己的消费能力适时的去 fetch 消息并处理, 且可以控制消息消费的进度(offset); 此外, 消费者可以良好的控制消息消费的数量,batch fetch.

其他 JMS 实现, 消息消费的位置是有 prodiver 保留, 以便避免重复发送消息或者将没有消费成功的消息重发等, 同时还要控制消息的状态. 这就要求 JMS broker 需要太多额外的工作. 在 kafka 中,partition 中的消息只有一个 consumer 在消费, 且不存在消息状态的控制, 也没有复杂的消息确认机制, 可见 kafka broker 端是相当轻量级的. 当消息被 consumer 接收之后,consumer 可以在本地保存最后消息的 offset, 并间歇性的向 zookeeper 注册 offset. 由此可见,consumer 客户端也很轻量级.

这就意味着,kafka 中 consumer 负责维护消息的消费记录, 而 broker 则不关心这些, 这种设计不仅提高了 consumer 端的灵活性, 也适度的减轻了 broker 端设计的复杂度; 这是和众多 JMS prodiver 的区别. 此外,kafka 中消息 ACK 的设计也和 JMS 有很大不同,kafka 中的消息时批量 (通常以消息的条数或者 chunk 的尺寸为单位) 发送给 consumer, 当消息消费成功后, 向 zookeeper 提交消息的 offset, 而不会向 broker 交付 ACK. 或许你已经意识到, 这种 ” 宽松 ” 的设计, 将会有 ” 丢失 ” 消息 /” 消息重发 ” 的危险.

5.Message Delivery Semantics

对于 JMS 实现, 消息传输担保非常直接: 有且只有一次(exactly once). 在 kafka 中稍有不同, 对于 consumer 而言:

1) at most once: 最多一次, 这个和 JMS 中 ” 非持久化 ” 消息类似. 发送一次, 无论成败, 将不会重发.

2) at least once: 消息至少发送一次, 如果消息未能接受成功, 可能会重发, 直到接收成功.

3) exactly once: 消息只会发送一次.

at most once: 消费者 fetch 消息, 然后保存 offset, 然后处理消息; 当 client 保存 offset 之后, 但是在消息处理过程中 consumer 进程失效(crash), 导致部分消息未能继续处理. 那么此后可能其他 consumer 会接管, 但是因为 offset 已经提前保存, 那么新的 consumer 将不能 fetch 到 offset 之前的消息(尽管它们尚没有被处理), 这就是 ”at most once”.

at least once: 消费者 fetch 消息, 然后处理消息, 然后保存 offset. 如果消息处理成功之后, 但是在保存 offset 阶段 zookeeper 异常或者 consumer 失效, 导致保存 offset 操作未能执行成功, 这就导致接下来再次 fetch 时可能获得上次已经处理过的消息, 这就是 ”at least once”.

exactly once: kafka 中并没有严格的去实现(基于 2 阶段提交, 事务), 我们认为这种策略在 kafka 中是没有必要的.

因为 ” 消息消费 ” 和 ” 保存 offset” 这两个操作的先后时机不同, 导致了上述 3 种情况, 通常情况下 ”at-least-once” 是我们搜选.(相比 at most once 而言, 重复接收数据总比丢失数据要好).

Apache kafka 原理与特性(0.8V)

6. Replication

kafka 中,replication 策略是基于 partition, 而不是 topic;kafka 将每个 partition 数据复制到多个 server 上, 任何一个 partition 有一个 leader 和多个 follower(可以没有); 备份的个数可以通过 broker 配置文件来设定.leader 处理所有的 read-write 请求,follower 需要和 leader 保持同步.Follower 就像一个 ”consumer”, 消费消息并保存在本地日志中;leader 负责跟踪所有的 follower 状态, 如果 follower” 落后 ” 太多或者失效,leader 将会把它从 replicas 同步列表中删除. 当所有的 follower 都将一条消息保存成功, 此消息才被认为是 ”committed”, 那么此时 consumer 才能消费它, 这种同步策略, 就要求 follower 和 leader 之间必须具有良好的网络环境. 即使只有一个 replicas 实例存活, 仍然可以保证消息的正常发送和接收, 只要 zookeeper 集群存活即可.(备注: 不同于其他分布式存储, 比如 hbase 需要 ” 多数派 ” 存活才行)

kafka 判定一个 follower 存活与否的条件有 2 个:1) follower 需要和 zookeeper 保持良好的链接 2) 它必须能够及时的跟进 leader, 不能落后太多. 如果同时满足上述 2 个条件, 那么 leader 就认为此 follower 是 ” 活跃的 ”. 如果一个 follower 失效 (server 失效) 或者落后太多,leader 将会把它从同步列表中移除[备注: 如果此 replicas 落后太多, 它将会继续从 leader 中 fetch 数据, 直到足够 up-to-date, 然后再次加入到同步列表中;kafka 不会更换 replicas 宿主! 因为 ” 同步列表 ” 中 replicas 需要足够快, 这样才能保证 producer 发布消息时接受到 ACK 的延迟较小].

当 leader 失效时, 需在 followers 中选取出新的 leader, 可能此时 follower 落后于 leader, 因此需要选择一个 ”up-to-date” 的 follower.kafka 中 leader 选举并没有采用 ” 投票多数派 ” 的算法, 因为这种算法对于 ” 网络稳定性 ”/” 投票参与者数量 ” 等条件有较高的要求, 而且 kafka 集群的设计, 还需要容忍 N - 1 个 replicas 失效. 对于 kafka 而言, 每个 partition 中所有的 replicas 信息都可以在 zookeeper 中获得, 那么选举 leader 将是一件非常简单的事情. 选择 follower 时需要兼顾一个问题, 就是新 leader server 上所已经承载的 partition leader 的个数, 如果一个 server 上有过多的 partition leader, 意味着此 server 将承受着更多的 IO 压力. 在选举新 leader, 需要考虑到 ” 负载均衡 ”,partition leader 较少的 broker 将会更有可能成为新的 leader.

在整几个集群中, 只要有一个 replicas 存活, 那么此 partition 都可以继续接受读写操作.

7.Log

如果一个 topic 的名称为 ”my_topic”, 它有 2 个 partitions, 那么日志将会保存在 my_topic_0 和 my_topic_1 两个目录中; 日志文件中保存了一序列 ”log entries”(日志条目), 每个 log entry 格式为 ”4 个字节的数字 N 表示消息的长度 ” + “N 个字节的消息内容 ”; 每个日志都有一个 offset 来唯一的标记一条消息,offset 的值为 8 个字节的数字, 表示此消息在此 partition 中所处的起始位置.. 每个 partition 在物理存储层面, 有多个 log file 组成(称为 segment).segment file 的命名为 ” 最小 offset”.kafka. 例如 ”00000000000.kafka”; 其中 ” 最小 offset” 表示此 segment 中起始消息的 offset.

Apache kafka 原理与特性(0.8V)

(摘自官网)

其中每个 partiton 中所持有的 segments 列表信息会存储在 zookeeper 中.

当 segment 文件尺寸达到一定阀值时(可以通过配置文件设定, 默认 1G), 将会创建一个新的文件; 当 buffer 中消息的条数达到阀值时将会触发日志信息 flush 到日志文件中, 同时如果 ” 距离最近一次 flush 的时间差 ” 达到阀值时, 也会触发 flush 到日志文件. 如果 broker 失效, 极有可能会丢失那些尚未 flush 到文件的消息. 因为 server 意外失效, 仍然会导致 log 文件格式的破坏(文件尾部), 那么就要求当 server 启东是需要检测最后一个 segment 的文件结构是否合法并进行必要的修复.

获取消息时, 需要指定 offset 和最大 chunk 尺寸,offset 用来表示消息的起始位置,chunk size 用来表示最大获取消息的总长度(间接的表示消息的条数). 根据 offset, 可以找到此消息所在 segment 文件, 然后根据 segment 的最小 offset 取差值, 得到它在 file 中的相对位置, 直接读取输出即可.

日志文件的删除策略非常简单: 启动一个后台线程定期扫描 log file 列表, 把保存时间超过阀值的文件直接删除(根据文件的创建时间). 为了避免删除文件时仍然有 read 操作(consumer 消费), 采取 copy-on-write 方式.

8.Distribution

kafka 使用 zookeeper 来存储一些 meta 信息, 并使用了 zookeeper watch 机制来发现 meta 信息的变更并作出相应的动作(比如 consumer 失效, 触发负载均衡等)

1) Broker node registry: 当一个 kafka broker 启动后, 首先会向 zookeeper 注册自己的节点信息(临时 znode), 同时当 broker 和 zookeeper 断开连接时, 此 znode 也会被删除.

格式: /broker/ids/[0…N]  –>host:port; 其中 [0..N] 表示 broker id, 每个 broker 的配置文件中都需要指定一个数字类型的 id(全局不可重复),znode 的值为此 broker 的 host:port 信息.

2) Broker Topic Registry: 当一个 broker 启动时, 会向 zookeeper 注册自己持有的 topic 和 partitions 信息, 仍然是一个临时 znode.

格式: /broker/topics/[topic]/[0…N]  其中 [0..N] 表示 partition 索引号.

3) Consumer and Consumer group: 每个 consumer 客户端被创建时, 会向 zookeeper 注册自己的信息; 此作用主要是为了 ” 负载均衡 ”.

一个 group 中的多个 consumer 可以交错的消费一个 topic 的所有 partitions; 简而言之, 保证此 topic 的所有 partitions 都能被此 group 所消费, 且消费时为了性能考虑, 让 partition 相对均衡的分散到每个 consumer 上.

4) Consumer id Registry: 每个 consumer 都有一个唯一的 ID(host:uuid, 可以通过配置文件指定, 也可以由系统生成), 此 id 用来标记消费者信息.

格式: /consumers/[group_id]/ids/[consumer_id]

仍然是一个临时的 znode, 此节点的值为{“topic_name”:#streams…}, 即表示此 consumer 目前所消费的 topic + partitions 列表.

5) Consumer offset Tracking: 用来跟踪每个 consumer 目前所消费的 partition 中最大的 offset.

格式: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]  –>offset_value

此 znode 为持久节点, 可以看出 offset 跟 group_id 有关, 以表明当 group 中一个消费者失效, 其他 consumer 可以继续消费.

6) Partition Owner registry: 用来标记 partition 正在被哪个 consumer 消费. 临时 znode

格式: /consumers/[group_id]/owners/[topic]/[broker_id-partition_id]  –>consumer_node_id

此节点表达了 ” 一个 partition” 只能被 group 下一个 consumer 消费, 同时当 group 下某个 consumer 失效, 那么将会触发负载均衡(即: 让 partitions 在多个 consumer 间均衡消费, 接管那些 ” 游离 ” 的 partitions)

当 consumer 启动时, 所触发的操作:

A) 首先进行 ”Consumer id Registry”;

B) 然后在 ”Consumer id Registry” 节点下注册一个 watch 用来监听当前 group 中其他 consumer 的 ”leave” 和 ”join”; 只要此 znode path 下节点列表变更, 都会触发此 group 下 consumer 的负载均衡.(比如一个 consumer 失效, 那么其他 consumer 接管 partitions).

C) 在 ”Broker id registry” 节点下, 注册一个 watch 用来监听 broker 的存活情况; 如果 broker 列表变更, 将会触发所有的 groups 下的 consumer 重新 balance.

Consumer 均衡算法

当一个 group 中, 有 consumer 加入或者离开时, 会触发 partitions 均衡. 均衡的最终目的, 是提升 topic 的并发消费能力.

1) 假如 topic1, 具有如下 partitions: P0,P1,P2,P3

2) 加入 group 中, 有如下 consumer: C0,C1

3) 首先根据 partition 索引号对 partitions 排序: P0,P1,P2,P3

4) 根据 consumer.id 排序: C0,C1

5) 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size, 本例值 M =2(向上取整)

6) 然后依次分配 partitions: C0 = [P0,P1],C1=[P2,P3], 即 Ci = [P(i * M),P((i + 1) * M -1)]

总结:

1) Producer 端使用 zookeeper 用来 ” 发现 ”broker 列表, 以及和 Topic 下每个 partition leader 建立 socket 连接并发送消息.

2) Broker 端使用 zookeeper 用来注册 broker 信息, 已经监测 partition leader 存活性.

3) Consumer 端使用 zookeeper 用来注册 consumer 信息, 其中包括 consumer 消费的 partition 列表等, 同时也用来发现 broker 列表, 并和 partition leader 建立 socket 连接, 并获取消息.

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2014-09/107388p2.htm

正文完
星哥说事-微信公众号
post-qrcode
 
星锅
版权声明:本站原创文章,由 星锅 2022-01-20发表,共计28870字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中