如何将 Pandas 转换为 PySpark DataFrame?
Pandas 是一个用于数据分析和数据处理的 Python 库。而 PySpark 是 Apache Spark 的 Python 接口,用于大数据处理和分布式数据处理。在数据处理时,Pandas 和 PySpark 都各有优劣。但是在某些情况下,可能需要将 Pandas 数据框转换为 PySpark 数据框,以便于在 PySpark 中进行分布式处理和大规模计算。
本文将介绍如何将 Pandas 转换为 PySpark DataFrame。在介绍转换方法之前,先介绍一下 Pandas 和 PySpark 的基本概念和特性。
Pandas 数据框
Pandas 中的数据框是二维的表格结构,可以类比于 Excel 中的表格。Pandas 数据框的每一列可以是不同的数据类型,可以含有空值。Pandas 数据框具有以下特性:
- Pandas 数据框是基于 NumPy 数组实现的,因此具有高效数值计算的能力。
- Pandas 数据框提供了灵活而强大的数据分析和数据处理能力,例如数据清洗、数据重构、数据透视等。
- Pandas 数据框可以直接读写多种数据格式,包括 CSV、Excel、数据库、JSON、HTML 等。
下面是一个简单的 Pandas 数据框示例:
import pandas as pd
data = {'name': ['Alice', 'Bob', 'Charlie', 'David'],
'age': [25, 30, 35, 40],
'gender': ['F', 'M', 'M', 'M']}
df = pd.DataFrame(data)
print(df)
输出结果:
name age gender
0 Alice 25 F
1 Bob 30 M
2 Charlie 35 M
3 David 40 M
PySpark 数据框
PySpark 中的数据框是一种基于 RDD 的分布式数据集,类似于 Pandas 中的数据框。PySpark 数据框的每一列也可以是不同的数据类型,可以含有空值。PySpark 数据框具有以下特性:
- PySpark 数据框在分布式集群上运行,因此可处理大规模数据并兼顾计算效率,适合于大规模机器学习和数据分析。
- PySpark 数据框可以直接读写多种数据格式,包括 CSV、Parquet、JSON 等。
- PySpark 数据框与 RDD 可以无缝转换,具有完整的弹性分布式数据集(RDD)API 进行操作。
下面是一个简单的 PySpark 数据框示例:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()
data = [('Alice', 25, 'F'), ('Bob', 30, 'M'), ('Charlie', 35, 'M'), ('David', 40, 'M')]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(['name', 'age', 'gender'])
df.show()
输出结果:
+-------+---+------+
| name|age|gender|
+-------+---+------+
| Alice| 25| F|
| Bob| 30| M|
|Charlie| 35| M|
| David| 40| M|
+-------+---+------+
将 Pandas 数据框转换为 PySpark 数据框
将 Pandas 数据框转换为 PySpark 数据框的方法有多种,包括使用 PySpark 的 toDF 方法、使用 Arrow、使用 Spark DataFrame 和 Pandas DataFrame 之间的转换等。
使用 PySpark 的 toDF 方法
首先介绍使用 PySpark 的 toDF 方法将 Pandas 数据框转换为 PySpark 数据框的方法。toDF 方法在 PySpark 2.0 及以上版本中可用。该方法可以将一个 Pandas 数据框转换为 PySpark 数据框,同时为每个列指定一个数据类型。
以下示例展示了如何使用 PySpark 的 toDF 方法将 Pandas 数据框转换为 PySpark 数据框:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.appName("example").getOrCreate()
data = {'name': ['Alice', 'Bob', 'Charlie', 'David'],
'age': [25, 30, 35, 40],
'gender': ['F', 'M', 'M', 'M']}
df_pd = pd.DataFrame(data)
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True),
StructField('gender', StringType(), True)])
df_spark = spark.createDataFrame(df_pd, schema)
df_spark.show()
输出结果:
+-------+---+------+
| name|age|gender|
+-------+---+------+
| Alice| 25| F|
| Bob| 30| M|
|Charlie| 35| M|
| David| 40| M|
+-------+---+------+
在上面的示例中,首先定义了一个 PySpark 的数据框结构(即数据框列的数据类型)。然后使用 PySpark 的 createDataFrame 方法将 Pandas 数据框转换为 PySpark 数据框,并将 PySpark 数据框结构指定为创建过程中的参数。
注意,使用 toDF 方法进行转换,不能指定数据类型。
使用 Arrow
另一种方法是使用 Arrow,这是一种跨语言的统一列式存储格式。Pandas 可以将其 DataFrame 转换为 Arrow 数据格式,PySpark 可以将 Arrow 数据转换为 PySpark 数据框。
以下示例展示了如何使用 Arrow 将 Pandas 数据框转换为 PySpark 数据框:
from pyspark.sql import SparkSession
import pyarrow as pa
import pyarrow.parquet as pq
spark = SparkSession.builder.appName("example").getOrCreate()
data = {'name': ['Alice', 'Bob', 'Charlie', 'David'],
'age': [25, 30, 35, 40],
'gender': ['F', 'M', 'M', 'M']}
df_pd = pd.DataFrame(data)
table = pa.Table.from_pandas(df_pd)
df_spark = spark.createDataFrame(table.to_pandas())
df_spark.show()
输出结果:
+-------+---+------+
| name|age|gender|
+-------+---+------+
| Alice| 25| F|
| Bob| 30| M|
|Charlie| 35| M|
| David| 40| M|
+-------+---+------+
在上面的示例中,首先使用 PyArrow 的 from_pandas 方法将 Pandas 数据框转换为 Arrow 表格数据格式。然后再将其转换为 Pandas 数据框,并使用 PySpark 的 createDataFrame 方法将其转换为 PySpark 数据框。
注意,在此例中,Pandas 数据框转换为 Arrow 格式,可以更好的保留数据类型信息。
使用 Spark DataFrame 和 Pandas DataFrame 之间的转换
最后一种方法是使用 Spark DataFrame 和 Pandas DataFrame 之间的转换。可以首先使用 PySpark 的 createDataFrame 方法创建一个空的 PySpark 数据框,然后将 Pandas 数据框转换为 Spark 数据框,最后使用 PySpark 的 union 方法将两个数据框连接到一起。
以下是一个示例:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()
data_pd = {'name': ['Alice', 'Bob', 'Charlie', 'David'],
'age': [25, 30, 35, 40],
'gender': ['F', 'M', 'M', 'M']}
df_pd = pd.DataFrame(data_pd)
df_spark = spark.createDataFrame([], schema = df_pd.schema)
df_spark = df_spark.union(spark.createDataFrame(df_pd))
df_spark.show()
输出结果:
+-------+---+------+
| name|age|gender|
+-------+---+------+
| Alice| 25| F|
| Bob| 30| M|
|Charlie| 35| M|
| David| 40| M|
+-------+---+------+
在上面的示例中,首先使用 PySpark 的 createDataFrame 方法创建一个空的 PySpark 数据框,然后使用 Pandas 数据框的结构创建一个相同结构的 PySpark 数据框。最后使用 PySpark 的 union 方法将两个数据框连接到一起,得到一个完整的 PySpark 数据框。
注意,在此方法中,需要创建一个空的 PySpark 数据框以保持数据框结构一致。
结论
本文介绍了三种将 Pandas 数据框转换为 PySpark 数据框的方法:使用 PySpark 的 toDF 方法、使用 Arrow、使用 Spark DataFrame 和 Pandas DataFrame 之间的转换。不同方法适用于不同的情况和需求。
如果需要灵活地指定每个列的数据类型,可以使用 PySpark 的 toDF 方法;如果需要更好地保留数据类型信息,可以使用 Arrow;如果只是简单的转换,可以使用 Spark DataFrame 和 Pandas DataFrame 之间的转换。
转换后的 PySpark 数据框可以用于 PySpark 的分布式计算和分布式机器学习等场景,适合大规模数据处理和数据分析。