使用Dask进行并行计算
Dask是一个灵活的开源Python库,用于并行计算。在本文中,我们将了解并行计算以及为什么应该选择Dask进行此类计算。
我们将与其他库(如Spark、Ray和Modin)进行比较。我们还讨论了Dask的用例。
并行计算
并行计算是一种同时执行多个计算或进程的计算类型。通常将大型问题划分为可解决的可管理的部分。
并行计算的四个类别包括:
- 位级(Bit-level)
-
指令级(Instruction-level)
-
数据级(Data-level)
-
作业并行性(Job parallelism)
尽管并行计算已经在高性能计算中使用了很长一段时间,但由于频率扩展的物理限制,它直到最近才变得更加流行。
Dask的需求
一个令人关注的问题是为什么我们需要Dask。
使用Python库(如NumPy、Sklearn、Seaborn等)可以简化数据处理和机器学习任务。对于大多数数据分析任务来说,Python的pandas模块已经足够了。可以使用多种不同方式处理数据,并使用该数据创建机器学习模型。
然而,如果数据量超过可用的RAM大小,pandas将不再足够。这是一个非常常见的问题。您可以使用Spark或Hadoop来解决此问题。然而,这些不是Python环境。您不能使用NumPy、pandas、sklearn、TensorFlow和其他著名Python机器学习工具。是否存在解决此问题的方法?是的!这就是Dask的用武之地。
Dask简介
Dask是一个用于并行计算的框架,可以与Jupyter笔记本无缝集成。最初,它是为了扩展NumPy、Pandas和Scikit-learn的计算能力,以突破单台计算机的存储限制。Dask模拟可以用于学习,但很快就被用作通用分布式系统。
Dask有两个主要优势:
可扩展性
Dask与Python版本的Pandas、NumPy和Scikit-Learn本机扩展,并可在具有多个核心的集群上具有韧性运行。它还可以缩小规模,运行在单个系统上。
计划
Dask任务调度器类似于Airflow、Luigi,经过计算优化。它提供快速反馈,使用任务图管理任务,并支持本地和分布式诊断,使其具有动态和响应能力。
此外,Dask还提供了实时的动态仪表板,每100毫秒更新一次,并显示各种信息,如进度、内存利用率等。
根据您的喜好,您可以克隆git存储库,或使用Conda/pip安装Dask。
conda install dask
只安装核心 –
conda install dask-core
Dask-core 是 Dask 的精简版本,只安装了必要的组件。pip也是一样的。如果你只关心使用Dask数据帧和Dask数组来扩展pandas、numpy或二者,你也可以只安装Dask数据帧或Dask数组。
python -m pip install dask
安装dataframe的要求
python -m pip install "dask[dataframe]" #
安装数组所需的要求
python -m pip install "dask[list]"
让我们来看一下使用这个库进行并行计算的几个实例。我们的代码使用dask.delayed来实现并行计算。
注意-以下两个代码片段应该在Jupyter笔记本的两个不同的单元格中运行。
import time
import random
def calcprofit(a, b):
time.sleep(random.random())
return a + b
def calcloss(a, b):
time.sleep(random.random())
return a - b
def calctotal(a, b):
time.sleep(random.random())
return a + b
现在运行下面的代码片段 −
%%time
profit = calcprofit(10, 22)
loss = calcloss(18, 3)
total = calctotal(profit, loss)
print(total)
输出
47
CPU times: user 4.13 ms, sys: 1.23 ms, total: 5.36 ms
Wall time: 1.35 s
尽管它们彼此独立,但这些功能将按照顺序相继执行。因此,我们可以同时执行它们以节省时间。
import dask
calcprofit = dask.delayed(calcprofit)
calcloss = dask.delayed(calcloss)
calctotal = dask.delayed(calctotal)
现在运行下面的代码片段 –
%%time
profit = calcprofit(10, 22)
loss = calcloss(18, 3)
total = calctotal(profit, loss)
print(total)
输出
Delayed('calctotal-9e3e896e-b4de-400c-aeb8-9e4c0961fe11')
CPU times: user 3.3 ms, sys: 0 ns, total: 3.3 ms
Wall time: 10.2 ms
即使在这个简单的示例中,运行时间已经改善了。我们还可以将任务图表示如下−
total.visualize(rankdir='LR')
Spark vs. Dask
Spark是一个强大的集群计算框架工具,可以将数据和处理分成可管理的部分,分布在任何大小的集群上,并并发执行。
尽管Spark是大数据分析的事实标准技术,但Dask似乎非常有前途。Dask是一个轻量级的Python组件,而Spark具有额外的功能,主要是用Scala开发,还支持Python/R。如果您想要一个实际的解决方案,或者甚至有JVM基础架构,Spark可能是您的首选。然而,如果您想要快速轻量级的并行处理,Dask是一个可行的选择。在快速的pip安装后,它将随时可用。
Dask、Ray和Modin
Ray和Dask具有不同的调度策略。Dask使用的是一个中央调度器来管理集群中的所有作业。由于Ray是分散的,每台计算机都有自己的调度器,可以在特定机器的级别解决计划任务的问题,而不是整个集群。Ray缺乏Dask提供的丰富的高级集合API(如数据框架、分布式数组等)。
另一方面,Modin是建立在Dask或Ray之上的。只需添加一行简单的代码import modin.pandas as pd,我们就可以通过Modin快速扩展我们的Pandas处理。尽管Modin努力使Pandas API的大部分并行化成为可能,但Dask DataFrame有时无法扩展完整的Pandas API。
Dask使用示例
Dask应用案例分为两类−
- 我们可以使用动态任务调度来优化计算。
-
大型数据集可以使用“大数据”集合(如并行数组和数据框)进行处理。
使用Dask集合来创建任务图,这是对数据处理作业组织的可视化描述。
任务图的执行是由Dask调度器完成的。
Dask使用并行编程来完成任务。
并行编程是指同时执行多个任务。
通过这样做,我们可以有效利用资源并同时完成多个任务。
让我们来看一些Dask提供的数据集。
- Dask.array - 使用NumPy接口,dask.array将巨大的数组分成较小的数组,使我们能够对大于系统内存的数组进行计算。
-
Dask.bag - 它提供了对标准Python对象集合的操作,如过滤、映射、分组和折叠。
-
Dask.dataframe - 分布式数据帧,类似于Pandas,它是由多个小型数据帧构建的巨大并行数据帧。
结论
在本文中,我们了解了Dask和并行计算。我们希望这有助于您增进对Dask的了解,了解它的需求以及与其他库的比较。