阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

基于Flume+Log4j+Kafka的日志采集架构方案

192次阅读
没有评论

共计 14167 个字符,预计需要花费 36 分钟才能阅读完成。

本文将会介绍如何使用 Flume、log4j、Kafka 进行规范的日志采集。

基于 Flume+Log4j+Kafka 的日志采集架构方案

Flume 基本概念

Flume 是一个完善、强大的日志采集工具,关于它的配置,在网上有很多现成的例子和资料,这里仅做简单说明不再详细赘述。
Flume 包含 Source、Channel、Sink 三个最基本的概念:

基于 Flume+Log4j+Kafka 的日志采集架构方案

Source——日志来源,其中包括:Avro Source、Thrift Source、Exec Source、JMS Source、Spooling Directory Source、Kafka Source、NetCat Source、Sequence Generator Source、Syslog Source、HTTP Source、Stress Source、Legacy Source、Custom Source、Scribe Source 以及 Twitter 1% firehose Source。

Channel——日志管道,所有从 Source 过来的日志数据都会以队列的形式存放在里面,它包括:Memory Channel、JDBC Channel、Kafka Channel、File Channel、Spillable Memory Channel、Pseudo Transaction Channel、Custom Channel。

Sink——日志出口,日志将通过 Sink 向外发射,它包括:HDFS Sink、Hive Sink、Logger Sink、Avro Sink、Thrift Sink、IRC Sink、File Roll Sink、Null Sink、HBase Sink、Async HBase Sink、Morphline Solr Sink、Elastic Search Sink、Kite Dataset Sink、Kafka Sink、Custom Sink。

基于 Flume 的日志采集是灵活的,我们可以看到既有 Avro Sink 也有 Avro Source,既有 Thrift Sink 也有 Thrift Source,这意味着我们可以将多个管道处理串联起来,如下图所示:

基于 Flume+Log4j+Kafka 的日志采集架构方案

串联的意义在于,我们可以将多个管道合并到一个管道中最终输出到同一个 Sink 中去,如下图:

基于 Flume+Log4j+Kafka 的日志采集架构方案

上面讲述了 Source 和 Sink 的作用,而 Channel 的作用在于处理不同的 Sink,假设我们一个 Source 要对应多个 Sink,则只需要为一个 Source 建立多个 Channel 即可,如下所示:

基于 Flume+Log4j+Kafka 的日志采集架构方案

一个 Source 如果想要输出到多个 Sink 中去,就需要建立多个 Channel 进行介入并最终输出,通过上面这几张图,我们可以很好的理解 Flume 的运行机制,我们在这里也就点到为止,详细的配置可以在官网或者在网上搜索到、查看到。

一般情况下,我们使用 Exec Source 对 log 文件进行监控,这样做确实是比较简单,但是并不方便,我们需要在每一台要监控的服务器上部署 Flume,对运维来讲万一目标日志文件发生 IO 异常(例如格式改变、文件名改变、文件被锁),也是很痛苦的,因此我们最好能让日志直接通过 Socket 发送出去,而不是存放在本地,这样一来,不仅降低了目标服务器的磁盘占用,还能够有效的防止文件 IO 异常,而 Kafka 就是一个比较好的解决方案,具体的架构如下图所示:

基于 Flume+Log4j+Kafka 的日志采集架构方案

 

由上图可以看到,日志最终流向了两个地方:HBase Persistence 和 Realtime Processor,而至于为什么不用 Kafka 直接与 Storm 进行通信的原因是为了将 Sotrm 逻辑和日志源通过 Flume 进行隔离,在 Storm 中对日志进行简单的分析后,将结果扔进 Rabbit MQ 中供 WEB APP 消费。

HBase Persistence 就是将原始的日志记录在 HBase 中以便回档查询,而 Realtime Processor 则包含了实时的日志统计以及错误异常邮件提醒等功能。

为了能够准确的捕获到异常数据,我们还需要对程序进行一些规范化的改造,例如提供统一的异常处理句柄等等。

日志输出格式

既然打算要对日志进行统一处理,一个统一、规范的日志格式就是非常重要的,而我们以往使用的 PatternLayout 对于最终字段的切分非常的不方便,如下所示:

2016-05-08 19:32:55,572 [INFO] [main] – [com.banksteel.log.demo.log4j.Demo.main(Demo.Java:13)] 输出信息……
2016-05-08 19:32:55,766 [DEBUG] [main] – [com.banksteel.log.demo.log4j.Demo.main(Demo.java:15)] 调试信息……
2016-05-08 19:32:55,775 [WARN] [main] – [com.banksteel.log.demo.log4j.Demo.main(Demo.java:16)] 警告信息……
2016-05-08 19:32:55,783 [ERROR] [main] – [com.banksteel.log.demo.log4j.Demo.main(Demo.java:20)] 处理业务逻辑的时候发生一个错误……
java.lang.Exception: 错误消息啊
at com.banksteel.log.demo.log4j.Demo.main(Demo.java:18)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

如何去解析这个日志,是个非常头疼的地方,万一某个系统的开发人员输出的日志不符合既定规范的 PatternLayout 就会引发异常。

为了能够一劳永逸的解决格式问题,我们采用 JsonLayout 就能很好的规范日志输出,例如 LOG4J 2.X 版本中提供的 JsonLayout 输出的格式如下所示:

{"timeMillis" : 1462712870612,
  "thread" : "main",
  "level" : "FATAL",
  "loggerName" : "com.banksteel.log.demo.log4j2.Demo",
  "message" : "发生了一个可能会影响程序继续运行下去的异常!",
  "thrown" : {"commonElementCount" : 0,
    "localizedMessage" : "错误消息啊",
    "message" : "错误消息啊",
    "name" : "java.lang.Exception",
    "extendedStackTrace" : [ {"class" : "com.banksteel.log.demo.log4j2.Demo",
      "method" : "main",
      "file" : "Demo.java",
      "line" : 20,
      "exact" : true,
      "location" : "classes/",
      "version" : "?"
    }, {"class" : "sun.reflect.NativeMethodAccessorImpl",
      "method" : "invoke0",
      "file" : "NativeMethodAccessorImpl.java",
      "line" : -2,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {"class" : "sun.reflect.NativeMethodAccessorImpl",
      "method" : "invoke",
      "file" : "NativeMethodAccessorImpl.java",
      "line" : 57,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {"class" : "sun.reflect.DelegatingMethodAccessorImpl",
      "method" : "invoke",
      "file" : "DelegatingMethodAccessorImpl.java",
      "line" : 43,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {"class" : "java.lang.reflect.Method",
      "method" : "invoke",
      "file" : "Method.java",
      "line" : 606,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {"class" : "com.intellij.rt.execution.application.AppMain",
      "method" : "main",
      "file" : "AppMain.java",
      "line" : 144,
      "exact" : true,
      "location" : "idea_rt.jar",
      "version" : "?"
    } ]
  },
  "endOfBatch" : false,
  "loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger",
  "source" : {"class" : "com.banksteel.log.demo.log4j2.Demo",
    "method" : "main",
    "file" : "Demo.java",
    "line" : 23
  }
} 

我们看到,这种格式,无论用什么语言都能轻松解析了。

日志框架的 Kafka 集成

我们这里只用 log4j 1.x 和 log4j 2.x 进行示例。

log4j 1.x 与 Kafka 集成

首先 POM.xml 的内容如下:

<dependencies>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.7.4</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.7.4</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.7.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.1</version>
    </dependency>
</dependencies> 

注意,我们这里使用的 Kafka 版本号是 0.8.2.1,但是对应 0.9.0.1 是可以使用的并且 0.9.0.1 也只能用 0.8.2.1 才不会发生异常(具体异常可以自己尝试一下)。

而 log4j 1.x 本身是没有 JsonLayout 可用的,因此我们需要自己实现一个类,如下所示:

package com.banksteel.log.demo.log4j;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.log4j.Layout;
import org.apache.log4j.spi.LoggingEvent;

import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * 扩展 Log4j 1.x,使其支持 JsonLayout,与 log4j2.x 一样是基于 Jackson 进行解析,其格式也是完全参考 Log4J 2.x 实现的。*
 * @author 热血 BUG 男
 * @version 1.0.0
 * @since Created by gebug on 2016/5/8.
 */
public class JsonLayout extends Layout {private final ObjectMapper mapper = new ObjectMapper();

    public String format(LoggingEvent loggingEvent) {
        String json;
        Map<String, Object> map = new LinkedHashMap<String, Object>(0);
        Map<String, Object> source = new LinkedHashMap<String, Object>(0);
        source.put("method", loggingEvent.getLocationInformation().getMethodName());
        source.put("class", loggingEvent.getLocationInformation().getClassName());
        source.put("file", loggingEvent.getLocationInformation().getFileName());
        source.put("line", safeParse(loggingEvent.getLocationInformation().getLineNumber()));

        map.put("timeMillis", loggingEvent.getTimeStamp());
        map.put("thread", loggingEvent.getThreadName());
        map.put("level", loggingEvent.getLevel().toString());
        map.put("loggerName", loggingEvent.getLocationInformation().getClassName());
        map.put("source", source);
        map.put("endOfBatch", false);
        map.put("loggerFqcn", loggingEvent.getFQNOfLoggerClass());


        map.put("message", safeToString(loggingEvent.getMessage()));
        map.put("thrown", formatThrowable(loggingEvent));
        try {json = mapper.writeValueAsString(map);
        } catch (JsonProcessingException e) {return e.getMessage();}
        return json;
    }

    private List<Map<String, Object>> formatThrowable(LoggingEvent le) {if (le.getThrowableInformation() == null ||
                le.getThrowableInformation().getThrowable() == null)
            return null;
        List<Map<String, Object>> traces = new LinkedList<Map<String, Object>>();
        Map<String, Object> throwableMap = new LinkedHashMap<String, Object>(0);
        StackTraceElement[] stackTraceElements = le.getThrowableInformation().getThrowable().getStackTrace();
        for (StackTraceElement stackTraceElement : stackTraceElements) {throwableMap.put("class", stackTraceElement.getClassName());
            throwableMap.put("file", stackTraceElement.getFileName());
            throwableMap.put("line", stackTraceElement.getLineNumber());
            throwableMap.put("method", stackTraceElement.getMethodName());
            throwableMap.put("location", "?");
            throwableMap.put("version", "?");
            traces.add(throwableMap);
        }
        return traces;
    }

    private static String safeToString(Object obj) {if (obj == null) return null;
        try {return obj.toString();} catch (Throwable t) {return "Error getting message:" + t.getMessage();}
    }

    private static Integer safeParse(String obj) {try {return Integer.parseInt(obj.toString());
        } catch (NumberFormatException t) {return null;
        }
    }

    public boolean ignoresThrowable() {return false;
    }

    public void activateOptions() {}
}

其实并不复杂,注意其中有一些获取不到的信息,用?代替了,保留字段的目的在于与 log4j 2.x 的日志格式完全一致,配置 log4j.properties 如下对接 Kafka:

log4j.rootLogger=INFO,console
log4j.logger.com.banksteel.log.demo.log4j=DEBUG,kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=server_log
log4j.appender.kafka.brokerList=Kafka-01:9092,Kafka-02:9092,Kafka-03:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=true
log4j.appender.kafka.layout=com.banksteel.log.demo.log4j.JsonLayout
# appender console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n 

通过打印日志我们可以看到其输出的最终格式如下:

{"timeMillis": 1462713132695,
  "thread": "main",
  "level": "ERROR",
  "loggerName": "com.banksteel.log.demo.log4j.Demo",
  "source": {"method": "main",
    "class": "com.banksteel.log.demo.log4j.Demo",
    "file": "Demo.java",
    "line": 20
  },
  "endOfBatch": false,
  "loggerFqcn": "org.slf4j.impl.Log4jLoggerAdapter",
  "message": "处理业务逻辑的时候发生一个错误……",
  "thrown": [
    {"class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    },
    {"class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    },
    {"class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    },
    {"class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    },
    {"class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    },
    {"class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    }
  ]
}
基于 Flume+Log4j+Kafka 的日志采集架构方案

测试类:

基于 Flume+Log4j+Kafka 的日志采集架构方案
package com.banksteel.log.demo.log4j;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author 热血 BUG 男
 * @version 1.0.0
 * @since Created by gebug on 2016/5/8.
 */
public class Demo {private static final Logger logger = LoggerFactory.getLogger(Demo.class);

    public static void main(String[] args) {logger.info("输出信息……");
        logger.trace("随意打印……");
        logger.debug("调试信息……");
        logger.warn("警告信息……");
        try {throw new Exception("错误消息啊");
        } catch (Exception e) {logger.error("处理业务逻辑的时候发生一个错误……", e);
        }
    }
} 

log4j 2.x 与 Kafka 集成

log4j 2.x 天生支持 JsonLayout,并且与 Kafka 集成方便,我们只需要按部就班的配置一下就好了,POM.xml 如下:

<dependencies>
  <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.5</version>
  </dependency>
  <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.5</version>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.7.4</version>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.7.4</version>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.7.4</version>
  </dependency>
  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.9.0.1</version>
  </dependency>
</dependencies>

log4j2.xml 配置文件如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 的配置文件 -->
<Configuration status="DEBUG" strict="true" name="LOG4J2_DEMO" packages="com.banksteel.log.demo.log4j2">
    <properties>
        <property name="logPath">log</property>
    </properties>

    <Appenders>
        <!-- 配置控制台输出样式 -->
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%highlight{%d{yyyy-MM-dd HH:mm:ss} %d{UNIX_MILLIS} [%t] %-5p %C{1.}:%L - %msg%n}"/>
        </Console>
        <!-- 配置 Kafka 日志主动采集,Storm 会将日志解析成字段存放在 HBase 中。-->
        <Kafka name="Kafka" topic="server_log">
            <!-- 使用 JSON 传输日志文件 -->
            <JsonLayout complete="true" locationInfo="true"/>
            <!--Kafka 集群配置,需要在本机配置 Hosts 文件,或者通过 Nginx 配置 -->
            <Property name="bootstrap.servers">Kafka-01:9092,Kafka-02:9092,Kafka-03:9092</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="DEBUG">
            <!-- 启用控制台输出日志 -->
            <AppenderRef ref="Console"/>
            <!-- 启用 Kafka 采集日志 -->
            <AppenderRef ref="Kafka"/>
        </Root>
    </Loggers>
</Configuration>

这样就 Okay 了,我们可以在 Kafka 中看到完整的输出:

{"timeMillis" : 1462712870591,
  "thread" : "main",
  "level" : "ERROR",
  "loggerName" : "com.banksteel.log.demo.log4j2.Demo",
  "message" : "处理业务逻辑的时候发生一个错误……",
  "thrown" : {"commonElementCount" : 0,
    "localizedMessage" : "错误消息啊",
    "message" : "错误消息啊",
    "name" : "java.lang.Exception",
    "extendedStackTrace" : [ {"class" : "com.banksteel.log.demo.log4j2.Demo",
      "method" : "main",
      "file" : "Demo.java",
      "line" : 20,
      "exact" : true,
      "location" : "classes/",
      "version" : "?"
    }, {"class" : "sun.reflect.NativeMethodAccessorImpl",
      "method" : "invoke0",
      "file" : "NativeMethodAccessorImpl.java",
      "line" : -2,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {"class" : "sun.reflect.NativeMethodAccessorImpl",
      "method" : "invoke",
      "file" : "NativeMethodAccessorImpl.java",
      "line" : 57,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {"class" : "sun.reflect.DelegatingMethodAccessorImpl",
      "method" : "invoke",
      "file" : "DelegatingMethodAccessorImpl.java",
      "line" : 43,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {"class" : "java.lang.reflect.Method",
      "method" : "invoke",
      "file" : "Method.java",
      "line" : 606,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {"class" : "com.intellij.rt.execution.application.AppMain",
      "method" : "main",
      "file" : "AppMain.java",
      "line" : 144,
      "exact" : true,
      "location" : "idea_rt.jar",
      "version" : "?"
    } ]
  },
  "endOfBatch" : false,
  "loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger",
  "source" : {"class" : "com.banksteel.log.demo.log4j2.Demo",
    "method" : "main",
    "file" : "Demo.java",
    "line" : 22
  }
}

为了减少日志对空间的占用,我们通常还会设置 JSONLayout 的 compact 属性为 true,这样在 kafka 中获得的日志将会排除掉空格和换行符。

最后

由于在实际开发中,我们会引入多个第三方依赖,这些依赖往往也会依赖无数的 log 日志框架,为了保证测试通过,请认清本文例子中的包名以及版本号,log4j 1.x 的 Json 输出是为了完全模拟 2.x 的字段,因此部分字段用?代替,如果想要完美,请自行解决。

随便解释一下日志级别,以便建立规范:

log.error 错误信息,通常写在 catch 中,可以使用 log.error(“ 发生了一个错误 ”,e) 来记录详细的异常堆栈

log.fatal 严重错误,该级别的错误用来记录会导致程序异常退出的错误日志。

log.warn 警告

log.info 信息

log.trace 简单输出文字

log.debug 调试信息

Log4j 配置详解 http://www.linuxidc.com/Linux/2014-10/108401.htm

Apache Log4j 2 更多内容请看:http://logging.apache.org/log4j/2.x/

Log4j 入门使用教程 http://www.linuxidc.com/Linux/2013-06/85223.htm

Log4j 日志详细用法 http://www.linuxidc.com/Linux/2014-09/107303.htm

Hibernate 配置 Log4j 显示 SQL 参数 http://www.linuxidc.com/Linux/2013-03/81870.htm

Log4j 学习笔记 (1)_Log4j 基础 & 配置项解析 http://www.linuxidc.com/Linux/2013-03/80586.htm

Log4j 学习笔记 (2)_Log4j 配置示例 &Spring 集成 Log4j http://www.linuxidc.com/Linux/2013-03/80587.htm

Log4j 的详细介绍 :请点这里
Log4j 的下载地址 :请点这里

本文永久更新链接地址 :http://www.linuxidc.com/Linux/2016-05/131402.htm

正文完
星哥说事-微信公众号
post-qrcode
 
星锅
版权声明:本站原创文章,由 星锅 2022-01-21发表,共计14167字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中