@martindurant: Following up on a conversation I think we had recently:
I have a Dask graph where the main constraint is memory. The graph is, in a simplistic sense, a tree where one task's result is used by several other tasks. At the moment, it looks like Dask is computing one "leaf" result at a time, so these intermediate "shared task" results sit there until the final leaf result is computed. Is there any way to resolve this without introducing arbitrary memory resource constraints?
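To make the shape concrete, a minimal sketch of what I mean (all names hypothetical):

import dask

@dask.delayed
def load(i):
    # stand-in for an expensive task producing a large intermediate
    return list(range(i * 1000, (i + 1) * 1000))

@dask.delayed
def leaf(chunk, j):
    # stand-in for a cheap per-leaf computation over the shared result
    return sum(chunk) + j

shared = [load(i) for i in range(4)]                     # intermediates...
leaves = [leaf(s, j) for s in shared for j in range(4)]  # ...each used by several leaves
results = dask.compute(*leaves)

Ideally all the leaves of `shared[i]` would run before `shared[i+1]` is materialized, so each intermediate could be released early.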
`delayed`, so I wonder if the tasks are being scheduled elsewhere
Does anyone have experience using `dask_ml.model_selection.GridSearchCV(sklearn_function(), param_list, cv=5, n_jobs=16, scheduler="multiprocessing")` for parallel computing on a single cluster, where all 16 assigned CPU workers keep sleeping forever?
import sklearn.neural_network as snn
import dask_ml.model_selection as dcv

param_list = {"hidden_layer_sizes": [(50,), (100,)]}  # placeholder grid
njobs = 16  # matches the 16 tasks requested from Slurm below
regressor = dcv.GridSearchCV(snn.MLPRegressor(verbose=False), param_list,
                             cv=5, n_jobs=njobs, scheduler="multiprocessing")
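(For reference, the fit call is just the usual sklearn pattern; the data here is synthetic:)

from sklearn.datasets import make_regression

X, y = make_regression(n_samples=500, n_features=10, random_state=0)
regressor.fit(X, y)
print(regressor.best_params_)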
And the Slurm file I generated to run the job above on the cluster is as below:
#SBATCH -A erw234
#SBATCH --partition=shared
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=16
#SBATCH --mem=80GB
But no execution process is ever generated and all the running jobs' states stay "S", which looks like it just hangs there forever. Do you have any experience with or suggestions for this? Thank you so much in advance.
Hello everyone. What could be the reason for this message?
distributed.utils - ERROR - ('fetch', 'memory')
Traceback (most recent call last):
  File "/usr/local/python/lib/python3.9/site-packages/distributed/utils.py", line 638, in log_errors
    yield
  File "/usr/local/python/lib/python3.9/site-packages/distributed/worker.py", line 2411, in gather_dep
    self.transition(ts, "memory", value=data[d])
  File "/usr/local/python/lib/python3.9/site-packages/distributed/worker.py", line 1692, in transition
    func = self._transitions[start, finish]
KeyError: ('fetch', 'memory')
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7fccc940de80>>, <Task finished name='Task-2753' coro=<Worker.gather_dep() done, defined at /usr/local/python/lib/python3.9/site-packages/distributed/worker.py:2267> exception=KeyError(('fetch', 'memory'))>)
I am getting this message, but despite the error the application appears to complete correctly. Thanks =)
Hello @martindurant, the previous message does not appear anymore. However, I still have a problem trying to save a somewhat large dataframe using the `to_parquet` method (with the pyarrow engine). The scheduler prints this message repeatedly:
distributed.core - INFO - Event loop was unresponsive in Scheduler for 3.87s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
Task exception was never retrieved
future: <Task finished name='Task-59964' coro=<_listener_handler_coroutine() done, defined at /usr/local/ucx/ucx_4.6-.../lib/python3.9/site-packages/ucp/core.py:146> exception=UCXError('<stream_send>: Connection reset by remote peer')>
Traceback (most recent call last):
  File "/usr/local/ucx/ucx_4.6-.../lib/python3.9/site-packages/ucp/core.py", line 163, in _listener_handler_coroutine
    peer_info = await exchange_peer_info(
  File "/usr/local/ucx/ucx_4.6-.../lib/python3.9/site-packages/ucp/core.py", line 59, in exchange_peer_info
    await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes)
ucp.exceptions.UCXError: <stream_send>: Connection reset by remote peer
Task exception was never retrieved
future: <Task finished name='Task-59972' coro=<_listener_handler_coroutine() done, defined at /usr/local/ucx/ucx_4.6-.../lib/python3.9/site-packages/ucp/core.py:146> exception=UCXError('<stream_send>: Connection reset by remote peer')>
Traceback (most recent call last):
  File "/usr/local/ucx/ucx_4.6-.../lib/python3.9/site-packages/ucp/core.py", line 163, in _listener_handler_coroutine
    peer_info = await exchange_peer_info(
  File "/usr/local/ucx/ucx_4.6-.../lib/python3.9/site-packages/ucp/core.py", line 59, in exchange_peer_info
    await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes)
The workers print this kind of message:
distributed.core - INFO - Event loop was unresponsive in Worker for 33.98s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.utils - ERROR - Timed out trying to connect to ucx://10.20.30.22:12345 after 30 s
Traceback (most recent call last):
  File "/usr/local/python/lib/python3.9/site-packages/distributed/comm/ucx.py", line 351, in connect
    ep = await ucx_create_endpoint(ip, port)
  File "/usr/local/ucx/ucx_4.6-.../lib/python3.9/site-packages/ucp/core.py", line 960, in create_endpoint
    return await _get_ctx().create_endpoint(
  File "/usr/local/ucx/ucx_4.6-.../lib/python3.9/site-packages/ucp/core.py", line 361, in create_endpoint
    peer_info = await exchange_peer_info(
  File "/usr/local/ucx/ucx_4.6-.../lib/python3.9/site-packages/ucp/core.py", line 62, in exchange_peer_info
    await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/python/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/python/lib/python3.9/site-packages/distributed/comm/core.py", line 283, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/python/lib/python3.9/asyncio/tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
The application hangs without any message. This execution was done using the UCX protocol. Thanks.
`fsspec.read_block` is used to identify newline delimiters at the block offsets in dask's interface to `pandas.read_csv`, and I was wondering if you have any comments on my feature suggestion for validating the partition offsets for CSV files with unescaped newlines within rows (dask/dask#8045). I hope this would be a welcome contribution to dask -- without it, dask can only handle a subset of the CSVs pandas can handle -- and I'd like to find the most reliable approach and fit in with the existing library design. If not, I'm happy to take ownership of this and submit it for review afterwards.
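For anyone following along, this is the mechanism in question -- a self-contained sketch with made-up CSV content:

import io
from fsspec.utils import read_block

data = io.BytesIO(b'a,b\n1,"line with\nembedded newline"\n2,plain\n')

# dask aligns each partition to the next b"\n" after its byte offset; here
# that newline sits inside a quoted field, so the partition starts mid-record
block = read_block(data, 8, 20, delimiter=b"\n")
print(block)  # b'embedded newline"\n'

Validating the chosen offsets (e.g. checking quote balance) would catch this before pandas is handed a corrupt partition.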
`dask_gateway.Gateway().new_cluster().scale(60)` with scaling of the Kubernetes cluster itself (but maybe you can achieve that through simpler means).
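For example, an adaptive cluster (untested sketch; this only drives the dask side -- the k8s node autoscaler still has to react to the pending worker pods):

from dask_gateway import Gateway

gateway = Gateway()
cluster = gateway.new_cluster()
cluster.adapt(minimum=0, maximum=60)  # request/release workers with load
client = cluster.get_client()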
- only being able to provide environments as Docker files (i.e. nothing like `EXTRA_PIP_PACKAGES` -- is that accurate?) -- which raises the barrier to entry. And afaict from a cursory glance, only publicly available Docker images (as opposed to arbitrary Docker registries), which makes internal use tougher.
I think you can set `env_vars['EXTRA_PIP_PACKAGES']` on the cluster provider, which will then install those packages.
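Something along these lines, sketched with dask-cloudprovider's FargateCluster as an example (the exact kwarg name varies by provider):

from dask_cloudprovider.aws import FargateCluster

cluster = FargateCluster(
    # the daskdev/dask images pip-install whatever this variable lists at startup
    environment={"EXTRA_PIP_PACKAGES": "s3fs pyarrow"},
)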
hi! I'm creating an nD dask array backed by a `map_blocks` operation that reads planes from disk when computed:
my_array = da.map_blocks(some_reader, ...)
a) I want the underlying file handle closed except when necessary to perform the lazy read operations, and
b) I'm not the one doing the indexing/computing on `my_array` (it's passed to a user who is free to index however they want).
I'm wondering if there's a context, so to speak, that can wrap all of the "inner chunk" calls to compute() so that the underlying file is opened/closed just once when a user calls `my_array[some_idx].compute()`. (I know that I can put the `with open(...):` context inside the `some_reader` function that I pass to `map_blocks`, but depending on how that array is chunked, that could cause hundreds of open/close events for a single call to compute().)
I suspect it's hard to know a priori exactly when to open/close in this case, but just curious.
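The closest workaround I know of is caching the handle per worker process rather than opening per chunk, e.g. with `functools.lru_cache` -- a self-contained sketch (file layout invented for illustration; the caveat is that the handle then stays open for the life of the worker):

import functools
import numpy as np
import dask.array as da

PLANE = (4, 4)  # hypothetical plane shape
DTYPE = np.float64

@functools.lru_cache(maxsize=None)
def _handle(path):
    # opened once per worker process and reused by every chunk task,
    # instead of one open/close pair per chunk
    return open(path, "rb")

def some_reader(path, block_info=None):
    f = _handle(path)
    i = block_info[None]["chunk-location"][0]  # which plane this chunk holds
    nbytes = int(np.prod(PLANE)) * np.dtype(DTYPE).itemsize
    f.seek(i * nbytes)
    return np.frombuffer(f.read(nbytes), dtype=DTYPE).reshape((1,) + PLANE)

np.arange(3 * 16, dtype=DTYPE).tofile("planes.raw")  # toy file with 3 planes
my_array = da.map_blocks(some_reader, "planes.raw",
                         chunks=((1, 1, 1), (4,), (4,)), dtype=DTYPE)
print(my_array[2].compute())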
Hello! I'm writing some code that should process a larger-than-memory multidimensional array dataset using dask and, partially, a GPU. The basic idea of the code is:
Initially I did the sending to the GPU with `array.map_blocks(cupy.asarray)` and the return from the GPU with `array.map_blocks(cupy.asnumpy)`, but it seems the single-machine scheduler does not take GPU memory limitations into consideration, so even on a single thread it could plan all the `cupy.asarray` operations sequentially, crashing the program with an `OutOfMemory` exception. I could resolve the problem in two ways: by including the send-to and receive-from GPU calls in the GPU processing function, or by using the `Local(CUDA)Cluster`. I noticed that the "distributed" approach works much faster, presumably because it can schedule GPU-CPU traffic more efficiently. I also noticed that with the threads scheduler the GPU memory remains high even after the program terminates (presumably due to the way `cupy`'s memory pool works), but this does not happen with the distributed schedulers. I have a feeling, though, that since part of the work is I/O bound and part is CPU bound, the default `LocalCUDACluster` with one worker is sub-optimal. My questions:
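For context, the first workaround (transfers folded into the processing function) looks roughly like this -- `process_block` and the `cupy.sqrt` call are stand-ins for my real GPU work:

import numpy as np
import dask.array as da
import cupy

def process_block(block):
    # send -> compute -> fetch inside a single task, so each running task
    # holds at most one block's worth of data on the GPU
    gpu_block = cupy.asarray(block)
    result = cupy.sqrt(gpu_block)  # stand-in for the real GPU kernel
    return cupy.asnumpy(result)

x = da.random.random((8, 1024, 1024), chunks=(1, 1024, 1024))
y = x.map_blocks(process_block, dtype=np.float64)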
Hi! I'm having a weird problem and I don't know where to start debugging. On CentOS 7, I am using dask with a (local) distributed scheduler, say 3 workers and 4 threads per worker. From time to time, the parallelization seems blocked: the sum of CPU usage across all workers is 100%. Often, I'll kill the process and restart it, and the problem is solved: individual worker CPU usage goes back to oscillating between 200 and 400%.
We have another machine that has the same specs and almost the same env and there the problem never occurs.
I'm lost. What should I look for? What should I diagnose? The problem seems so random...