    Davis Bennett
    @d-v-b
    I don't think dask workers on their own touch a large number of files (I could be wrong here); it's most likely your code that is opening files
    Leo Singer
    @lpsinger
    I am running out of file descriptors on the "submit" node, not on the worker nodes
    I am using PBSCluster, FWIW
    So I don't think it's my code
    Davis Bennett
    @d-v-b
    an expert could chime in but I don't think the distributed scheduler opens a lot of files
    Leo Singer
    @lpsinger
    There is an entry in the FAQ about the too many open files errors associated with the scheduler: https://distributed.dask.org/en/latest/faq.html#too-many-open-file-descriptors
    Maybe it's not that crazy... I have 240 jobs and I see 506 file descriptors. About two per worker. That's not strange
    OK, I'll talk to my sysadmin
    Leo Singer
    @lpsinger
    I have this functools.partial callable that I am trying to map across 100k arguments using dask-jobqueue. It turns out that my script is using many tens of GB of memory and going into swap because when I call Client.map(func, ...), it is pickling my function 100k times, and the pickled function itself is about 100 kB.
    Any strategies to keep the memory usage in check?
    What I am trying to do here is port an existing script from multiprocessing.Pool.map() to distributed.Client.map. I am guessing that memory consumption was never an issue with multiprocessing.Pool.map() because multiprocessing does not pickle all of the iterations at once; it pulls items from the iterable only about as fast as the process pool can consume them.
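    A minimal sketch of the pattern described above (function and data names here are made up): the partial carries a large object, so every task submitted by Client.map includes its own pickled copy of it.

    import functools
    import numpy as np
    from dask.distributed import Client

    def process(x, table):
        # stand-in for the real work function
        return float(table[x % len(table)]) * x

    client = Client()  # the original setup used dask_jobqueue.PBSCluster instead

    big_table = np.random.rand(12_500)                  # roughly 100 kB of state
    func = functools.partial(process, table=big_table)  # the partial now carries that state

    futures = client.map(func, range(100_000))  # func is pickled with each task, per the report above
    results = client.gather(futures)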
    Davis Bennett
    @d-v-b
    can you wrap the arguments to the function, and the function itself, in dask.delayed?
    or generally wrap the arguments to the function in a dask datastructure like dask.array or dask.bag
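    A rough sketch of the delayed approach being suggested (names are hypothetical): wrapping the large argument in dask.delayed puts it into the graph once instead of copying it into every call.

    import dask

    def work(x, table):
        # stand-in for the real work function
        return table[x % len(table)] * x

    table = list(range(12_500))       # the large shared state
    lazy_table = dask.delayed(table)  # wrap the big argument once

    tasks = [dask.delayed(work)(i, lazy_table) for i in range(100_000)]
    results = dask.compute(*tasks)    # uses the distributed client if one is active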
    Leo Singer
    @lpsinger
    I'll try dask.delayed. How will that be different from client.map or client.submit?
    Davis Bennett
    @d-v-b
    the key thing is to transform your arguments (the stuff that's blowing up memory) into a lazy data structure that dask can optimize on
    Leo Singer
    @lpsinger
    Once I call compute() on the delayed, it's going to pickle my function, right?
    Immediately?
    Davis Bennett
    @d-v-b
    i think so
    Leo Singer
    @lpsinger
    So this is a scatter-gather application
    There's some setup to build the arguments, then I call the function 100,000 times in parallel, then I bring the results back together and do something else with them.
    I would have thought that this is what client.map() is for
    Davis Bennett
    @d-v-b
    it is for that, but if your arguments are so large that they become a burden to serialize, you have to manage that
    typically by packing those arguments into a chunked / partitioned data structure like a dask array or bag
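    For example, with dask.bag the 100k arguments live in a modest number of partitions, so the function travels roughly once per partition rather than once per element (again a sketch with made-up names):

    import dask.bag as db

    def work(x, table):
        # stand-in for the real work function
        return table[x % len(table)] * x

    table = list(range(12_500))  # large shared state
    args = db.from_sequence(range(100_000), npartitions=100)

    results = args.map(work, table=table).compute()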
    Leo Singer
    @lpsinger
    @d-v-b, interesting... it looks like client.map does some of this for you: dask/distributed@169a8d9
    Although the heuristic it uses is to convert kwargs larger than 100 kB, which is already too big
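    One possible workaround, sketched with the same made-up names: scatter the heavy partial to the workers once and pass the resulting future into client.map, so each task only carries a small reference.

    import functools
    import numpy as np
    from dask.distributed import Client

    def process(x, table):
        return float(table[x % len(table)]) * x

    client = Client()
    func = functools.partial(process, table=np.random.rand(12_500))  # the ~100 kB partial

    func_future = client.scatter(func, broadcast=True)  # ship the partial to the workers once

    def apply(f, x):
        # tiny wrapper; this is what gets pickled per task instead of the partial
        return f(x)

    futures = client.map(apply, [func_future] * 100_000, range(100_000))
    results = client.gather(futures)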
    Kyle Davis
    @kldavis52
    I'm looking to get involved in the dask ecosystem. Anyone need assistance with testing or development?
    Martin Durant
    @martindurant
    @kldavis52 If you want to help out, you are best off checking the github repo issues for something that it sounds like you can work on. Feel free to ask for help in the specific issue.
    Kyle Davis
    @kldavis52
    @martindurant ok. Thank you for the heads up.
    Jacob Barhak
    @Jacob-Barhak
    I wanted to thank the dask team for their support. I just got a COVID-19 model published that relies heavily on dask for HPC and on Holoviz for visualization. You will find the publication here:
    Barhak J, The Reference Model Initial Use Case for COVID-19. Cureus. Online. Interactive results are available directly using this link. You will find acknowledgments to the dask and Holoviz teams in the paper; both technologies helped. Dask supported the execution of the model for about 9 hours on 880 cores. Please feel free to circulate. Hopefully more people will see the potential of the python tools you develop.
    Aleksey Bilogur
    @ResidentMario

    Does Dask support snappy compression for Parquet files? I wrote a bunch of Parquet files using pandas.DataFrame.to_parquet (which uses snappy by default), and read them back into Dask using dask.dataframe.read_parquet successfully (had to install the snappy and python-snappy conda-forge packages first), but trying to run a .do().a().thing().compute() on the data throws a KeyError: 'snappy'.

    Here's an SO question with more details: https://stackoverflow.com/questions/63157674/operations-on-a-dask-dataframe-fail-when-using-snappy-compression
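    A rough reproduction of the round trip described above (the path is hypothetical):

    import pandas as pd
    import dask.dataframe as dd

    df = pd.DataFrame({"a": range(10)})
    df.to_parquet("data.parquet")          # pandas defaults to snappy compression

    ddf = dd.read_parquet("data.parquet")  # reading the metadata works
    ddf["a"].sum().compute()               # raises KeyError: 'snappy' if python-snappy is
                                           # not installed in the environment running this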

    Aleksey Bilogur
    @ResidentMario
    Ah! In a classic instance of "writing down what the problem is gave me some intuition as to what might be going wrong", I've figured it out. You need to have snappy and python-snappy installed in the client environment as well. I'm accessing the cluster from a local Jupyter notebook on my machine via SSH port forwarding, and did not have these packages installed locally. Installing them locally (conda install -c conda-forge snappy python-snappy) resolved the issue. I guess snappy is getting used over the wire as well. :) I'll update the SO question with the resolution.
    Lukasz Szkutnik
    @lszku
    Hey, I'm looking for a way to efficiently dedup data in dask. More details in https://stackoverflow.com/questions/63172606/dask-drop-duplicates-performance
    Noah Kantrowitz
    @coderanger
    I have what I think is a ridiculously simple question but the docs don't seem to explain it: with dask.distributed, where does the code live? Like if I call client.submit(myfn), how do the dask-worker nodes find the code for myfn?
    Noah Kantrowitz
    @coderanger
    Oh, it seems like you pickle the function object on the client side and send that over? Isn't that incredibly inefficient?
    Scott Sievert
    @stsievert
    Pickle isn’t slow, it’s a protocol: https://blog.dask.org/2018/07/23/protocols-pickle. I think for functions it just sends the source over the wire, which is bytes. That’s fast to serialize and send over the wire. The docs say that adds 1ms of latency. https://distributed.dask.org/en/latest/efficiency.html
    Noah Kantrowitz
    @coderanger
    I mean the whole thing is a lot slower than having the relevant code already present on the worker side so it can load it directly :)
    That doesn't seem to be an option though?
    Scott Sievert
    @stsievert
    What does “a lot slower” mean? If your jobs run for 1ns any library will be slower. If your jobs run for 1 hour it doesn’t matter.
    client.upload_file moves code directly to workers
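    For completeness, a small sketch of that approach (the module name is hypothetical):

    from dask.distributed import Client

    client = Client()
    client.upload_file("my_module.py")  # copies the file to every current worker

    from my_module import myfn          # hypothetical module shipped above
    future = client.submit(myfn, 42)    # workers can now import myfn themselves
    result = future.result()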
    Koustav Chanda
    @KoustavCode
    How do I free up my RAM after dask.compute() (i.e. after my delayed objects have been computed)?
    ldacey
    @ldacey_gitlab
    is this a clash between fsspec and pyarrow.filesystem?
    TypeError: ls() got multiple values for keyword argument 'detail'
    ~/.local/lib/python3.7/site-packages/pyarrow/fs.py in get_file_info_selector(self, selector)
        159         infos = []
        160         selected_files = self.fs.find(
    --> 161             selector.base_dir, maxdepth=maxdepth, withdirs=True, detail=True
        162         )
        163         for path, info in selected_files.items():
    /opt/conda/lib/python3.7/site-packages/fsspec/spec.py in walk(self, path, maxdepth, **kwargs)
        324 
        325         try:
    --> 326             listing = self.ls(path, detail=True, **kwargs)
        327         except (FileNotFoundError, IOError):
        328             return [], [], []
    it happens when I try to use pyarrow.dataset() on a partitioned parquet dataset using fsspec for azure blob
    ldacey
    @ldacey_gitlab
    actually, it happens when I use read_table(), which worked before, so it is almost certainly something to do with the latest pyarrow update (1.0.0) and not dask; I can still read the dataset with dask
    rmsrms1987
    @rmsrms1987
    Hi everyone, I have a simple question. What is the main difference between using Dask Delayed and Dask Futures? Are there advantages to using one over the other?
    Ray Bell
    @raybellwaves
    @ldacey_gitlab not sure if it helps or not, but the adlfs version was bumped yesterday (I think). Not sure if that solves your problem
    ldacey
    @ldacey_gitlab
    nice, I see that fsspec was updated recently as well. One hiccup is that I am using Airflow, which has a dependency on the azure-storage 2.1 SDK and not the new 12.3 version that adlfs uses. I need to check whether I am going to break everything if I upgrade
    gives me a path to follow at least