spark dataframe 全局排名优化

服务器

浏览数:93

2020-6-13

spark提供给我们的全局排序,默认情况下只有spark-sql提供的窗口函数,但如果窗口是整个表eg:row_number() over(order by a) 会存在严重的数据倾斜,下面我们演示了俩种方式,例2是例1的改进方式

例1:Spark-SQL形式

df = spark.createDataFrame(
[{"b": 2, "c": 4, "a": 3}, {"a": 2, "c": 6, "b": 3}, {"a": 5, "c": 2, "b": 3}, {"a": 1, "c": 3, "b": 3}])
df.show()
df.createOrReplaceTempView("temp")
resDf1=spark.sql("select *,row_number() over(order by a) rank from temp")
resDf1.show()

例2:Spark-command 形式

def f(rows):
	for row in rows:
		t = row[0].asDict()
		#排名首位是0,正常情况下是1,故这边加一
		t["id"] = long(row[1]) + 1
		yield t
# 通过zipWithIndex获取排名,但是首位是0
# ascending=False 降序
indexrdd = df.select("*").rdd.sortBy(ascending=False, numPartitions=3,keyfunc=lambda x: x.a).zipWithIndex().mapPartitions(lambda x: f(x))
print indexrdd.collect()
indexdf = spark.createDataFrame(indexrdd)
indexdf.show()
dp = spark.createDataFrame(df.rdd.sortBy(ascending=True, numPartitions=3, keyfunc=lambda x: x.c).zipWithIndex())
dp.show()

例子2的基本原理 先进行一定的采样sample,然后确定数据边界,最终将数据根据这些边界进行分区,也就是所谓的rangepartition。在计算每个分区的顺序

作者:厮以为