Python Dask
在现代的机器学习和数据科学世界中,很容易使用不同寻常的Python工具。这些工具包括 scikit-learn、NumPy 或者 Pandas ,但这些工具在处理较大的数据量时无法适应内存使用或处理时间。
使用分布式计算工具(传统上是 Apache Spark )是一种预期的方法。但这可能意味着必须重新配置整个工作流程,从熟悉的Python生态系统导航到一个不同的 Java虚拟机(JVM) 世界,并且会显著复杂化开发工作流程。
Dask 库用于将分布式计算能力与Python数据科学的灵活性相结合,与Python的标准数据工具无缝集成。
理解分布式计算
让我们考虑一个场景:我们有一个数据集,可能是一组文本文件,由于太大而无法全部载入计算机内存中。我们可以利用Python的文件流和其他生成器工具迭代访问数据集,而无需将它们加载到内存中。然而,问题在于程序仍在单个线程上运行,这最终会限制速度,即使进行了内存管理也是如此。
因此, Python 提供了一种名为全局解释器锁的安全功能(换句话说,大多数开发人员使用 CPython )来编写Python中的并行代码,但这可能有些棘手。
因此,存在一些好的解决方案选择。这些解决方案包括使用GIL以外的低级工具(例如NumPy在编译的代码中执行多线程重任务工作),或者利用Python代码包中的多个进程/线程,例如 multiprocessing 或 joblib 。
然而,尝试并行化以加快代码速度变得困难,结果即使在正确完成了该过程的情况下,代码也较难阅读,需要开发人员完全重新设计该过程,但系统资源又可能有限。
对于像上述这样的大规模问题,分布式计算可以被视为一种有效的解决方案。任务分布到分布式系统中的多个独立工作机上,而不仅仅是在单个设备上的多个线程中工作。
这些独立的工作机处理数据集的部分数据,并使用自己的处理器、磁盘空间或内存。这些工作机仅通过相对简单的消息传递与对方或中央调度器进行通信,而不是像多线程代码那样共享磁盘空间和内存。
分布式计算系统还允许开发人员在任意数量的工作机上并行地运行非常大的数据集,从而为代码扩展提供了可能,但也增加了设计集中调度器和完全将工作机相互隔离的复杂性。
让我们了解一下 Dask 是什么以及它是如何工作的。
理解Dask
Dask 是一个免费且开源的库,与其他社区项目如 ** Pandas , NumPy ** 和 scikit-learn 协作开发和设计。它是一个并行计算库,可以通过任务工作进程和任务调度器将更复杂的计算分发并拆分为更小的计算任务。 Dask 库可以在比内存大小更大的数据集上进行分布式并行和多核执行。
Dask 通过其底层调度器和高级集合提供了不同的实用工具。
- 低级调度器:Dask 提供了动态的任务调度器,可以并行处理任务图。这些执行机器控制 高级集合 。但是,我们可以使用它们来为用户定义的自定义和工作负载提供支持。这些调度器具有较低的延迟(约1毫秒),并且在小内存占用的情况下努力处理计算。在复杂情况下,或者其他任务调度系统(如 IPython并行 或 Luigi )中,Dask的调度器是替代使用多进程和多线程库的选择。
- 高级集合:Dask 提供了高级的Array、DataFrame和Bag集合,模拟了Pandas、列表和NumPy的功能。但是,我们可以在内存不足的数据集上并行操作它们。Dask的高级集合是处理大型数据集的Pandas和NumPy的替代方案。
Dask 的使用案例提供了几个样本工作流,可以认为 Dask 是一个完美的选择。
Dask调度器的类型
Dask 提供了两种主要类型的调度器: 单机调度器和分布式调度器 。
- 单机调度器: 单机调度器针对大于内存的利用进行了优化。该调度器易于使用、类似且成本低廉,但由于在单台机器上运行,因此无法扩展。
- 分布式调度器: 分布式调度器比单机调度器更复杂和完全异步(持续的非阻塞对话)。
推荐在大多数情况下使用分布式调度器,因为它提供了一个方便且交互式的仪表板,包含多个表格和图表以及实时信息。默认情况下,在初始化集群时,仪表板位于端口8787。
在进入安装部分之前,让我们了解一下 Dask集群 。
了解Dask集群
一个 集群 是一个分布式或并行处理系统,包含一组相互连接的独立计算机,它们共同作为一个单一的、集成的计算资源进行支持性功能。集群中的节点可以被认为是一个单处理器或多处理器系统,如个人计算机(PC)、工作站,甚至SMP。
在集群的世界中,有各种不同的架构形式可供选择,以决定如何准确地将工作分配给计算机。让我们了解一下集群在 Dask 中的组织方式。
Dask网络由三个部分组成:
- 集中式调度器: 集中式调度器管理工作人员并指派给他们需要完成的任务。
- 多个工作人员: 多个工作人员执行计算,保存结果并与其他工作人员交流结果。
- 一个或多个客户端: 一个或多个客户端可以与Jupyter Notebooks或脚本中的用户进行交互。这些客户端还会将工作提交给调度器,由工作人员进行处理。
客户端将发送描述计算的代码类型的请求给调度器。一旦收到请求,调度器将工作分配给工作人员以满足请求,最后,工作人员完成计算工作。
正如我们所观察到的, Dask 将这些大量的数据计算分解成多个小的计算。
还值得注意的是, Dask 也可以部署在基于集群的各种技术上,例如:
如何安装 Dask Python
我们可以使用 Anaconda 或 pip 来安装 Dask 。
通过 Anaconda 安装 Dask 的语法如下:
conda install dask
或者
可以直接在终端或命令提示符中使用以下命令,通过pip安装 Dask :
$ pip install dask[complete]
一旦我们成功安装了 Dask 库,让我们来了解 Dask接口 。
了解Dask接口
Dask 提供了不同的用户接口。这些接口包含一组不同的并行算法,用于分布式计算。对于希望扩展NumPy、Pandas和scikit-learn的数据科学实践者来说,以下是一些重要的用户接口。
- 数组: 并行NumPy
- 数据框: 并行Pandas
- 机器学习: 并行Scikit-Learn
Dask数组
Dask中的数组使用分块算法提供了一个大于内存的、并行的、n维数组。换句话说,它是NumPy数组的分布式形式。
以下是一张帮助我们理解Dask数组外观的图片:
正如我们所观察到的,多个NumPy数组被组织成网格以形成一个Dask数组。当我们创建一个Dask数组时,我们可以指定块的大小,这决定了NumPy数组的大小。例如,如果我们在一个数组中有十个值,并且指定了块大小为五个,它将返回两个具有每个值五个的NumPy数组。
Dask数组 提供了以下一些重要功能:
- 大于内存: Dask数组让我们可以处理比可用内存更大的数据集。Dask会将数组分解为许多小块来处理,以减少计算的内存占用,并有效地从磁盘流式传输数据。
- 并行: Dask数组利用所有核心进行并行计算。
- 块算法: Dask数组还提供了块算法,以便在块或子矩阵上操作,而不是在整行或整列上运行。这个函数通过处理许多小计算来帮助执行大型计算。
这里有一些使用Dask创建数组的简单案例。
示例1:使用Dask数组创建一个随机数组
import dask.array as darray
# using arange for creating an array with values from 0 to 15
my_array = darray.arange(16, chunks = 5)
print( my_array.compute())
# using chunks for checking the size of each chunk
print(my_array.chunks)
输出:
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
((5, 5, 5, 1),)
解释:
在上面的程序中,我们从 dask 库中导入了数组模块,并使用 arange() 方法创建了一个由16个值组成的数组,并将分块大小定义为5。然后我们使用 compute() 方法打印该数组。我们还使用 chunks 函数来检查每个块的大小。结果是我们得到了结果数组,我们还可以观察到该数组被分为四个块,其中第一个、第二个和第三个块每个包含五个值,而第四个块只有一个值。
示例2:将NumPy数组转换为Dask数组
import numpy as np
import dask.array as darray
first_array = np.arange(15)
second_array = darray.from_array(first_array, chunks = 5)
# resulting in a dask array
print(second_array.compute())
输出:
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14]
说明:
在上面的示例中,我们导入了 NumPy 库和 dask 库的数组模块。然后,我们使用 arange() 方法创建了一个包含15个值的NumPy数组作为 first_array 。然后,我们使用 from_array() 方法将 first_array 转换为Dask数组作为 second_array ,并定义块大小分别为5个。然后,我们使用 compute() 函数打印数组。
此外,Dask数组支持大多数NumPy数组的函数。例如,我们可以使用 mean() , sum() 等等。
示例3:计算前100个数字的总和
import numpy as np
import dask.array as darray
# arange is used to create array on values from 0 to 100
first_array = np.arange(100)
# converting numpy array to dask array
second_array = darray.from_array(first_array, chunks = (10))
# computing mean of the array
print(second_array.sum().compute())
输出:
4950
解释:
在上面的示例中,我们导入了 NumPy 库和 Dask 库的数组模块,并使用 arange 函数创建了一个范围从1到100的NumPy数组。然后我们将NumPy数组转换为Dask数组,并使用 sum() 函数打印了Dask数组值的总和。结果是前100个数字的总和。
我们讨论了Dask Python的基本介绍,但还有一些重要的概念尚未讨论。教程的剩余部分将在第二部分中介绍。