Extracting the common setup
SparkHelper is responsible for obtaining the SparkSession and the SparkContext.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
* @Author: majx2
* @Date: 2019-8-2 09:41
* @Description:
*/
public class SparkHelper {
private static SparkSession session = SparkSession.builder().config(getConf()).getOrCreate();
// private static JavaSparkContext context = new JavaSparkContext(getConf());
public static JavaSparkContext getContext(){
return JavaSparkContext.fromSparkContext(session.sparkContext());
// return context;
}
public static SparkSession getSession() {
return session;
}
private static SparkConf getConf(){
final SparkConf conf = new SparkConf().setAppName("SparkDemo").setMaster("local[4]");
// other config
return conf;
}
}
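A minimal usage sketch follows; the class name SparkHelperDemo is only a placeholder for this illustration, not part of the original project.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
public class SparkHelperDemo {
    public static void main(String[] args) {
        // Both calls reuse the single SparkSession built inside SparkHelper.
        SparkSession session = SparkHelper.getSession();
        JavaSparkContext context = SparkHelper.getContext();
        // Quick sanity check that the local context works.
        JavaRDD<Integer> numbers = context.parallelize(Arrays.asList(1, 2, 3, 4));
        System.out.println("sum = " + numbers.reduce(Integer::sum));
        session.stop();
    }
}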
Using Spark SQL to work with a MySQL database
Spark SQL also ships with a data source that reads from other databases over JDBC. It is generally preferable to JdbcRDD, because the result comes back as a DataFrame, which is easy to process in Spark SQL and can be joined with other data sources.
1. Connect to MySQL and read data
Connect to the MySQL database and read the whole table.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Properties;
/**
* @Author: majx2
* @Date: 2019-8-2 10:10
* @Description:
*/
public class SparkMyTest {
private final static String jdbcUrl = "jdbc:mysql://localhost:3306/test?autoReconnect=true&characterEncoding=UTF8&tinyInt1isBit=false&allowMultiQueries=true";
private final static String tableName = "user"; // source table
private final static String targetTable = "user_target"; // target table
public static void main(String[] args) {
final SparkSession session = SparkHelper.getSession();
Dataset<Row> dataset = session.read().jdbc(jdbcUrl,tableName,getProperties());
dataset.show();
}
private static Properties getProperties() {
Properties connectionProperties = new Properties();
connectionProperties.put("user", "root"); // 用户名
connectionProperties.put("password", "123456"); // 密码
connectionProperties.put("driver", "com.mysql.jdbc.Driver"); // 数据库驱动
return connectionProperties;
}
}
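The same full-table read can also be written with the generic option-based reader API; the sketch below is equivalent to the jdbc() call above and uses only the standard Spark JDBC option keys.
Dataset<Row> dataset = session.read()
        .format("jdbc")
        .option("url", jdbcUrl)
        .option("dbtable", tableName)
        .option("user", "root")
        .option("password", "123456")
        .option("driver", "com.mysql.jdbc.Driver")
        .load();
dataset.show();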
2. Partitioned reads with lowerBound, upperBound and numPartitions
In brief:
partitionColumn: the numeric column used to split the read;
lowerBound: lower bound of the partition column;
upperBound: upper bound of the partition column;
numPartitions: number of partitions.
Note that lowerBound and upperBound only control how the ranges are split; they do not filter rows. For the call below, Spark roughly translates the read into:
Partition 1: select * from tablename where test_id < 122901;
Partition 2: select * from tablename where test_id >= 122901 and test_id < 245802;
Partition 3: select * from tablename where test_id >= 245802 and test_id < 368705;
Partition 4: select * from tablename where test_id >= 368705;
Dataset<Row> dataset = session.read().jdbc(jdbcUrl, tableName, "test_id", 2, 491606, 4, getProperties());
Result:
+-------+---------+------+---+---------+-------------------+----+-----+
|test_id|tenant_id| name|age|test_type| test_date|role|phone|
+-------+---------+------+---+---------+-------------------+----+-----+
| 2| 1| 小马| 1| 1|2017-03-03 01:01:01| 1|10000|
| 3| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 4| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 5| 1| 王五| 2| 1|2017-03-03 01:01:01| 1|10010|
| 6| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
| 100| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
| 107| 1| 三毛| 2| 1|2017-02-02 01:01:01| 1|10086|
| 108| 1| 小马| 1| 1|2017-03-03 01:01:01| 1|10000|
| 109| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 110| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 111| 1| 王五| 2| 1|2017-03-03 01:01:01| 1|10010|
| 112| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
| 113| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
| 114| 1| 三毛| 2| 1|2017-02-02 01:01:01| 1|10086|
| 115| 1| 小马| 1| 1|2017-03-03 01:01:01| 1|10000|
| 116| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 117| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 118| 1| 王五| 2| 1|2017-03-03 01:01:01| 1|10010|
| 119| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
| 120| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
+-------+---------+------+---+---------+-------------------+----+-----+
only showing top 20 rows
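The same partitioned read can also be expressed through reader options instead of the seven-argument jdbc() overload; a sketch using the standard partitioning options:
Dataset<Row> dataset = session.read()
        .format("jdbc")
        .option("url", jdbcUrl)
        .option("dbtable", tableName)
        .option("user", "root")
        .option("password", "123456")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("partitionColumn", "test_id") // numeric column used to split the read
        .option("lowerBound", "2")
        .option("upperBound", "491606")
        .option("numPartitions", "4")
        .load();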
3. Read data with predicate-based partitions
Change the arguments passed to session.read().jdbc: supply an array of predicate strings. Each predicate defines one partition, so the import is spread across executors and runs in parallel.
Dataset<Row> dataset = session.read().jdbc(jdbcUrl, tableName, new String[]{
"tenant_id=2 AND test_id <122901 order by test_id ", // 每个条件表示一个分区
"tenant_id=2 AND test_id >=122901 and test_id <245802 order by test_id",
"tenant_id=2 AND test_id >=245802 and test_id <368705 order by test_id",
"tenant_id=2 AND test_id >= 368705 order by test_id"
}, getProperties());
Result:
+-------+---------+------+---+---------+-------------------+----+-----+
|test_id|tenant_id| name|age|test_type| test_date|role|phone|
+-------+---------+------+---+---------+-------------------+----+-----+
| 3| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 4| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 109| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 110| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 116| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 117| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 123| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 124| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 131| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 132| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 138| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 139| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 145| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 146| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 152| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 153| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 162| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 163| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 169| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 170| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
+-------+---------+------+---+---------+-------------------+----+-----+
only showing top 20 rows
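If the ranges should come from your own data rather than hard-coded constants, the predicate array can also be built programmatically; a rough sketch assuming evenly sized test_id ranges:
long lower = 2L, upper = 491606L;
int partitions = 4;
long step = (upper - lower) / partitions + 1;
String[] predicates = new String[partitions];
for (int i = 0; i < partitions; i++) {
    long from = lower + i * step;
    // The last partition is open-ended so rows above the estimated upper bound are not lost.
    predicates[i] = (i == partitions - 1)
            ? "tenant_id=2 AND test_id >= " + from
            : "tenant_id=2 AND test_id >= " + from + " AND test_id < " + (from + step);
}
Dataset<Row> dataset = session.read().jdbc(jdbcUrl, tableName, predicates, getProperties());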
4. Run a simple aggregation
Count how many rows share the same name, grouped by the name column.
// dataset.groupBy("name").count().show();
dataset.createOrReplaceTempView(tableName);
Dataset<Row> result = dataset.sqlContext().sql("SELECT name, COUNT(name) count FROM " + tableName + " GROUP BY name");
result.show();
Result:
+------+------+
| name| count|
+------+------+
| 王五| 65537|
| 小马| 65537|
| 雷锋|131072|
| 东狗| 65537|
| 三毛| 65535|
|麻花藤| 65536|
+------+------+
5. Write the result back to MySQL
result.repartition(4) // number of partitions, i.e. concurrent connections writing to MySQL
// SaveMode.Append: append to the existing data;
// SaveMode.Overwrite: overwrite any existing data;
// SaveMode.ErrorIfExists: throw an exception if data already exists;
// SaveMode.Ignore: skip the write if data already exists;
.write().mode(SaveMode.Overwrite)
// supported column types: see org.apache.spark.sql.catalyst.parser.AstBuilder#visitPrimitiveDataType
.option("createTableColumnTypes", "name VARCHAR(100),count int") // column types used when Spark creates the table, comma separated
.option("createTableOptions", "ENGINE=INNODB DEFAULT CHARSET=utf8")
.option("truncate", "true") // with SaveMode.Overwrite, if the DataFrame schema matches the existing MySQL table, Spark only truncates the table instead of dropping and recreating it
.jdbc(jdbcUrl,targetTable,getProperties());
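The JDBC writer also accepts tuning options such as batchsize (rows per batched INSERT, default 1000) and isolationLevel; a sketch appending to the same target table:
result.repartition(4)
        .write().mode(SaveMode.Append)
        .option("batchsize", "5000") // rows sent per JDBC batch insert
        .option("isolationLevel", "READ_COMMITTED") // transaction isolation level used for the writes
        .jdbc(jdbcUrl, targetTable, getProperties());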
Transforming a Dataset
What if you want a Dataset with a different structure? The example below joins two tables and repackages each row as an id plus a JSON entity. It assumes jdbcUrl, getProperties(), getStructType() and the FIELDS constant live on SparkHelper, and it uses Hutool (ArrayUtil, StrUtil, MapUtil), Guava (Maps, Lists) and Fastjson (JSON) utilities.
private static void testMy() {
final SparkSession session = SparkHelper.getSession();
Dataset<Row> report = session.read().jdbc(SparkHelper.jdbcUrl, "f_order_report_701", SparkHelper.getProperties());
Dataset<Row> relation = session.read().jdbc(SparkHelper.jdbcUrl, "f_sales_order_relation", SparkHelper.getProperties());
relation = relation.select("settle_date", "sales_order_id", "ou_id").distinct();
Dataset<Row> result = report.join(relation,
relation.col("sales_order_id").equalTo(report.col("sales_order_id")).and(
relation.col("ou_id").equalTo(report.col("ou_id")))
, "left").drop(relation.col("ou_id")).drop(relation.col("sales_order_id"))
.selectExpr(ArrayUtil.append(FIELDS,"if(write_off_amount = settlement_amount,1,0) as write_off_status"));
JavaRDD<Row> rdd = result.javaRDD().map(p -> {
Map<String, Object> map = Maps.newHashMap();
Lists.newArrayList(ArrayUtil.append(FIELDS, "write_off_status")).forEach(f -> map.put(StrUtil.toCamelCase(f), p.getAs(f)));
Map newMap = MapUtil.sort(map);
return RowFactory.create(p.getAs("id").toString(), JSON.toJSONString(newMap));
});
Dataset<Row> dataset = SparkHelper.getSession().createDataFrame(rdd, SparkHelper.getStructType());
dataset.show(3);
log.info(dataset.head().toString());
}
Result:
+---+--------------------+
| id| entity|
+---+--------------------+
| 69|{"accountTime":15...|
| 54|{"applyType":3,"c...|
| 68|{"applyType":1,"c...|
+---+--------------------+
only showing top 3 rows
2019-08-06 15:40:46.367 [main] [INFO ] [c.m.e.f.d.common.spark.SparkMyTest] - [69,{"accountTime":1563379200000,"applyType":5,"cateId":"1","createTime":1563420070000,"customerCode":"C0033688","franchiser":"0","id":69,"isAdjust":0,"itemId":"31031050000042","orderAttribute":2,"ouId":701,"outerOrderId":"920190718004","outerStoreCode":"SHC44F4140","salesCenterCode":"3608380682-1","salesChannel":2,"salesOrderId":"OM19071800000003","settlementAmount":699.00,"shipedReturnTime":1563420061000,"shopId":2,"status":5,"writeOffAmount":0.00,"writeOffStatus":0}]
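SparkHelper.getStructType() is referenced above but not shown in the post; a minimal sketch that matches the two string columns in the output (field names and nullability are assumptions):
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public static StructType getStructType() {
    // Two string columns: the row id and the JSON-serialized entity.
    return DataTypes.createStructType(new StructField[]{
            DataTypes.createStructField("id", DataTypes.StringType, false),
            DataTypes.createStructField("entity", DataTypes.StringType, true)
    });
}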
The End !