阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Kafka+Log4j实现日志集中管理

141次阅读
没有评论

共计 9533 个字符,预计需要花费 24 分钟才能阅读完成。

记录如何使用 Kafka+Log4j 实现集中日志管理的过程。

引言

前面写的《Spring+Log4j+ActiveMQ 实现远程记录日志——实战 + 分析》得到了许多同学的认可,在认可的同时,也有同学提出可以使用 Kafka 来集中管理日志,于是今天就来学习一下。

特别说明,由于网络上关于 Kafka+Log4j 的完整例子并不多,我也是一边学习一边使用,因此如果有解释得不好或者错误的地方,欢迎批评指正,如果你有好的想法,也欢迎留言探讨。
 
第一部分 搭建 Kafka 环境

安装 Kafka

下载:http://kafka.apache.org/downloads.html

tar zxf kafka-<VERSION>.tgz
cd kafka-<VERSION>

启动 Zookeeper

启动 Zookeeper 前需要配置一下 config/zookeeper.properties:

Kafka+Log4j 实现日志集中管理

接下来启动 Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

启动 Kafka Server

启动 Kafka Server 前需要配置一下 config/server.properties。主要配置以下几项,内容就不说了,注释里都很详细:

Kafka+Log4j 实现日志集中管理

然后启动 Kafka Server:

bin/kafka-server-start.sh config/server.properties

创建 Topic

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

查看创建的 Topic

>bin/kafka-topics.sh –list –zookeeper localhost:2181

启动控制台 Producer,向 Kafka 发送消息

bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
This is a message
This is another message
^C

启动控制台 Consumer,消费刚刚发送的消息

bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning
This is a message
This is another message

删除 Topic

bin/kafka-topics.sh –delete –zookeeper localhost:2181 –topic test

注:只有当 delete.topic.enable=true 时,该操作才有效

配置 Kafka 集群(单台机器上)

首先拷贝 server.properties 文件为多份(这里演示 4 个节点的 Kafka 集群,因此还需要拷贝 3 份配置文件):

cp config/server.properties config/server1.properties
cp config/server.properties config/server2.properties
cp config/server.properties config/server3.properties

修改 server1.properties 的以下内容:

broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

同理修改 server2.properties 和 server3.properties 的这些内容,并保持所有配置文件的 zookeeper.connect 属性都指向运行在本机的 zookeeper 地址 localhost:2181。注意,由于这几个 Kafka 节点都将运行在同一台机器上,因此需要保证这几个值不同,这里以累加的方式处理。例如在 server2.properties 上:

broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

把 server3.properties 也配置好以后,依次启动这些节点:

bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
bin/kafka-server-start.sh config/server3.properties &

Topic & Partition

Topic 在逻辑上可以被认为是一个 queue,每条消费都必须指定它的 Topic,可以简单理解为必须指明把这条消息放进哪个 queue 里。为了使得 Kafka 的吞吐率可以线性提高,物理上把 Topic 分成一个或多个 Partition,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有消息和索引文件。

现在在 Kafka 集群上创建备份因子为 3,分区数为 4 的 Topic:

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 3 –partitions 4 –topic kafka

说明:备份因子 replication-factor 越大,则说明集群容错性越强,就是当集群 down 掉后,数据恢复的可能性越大。所有的分区数里的内容共同组成了一份数据,分区数 partions 越大,则该 topic 的消息就越分散,集群中的消息分布就越均匀。

然后使用 kafka-topics.sh 的 –describe 参数查看一下 Topic 为 kafka 的详情:

Kafka+Log4j 实现日志集中管理

输出的第一行是所有分区的概要,接下来的每一行是一个分区的描述。可以看到 Topic 为 kafka 的消息,PartionCount=4,ReplicationFactor= 3 正是我们创建时指定的分区数和备份因子。

另外:Leader 是指负责这个分区所有读写的节点;Replicas 是指这个分区所在的所有节点(不论它是否活着);ISR 是 Replicas 的子集,代表存有这个分区信息而且当前活着的节点。

拿 partition:0 这个分区来说,该分区的 Leader 是 server0,分布在 id 为 0,1,2 这三个节点上,而且这三个节点都活着。

再来看下 Kafka 集群的日志:

Kafka+Log4j 实现日志集中管理

其中 kafka-logs- 0 代表 server0 的日志,kafka-logs- 1 代表 server1 的日志,以此类推。

从上面的配置可知,id 为 0,1,2,3 的节点分别对应 server0, server1, server2, server3。而上例中的 partition:0 分布在 id 为 0, 1, 2 这三个节点上,因此可以在 server0, server1, server2 这三个节点上看到有 kafka- 0 这个文件夹。这个 kafka- 0 就代表 Topic 为 kafka 的 partion0。
 
第二部分 Kafka+Log4j 项目整合

先来看下 Maven 项目结构图:

Kafka+Log4j 实现日志集中管理

作为 Demo,文件不多。先看看 pom.xml 引入了哪些 jar 包:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>

重要的内容是 log4j.properties:

log4j.rootLogger=INFO,console
 
# for package com.demo.kafka, log would be sent to kafka appender.
log4j.logger.com.demo.kafka=DEBUG,kafka
 
# appender kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=kafka
# multiple brokers are separated by comma “,”.
log4j.appender.kafka.brokerList=localhost:9092, localhost:9093, localhost:9094, localhost:9095
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=true
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] – [%l] %m%n
 
# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] – [%l] %m%n
 

App.Java 里面就很简单啦,主要是通过 log4j 输出日志:

package com.demo.kafka;
import org.apache.log4j.Logger;
public class App {
    private static final Logger LOGGER = Logger.getLogger(App.class);
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            LOGGER.info(“Info [” + i + “]”);
            Thread.sleep(1000);
        }
    }
}

MyConsumer.java 用于消费 kafka 中的信息:

package com.demo.kafka;
 
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
 
public class MyConsumer {
    private static final String ZOOKEEPER = “localhost:2181”;
    //groupName 可以随意给,因为对于 kafka 里的每条消息,每个 group 都会完整的处理一遍
    private static final String GROUP_NAME = “test_group”;
    private static final String TOPIC_NAME = “kafka”;
    private static final int CONSUMER_NUM = 4;
    private static final int PARTITION_NUM = 4;
 
    public static void main(String[] args) {
        // specify some consumer properties
        Properties props = new Properties();
        props.put(“zookeeper.connect”, ZOOKEEPER);
        props.put(“zookeeper.connectiontimeout.ms”, “1000000”);
        props.put(“group.id”, GROUP_NAME);
 
        // Create the connection to the cluster
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector =
            Consumer.createJavaConsumerConnector(consumerConfig);
 
        // create 4 partitions of the stream for topic“test”, to allow 4
        // threads to consume
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
            consumerConnector.createMessageStreams(
                ImmutableMap.of(TOPIC_NAME, PARTITION_NUM));
        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(TOPIC_NAME);
 
        // create list of 4 threads to consume from each of the partitions
        ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_NUM);
 
        // consume the messages in the threads
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) {
                        // process message (msgAndMetadata.message())
                        System.out.println(new String(msgAndMetadata.message()));
                    }
                }
            });
        }
    }
}

MyProducer.java 用于向 Kafka 发送消息,但不通过 log4j 的 appender 发送。此案例中可以不要。但是我还是放在这里:

package com.demo.kafka;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class MyProducer {
    private static final String TOPIC = “kafka”;
    private static final String CONTENT = “This is a single message”;
    private static final String BROKER_LIST = “localhost:9092”;
    private static final String SERIALIZER_CLASS = “kafka.serializer.StringEncoder”;
   
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(“serializer.class”, SERIALIZER_CLASS);
        props.put(“metadata.broker.list”, BROKER_LIST);
       
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
 
        //Send one message.
        KeyedMessage<String, String> message =
            new KeyedMessage<String, String>(TOPIC, CONTENT);
        producer.send(message);
       
        //Send multiple messages.
        List<KeyedMessage<String,String>> messages =
            new ArrayList<KeyedMessage<String, String>>();
        for (int i = 0; i < 5; i++) {
            messages.add(new KeyedMessage<String, String>
                (TOPIC, “Multiple message at a time. ” + i));
        }
        producer.send(messages);
    }
}

到这里,代码就结束了。
 
第三部分 运行与验证

先运行 MyConsumer,使其处于监听状态。同时,还可以启动 Kafka 自带的 ConsoleConsumer 来验证是否跟 MyConsumer 的结果一致。最后运行 App.java。

先来看看 MyConsumer 的输出:

Kafka+Log4j 实现日志集中管理

再来看看 ConsoleConsumer 的输出:

Kafka+Log4j 实现日志集中管理

可以看到,尽管发往 Kafka 的消息去往了不同的地方,但是内容是一样的,而且一条也不少。最后再来看看 Kafka 的日志。

我们知道,Topic 为 kafka 的消息有 4 个 partion,从之前的截图可知这 4 个 partion 均匀分布在 4 个 kafka 节点上,于是我对每一个 partion 随机选取一个节点查看了日志内容。

上图中黄色选中部分依次代表在 server0 上查看 partion0,在 server1 上查看 partion1,以此类推。

而红色部分是日志内容,由于在创建 Topic 时准备将 20 条日志分成 4 个区存储,可以很清楚的看到,这 20 条日志确实是很均匀的存储在了几个 partion 上。

摘一点 Infoq 上的话:每个日志文件都是一个 log entrie 序列,每个 log entrie 包含一个 4 字节整型数值(值为 N +5),1 个字节的 ”magic value”,4 个字节的 CRC 校验码,其后跟 N 个字节的消息体。每条消息都有一个当前 Partition 下唯一的 64 字节的 offset,它指明了这条消息的起始位置。磁盘上存储的消息格式如下:

message length:4 bytes (value: 1+4+n)
“magic” value:1 byte
crc:4 bytes
payload:n bytes

这里我们看到的日志文件的每一行,就是一个 log entrie,每一行前面无法显示的字符(蓝色选中部分),就是(message length + magic value + crc)了。而 log entrie 的后部分,则是消息体的内容了。

问题

1. 如果要使用此种方式,有一种场景是提取某天或者某小时的日志,那么如何设计 Topic 呢?是不是要在 Topic 上带入日期或者小时数?还有更好的设计方案吗?

2. 假设按每小时设计 Topic,那么如何在使用诸如 logger.info() 这样的方法时,自动根据时间去改变 Topic 呢?有类似的例子吗?

—- 欢迎交流,共同进步。

样例下载

—————————————— 分割线 ——————————————

免费下载地址在 http://linux.linuxidc.com/

用户名与密码都是 www.linuxidc.com

具体下载目录在 /2015 年资料 /12 月 /13 日 /Kafka+Log4j 实现日志集中管理

下载方法见 http://www.linuxidc.com/Linux/2013-07/87684.htm

—————————————— 分割线 ——————————————

参考页面:

  • http://kafka.apache.org/07/quickstart.html
  • http://kafka.apache.org/documentation.html#quickstart

相关阅读

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Apache kafka 原理与特性 (0.8V)  http://www.linuxidc.com/Linux/2014-09/107388.htm

Kafka 部署与代码实例  http://www.linuxidc.com/Linux/2014-09/107387.htm

Kafka 介绍和集群环境搭建  http://www.linuxidc.com/Linux/2014-09/107382.htm

Kafka 的详细介绍 :请点这里
Kafka 的下载地址 :请点这里

本文永久更新链接地址 :http://www.linuxidc.com/Linux/2015-12/126172.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计9533字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中