    suraj bhatt
    @surisurajbhatt_twitter

    Hi, I'm unable to execute the following query:

    import dask.dataframe as dd
    df = dd.read_parquet('gcs://anaconda-public-data/nyc-taxi/nyc.parquet/part.0.parquet')

    Error: ArrowIOError: Unexpected end of stream: Page was smaller (5242780) than expected (6699768)

    Martin Durant
    @martindurant
    ^ please try with fsspec 0.4.2
    suraj bhatt
    @surisurajbhatt_twitter
    Could you please give me a demo syntax, @martindurant?
    Martin Durant
    @martindurant
    same syntax, but update your version of fsspec, available via pip or conda
    suraj bhatt
    @surisurajbhatt_twitter
    Did that, but now I get an error: KeyError: 'gcs' @martindurant
    Martin Durant
    @martindurant
    Then you should probably also update gcsfs
    suraj bhatt
    @surisurajbhatt_twitter
    Which version?
    Martin Durant
    @martindurant
    latest
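
    A minimal sketch of the upgrade-and-retry path being suggested here, assuming pip is used (`conda update fsspec gcsfs` is the conda equivalent):

    # after running: pip install --upgrade fsspec gcsfs
    import dask.dataframe as dd

    df = dd.read_parquet(
        'gcs://anaconda-public-data/nyc-taxi/nyc.parquet/part.0.parquet'
    )
    df.head()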
    suraj bhatt
    @surisurajbhatt_twitter
    Nothing is working for this:

    import dask.dataframe as dd
    df = dd.read_parquet('gcs://anaconda-public-data/nyc-taxi/nyc.parquet/part.0.parquet')

    @martindurant

    Tom Augspurger
    @TomAugspurger
    @surisurajbhatt_twitter can you write a minimal example and post a github issue? http://matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports
    James Stidard
    @jamesstidard
    Hi, I was wondering if it's OK for a dask delayed function to use a process pool within it? Or will that cause havoc with the scheduler/resource monitoring of dask?
    Brett Naul
    @bnaul
    @jamesstidard if you're running with the multiprocess scheduler you'll definitely have issues, not sure about the threaded scheduler...you could try using the "tasks from tasks" pattern with the distributed scheduler https://distributed.dask.org/en/latest/task-launch.html
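    A rough sketch of that tasks-from-tasks pattern using worker_client from dask.distributed (process_chunk and outer_task are illustrative names only):

    from dask.distributed import Client, worker_client

    def process_chunk(i):              # hypothetical unit of work
        return i * i

    def outer_task(n):
        # inside a running task, connect back to the same scheduler and
        # submit more tasks instead of spawning a local process pool
        with worker_client() as client:
            futures = client.map(process_chunk, range(n))
            return sum(client.gather(futures))

    client = Client()                  # assumes a distributed scheduler is available
    result = client.submit(outer_task, 10).result()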
    random small q: I've noticed my k8s dask workers sometimes take 30-60s to connect to the scheduler even after all of the pods are running. Is there a config value I might have messed with that controls how often the workers retry connecting? I changed a lot of stuff trying to fix some timeouts, so I'm guessing it's my fault.
    James Stidard
    @jamesstidard
    @bnaul ah great, thank you!
    Eduardo Gonzalez
    @eddienko
    Hi, quick question: what does the "orange" colour mean in the Dask web UI (bokeh interface)? I cannot find it in the docs!
    dask_orange.png
    Eduardo Gonzalez
    @eddienko
    Oh, thanks, I missed that one!
    Benjamin Zaitlen
    @quasiben
    @eddienko perhaps that information could also be noted here: https://docs.dask.org/en/latest/diagnostics-distributed.html . Any interest in submitting a PR?
    Understand if you are busy, but you are not the first person to miss that page
    Eduardo Gonzalez
    @eddienko
    I would have expected it to be in https://distributed.dask.org/en/latest/web.html
    Benjamin Zaitlen
    @quasiben
    or there
    Eduardo Gonzalez
    @eddienko
    cool. I may submit a PR ;-)
    Matthew Rocklin
    @mrocklin
    The first link is better. We should retire the doc in distributed.dask.org
    Adam Thornton
    @athornton
    This is probably just something I missed when it changed (we're trying to get back to current versions of everything): KubeCluster workers used to kill themselves after 60 seconds of not being able to talk to their master.
    They don't anymore.
    Is that something I need to explicitly configure now?
    (If you don't have some kind of dead-man switch, your cluster or namespace quickly fills up with workers trying to talk to nonexistent kernels.)
    Adam Thornton
    @athornton
    It looks like I'm still setting death-timeout in my dask-worker invocation:
    /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/bin/python /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/bin/dask-worker --nthreads 4 --memory-limit 8192M --death-timeout 60 --local-directory /tmp/dask-krughoff-e8dd95ac-72drc
    But I just get this in my logs over and over:
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/core.py", line 204, in _raise
        raise IOError(msg)
    OSError: Timed out trying to connect to 'tcp://10.46.128.14:36210' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7feb07069128>: ConnectionRefusedError: [Errno 111] Connection refused
    Shouldn't that trigger the death timeout after six of those?
    Whole stack trace, if it helps, is:
    tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7feb77f9b278>>, <Task finished coro=<Worker.heartbeat() done, defined at /opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/worker.py:866> exception=OSError("Timed out trying to connect to 'tcp://10.46.128.14:36210' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7feb07444240>: ConnectionRefusedError: [Errno 111] Connection refused")>)
    Traceback (most recent call last):
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/core.py", line 215, in connect
        quiet_exceptions=EnvironmentError,
    tornado.util.TimeoutError: Timeout
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
        ret = callback()
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
        future.result()
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/worker.py", line 875, in heartbeat
        metrics=await self.get_metrics(),
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/core.py", line 747, in send_recv_from_rpc
        comm = await self.pool.connect(self.addr)
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/core.py", line 874, in connect
        connection_args=self.connection_args,
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/core.py", line 227, in connect
        _raise(error)
      File "/opt/lsst/software/stack/python/miniconda3-4.5.12/envs/lsst-scipipe-1172c30/lib/python3.7/site-packages/distributed/comm/core.py", line 204, in _raise
        raise IOError(msg)
    OSError: Timed out trying to connect to 'tcp://10.46.128.14:36210' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7feb07444240>: ConnectionRefusedError: [Errno 111] Connection refused
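
    As a reference, a sketch of the connect-timeout knob involved, assuming the standard distributed config key (the --death-timeout flag in the invocation above is the worker-side setting):

    import dask

    # per-attempt connect timeout (the "after 10 s" in the error above);
    # distributed's default is 10s
    dask.config.set({"distributed.comm.timeouts.connect": "60s"})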
    Matthew Rocklin
    @mrocklin
    @athornton I recommend raising an issue. You'll get far faster response times and much better support there.
    Justin Sybrandt
    @JSybrandt

    Hey I just posted a question on SO: https://stackoverflow.com/questions/57760355/how-should-i-load-a-memory-intensive-helper-object-per-worker-in-dask-distribute

    Summary: I have a big object that I need per-worker and don't want to re-initialize on every function call. Can dask handle this?
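
    One pattern that may fit (a sketch, not necessarily the canonical answer): scatter the object to the cluster once and pass the resulting future into each task, so every worker holds a single copy. BigHelper here is a hypothetical stand-in for the real object:

    from dask.distributed import Client

    class BigHelper:                    # stand-in for the memory-intensive object
        def process(self, x):
            return x * 2

    client = Client()
    helper_future = client.scatter(BigHelper(), broadcast=True)  # one copy per worker

    def use_helper(x, helper):
        return helper.process(x)

    futures = client.map(use_helper, range(100), helper=helper_future)
    results = client.gather(futures)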

    Yash Aggarwal
    @FR4NKESTI3N
    @jcrist There was a GSoC project which mentioned cythonisation of the scheduler. Is that project still available? (Not as part of GSoC, of course.) I'd like to take it on now as part of my coursework. Thanks!
    Jim Crist-Harif
    @jcrist
    That work has not been done. The project was to experiment and see whether meaningful speedups could be attained by cythonizing some of the algorithms used by the scheduler. Note that any work done on this may or may not be merged, but it could be an interesting experiment. I personally have no time to mentor or respond on this; others may.
    Yash Aggarwal
    @FR4NKESTI3N
    @jcrist Got it. Would it be possible to get a list of to-dos or projects that the organization would be more interested in?
    Jim Crist-Harif
    @jcrist
    We don't have any large projects we're actively working on. Depending on the size of project you need for your class, you may be interested in issues in our issue tracker: https://github.com/dask/dask/issues. Otherwise, adding dask support to a library in the Python ecosystem would be an excellent project. Dask itself is fairly stable; we're mostly focused on performance and small changes at this point.
    The Python ecosystem however still has plenty of work that could be done to add parallelism support. Libraries that have added dask support include xarray, scikit-image, hyperspy, etc... Finding a Python library that is looking to add dask support would likely be a good project, and is something the dask dev team is actively interested in.
    Varun B Patil
    @varunbpatil

    Hello everyone. Just wanted to know whether I can perform Dask array computations inside a delayed function, something like this.

    import dask

    @dask.delayed
    def dask_delayed_example(X):
        mean = X.mean(axis=0)
        std = X.std(axis=0)
        return X - mean

    X = df.to_dask_array(lengths=True)   # df is an existing dask dataframe
    res = dask_delayed_example(X)

    Is Dask delayed only meant for parallelizing computation or can it also be used to perform computation on larger-than-memory datasets in a distributed manner?

    James Stidard
    @jamesstidard
    Hi @varunbpatil, I'm far from being an expert on dask but I think I have a similar use case. And I asked about it a couple days ago in here. @bnaul pointed me at this recipe https://distributed.dask.org/en/latest/task-launch.html - might be what you're after.
    I also recall reading in the docs that nesting delayeds that return delayeds is an antipattern of sorts, so I think you want that tasks-launching-tasks approach.
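    For the original example, a possible simplification (a sketch, assuming df is already a dask dataframe): dask array operations are themselves lazy and run across the cluster, so no delayed wrapper is needed around them:

    X = df.to_dask_array(lengths=True)   # lengths=True computes chunk sizes up front

    mean = X.mean(axis=0)
    std = X.std(axis=0)
    res = ((X - mean) / std).compute()   # executes as many distributed tasks, not one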
    Varun B Patil
    @varunbpatil
    Thanks for the link @jamesstidard . I'll check it out.
    Yash Aggarwal
    @FR4NKESTI3N
    @jcrist Thank you. I'll try and look for potential libraries for now
    François Steinmetz
    @fsteinmetz

    Hi there! I'm trying to use dask-ssh as follows:

    % dask-ssh --scheduler master1 node1
    
    ---------------------------------------------------------------
                     Dask.distributed v1.27.0
    
    Worker nodes:
      0: node1
    scheduler node: master1:8786
    ---------------------------------------------------------------
    [ scheduler master1:8786 ] : /home/applis/anaconda/envs/py3v19.04/bin/python -m distributed.cli.dask_scheduler --port 8786
    [ worker node1 ] : /home/applis/anaconda/envs/py3v19.04/bin/python -m None master1:8786 --nthreads 0 --host node1 --memory-limit auto

    but I cannot connect a client to the scheduler, and logging does not provide more information. Client('master1:8786') fails with OSError: Timed out trying to connect... and I cannot access the web UI at master1:8787. However, running dask-scheduler and dask-worker manually works fine. Any suggestions?
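
    For comparison, the manual fallback mentioned above, spelled out (hostnames taken from the message; run each command on the indicated machine; the port is the default):

    # on master1
    dask-scheduler --port 8786

    # on node1
    dask-worker tcp://master1:8786 --nthreads 4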

    Davis Bennett
    @d-v-b
    simple question: when using dask.array with distributed to do something like darray.mean(), do the workers get the full task graph of darray?
    Davis Bennett
    @d-v-b
    I'm asking because I noticed that using da.from_array(large_numpy_array) was blowing up workers: counterintuitively, large_numpy_array is not actually partitioned along the expected chunks, and each worker appears to get a full copy of it regardless of the chunking.
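
    A workaround sketch for that situation (assuming zarr is installed; 'large.zarr' is a hypothetical scratch path): write the array to a chunked on-disk store once and load it lazily, so the graph references chunks instead of embedding the whole ndarray:

    import numpy as np
    import zarr
    import dask.array as da

    large_numpy_array = np.random.random((20000, 2000))   # stand-in for the real data

    # write once to a chunked store, then load lazily; each worker reads only
    # the chunks it needs instead of receiving a full copy of the array
    zarr.save('large.zarr', large_numpy_array)
    darray = da.from_zarr('large.zarr')

    print(darray.mean().compute())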