十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
这篇文章主要为大家展示了“storm如何配置使用”,内容简明易懂,条理清晰,希望能够帮助大家解决疑惑。下面让小编带领大家一起研究并学习“storm如何配置使用”这篇文章吧。
我们提供的服务有:成都网站设计、做网站、微信公众号开发、网站优化、网站认证、金水ssl等。为成百上千家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的金水网站制作公司。
示例代码如下:
/*
 * storm.yaml configuration
 *
 *   # ZooKeeper servers
 *   storm.zookeeper.servers:
 *     - "bigdata01"
 *     - "bigdata02"
 *     - "bigdata03"
 *   # local directory where Storm stores its data
 *   storm.local.dir: "/apps/storm"
 *   # nimbus master
 *   nimbus.seeds: ["bigdata00"]
 *   # worker ports
 *   supervisor.slots.ports:
 *     - 6700
 *     - 6701
 *     - 6702
 *     - 6703
 *
 * Startup commands (run from the Storm installation directory):
 *
 *   nohup bin/storm nimbus &
 *   nohup bin/storm supervisor &
 *   nohup bin/storm ui &
 *
 * --------------------------------------------------------------------------------------
 */
package com.hgs.storm;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Word-count example topology: a spout emits a fixed sentence, one bolt splits
 * it into words and a second bolt keeps a running count per word.
 */
public class StormWordCountTest {

    /**
     * Wires the topology together and runs it on an in-process {@link LocalCluster}.
     *
     * @param args command-line arguments; currently unused
     */
    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wordspout", new WordCountSpout(), 3);
        builder.setBolt("splitword", new WordSpliteBolt(), 2).shuffleGrouping("wordspout");
        // "word" is the field declared by WordSpliteBolt.declareOutputFields; grouping on it
        // routes every occurrence of the same word to the same WordCountBolt task.
        builder.setBolt("wordcount", new WordCountBolt(), 2).fieldsGrouping("splitword", new Fields("word"));

        Config config = new Config();
        config.setNumWorkers(2);

        // This example always runs locally. To deploy to a real cluster, submit with
        //   StormSubmitter.submitTopology("words-count", config, builder.createTopology());
        // (for example when a topology name is passed in args[0]) instead of using LocalCluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("words-count", config, builder.createTopology());
    }
}

/** Spout that emits the same sentence on every call to {@link #nextTuple()}. */
class WordCountSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    // Handed over by open(); nextTuple() uses it to emit tuples.
    private SpoutOutputCollector collector = null;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        collector.emit(new Values(" this is my first storm program so i hope it will success"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

/** Bolt that splits each incoming sentence on spaces and emits one {@code (word, 1)} tuple per word. */
class WordSpliteBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private OutputCollector collector = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getString(0);
        for (String wd : line.split(" ")) {
            // A leading space or consecutive spaces produce empty tokens; they are not words,
            // so they must not be emitted and counted downstream.
            if (wd.isEmpty()) {
                continue;
            }
            collector.emit(new Values(wd, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}

/** Bolt that accumulates a running total per word and prints it to standard output. */
class WordCountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    // Running total per word seen by this bolt instance.
    private final Map<String, Integer> wordsMap = new ConcurrentHashMap<String, Integer>();

    private OutputCollector collector = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer num = input.getInteger(1);

        Integer previous = wordsMap.get(word);
        Integer total = (previous == null) ? num : Integer.valueOf(previous + num);
        wordsMap.put(word, total);

        System.out.println(word + "----" + total);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: it emits nothing, so there are no output fields to declare.
    }
}
以上是“storm如何配置使用”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助。如果还想学习更多知识,欢迎关注创新互联行业资讯频道!