`
yangyangmyself
  • 浏览: 229561 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

STORM 拓扑构建

阅读更多

一、Storm大数据位置

 

解决方案

开发商

类型

描述

Storm

Twitter

流式处理

Twitter 的新流式大数据分析解决方案

S4

Yahoo!

流式处理

来自 Yahoo! 的分布式流计算平台

Hadoop

Apache

批处理

MapReduce 范式的第一个开源实现

Spark

UC Berkeley AMPLab

批处理/流处理

支持内存中数据集和恢复能力的最新分析平台

Disco

Nokia

批处理

Nokia 的分布式 MapReduce 框架

HPCC

LexisNexis

批处理

HPC 大数据集群

 

 

二、Storm概念及组件

 

  在Storm拓扑构建前我们先复习一下Storm概念及组件:

 

     Nimbus:负责资源分配和任务调度。

 

     Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。

 

     Worker:运行具体处理组件逻辑的进程。

 

      Taskworker中每一个spout/bolt的线程称为一个task. storm0.8之后,task不再与物理线程对应,同一个   

 

      spout/bolttask可能会共享一个物理线程,该线程称为executor

 

      Topologystorm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。

 

      Spout:在一个topology中产生源数据流的组件。通常情况下spout会从外部数据源中读取数据,然后转换为

 

       topology内部的源数据。Spout是一个主动的角色,其接口中有个nextTuple()函数,storm框架会不停地调用

 

   此函数,用户只要在其中生成源数据即可。

 

       Bolt:在一个topology中接受数据然后执行处理的组件。Bolt可以执行过滤、函数操作、合并、写数据库等

 

   任何操作。Bolt是一个被动的角色,其接口中有个execute(Tuple input)函数,在接受到消息后会调用此函

 

   数,用户可以在其中执行自己想要的操作。

 

       TupleStorm SpoutBolt组件消息传递的基本单元(数据模型),Tuple是包含名称的列表,Storm支持所

 

   有原生类型,字节数组为Tuple字段传递,如果要传递自定义对象,需要实现接口serializer

 

       Stream:源源不断传递的tuple就组成了stream

  

  

 

图(一)

 

 三、创建逻缉组件

      

       以下图二所示,创建Storm组件SpoutBoltStorm拓扑构建并结合拓扑描述并发情况(Worker

 

   ExecutorTask关系)

 



 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 图(二)

 

      SPOUT:自定义类实现IRichSpout接口或继承BaseRichSpout即创建Spout组件。Spout从外部数据源读取数据

 

(队列、DRPC等)为Storm拓扑提供数据,Storm可以实现可靠或不可靠,可靠性表现在Storm可以对Storm

 

 toplogy处理失败的tuple进行重发,反之不处理。

 

    Spout可以发送多个流,通过在调用SpoutOutputCollector类的emit方法的同时使用declareStream方法申

 

 明并指定多个流发送。


     Spout中主要方法nextTuple,能够发送新流到toplopy及无数据流发送时直接返回。最重要的是不要阻塞

 

 nextTuple方法,因为Spout在同一个线程执行。Spout主要将读取到的数据组织成Tuple发送到Bolt组件处

 

 理。


     Spout另一个重要的方法时ack和fail,Storm监控到tuple从Spout发送到toplogy成功完成或失败时调用ack

 

 和fail(数据可靠性请参考其它文档)


     示例Spout发tuple到默认流:

 

package com.sunshine.spout;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
* 绿色Spout,参考图二
*/
public class GreenSpout extends BaseRichSpout{
	private static final long serialVersionUID = -1215556162813479167L;
	private SpoutOutputCollector collector;
	/**
	 * Storm自动初始化
	 */
	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector = collector;
	}
	/**
	 * Storm不断调用此方法,传递单词到Bolt组件处理
	 */
	@Override
	public void nextTuple() {
		String[] words = {"green", "yellow", "blue"}; 
		for(String word : words){
			/**
			 * Values是List子类,发送数据需要封装在Values
			 * 本次存储一个单词发送与declareOutputFields()
			 * 方法申明输出一个字段对应
			 */
			collector.emit(new Values(word), word); // 1
		}
	}
	/**
	 * 申明发送tuple字段名称
	 * 在Bolt组件可以根所名称或索引获取Spout传递的Tuple
	 * tuple.getValue(0)
	 * 或tuple.getValueByField("word");
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {	
		declarer.declare(new Fields("word")); // 1
	}
	@Override
	public void ack(Object msgId) {
		System.out.println("success-->" + msgId);
	}
	@Override
	public void fail(Object msgId) {
		System.out.println("fail-->" + msgId);
	}
}

 
 

      BOLT:自定义类实现IRichBolt接口或者继承BaseRichBolt类即Bolt组件。Toplogy里所有处理都在Bolt完成,

 

 Bolt可以做任何事,如过滤、聚合、连接、操作数据库等,也可以将数据传递一下Bolt组件处理。

 

       Bolt可以做简单流转化或发送多个流(参考Spout发送多个流的方式),Bolt成功将消息处理后通知ACK

 

 件(可靠)。

 

package com.sunshine.bolt;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
*黄色Spout,参考图二
*/
public class YellowBolt extends BaseRichBolt{
	private static final long serialVersionUID = 7593355203928566992L;
	private OutputCollector collector;
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}
	@Override
	public void execute(Tuple input) {
		String word = (String)input.getValueByField("word");
		if(word != null){
			collector.emit(new Values(word, word));
		}
		collector.ack(input);
	}
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("yword1","yword2"));
	}
}

 

package com.sunshine.bolt;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
* 蓝色Spout,参考图二
*/
public class BlueBolt extends BaseRichBolt{
	private static final long serialVersionUID = 4342676753918989102L;
	private OutputCollector collector;
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}
	@Override
	public void execute(Tuple input) {
		String yword1 = (String)input.getValueByField("yword1");
		String yword2 = (String)input.getValueByField("yword2");
		System.out.println("yword1:" + yword1 +",yword2:" + yword2);
		collector.ack(input);
	}
	/**
	 * 结束
	 */
	@Override
	public void cleanup() {
		super.cleanup();
	}
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// 不再传递下一个Bolt组件处理
	}
}

 

       Toplogy(拓扑):参考图二及上述代码,构建Storm拓扑代码如下;在WIN7上使用本地运行模式,存在

 

        Zookeeper连接问题:

 

java.net.SocketException: Address family not supported by protocol family: connect”

 

 原因:Win7启动IPV6

 

 解决:增加配置属性System.setProperty("java.net.preferIPv4Stack", "true");

 

package com.sunshine;
import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import com.sunshine.bolt.BlueBolt;
import com.sunshine.bolt.YellowBolt;
import com.sunshine.spout.GreenSpout;
import com.sunshine.tools.StormRunner;
/**
 * STROM启动类
 * @author OY
 * @version 0.1
 */
public class SimpleTopolog {
	public static void main(String[] args) throws Exception {
		// 解决ZOOKEEPER客户端连接服务端问题(IPV6)
		System.setProperty("java.net.preferIPv4Stack", "true");
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("green-spout", new GreenSpout(), 2) // 2 executor(线程)
		       .setNumTasks(4); // 4 task 对应2个executor
		builder.setBolt("yellow-bolt", new YellowBolt(),6) // 6 executor(线程)
				.shuffleGrouping("green-spout"); //tuple随机分发到bolt处理
		builder.setBolt("blue-bolt", new BlueBolt(), 2) // 2 executor(线程)
			   .shuffleGrouping("yellow-bolt");
		Config conf = new Config();
		/*设置工作进程数*/
		conf.setNumWorkers(2); 
		conf.setDebug(true);
		/*本地运行模式*/
		StormRunner.runTopologyLocally(builder.createTopology(), "simpleTopology", conf, 0);
	}
}

 

Storm运行模式工具类

 

 

package com.sunshine.tools;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
public final class StormRunner {
  private static final int MILLIS_IN_SEC = 1000;
  private StormRunner() {
  }
  public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
      throws InterruptedException {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(topologyName, conf, topology);
    Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
    cluster.killTopology(topologyName);
    cluster.shutdown();
  }
  public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf)
      throws AlreadyAliveException, InvalidTopologyException {
    StormSubmitter.submitTopology(topologyName, conf, topology);
  }
}

 

 

 

  • 大小: 84.4 KB
  • 大小: 79.4 KB
  • 大小: 30.2 KB
分享到:
评论

相关推荐

    storm-config:将配置文件传递到Storm拓扑的示例

    这是如何将配置参数传递到Storm拓扑中的示例。 它建立在基础上。 与简单的Echo拓扑一样,包括基于的生产者。 配置 配置文件存储在config目录中。 其中包含的两个是docker.properties和aws.properties 。 码头工人 ...

    jetstorm:用于加速Storm拓扑开发的Eclipse工具

    Eclipse工具,用于应用几种Storm设计模式,创建完整的Storm拓扑实现(例如业务逻辑) 该工具的输入是一个简单的XML模型,其中列出了喷嘴,螺栓和连接它们的流。 该工具实现为基于Eclipse的JET转换,并应用以下模式...

    storm-sample:在 Hortonworks HDP 中构建风暴拓扑的示例代码

    在topologies目录下我们可以看到Storm拓扑(基类和实现类) 在螺栓下,您可以看到 HDFS(带旋转)、hive 和 solr 的螺栓 StormKafka方案在kafka目录下 此拓扑中的所有项目都是可选的,您可以禁用除 Kafka spout ...

    Frank_storm:Frank_test

    使用 Node.js 构建和运行 Storm 拓扑。先决条件你的 Storm 集群应该安装了 Node.js 并且在系统路径上可用。 Jar 实用程序需要在您的 PATH 中才能打包拓扑。 为了使用“本地集群”测试 Node.js Storm 拓扑,您需要并...

    Storm-Kafka-ES:风暴拓扑将风暴与Kafka和Elasticsearch集成

    该Storm拓扑使用Kafka Spout读取来自Kafka的消息,并使用Bolt将从Kafka读取的传入消息解析为JSON消息。 然后将已解析的JSON消息加载到Elastic搜索中以使用Kibana进行仪表板和分析 该项目的前提条件:Zookeeper安装...

    csharp-storm-example

    要构建/使用,请按照的步骤学习如何构建/部署 C# 拓扑在 HDInsight 上进行 Storm。 注意:如果您按照上面的文章进行操作,则有一部分是关于本地测试的。 这似乎不适用于多个流,因为 SCP.NET 框架返回抱怨流的异常...

    storm-python-example:使用 Python ShellBolts 通过 Storm 处理 Kafka 消息的示例

    使用提供的 target/storm-python-example.jar,您可以添加新的 Python bolts,而无需重新构建 jar。 为了让 Storm 能够将 Python 脚本作为 bolt 执行,脚本需要在 /resources 目录下的 jar 中可用。 mkdir ...

    storm:冲向Mesos!

    不支持Storm的拓扑“重新平衡”操作,并且在用于构建此项目的自定义Storm版本中明确禁用了它。 无法通过Storm UI的链接将Supervisor日志加载到每个工作人员的Logviewer,因为在Mesos下运行时Supervisor日志是按拓扑...

    Storm环境下基于权重的任务调度算法

    大数据流式计算平台Apache Storm默认采用轮询的方式进行任务调度,未考虑到拓扑中各任务计算开销的差异以及任务之间不同类型的通信模式,在负载均衡和通信开销方面存在较大的优化空间。针对这一问题,提出 一种Storm...

    meghaduta:为Amazon Hackathon构建的通知系统(2015年9月)

    梅加杜塔依存关系JDK7 雷迪斯设计通过传播事件的Storm拓扑监视文件更改。 该事件会传播到逐步构建项目的每个阶段,最后通知通知者(可插入)。入门$ mvn clean package# To start the storm Topology$ java -cp ...

    storm-dynamic

    提供允许用户从配置文件创建风暴拓扑的包。 构建: mvn 包 例子: $storm jar target/storm-dynamic-builder-0.1.0-SNAPSHOT-jar-with-dependencies.jar ...

    yet-another-storm-ui:另一个基于storm ui rest api 0.9.2或更高版本的storm ui

    这个项目正试图为风暴构建另一个用户界面。 许可证:Apache 许可证 2为什么是另一个 Storm UI? 原始风暴 UI 不可自定义。 所以危险操作暴露给所有用户,例如“Kill Topology” 无法添加ACL、SSO(本项目也不支持,...

    netty-storm:使用 SSL (TLS) 在 Netty 客户端和 Apache Storm 之间进行集成。 Netty Spout 和 Netty Producer

    使用 SSL (TLS) 在 Netty 客户端和 Apache Storm 之间进行集成。 交换的消息采用 JSON 格式。 使用 JSON 通过加密通道进行通信的示例 Netty Spout 和 Netty 生产者... 此外,Storm 必须运行兼容的拓扑结构 (NettySpout

    Storm-Dev-TestingFramework

    这是一个非常简单的storm开发和测试框架,可以真正减少开发时间,无需编写完整的拓扑代码,也无需担心其他bolt的处理部分先完成。 此外,您可以构建成熟的螺栓特定单元测试用例。 这个开发框架可以模拟风暴拓扑运行...

    storm-statistics:基于风暴的流式实时计算实现的案例

    风暴统计 基于flume-kafka-storm的流式实时计算实现的案例 1.在Windows平台上构建flume架构,修改属性文件:配置agent数据源,配置... 3.ide开发工具上构建拓扑,单机运行可以不用安装storm,分散则需要进行额外的配置

    基于storm实时热点统计的分布式并行缓存预热

    二、基于storm+kafka完成商品访问次数实时统计拓扑的开发 ================================ maven构建出的一些问题,直接从maven中央仓库可能下载不到jar包,自己去百度一下jar,下载下来 根据错误提示,拷贝到...

    数据实时分析平台Heron.zip

    API来构建和提交topologies来实现一个调度。调度运行的每一个topology作为一个job,有几个容器组成,其中一个容器运行主 topology,负责管理topology。每个剩余的容器运行一个流管理器,负责数据路由——一个权值...

    real-time-sentiment-analytic

    该项目使用 Maven 来构建和运行拓扑。 您的机器上需要以下内容: Oracle JDK >= 1.7.x Apache Maven >= 3.0.5 克隆这个 repo 并作为一个现有的 Maven 项目导入到 Eclipse IDE 或 IntelliJ IDEA。 此应用程序使用...

    myCoffeeStorm:myCoffee IoT框架的数据处理程序(Parser)和连接器(Message Broker,KairosDB)

    介绍这是的 Application项目。 myCoffee IoT框架的其他组件是:先决条件带...java -cp lib\* org.kairosdb.core.Main -p conf\kairosdb.properties -c run 本地模式下的拓扑java -jar target/myCoffee-storm-0.0.1.jar

    iot-masterclass

    使用该存储库中的实验室,您将学习如何使用Spark,Storm,Kafka,Akka,Hbase和Hive等工具在Hortonworks Data Platform(HDP)上构建实时事件处理应用程序。 这个主类有多个部分,这些部分是相互构建的。 通过按...

Global site tag (gtag.js) - Google Analytics