组件版本
- spark版本 2.3.1 (hdp)
- hadoop 3.1.1 (hdp)
- HDP hive 3.1.2
- HBase 2.0.0
- mysql 版本5.x
使用 Spark Structured Streaming 读取 Kafka 的数据并写入 Hive、HBase 和 MySQL。Spark 对这几种存储没有原生的流式 sink 支持,下面是整理后的实测方案。
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.insight.spark</groupId>
<artifactId>SparkDemo</artifactId>
<version>1.1</version>
<properties>
<encoding>UTF-8</encoding>
<spark.version>2.3.1</spark.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.sun.jersey/jersey-client -->
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
<version>1.19</version>
</dependency>
<!-- Spark核心库 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>jersey-client</artifactId>
<groupId>org.glassfish.jersey.core</groupId>
</exclusion>
</exclusions>
<!-- <scope>provided</scope>-->
</dependency>
<!--Spark sql库 提供DF类API -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.0.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--HBase相关库-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
<version>3.1.0-incubating</version>
</dependency>
<!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.9</version>
</dependency>
<!--spark与hive交互 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.10</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issue like NoDefClassError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
主要代码与使用方法
Usage: StructuredKafkaWordCount <bootstrap-servers> <topics> <number 0/1/2/3> [<checkpoint-location> eg:/tmp/temp-spark-1] [<jdbc-url> <user> <password>]
程序接收多个参数:第一个是 Kafka 的 broker 地址,第二个是要消费的 topic 名称,第三个是输出类型(共 4 种,用 0/1/2/3 表示),第四个是 checkpoint 的路径;输出类型为 3(写 MySQL)时,第五到第七个参数依次为 JDBC URL、用户名和密码。程序的逻辑是接收 Kafka 的消息,做 wordcount 处理后输出结果。
package com.insight.spark.streaming
import com.insight.spark.util.ConfigLoader
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import java.sql
import java.sql.{DriverManager, PreparedStatement}
import java.util.UUID
object StructuredStreamingTest {
  // Imports used only by the HBase writer below.
  import org.apache.hadoop.hbase.client.{ConnectionFactory, Table}

  // Hadoop's simple authentication reads HADOOP_USER_NAME to choose the effective user
  // for HDFS/Hive access (checkpoints, warehouse files).
  System.setProperty("HADOOP_USER_NAME","hdfs")

  /** Base HBase client configuration; the ZooKeeper settings are filled in by the HBase writer. */
  val conf: Configuration = HBaseConfiguration.create()

  /**
   * Entry point: counts words read from Kafka and streams the running totals to the sink
   * selected by the third argument.
   *
   * @param args `<bootstrap-servers> <topics> <number 0/1/2/3> [<checkpoint-location>]`,
   *             followed by `<jdbc-url> <user> <password>` when number is 3 (MySQL).
   */
  def main(args: Array[String]): Unit = {
    SetLogLevel.setStreamingLogLevels()
    // Three positional arguments are mandatory: the destructuring below needs all three and
    // would throw a MatchError if only two were given.
    if (args.length < 3) {
      System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        " <topics> <number 0/1/2/3> [<checkpoint-location> eg:/tmp/temp-spark-1]")
      System.exit(1)
    }
    val Array(bootstrapServers, topics, number, _*) = args
    val checkpointLocation =
      if (args.length > 3) args(3) else "/tmp/temp-spark-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      // Honour the master chosen by spark-submit (exposed to the driver as the "spark.master"
      // system property); a hard-coded local[*] would silently override --master.
      // Fall back to local mode only when launched directly, e.g. from an IDE.
      .master(sys.props.getOrElse("spark.master", "local[*]"))
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // One string per Kafka record value, starting from the latest offsets.
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topics)
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Running word count with columns "value" (the word) and "count".
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    /*
     * Output type chosen by the user:
     *   1 -> Hive, 2 -> HBase, 3 -> MySQL, 0/anything else -> console.
     */
    val dsw = number match {
      case "1" =>
        wordCounts.writeStream
          .outputMode("complete")
          .trigger(Trigger.ProcessingTime("10 seconds")) // micro-batch interval
          .format("com.insight.spark.streaming.HiveSinkProvider") // custom Hive sink
          .option("checkpointLocation", checkpointLocation)
          .queryName("write hive")
      case "2" =>
        wordCounts.writeStream
          .outputMode("update")
          .foreach(hbaseWriter())
          .option("checkpointLocation", checkpointLocation)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .queryName("write hbase")
      case "3" =>
        // MySQL needs three extra arguments; fail with a usage message instead of an
        // ArrayIndexOutOfBoundsException.
        if (args.length < 7) {
          System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> <topics> 3 " +
            "<checkpoint-location> <jdbc-url> <user> <password>")
          System.exit(1)
        }
        val (url, user, pwd) = (args(4), args(5), args(6))
        wordCounts.writeStream
          .outputMode("complete")
          .foreach(mysqlWriter(url, user, pwd))
          .option("checkpointLocation", checkpointLocation)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .queryName("write mysql")
      case _ =>
        wordCounts.writeStream
          .outputMode("update")
          .format("console")
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .option("checkpointLocation", checkpointLocation)
          .queryName("print it")
    }
    dsw.start().awaitTermination()
  }

  /**
   * Builds a ForeachWriter that upserts (word, count) rows into the HBase table named by the
   * "hbase.table.name" config key, column family "info", columns "word" and "count".
   *
   * The row key is the word itself, so each update overwrites that word's previous count
   * (a word+count key left one stale row per intermediate count and could collide, e.g.
   * "a1"+"2" vs "a"+"12"). HBase rejects empty row keys, so empty tokens produced by
   * consecutive spaces are skipped.
   */
  private def hbaseWriter(): ForeachWriter[Row] = new ForeachWriter[Row] {
    private val family = Bytes.toBytes("info")
    // Created in open(), released in close().
    var connection: Connection = _
    var table: Table = _

    override def open(partitionId: Long, version: Long): Boolean = {
      conf.set("hbase.zookeeper.quorum", ConfigLoader.getString("hbase.zookeeper.list"))
      conf.set("hbase.zookeeper.property.clientPort", ConfigLoader.getString("hbase.zookeeper.port"))
      conf.set("zookeeper.znode.parent", ConfigLoader.getString("zookeeper.znode.parent"))
      connection = ConnectionFactory.createConnection(conf)
      // One Table handle per open() instead of a new, never-closed one per record.
      table = connection.getTable(TableName.valueOf(ConfigLoader.getString("hbase.table.name")))
      true
    }

    override def process(record: Row): Unit = {
      val word = record(0).toString
      if (word.nonEmpty) {
        val put = new Put(Bytes.toBytes(word))
        put.addColumn(family, Bytes.toBytes("word"), Bytes.toBytes(word))
        put.addColumn(family, Bytes.toBytes("count"), Bytes.toBytes(record(1).toString))
        table.put(put)
      }
    }

    override def close(errorOrNull: Throwable): Unit = {
      // open() may have failed part-way, so either handle can still be null; always try to
      // release the connection even if closing the table throws.
      try {
        if (table != null) table.close()
      } finally {
        if (connection != null) connection.close()
      }
    }
  }

  /**
   * Builds a ForeachWriter that upserts (word, count) into `spark.words` using MySQL's
   * REPLACE INTO, which relies on the unique key on `word`.
   *
   * Expected table (create the `spark` database first):
   * {{{
   * CREATE TABLE `words` (
   *   `id` int(11) NOT NULL AUTO_INCREMENT,
   *   `word` varchar(255) NOT NULL,
   *   `count` int(11) DEFAULT 0,
   *   PRIMARY KEY (`id`),
   *   UNIQUE KEY `word` (`word`)
   * ) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
   * }}}
   *
   * @param url  JDBC URL of the MySQL server
   * @param user database user
   * @param pwd  database password
   */
  private def mysqlWriter(url: String, user: String, pwd: String): ForeachWriter[Row] =
    new ForeachWriter[Row] {
      var conn: sql.Connection = _
      var p: PreparedStatement = _

      override def open(partitionId: Long, version: Long): Boolean = {
        Class.forName("com.mysql.jdbc.Driver")
        conn = DriverManager.getConnection(url, user, pwd)
        p = conn.prepareStatement("replace into spark.words(word,count) values(?,?)")
        true
      }

      override def process(record: Row): Unit = {
        p.setString(1, record.getString(0))
        // count() yields a Long; bind it directly instead of round-tripping through a String.
        p.setLong(2, record.getLong(1))
        p.execute()
      }

      override def close(errorOrNull: Throwable): Unit = {
        // Guard against a failed open() and always release the connection.
        try {
          if (p != null) p.close()
        } finally {
          if (conn != null) conn.close()
        }
      }
    }
}
HiveSinkProvider源码
其中用到的HiveSinkProvider代码如下:
package com.insight.spark.streaming
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.slf4j.LoggerFactory
/**
 * Streaming sink that writes each micro-batch of the word-count result into a Hive table
 * with columns `word` (string) and `count` (bigint).
 *
 * Options:
 *  - `table`: target Hive table, default `words`.
 *
 * In Complete output mode every batch carries the whole result table, so the Hive table is
 * replaced with it. In other modes the batch rows are appended. Appending complete-mode
 * snapshots would duplicate every word once per trigger.
 */
case class HiveSink(sqlContext: SQLContext,
                    parameters: Map[String, String],
                    partitionColumns: Seq[String],
                    outputMode: OutputMode) extends Sink {

  /** Target table, overridable with `.option("table", ...)`. */
  private val tableName: String = parameters.getOrElse("table", "words")

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val logger = LoggerFactory.getLogger(this.getClass)
    // Output schema: the query's "value" column is renamed to "word". count() produces a
    // LongType column, so it must be declared LongType (IntegerType made the converter
    // read the long column with getInt).
    val schema = StructType(Array(
      StructField("word", StringType),
      StructField("count", LongType)
    ))
    // In Spark 2.3 `data` still carries a streaming plan, so data.write is rejected; rebuild
    // a plain batch DataFrame from the already-planned RDD of internal rows instead.
    val res = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }
    val df = data.sparkSession.createDataFrame(res, schema)
    val saveMode =
      if (outputMode == OutputMode.Complete()) SaveMode.Overwrite else SaveMode.Append
    logger.info(s"Writing batch $batchId to Hive table $tableName (mode $saveMode)")
    df.write.mode(saveMode).format("hive").saveAsTable(tableName)
  }
}
/**
 * Sink provider that Spark instantiates when a query uses
 * `.format("com.insight.spark.streaming.HiveSinkProvider")`; it hands back a [[HiveSink]].
 */
class HiveSinkProvider extends StreamSinkProvider with DataSourceRegister {

  /** Alias under which this provider registers itself. */
  override def shortName(): String = "HiveSinkProvider"

  /** Creates the sink, forwarding every argument unchanged. */
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink =
    new HiveSink(sqlContext, parameters, partitionColumns, outputMode)
}
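使用 mvn package 打包后,通过 spark-submit --class com.insight.spark.streaming.StructuredStreamingTest [其他 spark-submit 参数] SparkDemo-1.1.jar <程序参数> 提交运行即可。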
点:结构化流、Spark Structured Streaming、hive、hbase、mysql
线:spark
面:内存计算