阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

使用Python构建基于Hadoop的MapReduce日志分析平台

134次阅读
没有评论

共计 5966 个字符,预计需要花费 15 分钟才能阅读完成。

流量比较大的日志要是直接写入 Hadoop 对 Namenode 负载过大,所以入库前合并,可以把各个节点的日志凑并成一个文件写入 HDFS。根据情况定期合成,写入到 hdfs 里面。

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

咱们看看日志的大小,200G 的 dns 日志文件,我压缩到了 18G,要是用 awk perl 当然也可以,但是处理速度肯定没有分布式那样的给力。

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

Hadoop Streaming 原理

mapper 和 reducer 会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming 工具会创建 MapReduce 作业,发送给各个 tasktracker,同时监控整个作业的执行过程。

任何语言,只要是方便接收标准输入输出就可以做 mapreduce~

再搞之前我们先简单测试下 shell 模拟 mapreduce 的性能速度~

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

看下他的结果,350M 的文件用时 35 秒左右。

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

这是 2G 的日志文件,居然用了 3 分钟。当然和我写的脚本也有问题,我们是模拟 mapreduce 的方式,而不是调用 shell 下牛逼的 awk,gawk 处理。

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

awk 的速度!果然很霸道,处理日志的时候,我也很喜欢用 awk,只是学习的难度有点大,不像别的 shell 组件那么灵活简单。

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

推荐阅读:

《Python 开发技术详解》.(周伟, 宗杰).[高清 PDF 扫描版 + 随书视频 + 代码] http://www.linuxidc.com/Linux/2013-11/92693.htm

Python 脚本获取 Linux 系统信息 http://www.linuxidc.com/Linux/2013-08/88531.htm

Python 网站文件及数据库备份脚本 http://www.linuxidc.com/Linux/2012-06/62346.htm

Python 文件处理:读取文件 http://www.linuxidc.com/Linux/2013-08/88496.htm

如何发布自定义的 Python 模块 http://www.linuxidc.com/Linux/2013-08/88495.htm

Python 爬虫多线程抓取代理服务器 http://www.linuxidc.com/Linux/2013-07/87289.htm

Python 中 re(正则表达式)模块详解 http://www.linuxidc.com/Linux/2013-08/88588.htm

Python 的详细介绍:请点这里
Python 的下载地址:请点这里

这是官方的提供的两个 demo ~
map.py

#!/usr/bin/env python
“””A more advanced Mapper, using Python iterators and generators.”””
import sys
def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()
def main(separator=’\t’):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print ‘%s%s%d’ % (word, separator, 1)
if __name__ == “__main__”:
    main()

 

reduce.py 的修改方式

#!/usr/bin/env python
“””A more advanced Reducer, using Python iterators and generators.”””
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator=’\t’):
    for line in file:
        yield line.rstrip().split(separator, 1)
def main(separator=’\t’):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #  current_word – string containing a word (the key)
    #  group – iterator yielding all [“<current_word>”, “<count>”] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print “%s%s%d” % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass
if __name__ == “__main__”:
    main()

咱们再简单点:

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print ‘%s\t%s’ % (word, 1)

 

#!/usr/bin/env python
                                                                                                                                                                                                           
from operator import itemgetter
import sys
                                                                                                                                                                                                           
current_word = None
current_count = 0
word = None
                                                                                                                                                                                                           
for line in sys.stdin:
    line = line.strip()
    word, count = line.split(‘\t’, 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print ‘%s\t%s’ % (current_word, current_count)
        current_count = count
        current_word = word
                                                                                                                                                                                                         
if current_word == word:
    print ‘%s\t%s’ % (current_word, current_count)

咱们就简单模拟下数据,跑个测试

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

流量比较大的日志要是直接写入 Hadoop 对 Namenode 负载过大,所以入库前合并,可以把各个节点的日志凑并成一个文件写入 HDFS。根据情况定期合成,写入到 hdfs 里面。

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

咱们看看日志的大小,200G 的 dns 日志文件,我压缩到了 18G,要是用 awk perl 当然也可以,但是处理速度肯定没有分布式那样的给力。

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

Hadoop Streaming 原理

mapper 和 reducer 会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming 工具会创建 MapReduce 作业,发送给各个 tasktracker,同时监控整个作业的执行过程。

任何语言,只要是方便接收标准输入输出就可以做 mapreduce~

再搞之前我们先简单测试下 shell 模拟 mapreduce 的性能速度~

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

看下他的结果,350M 的文件用时 35 秒左右。

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

这是 2G 的日志文件,居然用了 3 分钟。当然和我写的脚本也有问题,我们是模拟 mapreduce 的方式,而不是调用 shell 下牛逼的 awk,gawk 处理。

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

awk 的速度!果然很霸道,处理日志的时候,我也很喜欢用 awk,只是学习的难度有点大,不像别的 shell 组件那么灵活简单。

使用 Python 构建基于 Hadoop 的 MapReduce 日志分析平台

推荐阅读:

《Python 开发技术详解》.(周伟, 宗杰).[高清 PDF 扫描版 + 随书视频 + 代码] http://www.linuxidc.com/Linux/2013-11/92693.htm

Python 脚本获取 Linux 系统信息 http://www.linuxidc.com/Linux/2013-08/88531.htm

Python 网站文件及数据库备份脚本 http://www.linuxidc.com/Linux/2012-06/62346.htm

Python 文件处理:读取文件 http://www.linuxidc.com/Linux/2013-08/88496.htm

如何发布自定义的 Python 模块 http://www.linuxidc.com/Linux/2013-08/88495.htm

Python 爬虫多线程抓取代理服务器 http://www.linuxidc.com/Linux/2013-07/87289.htm

Python 中 re(正则表达式)模块详解 http://www.linuxidc.com/Linux/2013-08/88588.htm

Python 的详细介绍:请点这里
Python 的下载地址:请点这里

剩下就没啥了,在 Hadoop 集群环境下,运行 hadoop 的 steaming.jar 组件,加入 mapreduce 的脚本,指定输出就行了.  下面的例子我用的是 shell 的成分。

[root@101 cron]#$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper cat \
-reducer wc

详细的参数,对于咱们来说提供性能可以把 tasks 的任务数增加下,根据情况自己测试下,也别太高了,增加负担。
(1)-input:输入文件路径
(2)-output:输出文件路径
(3)-mapper:用户自己写的 mapper 程序,可以是可执行文件或者脚本
(4)-reducer:用户自己写的 reducer 程序,可以是可执行文件或者脚本
(5)-file:打包文件到提交的作业中,可以是 mapper 或者 reducer 要用的输入文件,如配置文件,字典等。
(6)-partitioner:用户自定义的 partitioner 程序
(7)-combiner:用户自定义的 combiner 程序(必须用 java 实现)
(8)-D:作业的一些属性(以前用的是 -jonconf),具体有:
          1)mapred.map.tasks:map task 数目
          2)mapred.reduce.tasks:reduce task 数目
          3)stream.map.input.field.separator/stream.map.output.field.separator:map task 输入 / 输出数
据的分隔符, 默认均为 \t。
            4)stream.num.map.output.key.fields:指定 map task 输出记录中 key 所占的域数目
            5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task 输入 / 输出数据的分隔符,默认均为 \t。
            6)stream.num.reduce.output.key.fields:指定 reduce task 输出记录中 key 所占的域数目

相关阅读

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计5966字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中