flink官方提供了一些测试的jar包供用户使用。
以examples/streaming/SocketWindowWordCount.jar为例,侦听本机某个端口的消息,统计一段时间内相同消息出现的次数。
提交flink job
启动本机flink集群
./bin/start-cluster.sh
启动监听本地9000端口
nc -l 9000 # 新启动一个命令行
如果没有这个命令,用yum -y install nc 进行安装, 或者根据自己的操作系统自行安装-
提交flink任务(任选一种方式)
(1) 命令行提交任务
一定要先启动端口监听再提交任务, 否则任务可能提交后启动失败。
提交任务时候指定的端口和监听的端口需要保持一致。
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
(2) web ui中提交flink job
在web中点击submit new job 选项卡, 上传jar包之后填写入口类路径、并发度、外部参数、save point路径等参数后点击submit即可提交flink job。
-
验证提交
flink前段界面中可以观察到有一个Running Job
运行flink job
-
发送数据
在启动nc的shell窗口,输入任意几个字符串:
-
查看日志
在web控制台中找到刚才启动的任务, 点击taskmanager中的LOG 查看输出的日志。
可以看到之前输入的3个water和4个test已经被统计输出。
-
查看日志文件
在flink文件的log目录下可以找到标准输出的日志文件。 任务运行产生的输出日志就保存在里面
停止flink job
-
在web ui中, 找到对应的运行中的job, 可以通过点击cancel job来停止任务。
通过命令取消job
提交任务时会分配一份jobId,也可以在web ui中看到每个job的ID
使用 flink cancel jobId 可以取消指定的flink job