PySpark 如何按值排序
PySpark是分布式数据处理引擎,用于编写API的代码。PySpark是Apache Spark和Python的合作。Spark是大规模数据处理平台,提供处理千亿字节规模数据的能力。在Python中,我们有PySpark的内置函数,如orderBy()、sort()、sortBy()、createDataFrame()、collect()和asc_nulls_last(),可以用来对值进行排序。
语法
以下语法在示例中使用:
createDataFrame()
这是Python中的一个内置函数,表示了通过PySpark模块创建DataFrame的另一种方式。
orderBy()
这是Python中的内置方法,它遵循PySpark模块,用于表示一个或多个列的升序或降序的顺序。
sort()
此方法用于按升序列出订单。如果我们将任何条件分类到排序函数中,那么它允许我们按降序工作。
sortBy()
在Python中,sortBy()方法遵循sparkContext,可用于对数据进行排序。
parallelize()
这个方法被sparkContext包含,它允许数据分发到多个节点。
collect()
这通常被称为PySpark RDD或DataFrame collect,可用于访问数据集中的所有项目或数字。
asc_nulls_last("column_name")
这是Python中的一种内置函数,它按升序返回顺序序列。
需要安装 –
pip install pyspark
此命令可以基于PySpark运行程序。
示例1
在以下示例中,我们将展示 如何按单个列对DataFrame进行排序 首先,导入名为pyspark.sql.SparkSession的模块。然后创建一个名为SparkSession的对象。然后将元组列表存储在变量spark中。接下来,使用spark.createDataFrame()函数创建一个DataFrame,并提供数据和列名。继续使用orderBy()函数在DataFrame上进行排序,按照所需的列进行排序,本例中为”Age”列。最后,使用show方法展示已排序的DataFrame以获得最终结果。
from pyspark.sql
import SparkSession
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()
# Creation of DataFrame
stu_data = [("Akash", 25), ("Bhuvan", 23), ("Peter", 18), ("Mohan", 26)]
df = spark.createDataFrame(stu_data, ["Name", "Age"])
# Sorting of Dataframe column(Age) in ascending order
sorted_df = df.orderBy("Age")
# sorted DataFrame
sorted_df.show()
输出
+------+---+
| Name|Age|
+------+---+
| Peter| 18|
|Bhuvan| 23|
| Akash| 25|
| Mohan| 26|
+------+---+
示例2
在下面的示例中,我们将展示如何按多个列对DataFrame进行排序。 在这里,使用了orderBy方法,它接受两个参数- list(设置列名)和ascending(将值设置为true,允许按顺序排序),并将其存储在变量sorted_df中。 然后使用sorted_df的show()方法获取结果。
from pyspark.sql
import SparkSession
# Create SparkSession
spark = SparkSession.builder.getOrCreate()
# Create a DataFrame
data = [("Umbrella", 125), ("Bottle", 20), ("Colgate", 118)]
df = spark.createDataFrame(data, ["Product", "Price"])
# Sort DataFrame by product and price in ascending order
sorted_df = df.orderBy(["Price", "Product"], ascending=[True, True])
# Show the sorted DataFrame
sorted_df.show()
输出
+--------+-----+
| Product|Price|
+--------+-----+
| Bottle| 20|
| Colgate| 118|
|Umbrella| 125|
+--------+-----+
示例3
在下面的示例中,我们将展示如何按降序对数据框进行排序。这里使用内置函数createDataframe手动设置2D结构的数据框。然后通过使用名为sort()的方法来存储值的变量sorted_df进行初始化(sort()函数接受内置函数desc(“Age”)作为参数,该函数将按照降序的顺序排序)。最后,我们使用sorted_df.show()打印结果。
from pyspark.sql
import SparkSession
from pyspark.sql.functions
import desc
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()
# Creation of DataFrame
Emp_data = [("Abhinav", 25, "Male"), ("Meera", 32, "Female"), ("Riya", 18, "Female"), ("Deepak", 33, "Male"), ("Elon", 50, "Male")]
df = spark.createDataFrame(Emp_data, ["Name", "Age", "Gender"])
# Sort DataFrame by Age in descending order
sorted_df = df.sort(desc("Age"))
# Show the sorted DataFrame
sorted_df.show()
输出
+-------+---+------+
| Name|Age|Gender|
+-------+---+------+
| Elon| 50| Male|
| Deepak| 33| Male|
| Meera| 32|Female|
|Abhinav| 25| Male|
| Riya| 18|Female|
+-------+---+------+
示例 4
在下面的示例中,我们将展示如何按值对RDD进行排序。
首先,我们从一个元组列表创建一个RDD。然后,在RDD上使用sortBy()函数,提供一个lambda函数,该函数提取要按照每个元组的第二个元素排序的值。接下来,收集并迭代排序后的RDD,以访问排序后的记录。
# Sorting RDD by Value
from pyspark.sql
import SparkSession
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()
# Creation of RDD
data = [("X", 25), ("Y", 32), ("Z", 18)]
rdd = spark.sparkContext.parallelize(data)
# Sort RDD by value in ascending order
sorted_rdd = rdd.sortBy(lambda x: x[1])
# Print the sorted RDD
for record in sorted_rdd.collect():
print(record)
输出
('Z', 18)
('X', 25)
('Y', 32)
示例5
下面的示例中,我们将演示如何对包含Null值的Dataframe进行排序。我们使用sort()函数对Dataframe按照”Price”列进行升序排序,其中Null值为none。然后,将asc_nulls_last(“Price”)作为参数传递给sort()函数,并按升序到降序的顺序指定排序顺序。然后,将排序后的Dataframe分配给变量sorted_df,并使用相同的变量和show()方法获取结果。
# Sorting DataFrame with Null Values
from pyspark.sql
import SparkSession
from pyspark.sql.functions
import asc_nulls_last
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()
# Creation of DataFrame with null values
data = [("Charger", None), ("Mouse", 320), ("PEN", 18), ("Bag", 1000), ("Notebook", None)] # None = null
df = spark.createDataFrame(data, ["Product", "Price"])
# Sorting of DataFrame column(Price) in ascending order with null values last
sorted_df = df.sort(asc_nulls_last("Price"))
# Show the sorted DataFrame
sorted_df.show()
输出
+--------+-----+
| Product|Price|
+--------+-----+
| PEN| 18|
| Mouse| 320|
| Bag| 1000|
| Charger| null|
|Notebook| null|
+--------+-----+
结论
我们讨论了在PySpark中进行值排序的不同方法。我们使用了一些内置函数,如orderBy(),sort(),asc_nulls_last()等。排序的目的是为了得到升序或降序的顺序。与PySpark相关的各种应用如实时库、大规模数据处理和构建API等。