阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Kafka单机环境配置及基本使用详解

473次阅读
没有评论

共计 10858 个字符,预计需要花费 28 分钟才能阅读完成。

基本概念介绍

在 Kafka 中有一些基本的概念,

Topic

  • 简介:Topic 在 Kafka 中是一个抽象的概念,一个主题是已经发布的记录的种类。主题在 Kafka 中是可以被多重订阅的,这就意味着一个主题可能有 0 个、一个、或者许多个消费者去订阅这个主题中的消息。

  • Partitions:在每一个 topic 在 Kafka 中可以有多个分区,增加一个主题的分区可以提高 Kafka 的吞吐率,但是不是越多越好,因为如果分区数量越多的话生产者插入的效率也会降低。所以真正到生产环境时,需要权衡生产与消费的一个平衡关系,消费稍微大于生产者,不会产生消息的堆积,也能够充分提高 Kafka 的效率。

  • Replication Factor:复制因子,是对于当前的 Topic 是否需要副本。如果设置成 1 的话,代表当前 Topic 在整个 Kafka 中只有一份。这里有个限制 Topic 的数量不能够多于当前 Kafka 的 Broker 数量。

  • 存储方式:在 Kafka 的配置中 (Server.properties) 有 logs.dir 的配置,这个是 Kafka 存储消息的位置。如果 Topic 复制因子是 1 分区是 1 的话,在对应的文件夹下会有一个名称为 topicname 的文件夹;如果复制因子是 2 分区是 2,假设存在两个 Broker,在每个 Broker 中将会存在两个文件夹分别为 topicname_0 topicname_1 的文件夹

  • Leader 与 Follower:由于每个 topic 如果存在副本的话,是对于 partition 进行复制。这么多存在在不同的 Broker 上的副本,其中有一个 partition 是 leader 其他的是 Followers,当一个 broker 宕机会在副本中选择一个充当 Leader。

Producer

生产者,顾明思议是生产消息,允许应用发布一个流的消息到一个或者多个主题中,

Consumer

  • 简介:消费者是订阅某个 topic 消息。
  • Group: 每个消费者都有个 groupid 来标定当前消费者属于哪个 group。Group 的作用是,当同一个 group 的两个消费者订阅一个 topic 的时候,如果当前 topic 没有分区那么其中一个消费者是获得不了任何消息的;如果有分区的话,将会按照数量进行负载均衡,每个消费者获得不同的分区的消息。
  • 同一个 Group 下的消费者不会同时订阅一个主题下的同一个分区,如果消费者数量杜宇分区数量,则多出的消费者是不会有任何消息获得的。

    Broker

Broker 是一个 Kafka 的 Server,一台单物理机或者集群都可以拥有多个 broker 一个 broker 可以容纳多个主题,这个与复制因子、主题的分区都有关系。

Kafka 单机配置,一个 Broker

环境:

  • win10 物理机
  • Wmare CentOS7 虚拟机
  • XShell 访问虚拟机

配置 zookeeper

  • 下载
# zookeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
  • 解压后进入目录
cd zookeeper-3.4.13/conf
  • 复制 zookeeper 的配置文件
cp zoo_sample.cfg zoo.cfg
  • 返回上级进入 bin 目录下,键入如下命令
./zkServer.sh start
  • 查看是否成功开启 zookeeper 服务
# 注:这里提示一下开启后提示的成功不一定是真的成功, 所以需要查看一下
netstat -tunlp|egrep 2181
# 如果没有结果查看统计目录下的 zookeeper.out 文件 查看 log 信息
# 使用 jps 命令查看 QuorumPeerMain 是 zookeeper 的守护进程
11089 QuorumPeerMain
11114 Jps

配置 Kafka

  • 下载安装包
# Kafka
wget http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
  • 解压后进入文件夹下 bin 目录下
# 第一个是 start.sh 位置第二个是 server.rpoperties 的位置,所以确认好路径的正确性
./kafka-server-start.sh ./../config/server.properties &
# 我们可以在 Kafka 的目录下直接执行,而不进入到 bin 下,命令看着更舒服些
./bin/kafka-server-start.sh ./config/server.properties &
  • 查看是否开启成功:默认的 Kafka 端口是 9092,zookeeper 是 2181
netstat -tunlp|egrep "(2181|9092)"
# 结果如下
[root@localhost ~]# netstat -tunlp|egrep "(2181|9092)"
tcp6      0     0 :::9092               :::*                  LISTEN      1877/Java  tcp6      0     0 :::2181               :::*                  LISTEN      1820/java
# jps 查看
11089 QuorumPeerMain
11458 Kafka
11847 Jps
  • 至此 Kafka 配置成功

使用 Kafka

创建 topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 返回结果
Created topic "test"

在虚拟机用 sh 脚本 上作为生产者生产消息

  • 我们重新开一个 Xshell 窗口,CD 到Kafka 目录 /bin 下,我们先介绍这一节会使用到的 kafka-console-producer.sh
# 键入如下命令
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>today message
>
# 最近本的指定,broker-list 与 topic 是必须的参数
# 成功命令行会进入一个 > 的情况,键入消息按回车键就是发送消息到 Kafka 了
# 发送一个【today message】
  • kafka-console-producer.sh参数说明,运行./kafka-console-producer.sh --help 可查看

在虚拟机上用 sh 脚本 作为消费者消费消息

  • 重新开另个一 Xshell 窗口 CD 到Kafka 目录 /bin 下,我们先介绍这一节会使用到的 kafka-console-consumer.sh
# 键入如下命令
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# 最近本的指定,bootstrap-server 与 topic/whitelist 是必须的参数
# 由于有 from-beginning 参数 会从头 load 所有消息
# 消费后返回如下
today message
#在生产端键入消息后,消费端会同步消息出现
  • kafka-console-consumer.sh参数说明运行./kafka-console-consumer.sh --help 可查看

使用 Python 作为生产者、消费者

  • 在物理机上写一个 Python 生产者的脚本
from kafka.producer import KafkaProducer
import time
def send_data(data):
    producer = KafkaProducer(bootstrap_servers='192.168.233.138:9092')
    producer.send("test",b''+str(data)+'')
    producer.flush()
    print ("end")
    
if __name__=="__main__":
    send_data("physics python message");
  • 查看 Xshell 上消费的命令行
[root@localhost ~]# /home/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.138:9092 --topic test --from-beginning
111
333

1
12
physics python message
  • 在物理机上写一个消费者的脚本
from kafka import KafkaConsumer
import time
def get_data(data):
    consumer = KafkaConsumer('test',bootstrap_servers='192.168.233.138:9092', group_id='my_favorite_group')
    print ("end")
    for msg in consumer:
        print(msg)
    
if __name__=="__main__":
    get_data();
  • 物理机消费者的结果
# 我这边是先运行的消费者的脚本,所以实时接收到了物理机产生的消息
ConsumerRecord(topic=u'test', partition=0, offset=5, timestamp=1551762485911L, timestamp_type=0, key=None, value='physics python message', checksum=1520092583, serialized_key_size=-1, serialized_value_size=22)
  • 测试使用虚拟机 sh 端的生产者发送 123 消息,查看物理机消费者结果
ConsumerRecord(topic=u'test', partition=0, offset=6, timestamp=1551762784609L, timestamp_type=0, key=None, value='123', checksum=1760815061, serialized_key_size=-1, serialized_value_size=3)
  • 几点注意
# 物理机连接时可能出现【kafka.errors.NoBrokersAvailable: NoBrokersAvailable】这个错误按照如下顺序依次更改
1. 查看虚拟机防火墙是否关闭
    systemctl status firewalld
    systemctl stop firewalld
2. 更改 kafka 服务端的 server.properties:
    增加 [listeners=PLAINTEXT://192.168.233.138:9092]这一行
3. 修改物理机的 hosts 文件 C:\Windows\System32\drivers\etc\hosts
    增加【虚拟机 ip 虚拟机主机名】Eg:[192.168.233.138 localhost]

使用Springboot 作为生产者、消费者

注:我直接在我的一个寄存的 Spring Boot Demo 项目上更改

  • 在 pom.xml 中添加 kafka 依赖
<dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
 </dependency>
<!-- 提示一件事情此处别指定 version 了,直接用最新的就可以,老的版本一些包找不到 -->
  • 写一个 kafka 生产者配置类
package com.example.kane.config;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class kafka_config {public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.138:9092");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
     
        public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
     
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());
        }

}
  • 创建一个生产数据的 Controller
package com.example.kane.Controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;


@RestController
@RequestMapping("/kafka")
public class CollectController {protected final Logger logger = LoggerFactory.getLogger(this.getClass());
        @Autowired
        private KafkaTemplate kafkaTemplate;

        @RequestMapping(value = "/send", method = RequestMethod.GET)
        public void sendKafka(HttpServletRequest request, HttpServletResponse response) {
            try {String message = request.getParameter("message");
                logger.info("kafka 的消息 ={}", message);
                kafkaTemplate.send("test", "key", message);
                logger.info("发送 kafka 成功.");
            } catch (Exception e) {logger.error("发送 kafka 失败", e);
            }
        }

}
  • 启动项目后,在浏览器访问 http://localhost:8080/kafka/send?message=url_producer
# 查看结果
2019-03-05 13:57:16.438  INFO 10208 --- [nio-8080-exec-1] c.e.kane.Controller.CollectController    : 发送 kafka 成功.
2019-03-05 13:57:45.871  INFO 10208 --- [nio-8080-exec-5] c.e.kane.Controller.CollectController    : kafka 的消息 =url_producer
2019-03-05 13:57:45.872  INFO 10208 --- [nio-8080-exec-5] c.e.kane.Controller.CollectController    : 发送 kafka 成功.
# 查看虚拟机 Consumer 结果

[root@localhost ~]# /home/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.138:9092 --topic test --from-beginning
physics python message
123
null
url_producer
  • 增加消费者的配置
package com.example.kane.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

import com.example.kane.service.kafka_listener;
@Configuration
@EnableKafka
public class kafka_consumer_config {
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.138:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        return propsMap;
    }
    @Bean
    public kafka_listener listener() {return new kafka_listener();
    }
}
  • 增加 listener 类
package com.example.kane.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
public class kafka_listener {protected final Logger logger = LoggerFactory.getLogger(this.getClass());


    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {logger.info(record.toString());
        logger.info("kafka 的 key:" + record.key());
        logger.info("kafka 的 value:" + record.value().toString());
    }
}
  • 同样我们用访问 http://localhost:8080/kafka/send?message=url_producer1 重新发一个消息
# 结果
2019-03-05 14:31:04.787  INFO 10208 --- [nio-8080-exec-1] c.e.kane.Controller.CollectController    : 发送 kafka 成功.
2019-03-05 14:31:04.848  INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener  : ConsumerRecord(topic = test, partition = 0, offset = 10, CreateTime = 1551767464787, serialized key size = 3, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = url_producer1)
2019-03-05 14:31:04.848  INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener  : kafka 的 key: key
2019-03-05 14:31:04.848  INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener  : kafka 的 value: url_producer1
# 查看虚拟机 消费者信息
physics python message
123
null
url_producer
url_producer1
url_producer1

一些需要注意的问题

  1. 现在 kafka 官方提供自带 zookeeper 版本,不建议使用自带的,还是建议自己安装 zookeeper
  2. 物理机没法访问的时候,看文中的注意事项,依次更改一定能访问

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计10858字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19351
评论数
4
阅读量
7985435
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的 Nano Banana Pro?附赠邪修的用法 前言 大家好,我是星哥,今天来介绍谷歌的 ...
我用AI做了一个1978年至2019年中国大陆企业注册的查询网站

我用AI做了一个1978年至2019年中国大陆企业注册的查询网站

我用 AI 做了一个 1978 年至 2019 年中国大陆企业注册的查询网站 最近星哥在 GitHub 上偶然...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
星哥带你玩飞牛NAS硬件02:某鱼6张左右就可拿下5盘位的飞牛圣体NAS

星哥带你玩飞牛NAS硬件02:某鱼6张左右就可拿下5盘位的飞牛圣体NAS

星哥带你玩飞牛 NAS 硬件 02:某鱼 6 张左右就可拿下 5 盘位的飞牛圣体 NAS 前言 大家好,我是星...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

  星哥带你玩飞牛 NAS-16:不再错过公众号更新,飞牛 NAS 搭建 RSS 对于经常关注多个微...
还在找免费服务器?无广告免费主机,新手也能轻松上手!

还在找免费服务器?无广告免费主机,新手也能轻松上手!

还在找免费服务器?无广告免费主机,新手也能轻松上手! 前言 对于个人开发者、建站新手或是想搭建测试站点的从业者...
星哥带你玩飞牛NAS-14:解锁公网自由!Lucky功能工具安装使用保姆级教程

星哥带你玩飞牛NAS-14:解锁公网自由!Lucky功能工具安装使用保姆级教程

星哥带你玩飞牛 NAS-14:解锁公网自由!Lucky 功能工具安装使用保姆级教程 作为 NAS 玩家,咱们最...
你的云服务器到底有多强?宝塔跑分告诉你

你的云服务器到底有多强?宝塔跑分告诉你

你的云服务器到底有多强?宝塔跑分告诉你 为什么要用宝塔跑分? 宝塔跑分其实就是对 CPU、内存、磁盘、IO 做...
手把手教你,购买云服务器并且安装宝塔面板

手把手教你,购买云服务器并且安装宝塔面板

手把手教你,购买云服务器并且安装宝塔面板 前言 大家好,我是星哥。星哥发现很多新手刚接触服务器时,都会被“选购...