Python Dask-第2部分
在上一篇教程中,我们了解了分布式计算的概念和Dask的介绍。我们还了解了Dask Cluster是什么,以及如何安装Dask以及Dask接口的介绍。
Dask接口
正如我们之前讨论的那样,Dask接口具有各种用于分布式计算的并行算法集。数据科学从业者使用一些重要的用户界面来对NumPy、Pandas和scikit-learn进行扩展:
- 数组: 并行NumPy
- 数据框: 并行Pandas
- 机器学习: 并行Scikit-Learn
我们已经在上一篇教程中介绍了Dask数组;让我们直接进入Dask数据框。
Dask数据框
我们观察到,需要将多个NumPy数组分组才能形成一个Dask数组。类似地,Dask数据框包含多个较小的Pandas数据框。将大型的Pandas数据框按行拆分,形成多个较小的数据框。这些较小的数据框可以在单个系统或多个系统上使用(因此,允许我们存储比内存更大的数据集)。Dask数据框的每个计算都会在现有的Pandas数据框上并行执行函数。
下面的图像显示了Dask数据框的结构:
Dask DataFrames还提供了与Pandas DataFrames提供的API非常相似的应用程序编程接口(API)。
现在,让我们考虑一些使用Dask DataFrames执行基本函数的示例。
示例1:读取CSV文件
使用Pandas帮助读取文件
# reading the file using pandas
import pandas as pd
my_pdfile = pd.read_csv("covid_19_india.csv")
print(my_pdfile)
输出:
Sno Date Time State/UnionTerritory ConfirmedIndianNational ConfirmedForeignNational Cured Deaths Confirmed
0 1 30/01/20 6:00 PM Kerala 1 0 0 0 1
1 2 31/01/20 6:00 PM Kerala 1 0 0 0 1
2 3 01/02/20 6:00 PM Kerala 2 0 0 0 2
3 4 02/02/20 6:00 PM Kerala 3 0 0 0 3
4 5 03/02/20 6:00 PM Kerala 3 0 0 0 3
... ... ... ... ... ... ... ... ... ...
9286 9287 09/12/20 8:00 AM Telengana - - 266120 1480 275261
9287 9288 09/12/20 8:00 AM Tripura - - 32169 373 32945
9288 9289 09/12/20 8:00 AM Uttarakhand - - 72435 1307 79141
9289 9290 09/12/20 8:00 AM Uttar Pradesh - - 528832 7967 558173
9290 9291 09/12/20 8:00 AM West Bengal - - 475425 8820 507995
[9291 rows x 9 columns]
使用Pandas读取文件
# reading the file using dask
import dask.dataframe as ddf
my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.compute())
输出:
Sno Date Time State/UnionTerritory ConfirmedIndianNational ConfirmedForeignNational Cured Deaths Confirmed
0 1 30/01/20 6:00 PM Kerala 1 0 0 0 1
1 2 31/01/20 6:00 PM Kerala 1 0 0 0 1
2 3 01/02/20 6:00 PM Kerala 2 0 0 0 2
3 4 02/02/20 6:00 PM Kerala 3 0 0 0 3
4 5 03/02/20 6:00 PM Kerala 3 0 0 0 3
... ... ... ... ... ... ... ... ... ...
9286 9287 09/12/20 8:00 AM Telengana - - 266120 1480 275261
9287 9288 09/12/20 8:00 AM Tripura - - 32169 373 32945
9288 9289 09/12/20 8:00 AM Uttarakhand - - 72435 1307 79141
9289 9290 09/12/20 8:00 AM Uttar Pradesh - - 528832 7967 558173
9290 9291 09/12/20 8:00 AM West Bengal - - 475425 8820 507995
[9291 rows x 9 columns]
解释:
在以上示例中,我们创建了两个不同的程序。在第一个程序中,我们导入了pandas库并使用read_csv()函数来读取CSV文件。相比之下,我们导入了dask库的dataframe模块并使用read_csv()函数来读取CSV文件。
这两个程序的结果都是相同的,但在处理时间上有所不同。与Pandas相比,Dask DataFrames在执行函数时具有更快的速度。一旦实际使用,这一点就会很明显。
示例2: 查找特定列的值计数
import dask.dataframe as ddf
my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.State.value_counts().compute())
输出:
Kerala 315
Delhi 283
Rajasthan 282
Haryana 281
Uttar Pradesh 281
Tamil Nadu 278
Ladakh 278
Jammu and Kashmir 276
Karnataka 276
Punjab 275
Maharashtra 275
Andhra Pradesh 273
Uttarakhand 270
Odisha 269
West Bengal 267
Puducherry 267
Chhattisgarh 266
Gujarat 265
Chandigarh 265
Madhya Pradesh 264
Himachal Pradesh 264
Bihar 263
Manipur 261
Mizoram 260
Andaman and Nicobar Islands 259
Goa 259
Assam 253
Jharkhand 253
Arunachal Pradesh 251
Tripura 247
Meghalaya 240
Telengana 236
Nagaland 207
Sikkim 200
Dadra and Nagar Haveli and Daman and Diu 181
Cases being reassigned to states 60
Telangana 45
Dadar Nagar Haveli 37
Unassigned 3
Telangana*** 1
Maharashtra*** 1
Telengana*** 1
Chandigarh*** 1
Daman & Diu 1
Punjab*** 1
Name: State, dtype: int64
解释:
在上面的示例中,我们导入了dask库的dataframe模块并使用read_csv()函数来读取CSV文件中的内容。然后,我们使用列名“States”,后跟value_counts()方法来计算该特定列中每个值的总数。结果是,我们得到了该列中所有州的名称以及它们出现的总次数。
示例3:在Dask dataframe上使用groupby函数
import dask.dataframe as ddf
my_ddfile = ddf.read_csv("covid_19_india.csv")
print(my_ddfile.groupby(my_ddfile.State).Cured.max().compute())
输出:
State
Andaman and Nicobar Islands 4647
Andhra Pradesh 860368
Arunachal Pradesh 15690
Assam 209447
Bihar 232563
Cases being reassigned to states 0
Chandigarh 16981
Chandigarh*** 14381
Chhattisgarh 227158
Dadar Nagar Haveli 2
Dadra and Nagar Haveli and Daman and Diu 3330
Daman & Diu 0
Delhi 565039
Goa 46924
Gujarat 203111
Haryana 232108
Himachal Pradesh 37871
Jammu and Kashmir 107282
Jharkhand 107898
Karnataka 858370
Kerala 582351
Ladakh 8056
Madhya Pradesh 200664
Maharashtra 1737080
Maharashtra*** 1581373
Manipur 23166
Meghalaya 11686
Mizoram 3772
Nagaland 10781
Odisha 316970
Puducherry 36308
Punjab 145093
Punjab*** 130406
Rajasthan 260773
Sikkim 4735
Tamil Nadu 770378
Telangana 41332
Telangana*** 40334
Telengana 266120
Telengana*** 42909
Tripura 32169
Unassigned 0
Uttar Pradesh 528832
Uttarakhand 72435
West Bengal 475425
Name: Cured, dtype: int64
解释:
在上面的程序中,我们再次导入了dask库的dataframe模块,并使用read_csv函数来从指定的CSV文件中读取数据。然后,我们使用了dask dataframe的groupby函数和max()函数来找出每个州中治愈人数的最大值。
现在,让我们来了解一下另一个Dask接口,即Dask机器学习。
Dask机器学习
Dask机器学习提供了可扩展的Python机器学习算法,与scikit-learn兼容。让我们首先了解使用scikit-learn处理计算的方式,然后更详细地了解Dask如何以不同的方式执行这些功能。
用户可以通过在scikit-learn中设置参数njobs = -1来在单个系统上执行并行计算。scikit-learn利用Joblib来执行这些并行计算。
Joblib 是一个提供并行化支持的Python库。当调用 fit() 函数时,根据要执行的任务(无论是超参数搜索还是模型拟合),Joblib会将任务分发到可用的核心上。
然而,我们可以借助scikit-learn库将并行计算与多台机器一起扩展。而Dask在单个系统上表现良好,并且可以轻松扩展到一组系统。
Dask 提供了一个中央任务调度程序和一组工作节点。调度程序将任务分配给每个工作节点。然后这些工作节点被分配了可以执行计算的多个核心。工作节点提供两个功能:
- 计算分配的任务
- 根据需要将结果提供给其他工作节点。
让我们考虑一个示例,演示调度程序和工作节点之间的对话方式(此示例由Dask的开发人员Matthew Rocklin提供):
中央任务调度程序以Python函数的形式将工作发送给工作节点,在同一系统上或在群集中执行。
- 工作节点A,请计算x = f(1),工作节点B,请计算y = g(2)
- 当g(2)函数完成后,工作节点A,请从工作节点B获取y并执行z = h(x, y)
上面的示例应该清楚地说明了Dask的工作原理。现在让我们了解机器学习模型和Dask-Search CV。
机器学习模型
Dask机器学习(也称为Dask-ML)提供了可扩展的Python机器学习。但在开始之前,请按照以下给出的Dask-ML安装步骤进行操作:
使用conda安装
conda install -c conda-forge dask-ml
使用pip安装
$ pip install dask-ml
让我们转向直接理解和使用Dask Array重新实现算法。
1. 直接并行化Scikit-Learn
正如我们已经讨论过的,Scikit-Learn(也称为 sklearn )通过 Joblib 的帮助提供了(在单个CPU上的)并行计算。我们可以直接利用Dask来并行化多个scikit-learn评估器,只需插入几行代码(甚至不需要修改当前代码)。
主要步骤是从dask库的distributed模块中导入client。这个命令将在系统上生成一个本地的调度和工作进程。
from dask.distributed import Client
# starting a local Dask client
my_client = Client()
下一步是在后端实例化Dask的joblib。我们需要从sklearn库的joblib中导入 parallel_backend ,如下所示:
import dask_ml.joblib
from sklearn.externals.joblib import parallel_backend
with parallel_backend('dask'):
# Normal scikit-learn code goes here
from sklearn.ensemble import RandomForestClassifier
my_model = RandomForestClassifier()
2. 使用Dask Array重新实现算法
Dask-ML重新实现了简单的机器学习算法,以便使用NumPy Arrays。通过使用Dask Arrays替换NumPy数组,可以实现可扩展的算法。此替换有助于实现以下功能:
- 线性模型(例如线性回归,泊松回归,逻辑回归等)
- 预处理(例如比例器,转换等)
- 聚类(例如K-means,谱聚类等)
A. 线性模型示例
from dask_ml.linear_model import LogisticRegression
mymodel = LogisticRegression()
mymodel.fit(data, labels)
B. 预处理示例
from dask_ml.preprocessing import OneHotEncoder
myencoder = OneHotEncoder(sparse=True)
myresult = myencoder.fit(data)
C. 聚类示例
from dask_ml.cluster import KMeans
mymodel = KMeans()
mymodel.fit(data)
搜索 Dask CV
超参数调整 被认为是构建模型的一个重要步骤,可以对模型的实现产生重大影响。机器学习模型有各种超参数,很难理解在特定情况下哪个参数会表现更好。手动执行这个任务是非常繁琐的工作。然而,Scikit-Learn库提供了 网格搜索 以简化超参数调整的任务。用户必须提供参数, 网格搜索 将提供这些参数的最佳组合。
让我们考虑一个示例,我们需要选择一种随机森林技术来拟合数据集。该模型有三个重要的可调参数 – 第一个参数、第二个参数和第三个参数。
现在,让我们设置以下这些参数的值:
第一个参数 – Bootstrap = True
第二个参数 – max_depth – [8, 9]
第三个参数 – n_estimators : [50, 100 , 200]
1. sklearn 网格搜索: 对于每个参数组合,Scikit-learn网格搜索将执行任务,有时会重复执行单个任务多次。下面的图表显示了这不是最有效的方法:
2. Dask-Search CV: 与sklearn的Gridsearch的CV相比,Dask提供了一个名为Dask-Search CV的库。Dask-Search CV合并了步骤以减少重复。我们可以使用下面的步骤来安装Dask-search:
使用conda安装Dask-Search CV
conda install dask-searchcv -c conda-forge
使用pip安装Dask-Search CV
$ pip install dask-searchcv
以下是显示Dask-Search CV工作原理的图表:
Spark和Dask之间的区别
以下是Spark和Dask之间的一个关键区别:
序号 | Spark | Dask |
---|---|---|
1 | Spark是用Scala编程语言编写的。 | Dask是用Python编程语言编写的。 |
2 | Spark支持R和Python。 | Dask只支持Python。 |
3 | Spark提供自己的生态系统。 | Dask是Python生态系统的组成部分之一。 |
4 | Spark提供自己的应用程序编程接口(API)。 | Dask重用Pandas的应用程序编程接口(API)。 |
5 | Spark对于Scala和SQL用户来说易于理解和实现。 | Dask通常被Python从业者所青睐。 |
6 | Spark本身不支持多维数组。 | Dask完全支持可扩展的多维数组的NumPy模型。 |