01 Dask源码剖析-Dask的数据模型-Delayed

    技术2022-07-10  183

    Dask源码剖析是一个专栏,更多章节请点击文章列表查看。后续我会更新更多内容上来。

    文章目录

    Dask的数据模型概述Collections:Delayeddelayed函数Delayed对象和DelayedLeaf对象 GraphHighLevelGraphDelayed与Graph的关系 SchedulersDaskMethodsMixin 小结

    Dask的数据模型概述

    在阅读代码前,给大家一些小建议: 15个小技巧包括: 了解作者开发项目的目的 先熟练的使用项目 阅读官方文档 理解项目中的概念 了解项目技术背景 没必要读最新版本的代码 不需要读完所有的源码 版本间比较阅读 自顶向下梳理 自底向上归纳 先做减法,再做加法 从接口找关系 画图辅助阅读 设计模式辅助阅读 debug只是辅助

    那么对于Dask,咱们先用一个关键技巧,阅读官方文档,捋一捋Dask的数据模型:Dask官方文档链接 在官方文档首页,就提到了Dask的数据模型 这幅图很好的概述了dask的几个概念与基本原理:

    dask目前支持5种主要的数据结构,分别是Array(用于存放类numpy的多维数组),DataFrame(不用多说,类pandas的二维表结构的数据帧),Bag(更简单的一个数组),Delayed(对函数的异步处理封装,针对本地多进程与多线程),Futures(对函数的分布式异步提交处理封装,比delayed多提供网络api)。Graph,计算图,也有一个比较常见的概念交DAG(有向无环图)。Dask的任务都是先转化成一个个异步任务,并连接成Graph,等到必要的时候才去执行,也就是lazy懒执行的方式来做的。Scheduer,调度器,既有本机的多进程、多线程、同步调度器,也有分布式任务调度器。

    官网给出了几个调用例子,这里我们调整一下啃源码的顺序,上图的collections->task graph->schedulers顺序不变。但我们先看最基本的delayed异步处理,再看futures分布式异步处理,再去读bag->array->dataframe。安排这个顺序的原因是源码阅读技巧之一:“了解项目技术背景”。我们已经用过dask,最简单的dask应用无非就是inc(自增)+add(两数相加)的一个异步调用场景,这样无需了解更多的数据结构与代码。

    Collections:Delayed

    最简单的一个计算任务: 用Python直接实现,结果都能猜出来吧:

    from time import sleep def inc(x): # 便于观察异步效果 sleep(1) return x + 1 def add(x, y): sleep(1) return x + y if __name__ == '__main__': x = inc(1) # 等1秒,x=2 y = inc(2) # 等1秒,y=3 z = add(x, y) #等1秒,z=5

    在python里串行,时长3秒左右,结果是5。 通过看这个简单任务的Graph我们可以发现,其实inc这个自增函数,其实是可以并行的对吧?如果我们多线程或多进程或者分布式跑inc,最后结果汇总后,再执行add,时间是可以缩短到2秒左右的。 那么Delayed的作用,就是通过简单的方法,让我们可以做到并行。修改后的代码如下:

    from time import sleep from dask import delayed def inc(x): sleep(1) return x + 1 def add(x, y): sleep(1) return x + y if __name__ == '__main__': x = delayed(inc)(1) # dask的命名做的还是不错的,名副其实,delayed就是把我们的任务延期执行,等到整个任务都规划好了,用尽可能好的方法调度,来提高效率,实现并行。 y = delayed(inc)(2) z = delayed(add)(x, y) print(z) # 立即得到一个Delayed对象,但任务是延期执行的,并未实际执行。 print(z.compute()) # 实际去执行,等待2s左右,会输出5

    OK,这里我们先提出几个问题,然后带着问题去肯源码。 1、delayed对add、inc做了什么?或者说把两个函数变成了什么? 2、为什么调用完函数又调用了一次?delayed(inc)(1) 这里的(1)是传递给了谁? 3、z.compute()又发生了什么

    delayed函数

    好了,带着问题读源码总比没目标的读要强,咱们先debug第一行,注意,这里还未用到scheduer和worker,可以先不启动scheduer和worker进程: x = delayed(inc)(1) 加了断点直接跳入(step into),会发现我们步入了一个陌生的库,toolz,什么鬼? 大概的代码如下:

    # toolz.functoolz class curry(object): ... def __call__(self, *args, **kwargs): try: return self._partial(*args, **kwargs) except TypeError as exc: if self._should_curry(args, kwargs, exc): return self.bind(*args, **kwargs) raise

    可恶,半路杀出个程咬金,要不要管呢?建议是先不要管,原因很简单,因为我们的问题中现在没有这个问题,我们可以把问题记下来,放到第二波问题列表里去解决。 我们跳出__call__,再次step in,会看到delayed函数内容:

    @curry def delayed(obj, name=None, pure=None, nout=None, traverse=True): """Wraps a function or object to produce a ``Delayed``. ``Delayed`` objects act as proxies for the object they wrap, but all operations on them are done lazily by building up a dask graph internally. 。。。 """ if isinstance(obj, Delayed): # 如果已经是Delayed对象,则直接返回,我们可以猜测,本函数返回的一定是Delayed或它的子类的对象。 return obj if is_dask_collection(obj) or traverse: task, collections = unpack_collections(obj) # 分出任务与集合,可以跟到源码里看下,根据注释也能猜个大概: # task就是要执行的任务,可以是普通的python函数,也可能是Delayed对象的_key参数。 # collections,合并子图的集合 else: task = quote(obj) collections = set() if task is obj: # 任务即本身,一般发生在要转delayed的是一个原python函数的时候。 if not (nout is None or (type(nout) is int and nout >= 0)): raise ValueError( "nout must be None or a non-negative integer, got %s" % nout ) if not name: # 任务没有名字的话,会生成一个唯一id作为名字 try: prefix = obj.__name__ except AttributeError: prefix = type(obj).__name__ token = tokenize(obj, nout, pure=pure) name = "%s-%s" % (prefix, token) return DelayedLeaf(obj, name, pure=pure, nout=nout) 生成Delayed else: if not name: name = "%s-%s" % (type(obj).__name__, tokenize(task, pure=pure)) layer = {name: task} graph = HighLevelGraph.from_collections(name, layer, dependencies=collections) return Delayed(name, graph)

    Delayed对象和DelayedLeaf对象

    Delayed对象和DelayedLeaf对象的方法与属性如下图所示。 DelayedLeaf源码不多,我贴出来:

    class DelayedLeaf(Delayed): __slots__ = ("_obj", "_key", "_pure", "_nout") def __init__(self, obj, key, pure=None, nout=None): self._obj = obj self._key = key self._pure = pure self._nout = nout @property def dask(self): return HighLevelGraph.from_collections( self._key, {self._key: self._obj}, dependencies=() ) def __call__(self, *args, **kwargs): return call_function( self._obj, self._key, args, kwargs, pure=self._pure, nout=self._nout )

    我们例子里给的inc函数会生成一个DelayedLeaf对象:

    _key:inc的name的话是<函数名+uuid>作为_key存在了属性中,例如’inc-fb15e064-05f6-4418-9158-ed8551f93a56’ _obj:就是函数体本身 _pure:是否是纯函数,这里先不管,函数纯不纯看一下定义:https://www.jianshu.com/p/400926eeadfb _nout:返回值的数量,这里先不管。

    OK,咱们可以回答第一个问题,

    1、delayed对add、inc做了什么?或者说把两个函数变成了什么? delayed是把inc、add等转成了DelayedLeaf对象,而DelayedLeaf继承自Delayed对象。函数本身存在属性_obj中,并且给了一个唯一id作为key,存在_key属性中。

    继续向下debug,到了delayed(inc)(1) 这里的(1),应该是一次调用,这个其实就是调用了DelayedLeaf的__call__方法(源码看上面),它的源码也很简单,返回call_function的结果。那么call_function是返回啥呢?

    def call_function(func, func_token, args, kwargs, pure=None, nout=None): dask_key_name = kwargs.pop("dask_key_name", None) pure = kwargs.pop("pure", pure) if dask_key_name is None: name = "%s-%s" % ( funcname(func), tokenize(func_token, *args, pure=pure, **kwargs), ) else: name = dask_key_name args2, collections = unzip(map(unpack_collections, args), 2) collections = list(concat(collections)) if kwargs: dask_kwargs, collections2 = unpack_collections(kwargs) collections.extend(collections2) task = (apply, func, list(args2), dask_kwargs) else: task = (func,) + args2 graph = HighLevelGraph.from_collections( name, {name: task}, dependencies=collections ) nout = nout if nout is not None else None return Delayed(name, graph, length=nout)

    嗯,从源码上看生成了一个新的Delayed对象,name(_key)又生成了一个新的,有一个graph是通过HighLevelGraph.from_collections生成的,然后nout是返回值数量。 这里怎么理解呢? inc本身生成了一个DelayedLeaf对象,而DelayedLeaf的调用又产生一个全新的Delayed对象,它其实是DelayedLeaf中保存的_obj属性(即inc函数),再加上了传入参数1的一个结合。即把inc(1)变成了一个Delayed对象 call_function函数里还有一个关键,graph。还记得本篇开始的图吗?创建collections 一个最重要的目的就是create task graphs。

    Graph

    HighLevelGraph

    我们重点看一下

    graph = HighLevelGraph.from_collections( name, {name: task}, dependencies=collections )

    from_collections是HighLevelGraph的工厂函数,负责创建HighLevelGraph对象。创建的时候要告诉它name,layer,dependencies。

    name:就是Delayed的_key(不是DelayedLeaf的_key)layer:可以理解为计算层吧,是一个字典,{name:task}构成的,这儿的task是一个元组,是有(任务,参数)构成,即(inc函数,1)构成的,add的话是由(add, ‘inc-edb7daeb-62f9-4726-84f9-a93b3621a32f’, ‘inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b’)构成的。即只保留了key,value在dependencies里。dependencies:依赖的任务,inc的话是空,它的参数是可以立即获得的,add函数的话,它会依赖两个inc(x)的结果,所以会是[Delayed(‘inc-edb7daeb-62f9-4726-84f9-a93b3621a32f’), Delayed(‘inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b’)]对象列表。

    graph类图: layers是一个字典结构,是把各计算层通过字典结构构建起来的一个更大的字典,里面存储了Delayed对象的_key、具体的执行函数、参数。比如刚才的inc与add的layers:

    {'add-28bab8ca-e244-4962-a2af-f93220659ce8': {'add-28bab8ca-e244-4962-a2af-f93220659ce8': (<function __main__.add(x, y)>, 'inc-edb7daeb-62f9-4726-84f9-a93b3621a32f', 'inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b')}, 'inc-edb7daeb-62f9-4726-84f9-a93b3621a32f': {'inc-edb7daeb-62f9-4726-84f9-a93b3621a32f': (<function __main__.inc(x)>, 1)}, 'inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b': {'inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b': (<function __main__.inc(x)>, 2)}}

    dependencies里存了依赖关系,我们知道add肯定依赖于inc,打印出来就是

    {'add-28bab8ca-e244-4962-a2af-f93220659ce8': {'inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b', 'inc-edb7daeb-62f9-4726-84f9-a93b3621a32f'}, 'inc-edb7daeb-62f9-4726-84f9-a93b3621a32f': set(), 'inc-7c4819ce-bf24-41e8-b796-5eb9b171e37b': set()}

    Delayed与Graph的关系

    OK,至此Delayed对象内容有什么就清晰了,它有一个key做唯一标识,然后属性里保存了graph计算图,存储在dask属性里。这样我们可以回答第二个问题了

    2、为什么调用完函数又调用了一次?delayed(inc)(1) 这里的(1)是传递给了谁? delayed(inc)是一个Delayed对象,再调一次是把Delayed与参数组合成一个新的Delayed对象。具体存储在Delayed的dask属性中,而这个属性是一个HighLevelGraph,通过字典形式存储了key、task、args等信息以及他们的依赖关系。

    dask的代码结构还是比较清晰的,delayed相关的都在dask/delayed.py下,HighLevelGraph相关的代码在dask/highlevelgraph.py下。比较好找。

    Schedulers

    DaskMethodsMixin

    目前我们有了一个Graph,我们如何让它进行计算呢?这里就不得不提到Delayed的父类:DaskMethodsMixin了,它的代码在dask/base.py里面,这里的代码是整个dask源码的基础类或方法所在。

    class DaskMethodsMixin(object): def visualize(self, filename="mydask", format=None, optimize_graph=False, **kwargs): # 用graphviz对dask的collections进行可视化,不是咱们的重点,有兴趣的可自行研究 ... def persist(self, **kwargs): # 持久化,这个是把dask的collections内容都读到内存中。后面我们再研究,暂时先不管 ... def compute(self, **kwargs)# 对dask的collections执行计算, # 支持的参数包括scheduler,即可以指定用哪种调度器去执行任务,不指定的话也会自动识别scheduer # optimize_graph,是否对graph进行优化后进行,默认会优化 (result,) = compute(self, traverse=False, **kwargs) return result def __await__(self): # 使用async/await异步处理,先不管。 def compute(*args, **kwargs): traverse = kwargs.pop("traverse", True) optimize_graph = kwargs.pop("optimize_graph", True) collections, repack = unpack_collections(*args, traverse=traverse) if not collections: return args # 获取scheduler,跟进去看的话,其实默认会用threaded.get作为scheduer schedule = get_scheduler( scheduler=kwargs.pop("scheduler", None), collections=collections, get=kwargs.pop("get", None), ) dsk = collections_to_dsk(collections, optimize_graph, **kwargs) keys, postcomputes = [], [] for x in collections: keys.append(x.__dask_keys__()) postcomputes.append(x.__dask_postcompute__()) results = schedule(dsk, keys, **kwargs) return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

    以我们当前的例子,scheduler会使用多线程的方式进行计算。源码在dask/threaded.py中的get函数定义:

    def get(dsk, result, cache=None, num_workers=None, pool=None, **kwargs): """ Threaded cached implementation of dask.get Parameters ---------- dsk: dict A dask dictionary specifying a workflow result: key or list of keys Keys corresponding to desired data num_workers: integer of thread count The number of threads to use in the ThreadPool that will actually execute tasks cache: dict-like (optional) Temporary storage of results Examples -------- >>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')} >>> get(dsk, 'w') 4 >>> get(dsk, ['w', 'y']) (4, 2) """ global default_pool pool = pool or config.get("pool", None) num_workers = num_workers or config.get("num_workers", None) thread = current_thread() # 获取当前线程 with pools_lock: if pool is None: if num_workers is None and thread is main_thread: # 初次运行,会按CPU核数创建线程池 if default_pool is None: default_pool = ThreadPool(CPU_COUNT) atexit.register(default_pool.close) pool = default_pool elif thread in pools and num_workers in pools[thread]: pool = pools[thread][num_workers] else: pool = ThreadPool(num_workers) atexit.register(pool.close) pools[thread][num_workers] = pool # 获取异步结果,注意是会阻塞当前线程的。这里我们不展开看,等我们看完数据模型后,一起看scheduler。 results = get_async( pool.apply_async, len(pool._pool), dsk, result, cache=cache, get_id=_thread_get_id, pack_exception=pack_exception, **kwargs ) # Cleanup pools associated to dead threads with pools_lock: active_threads = set(threading.enumerate()) if thread is not main_thread: for t in list(pools): if t not in active_threads: for p in pools.pop(t).values(): p.close() return results

    终于,我们的结果出来了,是5。 所以回答第3个问题:

    z.compute()又发生了什么? z.compute()又发生了什么做了两件主要的事情,1是决定用哪个scheduler,另外就是讲任务提交给scheduler并等待结果。

    小结

    本篇主要讲解了dask里面第一个数据模型Delayed,让我们大致认识到了dask的工作机制:把计算、数据通过collections,组织成graph,然后交给scheduler。等最后结果回来。 下一篇,我们开始研究另外一个数据模型,Bag。

    Processed: 0.015, SQL: 9