    David Hoese
    @djhoese
    Yeah, the NetCDF file is created at the start of processing and then filled in as data comes in from an instrument. My code would be reading from the NetCDF file and giving the data to Satpy, which expects xarray DataArrays backed by dask arrays.
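    A minimal sketch of that kind of wrapping, assuming a hypothetical file "incoming.nc" with a 3-D variable named "data"; the chunk sizes and dimension names below are illustrative only:

    import dask.array as da
    import xarray as xr
    from netCDF4 import Dataset

    # Hypothetical file and variable names, for illustration only.
    nc = Dataset("incoming.nc", mode="r")
    var = nc.variables["data"]

    # Wrap the on-disk variable lazily, so the consumer receives a
    # dask-backed xarray.DataArray instead of an eagerly loaded array.
    darr = da.from_array(var, chunks=(1, 512, 512))
    data_array = xr.DataArray(darr, dims=("time", "y", "x"))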
    Pier
    @PhenoloBoy
    @djhoese maybe I'm completely wrong, but have you had a look at Streamz? It helps build pipelines to manage continuous streams. It seems to have an integration with dask and could be worth a look. Unfortunately I can't help you further as I've never used it, but I found it today while looking for something that can manage the output of futures that must be reorganized and written to a netCDF.
    @djhoese to make your life easier https://streamz.readthedocs.io/en/latest/index.html#
    Pier
    @PhenoloBoy
    By the way, if anyone has an idea on how to manage a stream of future results that need to be assigned to the correct position in a netCDF without blocking the dask flow, it's more than welcome. Up to now I'm appending results to an array, but processing it afterwards takes so much time that dask.distributed collapses because the master is fully occupied dealing with the recomposition.
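    One way to avoid accumulating everything on the client is to write each result into its slot as it completes. A rough sketch, assuming each task returns (index, array) and that a suitably sized variable "results" already exists in the file; the scheduler address and sizes are placeholders:

    import numpy as np
    from dask.distributed import Client, as_completed
    from netCDF4 import Dataset

    def process(i):
        # Placeholder computation; returns the slot index together with its result.
        return i, np.random.random(100)

    client = Client("scheduler:8786")
    futures = [client.submit(process, i) for i in range(50)]

    # Write each chunk into its position as soon as it arrives, instead of
    # appending everything to one growing array on the client.
    with Dataset("output.nc", mode="a") as nc:
        var = nc.variables["results"]   # pre-created (50, 100) variable
        for future, (idx, chunk) in as_completed(futures, with_results=True):
            var[idx, :] = chunk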
    David Hoese
    @djhoese
    @PhenoloBoy That's what Martin was recommending just above your message. I'm not sure streamz will do exactly what I want because of the other software systems I'm connecting. I'll have to try it later, as this isn't something I'm supposed to be working on right now anyway.
    Dave Hirschfeld
    @dhirschfeld
    I've been trying to understand the relationship between dask and blazingsql - whether they're competitors or complementary.
    Jarek Potiuk
    @potiuk

    I am writing this message on behalf of the Apache Airflow community, and it's about Apache Airflow's integration with Dask.

    We are working on Apache Airflow 2.0 (still at least 3-4 months from release) and are doing various cleanup tasks. One of the things we noticed is that we have a DaskExecutor https://github.com/apache/airflow/blob/master/airflow/executors/dask_executor.py . It uses Dask to run the tasks. The executor sees very little use - so little that all of the Dask Executor tests have been failing for months without anyone noticing. We understand that Dask is important for a number of people, but in order to keep the executor in Airflow we need someone who actually uses Dask (and the Dask Executor in Airflow) to keep it in a healthy state. We are asking on our devlist whether someone is willing to maintain it, but we also thought it might be a good idea to ask here. As committers we are happy to review and accept all the related code, but we would love someone more familiar with dask, and actually using it, to maintain it :).

    Note that the executor is rather simple and the tests https://github.com/apache/airflow/blob/master/tests/executors/test_dask_executor.py are few. It's literally 200 lines of code in total. We could fix it ourselves, but maybe having someone who actually uses dask be a bit more active in our community is a good idea :)

    Martin Durant
    @martindurant
    ^ you should also raise an issue for this, it will get more notice
    Jarek Potiuk
    @potiuk
    @martindurant will do, thanks
    one more above - I think there are going to be a few more of these from my side this week :d
    Pier
    @PhenoloBoy
    @djhoese you are right, sorry !!! I was quite distracted but the intention was a positive one
    Andres
    @Nagsein
    Hello, I have some problems with worker memory. I can't flush the memory using del
    from dask.distributed import Client, as_completed
    import pandas as pd

    def paralell_prophet(df_g):
        # Placeholder: the real per-group work happens here.
        return df_g

    # df is the source DataFrame (defined elsewhere).
    g = df.groupby(['pdv_id', 'producto_id'])
    client = Client('scheduler:8786')
    futures = []
    for i in g:
        x = client.submit(paralell_prophet, i)
        futures.append(x)

    result_data = pd.DataFrame()
    for future, result in as_completed(futures, with_results=True):
        result_data = result_data.append(result)

    del g
    del df
    del futures
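    One pattern that sometimes helps here (a sketch only, reusing the names from the snippet above, not a confirmed fix) is to drop every reference to a future as soon as its result arrives, since the cluster keeps a result in memory as long as any future still points at it:

    import pandas as pd
    from dask.distributed import Client, as_completed

    client = Client('scheduler:8786')
    # Submit just the per-group dataframe rather than the (key, group) tuple.
    futures = [client.submit(paralell_prophet, group_df) for key, group_df in g]

    results = []
    ac = as_completed(futures, with_results=True)
    futures.clear()                 # drop the client-side references
    for future, result in ac:
        results.append(result)
        del future                  # let the scheduler release the worker memory

    result_data = pd.concat(results)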
    Alessio Civitillo
    @acivitillo
    @quasiben thanks, I read all the dask docs but somehow missed the gateway one; that is going to be a huge help as I put together the reference architecture for my company. We have many analytics teams and thousands of users, so the question of one cluster vs. n clusters will pop up. Also, I am guessing that with the gateway you could in theory run glue (because IT likes it) and dask behind the same IP address, so that might be an interesting selling point too, if I am correct.
    @mrocklin I have read your post about starting a dask company, thanks to @quasiben. May I also ask: have there been any discussions about supporting a couple of Udemy or other online courses? I believe a virtual learning offering for dask could be quite successful. Tableau, which is a tenth of the value of Dask, charges $400 per year for virtual learning. I guess for open source the dichotomy is always free docs vs. paid learning.
    Jonas Krueger
    @JK87iab

    Hi, starting out with dask on kubernetes in azure.

    In my scenario I want to run an Azure Function App which gets triggered by a storage queue. Do I need to deploy dask inside the function on the same cluster, or should I set up a separate dask kubernetes cluster? Does anyone have experience with this? The standard Azure docs don't go into much depth.

    Martin Durant
    @martindurant
    ^ I do not suppose there is a “need”, you could choose to keep a dask cluster around, accepting work as it arrives, or spin up new clusters on demand. It depends on what kind of processing you will do, and the kind of latency versus idle resources tradeoff you are willing to make.
    Jonas Krueger
    @JK87iab

    It's a longer-running process which, as of now, runs fine in Azure Functions (Python) on a single machine.

    For larger workloads I am looking to distribute numpy/numba calculations.

    Azure Functions appears to work well with kubernetes and KEDA (scaling up based on the number of events).

    My main concern is how to handle the dynamic scaling of the dask cluster. I couldn't find any information so far on how to link it to KEDA.

    At a high level I want to scale the dask cluster based on the number of job messages.
    Martin Durant
    @martindurant
    dask can use autoscaling in kubernetes, don’t know if it works with azure. You would be best off doing some experimentation, I think, before deciding your architecture.
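    For reference, dask-kubernetes exposes adaptive scaling on the cluster object itself; a minimal sketch, where the worker pod spec file and the bounds are placeholders:

    from dask.distributed import Client
    from dask_kubernetes import KubeCluster

    # "worker-spec.yml" is a placeholder pod spec for the dask workers.
    cluster = KubeCluster.from_yaml("worker-spec.yml")

    # Let dask grow and shrink the worker pods between these bounds based on
    # load, rather than tying scaling to an external mechanism such as KEDA.
    cluster.adapt(minimum=1, maximum=20)

    client = Client(cluster)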
    Jonas Krueger
    @JK87iab

    Yea I am playing around with dask on kubernetes right now, works fine. Now I am testing the integration with Azure functions.

    Scaling based on CPU/Memory metrics might be fine too

    Itamar Turner-Trauring
    @itamarst
    hi
    is there a strict definition of what can go in the keys of a graph?
    I guess docs say "any hashable value"
    Martin Durant
    @martindurant
    If you are using distributed, they had better be serialisable too
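    For reference, a minimal hand-built graph illustrating the "any hashable (and, with distributed, serialisable) key" rule; the string and tuple keys here are only examples:

    import dask

    # Keys are hashable values (strings and tuples are the usual choice);
    # values are either data or task tuples of (callable, *arguments).
    dsk = {
        "x": 1,
        "y": 2,
        ("sum", 0): (lambda a, b: a + b, "x", "y"),
    }

    # Execute with the synchronous scheduler.
    print(dask.get(dsk, ("sum", 0)))   # -> 3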
    Itamar Turner-Trauring
    @itamarst
    finding all kinds of fun edge cases I don't support in my add-tracing-logging-to-a-dask-graph code
    Itamar Turner-Trauring
    @itamarst
    I have a vague memory of a topological graph traversal function somewhere, is that a public API?
    highlevelgraph seems new since I last looked at this code, can I expect the dask graph to always be one of those?
    ah, no
    David Kell
    @davidkell_gitlab

    @quasiben Asked me to re-post here from RAPIDS-GoAI -

    We have a use case where we need to run dask in a multi-tenant environment. We're aware of dask-gateway, but unfortunately we need to serve 1000+ concurrent users and (i) it's not feasible to run a cluster for each, (ii) zero startup time is important, and (iii) we would lose the ability for dask to "play tetris" with our jobs to make efficient use of compute. We'd be happy to run a single dask instance; my only concern is that a single user could potentially submit a large job that swamps everyone else, unless there is some mechanism to limit the total number of workers per user at a given time.

    Is this something that dask supports - or does anyone have any experience doing something like this?

    Martin Durant
    @martindurant
    ^ no, I don’t believe there is any mechanism to assign user ownership or otherwise throttle particular workloads within a single scheduler. I imagine this kind of thing could be implemented as an extension to gateway (which does understand users), if you can express the work as discrete jobs and expected resource requirements.
    David Kell
    @davidkell_gitlab
    Thanks @martindurant. Do you have any further thoughts on what we'd have to change? I'm imagining that somehow we'd have to interface with the scheduling policy (https://distributed.dask.org/en/latest/scheduling-policies.html) to ensure that any one job only has at most N tasks scheduled at a given time. Does that make sense?
    Martin Durant
    @martindurant
    No, the scheduler wouldn’t know about any higher level “job” concept. The scheduler executes tasks as efficiently as it can to free resources and get the pending graphs done. That’s why any per-user or other policies would need to be done outside of the scheduler. Not an easy problem.
    David Kell
    @davidkell_gitlab
    OK, I think I understand. You would need to write something upstream that throttles the tasks passed into the scheduler on a per-user basis. As a first approximation, we might just turn away anything that requires more than N tasks, return an error, and suggest a dedicated cluster. Does that make more sense?
    Martin Durant
    @martindurant
    Something like that
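    A very rough sketch of that "turn away large graphs" idea, assuming a gate that sits in front of the shared cluster; the function name, threshold, and scheduler address are all made up for illustration:

    from dask.distributed import Client

    MAX_TASKS_PER_SUBMISSION = 10_000          # illustrative per-user budget

    client = Client("scheduler:8786")          # shared cluster (placeholder address)

    def gated_compute(user, collection):
        """Refuse to submit graphs above a fixed task budget."""
        n_tasks = len(collection.__dask_graph__())
        if n_tasks > MAX_TASKS_PER_SUBMISSION:
            raise RuntimeError(
                f"{user}: graph has {n_tasks} tasks; please request a dedicated cluster"
            )
        return client.compute(collection).result()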
    David Kell
    @davidkell_gitlab
    Thanks, appreciated.
    Benjamin Zaitlen
    @quasiben
    @davidkell_gitlab how long would you be willing to wait for a cluster to come up? <1 minute? <30 seconds?
    Matthew Rocklin
    @mrocklin
    @davidkell_gitlab my first instinct would be to put something in front of Dask that would handle your user management and gate things. That might be simpler than baking it into Dask itself.
    David Kell
    @davidkell_gitlab

    @quasiben It would need to be <5 seconds, and ideally <1 second. (Unfortunately, otherwise, our users will get bored.)

    We will typically have 1000s of concurrent users, each with anywhere from 10MB-10GB of data (and potentially a tail of users with >10GB). Sessions will typically be quite short - as little as a few seconds, usually 1-10 minutes, with a long tail up to an hour. It's important to provide a fast user experience on the initial load, and also when responding to changes in data volume.

    My guess is that the overheads involved in launching pods and autoscaling the dask cluster (we are on kubernetes) would make the one-cluster-per-user approach infeasible. For example, a user might only need 5 seconds of CPU with 100MB of memory, but with all the overheads involved in spinning up a cluster we end up with a pod with 1 CPU and 1GB of memory running for 1 minute. (Whereas on the single cluster, the scheduler makes efficient use of cluster resources.) Interested to hear your thoughts.

    But on the other hand, with the single cluster there's always the possibility that some user has a query that needs a huge number of tasks and swamps the cluster resources. I'm not sure how to catch this without inspecting the generated task graph @mrocklin?

    Benjamin Zaitlen
    @quasiben
    Thanks for the detailed answer. One other question for context, if you are able: can you share what users are currently doing now?
    Martin Durant
    @martindurant
    (NB: investigating the task graph will not tell you, by itself, how much resources it needs to be executed, unless you have a really good way to estimate the cost of any particular task; since it’s all python functions, you usually don’t know this)
    Itamar Turner-Trauring
    @itamarst
    just released a new version of the tracing/logging library Eliot, with better support for Dask (dask.persist() in particular): https://eliot.readthedocs.io/en/stable/scientific-computing.html#dask
    David Kell
    @davidkell_gitlab
    @quasiben They are using various tools (from languages to packaged solutions) on their local machines. @martindurant Interesting point, will need to think about that.
    agralak-queueco
    @agralak-queueco

    Hey guys, I'm trying to understand some dask logic and thought this would be the best place to ask.

    Could you explain why a blockwise operator has to tokenize the function before creating the compute graph? I'm asking because I'm trying to apply a blockwise operation with a method that is part of a large object, which is difficult to pickle (and tokenize tries to pickle functions).

    I can work around this by overriding __getstate__ on the object to raise an exception, but I'm really not sure whether that is going to cause any issues elsewhere.

    Thanks!

    Also please let me know if you think Stack Overflow is a better place to post such questions.
    Martin Durant
    @martindurant
    You can do even better by overriding the dask tokenize method (the full name escapes me), if you have a reliable way to do it.
    Yes, SO is generally better, so you can post code and others can search and learn from your problem.
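    For reference, the hook Martin is likely thinking of is __dask_tokenize__ (or registering a normalizer with dask.base.normalize_token). A small sketch with an invented class; whether this covers a bound method passed into blockwise depends on how dask normalizes the callable, so treat it as a starting point:

    from dask.base import tokenize

    class BigModel:
        """Stand-in for a large object that is awkward to pickle."""

        def __init__(self, name, version):
            self.name = name
            self.version = version

        def __dask_tokenize__(self):
            # Give dask a cheap, deterministic token instead of letting it
            # fall back to hashing a pickle of the whole object.
            return (type(self).__name__, self.name, self.version)

    model = BigModel("demo", 3)
    print(tokenize(model))   # stable across runs for the same name/version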
    karims
    @karims
    I'm running a dask cluster and connecting a client from inside a docker container. I am able to connect, but only for some time. After around 200 connections, the dask client doesn't connect anymore.