Read a CSV source file, convert it to a DataFrame, and write it to MySQL
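The snippets below assume roughly the following imports (Spark 2.x API; JsonParser is taken here to be Gson's com.google.gson.JsonParser, which is an assumption — adjust to your actual dependencies):
import java.io.FileInputStream
import java.util.Properties

import com.google.gson.JsonParser
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}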
/**
 *
 * @param adult                adult flag
 * @param budget               budget
 * @param genres               genres
 * @param homepage             homepage
 * @param movie_id             movie ID
 * @param Imdb_id              IMDb ID
 * @param original_language    original language
 * @param overview             overview
 * @param popularity           popularity
 * @param production_companies production companies
 * @param release_date         release date
 * @param revenue              revenue
 * @param runtime              runtime
 * @param spoken_languages     spoken languages
 * @param tagline              tagline
 * @param title                title
 * @param vote_average         average vote
 * @param vote_count           vote count
 */
case class Movies_metadata(adult: String, budget: Float, genres: String, homepage: String, movie_id: Int, Imdb_id: String,
original_language: String, overview: String, popularity: String,
production_companies: String, release_date: String, revenue: Float,
runtime: Float, spoken_languages: String, tagline: String, title: String, vote_average: Float, vote_count: Float)
object DataToDB {
def main(args: Array[String]): Unit = {
// Create SparkConf
val conf: SparkConf = new SparkConf().setAppName("DataToDB").setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Create SparkContext and SparkSession
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
// Implicit conversions
import sparkSession.implicits._
// Read the config file
val properties = new Properties()
val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath // the file must be placed in the resources folder
properties.load(new FileInputStream(path))
// val systemConfig = Map(
// "musql_url"->"jdbc:mysql://192.168.100.101:3306/db",
// "dateBase"->"",
// "es_httpHosts"->"192.168.100.103:9200",
// "es_transportHosts"->"192.168.100.103:9300",
// "es_index"->"recommender",
// "es_clusterName"->"elasticsearch"
// )
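// A config.properties along these lines is assumed (the keys come from the
// getProperty calls in this code; the values are placeholders, adjust to your setup):
//   musql_url=jdbc:mysql://192.168.100.101:3306/db
//   jdbc_username=root
//   jdbc_password=root
//   movies_table=movies
//   rating_table=rating
//   dataMovieRating_table=data_movie_rating
//   movieAverageRating_table=movie_average_rating
//   movieTopN=10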
val json = new JsonParser()
// Load the source data
// Parse the ratings source file
val ratingRdd = sc.textFile("D:\\data\\ratings_small.csv")
val ratingDf = ratingRdd.map {
item => {
val strings = item.split(",")
Ratings(strings(0).toInt, strings(1).toInt, strings(2).toFloat, strings(3).toInt)
}
}.toDF()
val moviesRdd = sc.textFile("D:\\data\\movies_data.txt")
val moviesDf = moviesRdd.map {
item => {
val strings = item.split("___")
// Debug output
println(s"adult=${strings(0)} \t budget=${strings(1)} genres=${strings(2)} homepage=${strings(3)} m_id=${strings(4)} Imdb_id=${strings(5)} " +
s"original_language=${strings(6)} overview=${strings(7)} popularity=${strings(8)} production_companies=${strings(9)} release_date=${strings(10)}" +
s"revenue= ${strings(11)} runtime=${strings(12)} spoken_languages=${strings(13)} tagline=${strings(14)} title=${strings(15)} vote_average=${strings(16)}")
// Replace "nan" values with 0 before the numeric conversions
if (strings(1).equals("nan")) strings(1) = "0";
if (strings(4).equals("nan")) strings(4) = "0";
if (strings(11).equals("nan")) strings(11) = "0";
if (strings(12).equals("nan")) strings(12) = "0";
if (strings(16).equals("nan")) strings(16) = "0";
if (strings(17).equals("nan")) strings(17) = "0";
Movies_metadata(strings(0), strings(1).toFloat, strings(2), strings(3),
strings(4).toInt, strings(5), strings(6), strings(7), strings(8), strings(9), strings(10),
strings(11).toFloat, strings(12).toFloat, strings(13), strings(14), strings(15), strings(16).toFloat,
strings(17).toFloat)
}
}.toDF()
// Save the data to MySQL
loadDataToMysql(moviesDf, ratingDf)
The loadDataToMysql method
// Database table names
val MOVIE_TABLE = "movies"
val RATING_TABLE = "rating"
def loadDataToMysql(moviesDf: DataFrame, ratingsDf: DataFrame): Unit = {
// Read the config file
val properties = new Properties()
val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath // the file must be placed in the resources folder
properties.load(new FileInputStream(path))
// properties.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(path) , "UTF-8"))
// JDBC connection URL
val mysqlURI = properties.getProperty("musql_url")
val prop = new java.util.Properties
val user = properties.getProperty("jdbc_username")
val password = properties.getProperty("jdbc_password")
// Set connection properties
prop.setProperty("user", user)
prop.setProperty("password", password)
// Example connection string: jdbc:mysql://localhost:3306/ry-vue?user=root&password=root&useSSL=false
// Write to the database
moviesDf.write.mode("overwrite").jdbc(mysqlURI, MOVIE_TABLE, prop)
ratingsDf.write.mode("overwrite").jdbc(mysqlURI, RATING_TABLE, prop)
}
Reading the data
Two approaches
// Read the config file
val properties = new Properties()
val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath // the file must be placed in the resources folder
properties.load(new FileInputStream(path))
// Get the relevant table names
val movies_table = properties.getProperty("movies_table")
val rating_table = properties.getProperty("rating_table")
val dataMovieRating_table = properties.getProperty("dataMovieRating_table")
val movieAverageRating_table = properties.getProperty("movieAverageRating_table")
val movieTopN = properties.getProperty("movieTopN")
val mysqlURI = properties.getProperty("musql_url")
val prop = new java.util.Properties
val user = properties.getProperty("jdbc_username")
val password = properties.getProperty("jdbc_password")
// Set connection properties
// Build the connection URL
var url = s"${mysqlURI}?user=${user}&password=${password}"
prop.setProperty("user", user)
prop.setProperty("password", password)
// Create SparkConf
val conf: SparkConf = new SparkConf().setAppName("DataToDB").setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Create SparkContext and SparkSession
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
// Implicit conversions
import sparkSession.implicits._
// Load the data, approach 1
// val ratingDf = sparkSession.read.jdbc(mysqlURI,rating_table,prop)
// Approach 2
// Watch out for precision: the case class fields may need Double instead of Float
val ratingDf = sparkSession.read
// URL format, e.g. jdbc:mysql://127.0.0.1:3306/test?user=root&password=root
.option("url", url)
// Table name
.option("dbtable", rating_table)
.format("jdbc")
.load()
// Convert to the Ratings case class
.as[Ratings]
.toDF()
// Create a temporary view
ratingDf.createOrReplaceTempView("rating")
/**
 * Historically popular movies: count which movies have received the most ratings
 */
val dateMovieDf = sparkSession.sql("select movie_id,count(movie_id) as count from rating group by movie_id")
// Write the result to MySQL
storeResToMysql(dateMovieDf,dataMovieRating_table)(prop,mysqlURI)
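// For reference, the same count can also be expressed through the DataFrame API
// instead of SQL and the temp view (a sketch, not part of the original flow):
// val dateMovieDf = ratingDf.groupBy("movie_id").count()   // the result column is named "count", matching the SQL above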
/**
 * Write a result DataFrame to the database
 * @param dataFrame the result dataset
 * @param tableName the result table name
 * @param prop      database connection info (user, password)
 * @param mysqlURI  MySQL connection URL
 */
def storeResToMysql(dataFrame:DataFrame, tableName:String)(implicit prop :Properties,mysqlURI:String):Unit={
dataFrame.write.mode("overwrite").jdbc(mysqlURI, tableName, prop)
}
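Because prop and mysqlURI sit in an implicit parameter list, the second argument list in the call above can also be left to the compiler when matching implicits are in scope. A minimal sketch, assuming the same vals as above (the implicit val names here are made up):
implicit val jdbcProps: Properties = prop
implicit val jdbcUri: String = mysqlURI
storeResToMysql(dateMovieDf, dataMovieRating_table) // second parameter list is resolved implicitly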
Error
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2294)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$37$$anonfun$applyOrElse$15.applyOrElse(Analyzer.scala:2310)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$37$$anonfun$applyOrElse$15.applyOrElse(Analyzer.scala:2305)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
Cause: a data precision issue; Float does not have enough precision when mapping to the case class, so Double is needed.
val moviesDf = sparkSession.read
.option("url", url)
.option("dbtable", movies_table)
.format("jdbc")
.load()
.as[Movies_metadata]
.toDF()
// .as[Movies_metadata] requires the Float fields of Movies_metadata to be changed to Double as well
//case class Ratings(user_id: Int, movie_id: Int, rating: Float, rating_timestamp: Int)
case class Ratings(user_id: Int, movie_id: Int, rating: Double, rating_timestamp: Int)
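Applying the same fix to the movies table, the Float fields of Movies_metadata would need to be widened to Double as well. A sketch of one possible revised definition:
case class Movies_metadata(adult: String, budget: Double, genres: String, homepage: String, movie_id: Int, Imdb_id: String,
                           original_language: String, overview: String, popularity: String,
                           production_companies: String, release_date: String, revenue: Double,
                           runtime: Double, spoken_languages: String, tagline: String, title: String,
                           vote_average: Double, vote_count: Double)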