Using --preload to initialize a module-level global for tasks on a Dask worker?

Updated: 2024-04-02 14:01:46

Question

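I'm trying to achieve something similar to what other questions here describe: I have a (relatively) large model, and I want it pre-initialized on the subset of workers that accept tasks requiring it. Ideally, I don't even want the client machine to have the model.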

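Before finding those questions, my first attempt was to define a delayed task in a shared module, worker_tasks.model, and to assign a module-level global (worker_tasks.model.model) in the worker's --preload script for the task to use. For some reason this doesn't work: the variable is set in the preload script, but it is still None when the task is called.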

init_model_worker.py:

import logging
from uuid import uuid4

from worker_tasks import model


def dask_setup(worker):
    model.model = f'<mock model {uuid4()}>'

    logger = logging.getLogger('distributed')
    logger.warning(f'model = {model.model}')

worker_tasks/model.py:

import logging
import random
from time import sleep
from uuid import uuid4

import dask

model = None


@dask.delayed
def compute_clinical(inp):        
    if model is None:
        raise RuntimeError('Model not initialized.')

    sleep(random.uniform(3, 17))

    return {
        'result': random.choice((True, False)),
        'confidence': random.uniform(0, 1)
        }

Here is the worker log from when I start it and submit something to the scheduler:

> dask-worker --preload init_model_worker.py tcp://scheduler:8786 --name model-worker
distributed.utils - INFO - Reload module init_model_worker from .py file                                  
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.28.0.4:41743'                         
distributed.diskutils - INFO - Found stale lock file and directory '/worker-epptq9sh', purging      
distributed.utils - INFO - Reload module init_model_worker from .py file                                  
distributed - WARNING - model = <mock model faa41af0-d925-46ef-91c9-086093d37c71>                   
distributed.worker - INFO -       Start worker at:     tcp://172.28.0.4:37973                       
distributed.worker - INFO -          Listening to:     tcp://172.28.0.4:37973                       
distributed.worker - INFO -              nanny at:           172.28.0.4:41743                       
distributed.worker - INFO -              bokeh at:           172.28.0.4:37766                       
distributed.worker - INFO - Waiting to connect to:       tcp://scheduler:8786                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.worker - INFO -               Threads:                          4                       
distributed.worker - INFO -                Memory:                    1.93 GB                       
distributed.worker - INFO -       Local Directory:           /worker-mhozo9ru                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.worker - INFO -         Registered to:       tcp://scheduler:8786                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.core - INFO - Starting established connection                                           
distributed.worker - WARNING -  Compute Failed                                                      
Function:  compute_clinical                                                                         
args:      ('mock')                                                                                 
kwargs:    {}                                                                                       
Exception: RuntimeError('Model not initialized.')                                                   

As you can see, after the preload script is reloaded, model is <mock model faa41af0-d925-46ef-91c9-086093d37c71>, but when I try to use it from the task, I get None.

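I'm going to try implementing a solution based on the answers to those other questions, but I have a couple of questions about worker preloads: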

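  1. Why is model None when the task is called, even though it was assigned in the preload script?
  2. Is it generally advisable to avoid doing this kind of thing in a worker --preload script? Is it better to initialize worker state with a call from the client? If so, why?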

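Best answer

I suspect the model variable is being bound into your function right away and is then serialized along with the function. You could try the following: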

@dask.delayed
def compute_clinical(inp):       
    from worker_tasks.model import model

    if model is None:
        raise RuntimeError('Model not initialized.')
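
The suspicion above can be illustrated without Dask. The sketch below is only a simulation under stated assumptions: `fake_model_module` and `read_model` are hypothetical stand-ins for worker_tasks/model.py and the task, and rebuilding the function around a copy of its globals is meant to mimic what by-value serialization might do. Whether Dask actually serializes this particular task that way is not confirmed here.

```python
import types

# Hypothetical stand-in for worker_tasks/model.py, built in memory.
fake_module = types.ModuleType("fake_model_module")
exec(
    "model = None\n"
    "def read_model():\n"
    "    return model\n",
    fake_module.__dict__,
)

# Simulate by-value serialization: rebuild the function around a snapshot
# of its globals, taken while model is still None (as on the client).
original = fake_module.read_model
snapshot = dict(original.__globals__)
rebuilt = types.FunctionType(original.__code__, snapshot, original.__name__)

# Simulate the preload script assigning the module global on the worker.
fake_module.model = "<mock model>"

seen_by_original = original()  # reads the live module namespace
seen_by_rebuilt = rebuilt()    # reads the stale snapshot
print(seen_by_original, seen_by_rebuilt)
```

If the task function reaches the worker with its own copy of the globals, assigning to the module afterwards cannot affect it. Importing inside the function body, as in the snippet above, forces the lookup to go through the worker's live module at call time.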

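Alternatively, rather than assigning the variable to module-global scope (which can be hard to reason about in Python), try attaching it to the worker itself.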

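# In the preload script (init_model_worker.py):
from uuid import uuid4

def dask_setup(worker):
    # Attach the model to this Worker instance instead of a module global.
    worker.model = f'<mock model {uuid4()}>'

# In worker_tasks/model.py:
import dask
from dask.distributed import get_worker

@dask.delayed
def compute_clinical(inp):
    # getattr covers workers that were started without the preload script.
    if getattr(get_worker(), 'model', None) is None:
        raise RuntimeError('Model not initialized.')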
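
A variation on the worker-attribute idea is to load the model lazily the first time a task needs it, so the task still works on a worker that was started without the preload script. This is only a minimal sketch: `get_or_load_model`, `load_mock_model` and `fake_worker` are hypothetical names, and a `SimpleNamespace` stands in for the real Worker object that `get_worker()` would return inside a task.

```python
from types import SimpleNamespace


def get_or_load_model(worker, loader):
    """Return worker.model, calling loader() once if it is not set yet."""
    model = getattr(worker, "model", None)
    if model is None:
        model = loader()
        worker.model = model  # cache on the worker for later tasks
    return model


# Hypothetical stand-in for a Worker; a real task would pass get_worker().
fake_worker = SimpleNamespace()
load_calls = []


def load_mock_model():
    # Record each call so we can see how often the loader actually runs.
    load_calls.append(1)
    return "<mock model>"


first = get_or_load_model(fake_worker, load_mock_model)
second = get_or_load_model(fake_worker, load_mock_model)
print(first, len(load_calls))
```

On a worker running several threads, two tasks could reach the loader at the same time, so a real version would probably want a lock around the load.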