    Scott Sievert
    @stsievert
    @quasiben thanks for the link to Matthew's latest post! That's certainly an interesting read
    Matthew Rocklin
    @mrocklin
    All the cool kids are on twitter these days
    (or is that Instagram now?)
    David Hoese
    @djhoese
    Half-formed idea: Has anyone ever used dask to read data from a file with a "ready signal" for each of the chunks? I have a project that writes data in real time to a NetCDF file and writes the chunks that were written to the NetCDF file to a text file. I was thinking I could trick dask into opening the file but waiting to read the data until the chunk is written to the text file. One complication is that the chunk sizes aren't known beforehand.
    Martin Durant
    @martindurant
    Sounds like a streamz-dask interaction. Not the same, but also interesting: the blockcache interface of fsspec allows you to store only the accessed blocks of a remote file locally, which wouldn't help you, but would be great for the situation where you want to repeatedly read smaller parts of something massive.
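    (For reference, a minimal sketch of the blockcache interface mentioned above; the remote protocol, path, and cache directory are made-up placeholders, and the exact keywords should be checked against your fsspec version.)
    import fsspec

    # wrap a remote filesystem so that only the byte ranges actually read
    # are downloaded and kept in a local cache directory
    fs = fsspec.filesystem(
        "blockcache",
        target_protocol="s3",               # hypothetical remote protocol
        cache_storage="/tmp/fsspec-cache",  # hypothetical local cache dir
    )

    with fs.open("my-bucket/huge-file.nc") as f:  # hypothetical path
        header = f.read(4096)  # only the accessed blocks are fetched and cached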
    David Hoese
    @djhoese
    @martindurant Interesting. My hope was to get a chunked dask array out at the end
    because then I can throw it into normal processing, hit .compute(), and then dask (not sure if the scheduler would like this) would just hang/wait for the chunk to be ready
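    (A rough sketch of what that could look like, built from dask.delayed and da.from_delayed; chunk_is_ready and read_netcdf_chunk are hypothetical helpers around the ready-signal text file, and the fixed chunk shape is purely illustrative. Whether the scheduler tolerates tasks blocking like this is exactly the open question.)
    import time
    import numpy as np
    import dask
    import dask.array as da

    @dask.delayed
    def read_chunk_when_ready(path, chunk_id):
        # chunk_is_ready / read_netcdf_chunk are hypothetical helpers:
        # poll the "ready signal" text file, then read the chunk from the NetCDF file
        while not chunk_is_ready(path + ".ready", chunk_id):
            time.sleep(1)
        return read_netcdf_chunk(path, chunk_id)

    # assume 10 chunks of 1000 float32 values each, purely for illustration
    chunks = [
        da.from_delayed(read_chunk_when_ready("data.nc", i), shape=(1000,), dtype=np.float32)
        for i in range(10)
    ]
    arr = da.concatenate(chunks)  # behaves like an ordinary chunked dask array
    # arr.compute() would then block until every chunk has been signalled as ready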
    Martin Durant
    @martindurant
    I expect you would default to whatever hdf does with missing pieces. You could perhaps do this more simply by using client.submit and watching the text file yourself.
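    (A rough sketch of that client.submit variant; all_chunks_written, parse_signal_file, and read_netcdf_chunk are hypothetical helpers, and the scheduler address is illustrative.)
    import time
    from dask.distributed import Client

    client = Client("scheduler:8786")  # illustrative address

    futures = {}
    while not all_chunks_written("data.nc.ready"):             # hypothetical end condition
        for chunk_id in parse_signal_file("data.nc.ready"):    # hypothetical parser
            if chunk_id not in futures:
                # submit a read task as soon as the chunk is announced as ready
                futures[chunk_id] = client.submit(read_netcdf_chunk, "data.nc", chunk_id)
        time.sleep(1)

    results = client.gather(list(futures.values()))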
    David Hoese
    @djhoese
    Yeah, the NetCDF file is created at the start of processing and then filled in as data comes in from an instrument. My code would be reading from the NetCDF file and giving the data to Satpy, which expects xarray DataArrays with dask arrays.
    Pier
    @PhenoloBoy
    @djhoese maybe I'm completely wrong, but have you had a look at Streamz? It helps build pipelines to manage continuous streams. It seems to have an integration with dask and could be worth a look. Unfortunately I can't help you further as I've never used it, but I found it today while looking for something that can manage the output of futures that must be reorganized and written to a netCDF.
    @djhoese to make your life easier https://streamz.readthedocs.io/en/latest/index.html#
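    (For anyone unfamiliar with it, a minimal local streamz pipeline looks roughly like this; process_chunk, write_to_netcdf, and incoming_chunks are placeholders.)
    from streamz import Stream

    source = Stream()
    # each emitted item flows through the pipeline as it arrives
    source.map(process_chunk).sink(write_to_netcdf)  # placeholder functions

    for chunk in incoming_chunks():                  # placeholder generator
        source.emit(chunk)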
    Pier
    @PhenoloBoy
    by the way, if anyone has an idea on how to manage a stream of futures results that need to be assigned to the correct position in a netCDF without blocking the dask flow, it is more than welcome. Up to now I'm appending results to an array, but then processing it takes so much time that dask.distributed collapses, as the master is fully occupied dealing with the recomposition.
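    (One sketch of avoiding the recomposition on the client: write each result into its slot in the netCDF as soon as it completes. It assumes each result carries the index where it belongs; process_tile, tiles, the variable names, and the scheduler address are made up.)
    import netCDF4
    from dask.distributed import Client, as_completed

    client = Client("scheduler:8786")                            # illustrative address
    futures = [client.submit(process_tile, t) for t in tiles]    # hypothetical work

    # write each result into its slot as soon as it completes,
    # instead of collecting everything and recomposing at the end
    with netCDF4.Dataset("output.nc", "a") as nc:
        var = nc.variables["result"]                             # hypothetical variable
        for future, (index, data) in as_completed(futures, with_results=True):
            var[index, :] = data
            future.release()                                     # let the cluster free the memory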
    David Hoese
    @djhoese
    @PhenoloBoy That's what Martin was recommending just above your message. I'm not sure streamz will do exactly what I want due to the other software systems I'm connecting. I'll have to try later, as this isn't something I'm supposed to be working on right now anyway.
    Dave Hirschfeld
    @dhirschfeld
    I've been trying to understand the relationship between dask and blazingsql - whether they're competitors or complementary.
    Jarek Potiuk
    @potiuk

    I am writing this message on behalf of the Apache Airflow community, and it's about Apache Airflow's integration with Dask.

    We are working on Apache Airflow 2.0 (still at least 3-4 months until release) and are doing various cleanup tasks along the way. One of the things we noticed is that we have a DaskExecutor https://github.com/apache/airflow/blob/master/airflow/executors/dask_executor.py . It uses Dask to run the tasks. The executor sees very little use - so little that all the DaskExecutor tests have been failing for months without anyone noticing. We understand that Dask is important for a number of people, but in order to keep the executor in Airflow we need someone who actually uses Dask (and the DaskExecutor in Airflow) to keep it in a healthy state. We are asking on our devlist whether someone is willing to maintain it, but we also thought it might be a good idea to ask here. As committers we are happy to review and accept all the related code, but we would love someone who is more familiar with dask, and actually uses it, to maintain it :).

    Note that the executor is rather simple and there are only a few tests https://github.com/apache/airflow/blob/master/tests/executors/test_dask_executor.py . It's literally 200 lines of code in total. We could fix it ourselves, but maybe having someone who actually uses dask be a bit more active in our community is a good idea :)
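
    (For anyone curious, enabling the executor is roughly just a matter of pointing the Airflow config at a running Dask scheduler; this reflects Airflow 1.10 and may change in 2.0, so check the docs for your version.)
    # airflow.cfg (roughly; check your Airflow version's documentation)
    [core]
    executor = DaskExecutor

    [dask]
    # address of an already-running Dask scheduler
    cluster_address = 127.0.0.1:8786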

    Martin Durant
    @martindurant
    ^ you should also raise an issue for this, it will get more notice
    Jarek Potiuk
    @potiuk
    @martindurant will do, thanks
    one more above. I think this week there will be a few more of these from my side :d
    Pier
    @PhenoloBoy
    @djhoese you are right, sorry !!! I was quite distracted but the intention was a positive one
    Andres
    @Nagsein
    Hello, I have some problems with worker memory. I can't flush the memory using del
    from dask.distributed import Client, as_completed
    import pandas as pd

    def paralell_prophet(item):
        # each item from the groupby iterator is a (key, group) tuple
        key, df_g = item
        return df_g

    # df is the source DataFrame, loaded elsewhere
    g = df.groupby(['pdv_id', 'producto_id'])
    client = Client('scheduler:8786')

    # submit one task per group
    futures = []
    for i in g:
        x = client.submit(paralell_prophet, i)
        futures.append(x)

    # collect results as they complete
    result_data = pd.DataFrame()
    for future, result in as_completed(futures, with_results=True):
        result_data = result_data.append(result)

    # drop references so the scheduler can release the data held on the workers
    del g
    del df
    del futures
    Alessio Civitillo
    @acivitillo
    @quasiben thanks, I read all the dask docs but somehow missed the gateway one; that is going to be a huge help as I put together the reference architecture for my company, since we have many analytics teams and thousands of users, so the question of 1 cluster vs n clusters will pop up. Also, I am guessing that with the gateway you can in theory use glue (because IT likes it) and dask on top of the same IP address, so that might be an interesting selling point too, if I am correct
    @mrocklin I have read your post about starting a dask company, thanks to @quasiben. May I also ask: any discussions about supporting a couple of Udemy or other online courses? I believe a virtual learning offer on dask could be quite successful. Tableau, which is a tenth of the value of Dask, charges $400 per year for virtual learning. I guess for open source the dichotomy is always free docs vs paid learning
    Jonas Krueger
    @JK87iab

    Hi, starting out with dask on kubernetes in azure.

    In my scenario I want to run an Azure Function App which gets triggered by a storage queue. Do I need to deploy dask inside the function on the same cluster, or should I set up a separate dask kubernetes cluster? Does anyone have experience with this? The standard Azure docs don't go into much depth

    Martin Durant
    @martindurant
    ^ I do not suppose there is a “need”, you could choose to keep a dask cluster around, accepting work as it arrives, or spin up new clusters on demand. It depends on what kind of processing you will do, and the kind of latency versus idle resources tradeoff you are willing to make.
    Jonas Krueger
    @JK87iab

    It's a longer-running process which, as of now, runs fine in Azure Functions (python) on a single machine.

    For larger workloads I am looking to distribute numpy/numba calculations.

    Azure Functions appears to work well with kubernetes and KEDA (scaling up based on the number of events).

    My main concern is how to handle the dynamic scaling of the dask cluster. I couldn't find any information so far on how to link it to KEDA.

    At a high level, I want to scale the dask cluster based on the number of job messages
    Martin Durant
    @martindurant
    dask can use autoscaling in kubernetes; I don't know if it works with Azure. You would be best off doing some experimentation, I think, before deciding on your architecture.
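    (A minimal sketch of adaptive scaling with dask-kubernetes; worker-spec.yml and the bounds are placeholders, and whether the underlying AKS node pool also autoscales is a separate question.)
    from dask_kubernetes import KubeCluster
    from dask.distributed import Client

    # worker-spec.yml is a placeholder pod spec for the dask workers
    cluster = KubeCluster.from_yaml("worker-spec.yml")
    cluster.adapt(minimum=0, maximum=20)  # add/remove worker pods with the pending workload

    client = Client(cluster)
    # submit work as usual; the cluster scales its worker pods as needed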
    Jonas Krueger
    @JK87iab

    Yeah, I am playing around with dask on kubernetes right now and it works fine. Now I am testing the integration with Azure Functions.

    Scaling based on CPU/Memory metrics might be fine too

    Itamar Turner-Trauring
    @itamarst
    hi
    is there a strict definition of what can go in the keys of a graph?
    I guess docs say "any hashable value"
    Martin Durant
    @martindurant
    If you are using distributed, they had better be serialisable too
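    (For context, a raw dask graph is just a dict mapping keys to tasks, where keys are usually strings or tuples of a string plus integers, e.g. for array chunks; a small illustrative graph:)
    from operator import add
    import dask

    dsk = {
        "x": 1,
        "y": 2,
        ("z", 0): (add, "x", "y"),            # tuple key, as used for collection chunks
        "total": (sum, ["x", "y", ("z", 0)]),
    }

    dask.get(dsk, "total")  # synchronous scheduler -> 6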
    Itamar Turner-Trauring
    @itamarst
    finding all kinds of fun edge cases I don't support in my add-tracing-logging-to-a-dask-graph code
    Itamar Turner-Trauring
    @itamarst
    I have a vague memory of a topological graph traversal function somewhere, is that a public API?
    highlevelgraph seems new since I last looked at this code, can I expect the dask graph to always be one of those?
    ah, no
    David Kell
    @davidkell_gitlab

    @quasiben asked me to re-post here from RAPIDS-GoAI:

    We have a use case where we need to run dask in a multi-tenant environment. We're aware of dask-gateway, but unfortunately we need to serve 1000+ concurrent users and (i) it's not feasible to run a cluster for each, (ii) zero startup time is important, (iii) we lose the ability for dask to "play tetris" with our jobs to make efficient use of compute. We'd be happy to run a single dask instance; my only concern is that potentially a single user can submit a large job that swamps everyone else, unless there is some mechanism to limit the total number of workers per user at a given time.

    Is this something that dask supports - or does anyone have any experience doing something like this?

    Martin Durant
    @martindurant
    ^ no, I don’t believe there is any mechanism to assign user ownership or otherwise throttle particular workloads within a single scheduler. I imagine this kind of thing could be implemented as an extension to gateway (which does understand users), if you can express the work as discrete jobs and expected resource requirements.
    David Kell
    @davidkell_gitlab
    Thanks @martindurant. Do you have any further thoughts on what we'd have to change? I'm imagining that somehow we'd have to interface with the scheduling policy (https://distributed.dask.org/en/latest/scheduling-policies.html) to ensure that any one job only has a maximum of N tasks scheduled at a given time. Does that make sense?
    Martin Durant
    @martindurant
    No, the scheduler wouldn’t know about any higher level “job” concept. The scheduler executes tasks as efficiently as it can to free resources and get the pending graphs done. That’s why any per-user or other policies would need to be done outside of the scheduler. Not an easy problem.
    David Kell
    @davidkell_gitlab
    OK, I think I understand. You would need to write something upstream that throttles the tasks passed into the scheduler on a per-user basis. As a first approximation, we might just turn away anything that requires more than N tasks and return an error suggesting they need a dedicated cluster. Does that make more sense?
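    (A crude sketch of that first approximation: count the tasks a collection would generate before handing it to the shared scheduler; MAX_TASKS and submit_guarded are made-up names.)
    MAX_TASKS = 100_000  # per-job budget, purely illustrative

    def submit_guarded(client, collection):
        # materialize the collection's graph and count its tasks before submitting
        n_tasks = len(dict(collection.__dask_graph__()))
        if n_tasks > MAX_TASKS:
            raise ValueError(
                f"job needs {n_tasks} tasks (> {MAX_TASKS}); "
                "please request a dedicated cluster"
            )
        return client.compute(collection)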
    Martin Durant
    @martindurant
    Something like that
    David Kell
    @davidkell_gitlab
    Thanks, appreciated.
    Benjamin Zaitlen
    @quasiben
    @davidkell_gitlab how long would you be willing to wait for a cluster to come up? <1 minute? <30 seconds?
    Matthew Rocklin
    @mrocklin
    @davidkell_gitlab my first instinct would be to put something in front of Dask that would handle your user management and gate things. That might be simpler than baking it into Dask itself.
    David Kell
    @davidkell_gitlab

    @quasiben It would need to be <5 seconds, and ideally <1 second. (Unfortunately, otherwise, our users will get bored.)

    We will typically have 1000s of concurrent users, each with anywhere from 10MB-10GB of data. (And potentially a tail of users with >10GB.) Sessions will typically be quite short, starting at seconds, typically 1-10 minutes but with a long tail again up to an hour. It's important to provide a fast user experience with the initial load, and also responding to the changes in data volume.

    My guess is that the overheads involved with launching pods and autoscaling the dask cluster (we are on kubernetes) would make the one-cluster-per-user approach infeasible. For example, a user might only need 5 seconds of CPU with 100MB of memory, but with all the overhead involved in spinning up a cluster we end up with a pod with 1 CPU and 1GB of memory running for 1 minute. (Whereas with the single cluster, the scheduler makes efficient use of cluster resources.) Interested to hear your thoughts.

    But on the other hand, with the single cluster there's always the possibility that some user has a query that needs a huge number of tasks and swamps the cluster resources. I'm not sure how to catch this without inspecting the generated task graph @mrocklin?