PySpark 如何在dataframe中按行分割成两个dataframe

PySpark 如何在dataframe中按行分割成两个dataframe

PySpark dataframe被定义为分布式数据的集合,可以在不同的机器上使用,并将结构化数据生成到一个命名列中。术语“slice”通常用于表示数据的分割。在Python中,我们有一些内置函数,如limit(),collect(),exceptAll()等,可以用来将PySpark dataframe按行切割为两个dataframe。

语法

以下示例中使用了以下语法:

limit()

这是Python中的一个内置方法,可以通过指定整数值来设置行范围。

subtract()

subtract()方法以一个新的数据帧的形式返回行的结果,这个数据不包含在另一个数据帧中。

collect()

Pyspark collect用于从给定的数据集中检索所有元素,可以通过循环和变量来使用。

createDataFrame()

这是Python中的一个内置方法,它使用schema参数来定义数据帧的模式。

[: before_slicing] [after_slicing :]

上面的表示法被称为列表切片,它将被用于按行将数据框分割成两部分。

head()

通常,在Python中,head()方法表示从数据表中提取5行数据,但在这里它接受一些参数,并根据给定的条件返回结果。

exceptAll()

这是Python中的一个内置函数,遵循PySpark模块,它返回一个新的数据框,其中包括DataFrame中的行,但不包括另一个DataFrame中的行,同时保留重复项。

count()

这是Python中的一个内置函数,用于返回特定数量的行作为结果。

drop()

drop方法可以删除特定的行或列。

Window.orderBy()

PySpark窗口函数是通过计算结果,如行号或排名来定义的。orderBy()是分区数据的唯一方法。

安装要求

pip install pyspark

这个必要的命令用于安装帮助运行PySpark程序。

使用Limit()和Subtract()方法

limit()和subtract方法用于将单个数据转换为两个按行排列的数据帧。limit()用于设置具体的行数,将整数值赋值给它,而subtract方法可用于包含在另一个数据帧中不存在的唯一行。

示例

在下面的示例中,我们首先导入pyspark和SparkSession模块,它将创建数据帧的会话。然后将数据的值设置为变量 rows 的行数据。接下来,将数据的列值设置为变量 cols 。现在使用名为 createDataFrame() 的方法与SparkSession模块一起使用,将行和列定义为数据帧的两个不同的模式,并将其存储在变量 df_first 中。然后初始化变量 df_second ,将其值设置为名为 subtract() 的内置函数,它接受变量 df_first 作为参数,它将返回新的数据帧。最后,我们使用 show() 方法对变量df_first和df_second进行操作,以获得结果。

# Import the PySpark module
import pyspark
from pyspark.sql 
import SparkSession
# Create the session
Spark_Session = SparkSession.builder.appName(
   'EMPLOYEE DATA'
).getOrCreate()
# rows of Dataframe
rows = [['1', 'RAHUL', 'INDIA','1243'],
   ['2','PETER', 'SRI LANKA','5461'],
   [ '3',' JOHN', 'SOUTH KOREA','2224'],
   [ '4', 'MARK', 'NEWYORK','9985'],
   [ '5', 'SUNNY', 'BANGLADESH','8912']
]
# Columns of DataFrame
cols = ['S.N', 'EMPLOYEE NAME', 'COUNTRY', 'EMP_ID']
# DataFrame creation for rows and columns
df = Spark_Session.createDataFrame(rows, cols)
# Getting the first two slicing of rows
df_first = df.limit(2)
# Getting the second slicing by removing the variable df1
df_second = df.subtract(df_first)
# first slice with 2 rows with columns names
df_first.show()
# Second slice with 3 rows with columns names
df_second.show()

输出

+---+-------------+---------+------+
|S.N|EMPLOYEE NAME|  COUNTRY|EMP_ID|
+---+-------------+---------+------+
|  1|        RAHUL|    INDIA|  1243|
|  2|        PETER|SRI LANKA|  5461|
+---+-------------+---------+------+

+---+-------------+-----------+------+
|S.N|EMPLOYEE NAME|    COUNTRY|EMP_ID|
+---+-------------+-----------+------+
|  3|         JOHN|SOUTH KOREA|  2224|
|  5|        SUNNY| BANGLADESH|  8912|
|  4|         MARK|    NEWYORK|  9985|
+---+-------------+-----------+------+

使用Collect()和CreateDataFrame()方法

collect方法用于检索给定数据中的所有元素,而createDataFrame()方法将数据帧的两个模式分开。

注意,模式是由表的结构定义的。

示例

在以下示例中,首先使用SparkSession创建会话。然后初始化变量data,将行数据设置为列表格式。然后使用方法createDataFrame()与spark一起接受参数- data(给定行)和[“Name”, “Age”](设置列的名称)来创建数据帧。为了获取行的列表,它将使用collect()方法作为变量df的对象引用,并将其存储在变量rows中。接下来,我们使用两个列表切片,即rows1和rows2。继续使用内置方法createDataFrame(),它接受两个参数- name_of_rows(rows1和rows2)和df.schema(设置模式的数据帧),并分别将其存储在变量df1和df2中。最后,使用show函数和这两个变量df1和df2来获取结果。

from pyspark.sql 
import SparkSession
# Create the Spark session
spark = SparkSession.builder.appName("EMPLOYEE DATA").getOrCreate()
# Create the sample dataframe
data = [("Vivek", 31), ("Aman", 20), ("Sohan", 13), ("David", 24)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Getting the list of row objects using the collect function
rows = df.collect()
# Getting two rows of the list by using slicing
rows1 = rows[:2]
rows2 = rows[2:]
# Convert the lists of Rows to PySpark DataFrames
df1 = spark.createDataFrame(rows1, df.schema)
df2 = spark.createDataFrame(rows2, df.schema)
# result
df1.show()
df2.show()

输出

+-----+---+

| Name|Age|
+-----+---+
|Vivek| 31|
| Aman| 20|
+-----+---+

+-----+---+
| Name|Age|
+-----+---+
|Sohan| 13|
|David| 24|
+-----+---+

使用Count()、Filter()和Drop()方法

在这个程序中,将数据框分成两个行数据框需要使用Count()和Filter()方法,它们可以将特定唯一行分开。Count()方法返回总行数,而Filter()方法用于将数据框的两个不同行分开。然后,Drop()方法会删除表示数据框分区的行。

示例

在下面的示例中,首先建立Spark会话,然后将数据行存储在名为data的变量中。接下来,使用createDataFrame()与Spark一起设置列名,createDataFrame()接受两个参数- data(设置行)和list(设置列名),并将结果存储在变量df中。然后使用df.count()将总行数存储在变量total_rows中。接下来,定义第一个数据框的行数并将其存储在变量n_rows_first_df中。然后使用内置方法row_number()、over()和Window.orderBy()向数据框添加行号列。现在使用内置方法filter()将数据框分成两个不同的行数据框,并将结果分别存储在变量中。最后,使用两个不同的show()方法和两个不同的变量获取结果,以获得两个行数据框的形式。

from pyspark.sql 
import SparkSession, Window
from pyspark.sql.functions import row_number
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create the original DataFrame
data = [("Rabina", 35), ("Stephen", 31), ("Raman", 33), ("Salman", 44),("Meera",37)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Get the total number of rows
total_rows = df.count()
# Define the number of rows for the first DataFrame
n_rows_first_df = 2
# Add a row number column to the DataFrame
df_with_row_number = df.withColumn("row_number", row_number().over(Window.orderBy("Name")))
# Slice the DataFrame into two using filter()
first_df = df_with_row_number.filter(df_with_row_number.row_number <= n_rows_first_df).drop("row_number")
second_df = df_with_row_number.filter(df_with_row_number.row_number > n_rows_first_df).drop("row_number")
# Show the resulting DataFrames
first_df.show()
second_df.show()

输出

+------+---+
|  Name|Age|
+------+---+
| Meera| 37|
|Rabina| 35|
+------+---+

+-------+---+
|   Name|Age|
+-------+---+
|  Raman| 33|
| Salman| 44|
|Stephen| 31|
+-------+---+

使用Head()和ExceptAll()方法

将数据框分为两个以行为基础的数据框,它使用了两个方法head()和exceptAll()来将其中不重复的数据行分开。

示例

在以下示例中,它使用内置的count方法来获取总行数。然后,将第一个数据框的行数赋值给变量n_rows_first_df。为了创建两个数据框,它将使用三个不同的内置函数,即head()、createDataFrame()和exceptAll(),并将其存储在各自的变量中。最后,它将使用两个show()函数来获取这两个以行为基础的数据框。

from pyspark.sql 
import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create the original DataFrame
data = [("karisma", 25), ("Bobby", 30), ("Champak", 35), ("Mark", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Get the total number of rows
total_rows = df.count()

# Define the number of rows for the first DataFrame
n_rows_first_df = 2

# Slice the DataFrame into two using head() and exceptAll()
first_rows = df.head(n_rows_first_df)
first_df = spark.createDataFrame(first_rows, df.schema)
second_df = df.exceptAll(first_df)

# Show the resulting DataFrames
first_df.show()
second_df.show()

输出

+-------+---+
|   Name|Age|
+-------+---+
|karisma| 25|
|  Bobby| 30|
+-------+---+

+-------+---+
|   Name|Age|
+-------+---+
|Champak| 35|
|   Mark| 40|
+-------+---+

结论

我们讨论了四种将PySpark dataframe切分为两个按行划分的dataframe的独特方法。所有这些方法都有独特的方式来表示dataframe的分区。PySpark dataframe是高级交互式数据,可以被数据工程师和数据科学家使用。Python API用于spark和ML是可视化PySpark dataframe的常见示例。

Camera课程

Python教程

Java教程

Web教程

数据库教程

图形图像教程

办公软件教程

Linux教程

计算机教程

大数据教程

开发工具教程