阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Apache Hama安装部署

500次阅读
没有评论

共计 10837 个字符,预计需要花费 28 分钟才能阅读完成。

安装 Hama 之前,应该首先确保系统中已经安装了 Hadoop,本集群使用的版本为 hadoop-2.3.0

一、下载及解压 Hama 文件

下载地址:http://www.apache.org/dyn/closer.cgi/hama,选用的是目前最新版本:hama0.6.4。解压之后的存放位置自己设定。

二、修改配置文件

  1. 在 hama-env.sh 文件中加入 JAVA_HOME 变量(分布式情况下,设为机器的值)
  2. 配置 hama-site.xml(分布式情况下,所有机器的配置相同)

bsp.master.address 为 bsp master 地址。fs.default.name 参数设置成 hadoop 里 namenode 的地址。hama.zookeeper.quorum 和 hama.zookeeper.property.clientPort 两个参数和 zookeeper 有关,设置成为 zookeeper 的 quorum server 即可,单机伪分布式就是本机地址。

 Apache Hama 安装部署

4. 配置 groomservers 文件。hama 与 hadoop 具有相似的主从结构,该文件存放从节点的 IP 地址,每个 IP 占一行。(分布式情况下只需要配置 BSPMaster 所在的机器即可)

 

5. hama0.6.4 自带的 hadoop 核心包为 1.2.0,与集群 hadoop2.3.0 不一致,需要进行替换,具体是在 hadoop 的 lib 文件夹下找到 hadoop-core-2.3.0*.jar 和 hadoop-test-2.3.0*.jar,拷贝到 hama 的 lib 目录下,并删除 hadoop-core-1.2.0.jar 和 hadoop-test-1.2.0.jar 两个文件。

6. 此时可能会报找不到类的错,需加入缺失的 jar 包。(把 hadoop 开头的 jar 包和 protobuf-java-2.5.0.jar 导入到 hama/lib 下)

三、编写 Hama job

在 eclipse 下新建 Java Project,将 hama 安装时需要的 jar 包全部导入工程。

官网中计算 PI 的例子:

package pi;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.FileOutputFormat;
import org.apache.hama.bsp.NullInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.bsp.sync.SyncException;

public class PiEstimator {
    private static Path TMP_OUTPUT = new Path(“/tmp/pi-“
            + System.currentTimeMillis());

    public static class MyEstimator
            extends
            BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {
        public static final Log LOG = LogFactory.getLog(MyEstimator.class);
        private String masterTask;
        private static final int iterations = 100000;

        @Override
        public void bsp(
                BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
                throws IOException, SyncException, InterruptedException {

            int in = 0;
            for (int i = 0; i < iterations; i++) {
                double x = 2.0 * Math.random() – 1.0, y = 2.0 * Math.random() – 1.0;
                if ((Math.sqrt(x * x + y * y) < 1.0)) {
                    in++;
                }
            }

            double data = 4.0 * in / iterations;

            peer.send(masterTask, new DoubleWritable(data));
            peer.sync();

            if (peer.getPeerName().equals(masterTask)) {
                double pi = 0.0;
                int numPeers = peer.getNumCurrentMessages();
                DoubleWritable received;
                while ((received = peer.getCurrentMessage()) != null) {
                    pi += received.get();
                }

                pi = pi / numPeers;
                peer.write(new Text(“Estimated value1 of PI is”),
                        new DoubleWritable(pi));
            }
            peer.sync();

            int in2 = 0;
            for (int i = 0; i < iterations; i++) {
                double x = 2.0 * Math.random() – 1.0, y = 2.0 * Math.random() – 1.0;
                if ((Math.sqrt(x * x + y * y) < 1.0)) {
                    in2++;
                }
            }

            double data2 = 4.0 * in2 / iterations;

            peer.send(masterTask, new DoubleWritable(data2));
            peer.sync();

            if (peer.getPeerName().equals(masterTask)) {
                double pi2 = 0.0;
                int numPeers = peer.getNumCurrentMessages();
                DoubleWritable received;
                while ((received = peer.getCurrentMessage()) != null) {
                    pi2 += received.get();
                }

                pi2 = pi2 / numPeers;
                peer.write(new Text(“Estimated value2 of PI is”),
                        new DoubleWritable(pi2));
            }
            peer.sync();

        }

        @Override
        public void setup(
                BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
                throws IOException {
            // Choose one as a master

            this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
        }

        @Override
        public void cleanup(
                BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
                throws IOException {

            // if (peer.getPeerName().equals(masterTask)) {
            // double pi = 0.0;
            // int numPeers = peer.getNumCurrentMessages();
            // DoubleWritable received;
            // while ((received = peer.getCurrentMessage()) != null) {
            // pi += received.get();
            // }
            //
            // pi = pi / numPeers;
            // peer.write(new Text(“Estimated value of PI is”),
            // new DoubleWritable(pi));
            // }
        }
    }

    static void printOutput(HamaConfiguration conf) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] files = fs.listStatus(TMP_OUTPUT);
        for (int i = 0; i < files.length; i++) {
            if (files[i].getLen() > 0) {
                FSDataInputStream in = fs.open(files[i].getPath());
                IOUtils.copyBytes(in, System.out, conf, false);
                in.close();
                break;
            }
        }

        fs.delete(TMP_OUTPUT, true);
    }

    public static void main(String[] args) throws InterruptedException,
            IOException, ClassNotFoundException {
        // BSP job configuration
        HamaConfiguration conf = new HamaConfiguration();
        BSPJob bsp = new BSPJob(conf, PiEstimator.class);
        // Set the job name
        bsp.setJobName(“Pi Estimation Example”);
        bsp.setBspClass(MyEstimator.class);
        bsp.setInputFormat(NullInputFormat.class);
        bsp.setOutputKeyClass(Text.class);
        bsp.setOutputValueClass(DoubleWritable.class);
        bsp.setOutputFormat(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);

        BSPJobClient jobClient = new BSPJobClient(conf);
        ClusterStatus cluster = jobClient.getClusterStatus(true);

        if (args.length > 0) {
            bsp.setNumBspTask(Integer.parseInt(args[0]));
        } else {
            // Set to maximum
            bsp.setNumBspTask(cluster.getMaxTasks());
        }

        long startTime = System.currentTimeMillis();

        if (bsp.waitForCompletion(true)) {
            printOutput(conf);
            System.out.println(“Job Finished in “
                    + (System.currentTimeMillis() – startTime) / 1000.0
                    + ” seconds”);
        }
    }

}

将工程 Export 成 Jar 文件,发到集群上运行。运行命令:

$HAMA_HOME/bin/hama  jar  jarName.jar

输出:

 Apache Hama 安装部署

Current supersteps number: 0()

Current supersteps number: 4()

The total number of supersteps: 4(总超级步数目)

Counters: 8(一共 8 个计数器,如下 8 个。所有计数器列表待完善)

org.apache.hama.bsp.JobInProgress$JobCounter

SUPERSTEPS=4(BSPMaster 超级步数目)

LAUNCHED_TASKS=3(共多少个 task)

org.apache.hama.bsp.BSPPeerImpl$PeerCounter

SUPERSTEP_SUM=12(总共的超级步数目,task 数目 *BSPMaster 超级步数目)

MESSAGE_BYTES_TRANSFERED=48(传输信息字节数)

TIME_IN_SYNC_MS=657(同步消耗时间)

TOTAL_MESSAGES_SENT=6(发送信息条数)

TOTAL_MESSAGES_RECEIVED=6(接收信息条数)

TASK_OUTPUT_RECORDS=2(任务输出记录数)

PageRank 例子:

package pi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.AverageAggregator;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;

/**
 * Real pagerank with dangling node contribution.
 */
public class PageRank {

    public static class PageRankVertex extends
            Vertex<Text, NullWritable, DoubleWritable> {

        static double DAMPING_FACTOR = 0.85;
        static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
       
        @Override
        public void setup(HamaConfiguration conf) {
            String val = conf.get(“hama.pagerank.alpha”);
            if (val != null) {
                DAMPING_FACTOR = Double.parseDouble(val);
            }
            val = conf.get(“hama.graph.max.convergence.error”);
            if (val != null) {
                MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
            }
        }

        @Override
        public void compute(Iterable<DoubleWritable> messages)
                throws IOException {
            // initialize this vertex to 1 / count of global vertices in this
            // graph
            if (this.getSuperstepCount() == 0) {
                this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
            } else if (this.getSuperstepCount() >= 1) {
                double sum = 0;
                for (DoubleWritable msg : messages) {
                    sum += msg.get();
                }
                double alpha = (1.0d – DAMPING_FACTOR) / this.getNumVertices();
                this.setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
            }

            // if we have not reached our global error yet, then proceed.
            DoubleWritable globalError = this.getAggregatedValue(0);
            if (globalError != null && this.getSuperstepCount() > 2
                    && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
                voteToHalt();
                return;
            }

            // in each superstep we are going to send a new rank to our
            // neighbours
            sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
                    / this.getEdges().size()));
        }
    }

    public static GraphJob createJob(String[] args, HamaConfiguration conf)
            throws IOException {
        GraphJob pageJob = new GraphJob(conf, PageRank.class);
        pageJob.setJobName(“Pagerank”);

        pageJob.setVertexClass(PageRankVertex.class);
        pageJob.setInputPath(new Path(args[0]));
        pageJob.setOutputPath(new Path(args[1]));

        // set the defaults
        pageJob.setMaxIteration(30);
        pageJob.set(“hama.pagerank.alpha”, “0.85”);
        // reference vertices to itself, because we don’t have a dangling node
        // contribution here
        pageJob.set(“hama.graph.self.ref”, “true”);
        pageJob.set(“hama.graph.max.convergence.error”, “1”);

        if (args.length == 3) {
            pageJob.setNumBspTask(Integer.parseInt(args[2]));
        }

        // error
        pageJob.setAggregatorClass(AverageAggregator.class);

        // Vertex reader
        pageJob.setVertexInputReaderClass(PagerankTextReader.class);

        pageJob.setVertexIDClass(Text.class);
        pageJob.setVertexValueClass(DoubleWritable.class);
        pageJob.setEdgeValueClass(NullWritable.class);

        pageJob.setPartitioner(HashPartitioner.class);
        pageJob.setOutputFormat(TextOutputFormat.class);
        pageJob.setOutputKeyClass(Text.class);
        pageJob.setOutputValueClass(DoubleWritable.class);
        return pageJob;
    }

    private static void printUsage() {
        System.out.println(“Usage: <input> <output> [tasks]”);
        System.exit(-1);
    }

    public static class PagerankTextReader
            extends
            VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {

        @Override
        public boolean parseVertex(LongWritable key, Text value,
                Vertex<Text, NullWritable, DoubleWritable> vertex)
                throws Exception {
            String[] split = value.toString().split(“\t”);
            for (int i = 0; i < split.length; i++) {
                if (i == 0) {
                    vertex.setVertexID(new Text(split[i]));
                } else {
                    vertex.addEdge(new Edge<Text, NullWritable>(new Text(
                            split[i]), null));
                }
            }
            return true;
        }

    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        if (args.length < 2)
            printUsage();

        HamaConfiguration conf = new HamaConfiguration(new Configuration());
        GraphJob pageJob = createJob(args, conf);

        long startTime = System.currentTimeMillis();
        if (pageJob.waitForCompletion(true)) {
            System.out.println(“Job Finished in “
                    + (System.currentTimeMillis() – startTime) / 1000.0
                    + ” seconds”);
        }
    }
}

输出:

 Apache Hama 安装部署

本文永久更新链接地址 :http://www.linuxidc.com/Linux/2016-04/130626.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计10837字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19351
评论数
4
阅读量
7992157
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
星哥带你玩飞牛NAS-8:有了NAS你可以干什么?软件汇总篇

星哥带你玩飞牛NAS-8:有了NAS你可以干什么?软件汇总篇

星哥带你玩飞牛 NAS-8:有了 NAS 你可以干什么?软件汇总篇 前言 哈喽各位玩友!我是是星哥,不少朋友私...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸

一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸

一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸 前言 作为天天跟架构图、拓扑图死磕的...
星哥带你玩飞牛NAS硬件03:五盘位+N5105+双网口的成品NAS值得入手吗

星哥带你玩飞牛NAS硬件03:五盘位+N5105+双网口的成品NAS值得入手吗

星哥带你玩飞牛 NAS 硬件 03:五盘位 +N5105+ 双网口的成品 NAS 值得入手吗 前言 大家好,我...
300元就能买到的”小钢炮”?惠普7L四盘位小主机解析

300元就能买到的”小钢炮”?惠普7L四盘位小主机解析

  300 元就能买到的 ” 小钢炮 ”?惠普 7L 四盘位小主机解析 最近...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
你的云服务器到底有多强?宝塔跑分告诉你

你的云服务器到底有多强?宝塔跑分告诉你

你的云服务器到底有多强?宝塔跑分告诉你 为什么要用宝塔跑分? 宝塔跑分其实就是对 CPU、内存、磁盘、IO 做...
还在找免费服务器?无广告免费主机,新手也能轻松上手!

还在找免费服务器?无广告免费主机,新手也能轻松上手!

还在找免费服务器?无广告免费主机,新手也能轻松上手! 前言 对于个人开发者、建站新手或是想搭建测试站点的从业者...
星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

  星哥带你玩飞牛 NAS-16:不再错过公众号更新,飞牛 NAS 搭建 RSS 对于经常关注多个微...
浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍

浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍

浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍 前言 在 AI 自动化快速发展的当下,浏览器早已不再只是...
星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛 NAS 硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话? 前言 在选择 NAS 用预...