Hello. I have an environment where I'm upgrading:
dask==1.1.0 to dask==1.2.0
distributed==1.25.3 to distributed==1.27.0
dask-kubernetes==0.8.0 to dask-kubernetes==0.9.2
And I notice that my worker pods start, but then fail to connect to the scheduler. Further inspection shows that the worker pods have DASK_SCHEDULER_ADDRESS set to a localhost address (e.g. tcp://127.0.0.1:44131), while in the working version it points to the correct remote IP (the scheduler and worker pods are on separate clusters).
I've spent some time digging through the codebase to find out why this is not correctly being set.
Where should I begin looking?
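Before digging further into the codebase, it may help to confirm what the pods actually received; a diagnostic sketch (the label selector and pod name below are hypothetical, adjust them to your deployment):

```shell
# List the worker pods (label selector is a guess; check your pod spec)
kubectl get pods -l app=dask

# Dump the environment a worker pod actually received, to see where the
# localhost address is coming from
kubectl exec <worker-pod-name> -- env | grep DASK_SCHEDULER_ADDRESS

# Compare against the address the scheduler is actually reachable on
kubectl get svc
```

If the bad address is already present in the pod's environment at startup, the problem is in how the worker pod spec is generated rather than in the worker itself.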
When I call client.submit on a function that depends on a module I wrote, the worker nodes fail with ModuleNotFoundError. I'm wondering if dask.distributed requires all worker nodes to have dependencies installed beforehand, or am I doing something wrong?
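This behavior follows from how functions are shipped to workers: a module-level function is serialized by reference (module path plus name), not by value, so the worker must be able to import that module itself. A minimal stdlib sketch of the mechanism, using json as a stand-in for a user-written module:

```python
import pickle

import json  # stand-in for "a module I wrote"

# A module-level function pickles as an import path, not as source code.
payload = pickle.dumps(json.loads)

# The payload carries only the module name and the attribute name, so the
# process that unpickles it must be able to `import json` on its own.
assert b"json" in payload
assert b"loads" in payload
```

distributed's Client.upload_file can push a single module file to currently connected workers, but for anything beyond that, installing the dependency on every worker (with matching versions) is the usual approach.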
I'm looking for a simple resource manager. I have a small set of computers, each with different characteristics (when it comes to memory size and GPU count). I'm running ML training algorithms in the form of standalone python scripts on these machines. One script runs on a single node, and there is no distributed computation needed. I know beforehand how much memory and how many GPUs are required for a particular python script.
All I want is to have a resource manager that helps me automatically execute these python scripts on a node that has enough free mem/gpu capacity at the time of execution.
I've seen the Dask distributed resource manager, but can't figure out if I can use it as a Resource Manager for executing python scripts, and nothing more. Can you give me some guidance here?
I have quite a bit of experience with Apache Spark, but as of Spark 2, GPU-aware scheduling doesn't seem to be possible. I've checked Mesos + Chronos, but things get pretty complicated there compared to the simplicity of this use case. K8s is a monster. So I'm checking whether there is a straightforward way to get this done in Dask?
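For reference, distributed's abstract worker resources look like a close fit for this; a sketch, assuming each machine runs one worker, where the resource labels (GPU, MEMORY) are arbitrary names you choose and the hostnames and script path are hypothetical:

```shell
# on the coordinating machine
dask-scheduler

# on each compute node, declare what it has
dask-worker tcp://scheduler-host:8786 --resources "GPU=2 MEMORY=16e9"
```

```python
from dask.distributed import Client
import subprocess

client = Client("tcp://scheduler-host:8786")

def run_script(path):
    # each "task" is just running one standalone script to completion
    return subprocess.call(["python", path])

# the scheduler only places this on a worker with enough free GPU/MEMORY
future = client.submit(run_script, "train_model.py",
                       resources={"GPU": 1, "MEMORY": 8e9})
```

The resource labels are opaque tokens to the scheduler; it does not verify that a worker actually has the GPUs or memory it declares.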
I have an application that I'm not sure whether to use Dask for or not. The problem is roughly:
I have a set of points I want to index for use in spatial queries. The index must be distributed over several nodes, both to fit into main memory and for performance; for the moment, indexing into random subsets is fine. Queries will be executed by all workers and merged into the final result.
So I made a mock-up using dask and stumbled upon some issues
```python
from dask.delayed import delayed
import numpy as np
import dask.array as da

@delayed
def make(xs):
    """Create index"""
    return None

@delayed
def order(xs):
    """Sort points for search"""
    return np.sort(xs.compute())  # This feels bad

@delayed
def query(index, xs, ll, ur):
    """Bounding box query between lower left and upper right"""
    return xs[(ll <= xs[:, 0]) & (xs[:, 0] < ur)]

# Point count
N = 10000

# Random input: N random points in 3d
array = da.random.random((N, 3), chunks=(N // 10, 3))

# Split into Delayeds per chunk
splits = array.to_delayed()

# Create two part index
ordered = [order(split) for split in splits]
indices = [make(split) for split in splits]

# Execute a query
queries = [query(index, points, (0, 0.5, 0.75), (1.0, 0.6, 0.85))
           for index, points in zip(indices, ordered)]

# Merge results
results = [da.from_delayed(i, shape=(np.nan, 3), dtype=np.float64)
           for i in queries]
final = da.concatenate(results)
final.compute()
```
Note the marked line "This feels bad": I feel I should not call compute inside a delayed function. Also, the compute call on xs is necessary because xs is an array of Delayed objects. This does not happen if the array and/or chunks are one-dimensional.
Any suggestions or ideas?
The make function receives something like [Delayed(('random_sample-4e3ea92e29a66a4b4a705044ec3c18a7', 0, 0))] rather than an actual chunk of point data like [0.55253896 0.4182976 0.62587806 0.25000455 0.85405076].
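That 2-D layout can be seen directly in isolation; a small sketch (the array sizes are arbitrary):

```python
import dask.array as da

arr = da.random.random((100, 3), chunks=(10, 3))
blocks = arr.to_delayed()

# to_delayed returns a NumPy object array with one Delayed per chunk,
# laid out like the chunk grid: 10 row-chunks by 1 column-chunk.
print(blocks.shape)  # (10, 1)

# Ravel it so that iteration yields single Delayed objects, and each
# task receives the chunk itself rather than a length-1 array of Delayeds.
for block in blocks.ravel():
    pass  # block is one Delayed wrapping one (10, 3) chunk
```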
I used to_delayed to pass each chunk to a function; map_blocks does the same thing.