    Eduardo Gonzalez
    @eddienko
    I would have expected it to be in https://distributed.dask.org/en/latest/web.html
    Benjamin Zaitlen
    @quasiben
    or there
    Eduardo Gonzalez
    @eddienko
    cool. I may submit a PR ;-)
    Matthew Rocklin
    @mrocklin
    The first link is better. We should retire the doc in distributed.dask.org
    Adam Thornton
    @athornton
    This is probably just something I missed when it changed (we're trying to get back to current versions of everything): KubeCluster workers used to kill themselves after 60 seconds of not being able to talk to their master.
    They don't anymore.
    Is that something I need to explicitly configure now?
    (If you don't have some kind of dead-man switch, your cluster or namespace quickly fills up with workers trying to talk to nonexistent kernels.)
    Adam Thornton
    @athornton
    It looks like I'm still setting death-timeout in my dask-worker invocation:
    /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/bin/python /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/bin/dask-worker --nthreads 4 --memory-limit 8192M --death-timeout 60 --local-directory /tmp/dask-krughoff-e8dd95ac-72drc
    But I just get this in my logs over and over:
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/core.py", line 204, in _raise
        raise IOError(msg)
    OSError: Timed out trying to connect to 'tcp://10.46.128.14:36210' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7feb07069128>: ConnectionRefusedError: [Errno 111] Connection refused
    Shouldn't that trigger the death timeout after six of those?
    Whole stack trace, if it helps, is:
    tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7feb77f9b278>>, <Task finished coro=<Worker.heartbeat() done, defined at /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/worker.py:866> exception=OSError("Timed out trying to connect to 'tcp://10.46.128.14:36210' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7feb07444240>: ConnectionRefusedError: [Errno 111] Connection refused")>)
    Traceback (most recent call last):
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/core.py", line 215, in connect
        quiet_exceptions=EnvironmentError,
    tornado.util.TimeoutError: Timeout
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
        ret = callback()
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
        future.result()
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/worker.py", line 875, in heartbeat
        metrics=await self.get_metrics(),
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/core.py", line 747, in send_recv_from_rpc
        comm = await self.pool.connect(self.addr)
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/core.py", line 874, in connect
        connection_args=self.connection_args,
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/core.py", line 227, in connect
        _raise(error)
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/core.py", line 204, in _raise
        raise IOError(msg)
    OSError: Timed out trying to connect to 'tcp://10.46.128.14:36210' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7feb07444240>: ConnectionRefusedError: [Errno 111] Connection refused
    Matthew Rocklin
    @mrocklin
    @athornton I recommend raising an issue. You'll get far faster response times and much better support there.
    Justin Sybrandt
    @JSybrandt

    Hey, I just posted a question on SO: https://stackoverflow.com/questions/57760355/how-should-i-load-a-memory-intensive-helper-object-per-worker-in-dask-distribute

    Summary: I have a big object that I need once per worker and don't want to reinitialize on every function call. Can Dask handle this?
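
    One pattern often suggested for this (a minimal sketch; ExpensiveHelper and its process method are hypothetical stand-ins for the real object): build the object once per worker with a worker callback, stash it on the worker, and look it up inside tasks.

    from distributed import Client, get_worker

    class ExpensiveHelper:
        # stand-in for the memory-intensive object
        def process(self, x):
            return x * 2

    def setup_worker():
        # runs once on each worker (current and future)
        get_worker().helper = ExpensiveHelper()

    def use_helper(x):
        # reuse the per-worker copy instead of re-creating it per call
        return get_worker().helper.process(x)

    client = Client()
    client.register_worker_callbacks(setup_worker)
    results = client.gather(client.map(use_helper, range(10)))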

    Yash Aggarwal
    @FR4NKESTI3N
    @jcrist There was a GSoC project that mentioned Cythonization of the scheduler. Is that project still available? (Not as part of GSoC, of course.) I'd like to do it now as part of my coursework. Thanks!
    Jim Crist-Harif
    @jcrist
    That work has not been done. The project was to experiment and see whether meaningful speedups could be attained by Cythonizing some of the algorithms used by the scheduler. Note that any work done on this may or may not be merged, but it could be an interesting experiment. I personally have no time to mentor or respond on this; others may.
    Yash Aggarwal
    @FR4NKESTI3N
    @jcrist Got it. Would it be possible to obtain a list of todo's or projects that the organization would be more interested in?
    Jim Crist-Harif
    @jcrist
    We don't have any large projects we're actively working on. Depending on the size of project you need for your class, you may be interested in issues in our issue tracker: https://github.com/dask/dask/issues. Otherwise, adding Dask support to a library in the Python ecosystem would be an excellent project. Dask itself is fairly stable; we're mostly focused on performance and small changes at this point.
    The Python ecosystem, however, still has plenty of work that could be done to add parallelism support. Libraries that have added Dask support include xarray, scikit-image, hyperspy, etc. Finding a Python library that is looking to add Dask support would likely be a good project, and is something the Dask dev team is actively interested in.
    Varun B Patil
    @varunbpatil

    Hello everyone. Just wanted to know whether I can perform Dask array computations inside a delayed function, something like this.

    import dask

    @dask.delayed
    def dask_delayed_example(X):
        # X is a dask array; these are lazy dask-array expressions
        mean = X.mean(axis=0)
        std = X.std(axis=0)   # computed only as a second example
        return X - mean

    X = df.to_dask_array(lengths=True)  # df is an existing dask DataFrame
    res = dask_delayed_example(X)

    Is Dask delayed only meant for parallelizing computation or can it also be used to perform computation on larger-than-memory datasets in a distributed manner?

    James Stidard
    @jamesstidard
    Hi @varunbpatil, I'm far from being an expert on dask, but I think I have a similar use case, which I asked about in here a couple of days ago. @bnaul pointed me at this recipe, which might be what you're after: https://distributed.dask.org/en/latest/task-launch.html
    I also recall reading in the docs that nesting delayed objects inside other delayed objects is something of an antipattern, so I think you want that tasks-launching-tasks approach (see the sketch below).
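    For reference, the linked recipe boils down to roughly this shape (a sketch; the scheduler address is a placeholder): a task opens a client back to its own cluster with worker_client and submits further tasks, instead of nesting delayed objects.

    from distributed import Client, worker_client

    def fib(n):
        if n < 2:
            return n
        with worker_client() as client:  # connect back to the same cluster
            a = client.submit(fib, n - 1)
            b = client.submit(fib, n - 2)
            return sum(client.gather([a, b]))

    client = Client("tcp://scheduler:8786")  # placeholder address
    print(client.submit(fib, 10).result())   # prints 55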
    Varun B Patil
    @varunbpatil
    Thanks for the link @jamesstidard . I'll check it out.
    Yash Aggarwal
    @FR4NKESTI3N
    @jcrist Thank you. I'll try and look for potential libraries for now
    François Steinmetz
    @fsteinmetz

    Hi there! I'm trying to use dask-ssh as follows:

    % dask-ssh --scheduler master1 node1
    
    ---------------------------------------------------------------
                     Dask.distributed v1.27.0
    
    Worker nodes:
      0: node1
    scheduler node: master1:8786
    ---------------------------------------------------------------
    [ scheduler master1:8786 ] : /home/applis/anaconda/envs/py3v19.04/bin/python -m distributed.cli.dask_scheduler --port 8786
    [ worker node1 ] : /home/applis/anaconda/envs/py3v19.04/bin/python -m None master1:8786 --nthreads 0 --host node1 --memory-limit auto

    but I cannot connect a client to the scheduler, and logging does not provide more information. Client('master1:8786') fails with OSError: Timed out trying to connect... and I cannot access the web UI at master1:8787. However, running dask-scheduler and dask-worker by hand works fine. Any suggestions?

    Davis Bennett
    @d-v-b
    simple question: when using dask.array with distributed to do something like darray.mean(), do the workers get the full task graph of darray?
    Davis Bennett
    @d-v-b
    I'm asking because I noticed that da.from_array(large_numpy_array) was blowing up workers: counterintuitively, large_numpy_array is not actually partitioned along the expected chunks. Each worker appears to get a full copy of large_numpy_array regardless of the chunking.
    I'm looking into the relevant code to see if there's a simple fix.
    Martin Durant
    @martindurant
    No, the workers do not get a graph at all, they get tasks with inputs. Depending on the chunking scheme and the operation you are doing, tasks may require many pieces of the overall array. However, if you are doing from_array, it would seem it fits in memory anyway, so Dask is probably not helping you.
    Davis Bennett
    @d-v-b
    I'm in an HPC environment, so dispatching work to lots of cores is a common use case, even if that work might fit in RAM on a single workstation.
    It just seems weird to me that from_array(large_array, chunks=chunks)[0].compute(), for example, does not allocate data to workers the way one would expect from the chunking.
    In terms of tasks, I'm guessing that the first task (array creation) requires the entire array, so every subsequent task inherits that dependency.
    Davis Bennett
    @d-v-b
    And this inheritance is independent of chunking, which was the counterintuitive part (for me).
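    (For what it's worth, a sketch of both the diagnosis and one commonly suggested workaround, with small toy sizes; the scatter-then-from_delayed step is an assumption based on common advice rather than anything confirmed in this thread:)

    import numpy as np
    import dask.array as da
    from distributed import Client

    client = Client()
    arr = np.random.random((1000, 1000))

    # from_array keys the whole ndarray once in the graph; every
    # chunk-task slices that single value, so a worker running any
    # of those tasks must receive the full array
    x = da.from_array(arr, chunks=(100, 100))

    # workaround: ship the array to the cluster once, then build
    # the dask array from the scattered future and rechunk
    future = client.scatter(arr)
    y = da.from_delayed(future, shape=arr.shape, dtype=arr.dtype)
    y = y.rechunk((100, 100))
    y[0].compute()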
    Markus Binsteiner
    @makkus_gitlab
    Hey, has anyone tried Dask as a Celery replacement for Flask, to run long-running jobs asynchronously?
    Martin Durant
    @martindurant
    Dask is used by Airflow and Prefect, and somewhere there is an old blog post specifically discussing Dask in Celery-like workflows.
    Markus Binsteiner
    @makkus_gitlab
    Yes, I'm using Prefect, but only for non-web-related stuff.
    I guess it shouldn't be too hard to kick off Dask jobs from within Flask. I was just wondering whether there are any examples of, or experiences with, doing that.
    Not having to set up message queues and the like seems like a huge plus to me.
    Martin Durant
    @martindurant
    I am not aware of a flask-driven dask example, but who knows
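    (In case it's useful, a minimal sketch of the Flask-plus-Dask shape being discussed; the route and job function are made up, and a scheduler is assumed to be running at the given address:)

    from flask import Flask, jsonify
    from distributed import Client, fire_and_forget

    app = Flask(__name__)
    client = Client("tcp://scheduler:8786")  # connect once at startup

    def long_running_job(job_id):
        ...  # the long-running work goes here

    @app.route("/jobs/<job_id>", methods=["POST"])
    def submit_job(job_id):
        future = client.submit(long_running_job, job_id, key="job-" + job_id)
        fire_and_forget(future)  # let it keep running after we drop the future
        return jsonify({"status": "submitted", "job": job_id})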
    Markus Binsteiner
    @makkus_gitlab
    No worries, thanks! I might play around a bit and see whether there are any downsides compared to using Celery.
    Martin Durant
    @martindurant
    @d-v-b, you can always rebalance data/tasks held on the cluster. A number of heuristics are used to determine where to run things, and not copying data is a major factor.
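    (Concretely, those knobs live on the client; a quick sketch:)

    from distributed import Client

    client = Client()
    futures = client.map(lambda i: i ** 2, range(100))
    client.rebalance()               # even out data held in memory across workers
    client.replicate(futures, n=2)   # keep copies of these results on two workers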
    Davis Bennett
    @d-v-b
    @martindurant can you direct me to the relevant documentation for this?
    Davis Bennett
    @d-v-b
    and if I made an issue regarding this behavior, would it go in distributed or dask?
    Martin Durant
    @martindurant
    the former
    Davis Bennett
    @d-v-b
    I haven't checked memory usage with the multiprocessing scheduler; it's possible that the same behavior occurs there.
    But thanks for the documentation link.
    It looks like all of these methods target optimal use of data that has already been localized to the cluster using e.g. scatter; is that right?
    In my use case, there's no explicit management of data on the cluster: I just locally define a dask array and try to compute its first element (for example) using the distributed scheduler.