阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Kafka代码API

140次阅读
没有评论

共计 3109 个字符,预计需要花费 8 分钟才能阅读完成。

1. 建立工程,导入相应的 jar 包

Procuder 类

package cn.itcast.kafka;

import Java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
 
 // 要读取的数据主题
 private static final String topic = “kfc”;
 // 消费者的数量
 private static final Integer threads = 2;
 
 public static void main(String[] args) {
 
  Properties props = new Properties();
  // 指定 zookeeper 的地址
  props.put(“zookeeper.connect”, “storm01:2181,storm02:2181,storm03:2181”);
  // 消费组的编号
  props.put(“group.id”, “1111”);
  // 偏移量,从哪个位置读
  props.put(“auto.offset.reset”, “smallest”);
 
  ConsumerConfig config = new ConsumerConfig(props);
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
  HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>();
  topicCountmap.put(topic, threads);
 
  // 根据 map 获取所有的主题对应的消息流
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap);
  // 获取某个主题的消息流
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
  // 开启两个消费者进程,读取主题下的流
  for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
   new Thread(new Runnable() {
   
    @Override
    public void run() {
     for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
      System.err.println(new String(messageAndMetadata.message()));
     }
     
    }
   }).start();
  }
 
 }
}

consumer– 消费者类

package cn.itcast.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
 
 // 要读取的数据主题
 private static final String topic = “kfc”;
 // 消费者的数量
 private static final Integer threads = 2;
 
 public static void main(String[] args) {
 
  Properties props = new Properties();
  // 指定 zookeeper 的地址
  props.put(“zookeeper.connect”, “storm01:2181,storm02:2181,storm03:2181”);
  // 消费组的编号
  props.put(“group.id”, “1111”);
  // 偏移量,从哪个位置读
  props.put(“auto.offset.reset”, “smallest”);
 
  ConsumerConfig config = new ConsumerConfig(props);
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
  HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>();
  topicCountmap.put(topic, threads);
 
  // 根据 map 获取所有的主题对应的消息流
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap);
  // 获取某个主题的消息流
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
  // 开启两个消费者进程,读取主题下的流
  for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
   new Thread(new Runnable() {
   
    @Override
    public void run() {
     for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
      System.err.println(new String(messageAndMetadata.message()));
     }
     
    }
   }).start();
  }
 
 }
}

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Kafka 使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm

Kafka 的详细介绍 :请点这里
Kafka 的下载地址 :请点这里

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计3109字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中