    Davis Bennett
    @d-v-b
    simple question: when using dask.array with distributed to do something like darray.mean(), do the workers get the full task graph of darray?
    Davis Bennett
    @d-v-b
    i'm asking because I noticed that using da.from_array(large_numpy_array) was blowing up workers, as (counterintuitively) the large_numpy_array is not actually partitioned along the expected chunks. Each worker appears to get a full copy of large_numpy_array regardless of the chunking.
    i'm looking into the relevant code to see if there's a simple fix
    Martin Durant
    @martindurant
    No, the workers do not get a graph at all, they get tasks with inputs. Depending on the chunking scheme and the operation you are doing, tasks may require many pieces of the overall array. However, if you are doing from_array, it would seem it fits in memory anyway, so Dask is probably not helping you.
    Davis Bennett
    @d-v-b
    i'm in an HPC environment, so dispatching stuff to loads of cores is a common use-case, even if that stuff might fit in RAM on a single workstation
    it just seems weird to me that from_array(large_array, chunks=chunks)[0].compute(), for example, does not allocate data to workers the way one would expect from the chunking
    in terms of tasks, i'm guessing that the first task (array creation) requires the entire array, so every subsequent task inherits that dependency.
    Davis Bennett
    @d-v-b
    and this inheritance is independent of chunking, which was the counter-intuitive part (for me)
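    A minimal sketch of how to check this (the toy array below stands in for large_numpy_array; the exact graph layout depends on the Dask version):

        import numpy as np
        import dask.array as da

        x = np.random.random((4_000, 4_000))        # stand-in for large_numpy_array
        d = da.from_array(x, chunks=(1_000, 1_000))

        # Materialize the low-level graph. If the source ndarray appears as a
        # single value that every chunk task depends on, any worker running one
        # of those tasks can end up receiving the whole array.
        graph = dict(d.__dask_graph__())
        embedded = [v for v in graph.values() if isinstance(v, np.ndarray)]
        print(len(graph), "keys in the graph")
        print([a.shape for a in embedded])           # shapes of any embedded ndarrays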
    Markus Binsteiner
    @makkus_gitlab
    Hey, has anyone tried Dask as a Celery replacement for Flask, to run long-running jobs asynchronously?
    Martin Durant
    @martindurant
    Dask is used by Airflow and Prefect, and somewhere there is an old blog post specifically discussing Dask in Celery-like workflows
    Markus Binsteiner
    @makkus_gitlab
    Yes, I'm using Prefect, but only for non-web-related stuff.
    I guess it shouldn't be too hard to kick off Dask jobs from within Flask. I was just wondering whether there are any examples or experiences of doing that around.
    Not having to set up message queues and the like seems like a huge plus to me.
    Martin Durant
    @martindurant
    I am not aware of a flask-driven dask example, but who knows
    Markus Binsteiner
    @makkus_gitlab
    No worries, thanks! I might play around a bit, see whether there are any downsides compared to using celery.
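    A rough sketch of kicking off a long-running job from a Flask route via dask.distributed (the scheduler address, process_job, and the route are placeholders, not anything from this discussion):

        from flask import Flask, jsonify
        from dask.distributed import Client, fire_and_forget

        app = Flask(__name__)
        client = Client("tcp://scheduler:8786")   # address of an existing Dask cluster (assumed)

        def process_job(job_id):
            # placeholder for the actual long-running work
            return job_id

        @app.route("/jobs/<job_id>", methods=["POST"])
        def submit_job(job_id):
            # submit to the cluster and return immediately; fire_and_forget keeps
            # the task running even after this request's reference to it is gone
            future = client.submit(process_job, job_id)
            fire_and_forget(future)
            return jsonify({"status": "submitted", "job": job_id}), 202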
    Martin Durant
    @martindurant
    @d-v-b, you can always rebalance data/tasks held on the cluster. A number of heuristics are used to determine where to run things, and not copying data is a major factor.
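    For reference, a small sketch of the explicit data-movement calls mentioned above (assumes a cluster is already reachable via Client()):

        import numpy as np
        from dask.distributed import Client

        client = Client()  # connect to an existing or local cluster

        # put some data on the cluster explicitly
        futures = client.scatter([np.random.random(1_000_000) for _ in range(8)])

        client.rebalance(futures)       # spread the pieces evenly across worker memory
        client.replicate(futures, n=2)  # keep two copies of each piece for locality/resilience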
    Davis Bennett
    @d-v-b
    @martindurant can you direct me to the relevant documentation for this?
    Davis Bennett
    @d-v-b
    and if I made an issue regarding this behavior, would it go in distributed or dask?
    Martin Durant
    @martindurant
    the former
    Davis Bennett
    @d-v-b
    I haven't checked memory usage with the multiprocessing scheduler; it's possible that the same behavior occurs there
    but thanks for the documentation link
    it looks like all these methods target optimal use of data that has been localized to the cluster using e.g. scatter, is that right?
    in my use case, there's no explicit management of data on the cluster -- I just locally define a dask array and try to compute its first element (for example) using the distributed scheduler
    Martin Durant
    @martindurant
    Yes, about manually moving things around in cluster memory. https://distributed.dask.org/en/latest/work-stealing.html describes when Dask might do this automatically
    You should isolate the exact code, expectations and findings in an issue, then
    Davis Bennett
    @d-v-b
    sounds good
    Davis Bennett
    @d-v-b
    @martindurant see dask/distributed#3032
    Michał Jastrzębski
    @inc0
    hello everyone, I have a question. I'm trying to run a computation that needs a large dataset as an argument, something like df.apply(navigate_over_array, args=(very_big_array,)). I'd like to pre-scatter very_big_array to all workers so the function can reference it there. What's a good way to pass it? If I scatter, I get a future; is there a way to access the data behind that future from the applied function on the worker?
    or is there a better way to achieve this?
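    One pattern that should work (a sketch, not a verified answer; names are placeholders): scatter the array once with broadcast=True and pass the resulting Future as the argument, since the distributed scheduler resolves Futures embedded in a graph to the data already sitting on each worker:

        import numpy as np
        import pandas as pd
        import dask.dataframe as dd
        from dask.distributed import Client

        client = Client()

        very_big_array = np.random.random(10_000_000)   # stand-in data
        df = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4)

        # one copy pushed to every worker up front
        big_future = client.scatter(very_big_array, broadcast=True)

        def navigate_over_array(partition, arr):
            # arr is the real ndarray here, already resident on the worker
            return partition["x"] * arr.mean()

        # meta is given explicitly so the Future is never called during meta inference
        result = df.map_partitions(
            navigate_over_array, big_future, meta=("x", "float64")
        ).compute()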
    Apostolos Vlachopoulos
    @avlahop
    Davis Bennett
    @d-v-b
    @avlahop is the code on github? I couldn't find a link in the paper
    Isaiah Norton
    @ihnorton
    is it possible to show all of the annotations in the dashboard at once?
    Apostolos Vlachopoulos
    @avlahop
    @d-v-b I'm sorry not sure. I couldn't find it either from a first brief reading
    Dustin van Weersel
    @dvweersel
    I'm configuring Dask on a single node together with a JupyterHub instance. I would like to specify a default LocalCluster when people create one. I found the distributed.yaml in /etc/dask. Can I specify stuff like n_workers, threads_per_worker anywhere there? I've been looking for a complete list of configuration options, but I'm unable to find it
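    If the YAML route turns out not to cover those options, one fallback (a sketch; the kwargs are LocalCluster constructor arguments) is to set them where the cluster is created, and to inspect what configuration Dask actually loaded:

        import dask
        from dask.distributed import Client, LocalCluster

        # what did /etc/dask and ~/.config/dask actually provide?
        print(dask.config.get("distributed.worker", {}))

        cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="4GB")
        client = Client(cluster)
        print(client)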
    Bence Nagy
    @underyx
    heya, I had a pretty large graph (around 20k tasks), and when I clicked the Tasks tab on the web UI the scheduler seems to have crashed and I lost a couple of hours of processing effort
    is this expected, or did I misconfigure things somehow?
    Martin Durant
    @martindurant
    I’m not sure what the largest output we’ve tested with that view is. It’s conceivable that yours is the largest. Was there no error message? Probably you should raise an issue on distributed, with all the information you can gather.
    Bence Nagy
    @underyx
    yep, that's fair enough
    there wasn't any error message as I think the container just ran out of memory
    mainly I just wanted to confirm whether there's any way to be resilient against such failures
    Martin Durant
    @martindurant
    You should be able to find the relevant parts of the graphical output quite easily. It ought to be possible to make aggregate graphics (like the memory and processing sections on the status/ dashboard) or just refuse to work with datasets that have become too large, or even to monitor internal memory usage of the dashboard and halt when it gets too big.
    Bence Nagy
    @underyx
    I think that would be great indeed, I worry about people unknowingly crashing production workloads by just randomly clicking around on the dashboard
    (as I've just done myself :D)
    Martin Durant
    @martindurant
    It is, of course, in a separate thread, as opposed to a separate process, so it will always be a little tricky to know when some memory limit is being approached because of the graphics; however, I can imagine safeguards which turn off some graphics known to be resource-hungry above some threshold
    TomMaullin
    @TomMaullin
    Hi all, I'm pretty new to Dask and just had a quick question I was wondering if anyone could help me with - I have two 3D dask arrays and want to broadcast the solve operation across the last 2 dimensions, i.e. given A and B I want to do C[i,:,:]=solve(A[i,:,:], B[i,:,:]) for all i in the leading dimension... I tried the below but it seems to be really slow (slower than numpy) - does anyone know what I'm doing wrong/what I could do better? Sorry if this was the wrong place to ask.
    C = da.apply_gufunc(np.linalg.solve, "(i,j),(i,k)->(j,k)", A, B, vectorize=True, output_dtypes=A.dtype)
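    One likely culprit (a guess, not a verified fix): np.linalg.solve already broadcasts over leading dimensions, so vectorize=True forces a slow Python-level loop over every i. Dropping it, and chunking only along the leading axis, may help:

        import numpy as np
        import dask.array as da

        # chunk only the leading (batch) dimension; keep each matrix block whole
        A = da.random.random((1000, 50, 50), chunks=(100, 50, 50))
        B = da.random.random((1000, 50, 10), chunks=(100, 50, 10))

        # np.linalg.solve is itself a gufunc over the last two axes,
        # so vectorize=True is not needed
        C = da.apply_gufunc(np.linalg.solve, "(i,j),(i,k)->(j,k)", A, B,
                            output_dtypes=A.dtype)
        C.compute()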
    mattchan-tencent
    @mattchan-tencent
    Hi, I've got a question about the scale function in the cluster class. From the source, it doesn't look like scale_up does anything.
    Does it just set the scale, read it back, and then scale up/down from that?
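    For comparison, the usual public surface is cluster.scale / cluster.adapt, whatever scale_up does internally (a sketch using a local cluster):

        from dask.distributed import Client, LocalCluster

        cluster = LocalCluster(n_workers=1)
        client = Client(cluster)

        cluster.scale(4)                      # ask for 4 workers in total
        cluster.adapt(minimum=1, maximum=8)   # or let it resize itself within bounds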
    David Hoese
    @djhoese
    @TomMaullin You may want to post on stackoverflow with the dask tag. You'll get a wider audience there and a better answer.
    Apostolos Vlachopoulos
    @avlahop
    is it possible to scatter a small dataframe and join it with a larger one, kind of like Spark's broadcast join? Would we see a performance gain from that?
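    A close Dask analogue (a sketch): keep the small table as a plain pandas DataFrame and merge it into the Dask DataFrame; the small side is shipped to each partition rather than triggering a shuffle, which is essentially what a broadcast join does:

        import pandas as pd
        import dask.dataframe as dd

        small = pd.DataFrame({"key": [1, 2, 3], "label": ["a", "b", "c"]})
        big = dd.from_pandas(
            pd.DataFrame({"key": [1, 2, 3] * 1000, "value": range(3000)}),
            npartitions=8,
        )

        # merging against an in-memory pandas frame happens partition-by-partition,
        # with no shuffle of the big side
        joined = big.merge(small, on="key", how="left")
        joined.head()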
    TomMaullin
    @TomMaullin
    ah thank you! I believe the question is already on Stack Overflow with no reply - seeing it on there is what made me realise this would be an issue for my own code also