confluent简介
LinkedIn有个三人小组出来创业了—正是当时开发出Apache Kafka实时信息列队技术的团队成员,基于这项技术Jay Kreps带头创立了新公司Confluent。Confluent的产品围绕着Kafka做的。
Confluent Platform
Confluent Platform 是一个流数据平台,能够组织管理来自不同数据源的数据,拥有稳定高效的系统。
Confluent Platform不仅提供数据传输的系统,还提供所有的工具:连接数据源的工具,应用,以及数据接收。
Confluent Platform组件
Confluent Platform 很容易的建立实时数据管道和流应用。通过将多个来源和位置的数据集成到公司一个中央数据流平台,Confluent Platform使您可以专注于如何从数据中获得商业价值而不是担心底层机制,如数据是如何被运输或不同系统间摩擦。具体来说,Confluent Platform简化了连接数据源到Kafka,用Kafka构建应用程序,以及安全,监控和管理您的Kafka的基础设施。
Kafka 是最流行的开源即时通讯系统,Confluent Platform 基于Kafka. Kafka 是低延迟,高可扩展,分布式消息系统。它被数百家企业用于许多不同的场景,包括收集用户活动数据,系统日志,应用程序指标,股票行情数据和设备仪器的信号。
Kafka开源项目包括一些关键组件:
Kafka Brokers(开源)。构成Kafka的消息,数据持久性和存储层。
Kafka Java Clients(开源)。Java 库,写消息到kafka 或者从kafka 读消息。
Kafka Streams(开源)。Kafka Streams是一个库使kafka转换成功能齐全的流处理系统。
Kafka Connect(开源)。一种可扩展的和可靠的连接Kafka框架与外部系统(如数据库,键值存储,搜索索引和文件系统)的框架。
除了Kafka以外, Confluent Platform 包括更多的工具和服务,使构建和管理数据流平台更加容易。
Confluent Control Center(闭源)。管理和监控Kafka最全面的GUI驱动系统。
Confluent Kafka Connectors(开源)。连接SQL数据库/Hadoop/Hive
Confluent Kafka Clients(开源)。对于其他编程语言,包括C/C++,Python
Confluent Kafka REST Proxy(开源)。允许一些系统通过HTTP和kafka之间发送和接收消息。
Confluent Schema Registry(开源)。帮助确定每一个应用使用正确的schema当写数据或者读数据到kafka中。
总的来说,Confluent Platform平台的组件给你的团队朝着建立统一而灵活的方式建立一个企业范围的数据流平台。
confluent单节点安装部署
confluent的安装部署相对比较简单,confluent为我们提供了Confluent Platform,我们即可以快速启动整个confluent平台,也可以单独启动想要的组件。
java环境安装
confluent启动需要java环境,java环境安装参考:CentOS上安装Java环境
confluent platform下载
下载地址,下载最新版本1
2
3
4
5
6
7
8
9
10
11
12
13cd /data/backup
wget http://packages.confluent.io/archive/5.1/confluent-5.1.2-2.11.tar.gz
tar -zxvf confluent-5.1.2-2.11.tar.gz
mv confluent-5.1.2 /data/confluent
cd /data/confluent
ll
total 12
drwxr-xr-x 3 1000 1000 4096 Feb 18 19:49 bin
drwxr-xr-x 23 1000 1000 4096 Feb 18 19:49 etc
drwxr-xr-x 3 1000 1000 21 Feb 18 19:26 lib
-rw-r--r-- 1 1000 1000 871 Feb 18 20:24 README
drwxr-xr-x 7 1000 1000 106 Feb 18 19:49 share
drwxr-xr-x 2 1000 1000 179 Feb 18 20:24 src
添加环境变量1
2
3
4
5
6
7# vim /etc/profile
#末尾添加以下内容
#set confluent environment
PATH=/data/confluent/bin:$PATH
export PATH
# source /etc/profile #使环境变量生效
现在bin目录下的所有命令我们可以直接使用。
首先看看如何快速启动confluent platform全家桶ZooKeeper,Kafka,Schema Registry,Control Center,Kafka Connect,Kafka REST Proxy,KSQL。
快速启动
特别说明:我们的命令执行目录都是在confluent目录下。
启动1
2
3
4
5
6
7
8
9
10
11
12
13
14
15# confluent start
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
看到如下信息,说明我们的confluent platform中的多个组件都启动成功。
confluent平台各组件的默认端口号
Component | Default Port |
---|---|
Zookeeper | 2181 |
Apache Kafka brokers (plain text) | 9092 |
Schema Registry REST API | 8081 |
REST Proxy | 8082 |
Kafka Connect REST API | 8083 |
Confluent Control Center | 9021 |
访问测试
通过使用http://ip:9021来访问Control Center,如图:
集群搭建
这里我们使用两台机器模拟集群10.186.61.103,10.186.60.60,10.186.65.43分别编排为node01,node02,node03。修改三台机器对应的hosts文件。分别添加如下配置1
2
3
4
5
6
7
8
9
10
110.0.0.0 localhost node01
10.186.60.60 localhost node02
10.186.65.43 localhost node03
10.186.61.103 localhost node01
0.0.0.0 localhost node02
10.186.65.43 localhost node03
10.186.61.103 localhost node01
10.186.60.60 localhost node02
0.0.0.0 localhost node03
分别为每台机器创建myid文件,没个myid保存要给唯一的数字即可,我这里三个host分别指定为1,2,3。1
2mkdir /var/lib/zookeeper
vim /var/lib/zookeeper/myid
每台机器分别指定如下配置
zookeeper配置和启动
1 | # vim etc/kafka/zookeeper.properties #添加如下配置 |
启动1
zookeeper-server-start etc/kafka/zookeeper.properties
kafka配置和启动
修改配置1
2# vim etc/kafka/server.properties
zookeeper.connect=node01:2181,node02:2181,node03:2181
设置broker.id=0
这里我们可以使用broker.id.generation.enable=true自动生成替代。1
2
3#broker.id=0
broker.id.generation.enable=true
advertised.listeners=PLAINTEXT://本机IP:9092
启动1
kafka-server-start etc/kafka/server.properties
Schema Registry配置和启动(可选)
配置1
2# vim etc/schema-registry/schema-registry.properties
kafkastore.connection.url=node01:2181,node02:2181,node03:2181
启动1
schema-registry-start etc/schema-registry/schema-registry.properties
这里我们不使用官方模式的avro序列化方式,所有不启动组件schema-registry。
kafka connect配置和启动
配置1
cp etc/schema-registry/connect-avro-distributed.properties etc/schema-registry/connect-distributed.properties
修改1
2
3
4
5
6# vim etc/schema-registry/connect-distributed.properties
bootstrap.servers=node01:9092,node02:9092,node03:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schema.registry.url=http://node01:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter.schema.registry.url=http://node01:8081
启动1
./bin/connect-distributed etc/schema-registry/connect-distributed.properties
Control Center配置和启动
配置1
2
3
4
5# vim etc/confluent-control-center/control-center-dev.properties
bootstrap.servers=node01:9092,node02:9092,node03:9092
zookeeper.connect=node01:2181,node02:2181:node03:2181
#confluent.controlcenter.schema.registry.url=http://node01:8081,http://node02:8081,http://node03:8081
confluent.controlcenter.connect.cluster=http://node01:8083
启动1
control-center-start etc/confluent-control-center/control-center-dev.properties
到此为止kafka confluent集群搭建成功。