阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Kafka部署与代码实例

125次阅读
没有评论

共计 13653 个字符,预计需要花费 35 分钟才能阅读完成。

kafka 作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka 的部署包括 zookeeper 环境 /kafka 环境,同时还需要进行一些配置操作. 接下来介绍如何使用 kafka。

我们使用 3 个 zookeeper 实例构建 zk 集群,使用 2 个 kafka broker 构建 kafka 集群。

其中 kafka 为 0.8V,zookeeper 为 3.4.5V

————————————– 分割线 ————————————–

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Kafka 使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm

————————————– 分割线 ————————————–

test-kafka.zip (5.3 KB) 下载

免费下载地址在 http://linux.linuxidc.com/

用户名与密码都是 www.linuxidc.com

具体下载目录在 /2014 年资料 / 9 月 /29 日 /Kafka 部署与代码实例

下载方法见 http://www.linuxidc.com/Linux/2013-07/87684.htm

————————————– 分割线 ————————————–

一.Zookeeper 集群构建

我们有 3 个 zk 实例,分别为 zk-0,zk-1,zk-2; 如果你仅仅是测试使用,可以使用 1 个 zk 实例.

1) zk-0

调整配置文件:

clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
## 只需要修改上述配置,其他配置保留默认值

启动 zookeeper

./zkServer.sh start

2) zk-1

调整配置文件 (其他配置和 zk- 0 一只):

clientPort=2182
## 只需要修改上述配置,其他配置保留默认值

启动 zookeeper

./zkServer.sh start

3) zk-2

调整配置文件 (其他配置和 zk- 0 一只):

clientPort=2183
## 只需要修改上述配置,其他配置保留默认值

启动 zookeeper

./zkServer.sh start

二. Kafka 集群构建

因为 Broker 配置文件涉及到 zookeeper 的相关约定,因此我们先展示 broker 配置文件. 我们使用 2 个 kafka broker 来构建这个集群环境,分别为 kafka-0,kafka-1.

1) kafka-0

在 config 目录下修改配置文件为:

broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
##replication 机制, 让每个 topic 的 partitions 在 kafka-cluster 中备份 2 个
## 用来提高 cluster 的容错能力..
default.replication.factor=1
log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000

因为 kafka 用 scala 语言编写,因此运行 kafka 需要首先准备 scala 相关环境。

> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency

其中最后一条指令执行有可能出现异常,暂且不管。启动 kafka broker:

> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &

因为 zookeeper 环境已经正常运行了,我们无需通过 kafka 来挂载启动 zookeeper. 如果你的一台机器上部署了多个 kafka broker,你需要声明 JMS_PORT.

2) kafka-1

broker.id=1
port=9093
## 其他配置和 kafka- 0 保持一致

然后和 kafka- 0 一样执行打包命令,然后启动此 broker.

> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &

仍然可以通过如下指令查看 topic 的 ”partition”/”replicas” 的分布和存活情况.

> bin/kafka-list-topic.sh –zookeeper localhost:2181
topic: my-replicated-topic partition: 0 leader: 2 replicas: 1,2,0 isr: 2
topic: test partition: 0 leader: 0 replicas: 0 isr: 0 

到目前为止环境已经 OK 了, 那我们就开始展示编程实例吧。[配置参数详解] http://www.linuxidc.com/Linux/2014-09/107388.htm

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-09/107387p2.htm

三. 项目准备

项目基于 maven 构建,不得不说 kafka Java 客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下 pom.xml; 其中各个依赖包必须版本协调一致。如果 kafka client 的版本和 kafka server 的版本不一致, 将会有很多异常, 比如 ”broker id not exists” 等; 因为 kafka 从 0.7 升级到 0.8 之后 (正名为 2.8.0),client 与 server 通讯的 protocol 已经改变.

<dependencies>
 <dependency>
  <groupId>log4j</groupId>
  <artifactId>log4j</artifactId>
  <version>1.2.14</version>
 </dependency>
 <dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.8.2</artifactId>
  <version>0.8.0</version>
  <exclusions>
   <exclusion>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
   </exclusion>
  </exclusions>
 </dependency>
 <dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.8.2</version>
 </dependency>
 <dependency>
  <groupId>com.yammer.metrics</groupId>
  <artifactId>metrics-core</artifactId>
  <version>2.2.0</version>
 </dependency>
 <dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.3</version>
 </dependency>
</dependencies>

四.Producer 端代码

1) producer.properties 文件:此文件放在 /resources 目录下

#partitioner.class=
##broker 列表可以为 kafka server 的子集, 因为 producer 需要从 broker 中获取 metadata
## 尽管每个 broker 都可以提供 metadata, 此处还是建议, 将所有 broker 都列举出来
## 此值, 我们可以在 spring 中注入过来
##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
##,127.0.0.1:9093
## 同步, 建议为 async
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
## 在 producer.type=async 时有效
#batch.num.messages=100

2) KafkaProducerClient.java 代码样例

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * User: guanqing-liu
 */
public class KafkaProducerClient {

 private Producer<String, String> inner;
 
 private String brokerList;//for metadata discovery,spring setter
 private String location = “kafka-producer.properties”;//spring setter
 
 private String defaultTopic;//spring setter

 public void setBrokerList(String brokerList) {
  this.brokerList = brokerList;
 }

 public void setLocation(String location) {
  this.location = location;
 }

 public void setDefaultTopic(String defaultTopic) {
  this.defaultTopic = defaultTopic;
 }

 public KafkaProducerClient(){}
 
 public void init() throws Exception {
  Properties properties = new Properties();
  properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
 
 
  if(brokerList != null) {
   properties.put(“metadata.broker.list”, brokerList);
  }

  ProducerConfig config = new ProducerConfig(properties);
  inner = new Producer<String, String>(config);
 }

 public void send(String message){
  send(defaultTopic,message);
 }
 
 public void send(Collection<String> messages){
  send(defaultTopic,messages);
 }
 
 public void send(String topicName, String message) {
  if (topicName == null || message == null) {
   return;
  }
  KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
  inner.send(km);
 }

 public void send(String topicName, Collection<String> messages) {
  if (topicName == null || messages == null) {
   return;
  }
  if (messages.isEmpty()) {
   return;
  }
  List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
  int i= 0;
  for (String entry : messages) {
   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
   kms.add(km);
   i++;
   if(i % 20 == 0){
    inner.send(kms);
    kms.clear();
   }
  }
 
  if(!kms.isEmpty()){
   inner.send(kms);
  }
 }

 public void close() {
  inner.close();
 }

 /**
  * @param args
  */
 public static void main(String[] args) {
  KafkaProducerClient producer = null;
  try {
   producer = new KafkaProducerClient();
   //producer.setBrokerList(“”);
   int i = 0;
   while (true) {
    producer.send(“test-topic”, “this is a sample” + i);
    i++;
    Thread.sleep(2000);
   }
  } catch (Exception e) {
   e.printStackTrace();
  } finally {
   if (producer != null) {
    producer.close();
   }
  }

 }

}

3) spring 配置

    <bean id=”kafkaProducerClient” class=”com.test.kafka.KafkaProducerClient” init-method=”init” destroy-method=”close”>
        <property name=”zkConnect” value=”${zookeeper_cluster}”></property>
        <property name=”defaultTopic” value=”${kafka_topic}”></property>
    </bean>

kafka 作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka 的部署包括 zookeeper 环境 /kafka 环境,同时还需要进行一些配置操作. 接下来介绍如何使用 kafka。

我们使用 3 个 zookeeper 实例构建 zk 集群,使用 2 个 kafka broker 构建 kafka 集群。

其中 kafka 为 0.8V,zookeeper 为 3.4.5V

————————————– 分割线 ————————————–

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Kafka 使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm

————————————– 分割线 ————————————–

test-kafka.zip (5.3 KB) 下载

免费下载地址在 http://linux.linuxidc.com/

用户名与密码都是 www.linuxidc.com

具体下载目录在 /2014 年资料 / 9 月 /29 日 /Kafka 部署与代码实例

下载方法见 http://www.linuxidc.com/Linux/2013-07/87684.htm

————————————– 分割线 ————————————–

一.Zookeeper 集群构建

我们有 3 个 zk 实例,分别为 zk-0,zk-1,zk-2; 如果你仅仅是测试使用,可以使用 1 个 zk 实例.

1) zk-0

调整配置文件:

clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
## 只需要修改上述配置,其他配置保留默认值

启动 zookeeper

./zkServer.sh start

2) zk-1

调整配置文件 (其他配置和 zk- 0 一只):

clientPort=2182
## 只需要修改上述配置,其他配置保留默认值

启动 zookeeper

./zkServer.sh start

3) zk-2

调整配置文件 (其他配置和 zk- 0 一只):

clientPort=2183
## 只需要修改上述配置,其他配置保留默认值

启动 zookeeper

./zkServer.sh start

二. Kafka 集群构建

因为 Broker 配置文件涉及到 zookeeper 的相关约定,因此我们先展示 broker 配置文件. 我们使用 2 个 kafka broker 来构建这个集群环境,分别为 kafka-0,kafka-1.

1) kafka-0

在 config 目录下修改配置文件为:

broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
##replication 机制, 让每个 topic 的 partitions 在 kafka-cluster 中备份 2 个
## 用来提高 cluster 的容错能力..
default.replication.factor=1
log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000

因为 kafka 用 scala 语言编写,因此运行 kafka 需要首先准备 scala 相关环境。

> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency

其中最后一条指令执行有可能出现异常,暂且不管。启动 kafka broker:

> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &

因为 zookeeper 环境已经正常运行了,我们无需通过 kafka 来挂载启动 zookeeper. 如果你的一台机器上部署了多个 kafka broker,你需要声明 JMS_PORT.

2) kafka-1

broker.id=1
port=9093
## 其他配置和 kafka- 0 保持一致

然后和 kafka- 0 一样执行打包命令,然后启动此 broker.

> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &

仍然可以通过如下指令查看 topic 的 ”partition”/”replicas” 的分布和存活情况.

> bin/kafka-list-topic.sh –zookeeper localhost:2181
topic: my-replicated-topic partition: 0 leader: 2 replicas: 1,2,0 isr: 2
topic: test partition: 0 leader: 0 replicas: 0 isr: 0&nbsp;

到目前为止环境已经 OK 了, 那我们就开始展示编程实例吧。[配置参数详解] http://www.linuxidc.com/Linux/2014-09/107388.htm

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-09/107387p2.htm

五.Consumer 端

1) consumer.properties: 文件位于 /resources 目录下

## 此值可以配置, 也可以通过 spring 注入
##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
##,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=true
auto.commit.interval.ms=60000

2) KafkaConsumerClient.Java 代码样例

package com.test.kafka;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

/**
 * User: guanqing-liu
 */
public class KafkaConsumerClient {

 private String groupid; //can be setting by spring
 private String zkConnect;//can be setting by spring
 private String location = “kafka-consumer.properties”;// 配置文件位置
 private String topic;
 private int partitionsNum = 1;
 private MessageExecutor executor; //message listener
 private ExecutorService threadPool;
 
 private ConsumerConnector connector;
 
 private Charset charset = Charset.forName(“utf8”);

 public void setGroupid(String groupid) {
  this.groupid = groupid;
 }

 public void setZkConnect(String zkConnect) {
  this.zkConnect = zkConnect;
 }

 public void setLocation(String location) {
  this.location = location;
 }

 public void setTopic(String topic) {
  this.topic = topic;
 }

 public void setPartitionsNum(int partitionsNum) {
  this.partitionsNum = partitionsNum;
 }

 public void setExecutor(MessageExecutor executor) {
  this.executor = executor;
 }

 public KafkaConsumerClient() {}

 //init consumer,and start connection and listener
 public void init() throws Exception {
  if(executor == null){
   throw new RuntimeException(“KafkaConsumer,exectuor cant be null!”);
  }
  Properties properties = new Properties();
  properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
 
  if(groupid != null){
   properties.put(“groupid”, groupid);
  }
  if(zkConnect != null){
   properties.put(“zookeeper.connect”, zkConnect);
  }
  ConsumerConfig config = new ConsumerConfig(properties);

  connector = Consumer.createJavaConsumerConnector(config);
  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put(topic, partitionsNum);
  Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
  List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
  threadPool = Executors.newFixedThreadPool(partitionsNum * 2);
 
  //start
  for (KafkaStream<byte[], byte[]> partition : partitions) {
   threadPool.execute(new MessageRunner(partition));
  }
 }

 public void close() {
  try {
   threadPool.shutdownNow();
  } catch (Exception e) {
   //
  } finally {
   connector.shutdown();
  }

 }

 class MessageRunner implements Runnable {
  private KafkaStream<byte[], byte[]> partition;

  MessageRunner(KafkaStream<byte[], byte[]> partition) {
   this.partition = partition;
  }

  public void run() {
   ConsumerIterator<byte[], byte[]> it = partition.iterator();
   while (it.hasNext()) {
    // connector.commitOffsets(); 手动提交 offset, 当 autocommit.enable=false 时使用
    MessageAndMetadata<byte[], byte[]> item = it.next();
    try{
     executor.execute(new String(item.message(),charset));// UTF-8, 注意异常
    }catch(Exception e){
     //
    }
   }
  }
 
  public String getContent(Message message){
            ByteBuffer buffer = message.payload();
            if (buffer.remaining() == 0) {
                return null;
            }
            CharBuffer charBuffer = charset.decode(buffer);
            return charBuffer.toString();
  }
 }

 public static interface MessageExecutor {

  public void execute(String message);
 }

 /**
  * @param args
  */
 public static void main(String[] args) {
  KafkaConsumerClient consumer = null;
  try {
   MessageExecutor executor = new MessageExecutor() {

    public void execute(String message) {
     System.out.println(message);
    }
   };
   consumer = new KafkaConsumerClient();
   
   consumer.setTopic(“test-topic”);
   consumer.setPartitionsNum(2);
   consumer.setExecutor(executor);
   consumer.init();
  } catch (Exception e) {
   e.printStackTrace();
  } finally {
    if(consumer != null){
    consumer.close();
    }
  }

 }

}

3) spring 配置 (略)

需要提醒的是, 上述 LogConsumer 类中, 没有太多的关注异常情况, 必须在 MessageExecutor.execute() 方法中抛出异常时的情况.

在测试时,建议优先启动 consumer,然后再启动 producer,这样可以实时的观测到最新的消息。

Kafka 的详细介绍 :请点这里
Kafka 的下载地址 :请点这里

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计13653字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中