Spark-RDD操作MySQL

Spark支持通过Java JDBC访问关系型数据库,需要通过JdbcRDD进行访问,示例如下:

  1. 添加依赖
// 在build.sbt中添加依赖
libraryDependencies ++= Seq (
  "mysql" % "mysql-connector-java" % "5.1.47"
)
  1. MySQL读取
import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object MySQLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MySQLDemo").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://172.16.0.31:3306/db_canal_test"
    val username = "root"
    val password = "123456"

    val sql = "select name, age from tbl_person_info where id >= ? and id <= ?"

    val jdbcRdd = new JdbcRDD(sc,
      () => DriverManager.getConnection(url, username, password),
      sql, 1, 2, 2,
      (res) => {
        println(res.getString(1) + ", " + res.getInt(2))})

    jdbcRdd.collect()
    sc.stop()

  }
}

结果:

john, 18

lucy, 20

JdbcRDD的构造方法中有7个参数,他们分别是:

  1. sc: SparkContext - 当前应用的SparkContext对象
  2. getConnection: () => Connection - 获取Jdbc链接对象的方法
  3. sql: String - 请求的sql
  4. lowerBound: Long - 数据下边界
  5. upperBound: Long - 数据上边届
  6. numPartitions: Int - 分区数量
  7. mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _ - 用来处理结果的方法

其中,5、6两个参数值会通过sql中的?占位符传递进去,如果sql中没有占位符,就会抛出java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0)异常

  1. MySQL写入
import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object MySQLWriteDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MySQLWriteDemo").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://172.16.0.31:3306/db_canal_test"
    val username = "root"
    val password = "123456"

    val dataRdd = sc.parallelize(List(("steve", 30), ("elly", 21), ("sam", 13)))

    dataRdd.foreach {
      case (name, age) => {
        val conn = DriverManager.getConnection(url, username, password)
        val sql = "insert into tbl_person_info (name, age) values (?, ?)"
        val statement = conn.prepareStatement(sql)
        try {
          statement.setString(1, name)
          statement.setInt(2, age)
          statement.executeUpdate()
        } finally {
          statement.close()
          conn.close()
        }
      }
    }

    sc.stop()

  }
}

通过sql查询结果:

1 john 18
2 lucy 20
3 elly 21
4 sam 13
5 steve 30

对于上述代码,写入MySQL的功能已经实现,但是由于与MySQL的链接是在foreach中创建的,那就意味着RDD中有多少元素就会进行多少次的链接创建,当数据量增大后,这种建立连接的开销是巨大的,于是我们将建立连接提前:

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object MySQLWriteDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MySQLWriteDemo").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://172.16.0.31:3306/db_canal_test"
    val username = "root"
    val password = "123456"

    val dataRdd = sc.parallelize(List(("steve", 30), ("elly", 21), ("sam", 13)))

    val conn = DriverManager.getConnection(url, username, password)

    try {
      dataRdd.foreach {
        case (name, age) => {
          val sql = "insert into tbl_person_info (name, age) values (?, ?)"
          val statement = conn.prepareStatement(sql)
          try {
            statement.setString(1, name)
            statement.setInt(2, age)
            statement.executeUpdate()
          } finally {
            statement.close()
          }
        }
      }
    } finally {
      conn.close()
    }

    sc.stop()

  }
}

我们再次运行,此时抛出异常:org.apache.spark.SparkException: Task not serializable

由于建立连接相关对象不能序列化,导致序列化异常,于是我们改进如下:

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object MySQLWriteDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MySQLWriteDemo").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://172.16.0.31:3306/db_canal_test"
    val username = "root"
    val password = "123456"

    val dataRdd = sc.parallelize(List(("steve", 30), ("elly", 21), ("sam", 13)))

    dataRdd.foreachPartition(iter => {
      val conn = DriverManager.getConnection(url, username, password)
      try {
        iter.foreach {
          case (name, age) => {
            val sql = "insert into tbl_person_info (name, age) values (?, ?)"
            val statement = conn.prepareStatement(sql)
            try {
              statement.setString(1, name)
              statement.setInt(2, age)
              statement.executeUpdate()
            } finally {
              statement.close()
            }
          }
        }
      } finally {
        conn.close()
      }
    })

    sc.stop()

  }
}

我们将之前插入的三条数据删除后,再次执行,之后查询结果:

1 john 18
2 lucy 20
6 sam 13
7 steve 30
8 elly 21

通过foreachPartition的方式遍历分区,只需每个分区建立一个连接即可,大大减少了连接的数量。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,265评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,078评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,852评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,408评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,445评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,772评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,921评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,688评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,130评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,467评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,617评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,276评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,882评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,740评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,967评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,315评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,486评论 2 348