First Steps with Flink

Installation

Official site: http://flink.apache.org/downloads.html

At the time of writing, the latest stable Flink release is 1.4.0.

Download the Flink build that matches your installed Hadoop version. I have Hadoop 2.7.2, so I chose the build shown below.


[Figure: Flink download page]

Download Flink with wget:

➜ wget http://mirror.bit.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop27-scala_2.11.tgz

Configure the FLINK_HOME environment variable:


[Figure: FLINK_HOME configuration]

Check the configured environment variables:

➜ bin more ~/.bash_profile

#maven
export M2_HOME=/Users/zzy/Downloads/apache-maven-3.5.0
export PATH=$PATH:$M2_HOME/bin
#java1.8 (Flink 1.4 requires Java 8; a later JAVA_HOME assignment would override this one)
JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home
#java1.7 (kept for reference, commented out)
#JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME
export CLASSPATH
export PATH
#hadoop
export HADOOP_HOME=/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
#pig
export PIG_HOME=/Users/zzy/Documents/zzy/software/bigdata/pig-0.16.0
export PATH=$PATH:$PIG_HOME/bin

#scala
export SCALA_HOME=/Users/zzy/Documents/zzy/software/scala-2.11.12
export PATH=$PATH:$SCALA_HOME/bin

#flink
export FLINK_HOME=/Users/zzy/Documents/zzy/software/flink-1.4.0
export PATH=$PATH:$FLINK_HOME/bin

#mysql alias
alias mysql='/usr/local/mysql/bin/mysql'
alias mysqladmin='/usr/local/mysql/bin/mysqladmin'

#git
export GIT_HOME=/usr/local/bin
export PATH=$PATH:$GIT_HOME

#ES
export ELASTICSEARCH_HOME=/Users/zzy/Documents/zzy/software/bigdata/elasticsearch-5.5.2
export PATH=$PATH:$ELASTICSEARCH_HOME/bin
#kibana
export KIBANA_HOME=/Users/zzy/Documents/zzy/software/bigdata/kibana-5.5.2-darwin-x86_64
export PATH=$PATH:$KIBANA_HOME/bin

#added by Anaconda2 4.4.0 installer
export PATH="/Users/zzy/anaconda/bin:$PATH"

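The package name (scala_2.11) shows that this Flink 1.4.0 build targets Scala 2.11, so install Scala 2.11 to match; it can be downloaded from the Scala website.
Configure SCALA_HOME: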


[Figure: SCALA_HOME configuration]

QuickStart:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/setup_quickstart.html

Start a Local Flink Cluster

$ ./bin/start-local.sh  # Start Flink

Check the JobManager’s web frontend at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.

The following processes are started:


[Figure: running Flink processes]
➜  bin ps -ef |grep 25159
  501 25159     1   0 11:27上午 ttys007    0:14.66 /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/bin/java -Xms1024m -Xmx1024m -Dlog.file=/Users/zzy/Documents/zzy/software/flink-1.4.0/log/flink-zzy-jobmanager-0-zzydeMBP.log -Dlog4j.configuration=file:/Users/zzy/Documents/zzy/software/flink-1.4.0/conf/log4j.properties -Dlogback.configurationFile=file:/Users/zzy/Documents/zzy/software/flink-1.4.0/conf/logback.xml -classpath /Users/zzy/Documents/zzy/software/flink-1.4.0/lib/flink-python_2.11-1.4.0.jar:/Users/zzy/Documents/zzy/software/flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar:/Users/zzy/Documents/zzy/software/flink-1.4.0/lib/log4j-1.2.17.jar:/Users/zzy/Documents/zzy/software/flink-1.4.0/lib/slf4j-log4j12-1.7.7.jar:/Users/zzy/Documents/zzy/software/flink-1.4.0/lib/flink-dist_2.11-1.4.0.jar::/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/etc/hadoop::/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/etc/hadoop:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/common/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/common/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/yarn/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/yarn/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/mapreduce/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/mapreduce/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/contrib/capacity-scheduler/*.jar org.apache.flink.runtime.jobmanager.JobManager --configDir /Users/zzy/Documents/zzy/software/flink-1.4.0/conf --executionMode cluster
  501 25596 16218   0 11:40上午 ttys007    0:00.00 grep --color=auto 25159
➜  bin ps -ef |grep 25496
  501 25496     1   0 11:27上午 ttys007    0:13.58 /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/bin/java -XX:+UseG1GC -Xms1024M -Xmx1024M -XX:MaxDirectMemorySize=8388607T -Dlog.file=/Users/zzy/Documents/zzy/software/flink-1.4.0/log/flink-zzy-taskmanager-0-zzydeMBP.log -Dlog4j.configuration=file:/Users/zzy/Documents/zzy/software/flink-1.4.0/conf/log4j.properties -Dlogback.configurationFile=file:/Users/zzy/Documents/zzy/software/flink-1.4.0/conf/logback.xml -classpath /Users/zzy/Documents/zzy/software/flink-1.4.0/lib/flink-python_2.11-1.4.0.jar:/Users/zzy/Documents/zzy/software/flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar:/Users/zzy/Documents/zzy/software/flink-1.4.0/lib/log4j-1.2.17.jar:/Users/zzy/Documents/zzy/software/flink-1.4.0/lib/slf4j-log4j12-1.7.7.jar:/Users/zzy/Documents/zzy/software/flink-1.4.0/lib/flink-dist_2.11-1.4.0.jar::/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/etc/hadoop::/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/etc/hadoop:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/common/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/common/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/yarn/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/yarn/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/mapreduce/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/mapreduce/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/contrib/capacity-scheduler/*.jar org.apache.flink.runtime.taskmanager.TaskManager --configDir /Users/zzy/Documents/zzy/software/flink-1.4.0/conf
  501 25603 16218   0 11:40上午 ttys007    0:00.00 grep --color=auto 25496

Startup logs:

➜  log ll
total 144
-rw-r--r--  1 zzy  staff  27935  1  9 11:27 flink-zzy-jobmanager-0-zzydeMBP.log
-rw-r--r--  1 zzy  staff    532  1  9 11:27 flink-zzy-jobmanager-0-zzydeMBP.out
-rw-r--r--  1 zzy  staff  33783  1  9 11:27 flink-zzy-taskmanager-0-zzydeMBP.log
-rw-r--r--  1 zzy  staff    532  1  9 11:27 flink-zzy-taskmanager-0-zzydeMBP.out
➜  log tail flink-zzy-jobmanager-0-zzydeMBP.log
2018-01-09 11:27:26,252 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Starting JobManager actor
2018-01-09 11:27:26,258 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /var/folders/3x/csj5l35n7pl73rr_m94nwfzm0000gn/T/blobStore-4bef70e0-90fe-4372-849b-23c71255c92a
2018-01-09 11:27:26,259 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:56665 - max concurrent requests: 50 - max backlog: 1000
2018-01-09 11:27:26,345 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist           - Started memory archivist akka://flink/user/archive
2018-01-09 11:27:26,346 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Starting JobManager at akka.tcp://flink@localhost:6123/user/jobmanager.
2018-01-09 11:27:26,357 INFO  org.apache.flink.runtime.jobmanager.JobManager                - JobManager akka.tcp://flink@localhost:6123/user/jobmanager was granted leadership with leader session ID Some(00000000-0000-0000-0000-000000000000).
2018-01-09 11:27:26,369 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka.tcp://flink@localhost:6123/user/jobmanager
2018-01-09 11:27:26,375 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager#2017012179] - leader session 00000000-0000-0000-0000-000000000000
2018-01-09 11:27:27,695 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - TaskManager 062219ce0d130bd05ad322f1a584c7de has started.
2018-01-09 11:27:27,707 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at zzydembp (akka.tcp://flink@zzydembp:56667/user/taskmanager) as 164d7b2a6f48f6fc278ac43e15a28d20. Current number of registered hosts is 1. Current number of alive task slots is 1.
➜  log tail flink-zzy-taskmanager-0-zzydeMBP.log
2018-01-09 11:27:27,494 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /var/folders/3x/csj5l35n7pl73rr_m94nwfzm0000gn/T/flink-dist-cache-94e5fe4a-f4af-416c-9b38-9cb16e321c09
2018-01-09 11:27:27,504 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor at akka://flink/user/taskmanager#-266437785.
2018-01-09 11:27:27,504 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager data connection information: 062219ce0d130bd05ad322f1a584c7de @ zzydembp (dataPort=56668)
2018-01-09 11:27:27,504 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager has 1 task slot(s).
2018-01-09 11:27:27,506 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 111/1024/1024 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
2018-01-09 11:27:27,513 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@localhost:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2018-01-09 11:27:27,735 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Successful registration at JobManager (akka.tcp://flink@localhost:6123/user/jobmanager), starting network stack and library cache.
2018-01-09 11:27:27,741 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Determined BLOB server address to be localhost/127.0.0.1:56665. Starting BLOB cache.
2018-01-09 11:27:27,745 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Created BLOB cache storage directory /var/folders/3x/csj5l35n7pl73rr_m94nwfzm0000gn/T/blobStore-610b6734-e828-4232-b69a-a489e7737580
2018-01-09 11:27:27,749 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Created BLOB cache storage directory /var/folders/3x/csj5l35n7pl73rr_m94nwfzm0000gn/T/blobStore-94f069a9-43d2-47de-9947-aefef5604339
➜  log

Demo01

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>1.4.0</version>
</dependency>

Running the example from the official docs

- First, use netcat to start a local server:
$ nc -l 9000
- Submit the Flink program:
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
- The output looks like this:
➜  flink-1.4.0 ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Using the result of 'hadoop classpath' to augment the Hadoop classpath: /Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/etc/hadoop:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/common/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/common/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/yarn/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/yarn/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/mapreduce/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/mapreduce/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/contrib/capacity-scheduler/*.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/zzy/Documents/zzy/software/flink-1.4.0/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: 0e40acff6c8a90508fb640d6643e4e58. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#2017012179] with leader session id 00000000-0000-0000-0000-000000000000.
01/09/2018 11:56:21 Job execution switched to status RUNNING.
01/09/2018 11:56:21 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
01/09/2018 11:56:21 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@5004a829, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4d518b32}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed(1/1) switched to SCHEDULED
01/09/2018 11:56:21 Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING
01/09/2018 11:56:21 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@5004a829, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4d518b32}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed(1/1) switched to DEPLOYING
01/09/2018 11:56:22 Source: Socket Stream -> Flat Map(1/1) switched to RUNNING
01/09/2018 11:56:22 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@5004a829, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4d518b32}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed(1/1) switched to RUNNING

Open the web UI on port 8081: the Running Jobs list shows one job in progress.


[Figure: Flink web UI showing the running job]

How Flink submits jobs

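  • Background:
    When submitting a job to a Spark cluster, the --deploy-mode option selects client or cluster mode. The main difference is where the Driver runs: in client mode the Driver runs on the machine that submitted the job and handles resource requests and scheduling with the cluster; in cluster mode the Driver runs on one of the cluster nodes and handles them there.
    Client mode is generally better for debugging, because output from print and similar console methods appears in the submitting console; in cluster mode the Driver runs on a cluster node, so that output is not printed on the submitting client. Spark submits in client mode by default.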

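  • How Flink submits jobs:
    https://www.2cto.com/net/201706/644062.html
    Flink also supports two submission modes, and without any option it uses client (attached) mode. To submit in detached mode, add -d or --detached to the command line:
    -d,--detached If present, runs the job in detached mode
    Attached submission: FLINK_HOME/bin/flink run -c com.daxin.batch.App flinkwordcount.jar. The client starts an extra CliFrontend process, which acts as the driver process and keeps printing job progress. Detached submission: FLINK_HOME/bin/flink run -d -c com.daxin.batch.App flinkwordcount.jar. The client exits as soon as the job is submitted and no longer prints job progress. Unlike Spark's cluster mode, the program's main() still runs in the client to build the job; -d only stops the client from waiting for the job to finish.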

./bin/flink run -c cn.com.xxx.zzy.SocketWordCount ./lib_code/flink_learn-1.0-SNAPSHOT.jar --port 9000
The output is:
➜ flink-1.4.0 ./bin/flink run -c cn.com.xxx.zzy.SocketWordCount ./lib_code/flink_learn-1.0-SNAPSHOT.jar --port 9000

Using the result of 'hadoop classpath' to augment the Hadoop classpath: /Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/etc/hadoop:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/common/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/common/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/hdfs/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/yarn/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/yarn/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/mapreduce/lib/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/mapreduce/*:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/contrib/capacity-scheduler/*.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/zzy/Documents/zzy/software/flink-1.4.0/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/zzy/Documents/zzy/software/bigdata/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: c49b234b0e32d093ba0c93de53e18345. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1717984141] with leader session id 00000000-0000-0000-0000-000000000000.
01/10/2018 12:49:02 Job execution switched to status RUNNING.
01/10/2018 12:49:02 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
01/10/2018 12:49:02 TriggerWindow(SlidingProcessingTimeWindows(5000, 1000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@90a81510, reduceFunction=cn.com.xxx.zzy.SocketWordCount$1@4c012563}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed(1/1) switched to SCHEDULED
01/10/2018 12:49:02 Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING
01/10/2018 12:49:02 TriggerWindow(SlidingProcessingTimeWindows(5000, 1000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@90a81510, reduceFunction=cn.com.xxx.zzy.SocketWordCount$1@4c012563}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed(1/1) switched to DEPLOYING
01/10/2018 12:49:03 Source: Socket Stream -> Flat Map(1/1) switched to RUNNING
01/10/2018 12:49:03 TriggerWindow(SlidingProcessingTimeWindows(5000, 1000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@90a81510, reduceFunction=cn.com.xxx.zzy.SocketWordCount$1@4c012563}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed(1/1) switched to RUNNING

The Flink web UI now shows one more running job:
http://localhost:8081/#/overview

[Figure: Flink web UI overview with the new running job]

A job can also be run from the web UI by uploading its jar (add jar) and submitting it:


[Figure: uploading and submitting a jar through the web UI]

Flink output:

Words are counted in time windows of 5 seconds (processing time, tumbling windows) and are printed to stdout. Monitor the TaskManager’s output file and write some text in nc (input is sent to Flink line by line after hitting <RETURN>):
$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye
The .out file will print the counts at the end of each time window as long as words are floating in, e.g.:
$ tail -f log/flink-*-taskmanager-*.out
lorem : 1
bye : 1
ipsum : 4
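To make the tumbling-window behaviour above concrete, here is a minimal plain-Java sketch that does not use Flink at all. It assumes non-negative arrival timestamps in milliseconds and a window offset of 0; the class and method names (TumblingWindowSketch, windowStart, countPerWindow) and the arrival times are hypothetical. Each line is assigned to the window [start, start + 5000) with start = t - t % 5000, and words are counted per window, which reproduces the lorem/ipsum/bye counts when all three lines arrive within one window.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java sketch (not Flink code) of tumbling processing-time windows.
 * Assumes non-negative timestamps in ms and a window offset of 0.
 */
public class TumblingWindowSketch {

    /** Start of the tumbling window [start, start + size) that contains t. */
    public static long windowStart(long t, long size) {
        return t - (t % size);
    }

    /** Counts words per window: window start -> (word -> count). */
    public static Map<Long, Map<String, Long>> countPerWindow(
            List<Long> arrivalTimes, List<String> lines, long size) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < lines.size(); i++) {
            long start = windowStart(arrivalTimes.get(i), size);
            Map<String, Long> counts = result.computeIfAbsent(start, k -> new TreeMap<>());
            // Split on runs of whitespace and skip empty tokens.
            for (String word : lines.get(i).split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times (ms): all three nc lines fall into window [0, 5000).
        List<Long> times = Arrays.asList(1_000L, 2_500L, 4_900L);
        List<String> lines = Arrays.asList("lorem ipsum", "ipsum ipsum ipsum", "bye");
        System.out.println(countPerWindow(times, lines, 5_000L)); // {0={bye=1, ipsum=4, lorem=1}}
    }
}
```

If the lines straddle a window boundary (say at 4900 ms and 5100 ms), they land in two separate windows and are counted, and printed, separately.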

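The results are written to the TaskManager's .out file rather than to the submitting terminal: print() writes to the TaskManager's standard output, which the standalone start scripts redirect to log/flink-*-taskmanager-*.out.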


[Figure: word counts in the TaskManager .out file]

The code:

package cn.com.xxx.zzy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Reads lines from a socket, splits them into words and counts each word
 * over sliding 5-second processing-time windows that advance every second.
 */
public class SocketWordCount {

    public static void main(String[] args) throws Exception {
        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1)) // sliding window: 5 s long, advancing every 1 s
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket WordCount");

    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
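Note that this program differs from the official example: timeWindow(Time.seconds(5), Time.seconds(1)) creates sliding windows (the submission log shows SlidingProcessingTimeWindows(5000, 1000)), not tumbling ones. The plain-Java sketch below, which is not Flink code, shows the consequence: every element belongs to size / slide = 5 overlapping windows, so each word is counted in five window results, one emitted per second. The loop is modelled on how I understand Flink's sliding window assigner to work, assuming offset 0 and non-negative timestamps; SlidingWindowSketch and windowStarts are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch (not Flink code) of sliding-window assignment.
 * Assumes offset 0 and non-negative timestamps in ms.
 */
public class SlidingWindowSketch {

    /** Returns the start times of all windows [start, start + size) that contain t. */
    public static List<Long> windowStarts(long t, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = t - (t % slide); // latest window starting at or before t
        for (long start = lastStart; start > t - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A word arriving at t = 7300 ms falls into five overlapping windows.
        System.out.println(windowStarts(7_300L, 5_000L, 1_000L)); // [7000, 6000, 5000, 4000, 3000]
    }
}
```

With a tumbling window (slide equal to size) the same loop yields exactly one window per element, which is why the official example prints each count once.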
