阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

高吞吐量的分布式发布订阅消息系统Kafka

99次阅读
没有评论

共计 20939 个字符,预计需要花费 53 分钟才能阅读完成。

一、Kafka 概述 

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像 Hadoop 的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Apache kafka 原理与特性(0.8V)  http://www.linuxidc.com/Linux/2014-09/107388.htm

Kafka 部署与代码实例  http://www.linuxidc.com/Linux/2014-09/107387.htm

Kafka 介绍和集群环境搭建  http://www.linuxidc.com/Linux/2014-09/107382.htm

二、Kafka 相关术语

  • Broker
    Kafka 集群包含一个或多个服务器,这种服务器被称为 broker
  • Topic
    每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
  • Partition
    Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition.
  • Producer
    负责发布消息到 Kafka broker
  • Consumer
    消息消费者,向 Kafka broker 读取消息的客户端。
  • Consumer Group
    每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。

二、Kafka 下载及安装

     1、下载

1
wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz

2、安装

1
2
tar zxvf kafka_2.11-0.9.0.1.tgz
cd kafka_2.11-0.9.0.1

3、集群配置

         设定有两台服务器 192.168.1.237、192.168.1.238,两台服务器各安装有两 zookeeper, 端口都为 2181(zookeeper 不再说明),每个服务器都为 Kafka 配置 3 个 broker。

     3.1、server.properties 配置

1
2
3
4
5
6
broker.id = 10
port = 9090
host.name=192.168.1.237
advertised.host.name=192.168.1.237
log.dirs=/tmp/kafka-logs/server0
zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181

说明:host.name\advertised.host.name 两个参数还是要配置为 IP,否则会有各种各样的问题。

    3.2、server1.properties 配置

1
cp config/servier.properties config/server1.properties<br>vim config/server1.properties
?
1
2
3
4
5
6
broker.id = 11
port = 9091
host.name=192.168.1.237
advertised.host.name=192.168.1.237
log.dirs=/tmp/kafka-logs/server1
zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181

 3.3、server2.properties 配置

1
2
cp config/servier.properties config/server2.properties
vim config/server2.properties
?
1
2
3
4
5
6
broker.id = 12
port = 9092
host.name=192.168.1.237
advertised.host.name=192.168.1.237
log.dirs=/tmp/kafka-logs/server2
zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181

说明:同一台服务器 port、log.dirs 不能相同,不同的服务器 broker.id 只要在一个集群中都不能相同。

      3.4、同理 另一台服务器的 server.properties,server1.properties,server2.properties 的 broker.id 分别为:20、21、22,port 分别为:9090、9091、9092  其它:host.name=192.168.1.238、advertised.host.name=192.168.1.238

      3.5、启动

1
2
3
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &

3.6、监控端口

1
2
3
4
netstat -tunpl |grep 2181
netstat -tunpl |grep 9090
netstat -tunpl |grep 9091
netstat -tunpl |grep 9092

看一下这 4 个端口起来没有,并看一下 iptables 有没有加入这 4 个 IP 的启动,或要把 iptables 相关,否则 Java 连接不进来。

    四、测试

        4.1、创建 Topic

1
bin/kafka-topics.sh --create --zookeeper 192.168.1.237:2181 --replication-factor 3 --partitions 1 --topic testTopic

 4.2、查看创建情况

?
1
bin/kafka-topics.sh --describe --zookeeper 192.168.1.237:2181 --topic testTopic

  4.3、生产者发送消息

1
bin/kafka-console-producer.sh --broker-list 192.168.1.237:9090 --topic testTopic

        4.4、消费都接收消息

1
bin/kafka-console-consumer.sh --zookeeper 192.168.1.237:2181 --from-beginning --topic testTopic

 4.5、检查 consumer offset 位置

1
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect 192.168.1.237:2181 --group testTopic

五、遇到的问题

          1、运行一段时间报错

1
2
3
4
5
6
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory.
# An error report file with more information is saved as:
# //hs_err_pid6500.log
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000bad30000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)

   解决:

you can adjust the JVM heap size by editing kafka-server-start.sh, zookeeper-server-start.shand so on:

1
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

  The -Xms parameter specifies the minimum heap size. To get your server to at least start up, try changing it to use less memory. Given that you only             have 512M, you should change the maximum heap size (-Xmx) too:

1
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2016-06/132067p2.htm

一、概述

    Spring Integration Kafka 是基于 Apache Kafka 和 Spring Integration 来集成 Kafka,对开发配置提供了方便。

二、配置

    1、spring-kafka-consumer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
                        http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                        http://www.springframework.org/schema/integration
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/task
                        http://www.springframework.org/schema/task/spring-task.xsd">
 
    <!-- topic test conf -->
    <int:channel id="inputFromKafka" >
        <int:dispatcher task-executor="kafkaMessageExecutor" />
    </int:channel>
    <!-- zookeeper 配置 可以配置多个 -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="192.168.1.237:2181" zk-connection-timeout="6000"
        zk-session-timeout="6000" zk-sync-time="2000" />
    <!-- channel 配置 auto-startup="true"  否则接收不发数据 -->
    <int-kafka:inbound-channel-adapter
        id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
        auto-startup="true" channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>
    <task:executor id="kafkaMessageExecutor" pool-size="8" keep-alive="120" queue-capacity="500" />
    <bean id="kafkaDecoder"
        class="org.springframework.integration.kafka.serializer.common.StringDecoder" />
 
    <bean id="consumerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>
    <!-- 消息接收的 BEEN -->
    <bean id="kafkaConsumerService" class="com.sunney.service.impl.KafkaConsumerService" />
    <!-- 指定接收的方法 -->
    <int:outbound-channel-adapter channel="inputFromKafka"
        ref="kafkaConsumerService" method="processMessage" />
 
    <int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
        consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                group-id="default1" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
                max-messages="5000">
                <!-- 两个 TOPIC 配置 -->
                <int-kafka:topic id="mytopic" streams="4" />
                <int-kafka:topic id="sunneytopic" streams="4" />
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
</beans>
    

2、spring-kafka-producer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
     
   <!-- commons config -->
    <bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer"/>
    <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
        <constructor-arg value="Java.lang.String" />
    </bean>
    <bean id="producerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
                <prop key="message.send.max.retries">5</prop>
                <prop key="serializer.class">kafka.serializer.StringEncoder</prop>
                <prop key="request.required.acks">1</prop>
            </props>
        </property>
    </bean>
     
    <!-- topic test config  -->
     
    <int:channel id="kafkaTopicTest">
        <int:queue />
    </int:channel>
     
    <int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapterTopicTest" kafka-producer-context-ref="producerContextTopicTest"
        auto-startup="true" channel="kafkaTopicTest" order="3">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS"
            receive-timeout="1" task-executor="taskExecutor" />
    </int-kafka:outbound-channel-adapter>
    <task:executor id="taskExecutor" pool-size="5"
        keep-alive="120" queue-capacity="500" />
    <!-- <bean id="kafkaEncoder"
        class="org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder">
        <constructor-arg value="com.company.AvroGeneratedSpecificRecord" />
    </bean> -->
    <int-kafka:producer-context id="producerContextTopicTest"
        producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <!-- 多个 topic 配置 -->
            <int-kafka:producer-configuration
                broker-list="192.168.1.237:9090,192.168.1.237:9091,192.168.1.237:9092"
                key-serializer="stringSerializer"
                value-class-type="java.lang.String"
                value-serializer="stringSerializer"
                topic="mytopic" />
            <int-kafka:producer-configuration
                broker-list="192.168.1.237:9090,192.168.1.237:9091,192.168.1.237:9092"
                key-serializer="stringSerializer"
                value-class-type="java.lang.String"
                value-serializer="stringSerializer"
                topic="sunneytopic"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
</beans>

3、发消息接口 KafkaService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.sunney.service;
 
/**
 * 类 KafkaService.java 的实现描述:发消息接口类
 * @author Sunney 2016 年 4 月 30 日 上午 11:30:53
 */
public interface KafkaService {
    /**
     * 发消息
     * @param topic 主题
     * @param obj 发送内容
     */
    public void sendUserInfo(String topic, Object obj);
}

4、发消息实现类 KafkaServiceImpl 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.sunney.service.impl;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;
 
import com.sunney.service.KafkaService;
 
/**
 * 类 KafkaServiceImpl.java 的实现描述:发消息实现类
 * @author Sunney 2016 年 4 月 30 日 上午 11:31:13
 */
@Service("kafkaService")
public class KafkaServiceImpl  implements KafkaService{
 
    @Autowired
    @Qualifier("kafkaTopicTest")
    MessageChannel channel;
 
    public void sendUserInfo(String topic, Object obj) {
        channel.send(MessageBuilder.withPayload(obj)
                                    .setHeader(KafkaHeaders.TOPIC,topic)
                                    .build());
    }
 
}

5、消费接收类 KafkaConsumerService 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.sunney.service.impl;
 
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import com.alibaba.fastjson.JSON;
import com.sunney.service.UserDto;
 
/**
 * 类 KafkaConsumerService.java 的实现描述:消费接收类
 *
 * @author Sunney 2016 年 4 月 30 日 上午 11:46:14
 */
public class KafkaConsumerService {
 
    static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);
 
    public void processMessage(Map<String, Map<Integer, String>> msgs) {
        logger.info("===============processMessage===============");
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            logger.info("============Topic:" + entry.getKey());
            LinkedHashMap<Integer, String> messages = (LinkedHashMap<Integer, String>) entry.getValue();
            Set<Integer> keys = messages.keySet();
            for (Integer i : keys)
                logger.info("======Partition:" + i);
            Collection<String> values = messages.values();
            for (Iterator<String> iterator = values.iterator(); iterator.hasNext();) {
                String message = "["+iterator.next()+"]";
                logger.info("=====message:" + message);
                List<UserDto> userList = JSON.parseArray(message, UserDto.class); 
                logger.info("=====userList.size:" + userList.size());
 
            }
 
        }
    }
 
}

6、pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>1.3.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId> org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.7</version>
        </dependency>
    </dependencies>

六、源代码地址:https://github.com/sunney2010/kafka-demo

     七、遇到的问题

         1、消费端口收不到消息

              spring-kafka-consumer.xml 的 auto-startup 设置为 true

一、Kafka 概述 

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像 Hadoop 的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Apache kafka 原理与特性(0.8V)  http://www.linuxidc.com/Linux/2014-09/107388.htm

Kafka 部署与代码实例  http://www.linuxidc.com/Linux/2014-09/107387.htm

Kafka 介绍和集群环境搭建  http://www.linuxidc.com/Linux/2014-09/107382.htm

二、Kafka 相关术语

  • Broker
    Kafka 集群包含一个或多个服务器,这种服务器被称为 broker
  • Topic
    每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
  • Partition
    Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition.
  • Producer
    负责发布消息到 Kafka broker
  • Consumer
    消息消费者,向 Kafka broker 读取消息的客户端。
  • Consumer Group
    每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。

二、Kafka 下载及安装

     1、下载

1
wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz

2、安装

1
2
tar zxvf kafka_2.11-0.9.0.1.tgz
cd kafka_2.11-0.9.0.1

3、集群配置

         设定有两台服务器 192.168.1.237、192.168.1.238,两台服务器各安装有两 zookeeper, 端口都为 2181(zookeeper 不再说明),每个服务器都为 Kafka 配置 3 个 broker。

     3.1、server.properties 配置

1
2
3
4
5
6
broker.id = 10
port = 9090
host.name=192.168.1.237
advertised.host.name=192.168.1.237
log.dirs=/tmp/kafka-logs/server0
zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181

说明:host.name\advertised.host.name 两个参数还是要配置为 IP,否则会有各种各样的问题。

    3.2、server1.properties 配置

1
cp config/servier.properties config/server1.properties<br>vim config/server1.properties
?
1
2
3
4
5
6
broker.id = 11
port = 9091
host.name=192.168.1.237
advertised.host.name=192.168.1.237
log.dirs=/tmp/kafka-logs/server1
zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181

 3.3、server2.properties 配置

1
2
cp config/servier.properties config/server2.properties
vim config/server2.properties
?
1
2
3
4
5
6
broker.id = 12
port = 9092
host.name=192.168.1.237
advertised.host.name=192.168.1.237
log.dirs=/tmp/kafka-logs/server2
zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181

说明:同一台服务器 port、log.dirs 不能相同,不同的服务器 broker.id 只要在一个集群中都不能相同。

      3.4、同理 另一台服务器的 server.properties,server1.properties,server2.properties 的 broker.id 分别为:20、21、22,port 分别为:9090、9091、9092  其它:host.name=192.168.1.238、advertised.host.name=192.168.1.238

      3.5、启动

1
2
3
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &

3.6、监控端口

1
2
3
4
netstat -tunpl |grep 2181
netstat -tunpl |grep 9090
netstat -tunpl |grep 9091
netstat -tunpl |grep 9092

看一下这 4 个端口起来没有,并看一下 iptables 有没有加入这 4 个 IP 的启动,或要把 iptables 相关,否则 Java 连接不进来。

    四、测试

        4.1、创建 Topic

1
bin/kafka-topics.sh --create --zookeeper 192.168.1.237:2181 --replication-factor 3 --partitions 1 --topic testTopic

 4.2、查看创建情况

?
1
bin/kafka-topics.sh --describe --zookeeper 192.168.1.237:2181 --topic testTopic

  4.3、生产者发送消息

1
bin/kafka-console-producer.sh --broker-list 192.168.1.237:9090 --topic testTopic

        4.4、消费都接收消息

1
bin/kafka-console-consumer.sh --zookeeper 192.168.1.237:2181 --from-beginning --topic testTopic

 4.5、检查 consumer offset 位置

1
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect 192.168.1.237:2181 --group testTopic

五、遇到的问题

          1、运行一段时间报错

1
2
3
4
5
6
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory.
# An error report file with more information is saved as:
# //hs_err_pid6500.log
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000bad30000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)

   解决:

you can adjust the JVM heap size by editing kafka-server-start.sh, zookeeper-server-start.shand so on:

1
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

  The -Xms parameter specifies the minimum heap size. To get your server to at least start up, try changing it to use less memory. Given that you only             have 512M, you should change the maximum heap size (-Xmx) too:

1
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2016-06/132067p2.htm

一、概述

      Kafka 在雅虎内部被很多团队使用, 媒体团队用它做实时分析流水线, 可以处理高达 20Gbps(压缩数据)的峰值带宽。
为了简化开发者和服务工程师维护 Kafka 集群的工作,构建了一个叫做 Kafka 管理器的基于 Web 工具,叫做 Kafka Manager。这个管理工具可以很容易地发现分布在集群中的哪些 topic 分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建 Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具。
该软件是用 Scala 语言编写的。目前 (2015 年 02 月 03 日) 雅虎已经开源了 Kafka Manager 工具。这款 Kafka 集群管理工具主要支持以下几个功能:
1、管理几个不同的集群;
2、很容易地检查集群的状态 (topics, brokers, 副本的分布, 分区的分布);
3、选择副本;
4、产生分区分配(Generate partition assignments) 基于集群的当前状态;
5、重新分配分区。

二、Kafka Manager 下载及安装

     项目地址:https://github.com/yahoo/kafka-manager

     这个项目比 https://github.com/claudemamo/kafka-web-console 要好用一些,显示的信息更加丰富,kafka-manager 本身可以是一个集群。

     不过 kafka-manager 也没有权限管理功能。

     下载:

1
git clone git@github.com:yahoo/kafka-manager.git

    下载完后,只能源代码你什么也做不了,我们要把项目编译打包,该软件是用 Scala 语言编写,把有编译打包很麻烦,他依赖于 sbt。sbt 比较难安装。

三、sbt 安装

    1、下载 sbt-0.13.11

         我的服务器是 CentOS 自动安装几次没有成功,我还是选择手动安装。

         请自己到 http://www.scala-sbt.org/download.html 下载最新版本,我的版本是 sbt-0.13.11

    2、建立目录,解压文件到所建立目录

1
2
$ sudo mkdir /opt/scala/sbt
$ sudo tar zxvf sbt-0.13.11.tgz -C /opt/scala/

 3、建立启动 sbt 的脚本文件

1
2
3
4
5
6
7
/* 选定一个位置,建立启动 sbt 的脚本文本文件,如 /opt/scala/sbt/ 目录下面新建文件名为 sbt 的文本文件 */
$ cd /opt/scala/sbt/
$ vim sbt
/* 在 sbt 文本文件中添加
BT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
Java $SBT_OPTS -jar /opt/scala/sbt/bin/sbt-launch.jar "$@"
然后按 esc 键 输入 :wq 保存退出,注意红色字体中的路径可以是绝对路径也可以是相对路径,只要能够正确的定位到解压的 sbt 文件包中的 sbt-launch.jar 文件即可 */

并修改 sbt 文件权限

1
$ chmod u+x sbt

4、配置 PATH 环境变量,保证在控制台中可以使用 sbt 命令

1
2
3
$ vim /etc/profile
/* 在文件尾部添加如下代码后,保存退出 */
export PATH=/opt/scala/sbt/:$PATH

 

1
2
/* 使配置文件立刻生效 */
$ source /etc/profile

 5、测试 sbt 是否安装成功
          第一次执行时,会下载一些文件包,然后才能正常使用,要确保联网了,下载的过程分很慢。安装成功后显示如下

1
sbt sbt-version<br>[info] Set current project to sbt (in build file:/opt/scala/sbt/)<br>[info] 0.13.11

四、编绎打包

1
2
cd kafka-manager
sbt clean dist

生成的包会在 kafka-manager/target/universal 下面。生成的包只需要 java 环境就可以运行了,在部署的机器上不需要安装 sbt。

      如果打包会很慢的要有点耐心呀,还有可能打包失败,可以考虑配置代理。

四、Kafka Manager 部署

    1、打好包好,在部署机器上解压,修改好配置文件,就可以运行了 – 解压

1
unzip kafka-manager-1.0-SNAPSHOT.zip

2、修改 conf/application.conf,把 kafka-manager.zkhosts 改为自己的 zookeeper 服务器地址

1
kafka-manager.zkhosts="192.168.1.237:2181"

3、启动

1
2
cd kafka-manager-1.0-SNAPSHOT/bin
./kafka-manager -Dconfig.file=../conf/application.conf

4、查看帮助 和 后台运行

1
2
./kafka-manager -h
nohup ./kafka-manager -Dconfig.file=../conf/application.conf >/dev/null 2>&1

说明:正常来说,play 框架应该会自动加载 conf/application.conf 配置里的内容,但是貌似这个不起作用,要显式指定才行。

参考:https://github.com/yahoo/kafka-manager/issues/16

    5、默认 http 端口是 9000,可以修改配置文件里的 http.port 的值,或者通过命令行参数传递:

1
./kafka-manager -Dhttp.port=9001

五、sbt 配置代理

    sbt 的配置 http 代理的参考文档:http://www.scala-sbt.org/0.12.1/docs/Detailed-Topics/Setup-Notes.html#http-proxy

    通过 - D 设置叁数即可:

1
java -Dhttp.proxyHost=myproxy -Dhttp.proxyPort=8080 -Dhttp.proxyUser=username -Dhttp.proxyPassword=mypassword

也可以用下面这种方式,设置一下 SBT_OPTS 的环境变量即可:

1
export SBT_OPTS="$SBT_OPTS -Dhttp.proxyHost=myproxy -Dhttp.proxyPort=myport"

注意:myproxy,这个值里不要带 http 前缀,也不要带端口号。

      比如,你的代理是 http://localhost:8123,那么应该这样配置:

1
export SBT_OPTS="$SBT_OPTS -Dhttp.proxyHost=localhost -Dhttp.proxyPort=8123"

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-06/132067.htm

正文完
星哥说事-微信公众号
post-qrcode
 
星锅
版权声明:本站原创文章,由 星锅 2022-01-21发表,共计20939字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中