Storm集群搭建

Apache Storm简介

Apache Storm是一个分布式实时大数据处理系统。Storm设计用于在容错和水平可扩展方法中处理大量数据。它是一个流数据框架,具有最高的摄取率。虽然Storm是无状态的,它通过Apache ZooKeeper管理分布式环境和集群状态。它很简单,您可以并行地对实时数据执行各种操作。
Apache Storm继续成为实时数据分析的领导者。Storm易于设置和操作,并且它保证每个消息将通过拓扑至少处理一次。

Apache Storm vs Hadoop

基本上Hadoop和Storm框架用于分析大数据。两者互补,在某些方面有所不同。Apache Storm执行除持久性之外的所有操作,而Hadoop在所有方面都很好,但滞后于实时计算。下表比较了Storm和Hadoop的属性。

Storm Hadoop
实时流处理 批量处理
无状态 有状态
主/从架构与基于ZooKeeper的协调。主节点称为nimbus,从属节点是主管 具有/不具有基于ZooKeeper的协调的主 - 从结构。主节点是作业跟踪器,从节点是任务跟踪器
Storm流过程在集群上每秒可以访问数万条消息。 Hadoop分布式文件系统(HDFS)使用MapReduce框架来处理大量的数据,需要几分钟或几小时。
Storm拓扑运行直到用户关闭或意外的不可恢复故障。 MapReduce作业按顺序执行并最终完成。
两者都是分布式和容错的
如果nimbus / supervisor死机,重新启动使它从它停止的地方继续,因此没有什么受到影响。 如果JobTracker死机,所有正在运行的作业都会丢失。

使用Apache Storm的例子

Apache Storm对于实时大数据流处理非常有名。因此,大多数公司都将Storm用作其系统的一个组成部分。一些值得注意的例子如下 -
Twitter - Twitter正在使用Apache Storm作为其“发布商分析产品”。 “发布商分析产品”处理Twitter平台中的每个tweets和点击。 Apache Storm与Twitter基础架构深度集成。
NaviSite - NaviSite正在使用Storm进行事件日志监控/审计系统。系统中生成的每个日志都将通过Storm。Storm将根据配置的正则表达式集检查消息,如果存在匹配,那么该特定消息将保存到数据库。
Wego - Wego是位于新加坡的旅行元搜索引擎。旅行相关数据来自世界各地的许多来源,时间不同。Storm帮助Wego搜索实时数据,解决并发问题,并为最终用户找到最佳匹配。

Apache Storm优势

下面是Apache Storm提供的好处列表:

Storm是开源的,强大的,用户友好的。它可以用于小公司和大公司。
Storm是容错的,灵活的,可靠的,并且支持任何编程语言。
Storm允许实时流处理。
Storm是令人难以置信的快,因为它具有巨大的处理数据的力量。
Storm可以通过线性增加资源来保持性能,即使在负载增加的情况下。它是高度可扩展的。
Storm在几秒钟或几分钟内执行数据刷新和端到端传送响应取决于问题。它具有非常低的延迟。
Storm有操作智能。
Storm提供保证的数据处理,即使群集中的任何连接的节点死或消息丢失。

Apache Storm核心概念

Apache Storm从一端读取​​实时数据的原始流,并将其传递通过一系列小处理单元,并在另一端输出处理/有用的信息。
下图描述了Apache Storm的核心概念:
Alt text
Apache Storm的组件:

组件 描述
Tuple Tuple是Storm中的主要数据结构。它是有序元素的列表。默认情况下,Tuple支持所有数据类型。通常,它被建模为一组逗号分隔的值,并传递到Storm集群。
Stream 流是元组的无序序列。
Spouts 流的源。通常,Storm从原始数据源(如Twitter Streaming API,Apache Kafka队列,Kestrel队列等)接受输入数据。否则,您可以编写spouts以从数据源读取数据。“ISpout”是实现spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。
Bolts Bolts是逻辑处理单元。Spouts将数据传递到Bolts和Bolts过程,并产生新的输出流。Bolts可以执行过滤,聚合,加入,与数据源和数据库交互的操作。Bolts接收数据并发射到一个或多个Bolts。 “IBolt”是实现Bolts的核心接口。一些常见的接口是IRichBolt,IBasicBolt等。

下图描述了一个“Twitter分析”的实时示例,看看如何在Apache Storm中建模:
Alt text
“Twitter分析”的输入来自Twitter Streaming API。Spout将使用Twitter Streaming API读取用户的tweets,并作为元组流输出。来自spout的单个元组将具有twitter用户名和单个tweet作为逗号分隔值。然后,这个元组的蒸汽将被转发到Bolt,并且Bolt将tweet拆分成单个字,计算字数,并将信息保存到配置的数据源。现在,我们可以通过查询数据源轻松获得结果。

拓扑

Spouts和Bolts连接在一起,形成拓扑结构。实时应用程序逻辑在Storm拓扑中指定。简单地说,拓扑是有向图,其中顶点是计算,边缘是数据流。
简单拓扑从spouts开始。Spouts将数据发射到一个或多个Bolts。Bolt表示拓扑中具有最小处理逻辑的节点,并且Bolts的输出可以发射到另一个Bolts作为输入。
Storm保持拓扑始终运行,直到您终止拓扑。Apache Storm的主要工作是运行拓扑,并在给定时间运行任意数量的拓扑。

任务

现在你有一个关于Spouts和Bolts的基本想法。它们是拓扑的最小逻辑单元,并且使用单个Spout和Bolt阵列构建拓扑。应以特定顺序正确执行它们,以使拓扑成功运行。Storm执行的每个Spout和Bolt称为“任务”。简单来说,任务是Spouts或Bolts的执行。在给定时间,每个Spout和Bolt可以具有在多个单独的螺纹中运行的多个实例。

进程

拓扑在多个工作节点上以分布式方式运行。Storm将所有工作节点上的任务均匀分布。工作节点的角色是监听作业,并在新作业到达时启动或停止进程。

流分组

数据流从Spouts流到Bolts,或从一个Bolts流到另一个Bolts。流分组控制元组在拓扑中的路由方式,并帮助我们了解拓扑中的元组流。有四个内置分组,如下所述:

随机分组

在随机分组中,相等数量的元组随机分布在执行Bolts的所有工人中。如下图所示:
Alt text

字段分组

元组中具有相同值的字段组合在一起,其余的元组保存在外部。然后,具有相同字段值的元组被向前发送到执行Bolts的同一进程。例如,如果流由字段“字”分组,则具有相同字符串“Hello”的元组将移动到相同的工作者。下图显示了字段分组的工作原理。
Alt text

全局分组

所有流可以分组并向前到一个Bolts。此分组将源的所有实例生成的元组发送到单个目标实例(具体来说,选择具有最低ID的工作程序)。
Alt text

所有分组

所有分组将每个元组的单个副本发送到接收Bolts的所有实例。这种分组用于向Bolts发送信号。所有分组对于连接操作都很有用。
Alt text

Apache Storm集群架构

Apache Storm的主要亮点是,它是一个容错,快速,没有“单点故障”(SPOF)分布式应用程序。我们可以根据需要在多个系统中安装Apache Storm,以增加应用程序的容量。
如下图所示:
Alt text
Apache Storm有两种类型的节点,Nimbus(主节点)和Supervisor(工作节点)。Nimbus是Apache Storm的核心组件。Nimbus的主要工作是运行Storm拓扑。Nimbus分析拓扑并收集要执行的任务。然后,它将任务分配给可用的supervisor。
Supervisor将有一个或多个工作进程。Supervisor将任务委派给工作进程。工作进程将根据需要产生尽可能多的执行器并运行任务。Apache Storm使用内部分布式消息传递系统来进行Nimbus和管理程序之间的通信。

组件 描述
Nimbus(主节点) Nimbus是Storm集群的主节点。集群中的所有其他节点称为工作节点。主节点负责在所有工作节点之间分发数据,向工作节点分配任务和监视故障。
Supervisor(工作节点) 遵循指令的节点被称为Supervisors。Supervisor有多个工作进程,它管理工作进程以完成由nimbus分配的任务。
Worker process(工作进程) 工作进程将执行与特定拓扑相关的任务。工作进程不会自己运行任务,而是创建执行器并要求他们执行特定的任务。工作进程将有多个执行器。
Executor(执行者) 执行器只是工作进程产生的单个线程。执行器运行一个或多个任务,但仅用于特定的spout或bolt。
Task(任务) 任务执行实际的数据处理。所以,它是一个spout或bolt。
ZooKeeper framework(ZooKeeper框架) Apache的ZooKeeper的是使用群集(节点组)自己和维护具有强大的同步技术共享数据之间进行协调的服务。Nimbus是无状态的,所以它依赖于ZooKeeper来监视工作节点的状态。ZooKeeper的帮助supervisor与nimbus交互。它负责维持nimbus,supervisor的状态。

Storm是无状态的。即使无状态性质有它自己的缺点,它实际上帮助Storm以最好的可能和最快的方式处理实时数据。
Storm虽然不是完全无状态的。它将其状态存储在Apache ZooKeeper中。由于状态在Apache ZooKeeper中可用,故障的网络可以重新启动,并从它离开的地方工作。通常,像monit这样的服务监视工具将监视Nimbus,并在出现任何故障时重新启动它。
Apache Storm还有一个称为Trident拓扑的高级拓扑,它具有状态维护,并且还提供了一个高级API,如Pig。

Apache Storm工作流程

一个工作的Storm集群应该有一个Nimbus和一个或多个supervisors。另一个重要的节点是Apache ZooKeeper,它将用于nimbus和supervisors之间的协调。
Apache Storm的工作流程:

  • 最初,nimbus将等待“Storm拓扑”提交给它。
  • 一旦提交拓扑,它将处理拓扑并收集要执行的所有任务和任务将被执行的顺序。
  • 然后,nimbus将任务均匀分配给所有可用的supervisors。
  • 在特定的时间间隔,所有supervisor将向nimbus发送心跳以通知它们仍然运行着。
  • 当supervisor终止并且不向心跳发送心跳时,则nimbus将任务分配给另一个supervisor。
  • 当nimbus本身终止时,supervisor将在没有任何问题的情况下对已经分配的任务进行工作。
  • 一旦所有的任务都完成后,supervisor将等待新的任务进去。
  • 同时,终止nimbus将由服务监控工具自动重新启动。
  • 重新启动的网络将从停止的地方继续。同样,终止supervisor也可以自动重新启动。由于网络管理程序和supervisor都可以自动重新启动,并且两者将像以前一样继续,因此Storm保证至少处理所有任务一次。
  • 一旦处理了所有拓扑,则网络管理器等待新的拓扑到达,并且类似地,管理器等待新的任务。

默认情况下,Storm集群中有两种模式:

  • 本地模式:此模式用于开发,测试和调试,因为它是查看所有拓扑组件协同工作的最简单方法。在这种模式下,我们可以调整参数,使我们能够看到我们的拓扑如何在不同的Storm配置环境中运行。在本地模式下,storm拓扑在本地机器上在单个JVM中运行。
  • 生产模式:在这种模式下,我们将拓扑提交到工作Storm集群,该集群由许多进程组成,通常运行在不同的机器上。如在storm的工作流中所讨论的,工作集群将无限地运行,直到它被关闭。

Apache Storm分布式消息系统

Apache Storm处理实时数据,并且输入通常来自消息排队系统。外部分布式消息系统将提供实时计算所需的输入。Spout将从消息系统读取数据,并将其转换为元组并输入到Apache Storm中。有趣的是,Apache Storm在内部使用其自己的分布式消息传递系统,用于其nimbus和主管之间的通信。

分布式消息系统

分布式消息传递基于可靠消息队列的概念。消息在客户端应用程序和消息系统之间异步排队。分布式消息传递系统提供可靠性,可扩展性和持久性的好处。
大多数消息模式遵循发布 - 订阅模型(简称发布 - 订阅),其中消息的发送者称为发布者,而想要接收消息的那些被称为订阅者。
一旦消息已经被发​​送者发布,订阅者可以在过滤选项的帮助下接收所选择的消息。通常我们有两种类型的过滤,一种是基于主题的过滤,另一种是基于内容的过滤。
需要注意的是,pub-sub模型只能通过消息进行通信。它是一个非常松散耦合的架构;甚至发件人不知道他们的订阅者是谁。许多消息模式使消息代理能够交换发布消息以便由许多订户及时访问。一个现实生活的例子是Dish电视,它发布不同的渠道,如运动,电影,音乐等,任何人都可以订阅自己的频道集,并获得他们订阅的频道时可用。
Alt text
一些流行的高吞吐量消息传递系统:

分布式消息系统 描述
Apache Kafka Kafka是在LinkedIn公司开发的,后来它成为Apache的一个子项目。 Apache Kafka基于brokerenabled的,持久的,分布式的发布订阅模型。 Kafka是快速,可扩展和高效的。
RabbitMQ RabbitMQ是一个开源的分布式鲁棒消息应用程序。它易于使用并在所有平台上运行。
JMS(Java Message Service) JMS是一个开源API,支持创建,读取和从一个应用程序向另一个应用程序发送消息。它提供有保证的消息传递并遵循发布 - 订阅模型。
ActiveMQ ActiveMQ消息系统是JMS的开源API。
ZeroMQ ZeroMQ是无代理的对等体消息处理。它提供推拉,路由器 - 经销商消息模式。
Kestrel Kestrel是一个快速,可靠,简单的分布式消息队列。

Apache Storm安装

服务器规划

主机名 IP Zookeeper Nimbus Supbervisor
node01 10.186.63.112
node02 10.186.63.119
node03 10.186.63.122

安装Java环境

所有节点都需要安装Java环境。

1
2
3
tar -zxvf jdk-8u161-linux-x64.tar.gz -C /data
mv /data/jdk1.8.0_161 /data/jdk1.8
vim /etc/profile

修改环境变量,末尾加入以下内容

1
2
3
4
#set java environment
JAVA_HOME=/data/jdk1.8
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH

执行以下命令,使环境变量生效

1
source /etc/profile

验证

1
2
3
4
# java -version
java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)

搭建Zookeeper集群

下载、解压

1
2
3
4
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
tar -zxvf apache-zookeeper-3.4.14.tar.gz -C /data/
cd /data
mv zookeeper-3.4.14 zookeeper

修改文件/etc/profile,添加环境变量

1
2
3
#set zookeeper environment
export ZOOKEEPER_HOME=/data/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf

配置

1
2
3
4
cd zookeeper
mkdir data dataLog
cp conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg

内容如下:

1
2
3
4
5
6
7
8
9
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/dataLog
clientPort=2181
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

创建id文件

1
2
touch data/myid
echo 1 > data/myid

注意此处id值分别为1、2、3,每台服务器不一样。
启动

1
./bin/zkServer.sh start

在当前目录,即/data/zookeeper会生成日志文件zookeeper.out,查看日志中是否有报错。
Zookeeper集群安装完成。

安装Storm集群

官方下载地址

下载

1
2
3
4
5
6
7
wget http://mirror.bit.edu.cn/apache/storm/apache-storm-1.2.2/apache-storm-1.2.2.tar.gz
tar -zxvf apache-storm-1.2.2.tar.gz -C /data
cd /data
mv apache-storm-1.2.2 storm #重新命名文件夹
cd storm
mkdir data #创建数据目录
vim conf/storm.yaml #修改配置文件

配置

配置文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
storm.zookeeper.servers:
- "node01"
- "node02"
- "node03"
storm.zookeeper.port: 2181
torm.local.dir: "/data/storm/data"

nimbus.seeds: ["node01"]
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
storm.health.check.dir: "healthchecks"
storm.health.check.timeout.ms: 5000

修改/etc/profile,添加环境变量

1
2
3
#set storm environment
export STORM_HOME=/data/storm
export PATH=$PATH:$STORM_HOME/bin

同步storm以及环境配置

1
2
3
4
scp -r storm root@10.186.63.119:/data
scp -r storm root@10.186.63.122:/data
scp /etc/profile root@10.186.63.119:/etc
scp /etc/profile root@10.186.63.122:/etc

三台服务器执行source /etc/profile,使环境变量生效。

启动storm集群

node01上启动
启动storm nimbus

1
storm nimbus

启动ui(再复制一个窗口)

1
storm ui

node02和node03上启动

1
storm supervisor

上述启动方式中,进程并没有以后台方式运行,如果断开远程连接窗口,则进程关闭。
也可以使用以下命令

1
2
3
nohup /data/storm/bin/storm nimbus >/dev/null 2>&1 &
nohup /data/storm/bin/storm ui >/dev/null 2>&1 &
nohup /data/storm/bin/storm supervisor >/dev/null 2>&1 &

也可以使用supervisor(和storm中的supervisor节点不是同一个东西),使用方法参考:
Supervisor的作用与配置实例
打开node01上8080端口,浏览器访问,即可看到以下截图内容
Alt text
Nimbus Summary和Supervisor Summary运行正常,storm集群已安装完成。

本文标题:Storm集群搭建

文章作者:Francis

原始链接:http://www.cnops.com/posts/48352a80.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。