阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Storm初体验

131次阅读
没有评论

共计 5686 个字符,预计需要花费 15 分钟才能阅读完成。

Storm 是一个开源的、大数据处理系统,与其他系统不同,它旨在用于分布式实时处理且与语言无关。了解更多请自己 google, 安装过程也请自己搜索。

做了一个简单的例子

package mapstorm;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class StormMain {

 public static void main(String[] args) throws Exception {

  TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(“word-reader”, new WordReader());
        builder.setBolt(“word-normalizer”, new WordNormalizer()).shuffleGrouping(“word-reader”);
        builder.setBolt(“word-counter”, new WordCounter(), 1).fieldsGrouping(“word-normalizer”, new Fields(“word”));

        //Configuration
        Config conf = new Config();
        conf.put(“wordsFile”, args[0]);
        conf.setDebug(true);
      //conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
       
        StormSubmitter.submitTopology(“wordCounterTopology”, conf, builder.createTopology());
      //  Thread.sleep(1000);
        //StormSubmitter.(“wordCounterTopology”);
    //  StormSubmitter.shutdown();
       
     
        //Topology run
        //conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        //LocalCluster cluster = new LocalCluster();
        //cluster.submitTopology(“Getting-Started-Toplogie”, conf, builder.createTopology());
        //Thread.sleep(2000);
        //cluster.shutdown();
        //
 }

}

—————————-

package mapstorm;

import Java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {
 private static final long serialVersionUID = 5678586644899822142L;
 Integer id;
    String name;
    Map<String, Integer> counters;

 @Override
 public void execute(Tuple input, BasicOutputCollector collector) {
    String str = input.getString(0);
        System.out.println(“WordCounter word “+ str);
        if(!counters.containsKey(str)){
            counters.put(str, 1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
 
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {}

 @Override
    public void cleanup() {
        System.out.println(“– Word Counter [“+name+”-“+id+”] –“);
        for(Map.Entry<String, Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+”: “+entry.getValue());
        }
        System.out.println(“finish———–“);
    }
 

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }
}

———————————-

package mapstorm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {
 
 public void cleanup() {
  System.out.println(“finish”);
 }

 @Override
 public void execute(Tuple input, BasicOutputCollector collector) {
  String sentence = input.getString(0);
        String[] words = sentence.split(” “);
        System.out.println(“WordNormalizer recevie  “+ sentence);
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                System.out.println(“WordNormalizer recevie “+ sentence+”words  “+ word);
                collector.emit(new Values(word));
            }
        }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields(“word”));
 
 }

}

————————————-

package mapstorm;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {
 
 private SpoutOutputCollector collector;
    private FileReader fileReader;
    private String filePath;
    private boolean completed = false;
 
    public void ack(Object msgId) {
        System.out.println(“OK:”+msgId);
    }
    public void close() {}

    public void fail(Object msgId) {
        System.out.println(“FAIL:”+msgId);
    }

 @Override
 public void open(Map conf, TopologyContext context,
   SpoutOutputCollector collector) {
  try {
            this.fileReader = new FileReader(conf.get(“wordsFile”).toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException(“Error reading file [“+conf.get(“wordFile”)+”]”);
        }
     this.filePath = conf.get(“wordsFile”).toString();
        this.collector = collector;

 }

 @Override
 public void nextTuple() {
  if(completed){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            return;
        }
        String str;
        BufferedReader reader =new BufferedReader(fileReader);
        try{
            while((str = reader.readLine()) != null){
             System.out.println(“WordReader read”+ str);
                this.collector.emit(new Values(str),str);
                System.out.println(“WordReader out”+ str);
            }
        }catch(Exception e){
            throw new RuntimeException(“Error reading tuple”,e);
        }finally{
            completed = true;
        }

 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields(“line”));

 }

}

完成后打包成 storm.jar

通过 storm jar storm.jar mapstorm.StormMain  /data/words.txt 即可启动运行.ps:words.txt 要分发到各 Supervisor 相应目录下。

可以通过 storm ui 页面看到 Topology 中多了一条任务。

如果要终止任务 storm kill name 即可,这里是 storm kill wordCounterTopology

推荐阅读:

Twitter Storm 安装配置(集群)笔记 http://www.linuxidc.com/Linux/2013-05/84307.htm

安装 Twitter Storm 集群 http://www.linuxidc.com/Linux/2012-07/66336.htm

Twitter Storm 安装配置(单机版)笔记 http://www.linuxidc.com/Linux/2013-05/84306.htm

Storm 实战及实例讲解一 http://www.linuxidc.com/Linux/2012-08/69146.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计5686字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中