PySpark 如何按值排序

PySpark 如何按值排序

PySpark是分布式数据处理引擎,用于编写API的代码。PySpark是Apache Spark和Python的合作。Spark是大规模数据处理平台,提供处理千亿字节规模数据的能力。在Python中,我们有PySpark的内置函数,如orderBy()、sort()、sortBy()、createDataFrame()、collect()和asc_nulls_last(),可以用来对值进行排序。

语法

以下语法在示例中使用:

createDataFrame()

这是Python中的一个内置函数,表示了通过PySpark模块创建DataFrame的另一种方式。

orderBy()

这是Python中的内置方法,它遵循PySpark模块,用于表示一个或多个列的升序或降序的顺序。

sort()

此方法用于按升序列出订单。如果我们将任何条件分类到排序函数中,那么它允许我们按降序工作。

sortBy()

在Python中,sortBy()方法遵循sparkContext,可用于对数据进行排序。

parallelize()

这个方法被sparkContext包含,它允许数据分发到多个节点。

collect()

这通常被称为PySpark RDD或DataFrame collect,可用于访问数据集中的所有项目或数字。

asc_nulls_last("column_name")

这是Python中的一种内置函数,它按升序返回顺序序列。

需要安装 –

pip install pyspark

此命令可以基于PySpark运行程序。

示例1

在以下示例中,我们将展示 如何按单个列对DataFrame进行排序 首先,导入名为pyspark.sql.SparkSession的模块。然后创建一个名为SparkSession的对象。然后将元组列表存储在变量spark中。接下来,使用spark.createDataFrame()函数创建一个DataFrame,并提供数据和列名。继续使用orderBy()函数在DataFrame上进行排序,按照所需的列进行排序,本例中为”Age”列。最后,使用show方法展示已排序的DataFrame以获得最终结果。

from pyspark.sql 
import SparkSession
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of DataFrame
stu_data = [("Akash", 25), ("Bhuvan", 23), ("Peter", 18), ("Mohan", 26)]
df = spark.createDataFrame(stu_data, ["Name", "Age"])

# Sorting of Dataframe column(Age) in ascending order
sorted_df = df.orderBy("Age")

# sorted DataFrame
sorted_df.show()

输出

+------+---+
|  Name|Age|
+------+---+
| Peter| 18|
|Bhuvan| 23|
| Akash| 25|
| Mohan| 26|
+------+---+

示例2

在下面的示例中,我们将展示如何按多个列对DataFrame进行排序。 在这里,使用了orderBy方法,它接受两个参数- list(设置列名)和ascending(将值设置为true,允许按顺序排序),并将其存储在变量sorted_df中。 然后使用sorted_df的show()方法获取结果。

from pyspark.sql 
import SparkSession
# Create SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [("Umbrella", 125), ("Bottle", 20), ("Colgate", 118)]
df = spark.createDataFrame(data, ["Product", "Price"])

# Sort DataFrame by product and price in ascending order
sorted_df = df.orderBy(["Price", "Product"], ascending=[True, True])

# Show the sorted DataFrame
sorted_df.show()

输出

+--------+-----+
| Product|Price|
+--------+-----+
|  Bottle|   20|
| Colgate|  118|
|Umbrella|  125|
+--------+-----+

示例3

在下面的示例中,我们将展示如何按降序对数据框进行排序。这里使用内置函数createDataframe手动设置2D结构的数据框。然后通过使用名为sort()的方法来存储值的变量sorted_df进行初始化(sort()函数接受内置函数desc(“Age”)作为参数,该函数将按照降序的顺序排序)。最后,我们使用sorted_df.show()打印结果。

from pyspark.sql 
import SparkSession
from pyspark.sql.functions 
import desc
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of DataFrame
Emp_data = [("Abhinav", 25, "Male"), ("Meera", 32, "Female"), ("Riya", 18, "Female"), ("Deepak", 33, "Male"), ("Elon", 50, "Male")]
df = spark.createDataFrame(Emp_data, ["Name", "Age", "Gender"])

# Sort DataFrame by Age in descending order
sorted_df = df.sort(desc("Age"))

# Show the sorted DataFrame
sorted_df.show()

输出

+-------+---+------+
|   Name|Age|Gender|
+-------+---+------+
|   Elon| 50|  Male|
| Deepak| 33|  Male|
|  Meera| 32|Female|
|Abhinav| 25|  Male|
|   Riya| 18|Female|
+-------+---+------+

示例 4

在下面的示例中,我们将展示如何按值对RDD进行排序。

首先,我们从一个元组列表创建一个RDD。然后,在RDD上使用sortBy()函数,提供一个lambda函数,该函数提取要按照每个元组的第二个元素排序的值。接下来,收集并迭代排序后的RDD,以访问排序后的记录。

# Sorting RDD by Value
from pyspark.sql 
import SparkSession

# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of RDD
data = [("X", 25), ("Y", 32), ("Z", 18)]
rdd = spark.sparkContext.parallelize(data)

# Sort RDD by value in ascending order
sorted_rdd = rdd.sortBy(lambda x: x[1])

# Print the sorted RDD
for record in sorted_rdd.collect():
   print(record)

输出

('Z', 18)
('X', 25)
('Y', 32)

示例5

下面的示例中,我们将演示如何对包含Null值的Dataframe进行排序。我们使用sort()函数对Dataframe按照”Price”列进行升序排序,其中Null值为none。然后,将asc_nulls_last(“Price”)作为参数传递给sort()函数,并按升序到降序的顺序指定排序顺序。然后,将排序后的Dataframe分配给变量sorted_df,并使用相同的变量和show()方法获取结果。

# Sorting DataFrame with Null Values
from pyspark.sql 
import SparkSession
from pyspark.sql.functions 
import asc_nulls_last

# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of DataFrame with null values
data = [("Charger", None), ("Mouse", 320), ("PEN", 18), ("Bag", 1000), ("Notebook", None)] # None = null
df = spark.createDataFrame(data, ["Product", "Price"])

# Sorting of DataFrame column(Price) in ascending order with null values last
sorted_df = df.sort(asc_nulls_last("Price"))

# Show the sorted DataFrame
sorted_df.show()

输出

+--------+-----+
| Product|Price|
+--------+-----+
|     PEN|   18|
|   Mouse|  320|
|     Bag| 1000|
| Charger| null|
|Notebook| null|
+--------+-----+

结论

我们讨论了在PySpark中进行值排序的不同方法。我们使用了一些内置函数,如orderBy(),sort(),asc_nulls_last()等。排序的目的是为了得到升序或降序的顺序。与PySpark相关的各种应用如实时库、大规模数据处理和构建API等。

Camera课程

Python教程

Java教程

Web教程

数据库教程

图形图像教程

办公软件教程

Linux教程

计算机教程

大数据教程

开发工具教程