    Martin Durant
    @martindurant
    https://docs.dask.org/en/latest/develop.html is the first thing you should read; and of course: get familiar with using dask.
    rajat-99
    @rajat-99
    @martindurant thanks for the reply. I have read this and I will get myself familiar with dask.
    Angus Hollands
    @agoose77:matrix.org
    [m]

    @martindurant: Following up on a conversation I think we had recently:

    I have a Dask graph where the main constraint is memory. The graph in question is, in a simplistic sense, a tree where one task's result is used by several other tasks. At the moment, it looks like Dask is computing one "leaf" result at a time, so these intermediate "shared task" results sit around until the final leaf result is computed. Is there any way to resolve this without introducing arbitrary memory resource constraints?
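    A minimal sketch of the graph shape being described, assuming dask.delayed (the function names and sizes are illustrative, not from the thread): one shared intermediate feeds several leaves, and the question is when the scheduler releases it.

    import dask

    @dask.delayed
    def make_shared():
        # the large intermediate result that several tasks consume
        return list(range(10_000_000))

    @dask.delayed
    def leaf(shared, i):
        # each leaf depends on the same shared intermediate
        return sum(shared) + i

    shared = make_shared()
    totals = dask.compute(*[leaf(shared, i) for i in range(8)])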

    Martin Durant
    @martindurant
    There was some work on this kind of problem that I think made it into the latest release, so the answer might be: upgrade!
    2 replies
    Angus Hollands
    @agoose77:matrix.org
    [m]
    @martindurant: do you think it was this one? https://github.com/dask/distributed/pull/5018/files
    Martin Durant
    @martindurant
    yes
    (and the other work referred to in there)
    Angus Hollands
    @agoose77:matrix.org
    [m]
    Thanks! I'm using delayed so I wonder if the tasks are being scheduled elsewhere
    Mike Pfaffenberger
    @mpfaffenberger
    hey all, is there a separate gitter chatroom for dask on kubernetes?
    Martin Durant
    @martindurant
    No there isn’t. A lot of conversation now happens on slack, so you could ask for a new channel there. Do you mean dask-kubernetes, the helm chart, or dask-gateway-on-kubernetes?
    Mike Pfaffenberger
    @mpfaffenberger
    probably dask-kubernetes
    Martin Durant
    @martindurant
    You can of course ask on stack-overflow or post an issue, if appropriate.
    oojo12
    @oojo12
    Hey guys, is there a general approach to solving cancelled errors with Dask Distributed? I'm trying to troubleshoot a cancelled error my team and I seem to get whenever we read in up to 2TB of data with Dask Distributed, though when we subset it to the data of interest for our tests it is more like 10GB. We've tried increasing the distributed.admin.tick.limit configuration and persisting the 10GB subset to the cluster and then running the other processing functions on the Dask dataframe collection. Any thoughts? I don't have a minimal reproducible example at this time as it uses sensitive data/functions. Thank you.
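    For reference, a sketch of how that tick-limit override is typically set (the key spelling is assumed from the garbled original text):

    import dask

    # raise the event-loop tick warning limit before creating the client
    dask.config.set({"distributed.admin.tick.limit": "30s"})
    # or equivalently via environment variable on the workers:
    #   DASK_DISTRIBUTED__ADMIN__TICK__LIMIT=30s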
    oojo12
    @oojo12
    Actually moving this question to stack overflow so it can potentially help other users.
    MQMQ2018
    @MQMQ2018

    Has anyone used dask_ml.model_selection.GridSearchCV(sklearn_function(), param_list, cv=5, n_jobs=16, scheduler="multiprocessing") for parallel computing on a single cluster, only to find that all 16 assigned CPU workers keep sleeping forever?

    ===

    import sklearn.neural_network as snn
    import dask_ml.model_selection as dcv

    # param_list, X_train, y_train are defined elsewhere (omitted here)
    njobs = 16
    regressor = dcv.GridSearchCV(snn.MLPRegressor(verbose=False), param_list,
                                 cv=5, n_jobs=njobs, scheduler="multiprocessing")
    regressor.fit(X_train, y_train)

    ===

    And the SLURM file I generated to run the job for the code above on the cluster is below:

    ===

    #!/bin/bash
    #SBATCH -A erw234 --partition=shared --nodes=1 --ntasks-per-node=16 --mem=80GB
    #SBATCH --time=2:00:00

    ===

    But there is still no execution process generated, and all the running
    jobs' states stay "S", which looks like it will hang there forever. Do
    you have any experience or suggestions on this? Thank you so much in
    advance.

    1 reply
    C0dingBastard
    @C0dingB_twitter
    Hi, I'd like to invite a Dask maintainer to give a university talk
    For example, focusing on the scheduling algorithms
    or the architecture in general and some implementation details, and invite students to join the community to contribute
    Martin Durant
    @martindurant
    This channel gets less traffic these days than slack, you might want to repeat your request there (see the doc pages to find the invite)
    aavbsouza
    @aavbsouza

    Hello everyone, what could be the reason for this message?

    distributed.utils - ERROR - ('fetch', 'memory')
    Traceback (most recent call last):
      File "/usr/local/python/lib/python3.9/site-packages/distributed/utils.py", line 638, in log_errors
        yield
      File "/usr/local/python/lib/python3.9/site-packages/distributed/worker.py", line 2411, in gather_dep
        self.transition(ts, "memory", value=data[d])
      File "/usr/local/python/lib/python3.9/site-packages/distributed/worker.py", line 1692, in transition
        func = self._transitions[start, finish]
    KeyError: ('fetch', 'memory')
    tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7fccc940de80>>, <Task finished name='Task-2753' coro=<Worker.gather_dep() done, defined at /usr/local/python/lib/python3.9/site-packages/distributed/worker.py:2267> exception=KeyError(('fetch', 'memory'))>)

    I am getting this message, but despite the error the application appears to complete correctly. Thanks =)

    Martin Durant
    @martindurant
    If this is with the latest versions of dask/distributed (note that a release is currently underway), please submit a bug report.
    (if one does not already exist)
    aavbsouza
    @aavbsouza
    Hello @martindurant (this message was obtained using 2021.7.1). Yesterday I updated the env that I am currently using to the latest version (2021.8.0). I will test again with the newer version.
    aavbsouza
    @aavbsouza

    Hello @martindurant , the previous message does not appear anymore. However, I still have a problem trying to save a somewhat large dataframe using the to_parquet method (using the pyarrow engine). The scheduler prints this message repeatedly:

    distributed.core - INFO - Event loop was unresponsive in Scheduler for 3.87s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
    Task exception was never retrieved
    future: <Task finished name='Task-59964' coro=<_listener_handler_coroutine() done, defined at /usr/local/ucx/ucx_4.6-1.0.1.1/lib/python3.9/site-packages/ucp/core.py:146> exception=UCXError('<stream_send>: Connection reset by remote peer')>
    Traceback (most recent call last):
      File "/usr/local/ucx/ucx_4.6-1.0.1.1/lib/python3.9/site-packages/ucp/core.py", line 163, in _listener_handler_coroutine
        peer_info = await exchange_peer_info(
      File "/usr/local/ucx/ucx_4.6-1.0.1.1/lib/python3.9/site-packages/ucp/core.py", line 59, in exchange_peer_info
        await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes)
    ucp.exceptions.UCXError: <stream_send>: Connection reset by remote peer
    Task exception was never retrieved
    future: <Task finished name='Task-59972' coro=<_listener_handler_coroutine() done, defined at /usr/local/ucx/ucx_4.6-1.0.1.1/lib/python3.9/site-packages/ucp/core.py:146> exception=UCXError('<stream_send>: Connection reset by remote peer')>
    Traceback (most recent call last):
      File "/usr/local/ucx/ucx_4.6-1.0.1.1/lib/python3.9/site-packages/ucp/core.py", line 163, in _listener_handler_coroutine
        peer_info = await exchange_peer_info(
      File "/usr/local/ucx/ucx_4.6-1.0.1.1/lib/python3.9/site-packages/ucp/core.py", line 59, in exchange_peer_info
        await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes)

    The workers present this kind of message:

    distributed.core - INFO - Event loop was unresponsive in Worker for 33.98s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
    distributed.utils - ERROR - Timed out trying to connect to ucx://10.20.30.22:12345 after 30 s
    Traceback (most recent call last):
      File "/usr/local/python/lib/python3.9/site-packages/distributed/comm/ucx.py", line 351, in connect
        ep = await ucx_create_endpoint(ip, port)
      File "/usr/local/ucx/ucx_4.6-1.0.1.1/lib/python3.9/site-packages/ucp/core.py", line 960, in create_endpoint
        return await _get_ctx().create_endpoint(
      File "/usr/local/ucx/ucx_4.6-1.0.1.1/lib/python3.9/site-packages/ucp/core.py", line 361, in create_endpoint
        peer_info = await exchange_peer_info(
      File "/usr/local/ucx/ucx_4.6-1.0.1.1/lib/python3.9/site-packages/ucp/core.py", line 62, in exchange_peer_info
        await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes)
    asyncio.exceptions.CancelledError
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/python/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
        fut.result()
    asyncio.exceptions.CancelledError
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/usr/local/python/lib/python3.9/site-packages/distributed/comm/core.py", line 283, in connect
        comm = await asyncio.wait_for(
      File "/usr/local/python/lib/python3.9/asyncio/tasks.py", line 494, in wait_for
        raise exceptions.TimeoutError() from exc
    asyncio.exceptions.TimeoutError

    The application hangs without any further message. This execution was done using the UCX protocol. Thanks.

    ananthajanakiraman
    @ananthajanakiraman
    Hi all, I am trying to set up a Prefect flow with DaskExecutor as the executor, creating the Dask cluster with Dask Gateway. I am trying to increase the connection timeout and tick interval, but I'm not sure how to pass the config when the Dask cluster is created. DaskExecutor doesn't seem to have a way for me to pass the configuration, and the documentation is not helpful either. I have read through multiple Stack Overflow posts and the Dask documentation but have not found an answer on setting this configuration when the cluster is created. Can you please point me to a resource or guide me?
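    One plausible route (an assumption, not verified against Prefect's DaskExecutor): both settings live in the dask config, so setting them process-wide before the executor creates the Gateway cluster may be enough.

    import dask

    dask.config.set({
        "distributed.comm.timeouts.connect": "60s",   # connection timeout
        "distributed.admin.tick.interval": "100ms",   # tick interval
    })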
    Joshua Pacifici
    @Jpac14_gitlab
    Hey, I wanted to know if I could apply dask to this piece of code. I was thinking about adding all the visit_oceans and visit_land calls into an array and then executing them at once; any ideas? My friend was running this and it has been taking months going sequentially, but I think I can speed it up with dask: https://gitlab.com/-/snippets/2162776 I really don't want this process to take months. If I can use dask, I plan to scale this across multiple machines.
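    A guess at the batching idea described above, using dask.delayed; visit_ocean, visit_land, and their inputs are stand-ins for the code in the linked snippet.

    import dask

    def visit_ocean(o): ...       # stand-ins for the functions in the snippet
    def visit_land(l): ...

    oceans, lands = [], []        # the inputs the original code loops over

    tasks = ([dask.delayed(visit_ocean)(o) for o in oceans]
             + [dask.delayed(visit_land)(l) for l in lands])
    results = dask.compute(*tasks)   # run the whole batch in parallel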
    Martin Durant
    @martindurant
    @ananthajanakiraman , I’m not sure anyone here knows; perhaps approach the Prefect team.
    @Jpac14_gitlab , it’s unlikely anyone will have the time to analyse the code in your link. Maybe you can make it into a “well formed question” for stackoverflow (i.e., try some things, report problems, define expectations…).
    Louis Maddox
    @lmmx
    Hi @martindurant, I took a look at the way fsspec.read_block is used to identify newline delimiters at the block offsets in dask's interface to pandas.read_csv, and was wondering if you have any comments on my feature suggestion for validating the partition offsets for CSV files with unescaped newlines within rows (dask/dask#8045). I hope this would be a welcome contribution to dask -- without it, dask can only handle a subset of the CSVs pandas can handle -- and I'd like to find the most reliable approach and fit in with the existing library design. If not, I am happy to take ownership of this and submit it for review afterwards.
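    For context, the mechanism under discussion in a minimal form (the file name is a placeholder): fsspec's read_block snaps a byte range to delimiter boundaries, which is what misfires when a newline sits inside a quoted CSV field.

    import fsspec
    from fsspec.utils import read_block

    with fsspec.open("big.csv", "rb") as f:
        # take ~1 MB starting 1 MB in, extending both edges to b"\n" boundaries
        chunk = read_block(f, 1_000_000, 1_000_000, delimiter=b"\n")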
    Martin Durant
    @martindurant
    @lmmx , if you can think of a way to do it, you are very welcome to contribute! This has been an outstanding problem for a long time, you don’t need permission to help :) Now I’ll take a little time to actually read your issue (I hadn’t noticed it before)
    Louis Maddox
    @lmmx
    Fantastic, thank you, no rush :-)
    Søren Fuglede Jørgensen
    @fuglede
    Hi Dask community. I'm playing around with the idea of automating the process of "create an AKS cluster (or node pool in an existing cluster) with pre-defined node counts/sizes and pip/conda requirements using the dask/dask chart" to make ephemeral clusters available for experimental workflows and applications for non-tech-savvy users. All the APIs to do so are already there, so it shouldn't be too bad, but I'm wondering if there's any prior art one could lean on?
    Angus Hollands
    @agoose77:matrix.org
    [m]
    @fuglede: I am not an expert, but I can recommend something like Terraform to facilitate infrastructure-as-code for this
    Søren Fuglede Jørgensen
    @fuglede
    Thanks @agoose77:matrix.org; looking around for people who have Terraformed Dask, I came across Quansight's QHub, which certainly comes close to what I was asking about. In particular, if I read it correctly, they seem to have managed to couple scaling of the Dask cluster a la dask_gateway.Gateway().new_cluster().scale(60) with scaling of the Kubernetes cluster itself (but maybe you can achieve that through simpler means).
    13 replies
    agoose77
    @agoose77:matrix.org
    [m]
    • only being able to provide environments as Docker files (i.e. nothing like EXTRA_PIP_PACKAGES -- is that accurate?) -- which raises the barrier to entry. And afaict from a cursory glance, only publicly available Docker images (as opposed to arbitrary Docker registries), which makes internal use tougher,
    I think you can set env_vars['EXTRA_PIP_PACKAGES'] on the cluster provider, which will then install those packages
    2 replies
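    A sketch of that suggestion, assuming the classic dask-kubernetes API (the image and package names are placeholders):

    from dask_kubernetes import KubeCluster, make_pod_spec

    pod_spec = make_pod_spec(
        image="daskdev/dask:latest",
        env={"EXTRA_PIP_PACKAGES": "s3fs fastparquet"},  # installed at pod startup
    )
    cluster = KubeCluster(pod_spec)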
    Joshua Pacifici
    @Jpac14_gitlab
    Hey, how can I change the kill timeout to longer than 3 seconds? Within those 3 seconds the worker is still doing some IO:
    distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
    Søren Fuglede Jørgensen
    @fuglede
    I seem to recall that there's a way to prevent futures from being cancelled when a client is closed (so another client can pick them up later), but I'm unable to dig it up. Am I misremembering?
    5 replies
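    Possibly what's being remembered (an assumption, not confirmed in the thread): publishing the future as a named dataset keeps it alive on the scheduler after the client closes.

    from distributed import Client

    def expensive_fn(x):                       # stand-in task
        return x + 1

    client = Client("tcp://scheduler:8786")    # placeholder address
    fut = client.submit(expensive_fn, 1)
    client.publish_dataset(result=fut)         # scheduler keeps a reference
    client.close()

    # later, from a different client:
    client2 = Client("tcp://scheduler:8786")
    fut2 = client2.get_dataset("result")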
    Talley Lambert
    @tlambert03

    hi! I'm creating an nD dask array backed by a map_blocks operation that reads planes from disk when computed:

    my_array = da.map_blocks(some_reader, ....)

    however:

    a) I want the underlying file handle to be closed except when necessary to perform the lazy read operations

    b) I'm not the one doing the indexing/computing on my_array (it's passed to a user who is free to index however they want)

    I'm wondering if there's a context so to speak that can wrap all of the "inner chunk" calls to compute() to open/close the underlying file just once when a user calls my_array[some_idx].compute(). (I know that I can put the with open(...): context inside of the some_reader function that I pass to map_blocks, but depending on how that array is chunked... that could cause hundreds of open/close events for a single call to np.asarray(my_array)).

    I suspect it's hard to know a priori exactly when to open/close in this case, but I'm just curious.
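    For concreteness, a minimal version of the per-chunk baseline mentioned above, with some_reader opening and closing the file once per chunk (the file name, plane shape, and dtype are all assumptions):

    import numpy as np
    import dask.array as da

    PLANE_SHAPE = (512, 512)
    PLANE_BYTES = int(np.prod(PLANE_SHAPE)) * 4   # float32 planes

    def some_reader(block_info=None):
        loc = block_info[None]["chunk-location"]  # index of this chunk
        with open("planes.bin", "rb") as f:       # one open/close per chunk
            f.seek(loc[0] * PLANE_BYTES)
            plane = np.frombuffer(f.read(PLANE_BYTES), dtype=np.float32)
        return plane.reshape(1, *PLANE_SHAPE)

    my_array = da.map_blocks(some_reader, dtype=np.float32,
                             chunks=((1,) * 100, (512,), (512,)))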

    6 replies
    Mahendra Paipuri
    @mahendrapaipuri
    Hello, we have been using dask distributed for some complex workflows in radio astronomy. We have a question regarding dask managed memory, if someone can shed some light. When we run the workflows on a remote cluster (usually a SLURM cluster on HPC platforms), we notice that managed memory in the dashboard is always zero or close to zero. All the data being read into dask workers during computations is accounted for as unmanaged memory. As far as I understand, all the data objects that dask workers handle should be accounted for in managed memory. The workflows run well and we get correct results; our question is why we see zero managed memory all the time. We do not use any dask arrays/dataframes in our computations. All the data objects are either custom or xarray/numpy arrays! We construct the graph by delaying the functions that do computations on these data types and submit them to the dask client. Any leads on this would be very much helpful! Thanks!
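    A stripped-down version of the pattern described, in case it helps frame the question (the function bodies and paths are stand-ins):

    import numpy as np
    import dask

    paths = ["a.npy", "b.npy"]        # placeholder input files

    @dask.delayed
    def load(path):
        return np.load(path)          # plain numpy, no dask collections

    @dask.delayed
    def analyse(arr):
        return arr.mean()

    results = dask.compute(*[analyse(load(p)) for p in paths])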
    7 replies
    Niels Cautaerts
    @din14970

    Hello! I'm writing some code that should process a larger-than-memory multidimensional array dataset using dask and, partially, a GPU. The basic idea of the code is:

    • create a dask array from a file on disk
    • perform some operations on the chunks on the CPU
    • send the data to the GPU for an intensive calculation that happens on a chunk wise basis with map_blocks
    • send back an array of reduced size and store in memory

    Initially I did the sending to the GPU with array.map_blocks(cupy.asarray) and the returning from the GPU with array.map_blocks(cupy.asnumpy) (the roundtrip is sketched after my questions below), but it seems the single-machine scheduler does not take GPU memory limitations into consideration, so even on a single thread it could run all the cupy.asarray operations back to back, crashing the program with an OutOfMemory exception. I could resolve the problem in two ways: by including the send-to and receive-from GPU calls in the GPU processing function, or by using the Local(CUDA)Cluster. I noticed that the "distributed" approach works much faster, presumably because it can more efficiently schedule GPU-CPU traffic. I also noticed that with the threads scheduler, the GPU memory remains high even after the program terminates (presumably due to the way cupy's memory pool works), but this does not happen with the distributed schedulers. I have a feeling, though, that since part of the work is I/O bound and part is CPU bound, the default LocalCUDACluster with one worker is sub-optimal. My questions:

    • is it possible to enforce VRAM limits on the threads scheduler? Is there an underlying difference in how the distributed schedulers interact with GPUs that explains my observations?
    • is there an approach to optimally schedule tasks and use all available resources in a workflow that includes loading data from disk, partially processing on the CPU, sending to and processing on the GPU?
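    A condensed sketch of the roundtrip described above; preprocess_cpu, gpu_kernel, and the zarr source are stand-ins for the real pipeline.

    import cupy
    import dask.array as da

    def preprocess_cpu(block):    # stand-in for the CPU-side work
        return block * 2

    def gpu_kernel(block):        # stand-in for the heavy per-chunk computation
        return block ** 2

    x = da.from_zarr("data.zarr")                  # larger-than-memory source
    x = x.map_blocks(preprocess_cpu)
    g = x.map_blocks(cupy.asarray)                 # move chunks to the GPU
    g = g.map_blocks(gpu_kernel)
    result = g.map_blocks(cupy.asnumpy).compute()  # bring results back to host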
    2 replies
    Pascal Bourgault
    @aulemahal

    Hi! I'm having a weird problem and I don't know where to start debugging. On CentOS 7, I am using dask with a (local) distributed scheduler, say 3 workers and 4 threads per worker. From time to time, the parallelization seems blocked: the sum of the CPU usage of all workers is 100%. Often, I'll kill the process, restart it, and the problem is solved: individual worker CPU usage goes back to oscillating between 200 and 400%.
    We have another machine that has the same specs and almost the same env, and there the problem never occurs.

    I'm lost. What should I look for? What should I diagnose? The problem seems so random...

    estraven
    @estraven:matrix.org
    [m]
    Hi, I have some performance issues with my local dask client and I wonder if I can change something in the configuration (client = Client(n_workers=4, threads_per_worker=4, processes=False, memory_limit='2GB')). The issue is that the task takes too long to complete while the task stream in the dashboard stays empty, yet the 4 workers sit near 100% CPU usage and 45% memory. The task is to read a huge csv called links, reduce it to a shape of 35K x 8, and then run the following computation:

    res = links.map_partitions(
        lambda df: df.apply(
            lambda row: search_sensors(sensors, row.l_start_time, row.l_stop_time),
            axis=1)).compute()

    The function search_sensors is supposed to use the start and stop time of each row to search for all the relevant rows in the 'sensors' df, which is ~11k x 600.
    Søren Fuglede Jørgensen
    @fuglede
    Hi again! Is there a way to force subsequent tasks on the same worker to run in separate Python processes? I have a task that relies on an ill-behaved extension module that cannot be unloaded and whose state carries over between tasks.
    Martin Durant
    @martindurant
    You could make your task call Popen or multiprocessing directly? Of course, you want to be super-certain you don’t saturate the node with running processes.
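    A minimal sketch of that suggestion (the module name and wiring are assumptions): each task spawns a fresh interpreter, so the extension module's state dies with the subprocess.

    import subprocess
    import sys
    from distributed import Client

    client = Client()   # or the existing cluster client

    def run_isolated(arg):
        # the real work lives in a hypothetical importable module `badext_job`
        out = subprocess.run([sys.executable, "-m", "badext_job", str(arg)],
                             capture_output=True, check=True, text=True)
        return out.stdout

    future = client.submit(run_isolated, 42)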
    8 replies
    Ian Liu Rodrigues
    @ianliu
    When I issue read_parquet('*.parquet'), does this assign a partition for each file?
    Martin Durant
    @martindurant
    ^ There are various options, but generally: yes
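    Illustrating the common case (the path pattern is a placeholder):

    import dask.dataframe as dd

    df = dd.read_parquet("*.parquet")
    print(df.npartitions)   # generally one partition per file (or per row group)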
    Ian Liu Rodrigues
    @ianliu
    ok, thanks!
    is it possible to get the file name? Or is the filename lost?
    something like df.partition_name[0]
    Martin Durant
    @martindurant
    The filename is not part of the output, no.