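Environment Setup
Install Docker and the required images
Docker download link. I can't afford a Mac, so the link points to the Windows build; installers for other platforms are available from the official Docker site.
The installation itself is skipped here, since it is just a matter of clicking Next all the way through. Below we install and start the kafka, mysql, and other services.
- Install the zookeeper service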
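- kafka persists much of its metadata in zk, so the zookeeper service has to be installed first.
- Run
docker run -d --name zookeeper --publish 2181:2181 wurstmeister/zookeeper
This publishes port 2181 on the host so that kafka can connect to zk later. With that, zk is installed; next we verify that it started correctly.
- On the host, run
docker ps -a
which should show output like the following: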
PS C:\Users\tzmaj> docker ps -a
CONTAINER ID   IMAGE                    COMMAND                  CREATED         STATUS         PORTS                                                NAMES
2b8cd369aa3e   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   3 seconds ago   Up 2 seconds   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
The first column is the container id. Copy it and run
docker exec -it 2b8cd369aa3e '/bin/bash'
to get a shell inside the container. Then change to the /opt/zookeeper-3.4.13/bin directory and run ./zkCli.sh to start the zk client:
root@2b8cd369aa3e:/opt/zookeeper-3.4.13/bin# ./zkCli.sh
Connecting to localhost:2181
2020-04-28 08:13:40,238 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
2020-04-28 08:13:40,241 [myid:] - INFO [main:Environment@100] - Client environment:host.name=2b8cd369aa3e
2020-04-28 08:13:40,241 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.7.0_65
2020-04-28 08:13:40,243 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2020-04-28 08:13:40,243 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre
2020-04-28 08:13:40,243 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/opt/zookeeper-3.4.13/bin/../build/classes:/opt/zookeeper-3.4.13/bin/../build/lib/*.jar:/opt/zookeeper-3.4.13/bin/../lib/slf4j-log4j12-1.7.25.jar:/opt/zookeeper-3.4.13/bin/../lib/slf4j-api-1.7.25.jar:/opt/zookeeper-3.4.13/bin/../lib/netty-3.10.6.Final.jar:/opt/zookeeper-3.4.13/bin/../lib/log4j-1.2.17.jar:/opt/zookeeper-3.4.13/bin/../lib/jline-0.9.94.jar:/opt/zookeeper-3.4.13/bin/../lib/audience-annotations-0.5.0.jar:/opt/zookeeper-3.4.13/bin/../zookeeper-3.4.13.jar:/opt/zookeeper-3.4.13/bin/../src/java/lib/*.jar:/opt/zookeeper-3.4.13/bin/../conf:
2020-04-28 08:13:40,243 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=<NA>
2020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux
2020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64
2020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:os.version=4.19.76-linuxkit
2020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:user.name=root
2020-04-28 08:13:40,245 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/root
2020-04-28 08:13:40,245 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/opt/zookeeper-3.4.13/bin
2020-04-28 08:13:40,246 [myid:] - INFO [main:ZooKeeper@442] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@674e5e21
Welcome to ZooKeeper!
2020-04-28 08:13:40,268 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1029] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2020-04-28 08:13:40,278 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@879] - Socket connection established to localhost/127.0.0.1:2181, initiating session
JLine support is enabled
2020-04-28 08:13:40,300 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1303] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x100005e85070000, negotiated timeout = 30000
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]
Next, run
ls /
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
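zk appears to be installed correctly, so press
ctrl+d
repeatedly until you are back at the outermost shell.
- If
docker ps -a
shows Exited in the STATUS column, something went wrong; run docker logs <CONTAINER ID> to see the detailed log.
- You can also connect to zk with a tool such as
ZooInspector
or with a Java client to verify that the zk service started correctly.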
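As a lighter-weight check from Java than a full zk client, the sketch below only tests whether the published port accepts TCP connections. It does not speak the ZooKeeper protocol, so a successful result shows that something is listening on 2181, nothing more. The class name `PortCheck`, the method `isPortOpen`, and the 2-second timeout are illustrative choices, not part of any zk tooling.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host, etc.
            return false;
        }
    }

    public static void main(String[] args) {
        // 2181 is the port published by the zookeeper docker run command.
        System.out.println("zookeeper port reachable: " + isPortOpen("localhost", 2181, 2000));
    }
}
```

The same helper can be pointed at 9092 or 3306 once the kafka and mysql containers are running.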
- Install the kafka service
- Run
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.17.47.44:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.47.44:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka:2.11-0.11.0.3
Note: 172.17.47.44 is my local IP. When copying the homework, remember to put your own name on it, that is, replace it with your own IP. Next we verify that kafka works.
- Run
docker ps -a
to find the kafka container id, then run docker exec -it 7a58196af291 '/bin/bash' (with your own container id) to get a shell inside the kafka container. Change to the kafka client directory /opt/kafka/bin and run kafka-console-producer.sh --broker-list 172.17.47.44:9092 --topic mykafka to start the kafka console producer.
- Open another terminal, enter the container and the same kafka client directory, and run
kafka-console-consumer.sh --bootstrap-server 172.17.47.44:9092 --topic mykafka --from-beginning
to start the kafka console consumer.
- Switch back to the producer window and enter the prepared JSON record
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
The record should now be printed in the consumer window, as shown below:
PS C:\Users\tzmaj> docker exec -it 7a58196af291 '/bin/bash'
bash-4.4# cd /opt/kafka/bin
bash-4.4# kafka-console-producer.sh --broker-list 172.17.47.44:9092 --topic mykafka
>{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
>{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
>{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
>{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

PS C:\Users\tzmaj> docker exec -it 7a58196af291 '/bin/bash'
bash-4.4# cd /opt/kafka/bin/
bash-4.4# kafka-console-consumer.sh --bootstrap-server 172.17.47.44:9092 --topic mykafka --from-beginning
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
^CProcessed a total of 4 messages
- The first transcript is the producer and the second is the consumer, so producing and consuming both work. Nice! Next, test consumption from Java:
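import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker address; replace 172.17.47.44 with your own IP.
        properties.put("bootstrap.servers", "172.17.47.44:9092");
        properties.put("group.id", "test1");
        // Start from the earliest offset when the group has no committed offset.
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("mykafka"));
        while (true) {
            // Wait up to 100 ms for new records.
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf(" value is =>>> %s%n", record.value());
            }
        }
    }
}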
Start it and watch the console:
value is =>>> {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
Consumption works, so the Java kafka client is fine as well.
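Since the kafka `docker run` command hard-codes 172.17.47.44, the following is a minimal sketch for finding candidate values on your own machine and printing the matching `-e` options. `KafkaListenerAddress` and its methods are hypothetical helper names; the sketch simply lists every non-loopback IPv4 address it finds, so on a machine with several network adapters (including virtual ones created by Docker or a VPN) you still have to pick the right one yourself.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

class KafkaListenerAddress {
    /** Lists the IPv4 addresses of all interfaces that are up and not loopback. */
    static List<String> localIpv4Addresses() throws SocketException {
        List<String> result = new ArrayList<>();
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return result; // no network interfaces reported
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            if (!nic.isUp() || nic.isLoopback()) {
                continue;
            }
            for (InetAddress address : Collections.list(nic.getInetAddresses())) {
                if (address instanceof Inet4Address) {
                    result.add(address.getHostAddress());
                }
            }
        }
        return result;
    }

    /** Value for KAFKA_ADVERTISED_LISTENERS, e.g. PLAINTEXT://172.17.47.44:9092. */
    static String advertisedListener(String ip, int port) {
        return "PLAINTEXT://" + ip + ":" + port;
    }

    /** Value for KAFKA_ZOOKEEPER_CONNECT, e.g. 172.17.47.44:2181. */
    static String zookeeperConnect(String ip, int port) {
        return ip + ":" + port;
    }

    public static void main(String[] args) throws SocketException {
        for (String ip : localIpv4Addresses()) {
            System.out.println("-e KAFKA_ZOOKEEPER_CONNECT=" + zookeeperConnect(ip, 2181)
                    + " -e KAFKA_ADVERTISED_LISTENERS=" + advertisedListener(ip, 9092));
        }
    }
}
```

The same address then goes into `bootstrap.servers` in the Java consumer and into the `--broker-list` and `--bootstrap-server` options of the console clients.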
- Install the mysql service
Run
docker run --name mysql -e MYSQL_ROOT_PASSWORD=123456 -d -p 3306:3306 mysql:5.6
Here MYSQL_ROOT_PASSWORD=123456 sets the root password to 123456; change it to whatever you prefer.
- Connect to mysql with Navicat
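The host and port are our IP and the published port 3306, and the password is the
MYSQL_ROOT_PASSWORD
value from the previous step. Test the connection.
The connection succeeds! Next, open the connection and run
select CURRENT_TIMESTAMP() from dual
which returns 2020-04-28 07:38:48, so mysql is working too.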
Finally, the data download link: UserBehavior.csv
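To turn rows of this file into JSON records shaped like the one typed into the console producer, something along these lines may help. It is a sketch under a stated assumption: each CSV row is taken to hold five comma-separated columns in the order user_id, item_id, category_id, behavior, and a Unix timestamp in seconds, so check the downloaded file before relying on it. `UserBehaviorJson` and the sample row are hypothetical; the row was constructed to reproduce the sample record and is not quoted from the file.

```java
import java.time.Instant;

class UserBehaviorJson {
    /**
     * Converts one CSV row into a JSON record.
     * Assumed column order: user_id, item_id, category_id, behavior, epoch seconds.
     */
    static String toJson(String csvLine) {
        String[] fields = csvLine.trim().split(",");
        if (fields.length != 5) {
            throw new IllegalArgumentException("expected 5 columns: " + csvLine);
        }
        // Instant.toString() renders ISO-8601 in UTC, e.g. 2017-11-26T01:00:00Z.
        String ts = Instant.ofEpochSecond(Long.parseLong(fields[4].trim())).toString();
        // Values are plain ids and short words, so no JSON escaping is applied here.
        return String.format(
                "{\"user_id\": \"%s\", \"item_id\":\"%s\", \"category_id\": \"%s\", \"behavior\": \"%s\", \"ts\": \"%s\"}",
                fields[0].trim(), fields[1].trim(), fields[2].trim(), fields[3].trim(), ts);
    }

    public static void main(String[] args) {
        // Hypothetical row chosen to match the sample record.
        System.out.println(toJson("543462,1715,1464116,pv,1511658000"));
    }
}
```

Each line printed this way can be pasted into the console producer.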
That completes environment setup, the first lesson of our Flink SQL tutorial series. Next we officially begin learning Flink SQL.