My current job requires working with Storm, so I wrote a demo to get some practice. The code is largely adapted from Storm's official GitHub project (https://github.com/apache/storm), with minor modifications.

1. Add the storm-core dependency in pom.xml
```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <type>jar</type>
    <version>1.0.0</version>
</dependency>
```
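When the topology is later submitted to a real cluster with `storm jar`, the cluster already provides storm-core on the classpath, so it is common (though not required for this local demo) to mark the dependency as `provided`; a sketch:

```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <type>jar</type>
    <version>1.0.0</version>
    <!-- provided: the Storm cluster supplies this jar at runtime;
         drop this line (compile scope) when running in local mode -->
    <scope>provided</scope>
</dependency>
```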
2. Spout

A Spout is the class that produces the tuple stream in Storm. It usually reads external data and converts it into tuples flowing through the topology, so it plays the active role. The nextTuple method is called over and over, and this is where the data is generated. In this example the spout picks a random sentence from an array and emits it as the stream.
```java
import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RandomSentenceSpout extends BaseRichSpout {

    private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);

    SpoutOutputCollector _collector;
    Random _rand;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[]{
                sentence("the cow jumped over the moon"),
                sentence("an apple a day keeps the doctor away"),
                sentence("four score and seven years ago"),
                sentence("snow white and the seven dwarfs"),
                sentence("i am at two with nature")};
        // Pick a random sentence and emit it as a one-field tuple.
        final String sentence = sentences[_rand.nextInt(sentences.length)];
        LOG.info("Emitting tuple: {}", sentence);
        _collector.emit(new Values(sentence));
    }

    protected String sentence(String input) {
        return input;
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
3. Bolt

A Bolt is the component in a topology that receives and processes data; it plays the passive role. The execute method handles each incoming tuple and can emit new tuples downstream. Two bolts are defined here: the first splits each sentence into words, and the second counts the words.
```java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentenceBlot extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Split the incoming sentence on spaces and emit one tuple per word.
        String sentence = tuple.getString(0);
        String[] words = sentence.split(" ");
        for (String s : words) {
            System.out.println("==========" + s);
            collector.emit(new Values(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBlot extends BaseBasicBolt {

    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Increment the running count for this word and emit (word, count).
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        System.out.println("==============[" + word + "]:" + count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```
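Because the two bolts hold no state beyond the counts map, the split-then-count flow can be sanity-checked in plain Java before wiring it into Storm; a minimal sketch (no Storm classes, names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    public static void main(String[] args) {
        // Mimic the topology flow: split a sentence into words,
        // then increment a running count per word.
        Map<String, Integer> counts = new HashMap<>();
        String sentence = "the cow jumped over the moon";
        for (String word : sentence.split(" ")) {
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
        }
        System.out.println(counts.get("the")); // "the" appears twice → 2
    }
}
```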
4. Topology

The topology is a key concept in Storm: it is essentially a real-time application running in Storm, and here it is the main class that wires everything together. See the comments for the details.
```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

public class SimpleTopology {

    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // "split" reads from the spout via shuffle grouping; "count" reads from
        // "split" with a fields grouping on "word", so the same word always
        // goes to the same counting task.
        topologyBuilder.setSpout("spout", new RandomSentenceSpout(), 1);
        topologyBuilder.setBolt("split", new SplitSentenceBlot(), 3).shuffleGrouping("spout");
        topologyBuilder.setBolt("count", new WordCountBlot(), 3).fieldsGrouping("split", new Fields("word"));

        String topologyName = "word-count";
        Config config = new Config();
        if (args != null && args.length > 0) {
            // Submit to a real cluster; args[0] is used as the topology name.
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
        } else {
            // No arguments: run in-process for local testing.
            config.setMaxTaskParallelism(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
            // Let it run for a while, then shut down cleanly.
            Utils.sleep(10000);
            cluster.killTopology(topologyName);
            cluster.shutdown();
        }
    }
}
```
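To try it out, build the jar with Maven and either run the main class directly (no arguments, so the LocalCluster branch runs) or hand it to a cluster with the `storm jar` command. The jar path and class name below are placeholders for this demo; adjust them to your own artifact and package:

```
# package the topology (jar name depends on your pom)
mvn clean package

# cluster mode: storm jar <jar> <main class> <args...>;
# here args[0] becomes the topology name
storm jar target/storm-demo-1.0.jar SimpleTopology word-count
```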