package com.everdata.spark;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.reflect.ClassTag;
public class AppearOne {

    public static void main(String[] args) {
        // args[0] is the value that will be broadcast to the executors.
        SparkSession spark = SparkSession
                .builder()
                .appName("AppearOne")
                .master("local[1]")
                .config("spark.sql.parquet.binaryAsString", "true")
                .getOrCreate();

        System.out.println("==========" + args[0]);

        // SparkContext.broadcast is a Scala API, so an explicit ClassTag is
        // required when calling it from Java.
        ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
        Broadcast<String> s = spark.sparkContext().broadcast(args[0], tag);
        // Read the Parquet file and project its first (long) column to a string.
        Dataset<Row> parquetFileDF = spark.read().parquet("d://xxx.parquet");
        final Dataset<String> baseDS = parquetFileDF.map(new MapFunction<Row, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Row value) throws Exception {
                return value.getLong(0) + "";
            }
        }, Encoders.STRING());

        System.out.println("===================" + baseDS.count());
        // Tasks read the broadcast value with getValue(); the value is shipped
        // to each executor once instead of being serialized into every closure.
        baseDS.javaRDD().foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String line) throws Exception {
                System.out.println(line);
                System.out.println(s.getValue());
            }
        });
    }
}
Spark broadcast variables: the driver ships a read-only value to every executor once, and tasks read it through getValue() rather than having the variable copied into each task closure.
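For comparison, here is a minimal sketch of the most common broadcast pattern: shipping a small read-only lookup table to the executors. It uses JavaSparkContext.broadcast, which supplies the ClassTag internally; the class name, the countryNames map, and the sample data are illustrative and not part of the original example.

package com.everdata.spark;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

public class BroadcastLookupExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("BroadcastLookupExample")
                .master("local[1]")
                .getOrCreate();

        // JavaSparkContext wraps the Scala SparkContext and exposes a
        // broadcast() overload that does not need an explicit ClassTag.
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // Small, read-only lookup table: country code -> country name.
        Map<String, String> countryNames = new HashMap<>();
        countryNames.put("CN", "China");
        countryNames.put("US", "United States");
        Broadcast<Map<String, String>> lookup = jsc.broadcast(countryNames);

        JavaRDD<String> codes = jsc.parallelize(Arrays.asList("CN", "US", "CN"));

        // Each task resolves codes against the executor-local broadcast copy.
        codes.map(code -> lookup.getValue().getOrDefault(code, "unknown"))
             .foreach(name -> System.out.println(name));

        spark.stop();
    }
}

In both variants the broadcast value must be treated as read-only. Once it is no longer needed, lookup.unpersist() releases the executor-side copies.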