一、简介
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。(摘自百度百科)
二、下载
下载地址:https://flink.apache.org/
下载得到:flink-1.11.1-bin-scala_2.12.tgz (注意选择相应的scala版本)
三、安装
tar zxvf flink-1.11.1-bin-scala_2.12.tgz -C /mylab/soft/
四、配置
1.修改环境变量
修改~/.bashrc
vi ~/.bashrc
#flink-1.11.1
export FLINK_HOME=$MYLAB_BASE_HOME/flink-1.11.1
export PATH=$PATH:$FLINK_HOME/bin
source ~/.bashrc
env
2.修改配置文件
a)flink-conf.yaml
vi $FLINK_HOME/conf/flink-conf.yaml
jobmanager.rpc.address: master
最后加一句:
env.java.home: /mylab/soft/jdk1.8.0_251
env.yarn.conf.dir: /mylab/soft/hadoop-3.2.1/etc/hadoop
env.hadoop.conf.dir:/mylab/soft/hadoop-3.2.1/etc/hadoop
其他可配的选项有(这些在$FLINK_HOME/bin/config.sh中有枚举值)
env.pid.dir
env.log.dir
env.log.max
env.yarn.conf.dir
env.hadoop.conf.dir
env.java.home
env.java.opts
env.java.opts.jobmanager
env.java.opts.taskmanager
env.java.opts.historyserver
env.java.opts.client
env.ssh.opts
high-availability
zookeeper.heap.mb
b)改变日志输出级别
log4j-cli.properties
log4j-console.properties
log4j-session.properties
log4j.properties
(可选,主要将日志的基本从INFO改为ERROR,减少日志输出)
把里面的大写INFO都替换成ERROR、WARN、DEBUG
c) masters
master:8081
d) workers
master
e) zoo.cfg
server.1=master:2888:3888
五.验证:本地模式
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html
1. 启动服务
start-cluster.sh
2.测试
mkdir $FLINK_HOME/examples/output
flink run $FLINK_HOME/examples/streaming/WordCount.jar --input $FLINK_HOME/LICENSE --output $FLINK_HOME/examples/output/wordcount-result4.txt
cat /$FLINK_HOME/examples/output/wordcount-result1.txt
3.WebUI
4. 停止服务
start-cluster.sh
六.Yarn Cluster模式(这个还有问题,可用slot一直为0,应该是jobmanager或者taskmanager启动有问题)
1. 启动
yarn-session.sh
这时候会一直停留在前台,要退出Ctrl+C或者输入stop
yarn-session.sh -d
以后台的detached方式运行
yarn-session.sh -id application_1595917718211_0001<appId>
重新attach上appid,appid在启动时有显示,或者在日志中找,类似这样的字符串:application_1595670231954_0002
2.提交作业
下面命令的端口号需要根据实际的运行做改变
flink run -m master:<port> $FLINK_HOME/examples/batch/WordCount.jar --input $FLINK_HOME/LICENSE --output $FLINK_HOME/examples/output/wordcount-result3.txt
3.WebUI
http://master:36875 (数字每次都不一样,根据启动后的展示)
4.yarn的web界面上查看
http://master:8088/cluster/apps
(主机和端口在hadoop的yarn-site.xml中的yarn.resourcemanager.webapp.address定义)
5.获取日志
yarn logs -applicationId <application ID>
或者
yarn logs -applicationId <application ID> >flink-<application ID>.log
6.退出
yarn-session前台停留时,用Ctrl+C或者输入stop
yarn-session后台驻留时
方法1:echo "stop" | yarn-session.sh -id <appId>
方法2:先用yarn-session.sh -id <appId>重新attach上,然后用Ctrl+C或者输入stop退出
实在没招时,用yarn application -kill <appId>(谨用!)
七.yarn-session用法
yarn-session.sh --help
Usage:
Optional
-at,--applicationType <arg> Set a custom application type for the application on YARN
-D <property=value> use value for given property
-d,--detached If present, runs the job in detached mode
-h,--help Help for the Yarn session CLI.
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-m,--jobmanager <arg> Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-nl,--nodeLabel <arg> Specify YARN node label for the YARN application
-nm,--name <arg> Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-t,--ship <arg> Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
举例:
yarn-session.sh -d detached方式运行
yarn-session.sh -tm 8192 -s32
八.flink用法
flink --help
flink <ACTION> [OPTIONS] [ARGUMENTS]
The following actions are available:
Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action options:
-c,--class <classname> Class with the program entry point
("main()" method). Only needed if the
JAR file does not specify the class in
its manifest.
-C,--classpath <url> Adds a URL to each user code
classloader on all nodes in the
cluster. The paths must specify a
protocol (e.g. file://) and be
accessible on all nodes (e.g. by means
of a NFS share). You can use this
option multiple times for specifying
more than one URL. The protocol must
be supported by the {@link
java.net.URLClassLoader}.
-d,--detached If present, runs the job in detached
mode
-n,--allowNonRestoredState Allow to skip savepoint state that
cannot be restored. You need to allow
this if you removed an operator from
your program that was part of the
program when the savepoint was
triggered.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
-py,--python <pythonFile> Python script with the program entry
point. The dependent resources can be
configured with the `--pyFiles`
option.
-pyarch,--pyArchives <arg> Add python archive files for job. The
archive files will be extracted to the
working directory of python UDF
worker. Currently only zip-format is
supported. For each archive file, a
target directory be specified. If the
target directory name is specified,
the archive file will be extracted to
a name can directory with the
specified name. Otherwise, the archive
file will be extracted to a directory
with the same name of the archive
file. The files uploaded via this
option are accessible via relative
path. '#' could be used as the
separator of the archive file path and
the target directory name. Comma (',')
could be used as the separator to
specify multiple archive files. This
option can be used to upload the
virtual environment, the data files
used in Python UDF (e.g.: --pyArchives
file:///tmp/py37.zip,file:///tmp/data.
zip#data --pyExecutable
py37.zip/py37/bin/python). The data
files could be accessed in Python UDF,
e.g.: f = open('data/data.txt', 'r').
-pyexec,--pyExecutable <arg> Specify the path of the python
interpreter used to execute the python
UDF worker (e.g.: --pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on Python 3.5+,
Apache Beam (version == 2.19.0), Pip
(version >= 7.1.0) and SetupTools
(version >= 37.0.0). Please ensure
that the specified environment meets
the above requirements.
-pyfs,--pyFiles <pythonFiles> Attach custom python files for job.
These files will be added to the
PYTHONPATH of both the local client
and the remote python UDF worker. The
standard python resource file suffixes
such as .py/.egg/.zip or directory are
all supported. Comma (',') could be
used as the separator to specify
multiple files (e.g.: --pyFiles
file:///tmp/myresource.zip,hdfs:///$na
menode_address/myresource2.zip).
-pym,--pyModule <pythonModule> Python module with the program entry
point. This option must be used in
conjunction with `--pyFiles`.
-pyreq,--pyRequirements <arg> Specify a requirements.txt file which
defines the third-party dependencies.
These dependencies will be installed
and added to the PYTHONPATH of the
python UDF worker. A directory which
contains the installation packages of
these dependencies could be specified
optionally. Use '#' as the separator
if the optional parameter exists
(e.g.: --pyRequirements
file:///tmp/requirements.txt#file:///t
mp/cached_dir).
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
from (for example
hdfs:///flink/savepoint-1537).
-sae,--shutdownOnAttachedExit If the job is submitted in attached
mode, perform a best-effort cluster
shutdown when the CLI is terminated
abruptly, e.g., in response to a user
interrupt, such as typing Ctrl + C.
Options for Generic CLI mode:
-D <property=value> Generic configuration options for
execution/deployment and for the configured executor.
The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "collection", "remote",
"local", "kubernetes-session", "yarn-per-job",
"yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. The currently available targets are:
"collection", "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session",
"yarn-application" and "kubernetes-application".
Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one
specified in the configuration.
-yat,--yarnapplicationType <arg> Set a custom application type for the
application on YARN
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN
application
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Options for default mode:
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
Action "info" shows the optimized execution plan of the program (JSON).
Syntax: info [OPTIONS] <jar-file> <arguments>
"info" action options:
-c,--class <classname> Class with the program entry point
("main()" method). Only needed if the JAR
file does not specify the class in its
manifest.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
Action "list" lists running and scheduled programs.
Syntax: list [OPTIONS]
"list" action options:
-a,--all Show all programs and their JobIDs
-r,--running Show only running programs and their JobIDs
-s,--scheduled Show only scheduled programs and their JobIDs
Options for Generic CLI mode:
-D <property=value> Generic configuration options for
execution/deployment and for the configured executor.
The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "collection", "remote",
"local", "kubernetes-session", "yarn-per-job",
"yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. The currently available targets are:
"collection", "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session",
"yarn-application" and "kubernetes-application".
Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Options for default mode:
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
Action "stop" stops a running program with a savepoint (streaming jobs only).
Syntax: stop [OPTIONS] <Job ID>
"stop" action options:
-d,--drain Send MAX_WATERMARK before taking the
savepoint and stopping the pipelne.
-p,--savepointPath <savepointPath> Path to the savepoint (for example
hdfs:///flink/savepoint-1537). If no
directory is specified, the configured
default will be used
("state.savepoints.dir").
Options for Generic CLI mode:
-D <property=value> Generic configuration options for
execution/deployment and for the configured executor.
The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "collection", "remote",
"local", "kubernetes-session", "yarn-per-job",
"yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. The currently available targets are:
"collection", "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session",
"yarn-application" and "kubernetes-application".
Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Options for default mode:
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
Action "cancel" cancels a running program.
Syntax: cancel [OPTIONS] <Job ID>
"cancel" action options:
-s,--withSavepoint <targetDirectory> **DEPRECATION WARNING**: Cancelling
a job with savepoint is deprecated.
Use "stop" instead.
Trigger savepoint and cancel job.
The target directory is optional. If
no directory is specified, the
configured default directory
(state.savepoints.dir) is used.
Options for Generic CLI mode:
-D <property=value> Generic configuration options for
execution/deployment and for the configured executor.
The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "collection", "remote",
"local", "kubernetes-session", "yarn-per-job",
"yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. The currently available targets are:
"collection", "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session",
"yarn-application" and "kubernetes-application".
Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Options for default mode:
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
Action "savepoint" triggers savepoints for a running job or disposes existing ones.
Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
"savepoint" action options:
-d,--dispose <arg> Path of savepoint to dispose.
-j,--jarfile <jarfile> Flink program JAR file.
Options for Generic CLI mode:
-D <property=value> Generic configuration options for
execution/deployment and for the configured executor.
The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "collection", "remote",
"local", "kubernetes-session", "yarn-per-job",
"yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. The currently available targets are:
"collection", "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session",
"yarn-application" and "kubernetes-application".
Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Options for default mode:
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
举例:
wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:///mylab/mydata
Submit Job to Flink
flink run $FLINK_HOME/examples/batch/WordCount.jar --input hdfs:///mylab/mydata/LICENSE-2.0.txt --output hdfs:///mylab/mydata/wordcount-result102411.txt
hadoop fs -ls /mylab/mydata
hadoop fs -cat /mylab/mydata/wordcount-result1024.txt
Run a single Flink job on standalone
flink run -m master:8081 $FLINK_HOME/examples/batch/WordCount.jar --input hdfs:///mylab/mydata/LICENSE-2.0.txt --output hdfs:///mylab/mydata/wordcount-result10240.txt