阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

解决Hadoop JobConf限制为5M的问题

477次阅读
没有评论

共计 7690 个字符,预计需要花费 20 分钟才能阅读完成。

我们的业务是要使用 mongodb 的 Hadoop driver 处理输出。我们重写的 mongodbInputFormat 的时候传递数据的时候是把数据写入 conf,然后再从 mongoSplitter 里面里面从 conf 里面读出来。比如下面这样:

把数据放入数据 conf:

List<Long> tagsUrns =null;
 //tagUrns 赋值 …..
 conf.set(“tagUrns”,
            ObjectSerializer.serialize((Serializable) tagsUrns));

在 mapper,reduce, 或者 mongoSpiltter 里拿出 conf 里的数据:

List<Long> tagUrns = (List<Long>) ObjectSerializer
            .deserialize(context.getConfiguration().get(“tagUrns”));

由于 conf 只能放入 boolean、int、string 的值,而我需要给 hadoop Configuration 放入的是 list 或者其他对象,所以需要用到一个序列化工具类。

序列化工具类代码:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import Java.io.*;

public class ObjectSerializer {

private static final Log log = LogFactory.getLog(ObjectSerializer.class);

public static String serialize(Serializable obj) throws IOException {
    if (obj == null)
        return “”;
    try {
        ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
        ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
        objStream.writeObject(obj);
        objStream.close();
        return encodeBytes(serialObj.toByteArray());
    } catch (Exception e) {
        throw new IOException(“Serialization error: ” + e.getMessage(), e);
    }
}

public static Object deserialize(String str) throws IOException {
    if (str == null || str.length() == 0)
        return null;
    try {
        ByteArrayInputStream serialObj = new ByteArrayInputStream(
                decodeBytes(str));
        ObjectInputStream objStream = new ObjectInputStream(serialObj);
        return objStream.readObject();
    } catch (Exception e) {
        throw new IOException(“Deserialization error: ” + e.getMessage(), e);
    }
}

public static String encodeBytes(byte[] bytes) {
    StringBuffer strBuf = new StringBuffer();

    for (int i = 0; i < bytes.length; i++) {
        strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ((int) ‘a’)));
        strBuf.append((char) (((bytes[i]) & 0xF) + ((int) ‘a’)));
    }

    return strBuf.toString();
}

public static byte[] decodeBytes(String str) {
    byte[] bytes = new byte[str.length() / 2];
    for (int i = 0; i < str.length(); i += 2) {
        char c = str.charAt(i);
        bytes[i / 2] = (byte) ((c – ‘a’) << 4);
        c = str.charAt(i + 1);
        bytes[i / 2] += (c – ‘a’);
    }
    return bytes;
}

}

但是当我放入的数据太大时,运行 hadoop 任务时报错,错误信息:

解决 Hadoop JobConf 限制为 5M 的问题

错误信息说明 hadoop 的 conf 是有限制的,查询下发现限制为 5M:

解决 Hadoop JobConf 限制为 5M 的问题

所以当时就懵了。这不从 conf 传入,好像又拿不到。最后想着能不能从 hdfs 文件直接读数据文件。但是我的数据必须在 mongospliter 里面获取数据,而这里只能拿到 conf。最后发现 hadoop 获取 FileSystem 方式为:

@Resource(name = “hadoopConfiguration”)
private Configuration configuration = null;

………..

 fileSystem = FileSystem.get(configuration);

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-11/92407p2.htm

推荐阅读

Hadoop 2.0 安装向导 (0.23.x) http://www.linuxidc.com/Linux/2012-05/61463.htm

Hadoop 1.2.1 单节点安装 (Single Node Setup) 步骤 http://www.linuxidc.com/Linux/2013-08/89377.htm

在 CentOS 上安装 Hadoop http://www.linuxidc.com/Linux/2013-08/88600.htm

Ubuntu 12.04 安装 Hadoop http://www.linuxidc.com/Linux/2013-08/88187.htm

CentOS 6.3 x86_64 安装与配置 Hadoop-1.0 http://www.linuxidc.com/Linux/2013-07/87959.htm

Hadoop 入门 –Hadoop2 伪分布式安装 http://www.linuxidc.com/Linux/2013-06/86403.htm

Hadoop2.2.0 单节点安装及测试 http://www.linuxidc.com/Linux/2013-10/91911.htm

查看发现这个 configuration 属于 package org.apache.Hadoop.conf:

解决 Hadoop JobConf 限制为 5M 的问题

查看了下 mongodb 里面字段的 configuration 也是属于 package org.apache.hadoop.conf:

解决 Hadoop JobConf 限制为 5M 的问题

所以就大胆的想着在 mongoSplitter 里面构造 FileSystem 直接去读文件获取,因为对 hadoop 不是很熟悉,而 mongodb 的 driver 里 mongoSplitter 也是在 MongoInputFormat 里面调用的,所以担心这里的 conf 并不能获得 hadoop 集群的 hdfs,所以给 conf 里面传入了一个字符串 dataPath 来指定需要读取的文件路径。然后读取:

FileSystem fileSystem = null;

        try {
            fileSystem = FileSystem.get(mongoConfig .conf);
        } catch (IOException e) {
            log.warn(“ 无法获得 fileSystem…….”);
        }
    FSDataInputStream hdfsInStream = null;
    BufferedReader bufferedReader = null;
          try {

                    // 打开文件流
                    hdfsInStream = fileSystem.open(new Path(mongoConfig.conf.get(“dataPath”)));

                    bufferedReader = new BufferedReader(new InputStreamReader(
                            hdfsInStream, “UTF-8”));
                    String line = null;
                    while ((line = bufferedReader.readLine()) != null) {
                        try {
                            if (StringUtils.isNotBlank(line)) {

                                System.out.println(line);

                            }
                        } catch (Exception e) {
                            log.warn(ExceptionUtils.getFullStackTrace(e));
                            continue;
                        }
                    }
                    line = null;

            } catch (Exception e) {
                log.warn(ExceptionUtils.getFullStackTrace(e));
            } finally {
                try {
                    if (bufferedReader != null) {
                        bufferedReader.close();
                    }
                    if (hdfsInStream != null) {
                        hdfsInStream.close();
                    }
                } catch (IOException e) {
                    log.warn(“ 关闭流时异常:” + ExceptionUtils.getFullStackTrace(e));
                }
            }

上面没有关闭 fileSystem 是因为不能关闭他,如果关闭了其他 hadoop 获取 hdfs 的流都会被关闭,调用会报空指针。应该是全局唯一的。

最后发现是可以的。也就是说 hadoop 在调用 inputFormat 调用的 getSplitter 方法时传入的 jobContext 已经包含了 hadoop 集群的信息,所以可以在自己实现的 mongoSplitter 方法里使用 FileSystem 读取 hdfs 数据,也解决了因为 conf 限制的问题。

@Override
public List<InputSplit> getSplits(JobContext context) {
    final Configuration hadoopConfiguration = context.getConfiguration();
    final MongoSimpleConfig conf = new MongoSimpleConfig(hadoopConfiguration);
    return MongoSimpleSplitter.calculateSplits(conf);
}

更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

我们的业务是要使用 mongodb 的 Hadoop driver 处理输出。我们重写的 mongodbInputFormat 的时候传递数据的时候是把数据写入 conf,然后再从 mongoSplitter 里面里面从 conf 里面读出来。比如下面这样:

把数据放入数据 conf:

List<Long> tagsUrns =null;
 //tagUrns 赋值 …..
 conf.set(“tagUrns”,
            ObjectSerializer.serialize((Serializable) tagsUrns));

在 mapper,reduce, 或者 mongoSpiltter 里拿出 conf 里的数据:

List<Long> tagUrns = (List<Long>) ObjectSerializer
            .deserialize(context.getConfiguration().get(“tagUrns”));

由于 conf 只能放入 boolean、int、string 的值,而我需要给 hadoop Configuration 放入的是 list 或者其他对象,所以需要用到一个序列化工具类。

序列化工具类代码:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import Java.io.*;

public class ObjectSerializer {

private static final Log log = LogFactory.getLog(ObjectSerializer.class);

public static String serialize(Serializable obj) throws IOException {
    if (obj == null)
        return “”;
    try {
        ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
        ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
        objStream.writeObject(obj);
        objStream.close();
        return encodeBytes(serialObj.toByteArray());
    } catch (Exception e) {
        throw new IOException(“Serialization error: ” + e.getMessage(), e);
    }
}

public static Object deserialize(String str) throws IOException {
    if (str == null || str.length() == 0)
        return null;
    try {
        ByteArrayInputStream serialObj = new ByteArrayInputStream(
                decodeBytes(str));
        ObjectInputStream objStream = new ObjectInputStream(serialObj);
        return objStream.readObject();
    } catch (Exception e) {
        throw new IOException(“Deserialization error: ” + e.getMessage(), e);
    }
}

public static String encodeBytes(byte[] bytes) {
    StringBuffer strBuf = new StringBuffer();

    for (int i = 0; i < bytes.length; i++) {
        strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ((int) ‘a’)));
        strBuf.append((char) (((bytes[i]) & 0xF) + ((int) ‘a’)));
    }

    return strBuf.toString();
}

public static byte[] decodeBytes(String str) {
    byte[] bytes = new byte[str.length() / 2];
    for (int i = 0; i < str.length(); i += 2) {
        char c = str.charAt(i);
        bytes[i / 2] = (byte) ((c – ‘a’) << 4);
        c = str.charAt(i + 1);
        bytes[i / 2] += (c – ‘a’);
    }
    return bytes;
}

}

但是当我放入的数据太大时,运行 hadoop 任务时报错,错误信息:

解决 Hadoop JobConf 限制为 5M 的问题

错误信息说明 hadoop 的 conf 是有限制的,查询下发现限制为 5M:

解决 Hadoop JobConf 限制为 5M 的问题

所以当时就懵了。这不从 conf 传入,好像又拿不到。最后想着能不能从 hdfs 文件直接读数据文件。但是我的数据必须在 mongospliter 里面获取数据,而这里只能拿到 conf。最后发现 hadoop 获取 FileSystem 方式为:

@Resource(name = “hadoopConfiguration”)
private Configuration configuration = null;

………..

 fileSystem = FileSystem.get(configuration);

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-11/92407p2.htm

推荐阅读

Hadoop 2.0 安装向导 (0.23.x) http://www.linuxidc.com/Linux/2012-05/61463.htm

Hadoop 1.2.1 单节点安装 (Single Node Setup) 步骤 http://www.linuxidc.com/Linux/2013-08/89377.htm

在 CentOS 上安装 Hadoop http://www.linuxidc.com/Linux/2013-08/88600.htm

Ubuntu 12.04 安装 Hadoop http://www.linuxidc.com/Linux/2013-08/88187.htm

CentOS 6.3 x86_64 安装与配置 Hadoop-1.0 http://www.linuxidc.com/Linux/2013-07/87959.htm

Hadoop 入门 –Hadoop2 伪分布式安装 http://www.linuxidc.com/Linux/2013-06/86403.htm

Hadoop2.2.0 单节点安装及测试 http://www.linuxidc.com/Linux/2013-10/91911.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计7690字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19350
评论数
4
阅读量
7963020
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
自己手撸一个AI智能体—跟创业大佬对话

自己手撸一个AI智能体—跟创业大佬对话

自己手撸一个 AI 智能体 — 跟创业大佬对话 前言 智能体(Agent)已经成为创业者和技术人绕...
星哥带你玩飞牛NAS硬件02:某鱼6张左右就可拿下5盘位的飞牛圣体NAS

星哥带你玩飞牛NAS硬件02:某鱼6张左右就可拿下5盘位的飞牛圣体NAS

星哥带你玩飞牛 NAS 硬件 02:某鱼 6 张左右就可拿下 5 盘位的飞牛圣体 NAS 前言 大家好,我是星...
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的 Nano Banana Pro?附赠邪修的用法 前言 大家好,我是星哥,今天来介绍谷歌的 ...
让微信公众号成为 AI 智能体:从内容沉淀到智能问答的一次升级

让微信公众号成为 AI 智能体:从内容沉淀到智能问答的一次升级

让微信公众号成为 AI 智能体:从内容沉淀到智能问答的一次升级 大家好,我是星哥,之前写了一篇文章 自己手撸一...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
还在找免费服务器?无广告免费主机,新手也能轻松上手!

还在找免费服务器?无广告免费主机,新手也能轻松上手!

还在找免费服务器?无广告免费主机,新手也能轻松上手! 前言 对于个人开发者、建站新手或是想搭建测试站点的从业者...
星哥带你玩飞牛NAS-13:自动追番、订阅下载 + 刮削,动漫党彻底解放双手!

星哥带你玩飞牛NAS-13:自动追番、订阅下载 + 刮削,动漫党彻底解放双手!

星哥带你玩飞牛 NAS-13:自动追番、订阅下载 + 刮削,动漫党彻底解放双手! 作为动漫爱好者,你是否还在为...
颠覆 AI 开发效率!开源工具一站式管控 30+大模型ApiKey,秘钥付费+负载均衡全搞定

颠覆 AI 开发效率!开源工具一站式管控 30+大模型ApiKey,秘钥付费+负载均衡全搞定

  颠覆 AI 开发效率!开源工具一站式管控 30+ 大模型 ApiKey,秘钥付费 + 负载均衡全...
告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

  告别 Notion 焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁” 引言 在数字笔记工...
星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛 NAS 硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话? 前言 在选择 NAS 用预...