Preface
I have recently been building a real-time data analysis system with Spark Streaming and Kafka, analyzing book-reading data to produce real-time recommendations. Spark Streaming is an extension of Spark Core that processes continuous data streams with high throughput and fault tolerance. It currently supports external data sources such as Flume, Kafka, Twitter, ZeroMQ, and raw TCP sockets. Apache Kafka is a distributed publish-subscribe messaging system and works well as the data source of a stream-computing system; in this example, Spark Streaming consumes its data from Kafka.
System Environment
Software versions
```
Spark: 1.4.1
Kafka: 0.8.1.1
Zookeeper: 3.4.6
```
Cluster nodes
There are four hosts in total, named nn0001, dn0001, dn0002, and dn0003.
```
192.168.186.12 nn0001
192.168.186.13 dn0001
192.168.186.14 dn0002
192.168.186.15 dn0003
```
Zookeeper Installation
Kafka uses Zookeeper for coordination and for storing metadata, and relies on Zookeeper's watch mechanism to detect metadata changes and react accordingly (for example, detecting a failed consumer and triggering a rebalance).
Once Zookeeper is configured on the first machine, simply distribute the configuration to the other three.
```
[bigdata@nn0001 ~]$ wget http:
[bigdata@nn0001 ~]$ tar -zxvf zookeeper-3.4.6.tar.gz
[bigdata@nn0001 ~]$ cd zookeeper-3.4.6/conf
[bigdata@nn0001 conf]$ pwd
/home/bigdata/bigprosoft/zookeeper-3.4.6/conf
[bigdata@nn0001 conf]$ cp zoo_sample.cfg zoo.cfg
```
Edit the configuration file
```
[bigdata@nn0001 conf]$ vi zoo.cfg
tickTime=2000
dataDir=/home/bigdata/bigprosoft/zookeeper/data
clientPort=2181
initLimit=10
syncLimit=5
server.1=nn0001:2888:3888
server.2=dn0001:2888:3888
server.3=dn0002:2888:3888
server.4=dn0003:2888:3888
```
Create a myid file in the dataDir directory: its content is 1 on nn0001, 2 on dn0001, and so on for the remaining machines.
```
[bigdata@nn0001 data]$ echo 1 > myid
[bigdata@nn0001 data]$ cat myid
1
```
Start and verify
```
[bigdata@nn0001 bin]$ ./zkServer.sh start
[bigdata@nn0001 bin]$ jps
10805 QuorumPeerMain   # Zookeeper started successfully
15494 Master
11816 NameNode
20958 Jps
17539 Worker
12084 ResourceManager
12945 RunJar
12944 RunJar
```
Stop
```
[bigdata@nn0001 bin]$ ./zkServer.sh stop
```
Repeat the same steps on the other machines; just scp the directory over.
Kafka Installation
Kafka concepts such as broker, producer, consumer, and topic, along with the underlying principles, are covered in the official documentation. This experiment uses a multi-node, multi-broker cluster, with each machine assigned its own broker id.
```
[bigdata@nn0001 ~]$ wget http:
[bigdata@nn0001 ~]$ tar zxf kafka_2.10-0.8.1.1.tgz
[bigdata@nn0001 ~]$ cd kafka_2.10-0.8.1.1
[bigdata@nn0001 kafka_2.10-0.8.1.1]$ cd config
[bigdata@nn0001 config]$ vi server.properties
broker.id=1   # increment the id on each of the other machines
port=9092
host.name=192.168.186.12
advertised.host.name=192.168.186.12
zookeeper.connect=192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181
```
After editing, distribute the file to the other three machines.
Start and verify
```
[bigdata@nn0001 bin]$ nohup ./kafka-server-start.sh ../config/server.properties &
[bigdata@nn0001 conf]$ jps
10805 QuorumPeerMain
21282 Jps
15494 Master
21209 Kafka
11816 NameNode
17539 Worker
12084 ResourceManager
12945 RunJar
12944 RunJar
```
Start Kafka on each machine in turn.
Kafka Usage Test
Create a topic
```
[bigdata@nn0001 bin]$ ./kafka-topics.sh --create --zookeeper nn0001:2181 --replication-factor 3 --partitions 1 --topic test
```
Describe topics
```
[bigdata@nn0001 bin]$ ./kafka-topics.sh --describe --zookeeper nn0001:2181
Topic:mytest  PartitionCount:2  ReplicationFactor:2  Configs:
    Topic: mytest  Partition: 0  Leader: 2   Replicas: 3,2  Isr: 2
    Topic: mytest  Partition: 1  Leader: -1  Replicas: 4,3  Isr:
Topic:test    PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: test   Partition: 0  Leader: 2   Replicas: 2,3,4  Isr: 2
```
Producer test
```
[bigdata@nn0001 bin]$ ./kafka-console-producer.sh --broker-list 192.168.186.12:9092 --topic test
gsdggfgfgfd
gdfgdfgdf
```
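Beyond the console scripts, messages can be produced from code. Here is a minimal sketch using the Scala producer API bundled with Kafka 0.8 (the broker address and topic come from the setup above; the object name `ProducerDemo` and the message text are just for illustration):

```scala
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object ProducerDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Brokers used to bootstrap topic metadata.
    props.put("metadata.broker.list", "192.168.186.12:9092")
    // Encode message payloads as strings.
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val producer = new Producer[String, String](new ProducerConfig(props))
    // No key is given, so the default partitioner picks the partition.
    producer.send(new KeyedMessage[String, String]("test", "hello from scala"))
    producer.close()
  }
}
```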
Consumer test
```
[bigdata@nn0001 bin]$ ./kafka-console-consumer.sh --zookeeper 192.168.186.12:2181 --from-beginning --topic test
abfsfsdfsdfs
ffsdfs
gsdggfgfgfd
gdfgdfgdf
^C[2015-08-28 17:48:40,991] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
Consumed 7 messages
```
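The same topic can also be consumed programmatically through Kafka 0.8's high-level consumer API. A minimal sketch, assuming the Zookeeper address above and a hypothetical group id `demo-group`:

```scala
import java.util.Properties

import kafka.consumer.{Consumer, ConsumerConfig}

object ConsumerDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("zookeeper.connect", "192.168.186.12:2181")
    props.put("group.id", "demo-group")
    // Like --from-beginning: start at the earliest offset when the
    // group has no committed offset yet.
    props.put("auto.offset.reset", "smallest")

    val connector = Consumer.create(new ConsumerConfig(props))
    // One stream (thread) for the "test" topic.
    val streams = connector.createMessageStreams(Map("test" -> 1))
    // Iterating the stream blocks for new messages, like the console consumer.
    for (msg <- streams("test").head) {
      println(new String(msg.message(), "UTF-8"))
    }
  }
}
```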
Test high availability
```
[bigdata@nn0001 bin]$ ./kafka-topics.sh --describe --zookeeper 192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181 --topic test
Topic:test  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: test  Partition: 0  Leader: 2  Replicas: 2,3,4  Isr: 2,4
# The leader is broker 2, i.e. dn0001. Kill the Kafka process on that machine, then describe the topic again.

[bigdata@dn0002 bin]$ ./kafka-topics.sh --describe --zookeeper 192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181 --topic test
Topic:test  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: test  Partition: 0  Leader: 4  Replicas: 2,3,4  Isr: 4
# The leader is now broker 4, which is dn0003.

# Consume the topic to confirm the messages are still available:
[bigdata@nn0001 bin]$ ./kafka-console-consumer.sh --zookeeper 192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181 --from-beginning --topic test
abfsfsdfsdfs
ffsdfs
gsdggfgfgfd
gdfgdfgdf
q
^C[2015-08-31 10:14:50,964] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
Consumed 7 messages
```
OK, that completes the setup; what remains is to develop against the cluster with examples in Python, Java, or Scala.
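As a starting point for those examples, the sketch below wires Spark Streaming 1.4.1 to the Kafka cluster through the receiver-based `KafkaUtils.createStream` API of the `spark-streaming-kafka` module. The group id `spark-demo-group` and the word count are illustrative placeholders, not part of the original setup:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    // The master URL is supplied by spark-submit.
    val conf = new SparkConf().setAppName("KafkaWordCount")
    // 5-second micro-batches.
    val ssc = new StreamingContext(conf, Seconds(5))

    val zkQuorum = "192.168.186.12:2181,192.168.186.13:2181," +
      "192.168.186.14:2181,192.168.186.15:2181"
    // Topic name -> number of receiver threads.
    val topics = Map("test" -> 1)

    // Each record is a (key, message) pair; keep only the message.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, "spark-demo-group", topics)
      .map(_._2)

    // A simple word count per batch, standing in for the real analysis.
    lines.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Build it against the `spark-streaming-kafka_2.10` artifact matching the Spark version (1.4.1 here) and launch it with `spark-submit`.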
Troubleshooting
Problem 1
```
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
```
Solution
```
[bigdata@nn0001 ~]$ wget http:
[bigdata@nn0001 ~]$ cd slf4j-1.7.12
[bigdata@nn0001 ~]$ cp slf4j-nop-1.7.12.jar ~/bigprosoft/kafka/libs/
```
Problem 2
```
[bigdata@nn0001 bin]$ ./kafka-console-producer.sh --broker-list nn0001:9092 --topic test
fsfsdfsdf
……
[2015-08-28 17:24:18,417] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2015-08-28 17:24:18,419] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
……
```
Solution: replace the hostname in server.properties with the IP address.
```
host.name=10.171.59.221
advertised.host.name=10.171.59.221
zookeeper.connect=192.168.186.12:2181,192.168.186.13:2181,192.168.186.14:2181,192.168.186.15:2181
```