PySpark 从dataframe获取特定行

PySpark 从dataframe获取特定行

PySpark是一个强大的数据处理和分析工具。在使用PySpark DataFrame处理数据时,有时候需要从dataframe中获取特定的行。它可以帮助用户以分布式和并行的方式轻松地操作和访问数据,非常适用于大数据应用。在本文中,我们将探讨如何使用PySpark中的各种方法从PySpark dataframe中获取特定行。我们将使用PySpark的DataFrame API以函数式编程风格来完成这些方法。

在继续之前,让我们创建一个示例dataframe,我们将从中获取行。

from colorama import Fore
from pyspark.sql import SparkSession

# Building a SparkSession named "column_sum"
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# Creating the Spark DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3),
   ('Row2', 4, 5, 6),
   ('Row3', 7, 8, 9)],
   ['__', 'Col1', 'Col2', 'Col3'])

# Printing the schema of the DataFrame
df.printSchema()

# Showing the DataFrame
df.show()

输出

这个Python脚本首先会打印出我们创建的数据帧的架构,然后是数据帧本身。

root
|-- __: string (nullable = true)
|-- Col1: long (nullable = true)
|-- Col2: long (nullable = true)
|-- Col3: long (nullable = true)

+----+----+----+----+
|  __|Col1|Col2|Col3|
+----+----+----+----+
|Row1|   1|   2|   3|
|Row2|   4|   5|   6|
|Row3|   7|   8|   9|
+----+----+----+----+

下面列出了完成任务所使用的方法:

方法

  • 使用collect()

  • 使用first()

  • 使用show()

  • 使用head()

  • 使用tail()

  • 使用select()和collect()

  • 使用filter()和collect()

  • 使用where()和collect()

  • 使用take()

现在让我们讨论每种方法以及它们如何用于添加列。

方法1:使用collect()

在PySpark中,collect()方法可用于从PySpark DataFrame中检索所有数据并将其作为列表返回。当您希望查看或操作数据框中的数据时,通常会使用此函数。下面是使用的语法:

dataframe.collect()[index]

这里:

  • dataframe是我们应用该方法的数据框

  • Index是我们要获取的行。

将数据框以列表形式获取后,我们可以将索引传递给表示我们想要获取的行的列表。

步骤

  • 首先,使用上述代码创建一个数据框。

  • 使用collect()函数从数据框中检索所需的行,将每行存储在单独的变量中。

  • 将包含所需行的变量的值打印到控制台。

示例

# Retrieving the first row of the DataFrame using collect() function
Row1 = df.collect()[0]
print(Row1)

# Retrieving the last row of the DataFrame using collect() function
Row2 = df.collect()[-1]
print(Row2)

# Retrieving the second row of the DataFrame using collect() function
Row3 = df.collect()[1]
print(Row3)

输出

Row(__='Row1', Col1=1, Col2=2, Col3=3)
Row(__='Row3', Col1=7, Col2=8, Col3=9)
Row(__='Row2', Col1=4, Col2=5, Col3=6)

方法2:使用first()

在PySpark中,first()函数返回一个数据帧或RDD的第一个元素。我们可以使用这个函数从数据帧中提取特定的行。当您想要查看数据帧中的数据时,通常会使用这个函数。以下是一般语法:

dataframe.first()

在这里:

  • dataframe是要应用该方法的数据帧

步骤

  • 导入必要的库

  • 创建一个SparkSession

  • 创建一个数据帧

  • 使用first()函数检索数据帧的第一行

  • 将第一行打印到控制台

示例

# Import necessary libraries
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# Retrieve the first row
Row1 = df.first()
print(Row1)

输出

Row(Row1, 1, 2, 3)

方法3:使用show()

在PySpark中,show()函数用于显示Python数据帧中的前n行。此函数的返回值是由前n行组成的小型数据帧。以下是使用的语法:

dataframe.show(n)

在这里:

  • dataframe是我们应用该方法的数据帧
  • n是行数

步骤

  • 导入必要的库
  • 创建一个SparkSession
  • 创建一个数据帧
  • 使用show()函数通过将行参数设为1来检索数据帧的第一行
  • 将第一行打印到控制台
  • 使用show()函数通过将行参数设为2来检索数据帧的前两行
  • 将前两行打印到控制台
  • 使用show()函数通过将行参数设为3来检索数据帧的前三行
  • 将前三行打印到控制台

示例

# Import necessary libraries
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# Retrieve the first row
df1= df.show(1)
print(df1)

# Retrieve the first two rows
df2= df.show(2)
print(df2)

# Retrieve the first three rows
df3= df.show(3)
print(df3)

输出

+----+----+----+----+
|__  |Col1|Col2|Col3|
+----+----+----+----+
|Row1|   1|   2|   3|
+----+----+----+----+

+----+----+----+----+
|__  |Col1|Col2|Col3|
+----+----+----+----+
|Row1|   1|   2|   3|
|Row2|   4|   5|   6|
+----+----+----+----+

+----+----+----+----+
|__  |Col1|Col2|Col3|
+----+----+----+----+
|Row1|   1|   2|   3|
|Row2|   4|   5|   6|
|Row3|   7|   8|   9|
+----+----+----+----+

方法4:使用head()

在PySpark中,head()函数用于显示Python数据帧中的前n行。该函数的返回值是由前n行组成的小型数据帧。以下是所使用的语法:

dataframe.head(n)

这里

  • dataframe是我们应用该方法的数据帧

  • n是行数

步骤

  • 导入所需的库

  • 创建一个SparkSession

  • 创建一个数据帧

  • 使用head()函数并将row参数设置为1来检索数据帧的第一行

  • 将第一行打印到控制台

  • 使用head()函数并将row参数设置为2来检索数据帧的前两行

  • 将前两行打印到控制台

  • 使用head()函数并将row参数设置为3来检索数据帧的前三行

  • 将前三行打印到控制台

示例

# Import necessary libraries
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# Retrieve the first row
df1= df.head(1)
print(df1)

# Retrieve the first two rows
df2= df.head(2)
print(df2)

# Retrieve the first three rows
df3= df.head(3)
print(df3)

输出

[Row(__='Row1', Col1=1, Col2=2, Col3=3)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]

方法5:使用tail()

在PySpark中,tail()函数用于显示python数据帧中的最后n行。此函数的返回值是由最后n行组成的小型数据帧。以下是所使用的语法:

dataframe.tail(n)

其中:

  • dataframe是我们应用该方法的数据帧

  • n是行数

步骤

  • 导入必要的库

  • 创建一个SparkSession

  • 创建一个数据帧

  • 使用tail()函数通过将行参数设为1来获取数据帧的第一行

  • 将最后一行打印到控制台

  • 使用tail()函数通过将行参数设为2来获取数据帧的前两行

  • 将最后两行打印到控制台

  • 使用tail()函数通过将行参数设为3来获取数据帧的前三行

  • 将最后三行打印到控制台

示例

# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# Retrieve the last row
df1= df.tail(1)
print(df1)

# Retrieve the last two rows
df2= df.tail(2)
print(df2)

# Retrieve the last three rows
df3= df.tail(3)
print(df3)

输出

[Row(__='Row3', Col1=7, Col2=8, Col3=9)]
[Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]

方法6:使用select()和collect()函数

我们可以使用select()函数和collect()方法来显示Pyspark数据框中的特定行。下面是使用的语法:

dataframe.select([columns]).collect()[index]

这里:

  • dataframe是我们应用该方法的数据框。

  • columns是我们想要在输出中拥有的列列表。

  • index是我们想要在输出中拥有的行号。

步骤

  • 导入所需的库。

  • 创建一个SparkSession。

  • 创建一个数据框。

  • 使用select()函数和collect()函数的组合来从数据框中检索所需的行,将每一行存储在单独的变量中。

  • 将包含所需行的变量的值打印到控制台上。

示例

# Import necessary libraries
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# Retrieve the last row
df1= df.select(['Col1', 'Col2', 'Col3']).collect(0)
print(df1)

# Retrieve the last two rows
df2= df.select(['Col1', 'Col2', 'Col3']).collect(-1)
print(df2)

# Retrieve the last three rows
df3= df.select(['Col1', 'Col2', 'Col3']).collect(1)
print(df3)

输出

[Row(__='Row3', Col1=7, Col2=8, Col3=9)]
[Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]

方法7:使用filter()和collect()方法

我们可以使用filter()函数和collect()方法来展示Pyspark数据帧中的特定行。以下是所使用的语法:

dataframe.filter(condition).collect()[index]

这里:

  • dataframe是我们应用方法的数据帧

  • condition是基于该条件进行筛选数据帧行的条件

  • index是我们想要输出的行号

步骤

  • 导入所需的库

  • 创建SparkSession

  • 创建一个数据帧

  • 使用filter()函数和collect()函数的组合来从数据帧中检索所需的行,将每一行存储在单独的变量中

  • 将包含所需行的变量的值打印到控制台

示例

# Import necessary libraries
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("filter_collect_example").getOrCreate()

# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# Filter the DataFrame
df1 = df.filter(df.Col1 > 1).collect()[0]

# Print the collected data
print(df1)

# Filter the DataFrame
df2 = df.filter(df.Col1 > 1).collect()[1]

# Print the collected data
print(df2)

# Filter the DataFrame
df3 = df.filter(df.Col1 > 1).collect()[-1]

# Print the collected data
print(df3)

输出

Row(Col1=4, Col2=5, Col3=6)
Row(Col1=7, Col2=8, Col3=9)
Row(Col1=7, Col2=8, Col3=9)

方法8:使用where()和collect()

我们可以使用where()函数和collect()方法来显示Pyspark数据框中的特定行。使用where()方法,我们可以根据传递给方法的条件对特定行进行筛选,然后我们可以应用collect()方法将结果存储在一个变量中。以下是使用的语法:

dataframe.where(condition).collect()[index]

在这里:

  • dataframe是我们应用方法的数据框

  • Condition是基于该条件过滤数据框行的条件。

  • Index是我们想要在输出中具有的行号。

步骤

  • 导入必要的库

  • 创建一个SparkSession

  • 创建一个数据框

  • 使用where()函数和collect()函数的组合从数据框中检索所需的行,将每行存储在单独的变量中。

  • 将包含所需行的变量的值打印到控制台。

示例

# Import necessary libraries
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("filter_collect_example").getOrCreate()

# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# Filter the DataFrame
df1 = df.where(df.Col1 > 1).collect()[0]

# Print the collected data
print(df1)

# Filter the DataFrame
df2 = df.where(df.Col1 > 1).collect()[1]

# Print the collected data
print(df2)

# Filter the DataFrame
df3 = df.where(df.Col1 > 1).collect()[-1]

# Print the collected data
print(df3)

输出

Row(Col1=4, Col2=5, Col3=6)
Row(Col1=7, Col2=8, Col3=9)
Row(Col1=7, Col2=8, Col3=9)

方法9:使用take()

在PySpark中,take()函数也用于显示Python数据帧中的前n行。该函数的返回值是由前n行组成的小型数据帧。以下是使用的语法:

dataframe.take(n)

这里:

  • dataframe是我们应用该方法的数据帧
  • n是行数

步骤

  • 导入必要的库
  • 创建一个SparkSession
  • 创建一个数据帧
  • 使用take()函数通过将row参数设置为1来获取数据帧的第一行
  • 将第一行打印到控制台
  • 使用take()函数通过将row参数设置为2来获取数据帧的前两行
  • 将前两行打印到控制台
  • 使用take()函数通过将row参数设置为3来获取数据帧的前三行
  • 将前三行打印到控制台

示例

# Import necessary libraries
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# Retrieve the first row
df1= df.take(1)
print(df1)

# Retrieve the first two rows
df2= df.take(2)
print(df2)

# Retrieve the first three rows
df3= df.take(3)
print(df3)

输出

[Row(__='Row1', Col1=1, Col2=2, Col3=3)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]

结论

根据使用情况,每种方法的效率可能更高或更低,并且每种方法都有其优缺点。选择最适合特定任务的方法更为重要。由于其高效性,这些方法也可以应用于大型数据集。

Camera课程

Python教程

Java教程

Web教程

数据库教程

图形图像教程

办公软件教程

Linux教程

计算机教程

大数据教程

开发工具教程