# Storm集群WordCount的示例分析 ## 一、Storm框架概述 ### 1.1 实时计算与Storm Apache Storm是一个开源的分布式实时计算系统,由Nathan Marz团队在BackType创建(后被Twitter收购)。它能够以可靠的方式处理无限的数据流,实现实时分析、在线机器学习、持续计算等场景。与Hadoop的批处理模式不同,Storm采用流式处理模型,延迟可达到秒级甚至毫秒级。 **核心特点:** - 高可靠性:保证每条消息至少被处理一次(exactly-once语义可选) - 水平扩展:通过增加节点线性提升处理能力 - 容错机制:任务失败时自动重启 - 多语言支持:支持Java、Python等多种语言开发组件 ### 1.2 Storm架构组成 ```mermaid graph TD Nimbus-->|提交拓扑|Zookeeper Supervisor-->|心跳|Zookeeper Nimbus-->|分配任务|Supervisor Supervisor-->Worker Worker-->Executor Executor-->Task
关键角色说明: - Nimbus:主节点,负责拓扑分发、任务调度和故障监测 - Supervisor:工作节点,监听并执行分配给它的任务 - Zookeeper:协调服务,维护集群状态和配置信息 - Worker:JVM进程,执行特定拓扑的子集 - Executor:线程,运行一个或多个相同类型的Task - Task:实际的数据处理单元
WordCount作为经典案例,在Storm中的实现展示了流式处理的核心思想。数据流动过程如下:
Spout(数据源) --> SplitBolt(分词) --> CountBolt(计数) --> ReportBolt(报告)
public class SentenceSpout extends BaseRichSpout { private SpoutOutputCollector collector; private String[] sentences = { "storm cluster is powerful", "learn storm by example", "word count topology demo" }; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { Utils.sleep(1000); // 防止过度消耗CPU String sentence = sentences[new Random().nextInt(sentences.length)]; collector.emit(new Values(sentence)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } }
public class SplitSentenceBolt extends BaseRichBolt { private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split("\\s+"); for(String word : words) { collector.emit(new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
public class WordCountBolt extends BaseRichBolt { private OutputCollector collector; private Map<String, Integer> counts = new HashMap<>(); @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); counts.put(word, counts.getOrDefault(word, 0) + 1); collector.emit(new Values(word, counts.get(word))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
public class WordCountTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentence-spout", new SentenceSpout()); builder.setBolt("split-bolt", new SplitSentenceBolt()) .shuffleGrouping("sentence-spout"); builder.setBolt("count-bolt", new WordCountBolt()) .fieldsGrouping("split-bolt", new Fields("word")); Config config = new Config(); config.setNumWorkers(3); if(args != null && args.length > 0) { // 集群模式 StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } else { // 本地测试模式 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", config, builder.createTopology()); Utils.sleep(10000); cluster.shutdown(); } } }
参数名 | 默认值 | 说明 |
---|---|---|
topology.workers | 1 | 工作进程数 |
topology.max.spout.pending | null | Spout最大未完成元组数 |
message.timeout.secs | 30 | 消息超时时间(秒) |
topology.debug | false | 是否开启调试模式 |
通过设置并行度参数实现水平扩展:
builder.setSpout("sentence-spout", new SentenceSpout(), 2); // 2个executor builder.setBolt("split-bolt", new SplitSentenceBolt(), 4) .setNumTasks(8) // 每个executor运行2个task .shuffleGrouping("sentence-spout");
分组类型 | 数据分发方式 | 适用场景 |
---|---|---|
Shuffle | 随机均匀分配 | 无状态处理 |
Fields | 相同字段值发往同一任务 | 需要状态维护的操作(如计数) |
All | 广播到所有任务 | 全局通知类操作 |
Global | 全部发往同一任务(最低ID) | 汇总统计类操作 |
通过锚定(anchoring)和ack机制实现:
// 在Spout中发送时添加MessageID collector.emit(new Values(sentence), messageId); // Bolt中处理时锚定输入元组 collector.emit(tuple, new Values(word)); collector.ack(tuple);
topology.max.spout.pending
设置config.registerSerialization(MyClass.class);
worker.heap.memory.mb
supervisor.slots.ports
控制每节点worker数graph LR LogSpout -->|原始日志| FilterBolt FilterBolt -->|错误日志| AlarmBolt FilterBolt -->|正常日志| StatsBolt
Storm的WordCount示例虽然简单,但完整展示了实时流处理的核心概念。随着Flink等新框架的出现,Storm在某些场景已被替代,但其设计思想仍具参考价值。建议学习者进一步探索: - Trident高级抽象 - 与Kafka的集成方案 - 状态管理优化技巧
最佳实践建议:生产环境中建议使用Storm 2.x版本,其引入的分布式API和性能改进(如流量控制)能显著提升稳定性。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。