Flink itself provides an environment for submitting jobs to a remote cluster. See the createRemoteEnvironment method in the StreamExecutionEnvironment class; its source is as follows:
def createRemoteEnvironment(
    host: String,
    port: Int,
    parallelism: Int,
    jarFiles: String*): StreamExecutionEnvironment = {
  val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
  javaEnv.setParallelism(parallelism)
  new StreamExecutionEnvironment(javaEnv)
}
Example code for a remote submission:
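package com.flink.remotesubmit

import org.apache.flink.streaming.api.scala._

object RemoteSubmitApp extends App {
  // Host and port of the remote cluster's JobManager
  val host: String = "node02"
  val port: Int = 8081
  // Jar built from this project; it is shipped to the cluster together with the job
  val jarFiles = "E:\\CDHProjectDemo\\flink-demo\\target\\flink-demo-0.0.1-SNAPSHOT.jar"
  val env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFiles)

  // Socket source that supplies lines of text
  val socketHost: String = "node01"
  val socketPort: Int = 7777
  val socketDs: DataStream[String] = env.socketTextStream(socketHost, socketPort)

  // Word count: split each line into words, key by the word, sum the counts
  socketDs.flatMap(_.split(" "))
    .map((_, 1))
    .keyBy(0) // index-based keyBy is deprecated in newer Flink versions; keyBy(_._1) is the alternative
    .sum(1)
    .print()

  env.execute("Remote Submit Job")
}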
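Notes:
- Keep the code and the jar consistent: after changing the code, rebuild the jar by running mvn clean package again.
- Add the relevant configuration files (core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml, etc.) to the project's src/main/resources directory.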
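The first note is easy to forget, so a small pre-flight check before the submission can help. The following is only a minimal sketch in plain Scala, not part of Flink: the object JarPreflight and its methods validateJar and isUpToDate are hypothetical names introduced here for illustration, and the check assumes that file modification times are a good enough signal that the jar is stale.

```scala
import java.io.File

// Hypothetical helper (not a Flink API): sanity checks to run before a remote submission.
object JarPreflight {

  // Returns Right(file) if the path names an existing .jar file, otherwise Left(reason).
  def validateJar(path: String): Either[String, File] = {
    val f = new File(path)
    if (!path.toLowerCase.endsWith(".jar")) Left(s"not a jar file: $path")
    else if (!f.isFile) Left(s"jar not found, run mvn clean package first: $path")
    else Right(f)
  }

  // Returns true if no file under srcDir was modified after the jar was built.
  // Assumption: modification times reflect whether the code changed since packaging.
  def isUpToDate(jar: File, srcDir: File): Boolean = {
    def files(dir: File): Seq[File] = {
      // listFiles returns null for a missing or unreadable directory
      val children = Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty)
      children.flatMap(c => if (c.isDirectory) files(c) else Seq(c))
    }
    files(srcDir).forall(_.lastModified <= jar.lastModified)
  }
}
```

In the example above, one could call JarPreflight.validateJar(jarFiles) before createRemoteEnvironment and stop with the returned message when the result is a Left, and call isUpToDate with the project's src directory to get a reminder to repackage.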