阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

利用Rsyslog进行日志收集到Kafka

313次阅读
没有评论

共计 4544 个字符,预计需要花费 12 分钟才能阅读完成。

项目需要将日志收集起来做存储分析,数据的流向为 rsyslog(收集)-> kafka(消息队列)-> logstash(清理) -> es、hdfs;今天我们先将如何利用 rsyslog 进行日志收集到 kafka。

一、环境准备

通过对 rsyslog 官方文档 查看, 得知 rsyslog 对 kafka 的支持是 v8.7.0 版本后才提供的支持. 通过 ChangeLog 也可以看出 V8.X 的版本变化.
最新 V8 稳定版已经提供 RPM 包的 Rsyslog-kafka 插件了, 直接 yum 安装即可, 添加 yum 源:

[rsyslog_v8]
name=Adiscon CentOS-$releasever - local packages for $basearch
baseurl=http://rpms.adiscon.com/v8-stable/epel-$releasever/$basearch
enabled=1
gpgcheck=0
gpgkey=http://rpms.adiscon.com/RPM-GPG-KEY-Adiscon
protect=1

添加后 yum install rsyslog rsyslog-kafka.x86_64即可完成安装。

二、配置

1. 处理原则

  • input submit received messages to rulesets, zero or many
  • ruleset contains rule, rule consist of a filter and an action list
  • actions consist of the action call itself (e.g.”:omusrmsg:”) as well as all action-defining configuration statements ($Action… directives)

2. Statement Types 表达式类型

通常利用 RainerScript type statements 进行非常简洁明了的配置声明,例如:

mail.info /var/log/mail.log 

3. 流程控制

  • Control structures
  • 过滤条件
    1. Selector: 传统方式,格式如下:
      <facility>[,facility...][,*].[=,!]<priority>[,priority...][,*];<facility>[,facility...][,*].[=|!]<priority>[,priority...][,*]...
      其中默认 facility 为 auth, authpriv, cron, daemon, kern, lpr, mail, mark, news, security (same as auth), syslog, user, uucp and local0 through local7;
      默认 priority 为 debug, info, notice, warning, warn (same as warning), err, error (same as err), crit, alert, emerg, panic (same as emerg);
      2)Property-based filters:new filter type. 形式如下:
      :property, [!]compare-operation, "value"
      分别对应 名字,比较符,需要对比的字段。比较符包括 contains, isequal, startswith, regex, ereregex
      3)Expression based filters:
      if expr then action-part-of-selector-line
    2. BSD-style blocks:
    3. 例子:if $syslogfacility-text == 'local0' and $msg startswith 'DEVNAME' and not ($msg contains 'error1' or $msg contains 'error0') then /var/log/somelog

4. 数据处理: 支持 set, unset, reset 操作

备注:Only message json (CEE/Lumberjack) properties can be modified by the set, unset andreset statements

5. input

有很多种 input 模块, 我们以 imfile 模块为例, 此模块将所有的文本文件内容逐行转到 syslog 中.

input(type="imfile" tag="kafka" file="analyze.log" ruleset="imfile-kafka"[, Facility=local.7])

6. outputs

也叫作 actions,处理动作,格式如下

 action (type="omkafka"
        topic="kafka_test"
        broker="10.120.169.149:9092"
    )

7. Rulesets and Rules

Rulesets 包括多条 rule,一条规则就是 rsyslog 处理消息的一种方式, 每个规则包含 filter 和 actions

input(type="imfile" tag="kafka" file="analyze.log" ruleset="rulesetname")
ruleset(name="rulesetname") {action(type="omfile" file="/path/to/file")
    action(type="..." ...)
    /* and so on... */
}

通过 input 里面的 ruleset 配置,将输入流进入 ruleset 进行规则匹配,然后执行 action 操作,完成对流的处理。

8. Queue parameters

将不同的输入流进入不同的队列并行处理数据,通常在 ruleset 或者 action 中配置,默认只有一个队列。配置参数例子

action(type="omfwd" target="192.168.2.11" port="10514" protocol="tcp"
       queue.filename="forwarding" queue.size="1000000" queue.type="LinkedList"
      )

9. templates

这是 rsyslog 一个重要的特性,它可以让用户自定义输入流格式,同样也可以用于动态生成日志文件,默认是原始格式。
一般表达式如下:
template(parameters) {list-descriptions}

  • list : 列表模板,包含 name, type=”list”,多个 constant 和 property 对。
template(name="tpl1" type="list") {constant(value="Syslog MSG is:'")
    property(name="msg")
    constant(value="',")
    property(name="timereported" dateFormat="rfc3339" caseConversion="lower")
    constant(value="\n")
    }
  • string: 字符串自定义格式模块,由name, type=”string”, string=”<onstant text and replacement variables>”,例如

%TIMESTAMP:::date-rfc3339% %HOSTNAME%%syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n”

将每个日志字段通过自定义变量和处理方式(property replacer)得到全局能读取的日志变量。

注意:

  1. 原始格式:v6 之前的格式,$template strtpl,"PRI: %pri%, MSG: %msg%\n"
  2. 利用 action 里的 template 参数将 templates 和 action 进行绑定,如
    action(template=TEMPLATENAME,type="omfile" file="/var/log/all-msgs.log")

三. 实例

增加一个将 nginx access 日志通过 rsyslog 传输到 kafka 的实例,将 nginx_kafka.conf 放入到 /etc/rsyslog.d 目录中,重启 rsyslog 即可。

# 加载 omkafka 和 imfile 模块
module(load="omkafka")
module(load="imfile")

# nginx template
template(name="nginxAccessTemplate" type="string" string="%hostname%<-+>%syslogtag%<-+>%msg%\n")

# ruleset
ruleset(name="nginx-kafka") {# 日志转发 kafka
    action (type="omkafka"
        template="nginxAccessTemplate"
        confParam=["compression.codec=snappy", "queue.buffering.max.messages=400000"]
        partitions.number="4"
        topic="test_nginx"
        broker="10.120.169.149:9092"
        queue.spoolDirectory="/tmp"
        queue.filename="test_nginx_kafka"
        queue.size="360000"
        queue.maxdiskspace="2G"
        queue.highwatermark="216000"
        queue.discardmark="350000"
        queue.type="LinkedList" 
        queue.dequeuebatchsize="4096"
        queue.timeoutenqueue="0"
        queue.maxfilesize="10M" 
        queue.saveonshutdown="on"
        queue.workerThreads="4"
    )
}

# 定义消息来源及设置相关的 action
input(type="imfile" Tag="nginx,aws" File="/var/log/access.log" Ruleset="nginx-kafka")

检查 conf 文件是否正确可以运行 rsyslogd debug 模式 rsyslogd -dn 运行,看日志输出结果,或者直接运行 rsyslogd -N 1 检查 conf 文件是否正确。

CentOS7.3 下部署 Rsyslog+LogAnalyzer+MySQL 中央日志服务器  http://www.linuxidc.com/Linux/2017-10/147693.htm

Rsyslog+Loganalyer+MySQL 下部署日志服务器 http://www.linuxidc.com/Linux/2017-08/146468.htm

Rsyslog 日志收集服务并结合 Loganalyzer 工具展示  http://www.linuxidc.com/Linux/2017-05/143690.htm

本文永久更新链接地址:http://www.linuxidc.com/Linux/2018-01/150453.htm

正文完
星哥说事-微信公众号
post-qrcode
 
星锅
版权声明:本站原创文章,由 星锅 2022-01-21发表,共计4544字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中