如何将 Pandas 转换为 PySpark DataFrame?

如何将 Pandas 转换为 PySpark DataFrame?

Pandas 是一个用于数据分析和数据处理的 Python 库。而 PySpark 是 Apache SparkPython 接口,用于大数据处理和分布式数据处理。在数据处理时,Pandas 和 PySpark 都各有优劣。但是在某些情况下,可能需要将 Pandas 数据框转换为 PySpark 数据框,以便于在 PySpark 中进行分布式处理和大规模计算。

本文将介绍如何将 Pandas 转换为 PySpark DataFrame。在介绍转换方法之前,先介绍一下 Pandas 和 PySpark 的基本概念和特性。

Pandas 数据框

Pandas 中的数据框是二维的表格结构,可以类比于 Excel 中的表格。Pandas 数据框的每一列可以是不同的数据类型,可以含有空值。Pandas 数据框具有以下特性:

  • Pandas 数据框是基于 NumPy 数组实现的,因此具有高效数值计算的能力。
  • Pandas 数据框提供了灵活而强大的数据分析和数据处理能力,例如数据清洗、数据重构、数据透视等。
  • Pandas 数据框可以直接读写多种数据格式,包括 CSV、Excel、数据库、JSON、HTML 等。

下面是一个简单的 Pandas 数据框示例:

import pandas as pd

data = {'name': ['Alice', 'Bob', 'Charlie', 'David'],
        'age': [25, 30, 35, 40],
        'gender': ['F', 'M', 'M', 'M']}
df = pd.DataFrame(data)
print(df)

输出结果:

      name  age gender
0    Alice   25      F
1      Bob   30      M
2  Charlie   35      M
3    David   40      M

PySpark 数据框

PySpark 中的数据框是一种基于 RDD 的分布式数据集,类似于 Pandas 中的数据框。PySpark 数据框的每一列也可以是不同的数据类型,可以含有空值。PySpark 数据框具有以下特性:

  • PySpark 数据框在分布式集群上运行,因此可处理大规模数据并兼顾计算效率,适合于大规模机器学习和数据分析。
  • PySpark 数据框可以直接读写多种数据格式,包括 CSV、Parquet、JSON 等。
  • PySpark 数据框与 RDD 可以无缝转换,具有完整的弹性分布式数据集(RDD)API 进行操作。

下面是一个简单的 PySpark 数据框示例:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

data = [('Alice', 25, 'F'), ('Bob', 30, 'M'), ('Charlie', 35, 'M'), ('David', 40, 'M')]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(['name', 'age', 'gender'])
df.show()

输出结果:

+-------+---+------+
|   name|age|gender|
+-------+---+------+
|  Alice| 25|     F|
|    Bob| 30|     M|
|Charlie| 35|     M|
|  David| 40|     M|
+-------+---+------+

将 Pandas 数据框转换为 PySpark 数据框

将 Pandas 数据框转换为 PySpark 数据框的方法有多种,包括使用 PySpark 的 toDF 方法、使用 Arrow、使用 Spark DataFrame 和 Pandas DataFrame 之间的转换等。

使用 PySpark 的 toDF 方法

首先介绍使用 PySpark 的 toDF 方法将 Pandas 数据框转换为 PySpark 数据框的方法。toDF 方法在 PySpark 2.0 及以上版本中可用。该方法可以将一个 Pandas 数据框转换为 PySpark 数据框,同时为每个列指定一个数据类型。

以下示例展示了如何使用 PySpark 的 toDF 方法将 Pandas 数据框转换为 PySpark 数据框:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("example").getOrCreate()

data = {'name': ['Alice', 'Bob', 'Charlie', 'David'],
        'age': [25, 30, 35, 40],
        'gender': ['F', 'M', 'M', 'M']}
df_pd = pd.DataFrame(data)

schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('gender', StringType(), True)])

df_spark = spark.createDataFrame(df_pd, schema)
df_spark.show()

输出结果:

+-------+---+------+
|   name|age|gender|
+-------+---+------+
|  Alice| 25|     F|
|    Bob| 30|     M|
|Charlie| 35|     M|
|  David| 40|     M|
+-------+---+------+

在上面的示例中,首先定义了一个 PySpark 的数据框结构(即数据框列的数据类型)。然后使用 PySpark 的 createDataFrame 方法将 Pandas 数据框转换为 PySpark 数据框,并将 PySpark 数据框结构指定为创建过程中的参数。

注意,使用 toDF 方法进行转换,不能指定数据类型。

使用 Arrow

另一种方法是使用 Arrow,这是一种跨语言的统一列式存储格式。Pandas 可以将其 DataFrame 转换为 Arrow 数据格式,PySpark 可以将 Arrow 数据转换为 PySpark 数据框。

以下示例展示了如何使用 Arrow 将 Pandas 数据框转换为 PySpark 数据框:

from pyspark.sql import SparkSession
import pyarrow as pa
import pyarrow.parquet as pq

spark = SparkSession.builder.appName("example").getOrCreate()

data = {'name': ['Alice', 'Bob', 'Charlie', 'David'],
        'age': [25, 30, 35, 40],
        'gender': ['F', 'M', 'M', 'M']}
df_pd = pd.DataFrame(data)

table = pa.Table.from_pandas(df_pd)
df_spark = spark.createDataFrame(table.to_pandas())
df_spark.show()

输出结果:

+-------+---+------+
|   name|age|gender|
+-------+---+------+
|  Alice| 25|     F|
|    Bob| 30|     M|
|Charlie| 35|     M|
|  David| 40|     M|
+-------+---+------+

在上面的示例中,首先使用 PyArrow 的 from_pandas 方法将 Pandas 数据框转换为 Arrow 表格数据格式。然后再将其转换为 Pandas 数据框,并使用 PySpark 的 createDataFrame 方法将其转换为 PySpark 数据框。

注意,在此例中,Pandas 数据框转换为 Arrow 格式,可以更好的保留数据类型信息。

使用 Spark DataFrame 和 Pandas DataFrame 之间的转换

最后一种方法是使用 Spark DataFrame 和 Pandas DataFrame 之间的转换。可以首先使用 PySpark 的 createDataFrame 方法创建一个空的 PySpark 数据框,然后将 Pandas 数据框转换为 Spark 数据框,最后使用 PySpark 的 union 方法将两个数据框连接到一起。

以下是一个示例:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

data_pd = {'name': ['Alice', 'Bob', 'Charlie', 'David'],
           'age': [25, 30, 35, 40],
           'gender': ['F', 'M', 'M', 'M']}
df_pd = pd.DataFrame(data_pd)

df_spark = spark.createDataFrame([], schema = df_pd.schema)

df_spark = df_spark.union(spark.createDataFrame(df_pd))
df_spark.show()

输出结果:

+-------+---+------+
|   name|age|gender|
+-------+---+------+
|  Alice| 25|     F|
|    Bob| 30|     M|
|Charlie| 35|     M|
|  David| 40|     M|
+-------+---+------+

在上面的示例中,首先使用 PySpark 的 createDataFrame 方法创建一个空的 PySpark 数据框,然后使用 Pandas 数据框的结构创建一个相同结构的 PySpark 数据框。最后使用 PySpark 的 union 方法将两个数据框连接到一起,得到一个完整的 PySpark 数据框。

注意,在此方法中,需要创建一个空的 PySpark 数据框以保持数据框结构一致。

结论

本文介绍了三种将 Pandas 数据框转换为 PySpark 数据框的方法:使用 PySpark 的 toDF 方法、使用 Arrow、使用 Spark DataFrame 和 Pandas DataFrame 之间的转换。不同方法适用于不同的情况和需求。

如果需要灵活地指定每个列的数据类型,可以使用 PySpark 的 toDF 方法;如果需要更好地保留数据类型信息,可以使用 Arrow;如果只是简单的转换,可以使用 Spark DataFrame 和 Pandas DataFrame 之间的转换。

转换后的 PySpark 数据框可以用于 PySpark 的分布式计算和分布式机器学习等场景,适合大规模数据处理和数据分析。

Camera课程

Python教程

Java教程

Web教程

数据库教程

图形图像教程

办公软件教程

Linux教程

计算机教程

大数据教程

开发工具教程