阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Kafka集群部署与配置手册

146次阅读
没有评论

共计 12232 个字符,预计需要花费 31 分钟才能阅读完成。

本文中包含了一套 Kafka 集群的部署、配置、调试和压测的技术方法。

在三个主机节点上进行部署。
server1:192.168.10.1
server2:192.168.10.2
server3:192.168.10.3
 
1、jdk7u80 的安装与配置
 
rpm -ivh jdk-7u80-linux-x64.rpm
 
配置环境变量:

more /etc/profile
Java_HOME=/usr/java/jdk1.7.0_80
PATH=\$JAVA_HOME/bin:\$PATH:.
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME
export PATH
export CLASSPATH

注:低版本 jdk 在运行 kafka 时存在 bug。
 
2、系统 iptables 防火墙对局域网段开放以下端口

-A INPUT -s 192.168.10.0/24 -m state –state NEW -m tcp -p tcp –dport 2888 -j ACCEPT
-A INPUT -s 192.168.10.0/24 -m state –state NEW -m tcp -p tcp –dport 3888 -j ACCEPT
-A INPUT -s 192.168.10.0/24 -m state –state NEW -m tcp -p tcp –dport 9092 -j ACCEPT
-A INPUT -s 192.168.10.0/24 -m state –state NEW -m tcp -p tcp –dport 2181 -j ACCEPT

3、集群节点间主机名解析配置

more /etc/hosts 
192.168.10.1    server1 
192.168.10.2    server2 
192.168.10.3    server3 

4、在三个主机节点上部署 kafka-2.11 如下

cd /data
unzip kafka_2.11-0.10.0.0.zip
mv kafka_2.11-0.10.0.0/ kafka

5、配置 zookeeper 集群
注:以下除特别说明在哪个节点进行配置外,均需要修改三个主机节点。
 
因为该 zookeeper 是专服务于 kafka 的,所以直接把其数据目录放置于 /data/kafka/zookeeper,便于后续管理。
mkdir -p /data/kafka/zookeeper
 
编辑 zookeeper 配置文件:

cd /data/kafka
vi config/zookeeper.properties

tickTime=2000
dataDir=/data/kafka/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=15
syncLimit=5
server.1=192.168.10.1:2888:3888
server.2=192.168.10.2:2888:3888
server.3=192.168.10.3:2888:3888

创建 ServerID 标识:
节点 server1:echo “1” > /data/kafka/zookeeper/myid
节点 server2:echo “2” > /data/kafka/zookeeper/myid
节点 server3:echo “3” > /data/kafka/zookeeper/myid
注:这里设置的 myid 取值需要和 zookeeper.properties 中“server.id”保持一致。
 
chmod +x zookeeper-server-start.sh zookeeper-server-stop.sh kafka-run-class.sh
 
修改 zookeeper 启动脚本如下,以便于管理:

$ more zookeeper-server-start.sh
#!/bin/bash
#if [$# -lt 1];
#then
#    echo “USAGE: $0 [-daemon] zookeeper.properties”
#    exit 1
#fi
base_dir=$(dirname $0)
if [“x$KAFKA_LOG4J_OPTS” = “x”]; then
    export KAFKA_LOG4J_OPTS=”-Dlog4j.configuration=file:$base_dir/../config/log4j.properties”
fi
if [“x$KAFKA_HEAP_OPTS” = “x”]; then
    export KAFKA_HEAP_OPTS=”-Xmx512M -Xms512M”
fi
EXTRA_ARGS=”-name zookeeper -loggc”
#COMMAND=$1
COMMAND=”-daemon”
case $COMMAND in
  -daemon)
    EXTRA_ARGS=”-daemon “$EXTRA_ARGS
    shift
    ;;
 *)
    ;;
esac
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain “$@”
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain “../config/zookeeper.properties”

启停 zookeeper 的方法:

cd /data/kafka/bin
./zookeeper-server-start.sh
./zookeeper-server-stop.sh

6、配置 kafka 集群
配置 /data/kafka/config/server.properties 如下。三个主机节点上配置文件中仅前面几行的参数取值不同。

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://192.168.10.1:9092
port=9092
host.name=192.168.10.1
# The number of threads handling network requests
num.network.threads=8
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# The number of queued requests allowed before blocking the network threads
queued.max.requests=100
# The purge interval (in number of requests) of the fetch request purgatory
fetch.purgatory.purge.interval.requests=200
# The purge interval (in number of requests) of the producer request purgatory
producer.purgatory.purge.interval.requests=200

############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/data/kafka/kafka-logs
# The default number of log partitions per topic.
num.partitions=24
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
num.recovery.threads.per.data.dir=2
# The maximum size of message that the server can receive
message.max.bytes=1000000
# Enable auto creation of topic on the server
auto.create.topics.enable=true
# The interval with which we add an entry to the offset index
log.index.interval.bytes=4096
# The maximum size in bytes of the offset index
log.index.size.max.bytes=10485760
# Allow to delete topics
delete.topic.enable=true
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=20000
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=10000
# The frequency in ms that the log flusher checks whether any log needs to be flushed to disk
log.flush.scheduler.interval.ms=2000
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs.
log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# The maximum time before a new log segment is rolled out (in hours)
log.roll.hours=168
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
zookeeper.connect=192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
# How far a ZK follower can be behind a ZK leader
zookeeper.sync.time.ms=2000

############################# Replication configurations ################
# default replication factors for automatically created topics
default.replication.factor=3
# Number of fetcher threads used to replicate messages from a source broker.
num.replica.fetchers=4
# The number of bytes of messages to attempt to fetch for each partition.
replica.fetch.max.bytes=1048576
# max wait time for each fetcher request issued by follower replicas.
replica.fetch.wait.max.ms=500
# The frequency with which the high watermark is saved out to disk
replica.high.watermark.checkpoint.interval.ms=5000
# The socket timeout for network requests.
replica.socket.timeout.ms=30000
# The socket receive buffer for network requests
replica.socket.receive.buffer.bytes=65536
# If a follower hasn’t sent any fetch requests or hasn’t consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr
replica.lag.time.max.ms=10000
# The socket timeout for controller-to-broker channels
controller.socket.timeout.ms=30000
controller.message.queue.size=10

7、配置 kafka 生产者和消费者
修改 /data/kafka/config 下的 producer.properties 文件
bootstrap.servers=192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092
producer.type=async
compression.type=snappy
 
修改 /data/kafka/config 下的 comsumer.properties 文件
zookeeper.connect=192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181
 
8、kafka 集群服务启停管理的配置
 
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka “$@” 
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka “../config/server.properties” 

chmod +x bin/kafka-server-start.sh kafka-server-stop.sh

修改 kafka-server-start.sh

more kafka-server-start.sh
#!/bin/bash
#if [$# -lt 1];
#then
#    echo “USAGE: $0 [-daemon] server.properties [–override property=value]*”
#    exit 1
#fi
base_dir=$(dirname $0)

if [“x$KAFKA_LOG4J_OPTS” = “x”]; then
    export KAFKA_LOG4J_OPTS=”-Dlog4j.configuration=file:$base_dir/../config/log4j.properties”
fi

if [“x$KAFKA_HEAP_OPTS” = “x”]; then
    export KAFKA_HEAP_OPTS=”-Xmx5G -Xms5G”
fi

EXTRA_ARGS=”-name kafkaServer -loggc”

#COMMAND=$1
COMMAND=”-daemon”
case $COMMAND in
  -daemon)
    EXTRA_ARGS=”-daemon “$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka “$@”
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka “../config/server.properties”

启停 kafka 服务的方法:
cd /data/kafka/bin
./kafka-server-start.sh
./kafka-server-stop.sh
 
注:观察和检查 /data/kafka/logs 下的各个日志文件,以确认无报错信息且各项服务日志输出正确。
 
 
9、kafka 集群的常用管理命令
建议另外安排一个备机作为长期监控和测试 kafka 集群的主机。程序部署在 /data/kafka 下。
创建了以下用于服务和性能监控的 topic:
./kafka-topics.sh –create –zookeeper 192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181 –replication-factor 3 –partitions 6 –topic test-for-sys-monitor
查看已创建了的 topics 列表:
./kafka-topics.sh –list –zookeeper 192.168.10.1:2181
查看指定 topic 的详情:
./kafka-topics.sh –describe –zookeeper 192.168.10.1:2181 –topic test-for-sys-monitor
使用终端生产者命令进行测试:
./kafka-console-producer.sh –broker-list 192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092  –topic test-for-sys-monitor
使用终端消费者命令进行测试:
./kafka-console-consumer.sh –zookeeper 192.168.10.1:2181 –topic test-for-sys-monitor
注:以���测试,在生产者侧输入的数据,会在消费者侧打印出来。

删除 topic 的命令:
./kafka-topics.sh –delete –zookeeper 192.168.10.1:2181  –topic test-for-sys-monitor
注:kafka 集群中放开了删除 topic 的功能,请谨慎使用。
 
压测写 500 万条数据,每条 1KB:
./kafka-producer-perf-test.sh –topic test-perf-20161220 –num-records 500000 –record-size 1000 –throughput 100000 –producer-props bootstrap.servers=192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092
5000000 records sent, 53149.648149 records/sec (50.69 MB/sec), 569.30 ms avg latency, 2096.00 ms max latency, 8 ms 50th, 1759 ms 95th, 1874 ms 99th, 2044 ms 99.9th.

压测读 6 *100 万条数据:
 ./kafka-consumer-perf-test.sh –zookeeper 192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181 –messages 1000000 –topic test-perf-20161220 –threads 6
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2016-12-20 16:08:50:102, 2016-12-20 16:09:48:525, 5722.0459, 97.9417, 6000000, 102699.2794

注:压测后,请及时删除用于压测的 topic,因为所产生的 kafka 日志数据很可观。
 
登录 zookeeper shell 中查看 kafka 创建的相关元数据:

./zookeeper-shell.sh 192.168.10.3:2181
Connecting to 192.168.10.3:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /
    [consumers, config, controller, isr_change_notification, brokers, admin, zookeeper, controller_epoch]
get /brokers/ids/1
{“jmx_port”:-1,”timestamp”:”1482223274389″,”endpoints”:[“PLAINTEXT://192.168.10.1:9092″],”host”:”192.168.10.1″,”version”:3,”port”:9092}
cZxid = 0x900000009
ctime = Tue Dec 20 16:41:14 CST 2016
mZxid = 0x900000009
mtime = Tue Dec 20 16:41:14 CST 2016
pZxid = 0x900000009
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1591b61880f0000
dataLength = 137
numChildren = 0
get /brokers/ids/2
{“jmx_port”:-1,”timestamp”:”1482223307625″,”endpoints”:[“PLAINTEXT://192.168.10.2:9092″],”host”:”192.168.10.2″,”version”:3,”port”:9092}
cZxid = 0x900000013
ctime = Tue Dec 20 16:41:47 CST 2016
mZxid = 0x900000013
mtime = Tue Dec 20 16:41:47 CST 2016
pZxid = 0x900000013
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1591b61880f0001
dataLength = 137
numChildren = 0
get /brokers/ids/3
{“jmx_port”:-1,”timestamp”:”1482223315746″,”endpoints”:[“PLAINTEXT://192.168.10.3:9092″],”host”:”192.168.10.3″,”version”:3,”port”:9092}
cZxid = 0x900000020
ctime = Tue Dec 20 16:41:55 CST 2016
mZxid = 0x900000020
mtime = Tue Dec 20 16:41:55 CST 2016
pZxid = 0x900000020
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3591b618ffe0001
dataLength = 137
numChildren = 0
quit
    Quitting…

改变 topic 的分区数量
./kafka-topics.sh –alter –zookeeper 192.168.10.1:2181 –topic test_topic –partitions 4
 
增删改 topic 的配置参数
./kafka-topics.sh –alter –zookeeper 192.168.10.1:2181 –topic test_topic–config key=value
./kafka-topics.sh —alter –zookeeper 192.168.10.1:2181 –topic test_topic–deleteConfig key
 
查看 topic 的分区、副本状态,需要关注各个分区的负载分布是否均衡(即 Leader 角色的分布):
./kafka-topics.sh –describe –zookeeper 192.168.10.1:2181 –topic test-for-sys-monitor
Topic:test-for-sys-monitor    PartitionCount:6    ReplicationFactor:3    Configs:
    Topic: test-for-sys-monitor    Partition: 0    Leader: 1    Replicas: 1,3,2    Isr: 1,3,2
    Topic: test-for-sys-monitor    Partition: 1    Leader: 2    Replicas: 2,1,3    Isr: 2,1,3
    Topic: test-for-sys-monitor    Partition: 2    Leader: 3    Replicas: 3,2,1    Isr: 3,2,1
    Topic: test-for-sys-monitor    Partition: 3    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
    Topic: test-for-sys-monitor    Partition: 4    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: test-for-sys-monitor    Partition: 5    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2

执行 leader 分布的再平衡:
./kafka-preferred-replica-election.sh –zookeeper 192.168.10.1:2181

注:kafka 本身是会自动做 leader 分布再平衡工作的,但不会是发现问题后立即执行,会有半小时的延迟。
注:很多配置参数,除非明确理解其作用,否则不必配置,因为 kafka 自身是对这些参数设置了默认值的,这些默认设置已经是一个比较好的选择。

CentOS 7.2 部署 Elasticsearch+Kibana+Zookeeper+Kafka  http://www.linuxidc.com/Linux/2016-11/137636.htm

CentOS 7 下安装 Logstash ELK Stack 日志管理系统  http://www.linuxidc.com/Linux/2016-08/134165.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

CentOS 7 下 Kafka 集群安装  http://www.linuxidc.com/Linux/2017-01/139734.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

CentOS 7 下安装 Kafka 单机版  http://www.linuxidc.com/Linux/2017-01/139732.htm

Apache kafka 原理与特性(0.8V)  http://www.linuxidc.com/Linux/2014-09/107388.htm

Kafka 部署与代码实例  http://www.linuxidc.com/Linux/2014-09/107387.htm

Kafka 介绍及环境搭建  http://www.linuxidc.com/Linux/2016-12/138724.htm

Kafka 介绍和集群环境搭建  http://www.linuxidc.com/Linux/2014-09/107382.htm

Kafka 的详细介绍:请点这里
Kafka 的下载地址:请点这里

本文永久更新链接地址:http://www.linuxidc.com/Linux/2017-02/141037.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计12232字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中