PySpark 数据框架中的全外连接
全外连接是将左外连接和右外连接的结果合并的一种操作。在PySpark中,它用于基于特定条件连接两个数据框,其中输出中包含来自两个数据框的所有记录,无论是否存在匹配项。本文将详细解释如何在PySpark中执行全外连接,并提供一个实际示例来说明其实现。
安装和设置
在我们可以在PySpark中执行全外连接之前,我们需要设置一个工作环境。首先,我们需要通过运行以下命令在终端中安装PySpark: “pip install pyspark” 其次,我们需要通过运行以下命令导入所需的模块−
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
语法
在PySpark中执行全外连接的语法如下:
df_full = df1.join(df2, (df1.column_name == df2.column_name), 'full')
步骤
- 首先,我们导入必要的模块,即 SparkSession 和 col.
-
我们使用 builder() 方法创建一个SparkSession对象,并指定应用程序名称和主节点URL。
-
我们使用 read.csv() 方法从CSV文件中读取数据,并将其转换为数据框。在这个示例中,我们将使用虚拟数据框。
-
我们使用join()方法执行全外连接操作,并将条件作为参数传递。
-
我们使用 show() 方法显示结果数据框。
示例
假设我们有两个数据框, “sales_df” 和 “customer_df” 。 “sales_df”包含公司的销售信息,而”customer_df”包含购买者的客户信息。我们希望在”customer_id”列上连接这两个数据框,并获取两个数据框的所有记录。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Create a SparkSession object
spark = SparkSession.builder.appName("Full Outer Join").getOrCreate()
# Create sample dataframes
data_sales = [("S1", "Product1", 100),
("S2", "Product2", 200),
("S3", "Product3", 300),
("S4", "Product4", 400),
("S5", "Product5", 500),
("S6", "Product6", 600),
("S7", "Product7", 700),
("S8", "Product8", 800),
("S9", "Product9", 900),
("S10", "Product10", 1000)]
df_sales = spark.createDataFrame(data_sales, ["sale_id", "product", "amount"])
data_customers = [("C1", "John"),
("C2", "Jane"),
("C3", "Mike"),
("C4", "Emily"),
("C5", "Bob"),
("C6", "Alice"),
("C7", "Dave"),
("C8", "Jenny"),
("C9", "Peter"),
("C10", "Sarah")]
df_customers = spark.createDataFrame(data_customers, ["customer_id", "name"])
# Perform the full outer join operation
df_full = df_sales.join(df_customers, (df_sales.sale_id == df_customers.customer_id), 'full')
# Display the resultant dataframe
df_full.show()
输出
sale_id product amount customer_id name
S1 Product1 100 C1 John
S2 Product2 200 C2 Jane
S3 Product3 300 C3 Mike
S4 Product4 400 C4 Emily
S5 Product5 500 C5 Bob
S6 Product6 600 C6 Alice
S7 Product7 700 C7 Dave
S8 Product8 800 C8 Jenny
S9 Product9 900 C9 Peter
S10 Product10 1000 C10 Sarah
使用每个数据集中的10组示例数据,此代码创建了两个数据帧df sales和df customers。df sales数据帧中有三个变量:销售编号、对象和金额。df customers数据帧中有两个变量:顾客编号和姓名。然后使用full join类型的join()方法在两个数据帧之间执行完整的外连接过程。在df customers的顾客编号字段和df sales的销售编号列必须相符才能进行连接。
然后,脚本使用show()方法来展示最终的数据帧。以这种方式在组合数据帧df full中显示来自两个数据帧的列。如果一个数据帧中的条目在另一个数据帧中没有相应的记录,则缺失的值将被替换为null。
应用
在使用可能包含缺失数据或null值的大型数据库时,完整的外连接是一种有用的操作。它可应用于许多情况,包括数据清洗、合并来自各种来源的数据以及评估各个区域的数据。
结论
可以使用称为完整外连接的强大操作基于预定条件组合两个数据帧的数据。通过将条件作为参数提供给PySpark中的join()函数,我们可以执行完整的外连接。您只需按照本文提供的说明,并在数据分析和处理任务中充分利用其优势,就可以简单地在PySpark中进行完整的外连接。