PySpark 如何从dataframe中选择一系列行
PySpark中的dataframe由共享的数据集合定义,可以在计算机机器上运行并将数据以行和列的形式进行结构化。行的范围定义了数据集中的一条水平线(根据条件的一组多个值)。通常,这个范围设置了最低和最高值。在Python中,我们可以使用一些内置函数如filter()、where()和collect()来从PySpark的dataframe中选择一系列行。
语法
示例中使用了以下语法-
createDataFrame()
这是Python中的一个内置方法,它接受一个schema参数来定义数据帧的模式。
filter()
filter() 是Python中的内置函数,根据给定的条件定义并允许迭代特定的行或列。
where()
这是Python中的一个内置函数,用于根据行或列设置特定条件,并以特定元素返回。
collect()
Pyspark collect用于访问给定数据集中的所有元素,它在循环中使用。
show()
show() 方法是Python中的内置方法,用于在程序结束时获取结果。
安装要求 –
pip install pyspark
使用CreateDataframe()
createDataframe遵循PySpark模块,接受两个参数- data_name(设置每行中每列的值)和data_Columns(通过定义所有列来设置值)。
示例
在下面的示例中,我们将通过构建SparkSession.builder开始程序,并使用appName()设置数据库的名称,并使用getOrCreate()函数创建会话并将其存储在变量spark中。然后将customer数据库的数据值设置为customer_Data变量的值。接下来,在data_Columns变量中设置所有值。现在使用createDataFrame()方法创建Dataframe,接受两个参数customer_Data和data_Columns,将格式化为表格形式,并连接到spark并将其存储在customer_DF变量中。表格形式以行和列的形式显示值。最后,我们使用名为show()的方法与customer_DF一起获取每列的整体数据。
# Creation of SparkSession
spark = SparkSession.builder \
.appName('CUSTOMER') \
.getOrCreate()
# customer DataFrame
customer_Data = [("PREM KUMAR", 1281, "AC", 40000,4000),
("RATAN SINGH", 1289, "HOME THEATER", 35000, 5000),
("DAVID K", 1221, "NIKON CAMERA", 88000, 10000),
("JONATHAN REDDY", 1743, "GEYSER", 15000, 500),
("JASPREET BRAR", 1234, "HP LAPTOP", 78000, 3564),
("NEIL KAMANT", 1222, "WASHING MACHINE", 25000, 2000)
]
data_Columns = ["CUSTOMER NAME","PRODUCT ID","PRODUCT NAME",
"Actual Price","EMI PER MONTH"]
customer_DF = spark.createDataFrame(customer_Data, data_Columns)
customer_DF.show()
输出
+--------------+----------+---------------+------------+-------------+
| CUSTOMER NAME|PRODUCT ID| PRODUCT NAME|Actual Price|EMI PER MONTH|
+--------------+----------+---------------+------------+-------------+
| PREM KUMAR| 1281| AC| 40000| 4000|
| RATAN SINGH| 1289| HOME THEATER| 35000| 5000|
| DAVID K| 1221| NIKON CAMERA| 88000| 10000|
|JONATHAN REDDY| 1743| GEYSER| 15000| 500|
| JASPREET BRAR| 1234| HP LAPTOP| 78000| 3564|
| NEIL KAMANT| 1222|WASHING MACHINE| 25000| 2000|
+--------------+----------+---------------+------------+-------------+
使用过滤方法
此方法遵循上述程序,并且使用此方法允许我们设置数据帧的行范围条件。
示例
在以下示例中,我们遵循先前的示例代码,在此处允许我们使用数据帧i.e. DF来设置两个条件。即,参数值为“实际价格”,它设置了从25000到40000之间的价格范围的行条件,它将从行中找到指定的范围。最后,我们使用show()方法来获取结果。
DF.filter((DF['Actual Price'] >= 25000) & (DF['Actual Price'] <= 40000)).show()
输出
+-------------+----------+---------------+------------+-------------+
|CUSTOMER NAME|PRODUCT ID| PRODUCT NAME|Actual Price|EMI PER MONTH|
+-------------+----------+---------------+------------+-------------+
| PREM KUMAR| 1281| AC| 40000| 4000|
| RATAN SINGH| 1289| HOME THEATER| 35000| 5000|
| NEIL KAMANT| 1222|WASHING MACHINE| 25000| 2000|
+-------------+----------+---------------+------------+-------------+
使用 where() 方法
该方法遵循本文的第一个示例,其中使用 where() 方法来设置 PySpark 中数据帧的行范围。
示例
在下面的示例中,我们将使用内置的 where() 方法来接受两个条件,并使用 and(&) 运算符来获取行的范围。接下来,where() 方法与 show() 方法一起使用以获取结果。
DF.where((DF['EMI PER MONTH'] >= 10000) & (DF['EMI PER MONTH'] <= 38000)).show()
输出
+-------------+----------+------------+------------+-------------+
|CUSTOMER NAME|PRODUCT ID|PRODUCT NAME|Actual Price|EMI PER MONTH|
+-------------+----------+------------+------------+-------------+
| DAVID K| 1221|NIKON CAMERA| 88000| 10000|
+-------------+----------+------------+------------+-------------+
使用Collect()方法
该方法遵循第一个示例,允许迭代到特定列,以从PySpark的数据帧中获取行的范围。
示例
在下面的示例中,我们将使用for循环,其中变量row通过collect()方法迭代数据帧,并迭代给定数据帧的所有元素。现在它使用if语句设置条件,如果实际价格在30000到50000之间,则返回特定的行范围。接下来,它使用print()方法接受行作为参数来返回结果。
for row in DF.collect():
if 30000 <= row['Actual Price'] <= 50000:
print(row)
输出
Row(CUSTOMER NAME='PREM KUMAR', PRODUCT ID=1281, PRODUCT NAME='AC', Actual Price=40000, EMI PER MONTH=4000)
Row(CUSTOMER NAME='RATAN SINGH', PRODUCT ID=1289, PRODUCT NAME='HOME THEATER', Actual Price=35000, EMI PER MONTH=5000)
结论
我们讨论了从PySpark的数据帧中获取行范围的各种方法。所有的方法都遵循第一个示例,因为该示例设置了行和列数据的完整输入结构。PySpark已被亚马逊、沃尔玛、Trivago等顶级跨国公司使用。