pyspark.sql module
Module context
Spark SQL和DataFrames中的重要类:
- pyspark.sql.SparkSession - DataFrame和SQL功能的主要入口点。
- pyspark.sql.DataFrame - 分布式数据集合分组到命名的列。
- pyspark.sql.Column - DataFrame中的列表达式。
- pyspark.sql.Row - DataFrame中的一行数据。
- pyspark.sql.GroupedData - 由DataFrame.groupBy()返回的聚合方法。
- pyspark.sql.DataFrameNaFunctions - 处理缺失数据的方法(空值)。
- pyspark.sql.DataFrameStatFunctions - 统计功能的方法。
- pyspark.sql.functions - 可用于DataFrame的内置函数列表。
- pyspark.sql.types - 可用的数据类型列表。
- pyspark.sql.Window - 用于处理窗口函数。
class pyspark.sql.SparkSession(sparkContext, jsparkSession=None)
使用Dataset和DataFrame API编程Spark的入口点。
SparkSession可用于创建DataFrame,将DataFrame注册为表格,在表格上执行SQL,缓存表格以及读取parquet文件。 要创建SparkSession,请使用以下构建器模式:
spark = SparkSession.builder.master('spark://cn01:7077').appName("Word Count").getOrCreate()
class Builder
SparkSession的生成器。
- appName(name)
为应用程序设置一个名称,该名称将显示在Spark Web UI中。
如果没有设置应用程序名称,则会随机生成名称。 - config(key=None, value=None, conf=None)
设置一个配置选项。 使用此方法设置的选项会自动传播到SparkConf和SparkSession自己的配置中。
对于现有的SparkConf,请使用conf参数。
>>> from pyspark import SparkConf
>>> SparkSession.builder.config(conf=SparkConf())
<pyspark.sql.session.Builder object at 0x2ab7d2ab7650>
- enableHiveSupport()
启用Hive支持,包括连接到持久化的Hive Metastore,支持Hive serdes和Hive用户定义的功能。 - getOrCreate()
获取现有的SparkSession,或者,如果没有现有的SparkSession,则根据此构建器中设置的选项创建一个新的SparkSession。
此方法首先检查是否存在有效的全局默认SparkSession,如果是,则返回该值。 如果不存在有效的全局默认SparkSession,则该方法创建一个新的SparkSession,并将新创建的SparkSession指定为全局默认值。 - master(master)
设置要连接到的Spark master URL,例如本地运行的“local”,本地运行4核的“local [4]”或运行在Spark独立群集上的“spark:// master:7077”。
SparkSession.builder = <pyspark.sql.session.Builder object at 0x7f51f134a110>
SparkSession.catalog
用户可以通过它创建,删除,修改或查询底层数据库,表格,函数等的接口SparkSession.conf
Spark的运行时配置接口。
这是用户可以获取并设置与Spark SQL相关的所有Spark和Hadoop配置的接口。 获取配置的值时,默认为基础SparkContext中设置的值(如果有)。SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
从RDD,列表或pandas.DataFrame创建一个DataFrame。
当schema是列名称的列表时,每列的类型将从数据中推断出来。
当schema为None时,它将尝试从数据中推断出schema(列名和类型),数据应该是Row的RDD,或者是namedtuple或者dict。
当schema是pyspark.sql.types.DataType或数据类型字符串时,它必须匹配真实数据,否则将在运行时引发异常。 如果给定的schema不是pyspark.sql.types.StructType,它将被封装成一个pyspark.sql.types.StructType作为唯一的字段,字段名称将是“值”,每个记录也将被包装成一个 元组,可以稍后转换为行。
如果需要(schema)模式推断,则使用samplingRatio来确定用于模式推断的行的比例。 如果samplingRatio为None,则使用第一行。
Parameters:
- data - 任何类型的SQL数据表示d的RDD(例如行,元组,int,布尔等)或列表或pandas.DataFrame。
- schema - 一个pyspark.sql.types.DataType或一个数据类型字符串或列名称列表,默认值为None。 数据类型字符串格式等于pyspark.sql.types.DataType.simpleString,除了顶层结构类型可以省略struct <>,原子类型使用typeName()作为它们的格式。 使用字节而不是tinyint为pyspark.sql.types.ByteType。 我们也可以使用int作为IntegerType的简称。
- samplingRatio - 用于推断行的样本比例。
- verifySchema - 根据模式验证每一行的数据类型。
Returns: DataFrame
# list to DataFrame
>>> l = [("name", 1), ("Bob", 2)]
>>> spark.createDataFrame(l, ["name", "age"]).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> d = [{'name': 'Alice', 'age': 1}, {"name": "Bob", "age": 2}]
>>> spark.createDataFrame(d, ["name", "age"]).collect()
[Row(name=1, age=u'Alice'), Row(name=2, age=u'Bob')]
#RDD to DataFrame
>>> spark.createDataFrame(sc.parallelize(l), ["name", "age"]).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
#pandas.DataFrame to DataFrame
>>> spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()
[Row(0=1, 1=2)]
>>> from pyspark.sql.types import *
>>> schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True)])
>>> spark.createDataFrame(l, schema).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> spark.createDataFrame(l, "name: string, age: int").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> spark.createDataFrame(l, Person).collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
- SparkSession.newSession()
以新会话形式返回一个新的SparkSession,它具有单独的SQLConf,注册的临时视图和UDF,但共享SparkContext和表缓存。 - SparkSession.range(start, end=None, step=1, numPartitions=None)
>>> spark.range(1,7,2).collect()
[Row(id=1), Row(id=3), Row(id=5)]
- SparkSession.read
返回可用于读取DataFrame中的数据的DataFrameReader。 - SparkSession.readStream
返回一个DataStreamReader,它可以用来读取数据流作为一个数据流DataFrame。 - SparkSession.sparkContext
返回底层的SparkContext。 - SparkSession.sql(sqlQuery)
返回表示给定查询结果的DataFrame。
>>> l
[('name', 1), ('Bob', 2)]
>>> df = spark.createDataFrame(l, ["name", "age"])
#使用dataFrame(df)创建或替换本地临时视图。
>>> df.createOrReplaceTempView("table1")
>>> spark.sql("select * from table1").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
- SparkSession.stop()
停止底层的SparkContext。 - SparkSession.streams
返回一个StreamingQueryManager,它允许管理所有的StreamingQuery ,在此上下文中激活的StreamingQueries。 - SparkSession.table(tableName)
以DataFrame的形式返回指定的表。
>>> spark.table("table1").collect()
[Row(name=u'name', age=1), Row(name=u'Bob', age=2)]
- SparkSession.udf
返回UDF注册的UDFRegistration。 - SparkSession.version
运行此应用程序的Spark版本。
class pyspark.sql.SQLContext(sparkContext, sparkSession=None, jsqlContext=None)
在Spark 1.x中使用Spark中结构化数据(行和列)的入口点。
从Spark 2.0开始,它被SparkSession所取代。 但是,为了向后兼容,我们在这里保留这个类。
可以使用SQLContext创建DataFrame,将DataFrame注册为表,在表上执行SQL,缓存表和读取parquet文件。
- cacheTable(tableName)
在内存中缓存指定的表。 - clearCache()
从内存缓存中删除所有缓存的表。 - createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
- createExternalTable(tableName, path=None, source=None, schema=None, **options)
根据数据源中的数据集创建外部表。 - dropTempTable(tableName)
从目录中删除临时表。 - getConf(key, defaultValue=None)
返回Spark SQL配置属性中给定键的值。 - classmethod getOrCreate(sc)
获取现有的SQLContext或使用给定的SparkContext创建一个新的SQLContext。 - newSession()
将新的SQLContext作为新会话返回,它具有单独的SQLConf,注册的临时视图和UDF,但是共享SparkContext和表缓存。 - range(start, end=None, step=1, numPartitions=None)
- read
- readStream
- registerDataFrameAsTable(df, tableName)
将给定的DataFrame注册为目录中的临时表。
临时表仅在此SQLContext实例的生命周期中存在。 - registerFunction(name, f, returnType=StringType)
将一个python函数(包括lambda函数)注册为UDF(自定义函数),以便在SQL语句中使用。
除了名称和函数本身之外,还可以指定返回类型。 当返回类型没有给出它默认为一个字符串和转换将自动完成。 对于任何其他返回类型,生成的对象必须匹配指定的类型。
Parameters:
- name - udf的名字
- f - python 函数
- returnType - 一个pyspark.sql.types.DataType对象
>>> from pyspark.sql.types import IntegerType
>>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(stringLengthInt(test)=4)]
- registerJavaFunction(name, javaClassName, returnType=None)
注册一个Java UDF,以便在SQL语句中使用它。 - setConf(key, value)
设置给定的Spark SQL配置属性。 - sql(sqlQuery)
- streams
- table(tableName)
- tableNames(dbName=None)
返回数据库dbName中表的名称列表。 - tables(dbName=None)
返回包含给定数据库中表的名称的DataFrame。
如果未指定dbName,则将使用当前数据库。 - udf
- uncacheTable(tableName)
从内存缓存中删除指定的表。
class pyspark.sql.HiveContext(sparkContext, jhiveContext=None)
Spark SQL的一个变体,与存储在Hive中的数据整合在一起。
Hive配置是从classpath的hive-site.xml中读取的。 它支持同时运行SQL和HiveQL命令。
- refreshTable(tableName)
class pyspark.sql.UDFRegistration(sqlContext)
用户自定义函数注册的包装器。
- register(name, f, returnType=StringType)
将一个python函数(包括lambda函数)注册为UDF,以便在SQL语句中使用。
class pyspark.sql.DataFrame(jdf, sql_ctx)
分布式数据集合分组到命名的列。
DataFrame相当于Spark SQL中的关系表,可以使用SQLContext中的各种函数创建:
创建后,可以使用DataFrame,Column中定义的各种domain-specific-language(DSL)函数对其进行操作。
- agg(*exprs)
在没有组的情况下汇总整个DataFrame(df.groupBy.agg()的简写)。
>>> df.agg({"age": "max"}).collect()
[Row(max(age)=2)]
>>> from pyspark.sql import functions as f
>>> df.agg(f.min(df.age)).collect()
[Row(min(age)=1)]
- alias(alias)
返回一个带有别名集的新DataFrame。 - approxQuantile(col, probabilities, relativeError)
计算DataFrame的数值列的近似分位数。 - cache()
使用默认存储级别(MEMORY_AND_DISK)存储DataFrame。 - checkpoint(eager=True)
返回此数据集的检查点版本。 检查点可用于截断此DataFrame的逻辑计划,这在计划可能呈指数增长的迭代算法中特别有用。 它将被保存到使用SparkContext.setCheckpointDir()设置的检查点目录内的文件中。 - coalesce(numPartitions)
返回具有完全numPartitions分区的新DataFrame。 - collect()
- columns
以列表形式返回所有列名称。
>>> df.columns
['name', 'age']
- corr(col1, col2, method=None)
以双精度值计算DataFrame的两列的相关性。 目前只支持Pearson Correlation Coefficient。 DataFrame.corr()和DataFrameStatFunctions.corr()是彼此的别名。
Parameters:
- col1 - 第一列的名称
- col2 - 第二列的名称
- method - 相关方法。 目前只支持“皮尔森”
- count()
返回此DataFrame中的行数。
>>> df.count()
2
- cov(col1, col2)
计算给定列的样本协方差(由它们的名称指定)作为双精度值。 DataFrame.cov()和DataFrameStatFunctions.cov()是别名。 - createGlobalTempView(name)
使用此DataFrame创建全局临时视图。
这个临时视图的生命周期与这个Spark应用程序有关。 如果视图名称已经存在于目录中,则抛出TempTableAlreadyExistsException。 - createOrReplaceGlobalTempView(name)
使用给定名称创建或替换全局临时视图。
这个临时视图的生命周期与这个Spark应用程序有关。 - createOrReplaceTempView(name)
使用此DataFrame创建或替换本地临时视图。
此临时表的生命周期与用于创建此DataFrame的SparkSession绑定。 - createTempView(name)
使用此DataFrame创建本地临时视图。
此临时表的生命周期与用于创建此DataFrame的SparkSession绑定。如果视图名称已经存在于目录中,抛出TempTableAlreadyExistsException。 - crossJoin(other)
用另一个DataFrame相互作用返回笛卡尔积。
>>> df1 = spark.createDataFrame([("Alice", 1), ("Bob", 5)], ["name", "age"])
>>> df2 = spark.createDataFrame([("Alice", 66), ("Bob", 88)], ["name", "height"])
>>> df1.select(["name", "age"]).collect()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=5)]
>>> df2.select(["name", "height"]).collect()
[Row(name=u'Alice', height=66), Row(name=u'Bob', height=88)]
>>> df1.crossJoin(df2.select("height")).select("age", "name", "height").collect()
[Row(age=1, name=u'Alice', height=66), Row(age=1, name=u'Alice', height=88), Row(age=5, name=u'Bob', height=66), Row(age=5, name=u'Bob', height=88)]
- crosstab(col1, col2)
- cube(*cols)
使用指定的列为当前的DataFrame创建一个多维数据集,所以我们可以对它们进行聚合。
>>> df.cube("name", df.age).count().orderBy("name", "age").show()
+----+----+-----+
|name| age|count|
+----+----+-----+
|null|null| 2|
|null| 1| 1|
|null| 2| 1|
| Bob|null| 1|
| Bob| 2| 1|
|name|null| 1|
|name| 1| 1|
+----+----+-----+
- describe(*cols)
计算数字和字符串列的统计信息。
这包括count,mean,stddev,min和max。 如果未给出具体的列名,则此函数计算所有数字或字符串列的统计信息。
>>> df.describe(["age"]).show()
+-------+------------------+
|summary| age|
+-------+------------------+
| count| 2|
| mean| 1.5|
| stddev|0.7071067811865476|
| min| 1|
| max| 2|
+-------+------------------+
- distinct()
返回包含此DataFrame中不相同行的新DataFrame。(去除相同的行) - drop(*cols)
返回删除指定列的新DataFrame。 如果模式不包含给定的列名,这是一个无意义操作。 - dropDuplicates(subset=None)
返回删除重复行的新DataFrame,可选地仅考虑某些列。
>>> df3 = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Alice', age=5, height=80),Row(name='Alice', age=10, height=80)]).toDF()
>>> df3.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
| 10| 80|Alice|
+---+------+-----+
- drop_duplicates(subset=None)
dropDuplicates()的别名。 - dropna(how='any', thresh=None, subset=None)
返回一个新的DataFrame,省略含有空值的行。 DataFrame.dropna()和DataFrameNaFunctions.drop()是彼此的别名。
Parameters:
- how - “any”或“all”。 如果“any”,如果它包含任何空值,则删除一行。 如果'all',只有当所有的值都为null时才删除一行。
- thresh -
- subset -
- dtypes
以列表形式返回所有列名称及其数据类型。
>>> df3.dtypes
[('age', 'bigint'), ('height', 'bigint'), ('name', 'string')]
- explain(extended=False)
打印(逻辑和物理)计划到控制台进行调试。
Parameters:
- extended - 布尔值,默认为False。 如果为False,则仅打印物理计划。
>>> df3.explain()
== Physical Plan ==
Scan ExistingRDD[age#277L,height#278L,name#279]
- fillna(value, subset=None)
替换空值,na.fill()的别名。 DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的别名。 - filter(condition)
使用给定的条件过滤行。
where()是filter()的别名。
>>> df.filter(df.age > 1).collect()
[Row(name=u'Bob', age=2)]
>>> df.filter("age > 1").collect()
[Row(name=u'Bob', age=2)]
- first()
将第一行作为Row返回。
>>> df.first()
Row(name=u'name', age=1)
- foreach(f)
将f函数应用于此DataFrame的所有行。
这是df.rdd.foreach()的简写。 - foreachPartition(f)
将f函数应用于此DataFrame的每个分区。
这是df.rdd.foreachPartition()的简写。 - freqItems(cols, support=None)
找到列的频繁项,可能有误报。 - groupBy(*cols)
使用指定的列对DataFrame进行分组,所以我们可以对它们进行聚合。 有关所有可用的聚合函数,请参阅GroupedData。groupby()是groupBy()的别名。
>>> df.groupBy("name").agg({"age":"mean"}).collect()
[Row(name=u'name', avg(age)=1.0), Row(name=u'Bob', avg(age)=2.0)]
>>> df.groupBy(["name",df.age]).count().collect()
[Row(name=u'Bob', age=2, count=1), Row(name=u'name', age=1, count=1)]
- groupby(*cols)
- head(n=None)
返回前n行。 - hint(name, *parameters)
在当前的DataFrame上指定一些提示。 - intersect(other)
仅返回包含此frame和另一frame中的行的新DataFrame。(两者的交集) - isLocal()
如果collect()和take()方法可以在本地运行(没有任何Spark执行器),则返回True。 - isStreaming
如果此Dataset包含一个或多个在到达时连续返回数据的源,则返回true。 从流源读取数据的数据集必须使用DataStreamWriter中的start()方法作为StreamingQuery执行。 返回单个答案的方法(例如,count()或collect())将在存在流式源时引发AnalysisException。 - join(other, on=None, how=None)
使用给定的连接表达式与另一个DataFrame进行连接。
Parameters:
- other -
- on - 连接列名称的字符串,列名称列表,连接表达式(列)或列的列表。 如果on是一个字符串或者一个表示连接列名的字符串列表,那么这个列必须存在于两边,并且执行一个等连接。
- how - str, default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
- limit(num)
将结果计数限制为指定的数字。 - na
返回一个DataFrameNaFunctions来处理缺失的值。 - orderBy(*cols, **kwargs)
返回按指定列排序的新DataFrame。
>>> df1.orderBy(["name","age"],ascending=[0,1]).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=1)]
- persist(storageLevel=StorageLevel(True, True, False, False, 1))
- printSchema()
以树形结构打印schema。
>>> df1.printSchema()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
- randomSplit(weights, seed=None)
用提供的权重随机分割这个DataFrame。 - rdd
将内容作为行的pyspark.RDD返回。 - registerTempTable(name)
使用给定名称将此RDD注册为临时表。
此临时表的生命周期与用于创建此DataFrame的SQLContext相关联。
>>> df1.registerTempTable("people")
>>> spark.sql("select * from people").collect()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=5)]
>>> spark.catalog.dropTampView("people")
- repartition(numPartitions, *cols)
返回由给定分区表达式分区的新DataFrame。 生成的DataFrame是hash分区的。
numPartitions可以是一个int来指定目标分区数量或一个Column。 如果它是一个列,它将被用作第一个分区列。 如果未指定,则使用默认的分区数量。 - replace(to_replace, value=None, subset=None)
返回一个新的DataFrame,用另一个值替换一个值。 DataFrame.replace()和DataFrameNaFunctions.replace()是彼此的别名。
>>> df1.replace(["Alice", "Bob"], ["A", "B"]).show()
+----+---+
|name|age|
+----+---+
| A| 1|
| B| 5|
+----+---+
- rollup(*cols)
使用指定的列为当前的DataFrame创建一个多维汇总,所以我们可以在它上运行聚合函数。 - sample(withReplacement, fraction, seed=None)
- sampleBy(col, fractions, seed=None)
- schema
以pyspark.sql.types.StructType的形式返回此DataFrame的schema。
>>> df1.schema
StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))
- select(*cols)
投影一组表达式并返回一个新的DataFrame。
>>> df.select(df.name, (df.age + 10).alias("height")).show()
+----+------+
|name|height|
+----+------+
|name| 11|
| Bob| 12|
+----+------+
- selectExpr(*expr)
这是接受SQL表达式的select()的变体。 - show(n=20, truncate=True)
将前n行打印到控制台。 - sort(*cols, **kwargs)
返回按指定列排序的新DataFrame。 - sortWithinPartitions(*cols, **kwargs)
返回一个新的DataFrame,每个分区按指定的列排序。 - stat
为统计函数返回一个DataFrameStatFunctions。 - storageLevel
获取DataFrame的当前存储级别。
>>> df1.storageLevel
StorageLevel(False, False, False, False, 1)
>>> df1.cache().storageLevel
StorageLevel(True, True, False, True, 1)
- subtract(other)
返回一个新的DataFrame,它包含这个frame中的行,但不包含在另一个frame中。 - take(num)
- toDF(*cols)
返回一个新的类:带有新指定列名的DataFrame。 - toJSON(use_unicode=True)
将DataFrame转换为字符串的RDD。
每行都被转换成一个JSON文档作为返回的RDD中的一个元素。
>>> df1.toJSON().collect()
[u'{"name":"Alice","age":1}', u'{"name":"Bob","age":5}']
- toLocalIterator()
返回包含此DataFrame中所有行的迭代器。 迭代器将占用与此DataFrame中最大分区一样多的内存。 - toPandas()
以Pandas中的pandas.DataFrame的形式返回此DataFrame的内容。 - union(other)
在这个和另一个frame中返回一个包含行联合的新DataFrame。 - unpersist(blocking=False)
将DataFrame标记为非持久性,并从内存和磁盘中删除所有的块。 - where(condition)
与filter()相同。 - withColumn(colName, col)
通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
>>> df1.withColumn("height", df1.age + 50).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 1| 51|
| Bob| 5| 55|
+-----+---+------+
- withColumnRenamed(existing, new)
通过重命名现有列来返回新的DataFrame。 如果模式不包含给定的列名,则这是一个无意义操作。 - withWatermark(eventTime, delayThreshold)
为此DataFrame定义事件时间水印。 一个水印跟踪一个时间点,在这个时间点之前,我们假设没有更晚的数据将要到达。 - write
用于将非流式DataFrame的内容保存到外部存储器的接口。 - writeStream
用于将流式DataFrame的内容保存到外部存储的接口。
class pyspark.sql.GroupedData(jgd, sql_ctx)
由DataFrame.groupBy()创建的DataFrame上的一组聚合方法。
- Note: 实验阶段
- agg(*exprs)
计算聚合并将结果作为DataFrame返回。
可用的集合函数是avg,max,min,sum,count。
如果exprs是从字符串到字符串的单个字典映射,则key是要执行聚合的列名,并且该value是聚合函数名。
或者,exprs也可以是聚合列表达式的列表。 - avg(*cols)
计算每个组的每个数字列的平均值。
mean()是avg()的别名。 - count()
统计每个组的记录数。 - max(*cols)
计算每个组的每个数字列的最大值。 - mean(*cols)
计算每个组的每个数字列的平均值。 - min(*cols)
计算每个组的每个数字列的最小值。 - pivot(pivot_col, values=None)
旋转当前[[DataFrame]]的列并执行指定的聚合。 有两个版本的透视函数:一个需要调用者指定不同值的列表以进行透视,另一个不支持。 后者更简洁但效率更低,因为Spark需要首先在内部计算不同值的列表。
Parameters:
- pivot_col - 要转移的列的名称。
- values - 将被转换为输出DataFrame中的列的值的列表。
- sum(*cols)
计算每个组的每个数字列的总和。
class pyspark.sql.Column(jc)
DataFrame中的一列。
- alias(*alias, **kwargs)
使用新名称返回此列的别名。 - asc()
基于给定列名称的升序返回一个排序表达式。 - astype(dataType)
astype()是cast()的别名。 - between(lowerBound, upperBound)
一个布尔表达式,如果此表达式的值位于给定列之间,则该表达式的值为true。
>>> df1.select(d1.name, df1.age.between(2, 4)).show()
+-----+---------------------------+
| name|((age >= 2) AND (age <= 4))|
+-----+---------------------------+
|Alice| false|
| Bob| false|
+-----+---------------------------+
5 .bitwiseAND(other)
二元运算符
- bitwiseOR(other)
二元运算符 - bitwiseXOR(other)
二元运算符 - cast(dataType)
将列转换为dataType类型。(转换某列的类型)
>>> df.select(df.name, df.age.cast("string").alias("ages")).collect()
[Row(name=u'Alice', ages=u'1'), Row(name=u'Bob', ages=u'5')]
- contains(other)
二元运算符 - desc()
基于给定列名称的降序返回一个排序表达式。 - endswith(other)
根据匹配的字符串结尾返回一个布尔列。
>>> df.filter(df.name.endswith("ce")).collect()
[Row(name=u'Alice', age=1)]
- getField(name)
在StructField中通过名称获取字段的表达式。 - getItem(key)
从列表中获取位置序号的项,或者通过字典获取项的表达式。 - isNotNull()
如果当前表达式为null,则为真。 通常结合DataFrame.filter()来选择具有非空值的行。
>>> df2 = sc.parallelize([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)]).toDF()
>>> df2.filter(df2.height.isNotNull()).collect()
[Row(height=80, name=u'Tom')]
- isNull()
如果当前表达式为null,则为真。 通常与DataFrame.filter()结合来选择具有空值的行。 - isin(*cols)
一个布尔表达式,如果此表达式的值由参数的评估值包含,则该值被评估为true。
>>> df[df.age.isin([1,2,3])].collect()
[Row(name=u'Alice', age=1)]
- like(other)
返回基于SQL LIKE匹配的布尔列。
>>> df.filter(df.name.like("Al%")).collect()
[Row(name=u'Alice', age=1)]
- name(*alias, **kwargs)
name()是alias()的别名。 - otherwise(value)
评估条件列表并返回多个可能的结果表达式之一。 如果不调用Column.otherwise(),则不匹配条件返回None。
>>> from pyspark.sql import functions as f
>>> df.select(df.name, f.when(df.age > 3, 1).otherwise(0)).show()
+-----+-------------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
+-----+-------------------------------------+
|Alice| 0|
| Bob| 1|
+-----+-------------------------------------+
- over(window)
定义一个窗口列。 - rlike(other)
基于正则表达式匹配返回一个布尔列。
>>> df.filter(df.name.rlike('ice$')).collect()
[Row(age=2, name=u'Alice')]
- startswith(other)
根据字符串匹配返回一个布尔列。 - substr(startPos, length)
返回一个列,它是该列的一个子字符串。 - when(condition, value)
评估条件列表并返回多个可能的结果表达式之一。 如果不调用Column.otherwise(),则不匹配条件返回None。
>>> df.select(df.name, f.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+------------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
+-----+------------------------------------------------------------+
|Alice| -1|
| Bob| 1|
+-----+------------------------------------------------------------+
class pyspark.sql.Row
DataFrame中的一行。 其中的字段可以被访问:row.key
或者row[key]
。
>>> from pyspark.sql import Row
>>> row = Row(name="Alice", age=1)
>>> row.name
'Alice'
>>> row["age"]
1
>>> row
Row(age=1, name='Alice')
>>> "name" in row
True
- asDict(recursive=False)
recursive - 将嵌套的行转换为字典(默认为False)。
>>> row = Row(name="Alice",value=Row(age=1, height=88))
>>> row.asDict()
{'name': 'Alice', 'value': Row(age=1, height=88)}
>>> row.asDict(True)
{'name': 'Alice', 'value': {'age': 1, 'height': 88}}
class pyspark.sql.DataFrameNaFunctions(df)
在DataFrame中处理丢失的数据的功能。
- drop(how='any', thresh=None, subset=None)
返回一个新的DataFrame,省略含有空值的行。 DataFrame.dropna()和DataFrameNaFunctions.drop()是彼此的别名。 - fill(value, subset=None)
替换空值,na.fill()的别名。 DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的别名。 - replace(to_replace, value, subset=None)
class pyspark.sql.DataFrameStatFunctions(df)
DataFrame的统计函数的功能。
- approxQuantile(col, probabilities, relativeError)
计算DataFrame的数值列的近似分位数。 - corr(col1, col2, method=None)
以双精度值计算DataFrame的两列的相关性。 目前只支持Pearson Correlation Coefficient。 DataFrame.corr()和DataFrameStatFunctions.corr()是彼此的别名。 - cov(col1, col2)
计算给定列的样本协方差(由它们的名称指定)作为双精度值。 DataFrame.cov()和DataFrameStatFunctions.cov()是别名。 - crosstab(col1, col2)
计算给定列的成对频率表。 也被称为应急表。 - freqItems(cols, support=None)
找到列的频繁项,可能有误报。 - sampleBy(col, fractions, seed=None)
根据每层上给出的分数返回一个没有更换的分层样本。
class pyspark.sql.Window
用于在DataFrame中定义窗口的实用函数。
>>> # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
>>> window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
>>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
>>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
- Note: 实验阶段
currentRow = 0
static orderBy(cols) - 用定义的顺序创建一个WindowSpec。
static partitionBy(cols) - 用定义的分区创建一个WindowSpec。
static rangeBetween(start, end) -
static rowsBetween(start, end) -
unboundedFollowing = 9223372036854775807L
unboundedPreceding = -9223372036854775808L
class pyspark.sql.WindowSpec(jspec)
定义 partitioning, ordering, and frame的窗口规范。
使用Window
中的静态方法创建一个WindowSpec
。
- orderBy(*cols)
定义WindowSpec中的排序列。 - partitionBy(*cols)
定义WindowSpec中的分区列。 - rangeBetween(start, end)
定义从开始(包含)到结束(包含)的框架边界。 - rowsBetween(start, end)
定义从开始(包含)到结束(包含)的框架边界。
class pyspark.sql.DataFrameReader(spark)
用于从外部存储系统(例如文件系统,键值存储等)加载DataFrame的接口。 使用spark.read()来访问它。
- csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None)
加载CSV文件并将结果作为DataFrame返回。
>>> df = spark.read.csv("file:/home/spark_sql_test.csv",header=True)
>>> df.show()
+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
| 1|Alice| 11| 111|
| 2| Bob| 22| 222|
+---+-----+---+------+
- format(source)
指定输入数据源格式。
>>> df = spark.read.format('json').load('python/test_support/sql/people.json')
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]
- jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
Parameters:
- url – a JDBC URL of the form jdbc:subprotocol:subname
- table – the name of the table
- json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)
加载JSON文件并将结果作为DataFrame返回。
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
[('age', 'bigint'), ('name', 'string')]
- load(path=None, format=None, schema=None, **options)
从数据源加载数据并将其作为:class DataFrame返回。 - option(key, value)
为基础数据源添加一个输入选项。
您可以设置以下选项来读取文件:
- timeZone: 设置指示用于分析时间戳的时区的字符串
在JSON / CSV数据源或分区值。 如果没有设置,它使用默认值,会话本地时区。
- options(**options)
- orc(path)
加载ORC文件,将结果作为DataFrame返回。 - parquet(*paths)
加载Parquet文件,将结果作为DataFrame返回。 - schema(schema)
指定输入模式。 - table(tableName)
以DataFrame的形式返回指定的表。 - text(paths)
加载文本文件并返回一个DataFrame,该DataFrame的架构以名为“value”的字符串列开头,如果有的话,后跟分区列。
class pyspark.sql.DataFrameWriter(df)
用于将DataFrame写入外部存储系统(例如文件系统,键值存储等)的接口。 使用DataFrame.write()来访问这个。
- csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None)
以指定的路径以CSV格式保存DataFrame的内容。
>>> df1 = spark.createDataFrame([(3,"Tom",33,333),],["id","name","age","salary"])
>>> df1.show()
+---+----+---+------+
| id|name|age|salary|
+---+----+---+------+
| 3| Tom| 33| 333|
+---+----+---+------+
>>> df1.write.csv("file:/home/spark_sql_test",mode="overwrite",header=True)
- format(source)
指定基础输出数据源。
>>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
- insertInto(tableName, overwrite=False)
将DataFrame的内容插入到指定的表中。
它要求类的架构:DataFrame与表的架构相同。
可以覆盖任何现有的数据。 - jdbc(url, table, mode=None, properties=None)
将DataFrame的内容通过JDBC保存到外部数据库表中。 - json(path, mode=None, compression=None, dateFormat=None, timestampFormat=None)
将DataFrame的内容以JSON格式保存在指定的路径中。 - mode(saveMode)
指定数据或表已经存在的行为。
选项包括:
- append:将此DataFrame的内容附加到现有数据。
- overwrite:覆盖现有数据。
- ignore: 如果数据已经存在,静默地忽略这个操作。
- error:如果数据已经存在,则抛出异常。
- option(key, value)
- options(**options)
- orc(path, mode=None, partitionBy=None, compression=None)
以指定的路径以ORC格式保存DataFrame的内容。 - parquet(path, mode=None, partitionBy=None, compression=None)
将DataFrame的内容以Parquet格式保存在指定的路径中。 - partitionBy(*cols)
按文件系统上的给定列对输出进行分区。
如果指定,则输出将在文件系统上进行布局,类似于Hive的分区方案。 - save(path=None, format=None, mode=None, partitionBy=None, **options)
将DataFrame的内容保存到数据源。
数据源由格式和一组选项指定。 如果未指定format,则将使用由spark.sql.sources.default配置的缺省数据源。 - saveAsTable(name, format=None, mode=None, partitionBy=None, **options)
将DataFrame的内容保存为指定的表格。 - text(path, compression=None)
将DataFrame的内容保存在指定路径的文本文件中。
pyspark.sql.types module
class pyspark.sql.types.DataType
数据类型的基类。
- fromInternal(obj)
将内部SQL对象转换为本地Python对象。 - json()
- jsonValue()
- needConversion()
这种类型是否需要在Python对象和内部SQL对象之间进行转换?
这用于避免ArrayType / MapType / StructType的不必要的转换。 - simpleString()
- toInternal(obj)
将Python对象转换为内部SQL对象。 - classmethod typeName()
class pyspark.sql.types.NullType
空类型。
表示None的数据类型,用于无法推断的类型。
class pyspark.sql.types.StringType
字符串数据类型。
class pyspark.sql.types.BinaryType
二进制(字节数组)数据类型。
class pyspark.sql.types.BooleanType
布尔数据类型。
class pyspark.sql.types.DateType
Date(datetime.date)数据类型。
EPOCH_ORDINAL = 719163
- fromInternal(v)
- needConversion()
- toInternal(d)
class pyspark.sql.types.TimestampType
时间戳(datetime.datetime)数据类型。
- fromInternal(ts)
- needConversion()
- toInternal(dt)
class pyspark.sql.types.DecimalType(precision=10, scale=0)
十进制(decimal.Decimal)数据类型。
- jsonValue()
- simpleString()
class pyspark.sql.types.DoubleType
双数据类型,表示双精度浮点数。
class pyspark.sql.types.FloatType
浮点数据类型,表示单精度浮点数。
class pyspark.sql.types.ByteType
字节数据类型,即单个字节中的有符号整数。
- simpleString()
class pyspark.sql.types.IntegerType
Int数据类型,即有符号的32位整数。
- simpleString()
class pyspark.sql.types.LongType
长数据类型,即有符号的64位整数。
- simpleString()
class pyspark.sql.types.ShortType
短数据类型,即有符号的16位整数。
- simpleString()
class pyspark.sql.types.ArrayType(elementType, containsNull=True)
数组数据类型。
- fromInternal(obj)
- classmethod fromJson(json)
- jsonValue()
- needConversion()
- simpleString()
- toInternal(obj)
class pyspark.sql.types.MapType(keyType, valueType, valueContainsNull=True)
Map数据类型。
- fromInternal(obj)
- classmethod fromJson(json)
- jsonValue()
- needConversion()
- simpleString()
- toInternal(obj)
class pyspark.sql.types.StructField(name, dataType, nullable=True, metadata=None)
StructType中的一个字段。
- fromInternal(obj)
- classmethod fromJson(json)
- jsonValue()
- needConversion()
- simpleString()
- toInternal(obj)
class pyspark.sql.types.StructType(fields=None)
结构类型,由StructField的列表组成。
这是表示一个行的数据类型。
- add(field, data_type=None, nullable=True, metadata=None)
通过添加新元素来构造一个StructType来定义模式。 该方法接受:
一个参数是一个StructField对象;介于2到4之间的参数(name,data_type,nullable(可选),metadata(可选))。data_type参数可以是String或DataType对象。 - fromInternal(obj)
- classmethod fromJson(json)
- jsonValue()
- needConversion()
- simpleString()
- toInternal(obj)
pyspark.sql.functions module
内建函数的集合
- pyspark.sql.functions.abs(col)
计算绝对值。 - pyspark.sql.functions.acos(col)
计算给定值的余弦逆; 返回的角度在0到π的范围内。 - pyspark.sql.functions.add_months(start, months)
返回开始后几个月的日期
>>> from pyspark.sql import functions as f
>>> df = spark.createDataFrame([("2017-12-25",)],["d"])
>>> df.select(f.add_months(df.d,1).alias("d")).collect()
[Row(d=datetime.date(2018, 1, 25))]
- pyspark.sql.functions.approx_count_distinct(col, rsd=None)
返回col的近似不同计数的新列。 - pyspark.sql.functions.array(*cols)
创建一个新的数组列。 - pyspark.sql.functions.array_contains(col, value)
集合函数:如果数组为null,则返回null;如果数组包含给定值,则返回true;否则返回false。 - pyspark.sql.functions.asc(col)
基于给定列名称的升序返回一个排序表达式。 - pyspark.sql.functions.ascii(col)
计算字符串列的第一个字符的数值。 - pyspark.sql.functions.asin(col)
计算给定值的正弦倒数; 返回的角度在- π/ 2到π/ 2的范围内。 - pyspark.sql.functions.atan(col)
计算给定值的正切倒数。 - pyspark.sql.functions.atan2(col1, col2)
返回直角坐标(x,y)到极坐标(r,theta)转换的角度theta。 - pyspark.sql.functions.avg(col)
聚合函数:返回组中的值的平均值。 - pyspark.sql.functions.base64(col)
计算二进制列的BASE64编码并将其作为字符串列返回。 - pyspark.sql.functions.bin(col)
返回给定列的二进制值的字符串表示形式。 - pyspark.sql.functions.bitwiseNOT(col)
不按位计算。 - pyspark.sql.functions.broadcast(df)
将DataFrame标记为足够小以用于广播连接。 - pyspark.sql.functions.bround(col, scale=0)
如果scale> = 0,则使用HALF_EVEN舍入模式对给定值进行四舍五入以缩放小数点;如果scale <0,则将其舍入到整数部分。 - pyspark.sql.functions.cbrt(col)
计算给定值的立方根。 - pyspark.sql.functions.ceil(col)
计算给定值的上限。 - pyspark.sql.functions.coalesce(*cols)
返回不为空的第一列。 - pyspark.sql.functions.col(col)
根据给定的列名返回一个列。 - pyspark.sql.functions.collect_list(col)
聚合函数:返回重复对象的列表。 - pyspark.sql.functions.collect_set(col)
聚合函数:返回一组消除重复元素的对象。 - pyspark.sql.functions.column(col)
根据给定的列名返回一个列。 - pyspark.sql.functions.concat(*cols)
将多个输入字符串列连接成一个字符串列。
>>> df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(f.concat(df.s, df.d).alias('s')).collect()
[Row(s=u'abcd123')]
- pyspark.sql.functions.concat_ws(sep, *cols)
使用给定的分隔符将多个输入字符串列连接到一个字符串列中。
>>> df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect()
[Row(s=u'abcd-123')]
- pyspark.sql.functions.conv(col, fromBase, toBase)
将字符串列中的数字从一个进制转换为另一个进制。
>>> df = spark.createDataFrame([("010101",)], ['n'])
>>> df.select(conv(df.n, 2, 16).alias('hex')).collect()
[Row(hex=u'15')]
- pyspark.sql.functions.corr(col1, col2)
返回col1和col2的Pearson相关系数的新列。 - pyspark.sql.functions.cos(col)
计算给定值的余弦。 - pyspark.sql.functions.cosh(col)
计算给定值的双曲余弦。 - pyspark.sql.functions.count(col)
聚合函数:返回组中的项目数量。 - pyspark.sql.functions.countDistinct(col, *cols)
返回col或col的不同计数的新列。 - pyspark.sql.functions.covar_pop(col1, col2)
返回col1和col2的总体协方差的新列。 - pyspark.sql.functions.covar_samp(col1, col2)
返回col1和col2的样本协方差的新列。 - pyspark.sql.functions.crc32(col)
计算二进制列的循环冗余校验值(CRC32),并将该值作为bigint返回。 - pyspark.sql.functions.create_map(*cols)
创建一个新的地图列。 - pyspark.sql.functions.cume_dist()
窗口函数:返回窗口分区内值的累积分布,即在当前行下面的行的分数。 - pyspark.sql.functions.current_date()
以日期列的形式返回当前日期。 - pyspark.sql.functions.current_timestamp()
将当前时间戳作为时间戳列返回。 - pyspark.sql.functions.date_add(start, days)
返回开始后几天的日期
>>> df = spark.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_add(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 9))]
- pyspark.sql.functions.date_format(date, format)
将日期/时间戳/字符串转换为由第二个参数给定日期格式指定格式的字符串值。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(date_format('a', 'MM/dd/yyy').alias('date')).collect()
[Row(date=u'04/08/2015')]
- pyspark.sql.functions.date_sub(start, days)
返回开始前几天的日期
>>> df = spark.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_sub(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 7))]
- pyspark.sql.functions.datediff(end, start)
返回从开始到结束的天数。
>>> df = spark.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
>>> df.select(datediff(df.d2, df.d1).alias('diff')).collect()
[Row(diff=32)]
- pyspark.sql.functions.dayofmonth(col)
将给定日期的月份的日期解压为整数。(一月中第几天)
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(dayofmonth('a').alias('day')).collect()
[Row(day=8)]
- pyspark.sql.functions.dayofyear(col)
将给定日期的年份中的某一天提取为整数。(一年中第几天)
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(dayofyear('a').alias('day')).collect()
[Row(day=98)]
- pyspark.sql.functions.decode(col, charset)
Computes the first argument into a string from a binary using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’). - pyspark.sql.functions.degrees(col)
将以弧度度量的角度转换为以度数度量的近似等效角度。 - pyspark.sql.functions.dense_rank()
窗口函数:返回窗口分区内的行的等级,没有任何间隙 - pyspark.sql.functions.desc(col)
基于给定列名称的降序返回一个排序表达式。 - pyspark.sql.functions.encode(col, charset)
Computes the first argument into a binary from a string using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’). - pyspark.sql.functions.exp(col)
计算给定值的指数。 - pyspark.sql.functions.explode(col)
返回给定数组或映射中每个元素的新行。 - pyspark.sql.functions.expm1(col)
计算给定值的指数减1。 - pyspark.sql.functions.expr(str)
将表达式字符串解析到它表示的列中 - pyspark.sql.functions.factorial(col)
计算给定值的阶乘。
>>> df = spark.createDataFrame([(5,)], ['n'])
>>> df.select(factorial(df.n).alias('f')).collect()
[Row(f=120)]
- pyspark.sql.functions.first(col, ignorenulls=False)
聚合函数:返回组中的第一个值。 - pyspark.sql.functions.floor(col)
计算给定值的floor。 - pyspark.sql.functions.format_number(col, d)
将数字X格式化为像'#, - #, - #.-'这样的格式,用HALF_EVEN舍入模式四舍五入到小数点后的位置,然后以字符串形式返回结果。 - pyspark.sql.functions.format_string(format, *cols)
以printf-style格式化参数,并将结果作为字符串列返回。 - pyspark.sql.functions.from_json(col, schema, options={})
使用指定的模式将包含JSON字符串的列解析为[[StructType]]的[[StructType]]或[[ArrayType]]。 在不可解析的字符串的情况下返回null。 - pyspark.sql.functions.from_unixtime(timestamp, format='yyyy-MM-dd HH:mm:ss')
将来自unix时期(1970-01-01 00:00:00 UTC)的秒数转换为以给定格式表示当前系统时区中该时刻的时间戳的字符串。 - pyspark.sql.functions.from_utc_timestamp(timestamp, tz)
给定一个时间戳,对应于UTC中的某个特定时间,返回对应于给定时区中同一时间的另一个时间戳。 - pyspark.sql.functions.get_json_object(col, path)
从基于指定的json路径的json字符串中提取json对象,并返回提取的json对象的json字符串。 如果输入的json字符串无效,它将返回null。 - pyspark.sql.functions.greatest(*cols)
返回列名称列表的最大值,跳过空值。 该功能至少需要2个参数。 如果所有参数都为空,它将返回null。 - pyspark.sql.functions.grouping(col)
聚合函数:指示GROUP BY列表中的指定列是否被聚合,在结果集中返回1表示聚合或0表示未聚合。 - pyspark.sql.functions.grouping_id(*cols)
聚合函数:返回分组的级别,等于(grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn) - pyspark.sql.functions.hash(*cols)
计算给定列的哈希码,并将结果作为int列返回。
>>> spark.createDataFrame([('ABC',)], ['a']).select(hash('a').alias('hash')).collect()
[Row(hash=-757602832)]
- pyspark.sql.functions.hex(col)
计算给定列的十六进制值,可能是pyspark.sql.types.StringType,pyspark.sql.types.BinaryType,pyspark.sql.types.IntegerType或pyspark.sql.types.LongType。
>>> spark.createDataFrame([('ABC', 3)], ['a', 'b']).select(hex('a'), hex('b')).collect()
[Row(hex(a)=u'414243', hex(b)=u'3')]
- pyspark.sql.functions.hour(col)
将给定日期的小时数提取为整数。
>>> df = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(hour('a').alias('hour')).collect()
[Row(hour=13)]
- pyspark.sql.functions.hypot(col1, col2)
计算sqrt(a ^ 2 + b ^ 2),无中间上溢或下溢。 - pyspark.sql.functions.initcap(col)
在句子中将每个单词的第一个字母翻译成大写。
>>> spark.createDataFrame([('ab cd',)], ['a']).select(initcap("a").alias('v')).collect()
[Row(v=u'Ab Cd')]
- pyspark.sql.functions.input_file_name()
为当前Spark任务的文件名创建一个字符串列。 - pyspark.sql.functions.instr(str, substr)
找到给定字符串中第一次出现substr列的位置。 如果其中任一参数为null,则返回null。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(instr(df.s, 'b').alias('s')).collect()
[Row(s=2)]
- pyspark.sql.functions.isnan(col)
如果列是NaN,则返回true的表达式。 - pyspark.sql.functions.isnull(col)
如果列为空,则返回true的表达式。 - pyspark.sql.functions.json_tuple(col, *fields)
根据给定的字段名称为json列创建一个新行。
Parameters:
- col - json格式的字符串列
- fields - 要提取的字段列表
- pyspark.sql.functions.kurtosis(col)
聚合函数:返回组中值的峰度。 - pyspark.sql.functions.lag(col, count=1, default=None)
窗口函数:返回当前行之前的偏移行值;如果当前行之前的行数小于偏移量,则返回defaultValue。例如,若偏移量为1,将返回窗口分区中任何给定点的前一行。
这相当于SQL中的LAG函数。
Parameters:
- col - 列名或表达式的名称
- count - 要扩展的行数
- default - 默认值
- pyspark.sql.functions.last(col, ignorenulls=False)
聚合函数:返回组中的最后一个值。
该函数默认返回它看到的最后一个值。 当ignoreNulls设置为true时,它将返回它看到的最后一个非null值。 如果所有值都为空,则返回null。 - pyspark.sql.functions.last_day(date)
返回给定日期所属月份的最后一天。
>>> df = spark.createDataFrame([('1997-02-10',)], ['d'])
>>> df.select(last_day(df.d).alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]
- pyspark.sql.functions.lead(col, count=1, default=None)
Window函数:返回当前行之后的偏移行值;如果当前行之后的行数小于偏移行,则返回defaultValue。 例如,偏移量为1,将返回窗口分区中任意给定点的下一行。
这相当于SQL中的LEAD函数。 - pyspark.sql.functions.least(*cols)
返回多列中的最小值,跳过空值。 该功能至少需要2个参数,及最少需要两个列名。 如果所有参数都为空,它将返回null。
>>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
>>> df.select(least(df.a, df.b, df.c).alias("least")).collect()
[Row(least=1)]
- pyspark.sql.functions.length(col)
计算字符串或二进制表达式的长度。
>>> spark.createDataFrame([('ABC',)], ['a']).select(length('a').alias('length')).collect()
[Row(length=3)]
- pyspark.sql.functions.levenshtein(left, right)
计算两个给定字符串的Levenshtein距离。
Levenshtein距离(编辑距离),是指两个字串之间,由一个转成另一个所需的最少编辑操作次数。具体可自行百度。
>>> df0 = spark.createDataFrame([('kitten', 'sitting',)], ['l', 'r'])
>>> df0.select(levenshtein('l', 'r').alias('d')).collect()
[Row(d=3)]
- pyspark.sql.functions.lit(col)
创建一个字面值的列。 - pyspark.sql.functions.locate(substr, str, pos=1)
在str字符串列中找到在pos位置后面第一个出现substr的位置。
- Note: 该位置不是从零开始的,而是从1开始的。 如果在str中找不到substr,则返回0。
Parameters: - substr - 要查找的字符串
- str - pyspark.sql.types.StringType的列
- pos - 起始位置
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(locate('b', df.s, 1).alias('s')).collect()
[Row(s=2)]
- pyspark.sql.functions.log(arg1, arg2=None)
返回第二个参数的基于第一个参数的对数。
如果只有一个参数,那么这个参数就是自然对数。
>>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
['0.30102', '0.69897']
>>> df.select(log(df.age).alias('e')).rdd.map(lambda l: str(l.e)[:7]).collect()
['0.69314', '1.60943']
- pyspark.sql.functions.log10(col)
计算给定一个数以10为底的对数。 - pyspark.sql.functions.log1p(col)
Computes the natural logarithm of the given value plus one. - pyspark.sql.functions.log2(col)
返回参数的基数为2的对数。
>>> spark.createDataFrame([(4,)], ['a']).select(log2('a').alias('log2')).collect()
[Row(log2=2.0)]
- pyspark.sql.functions.lower(col)
将字符串列转换为小写。 - pyspark.sql.functions.lpad(col, len, pad)
左填充到指定长度。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(lpad(df.s, 6, '#').alias('s')).collect()
[Row(s=u'##abcd')]
- pyspark.sql.functions.ltrim(col)
去掉字符串左边的空格。 - pyspark.sql.functions.max(col)
聚合函数:返回组中表达式的最大值。 - pyspark.sql.functions.md5(col)
计算某给定值的MD5值,将该值作为32个字符的十六进制字符串返回。
>>> spark.createDataFrame([('ABC',)], ['a']).select(md5('a').alias('hash')).collect()
[Row(hash=u'902fbdd2b1df0c4f70b4a5d23525e932')]
- pyspark.sql.functions.mean(col)
聚合函数:返回组中所有值的平均值。 - pyspark.sql.functions.min(col)
聚合函数:返回组中表达式的最小值。 - pyspark.sql.functions.minute(col)
提取给定日期的分钟数为整数。
>>> df = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(minute('a').alias('minute')).collect()
[Row(minute=8)]
- pyspark.sql.functions.monotonically_increasing_id()
生成单调递增的64位整数的列。
生成的ID保证是单调递增和唯一的,但不是连续的。 当前的实现将分区ID放在高31位,并将每个分区内的记录号放在低33位。 假设数据框的分区少于10亿个,每个分区少于80亿条记录。
作为一个例子,考虑一个带有两个分区的DataFrame,每个分区有三个记录。 该表达式将返回以下ID:0,1,2,8589934592(1L << 33),8589934593,8589934594。
>>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
>>> df0.select(monotonically_increasing_id().alias('id')).collect()
[Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
- pyspark.sql.functions.month(col)
将给定日期的月份提取为整数。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(month('a').alias('month')).collect()
[Row(month=4)]
- pyspark.sql.functions.months_between(date1, date2)
返回date1和date2之间的月数。 - pyspark.sql.functions.nanvl(col1, col2)
如果col1不是NaN,则返回col1;如果col1是NaN,则返回col2。
两个输入都应该是浮点列(DoubleType或FloatType)。
>>> df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
>>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).collect()
[Row(r1=1.0, r2=1.0), Row(r1=2.0, r2=2.0)]
- pyspark.sql.functions.next_day(date, dayOfWeek)
返回晚于日期列值的第一个日期。
星期几参数不区分大小写,并接受:“Mon”, “Tue”, “Wed”, “Thu”, “Fri”, “Sat”, “Sun”.
>>> df = spark.createDataFrame([('2015-07-27',)], ['d'])
>>> df.select(next_day(df.d, 'Sun').alias('date')).collect()
[Row(date=datetime.date(2015, 8, 2))]
- pyspark.sql.functions.ntile(n)
窗口函数:在有序的窗口分区中返回ntile组ID(从1到n)。 例如,如果n是4,则第一季度行将得到值1,第二季度将得到2,第三季度将得到3,并且最后一个季度将得到4。
这相当于SQL中的NTILE函数。
Parameters: n – an integer - pyspark.sql.functions.percent_rank()
窗口函数:返回窗口分区内的行的相对等级(即百分比)。 - pyspark.sql.functions.posexplode(col)
为给定数组或映射中的每个元素返回一个新行。
>>> from pyspark.sql import Row
>>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
>>> eDF.select(posexplode(eDF.intlist)).collect()
[Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]
>>> eDF.select(posexplode(eDF.mapfield)).show()
+---+---+-----+
|pos|key|value|
+---+---+-----+
| 0| a| b|
+---+---+-----+
- pyspark.sql.functions.pow(col1, col2)
返回col1的col2次方的值。 - pyspark.sql.functions.quarter(col)
提取给定日期所属的季度值。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(quarter('a').alias('quarter')).collect()
[Row(quarter=2)]
- pyspark.sql.functions.radians(col)
将以度数度量的角度转换为以弧度测量的近似等效角度。 - pyspark.sql.functions.rand(seed=None)
从U [0.0,1.0]生成一个具有独立且分布相同(i.i.d.)样本的随机列。 - pyspark.sql.functions.randn(seed=None)
从标准正态分布生成具有独立且分布相同(i.i.d.)样本的列。 - pyspark.sql.functions.rank()
窗口函数:返回窗口分区内的行的等级。
rank和dense_rank之间的区别在于,当有tie时,dense_rank在排序顺序上没有差距。也就是说,如果你使用dense_rank排名比赛,并且有三个人排在第二位,那么你会说这三个都排在第二位,下一个排在第三位。排名会给我连续的数字,使排在第三位(关系之后)的人将登记为第五名。
这相当于SQL中的RANK函数。 - pyspark.sql.functions.regexp_extract(str, pattern, idx)
从指定的字符串列中提取由Java正则表达式匹配的特定组。 如果正则表达式不匹配,或者指定的组不匹配,则返回空字符串。
>>> df = spark.createDataFrame([('100-200',)], ['str'])
>>> df.select(regexp_extract('str', '(\d+)-(\d+)', 1).alias('d')).collect()
[Row(d=u'100')]
>>> df = spark.createDataFrame([('foo',)], ['str'])
>>> df.select(regexp_extract('str', '(\d+)', 1).alias('d')).collect()
[Row(d=u'')]
>>> df = spark.createDataFrame([('aaaac',)], ['str'])
>>> df.select(regexp_extract('str', '(a+)(b)?(c)', 2).alias('d')).collect()
[Row(d=u'')]
- pyspark.sql.functions.regexp_replace(str, pattern, replacement)
将与regexp匹配的指定字符串值的所有子字符串替换为rep。
>>> df = spark.createDataFrame([('100-200',)], ['str'])
>>> df.select(regexp_replace('str', '(\d+)', '+').alias('d')).collect()
[Row(d=u'+-+')]
- pyspark.sql.functions.repeat(col, n)
重复一个字符串列n次,并将其作为新的字符串列返回。
>>> df = spark.createDataFrame([('ab',)], ['s',])
>>> df.select(repeat(df.s, 3).alias('s')).collect()
[Row(s=u'ababab')]
- pyspark.sql.functions.reverse(col)
反转字符串列并将其作为新的字符串列返回。 - pyspark.sql.functions.rint(col)
返回值最接近参数的double值,等于一个数学整数。 - pyspark.sql.functions.round(col, scale=0)
如果scale> = 0,则使用HALF_UP舍入模式对给定值进行四舍五入以缩放小数点;如果scale <0,则将其舍入到整数部分。
>>> spark.createDataFrame([(2.5,)], ['a']).select(round('a', 0).alias('r')).collect()
[Row(r=3.0)]
- pyspark.sql.functions.row_number()
窗口函数:返回窗口分区内从1开始的连续编号。 - pyspark.sql.functions.rpad(col, len, pad)
右填充到指定长度。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(rpad(df.s, 6, '#').alias('s')).collect()
[Row(s=u'abcd##')]
- pyspark.sql.functions.rtrim(col)
去除字符串右边的空格。 - pyspark.sql.functions.second(col)
提取给定日期的秒数为整数。
>>> df = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(second('a').alias('second')).collect()
[Row(second=15)]
- pyspark.sql.functions.sha1(col)
返回SHA-1的十六进制字符串结果。
>>> spark.createDataFrame([('ABC',)], ['a']).select(sha1('a').alias('hash')).collect()
[Row(hash=u'3c01bdbb26f358bab27f267924aa2c9a03fcfdb8')]
- pyspark.sql.functions.sha2(col, numBits)
返回SHA-2系列散列函数(SHA-224,SHA-256,SHA-384和SHA-512)的十六进制字符串结果。 numBits表示结果的所需位长度,其值必须为224,256,384,512或0(相当于256)。
>>> digests = df.select(sha2(df.name, 256).alias('s')).collect()
>>> digests[0]
Row(s=u'3bc51062973c458d5a6f2d8d64a023246354ad7e064b1e4e009ec8a0699a3043')
>>> digests[1]
Row(s=u'cd9fb1e148ccd8442e5aa74904cc73bf6fb54d1d54d333bd596aa9bb4bb4e961')
- pyspark.sql.functions.shiftLeft(col, numBits)
将给定值col左移numBits位。
>>> spark.createDataFrame([(21,)], ['a']).select(shiftLeft('a', 1).alias('r')).collect()
[Row(r=42)]
- pyspark.sql.functions.shiftRight(col, numBits)
将给定值col右移numBits位(Signed)。 - pyspark.sql.functions.shiftRightUnsigned(col, numBits)
将给定值col右移numBits位(Unsigned)。
>>> df = spark.createDataFrame([(-42,)], ['a'])
>>> df.select(shiftRightUnsigned('a', 1).alias('r')).collect()
[Row(r=9223372036854775787)]
- pyspark.sql.functions.signum(col)
计算给定值的正负号。 - pyspark.sql.functions.sin(col)
计算给定值的正弦值。 - pyspark.sql.functions.sinh(col)
计算给定值的双曲正弦值。 - pyspark.sql.functions.size(col)
集合函数:返回存储在列中的数组或映射的长度。
>>> df = spark.createDataFrame([([1, 2, 3],),([1],),([],)], ['data'])
>>> df.select(size(df.data)).collect()
[Row(size(data)=3), Row(size(data)=1), Row(size(data)=0)]
- pyspark.sql.functions.skewness(col)
聚合函数:返回组中值的偏度。 - pyspark.sql.functions.sort_array(col, asc=True)
Collection函数:对输入数组进行升序或降序排序。
>>> df = spark.createDataFrame([([2, 1, 3],),([1],),([],)], ['data'])
>>> df.select(sort_array(df.data).alias('r')).collect()
[Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])]
>>> df.select(sort_array(df.data, asc=False).alias('r')).collect()
[Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]
- pyspark.sql.functions.soundex(col)
返回字符串的SoundEx编码。
>>> df = spark.createDataFrame([("Peters",),("Uhrbach",)], ['name'])
>>> df.select(soundex(df.name).alias("soundex")).collect()
[Row(soundex=u'P362'), Row(soundex=u'U612')]
- pyspark.sql.functions.spark_partition_id()
列所在的分区id
- Note: 这是不确定的,因为它依赖于数据分区和任务调度。
>>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
[Row(pid=0), Row(pid=0)]
- pyspark.sql.functions.split(str, pattern)
切分字符串。
>>> df = spark.createDataFrame([('ab12cd',)], ['s',])
>>> df.select(split(df.s, '[0-9]+').alias('s')).collect()
[Row(s=[u'ab', u'cd'])]
- pyspark.sql.functions.sqrt(col)
计算指定浮点值的平方根。 - pyspark.sql.functions.stddev(col)
聚合函数:返回组中表达式的无偏样本标准差。 - pyspark.sql.functions.stddev_pop(col)
聚合函数:返回一个组中表达式的总体标准差。 - pyspark.sql.functions.stddev_samp(col)
聚合函数:返回组中表达式的无偏样本标准差。 - pyspark.sql.functions.struct(*cols)
创建一个新的结构列。
>>> df.select(struct('age', 'name').alias("struct")).collect()
[Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
>>> df.select(struct([df.age, df.name]).alias("struct")).collect()
[Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
- pyspark.sql.functions.substring(str, pos, len)
返回在str中从pos位置开始的长度为len值的substring。
>>> df = spark.createDataFrame([('abcd',)], ['s',])
>>> df.select(substring(df.s, 1, 2).alias('s')).collect()
[Row(s=u'ab')]
- pyspark.sql.functions.substring_index(str, delim, count)
在计数定界符delimiter之前,返回字符串str的子串。 如果count是正数,则返回最后一个分隔符左边的数字(从左数起)。 如果计数为负数,则返回最后一个分隔符右边的数字(从右数起)。 substring_index搜索delim时执行区分大小写的匹配。
>>> df = spark.createDataFrame([('a.b.c.d',)], ['s'])
>>> df.select(substring_index(df.s, '.', 2).alias('s')).collect()
[Row(s=u'a.b')]
>>> df.select(substring_index(df.s, '.', -3).alias('s')).collect()
[Row(s=u'b.c.d')]
- pyspark.sql.functions.sum(col)
聚合函数:返回表达式中所有值的总和。 - pyspark.sql.functions.sumDistinct(col)
聚合函数:返回表达式中不同值的总和。 - pyspark.sql.functions.tan(col)
计算给定值的正切值。 - pyspark.sql.functions.tanh(col)
计算给定值的双曲正切。 - pyspark.sql.functions.to_date(col, format=None)
使用可选的指定格式将pyspark.sql.types.StringType或pyspark.sql.types.TimestampType的列转换为pyspark.sql.types.DateType。默认格式是'yyyy-MM-dd'。 根据SimpleDateFormats指定格式。
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_date(df.t).alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_date(df.t, 'yyyy-MM-dd HH:mm:ss').alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]
- pyspark.sql.functions.to_json(col, options={})
将包含[[StructType]]的[[StructType]]或[[ArrayType]]的列转换为JSON字符串。 在不支持的类型的情况下会引发异常。
Parameters:
- col - 包含结构体或结构体数组的列的名称
- options - 控制转换的选项。 接受与json数据源相同的选项
>>> from pyspark.sql import Row
>>> from pyspark.sql.types import *
>>> data = [(1, Row(name='Alice', age=2))]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"age":2,"name":"Alice"}')]
>>> data = [(1, [Row(name='Alice', age=2), Row(name='Bob', age=3)])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')]
- pyspark.sql.functions.to_timestamp(col, format=None)
使用可选的指定格式将pyspark.sql.types.StringType或pyspark.sql.types.TimestampType的列转换为pyspark.sql.types.DateType。 默认格式是'yyyy-MM-dd HH:mm:ss'。 根据SimpleDateFormats指定格式。
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_timestamp(df.t).alias('dt')).collect()
[Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_timestamp(df.t, 'yyyy-MM-dd HH:mm:ss').alias('dt')).collect()
[Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
- pyspark.sql.functions.to_utc_timestamp(timestamp, tz)
给定一个时间戳,它对应于给定时区中的特定时间,返回对应于UTC中同一时间的另一个时间戳。
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_utc_timestamp(df.t, "PST").alias('t')).collect()
[Row(t=datetime.datetime(1997, 2, 28, 18, 30))]
- pyspark.sql.functions.translate(srcCol, matching, replace)
一个函数通过匹配中的一个字符来转换srcCol中的任何字符。 替换中的字符对应于匹配的字符。 当字符串中的任何字符与匹配中的字符匹配时,翻译将发生。
>>> spark.createDataFrame([('translate',)], ['a']).select(translate('a', "rnlt", "123").alias('r')).collect()
[Row(r=u'1a2s3ae')]
- pyspark.sql.functions.trim(col)
去除字符串两边的空格。 - pyspark.sql.functions.trunc(date, format)
返回截断到格式指定单位的日期。
Parameters: format – ‘year’, ‘YYYY’, ‘yy’ or ‘month’, ‘mon’, ‘mm’
>>> df = spark.createDataFrame([('1997-02-28',)], ['d'])
>>> df.select(trunc(df.d, 'year').alias('year')).collect()
[Row(year=datetime.date(1997, 1, 1))]
>>> df.select(trunc(df.d, 'mon').alias('month')).collect()
[Row(month=datetime.date(1997, 2, 1))]
- pyspark.sql.functions.udf(f=None, returnType=StringType)
创建一个表示用户定义函数(UDF)的列表达式。
*Note: 用户定义的函数必须是确定性的。 由于优化,可能会消除重复的调用,甚至可能会调用该函数的次数超过查询中的次数。
>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> :udf
... def to_upper(s):
... if s is not None:
... return s.upper()
...
>>> :udf(returnType=IntegerType())
... def add_one(x):
... if x is not None:
... return x + 1
...
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
| 8| JOHN DOE| 22|
+----------+--------------+------------+
- pyspark.sql.functions.unbase64(col)
解码BASE64编码的字符串列并将其作为二进制列返回。 - pyspark.sql.functions.unhex(col)
十六进制的反转。 将每对字符解释为十六进制数字并转换为数字的字节表示。
>>> spark.createDataFrame([('414243',)], ['a']).select(unhex('a')).collect()
[Row(unhex(a)=bytearray(b'ABC'))]
- pyspark.sql.functions.unix_timestamp(timestamp=None, format='yyyy-MM-dd HH:mm:ss')
使用默认时区和默认语言环境,将具有给定模式的时间字符串(默认为'yyyy-MM-dd HH:mm:ss')转换为Unix时间戳(以秒为单位),如果失败则返回null。
如果时间戳记为None,则返回当前时间戳。 - pyspark.sql.functions.upper(col)
将字符串列转换为大写。 - pyspark.sql.functions.var_pop(col)
聚合函数:返回组中值的总体方差。 - pyspark.sql.functions.var_samp(col)
聚合函数:返回组中值的无偏差。 - pyspark.sql.functions.variance(col)
聚合函数:返回组中值的总体方差。 - pyspark.sql.functions.weekofyear(col)
返回指定时间是一年中的第几周。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(weekofyear(df.a).alias('week')).collect()
[Row(week=15)]
- pyspark.sql.functions.when(condition, value)
评估条件列表并返回多个可能的结果表达式之一。 如果不调用Column.otherwise(),则不匹配条件返回None。(条件判断)
>>> df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
[Row(age=3), Row(age=4)]
>>> df.select(when(df.age == 2, df.age + 1).alias("age")).collect()
[Row(age=3), Row(age=None)]
- pyspark.sql.functions.window(timeColumn, windowDuration, slideDuration=None, startTime=None)
>>> df = spark.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val")
>>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
>>> w.select(w.window.start.cast("string").alias("start"),
... w.window.end.cast("string").alias("end"), "sum").collect()
[Row(start=u'2016-03-11 09:00:05', end=u'2016-03-11 09:00:10', sum=1)]
- pyspark.sql.functions.year(col)
将给定日期的年份提取为整数。
>>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(year('a').alias('year')).collect()
[Row(year=2015)]
pyspark.sql.streaming module
class pyspark.sql.streaming.StreamingQuery(jsq)
当新数据到达时,在后台执行的查询的句柄。 所有这些方法都是线程安全的。
- awaitTermination(timeout=None)
通过query.stop()或异常来等待查询的终止。如果由于异常而查询终止,则会抛出异常。 如果设置了超时,则会在超时秒数内返回查询是否终止。如果查询已被终止,则对该方法的所有后续调用将立即返回(如果查询已由stop()终止),或者立即抛出异常(如果查询已由异常终止)。如果由于一个异常使这个查询已经终止了,则会抛出StreamingQueryException。 - exception()
Returns: StreamingQueryException(如果查询由异常终止)或None。 - explain(extended=False)
打印(逻辑和物理)计划到控制台进行调试。
Parameters: 布尔值,默认为False。 如果为False,则仅打印物理计划。 - id
返回检查点数据重新启动时持续存在的此查询的唯一标识。 也就是说,这个ID是在第一次启动查询时生成的,每次从检查点数据重新启动时都会一样。 Spark群集中只能有一个查询具有相同的激活码。 另请参阅runId。 - isActive
此流式查询当前是否处于活动状态。 - lastProgress
返回此流式查询的最新StreamingQueryProgress更新;如果没有进度更新,则返回None:return:一个映射。 - name
给此查询起一个名字,如果未指定,则返回null。 这个名称可以在org.apache.spark.sql.streaming.DataStreamWriter中指定为dataframe.writeStream.queryName(“query”).start()。 该名称(如果已设置)在所有活动查询中必须唯一。 - processAllAvailable()
阻塞直到源中的所有可用数据都被处理并提交到接收器。 此方法用于测试。
- Note: 在连续到达数据的情况下,这种方法可能永远阻塞。 另外,只有在调用之前,这个方法才被保证阻塞,直到数据已经同步地将数据附加到流源。 (即getOffset必须立即反映添加)。
- recentProgress
返回此查询的最新[[StreamingQueryProgress]]更新数组。 为每个流保留的进度更新数由Spark会话配置spark.sql.streaming.numRecentProgressUpdates进行配置。 - runId
返回此查询的唯一标识,该标识在重新启动时不会保留。 也就是说,每个启动(或从检查点重新启动)的查询将具有不同的runId。 - status
返回查询的当前状态。 - stop()
停止此流式查询。
class pyspark.sql.streaming.StreamingQueryManager(jsqm)
一个来管理所有的StreamingQuery StreamingQueries活动的类。
- active
返回与此SQLContext关联的活动查询的列表。 - awaitAnyTermination(timeout=None)
等到相关SQLContext的任何查询自上下文创建以来,或者自调用resetTerminated()以来已终止。 如果有任何查询由于异常而终止,那么异常将被抛出。 如果设置了超时,则会在超时秒数内返回查询是否终止。 - get(id)
返回来自此SQLContext的活动查询,或者如果具有此名称的活动查询不存在,则会抛出异常。 - resetTerminated()
忘记过去已终止的查询,以便awaitAnyTermination()可以再次用于等待新的终止。
class pyspark.sql.streaming.DataStreamReader(spark)
用于从外部存储系统(例如文件系统,键值存储等)加载流式DataFrame的接口。 使用spark.readStream()来访问它。
- csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None)
加载CSV文件流并将结果作为DataFrame返回。 - format(source)
指定输入数据源格式。
Parameters: source - 字符串,数据源的名称,例如 'json','parquet'。
>>> s = spark.readStream.format("text")
- json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)
加载JSON文件流并将结果作为DataFrame返回。 - load(path=None, format=None, schema=None, **options)
从数据源加载数据流并将其作为:class DataFrame 返回。 - option(key, value)
为基础数据源添加一个输入选项。 - options(**options)
为底层数据源添加多个输入选项。 - parquet(path)
加载Parquet文件流,将结果作为DataFrame返回。 - schema(schema)
指定输入模式。
某些数据源(例如JSON)可以从数据自动推断输入模式。 通过在这里指定模式,底层数据源可以跳过模式推断步骤,从而加速数据加载。
Parameters: schema – 一个pyspark.sql.types.StructType 对象。 - text(path)
加载一个文本文件流并返回一个DataFrame,其架构以一个名为“value”的字符串列开始,如果有的话,后跟分区列。
文本文件中的每一行都是生成的DataFrame中的新行。
class pyspark.sql.streaming.DataStreamWriter(df)
用于将流式DataFrame写入外部存储系统(例如文件系统,键值存储等)的接口。 使用DataFrame.writeStream()来访问这个。
- format(source)
指定基础输出数据源。 - option(key, value)
添加一个底层数据源的输出选项。 - options(**options)
为底层数据源添加多个输出选项。 - outputMode(outputMode)
指定如何将DataFrame/Dataset流数据写入流式接收器。
Options include:
- append - 只有DataFrame / Dataset流数据中的新行才会写入接收器
- complete - DataFrame / Dataset流中的所有行将在每次更新时写入接收器
- update - 每当有更新时,只有在DataFrame / Dataset流数据中更新的行才会被写入接收器。 如果查询不包含聚合,它将相当于追加模式。
writer = sdf.writeStream.outputMode('append')
- partitionBy(*cols)
按文件系统上的给定列对输出进行分区。
如果指定,则输出将在文件系统上进行布局,类似于Hive的分区方案。 - queryName(queryName)
指定可以用start()启动的StreamingQuery的名称。 该名称在相关联的SparkSession中的所有当前活动查询中必须是唯一的。 - start(path=None, format=None, outputMode=None, partitionBy=None, queryName=None, **options)
将DataFrame的内容流式传输到数据源。
数据源由格式和一组选项指定。 如果未指定format,则将使用由spark.sql.sources.default配置的缺省数据源。 - trigger(*args, **kwargs)
设置流查询的触发器。 如果没有设置,它将尽可能快地运行查询,这相当于将触发器设置为processingTime ='0秒'。
Parameters: processingTime – a processing time interval as a string, e.g. ‘5 seconds’, ‘1 minute’.
>>> # trigger the query for execution every 5 seconds
>>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
>>> # trigger the query for just once batch of data
>>> writer = sdf.writeStream.trigger(once=True)