Python Dask-第2部分

Python Dask-第2部分

在上一篇教程中,我们了解了分布式计算的概念和Dask的介绍。我们还了解了Dask Cluster是什么,以及如何安装Dask以及Dask接口的介绍。

Dask接口

正如我们之前讨论的那样,Dask接口具有各种用于分布式计算的并行算法集。数据科学从业者使用一些重要的用户界面来对NumPy、Pandas和scikit-learn进行扩展:

  1. 数组: 并行NumPy
  2. 数据框: 并行Pandas
  3. 机器学习: 并行Scikit-Learn

我们已经在上一篇教程中介绍了Dask数组;让我们直接进入Dask数据框。

Dask数据框

我们观察到,需要将多个NumPy数组分组才能形成一个Dask数组。类似地,Dask数据框包含多个较小的Pandas数据框。将大型的Pandas数据框按行拆分,形成多个较小的数据框。这些较小的数据框可以在单个系统或多个系统上使用(因此,允许我们存储比内存更大的数据集)。Dask数据框的每个计算都会在现有的Pandas数据框上并行执行函数。

下面的图像显示了Dask数据框的结构:

Python Dask-第2部分

Dask DataFrames还提供了与Pandas DataFrames提供的API非常相似的应用程序编程接口(API)。

现在,让我们考虑一些使用Dask DataFrames执行基本函数的示例。

示例1:读取CSV文件

使用Pandas帮助读取文件

# reading the file using pandas
import pandas as pd

my_pdfile = pd.read_csv("covid_19_india.csv")
print(my_pdfile)

输出:

       Sno      Date     Time State/UnionTerritory ConfirmedIndianNational ConfirmedForeignNational   Cured  Deaths  Confirmed
0        1  30/01/20  6:00 PM               Kerala                       1                        0       0       0          1
1        2  31/01/20  6:00 PM               Kerala                       1                        0       0       0          1
2        3  01/02/20  6:00 PM               Kerala                       2                        0       0       0          2
3        4  02/02/20  6:00 PM               Kerala                       3                        0       0       0          3
4        5  03/02/20  6:00 PM               Kerala                       3                        0       0       0          3
...    ...       ...      ...                  ...                     ...                      ...     ...     ...        ...
9286  9287  09/12/20  8:00 AM            Telengana                       -                        -  266120    1480     275261
9287  9288  09/12/20  8:00 AM              Tripura                       -                        -   32169     373      32945
9288  9289  09/12/20  8:00 AM          Uttarakhand                       -                        -   72435    1307      79141
9289  9290  09/12/20  8:00 AM        Uttar Pradesh                       -                        -  528832    7967     558173
9290  9291  09/12/20  8:00 AM          West Bengal                       -                        -  475425    8820     507995

[9291 rows x 9 columns]

使用Pandas读取文件

# reading the file using dask
import dask.dataframe as ddf

my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.compute())    

输出:

Sno      Date     Time State/UnionTerritory ConfirmedIndianNational ConfirmedForeignNational   Cured  Deaths  Confirmed
0        1  30/01/20  6:00 PM               Kerala                       1                        0       0       0          1
1        2  31/01/20  6:00 PM               Kerala                       1                        0       0       0          1
2        3  01/02/20  6:00 PM               Kerala                       2                        0       0       0          2
3        4  02/02/20  6:00 PM               Kerala                       3                        0       0       0          3
4        5  03/02/20  6:00 PM               Kerala                       3                        0       0       0          3
...    ...       ...      ...                  ...                     ...                      ...     ...     ...        ...
9286  9287  09/12/20  8:00 AM            Telengana                       -                        -  266120    1480     275261
9287  9288  09/12/20  8:00 AM              Tripura                       -                        -   32169     373      32945
9288  9289  09/12/20  8:00 AM          Uttarakhand                       -                        -   72435    1307      79141
9289  9290  09/12/20  8:00 AM        Uttar Pradesh                       -                        -  528832    7967     558173
9290  9291  09/12/20  8:00 AM          West Bengal                       -                        -  475425    8820     507995

[9291 rows x 9 columns]

解释:

在以上示例中,我们创建了两个不同的程序。在第一个程序中,我们导入了pandas库并使用read_csv()函数来读取CSV文件。相比之下,我们导入了dask库的dataframe模块并使用read_csv()函数来读取CSV文件。

这两个程序的结果都是相同的,但在处理时间上有所不同。与Pandas相比,Dask DataFrames在执行函数时具有更快的速度。一旦实际使用,这一点就会很明显。

示例2: 查找特定列的值计数

import dask.dataframe as ddf

my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.State.value_counts().compute())

输出:

Kerala                                      315
Delhi                                       283
Rajasthan                                   282
Haryana                                     281
Uttar Pradesh                               281
Tamil Nadu                                  278
Ladakh                                      278
Jammu and Kashmir                           276
Karnataka                                   276
Punjab                                      275
Maharashtra                                 275
Andhra Pradesh                              273
Uttarakhand                                 270
Odisha                                      269
West Bengal                                 267
Puducherry                                  267
Chhattisgarh                                266
Gujarat                                     265
Chandigarh                                  265
Madhya Pradesh                              264
Himachal Pradesh                            264
Bihar                                       263
Manipur                                     261
Mizoram                                     260
Andaman and Nicobar Islands                 259
Goa                                         259
Assam                                       253
Jharkhand                                   253
Arunachal Pradesh                           251
Tripura                                     247
Meghalaya                                   240
Telengana                                   236
Nagaland                                    207
Sikkim                                      200
Dadra and Nagar Haveli and Daman and Diu    181
Cases being reassigned to states             60
Telangana                                    45
Dadar Nagar Haveli                           37
Unassigned                                    3
Telangana***                                  1
Maharashtra***                                1
Telengana***                                  1
Chandigarh***                                 1
Daman & Diu                                   1
Punjab***                                     1
Name: State, dtype: int64

解释:

在上面的示例中,我们导入了dask库的dataframe模块并使用read_csv()函数来读取CSV文件中的内容。然后,我们使用列名“States”,后跟value_counts()方法来计算该特定列中每个值的总数。结果是,我们得到了该列中所有州的名称以及它们出现的总次数。

示例3:在Dask dataframe上使用groupby函数

import dask.dataframe as ddf

my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.groupby(my_ddfile.State).Cured.max().compute())

输出:

State
Andaman and Nicobar Islands                    4647
Andhra Pradesh                               860368
Arunachal Pradesh                             15690
Assam                                        209447
Bihar                                        232563
Cases being reassigned to states                  0
Chandigarh                                    16981
Chandigarh***                                 14381
Chhattisgarh                                 227158
Dadar Nagar Haveli                                2
Dadra and Nagar Haveli and Daman and Diu       3330
Daman & Diu                                       0
Delhi                                        565039
Goa                                           46924
Gujarat                                      203111
Haryana                                      232108
Himachal Pradesh                              37871
Jammu and Kashmir                            107282
Jharkhand                                    107898
Karnataka                                    858370
Kerala                                       582351
Ladakh                                         8056
Madhya Pradesh                               200664
Maharashtra                                 1737080
Maharashtra***                              1581373
Manipur                                       23166
Meghalaya                                     11686
Mizoram                                        3772
Nagaland                                      10781
Odisha                                       316970
Puducherry                                    36308
Punjab                                       145093
Punjab***                                    130406
Rajasthan                                    260773
Sikkim                                         4735
Tamil Nadu                                   770378
Telangana                                     41332
Telangana***                                  40334
Telengana                                    266120
Telengana***                                  42909
Tripura                                       32169
Unassigned                                        0
Uttar Pradesh                                528832
Uttarakhand                                   72435
West Bengal                                  475425
Name: Cured, dtype: int64

解释:

在上面的程序中,我们再次导入了dask库的dataframe模块,并使用read_csv函数来从指定的CSV文件中读取数据。然后,我们使用了dask dataframe的groupby函数和max()函数来找出每个州中治愈人数的最大值。

现在,让我们来了解一下另一个Dask接口,即Dask机器学习。

Dask机器学习

Dask机器学习提供了可扩展的Python机器学习算法,与scikit-learn兼容。让我们首先了解使用scikit-learn处理计算的方式,然后更详细地了解Dask如何以不同的方式执行这些功能。

Python Dask-第2部分

用户可以通过在scikit-learn中设置参数njobs = -1来在单个系统上执行并行计算。scikit-learn利用Joblib来执行这些并行计算。

Joblib 是一个提供并行化支持的Python库。当调用 fit() 函数时,根据要执行的任务(无论是超参数搜索还是模型拟合),Joblib会将任务分发到可用的核心上。
Python Dask-第2部分

然而,我们可以借助scikit-learn库将并行计算与多台机器一起扩展。而Dask在单个系统上表现良好,并且可以轻松扩展到一组系统。

Dask 提供了一个中央任务调度程序和一组工作节点。调度程序将任务分配给每个工作节点。然后这些工作节点被分配了可以执行计算的多个核心。工作节点提供两个功能:

  1. 计算分配的任务
  2. 根据需要将结果提供给其他工作节点。

让我们考虑一个示例,演示调度程序和工作节点之间的对话方式(此示例由Dask的开发人员Matthew Rocklin提供):

中央任务调度程序以Python函数的形式将工作发送给工作节点,在同一系统上或在群集中执行。

  1. 工作节点A,请计算x = f(1),工作节点B,请计算y = g(2)
  2. 当g(2)函数完成后,工作节点A,请从工作节点B获取y并执行z = h(x, y)

上面的示例应该清楚地说明了Dask的工作原理。现在让我们了解机器学习模型和Dask-Search CV。

机器学习模型

Dask机器学习(也称为Dask-ML)提供了可扩展的Python机器学习。但在开始之前,请按照以下给出的Dask-ML安装步骤进行操作:

使用conda安装

conda install -c conda-forge dask-ml

使用pip安装

$ pip install dask-ml

让我们转向直接理解和使用Dask Array重新实现算法。

1. 直接并行化Scikit-Learn

正如我们已经讨论过的,Scikit-Learn(也称为 sklearn )通过 Joblib 的帮助提供了(在单个CPU上的)并行计算。我们可以直接利用Dask来并行化多个scikit-learn评估器,只需插入几行代码(甚至不需要修改当前代码)。

主要步骤是从dask库的distributed模块中导入client。这个命令将在系统上生成一个本地的调度和工作进程。

from dask.distributed import Client 
# starting a local Dask client
my_client = Client()

下一步是在后端实例化Dask的joblib。我们需要从sklearn库的joblib中导入 parallel_backend ,如下所示:

import dask_ml.joblib
from sklearn.externals.joblib import parallel_backend
with parallel_backend('dask'):
    # Normal scikit-learn code goes here
     from sklearn.ensemble import RandomForestClassifier 
     my_model = RandomForestClassifier()

2. 使用Dask Array重新实现算法

Dask-ML重新实现了简单的机器学习算法,以便使用NumPy Arrays。通过使用Dask Arrays替换NumPy数组,可以实现可扩展的算法。此替换有助于实现以下功能:

  1. 线性模型(例如线性回归,泊松回归,逻辑回归等)
  2. 预处理(例如比例器,转换等)
  3. 聚类(例如K-means,谱聚类等)

A. 线性模型示例

from dask_ml.linear_model import LogisticRegression

mymodel = LogisticRegression()
mymodel.fit(data, labels)

B. 预处理示例

from dask_ml.preprocessing import OneHotEncoder

myencoder = OneHotEncoder(sparse=True)
myresult = myencoder.fit(data)

C. 聚类示例

from dask_ml.cluster import KMeans

mymodel = KMeans()
mymodel.fit(data)

搜索 Dask CV

超参数调整 被认为是构建模型的一个重要步骤,可以对模型的实现产生重大影响。机器学习模型有各种超参数,很难理解在特定情况下哪个参数会表现更好。手动执行这个任务是非常繁琐的工作。然而,Scikit-Learn库提供了 网格搜索 以简化超参数调整的任务。用户必须提供参数, 网格搜索 将提供这些参数的最佳组合。

让我们考虑一个示例,我们需要选择一种随机森林技术来拟合数据集。该模型有三个重要的可调参数 – 第一个参数、第二个参数和第三个参数。

现在,让我们设置以下这些参数的值:

第一个参数 – Bootstrap = True

第二个参数 – max_depth – [8, 9]

第三个参数 – n_estimators : [50, 100 , 200]

1. sklearn 网格搜索: 对于每个参数组合,Scikit-learn网格搜索将执行任务,有时会重复执行单个任务多次。下面的图表显示了这不是最有效的方法:

Python Dask-第2部分

2. Dask-Search CV: 与sklearn的Gridsearch的CV相比,Dask提供了一个名为Dask-Search CV的库。Dask-Search CV合并了步骤以减少重复。我们可以使用下面的步骤来安装Dask-search:

使用conda安装Dask-Search CV

conda install dask-searchcv -c conda-forge

使用pip安装Dask-Search CV

$ pip install dask-searchcv

以下是显示Dask-Search CV工作原理的图表:

Python Dask-第2部分

Spark和Dask之间的区别

以下是Spark和Dask之间的一个关键区别:

序号 Spark Dask
1 Spark是用Scala编程语言编写的。 Dask是用Python编程语言编写的。
2 Spark支持R和Python Dask只支持Python
3 Spark提供自己的生态系统。 Dask是Python生态系统的组成部分之一。
4 Spark提供自己的应用程序编程接口(API)。 Dask重用Pandas的应用程序编程接口(API)。
5 Spark对于Scala和SQL用户来说易于理解和实现。 Dask通常被Python从业者所青睐。
6 Spark本身不支持多维数组。 Dask完全支持可扩展的多维数组的NumPy模型。

Camera课程

Python教程

Java教程

Web教程

数据库教程

图形图像教程

办公软件教程

Linux教程

计算机教程

大数据教程

开发工具教程