    RuiLoureiro
    @RuiLoureiro

    I use pytest to test my application. Since it relies on dask, I need to have a local cluster as a fixture.

    I tried to do it myself and close the cluster on the fixture cleanup. Even though it works, I get a warning with more than 100 lines because dask tries to communicate with the cluster after it has been closed.

    I've also tried to use the client fixture from distributed.utils_test, but that gives me the following error

    file /Users/ruiloureiro/.virtualenvs/tarsier/lib/python3.7/site-packages/distributed/utils_test.py, line 561
    @pytest.fixture
      def client(loop, cluster_fixture):
    E       fixture 'loop' not found
    >       available fixtures: cache, capfd, capfdbinary, caplog, capsys, capsysbinary, client, cov, doctest_namespace, monkeypatch, no_cover, pytestconfig, record_property, record_testsuite_property, record_xml_attribute, recwarn, tmp_path, tmp_path_factory, tmpdir, tmpdir_factory
    >       use 'pytest --fixtures [testpath]' for help on them.
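    A minimal sketch of a hand-rolled fixture along these lines (cluster sizing is arbitrary; closing the client before the cluster is what tends to silence the post-shutdown communication warnings):

    # conftest.py
    import pytest
    from dask.distributed import Client, LocalCluster

    @pytest.fixture(scope="session")
    def dask_client():
        cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
        client = Client(cluster)
        yield client
        # Close the client first, then the cluster, so nothing keeps
        # talking to workers that have already gone away
        client.close()
        cluster.close()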
    Scott Sievert
    @stsievert
    Would it be possible to mock Dask’s response in the tests? https://github.com/pytest-dev/pytest-mock/
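    A rough sketch of that approach with pytest-mock's mocker fixture; myapp.compute_features and run_pipeline are hypothetical names standing in for whatever actually talks to the cluster:

    from myapp import run_pipeline  # hypothetical application code

    def test_pipeline_without_cluster(mocker):
        # Patch the function that would hit the scheduler, so the test
        # never needs a real Dask cluster at all
        mocker.patch("myapp.compute_features", return_value={"rows": 42})
        assert run_pipeline()["rows"] == 42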
    Alessio Civitillo
    @acivitillo
    Hello, wondering if the retries argument inside dask.compute() retries only failed tasks or all tasks? https://distributed.dask.org/en/latest/api.html#distributed.Client.compute
    Martin Durant
    @martindurant
    ^ should be failed tasks. If the dependencies went away in the meantime, some may need to be recalculated.
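    A small sketch of that behaviour (the flaky task and its failure rate are made up for illustration); with retries=3 only the attempts that raise are re-run, tasks that already finished are left alone:

    import random
    import dask
    from dask.distributed import Client

    client = Client(processes=False)

    @dask.delayed
    def flaky(x):
        # Fails roughly a third of the time, just to exercise the retry logic
        if random.random() < 0.3:
            raise RuntimeError("transient failure")
        return x * 2

    futures = client.compute([flaky(i) for i in range(10)], retries=3)
    print(client.gather(futures))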
    David Brochart
    @davidbrochart
    When I have a computation launched in the background with persist(), followed by a call to progress to have a progress bar, is there a way to wait for the computation to finish and still have the bar rendering? compute or wait will block everything, including the bar rendering.
    David Brochart
    @davidbrochart
    Oh maybe by using client.persist and then checking status?
    David Brochart
    @davidbrochart
    But that needs event loop integration
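    A rough sketch of that status-polling idea (the dask.array workload is just a stand-in), which avoids blocking in wait() so something else can keep redrawing a bar in the meantime:

    import time
    import dask.array as da
    from dask.distributed import Client, futures_of

    client = Client(processes=False)
    x = da.random.random((20_000, 20_000), chunks=(2_000, 2_000)).sum()

    persisted = client.persist(x)
    futures = futures_of(persisted)

    # Poll the futures' status instead of calling wait()
    while True:
        done = sum(f.status != "pending" for f in futures)
        print(f"\r{done}/{len(futures)} tasks done", end="", flush=True)
        if done == len(futures):
            break
        time.sleep(0.5)
    print()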
    joeschmid
    @joeschmid
    Hi Dask community, thanks for a great project -- we're shifting a lot of our data science work onto Dask (+ Prefect, potentially) and we've had a good experience. Question on heterogeneous workers: we have need for different types of workers, e.g. high memory, GPU, etc. We use dask-kubernetes (on AWS EKS) which currently just supports a single k8s pod spec for all workers. I'm about to start on a potential PR in dask-kubernetes that would allow for different worker types, but only in non-adaptive mode. Before I dive in, I thought I'd ask if anyone knows of existing work in this area. I see this old issue: dask/distributed#2118 but haven't found anything else. Thanks in advance!
    RuiLoureiro
    @RuiLoureiro

    Hey all, my application requires me to launch tasks from within other tasks, like the following

    from dask.distributed import get_client

    def a():
        # ... some computation ..
        ...

    def b():
        # ... some computation ..
        ...

    def c():
        client = get_client()
        fut_a = client.submit(a)
        fut_b = client.submit(b)

        res_a, res_b = client.gather([fut_a, fut_b])

        return res_a + res_b

    client = get_client()
    res = client.submit(c)

    However, I would like to have access to the intermediate results a and b, but only c shows up in client.futures.

    Is there a way to tell dask to keep the results for a and b?

    Thank you

    joeschmid
    @joeschmid
    @RuiLoureiro I think you want to use a worker_client in your task c like this:
    from dask.distributed import worker_client


    def c():
        with worker_client() as client:
            fut_a = client.submit(a)
            fut_b = client.submit(b)
            res_a, res_b = client.gather([fut_a, fut_b])
        return res_a + res_b
    RuiLoureiro
    @RuiLoureiro
    @joeschmid How should I access a and b? client.futures still only shows c.
    joeschmid
    @joeschmid
    @RuiLoureiro how about just returning a and b explicitly in c(), e.g.
    from dask.distributed import worker_client


    def c():
        with worker_client() as client:
            fut_a = client.submit(a)
            fut_b = client.submit(b)
            res_a, res_b = client.gather([fut_a, fut_b])
        return [res_a + res_b, res_a, res_b]
    RuiLoureiro
    @RuiLoureiro
    @joeschmid This example is a simplification of my application, and making every task return its dependencies would be overly complicated, even more so because I have several levels of dependencies (not just one level like in the example).
    joeschmid
    @joeschmid
    @RuiLoureiro got it, that makes sense. It might be overkill, but I suppose you could build up the entire tree of intermediate results. One open source project that I'm familiar with runs on Dask and does that: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/flow_runner.py The key line:
    task_states[task] = executor.submit(
                            self.run_task,
                            task=task,
                            state=task_state,
                           ... snipped ...
                        )
    We run large Flows with Prefect on Dask. When our Flow Run is finished, we get back a results object that we can then examine for the state and return value of the various tasks. Depending on what you're doing, it might be easier to just use Prefect to build your flow and run it on Dask. It makes it very easy to return data from tasks and build up multi-step flows that run on Dask. We've had a good experience with it.
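    For reference, a sketch of that pattern using the Prefect API of the time (0.x-era imports; newer releases have moved things around), showing how each task's state and return value can be pulled out of the final result:

    from prefect import Flow, task
    from prefect.engine.executors import DaskExecutor

    @task
    def a():
        return 1

    @task
    def b():
        return 2

    @task
    def c(x, y):
        return x + y

    with Flow("intermediate-results") as flow:
        ra = a()
        rb = b()
        rc = c(ra, rb)

    # Run the flow on Dask; without an address this spins up a local cluster
    state = flow.run(executor=DaskExecutor())

    # Every task's state (and therefore its return value) is kept
    print(state.result[ra].result, state.result[rb].result, state.result[rc].result)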
    RuiLoureiro
    @RuiLoureiro
    @joeschmid Very interesting, will look into it. Even if we don't end up using the library, it's always interesting to see how other people build on top of dask. Thank you very much!
    garanews
    @garanews
    is it possible to have 2 different dask clusters running on the same machine?
    joeschmid
    @joeschmid
    @RuiLoureiro You bet. Also, Prefect has a really helpful public Slack community available at prefect-community.slack.com . You can post questions there and get helpful, fast responses from their team.
    bgoodman44
    @bgoodman44
    Anyone here?
    konstantinmds
    @konstantinmds

    Hi guys,
    I need to pull a large table (it can't fit in memory) from an Azure SQL database (or another server) and split it into multiple CSVs.
    So I basically have no transformation to do except dividing it into equal parts.
    I think Dask is the right tool for this?
    I've tried many ways to make a simple connection to the SQL server, but I just can't get it working:

    import dask.dataframe as dd
    import sqlalchemy as sa
    engine = sa.create_engine('mssql+pyodbc://VM/Data?driver=SQL+Server+Native+Client+11.0')
    metadata = sa.MetaData()
    posts = sa.Table('posts', metadata, schema= 'dbo', autoload= True, autoload_with= engine)
    query = sa.select([posts])
    sql_reader = dd.read_sql_table('posts', uri =engine, npartitions=16, index_col='userId')

    Any help with this ?

    Martin Durant
    @martindurant
    uri should, as the name implies, be the URI and not an engine instance:
    uri : string
        Full sqlalchemy URI for the database connection
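    Applying that to the snippet above, something like this should work (the URI string is taken from the original post; to_csv is just one way of splitting the result into multiple files):

    import dask.dataframe as dd

    # Pass the SQLAlchemy URI itself, not an Engine built from it
    uri = 'mssql+pyodbc://VM/Data?driver=SQL+Server+Native+Client+11.0'
    ddf = dd.read_sql_table('posts', uri=uri, schema='dbo',
                            index_col='userId', npartitions=16)

    # One CSV per partition
    ddf.to_csv('posts-*.csv')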
    Bradley McElroy
    @limx0
    Hi dask team, is there an equivalent SystemMonitor for the dask-kubernetes workers as there is for the LocalCluster workers?
    Bradley McElroy
    @limx0
    I'm interested in gathering some worker resource stats for some dask graph runs
    Matthew Rocklin
    @mrocklin
    All dask workers run the system monitor internally
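    One hedged way to pull those numbers back (assuming the kubernetes workers expose the same Worker.monitor / range_query interface as LocalCluster workers; the scheduler address is made up):

    from dask.distributed import Client

    client = Client("tcp://scheduler-address:8786")  # hypothetical address

    # Ask every worker for its SystemMonitor history
    stats = client.run(lambda dask_worker: dask_worker.monitor.range_query(start=0))
    for addr, samples in stats.items():
        print(addr, samples["cpu"][-1], samples["memory"][-1])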
    Bradley McElroy
    @limx0
    Okay, good to know. Do the kubes workers record their usage like the LocalCluster workers do? I'm actually trying to do a range query after some computation; is this also available on the kubes cluster?
    Matthew Rocklin
    @mrocklin
    (FYI, I don't answer questions here as a rule. If you're looking for help from me personally you'll have to use Stack Overflow or GitHub)
    Bradley McElroy
    @limx0
    understood, thanks @mrocklin - dask/dask-kubernetes#180
    Matthew Rocklin
    @mrocklin
    I appreciate it!
    Adam Thornton
    @athornton
    I just opened this issue: dask/dask-kubernetes#181 but figured someone here might know. What changed in dask-kubernetes with respect to RBAC after 0.9.1? I get a 403 Forbidden: User "system:serviceaccount:nublado-athornton:dask" cannot get resource "pods" in API group "" in the namespace "nublado-athornton", but I have what look like the right rules in my role:
    rules:
    - apiGroups:
      - ""
      resources:
      - pods
      verbs:
      - list
      - create
      - delete
    Eric Ma
    @ericmjl
    @TomAugspurger I hope you're doing well. I've got a question for you regarding dask-ml - is there a way to prevent parallel random forest fitting from using >100% CPU? I have set n_jobs=1 in the RandomForestRegressor() constructor, but still end up with some of my dask workers using 2000% CPU, which is looking really weird.
    (It also makes me a bad citizen on our compute cluster, haha.)
    FWIW, I've been using dask as part of an automated ML system - one RF per slice/subset of data in our database - and there are thousands of slices.
    It's super cool that Dask enables me to build this out.
    That said, I get a lot of garbage collection errors whenever I'm using parallel RF on this many slices of data - is that because I'm overwhelming the dask scheduler?
    Matthew Rocklin
    @mrocklin
    @ericmjl it might be better to raise this as an issue on github
    That way other people who run into the same issue will be able to do a web search and benefit from the answer
    Eric Ma
    @ericmjl
    OK! Thanks @mrocklin! :D
    Scott Sievert
    @stsievert
    I’m referencing dask-jobqueue in an academic paper. What’s the preferred method of referencing it?
    I’m currently planning on linking to the readthedocs homepage.
    joeschmid
    @joeschmid
    anakaine
    @anakaine
    Random question: when creating a new column in a dataframe by assigning the result of an np.select operation, dask complains that the data coming back is not supported, being of type numpy.ndarray. Doing the same in pandas (same code, just one dataframe set up in dask, the other in pandas) works fine. Is this expected?
    Scott Sievert
    @stsievert
    Thanks for that @joeschmid
    Simon
    @anakaine_twitter
    Is there an equivalent of np.select in dask? Checking the docs, there are quite a few references to "see also: select", but no section for it.
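    There isn't a dask.dataframe wrapper for np.select as far as I know, but applying it per partition sidesteps the "unsupported numpy.ndarray" complaint, since the ndarray then lines up with a single partition (column names and conditions below are made up):

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    pdf = pd.DataFrame({"x": range(10)})
    ddf = dd.from_pandas(pdf, npartitions=2)

    def add_label(df):
        # Plain pandas/numpy inside: np.select sees one partition at a time
        conditions = [df["x"] < 3, df["x"] < 7]
        choices = ["low", "mid"]
        return df.assign(label=np.select(conditions, choices, default="high"))

    ddf = ddf.map_partitions(add_label)
    print(ddf.compute())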
    David Hoese
    @djhoese
    Not sure of a better place for this, but I'm looking for a small example of using fastparquet that shows the advantages of defining row-groups over the default "single" row group. I'm presenting some information on file formats but don't use parquet in my day to day (I really don't even use pandas that often). Anyone have a simple code example?
    Uwe L. Korn
    @xhochy
    @djhoese Here's a very simple example: https://gist.github.com/xhochy/0880fd0bed3a7eaca3c31bf07b6868d9 (although it uses pyarrow)
    Martin Durant
    @martindurant
    correct, whether or not to use row-groups is not specific to the engine; Dask will load in parallel with either and can make (limited) decisions to exclude some row groups based on metadata.
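    A small self-contained sketch of that (file name and sizes are arbitrary): write with explicit row-group sizes via fastparquet, then let dask use the per-row-group statistics to skip groups that cannot match a filter (rows inside the groups that are kept are not filtered further):

    import pandas as pd
    import fastparquet
    import dask.dataframe as dd

    df = pd.DataFrame({"id": range(1_000_000), "value": range(1_000_000)})

    # ~100k rows per row-group instead of one big group
    fastparquet.write("example.parq", df, row_group_offsets=100_000)

    # The filter is applied at row-group level using the stored min/max stats
    ddf = dd.read_parquet("example.parq", engine="fastparquet",
                          filters=[("id", ">", 900_000)])
    print(len(ddf))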
    David Hoese
    @djhoese

    @xhochy Ok thanks. Is there a way to do that query without knowing that row-group 1 is where you want to look?

    Does generating a pandas DataFrame (not dask) load the entire parquet file in to memory? Or does it do it lazily?

    Martin Durant
    @martindurant

    The last time I looked, pandas loaded everything. It would be reasonable to implement that iteratively, and fastparquet does have a specific method to do that.

    Is there a way to do that query without knowing that row-group 1 is where you want to look

    Parquet optionally stores column max and min values for each row-group, so maybe.
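    The fastparquet method referred to is, I believe, ParquetFile.iter_row_groups, which yields one pandas DataFrame per row-group instead of materialising the whole file (process here is a hypothetical per-chunk handler):

    import fastparquet

    pf = fastparquet.ParquetFile("example.parq")

    # One pandas DataFrame per row-group
    for chunk in pf.iter_row_groups(columns=["id", "value"]):
        process(chunk)  # hypothetical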