Prepare the source dataset
The flight-data sample files used below come from the learningPySpark repository: https://github.com/drabastomek/learningPySpark
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Create a SparkSession running locally, with Hive support enabled
spark = (SparkSession.builder
         .master("local")
         .appName("SparkOnHive")
         .enableHiveSupport()
         .getOrCreate())
# Set File Paths
flightPerfFilePath = "/root/learningPySpark/Chapter03/flight-data/departuredelays.csv"
airportsFilePath = "/root/learningPySpark/Chapter03/flight-data/airport-codes-na.txt"
# Obtain Airports dataset
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
airports.createOrReplaceTempView("airports")
# Obtain Departure Delays dataset
flightPerf = spark.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")
# Cache the Departure Delays dataset
flightPerf.cache()
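Before querying, it is worth a quick sanity check that both DataFrames loaded with the expected columns. The snippet below is optional and only uses the standard printSchema/show calls on the DataFrames created above.
# Inspect the schema inferred for the airports dataset
airports.printSchema()

# Preview a few rows of each dataset
airports.show(5)
flightPerf.show(5)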
Query the total flight delays by city and origin airport code
query_sql = """select a.City, f.origin, sum(f.delay) as Delays
from FlightPerformance f
join airports a
on a.IATA = f.origin
where a.State = 'WA'
group by a.City, f.origin
order by sum(f.delay) desc
"""
# .show() prints the result table itself, so there is no need to wrap it in print()
spark.sql(query_sql).show()
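The same join and aggregation can also be written with the DataFrame API instead of SQL. This is a minimal sketch using the airports and flightPerf DataFrames defined above; note that delay was read as a string because the departure-delays file was loaded without inferSchema, so it is cast explicitly before summing.
from pyspark.sql import functions as F

delays_by_city = (flightPerf
                  .join(airports, airports.IATA == flightPerf.origin)
                  .where(airports.State == 'WA')
                  .groupBy(airports.City, flightPerf.origin)
                  .agg(F.sum(flightPerf.delay.cast('int')).alias('Delays'))
                  .orderBy(F.desc('Delays')))
delays_by_city.show()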
Using Apache Zeppelin
In Zeppelin, run the data-loading code in a %spark.pyspark paragraph and then issue the query in a %spark.sql paragraph, which renders the result as an interactive table or chart.
%spark.pyspark
# Set File Paths
flightPerfFilePath = "/root/learningPySpark/Chapter03/flight-data/departuredelays.csv"
airportsFilePath = "/root/learningPySpark/Chapter03/flight-data/airport-codes-na.txt"
# Obtain Airports dataset
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
airports.createOrReplaceTempView("airports")
# Obtain Departure Delays dataset
flightPerf = spark.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")
# Cache the Departure Delays dataset
flightPerf.cache()
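Optionally, before switching to the %spark.sql paragraph, you can confirm from the same %spark.pyspark paragraph that both temporary views were registered. This check only uses the standard catalog API.
# List the tables and temporary views visible to this Spark session
for table in spark.catalog.listTables():
    print(table.name, table.tableType)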
%spark.sql
select a.City, sum(f.delay) as Delays
from FlightPerformance f
join airports a
on a.IATA = f.origin
where a.State = 'WA'
group by a.City
order by sum(f.delay) desc
Analyze all states in the continental United States