阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Apache Kafka 代码实例

127次阅读
没有评论

共计 5984 个字符,预计需要花费 15 分钟才能阅读完成。

前提

  • 已经配置好 kafka。若未安装,可以参照【Apache Kafka 安装升级指南 http://www.linuxidc.com/Linux/2013-11/92753.htm】
  • 已在 eclipse 里面安装 scala 插件。Eclipse Kepler 中在 Help->Eclipse Markectplace 中搜索 Scalar,然后安装即可。
  • 使用 maven 构建 kafka 测试 project 在 eclipse 中。
  • 创建 topic:在 kafka 的安装目录下执行 bin/kafka-create-topic.sh –zookeeper 192.168.20.99:2181 –replica 1 –partition 1 –topic test
  • 启动 consumer:在 kafka 的安装目录下执行 bin/kafka-console-consumer.sh –zookeeper 192.168.20.99:2181 –topic test –from-beginning

pom.xml 文件如下

所有 kafka 依赖的 jar 包都在 com.sksamuel.kafka 下面。其中 kafka 使用的版本是 0.8.0-beta1,kafka 是 2.10。

<project xmlns=”http://maven.apache.org/POM/4.0.0″ xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”
  xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd”>
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.iflytek.cpcloud.kafka</groupId>
  <artifactId>kafkatest</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafkatest</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
  <dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>1.2.14</version>
  </dependency>
  <dependency>
   <groupId>com.sksamuel.kafka</groupId>
   <artifactId>kafka_2.10</artifactId>
   <version>0.8.0-beta1</version>
  </dependency>
 </dependencies>
</project>

然后写一个 kafka producer 的测试程序如下:

package com.iflytek.cpcloud.kafka.kafkatest;

import Java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Test the Kafka Producer
 * @author jcsong2
 *
 */
public class ProducerTest {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put(“zk.connect”, “192.168.20.99:2181”);
  props.put(“serializer.class”, “kafka.serializer.StringEncoder”);
  props.put(“metadata.broker.list”, “192.168.20.99:9092”);
  ProducerConfig config = new ProducerConfig(props);
  Producer<String, String> producer = new Producer<String, String>(config);
  for (int i = 0; i < 10; i++)
   producer.send(new KeyedMessage<String, String>(“test”, “test” + i));
 }
}

在 consuemr 端可以看到 test0 到 test9 十行输出。

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2013-11/92754p2.htm

Kafka 的详细介绍 :请点这里
Kafka 的下载地址 :请点这里

相关阅读

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

再写一个 kafka consumer 的测试程序如下:

package com.iflytek.cpcloud.kafka.kafkatest;

import Java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerTest extends Thread {
 private final ConsumerConnector consumer;
 private final String topic;

 public static void main(String[] args) {
  ConsumerTest consumerThread = new ConsumerTest(“test”);
  consumerThread.start();
 }

 public ConsumerTest(String topic) {
  consumer = kafka.consumer.Consumer
    .createJavaConsumerConnector(createConsumerConfig());
  this.topic = topic;
 }

 private static ConsumerConfig createConsumerConfig() {
  Properties props = new Properties();
  props.put(“zookeeper.connect”, “192.168.20.99:2181”);
  props.put(“group.id”, “0”);
  props.put(“zookeeper.session.timeout.ms”, “400000”);
  props.put(“zookeeper.sync.time.ms”, “200”);
  props.put(“auto.commit.interval.ms”, “1000”);

  return new ConsumerConfig(props);

 }

 public void run() {
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  topicCountMap.put(topic, new Integer(1));
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
    .createMessageStreams(topicCountMap);
  KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
  ConsumerIterator<byte[], byte[]> it = stream.iterator();
  while (it.hasNext())
   System.out.println(new String(it.next().message()));
 }
}

在 kafka-console-producer 端输入的数据会回显到 eclipse 的 console 中。

以上程序参考 kafka-0.8.0-bata1 中的 example。

前提

  • 已经配置好 kafka。若未安装,可以参照【Apache Kafka 安装升级指南 http://www.linuxidc.com/Linux/2013-11/92753.htm】
  • 已在 eclipse 里面安装 scala 插件。Eclipse Kepler 中在 Help->Eclipse Markectplace 中搜索 Scalar,然后安装即可。
  • 使用 maven 构建 kafka 测试 project 在 eclipse 中。
  • 创建 topic:在 kafka 的安装目录下执行 bin/kafka-create-topic.sh –zookeeper 192.168.20.99:2181 –replica 1 –partition 1 –topic test
  • 启动 consumer:在 kafka 的安装目录下执行 bin/kafka-console-consumer.sh –zookeeper 192.168.20.99:2181 –topic test –from-beginning

pom.xml 文件如下

所有 kafka 依赖的 jar 包都在 com.sksamuel.kafka 下面。其中 kafka 使用的版本是 0.8.0-beta1,kafka 是 2.10。

<project xmlns=”http://maven.apache.org/POM/4.0.0″ xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”
  xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd”>
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.iflytek.cpcloud.kafka</groupId>
  <artifactId>kafkatest</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafkatest</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
  <dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>1.2.14</version>
  </dependency>
  <dependency>
   <groupId>com.sksamuel.kafka</groupId>
   <artifactId>kafka_2.10</artifactId>
   <version>0.8.0-beta1</version>
  </dependency>
 </dependencies>
</project>

然后写一个 kafka producer 的测试程序如下:

package com.iflytek.cpcloud.kafka.kafkatest;

import Java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Test the Kafka Producer
 * @author jcsong2
 *
 */
public class ProducerTest {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put(“zk.connect”, “192.168.20.99:2181”);
  props.put(“serializer.class”, “kafka.serializer.StringEncoder”);
  props.put(“metadata.broker.list”, “192.168.20.99:9092”);
  ProducerConfig config = new ProducerConfig(props);
  Producer<String, String> producer = new Producer<String, String>(config);
  for (int i = 0; i < 10; i++)
   producer.send(new KeyedMessage<String, String>(“test”, “test” + i));
 }
}

在 consuemr 端可以看到 test0 到 test9 十行输出。

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2013-11/92754p2.htm

Kafka 的详细介绍 :请点这里
Kafka 的下载地址 :请点这里

相关阅读

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计5984字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中