SparkSql读写dataframe到MySQL数据库

读取cvs格式源文件后转成dataframe写入到mysql

/**
*
* @param adult                成人
* @param budget               预算
* @param genres               流派
* @param homepage             主页
* @param movie_id             编码
* @param Imdb_id              imdb编码
* @param original_language    初始语言
* @param overview             概述
* @param popularity           声望
* @param production_companies 发行公司
* @param release_date         发布时间
* @param revenue              收入
* @param runtime              运行
* @param spoken_languages     语言
* @param tagline              标题
* @param title                题目
* @param vote_average         平均投票
* @param vote_count           投票数
*/
case class Movies_metadata(adult: String, budget: Float, genres: String, homepage: String, movie_id: Int, Imdb_id: String,
                         original_language: String, overview: String, popularity: String,
                         production_companies: String, release_date: String, revenue: Float,
                         runtime: Float, spoken_languages: String, tagline: String, title: String, vote_average: Float, vote_count: Float)

object DataToDB {

  def main(args: Array[String]): Unit = {

    //创建SparkConf

    val conf: SparkConf = new SparkConf().setAppName("DataToDB").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //创建SparkSession
    val sc = new SparkContext(conf)
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    //隐式转换
    import sparkSession.implicits._

    //读取配置文件
    val properties = new Properties()
    val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath //文件要放到resource文件夹下
    properties.load(new FileInputStream(path))

    //    val systemConfig = Map(
    //      "musql_url"->"jdbc:mysql://192.168.100.101:3306/db",
    //      "dateBase"->"",
    //      "es_httpHosts"->"192.168.100.103:9200",
    //      "es_transportHosts"->"192.168.100.103:9300",
    //      "es_index"->"recommender",
    //      "es_clusterName"->"elasticsearch"
    //    )
    val json = new JsonParser()


    //加载源数据 

    //解析评分源文件
    val ratingRdd = sc.textFile("D:\\data\\ratings_small.csv")
    val ratingDf = ratingRdd.map {
      item => {
        val strings = item.split(",")
        Ratings(strings(0).toInt, strings(1).toInt, strings(2).toFloat, strings(3).toInt)
      }
    }.toDF()

    val moviesRdd = sc.textFile("D:\\data\\movies_data.txt")
    val moviesDf = moviesRdd.map {
      item => {
        val strings = item.split("___")

        //输出测试
        println(s"adult=${strings(0)} \t budget=${strings(1)}    genres=${strings(2)}   homepage=${strings(3)}     m_id=${strings(4)}     Imdb_id=${strings(5)}     " +
          s"original_language=${strings(6)}   overview=${strings(7)}    popularity=${strings(8)}   production_companies=${strings(9)}      release_date=${strings(10)}" +
          s"revenue= ${strings(11)}    runtime=${strings(12)}    spoken_languages=${strings(13)}      tagline=${strings(14)}   title=${strings(15)}      vote_average=${strings(16)}")
        //判空填值
        if (strings(1).equals("nan")) strings(1) = "0";
        if (strings(4).equals("nan")) strings(4) = "0";
        if (strings(11).equals("nan")) strings(11) = "0";
        if (strings(12).equals("nan")) strings(12) = "0";
        if (strings(16).equals("nan")) strings(16) = "0";
        if (strings(17).equals("nan")) strings(17) = "0";
        Movies_metadata(strings(0), strings(1).toFloat, strings(2), strings(3),
          strings(4).toInt, strings(5), strings(6), strings(7), strings(8), strings(9), strings(10),
          strings(11).toFloat, strings(12).toFloat, strings(13), strings(14), strings(15), strings(16).toFloat,
          strings(17).toFloat)
      }
    }.toDF()

    //保存数据到mysql
    loadDataToMysql(moviesDf, ratingDf)

loadDataToMysql方法


 //定义数据库表名
 val MOVIE_TABLE = "movies"
 val RATING_TABLE = "rating"


def loadDataToMysql(moviesDf: DataFrame, ratingsDf: DataFrame): Unit = {

   //读取配置文件
   val properties = new Properties()
   val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath //文件要放到resource文件夹下
   properties.load(new FileInputStream(path))
   //    properties.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(path) , "UTF-8"))

   //连接的url
   val mysqlURI = properties.getProperty("musql_url")

   val prop = new java.util.Properties
   val user = properties.getProperty("jdbc_username")
   val password = properties.getProperty("jdbc_password")
   //设置连接对象
   prop.setProperty("user", user)
   prop.setProperty("password", password)
   //获取连接信息  jdbc:mysql://localhost:3306/ry-vue?user=root&password=root&useSSL=false

   // 写入数据库
   moviesDf.write.mode("overwrite").jdbc(mysqlURI, MOVIE_TABLE, prop)

   ratingsDf.write.mode("overwrite").jdbc(mysqlURI, RATING_TABLE, prop)

 }

读数据

两种方法

 //读取配置文件
    val properties = new Properties()
    val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath //文件要放到resource文件夹下
    properties.load(new FileInputStream(path))

    //获取相关数据表名字
    val movies_table = properties.getProperty("movies_table")
    val rating_table = properties.getProperty("rating_table")
    val dataMovieRating_table = properties.getProperty("dataMovieRating_table")
    val movieAverageRating_table = properties.getProperty("movieAverageRating_table")
    val movieTopN = properties.getProperty("movieTopN")

    val mysqlURI = properties.getProperty("musql_url")

    val prop = new java.util.Properties
    val user = properties.getProperty("jdbc_username")
    val password = properties.getProperty("jdbc_password")
    //设置连接对象
    //拼接连接的url
    var url = s"${mysqlURI}?user=${user}&password=${password}"

    prop.setProperty("user", user)
    prop.setProperty("password", password)

    //创建SparkConf

    val conf: SparkConf = new SparkConf().setAppName("DataToDB").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //创建SparkSession
    val sc = new SparkContext(conf)
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    //隐式转换
    import sparkSession.implicits._

    //加载数据 方法1
//    val ratingDf = sparkSession.read.jdbc(mysqlURI,rating_table,prop)

    //方法2
    //注意精度问题,可将case class 改为double
    val ratingDf = sparkSession.read
      //url /127.0.0.1:3306/test?user=root&password=root
      .option("url", url)
      //表名
      .option("dbtable", rating_table)
      .format("jdbc")
      .load()
      //转换为ratings类
      .as[Ratings]
      .toDF()

    //创建一个临时视图
    ratingDf.createOrReplaceTempView("rating")

    /**
     * 历史的热门电影统计,统计历史的评分数据最多的电影
     */
    val dateMovieDf = sparkSession.sql("select movie_id,count(movie_id) as count from rating group by movie_id")
    //结果写入MySQL
    storeResToMysql(dateMovieDf,dataMovieRating_table)(prop,mysqlURI)



  /**
   * 结果信息写到数据库
   * @param dataFrame   缓存的数据集
   * @param tableName   结果表名
   * @param prop        数据库连接信息 user  password
   * @param mysqlURI     mysql连接url
   */
  def storeResToMysql(dataFrame:DataFrame, tableName:String)(implicit prop :Properties,mysqlURI:String):Unit={
    dataFrame.write.mode("overwrite").jdbc(mysqlURI, tableName, prop)
  }

错误

You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2294)
   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$37$$anonfun$applyOrElse$15.applyOrElse(Analyzer.scala:2310)
   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$37$$anonfun$applyOrElse$15.applyOrElse(Analyzer.scala:2305)
   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)

原因:数据精度问题,转换样例类时float类型精度不够,需要double

val moviesDf = sparkSession.read
      .option("url", url)
      .option("dbtable", movies_table)
      .format("jdbc")
      .load()
      .as[Movies_metadata]
      .toDF()

//    .as[Movies_metadata]   Movies_metadata的数据类型需要该、改


//case class Ratings(user_id: Int, movie_id: Int, rating: Float, rating_timestamp: Int)
  case class Ratings(user_id: Int, movie_id: Int, rating: Double, rating_timestamp: Int)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,039评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,426评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,417评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,868评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,892评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,692评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,416评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,326评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,782评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,957评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,102评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,790评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,442评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,996评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,113评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,332评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,044评论 2 355

推荐阅读更多精彩内容