I am writing this message on behalf of the Apache Airflow community, and it's about Apache Airflow's integration with Dask.
We are working on Apache Airflow 2.0 (still at least 3-4 months until release), and as part of that we are doing various cleanup tasks. One of the things we noticed is that we have a DaskExecutor https://github.com/apache/airflow/blob/master/airflow/executors/dask_executor.py . It uses Dask to run the tasks. The executor is used very little - so little that all the Dask Executor tests have been failing for months without anyone noticing. We understand that Dask is important for a number of people, but in order to keep the executor in Airflow we need someone who actually uses Dask (and the Dask Executor in Airflow) to keep it in a healthy state. We are asking on our devlist whether someone is willing to maintain it, but we also thought it might be a good idea to ask here. As committers we are happy to review and accept all the related code, but we would love someone more familiar with Dask, and actually using it, to maintain it :).
Note that the executor is rather simple and the tests https://github.com/apache/airflow/blob/master/tests/executors/test_dask_executor.py are just a few - it's literally 200 lines of code in total. We could fix it ourselves, but maybe having someone who actually uses Dask become a bit more active in our community is a good idea :)
from dask.distributed import Client, as_completed
import pandas as pd

def paralell_prophet(df_g):
    return df_g

# df is an existing pandas DataFrame
g = df.groupby(['pdv_id', 'producto_id'])
client = Client('scheduler:8786')

# submit one task per group
futures = []
for i in g:
    x = client.submit(paralell_prophet, i)
    futures.append(x)

# collect results as they finish
result_data = pd.DataFrame()
for future, result in as_completed(futures, with_results=True):
    result_data = result_data.append(result)

del g
del df
del futures
Hi, I'm starting out with dask on kubernetes in Azure.
In my scenario I want to run an Azure Function App which gets triggered by a storage queue. Do I need to deploy dask inside the function on the same cluster, or should I set up a separate dask kubernetes cluster? Does anyone have experience with this? The standard Azure docs don't go into much depth.
It's a longer-running process which, as of now, runs fine in Azure Functions (python) on a single machine.
For larger workloads I am looking to distribute numpy/numba calculations.
Azure Functions appears to work well with kubernetes and KEDA (scaling up based on the number of events).
My main concern is how to handle the dynamic scaling of the dask cluster. I couldn't find any information so far on how to link it to KEDA.
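(Not an authoritative answer, but in case it helps: dask-kubernetes can also handle the scaling side itself via adaptive mode, independent of KEDA. A minimal sketch, where worker-spec.yml is an assumed worker pod template:)

from dask_kubernetes import KubeCluster
from dask.distributed import Client

# build a cluster from a worker pod template (assumed to exist)
cluster = KubeCluster.from_yaml('worker-spec.yml')

# let the scheduler request/release worker pods based on load
cluster.adapt(minimum=0, maximum=20)

client = Client(cluster)

As I understand it, with adapt() the scheduler asks kubernetes for pods when it has queued work and releases them when idle, so KEDA would only need to scale the function side.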
@quasiben asked me to re-post here from RAPIDS-GoAI -
We have a use case where we need to run dask in a multi-tenant environment. We're aware of dask-gateway, but unfortunately we need to serve 1000+ concurrent users and (i) it's not feasible to run a cluster for each, (ii) zero startup time is important, and (iii) we lose the ability for dask to "play tetris" with our jobs to be efficient with compute usage. We'd be happy to run a single dask instance; my only concern is that a single user could potentially submit a large job that swamps everyone else, unless there is some mechanism to limit the total number of workers per user at a given time.
Is this something that dask supports - or does anyone have any experience doing something like this?
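One pattern that might help on a single shared cluster is distributed's Semaphore (available in recent versions of distributed), which caps how many of a user's tasks hold a lease at once. A rough sketch - process_chunk, chunks, and the limit of 8 are all made up:

from dask.distributed import Client, Semaphore

client = Client('scheduler:8786')   # the shared multi-tenant cluster

# at most 8 of this user's tasks run the guarded section concurrently
sem = Semaphore(max_leases=8, name='user-alice')

def limited(func, sem, *args):
    with sem:                       # blocks until a lease is free
        return func(*args)

futures = [client.submit(limited, process_chunk, sem, chunk)
           for chunk in chunks]

Caveat: tasks blocked on the semaphore still occupy worker threads while they wait, so this throttles compute per user but doesn't stop a flood of queued tasks.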
@quasiben It would need to be <5 seconds, and ideally <1 second. (Unfortunately, otherwise, our users will get bored.)
We will typically have 1000s of concurrent users, each with anywhere from 10MB-10GB of data (and potentially a tail of users with >10GB). Sessions will typically be quite short: starting at seconds, typically 1-10 minutes, but again with a long tail up to an hour. It's important to provide a fast user experience on the initial load, and also when responding to changes in data volume.
My guess is that the overheads involved with launching pods and autoscaling the dask cluster (we are on kubernetes) would make the one-cluster-per-user approach infeasible. For example, a user might only need 5 seconds of CPU with 100MB of memory, but with all the overheads involved in spinning up a cluster we end up with a pod with 1 CPU and 1GB of memory running for a minute. (Whereas on the single cluster, the scheduler is making efficient use of cluster resources.) Interested to hear your thoughts.
But on the other hand, with a single cluster there's always the possibility that some user has a query that needs a huge number of tasks and swamps the cluster resources. I'm not sure how to catch this without inspecting the generated task graph, @mrocklin?
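For what it's worth, inspecting the graph is fairly cheap, since every dask collection exposes it via the collections protocol. A crude guard along these lines might work (the task budget is made up):

import dask

MAX_TASKS = 50_000   # arbitrary per-user budget, just for illustration

def checked_compute(*collections, **kwargs):
    # count tasks across all collections; this is the pre-optimization
    # graph, so it's a rough upper estimate of the real work
    n_tasks = sum(len(c.__dask_graph__()) for c in collections)
    if n_tasks > MAX_TASKS:
        raise RuntimeError(f"query would generate {n_tasks} tasks, over the budget")
    return dask.compute(*collections, **kwargs)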
Eliot's docs cover logging for Dask computations (dask.persist() in particular): https://eliot.readthedocs.io/en/stable/scientific-computing.html#dask
Hey guys, I'm trying to understand some dask logic and thought this would be the best place to ask.
Could you guys explain to me why a blockwise operator has to tokenize the function before creating the compute graph? I'm asking because I'm trying to apply a blockwise operation with a method that is part of a large object, which is difficult to pickle (and tokenize tries to pickle functions).
I can work around this by overriding __getstate__ on the object to raise an exception, but I'm really not sure whether this is going to cause any other issues.
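A possibly safer alternative to making __getstate__ raise: you can register a custom normalize_token handler for the class, so tokenize() hashes it from something cheap instead of falling back to pickle, and pass the object as an argument to a plain function rather than handing dask the bound method (bound methods get pickled by tokenize regardless of the registration). A sketch - BigModel, transform, and version are stand-ins:

import dask.array as da
from dask.base import normalize_token

class BigModel:
    version = 3                      # something cheap that identifies the state

    def transform(self, block):
        return block                 # placeholder for the real method

# teach tokenize() how to hash BigModel without pickling it
@normalize_token.register(BigModel)
def tokenize_bigmodel(obj):
    return ("BigModel", obj.version)

def apply_model(block, model):
    return model.transform(block)

model = BigModel()
x = da.ones((1000, 1000), chunks=(100, 100))
out = x.map_blocks(apply_model, model=model, dtype=x.dtype)

Two caveats: the token has to change whenever the state that matters changes, otherwise dask may reuse cached keys for stale results; and the object still has to be serializable to actually ship to distributed workers - this only avoids the repeated pickling at graph-construction time.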