目前工作下需要接触storm,遂写个demo练练手。https://github.com/apache/storm 
略做修改。
1 2 3 4 5 6 <dependency > <groupId > org.apache.storm</groupId > <artifactId > storm-core</artifactId > <type > jar</type > <version > 1.0.0</version > </dependency > 
2.Spout
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public  class  RandomSentenceSpout  extends  BaseRichSpout  {private  static  final Logger  LOG  = LoggerFactory .getLogger (RandomSentenceSpout .class );SpoutOutputCollector  _collector;Random  _rand;@Override public  void  open (Map  conf, TopologyContext context, SpoutOutputCollector collectornew  Random ();@Override public  void  nextTuple (Utils .sleep (100 );String [] sentences = new  String []{sentence ("the cow jumped over the moon" ), sentence ("an apple a day keeps the doctor away" ),sentence ("four score and seven years ago" ), sentence ("snow white and the seven dwarfs" ), sentence ("i am at two with nature" )};String  sentence = sentences[_rand.nextInt (sentences.length )];LOG .info ("Emitting tuple: {}" , sentence);emit (new  Values (sentence));protected  String  sentence (String  inputreturn  input;@Override public  void  ack (Object  id@Override public  void  fail (Object  id@Override public  void  declareOutputFields (OutputFieldsDeclarer declarer ) {declare (new  Fields ("word" ));
3.Bolt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public  class  SplitSentenceBlot  extends BaseBasicBolt {Override     public  void  execute (Tuple tuple, BasicOutputCollector collector)   {String  word  = tuple.getString (0 );String [] words = word .split (" " );for  (String  s : words) {println ("=========="  + s);emit (new  Values (s));Override     public  void  declareOutputFields (OutputFieldsDeclarer declarer)   {declare (new  Fields ("word" ));
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public  class  WordCountBlot  extends BaseBasicBolt {String , Integer> counts = new  HashMap <String , Integer>();Override     public  void  execute (Tuple tuple, BasicOutputCollector collector)   {String  word  = tuple.getString (0 );get (word );if  (count == null)0 ;put (word , count);println ("==============["  + word  + "]:"  + count);emit (new  Values (word , count));Override     public  void  declareOutputFields (OutputFieldsDeclarer declarer)   {declare (new  Fields ("word" , "count" ));
4.Topology
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class  SimpleTopology {[]  args) throws Exception{new  TopologyBuilder() ;Spout("spout" , new  RandomSentenceSpout() , 1 );Bolt("split" , new  SplitSentenceBlot() , 3 ).shuffleGrouping("spout" ) ;Bolt("count" , new  WordCountBlot() , 3 ).fieldsGrouping("split" , new  Fields("word" ) );"word-count" ;new  Config() ;if  (args != null &&  args.length > 0 ) {NumWorkers(1) ;StormSubmitter .Topology(args [0], config , topologyBuilder .createTopology () );else  {MaxTaskParallelism(1) ;new  LocalCluster() ;Topology(topologyName , config , topologyBuilder .createTopology () );