阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

解决Hadoop JobConf限制为5M的问题

147次阅读
没有评论

共计 7690 个字符,预计需要花费 20 分钟才能阅读完成。

我们的业务是要使用 mongodb 的 Hadoop driver 处理输出。我们重写的 mongodbInputFormat 的时候传递数据的时候是把数据写入 conf,然后再从 mongoSplitter 里面里面从 conf 里面读出来。比如下面这样:

把数据放入数据 conf:

List<Long> tagsUrns =null;
 //tagUrns 赋值 …..
 conf.set(“tagUrns”,
            ObjectSerializer.serialize((Serializable) tagsUrns));

在 mapper,reduce, 或者 mongoSpiltter 里拿出 conf 里的数据:

List<Long> tagUrns = (List<Long>) ObjectSerializer
            .deserialize(context.getConfiguration().get(“tagUrns”));

由于 conf 只能放入 boolean、int、string 的值,而我需要给 hadoop Configuration 放入的是 list 或者其他对象,所以需要用到一个序列化工具类。

序列化工具类代码:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import Java.io.*;

public class ObjectSerializer {

private static final Log log = LogFactory.getLog(ObjectSerializer.class);

public static String serialize(Serializable obj) throws IOException {
    if (obj == null)
        return “”;
    try {
        ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
        ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
        objStream.writeObject(obj);
        objStream.close();
        return encodeBytes(serialObj.toByteArray());
    } catch (Exception e) {
        throw new IOException(“Serialization error: ” + e.getMessage(), e);
    }
}

public static Object deserialize(String str) throws IOException {
    if (str == null || str.length() == 0)
        return null;
    try {
        ByteArrayInputStream serialObj = new ByteArrayInputStream(
                decodeBytes(str));
        ObjectInputStream objStream = new ObjectInputStream(serialObj);
        return objStream.readObject();
    } catch (Exception e) {
        throw new IOException(“Deserialization error: ” + e.getMessage(), e);
    }
}

public static String encodeBytes(byte[] bytes) {
    StringBuffer strBuf = new StringBuffer();

    for (int i = 0; i < bytes.length; i++) {
        strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ((int) ‘a’)));
        strBuf.append((char) (((bytes[i]) & 0xF) + ((int) ‘a’)));
    }

    return strBuf.toString();
}

public static byte[] decodeBytes(String str) {
    byte[] bytes = new byte[str.length() / 2];
    for (int i = 0; i < str.length(); i += 2) {
        char c = str.charAt(i);
        bytes[i / 2] = (byte) ((c – ‘a’) << 4);
        c = str.charAt(i + 1);
        bytes[i / 2] += (c – ‘a’);
    }
    return bytes;
}

}

但是当我放入的数据太大时,运行 hadoop 任务时报错,错误信息:

解决 Hadoop JobConf 限制为 5M 的问题

错误信息说明 hadoop 的 conf 是有限制的,查询下发现限制为 5M:

解决 Hadoop JobConf 限制为 5M 的问题

所以当时就懵了。这不从 conf 传入,好像又拿不到。最后想着能不能从 hdfs 文件直接读数据文件。但是我的数据必须在 mongospliter 里面获取数据,而这里只能拿到 conf。最后发现 hadoop 获取 FileSystem 方式为:

@Resource(name = “hadoopConfiguration”)
private Configuration configuration = null;

………..

 fileSystem = FileSystem.get(configuration);

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-11/92407p2.htm

推荐阅读

Hadoop 2.0 安装向导 (0.23.x) http://www.linuxidc.com/Linux/2012-05/61463.htm

Hadoop 1.2.1 单节点安装 (Single Node Setup) 步骤 http://www.linuxidc.com/Linux/2013-08/89377.htm

在 CentOS 上安装 Hadoop http://www.linuxidc.com/Linux/2013-08/88600.htm

Ubuntu 12.04 安装 Hadoop http://www.linuxidc.com/Linux/2013-08/88187.htm

CentOS 6.3 x86_64 安装与配置 Hadoop-1.0 http://www.linuxidc.com/Linux/2013-07/87959.htm

Hadoop 入门 –Hadoop2 伪分布式安装 http://www.linuxidc.com/Linux/2013-06/86403.htm

Hadoop2.2.0 单节点安装及测试 http://www.linuxidc.com/Linux/2013-10/91911.htm

查看发现这个 configuration 属于 package org.apache.Hadoop.conf:

解决 Hadoop JobConf 限制为 5M 的问题

查看了下 mongodb 里面字段的 configuration 也是属于 package org.apache.hadoop.conf:

解决 Hadoop JobConf 限制为 5M 的问题

所以就大胆的想着在 mongoSplitter 里面构造 FileSystem 直接去读文件获取,因为对 hadoop 不是很熟悉,而 mongodb 的 driver 里 mongoSplitter 也是在 MongoInputFormat 里面调用的,所以担心这里的 conf 并不能获得 hadoop 集群的 hdfs,所以给 conf 里面传入了一个字符串 dataPath 来指定需要读取的文件路径。然后读取:

FileSystem fileSystem = null;

        try {
            fileSystem = FileSystem.get(mongoConfig .conf);
        } catch (IOException e) {
            log.warn(“ 无法获得 fileSystem…….”);
        }
    FSDataInputStream hdfsInStream = null;
    BufferedReader bufferedReader = null;
          try {

                    // 打开文件流
                    hdfsInStream = fileSystem.open(new Path(mongoConfig.conf.get(“dataPath”)));

                    bufferedReader = new BufferedReader(new InputStreamReader(
                            hdfsInStream, “UTF-8”));
                    String line = null;
                    while ((line = bufferedReader.readLine()) != null) {
                        try {
                            if (StringUtils.isNotBlank(line)) {

                                System.out.println(line);

                            }
                        } catch (Exception e) {
                            log.warn(ExceptionUtils.getFullStackTrace(e));
                            continue;
                        }
                    }
                    line = null;

            } catch (Exception e) {
                log.warn(ExceptionUtils.getFullStackTrace(e));
            } finally {
                try {
                    if (bufferedReader != null) {
                        bufferedReader.close();
                    }
                    if (hdfsInStream != null) {
                        hdfsInStream.close();
                    }
                } catch (IOException e) {
                    log.warn(“ 关闭流时异常:” + ExceptionUtils.getFullStackTrace(e));
                }
            }

上面没有关闭 fileSystem 是因为不能关闭他,如果关闭了其他 hadoop 获取 hdfs 的流都会被关闭,调用会报空指针。应该是全局唯一的。

最后发现是可以的。也就是说 hadoop 在调用 inputFormat 调用的 getSplitter 方法时传入的 jobContext 已经包含了 hadoop 集群的信息,所以可以在自己实现的 mongoSplitter 方法里使用 FileSystem 读取 hdfs 数据,也解决了因为 conf 限制的问题。

@Override
public List<InputSplit> getSplits(JobContext context) {
    final Configuration hadoopConfiguration = context.getConfiguration();
    final MongoSimpleConfig conf = new MongoSimpleConfig(hadoopConfiguration);
    return MongoSimpleSplitter.calculateSplits(conf);
}

更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

我们的业务是要使用 mongodb 的 Hadoop driver 处理输出。我们重写的 mongodbInputFormat 的时候传递数据的时候是把数据写入 conf,然后再从 mongoSplitter 里面里面从 conf 里面读出来。比如下面这样:

把数据放入数据 conf:

List<Long> tagsUrns =null;
 //tagUrns 赋值 …..
 conf.set(“tagUrns”,
            ObjectSerializer.serialize((Serializable) tagsUrns));

在 mapper,reduce, 或者 mongoSpiltter 里拿出 conf 里的数据:

List<Long> tagUrns = (List<Long>) ObjectSerializer
            .deserialize(context.getConfiguration().get(“tagUrns”));

由于 conf 只能放入 boolean、int、string 的值,而我需要给 hadoop Configuration 放入的是 list 或者其他对象,所以需要用到一个序列化工具类。

序列化工具类代码:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import Java.io.*;

public class ObjectSerializer {

private static final Log log = LogFactory.getLog(ObjectSerializer.class);

public static String serialize(Serializable obj) throws IOException {
    if (obj == null)
        return “”;
    try {
        ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
        ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
        objStream.writeObject(obj);
        objStream.close();
        return encodeBytes(serialObj.toByteArray());
    } catch (Exception e) {
        throw new IOException(“Serialization error: ” + e.getMessage(), e);
    }
}

public static Object deserialize(String str) throws IOException {
    if (str == null || str.length() == 0)
        return null;
    try {
        ByteArrayInputStream serialObj = new ByteArrayInputStream(
                decodeBytes(str));
        ObjectInputStream objStream = new ObjectInputStream(serialObj);
        return objStream.readObject();
    } catch (Exception e) {
        throw new IOException(“Deserialization error: ” + e.getMessage(), e);
    }
}

public static String encodeBytes(byte[] bytes) {
    StringBuffer strBuf = new StringBuffer();

    for (int i = 0; i < bytes.length; i++) {
        strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ((int) ‘a’)));
        strBuf.append((char) (((bytes[i]) & 0xF) + ((int) ‘a’)));
    }

    return strBuf.toString();
}

public static byte[] decodeBytes(String str) {
    byte[] bytes = new byte[str.length() / 2];
    for (int i = 0; i < str.length(); i += 2) {
        char c = str.charAt(i);
        bytes[i / 2] = (byte) ((c – ‘a’) << 4);
        c = str.charAt(i + 1);
        bytes[i / 2] += (c – ‘a’);
    }
    return bytes;
}

}

但是当我放入的数据太大时,运行 hadoop 任务时报错,错误信息:

解决 Hadoop JobConf 限制为 5M 的问题

错误信息说明 hadoop 的 conf 是有限制的,查询下发现限制为 5M:

解决 Hadoop JobConf 限制为 5M 的问题

所以当时就懵了。这不从 conf 传入,好像又拿不到。最后想着能不能从 hdfs 文件直接读数据文件。但是我的数据必须在 mongospliter 里面获取数据,而这里只能拿到 conf。最后发现 hadoop 获取 FileSystem 方式为:

@Resource(name = “hadoopConfiguration”)
private Configuration configuration = null;

………..

 fileSystem = FileSystem.get(configuration);

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-11/92407p2.htm

推荐阅读

Hadoop 2.0 安装向导 (0.23.x) http://www.linuxidc.com/Linux/2012-05/61463.htm

Hadoop 1.2.1 单节点安装 (Single Node Setup) 步骤 http://www.linuxidc.com/Linux/2013-08/89377.htm

在 CentOS 上安装 Hadoop http://www.linuxidc.com/Linux/2013-08/88600.htm

Ubuntu 12.04 安装 Hadoop http://www.linuxidc.com/Linux/2013-08/88187.htm

CentOS 6.3 x86_64 安装与配置 Hadoop-1.0 http://www.linuxidc.com/Linux/2013-07/87959.htm

Hadoop 入门 –Hadoop2 伪分布式安装 http://www.linuxidc.com/Linux/2013-06/86403.htm

Hadoop2.2.0 单节点安装及测试 http://www.linuxidc.com/Linux/2013-10/91911.htm

正文完
星哥说事-微信公众号
post-qrcode
 
星锅
版权声明:本站原创文章,由 星锅 2022-01-20发表,共计7690字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中