    Fred Massin
    @FMassin
    Hello,
    I would like to use template matching with time-series. The objective is to look for many relatively short 1d patterns in a relatively long time-series. Any suggestions on how to do this in Dask? I mean something like https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.Series.corr with many other series and split_every equal to or lower than the length of other...
    Thanks!
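    A rough sketch of one possible approach with dask.array (not an answer from the channel), assuming a single short NumPy template and a long 1-d dask array; pattern, series, and match below are placeholder names:

    import numpy as np
    import dask.array as da
    
    pattern = np.random.random(50)                           # short 1-d template (placeholder)
    series = da.random.random((1_000_000,), chunks=100_000)  # long time-series (placeholder)
    
    def match(block, template):
        # cross-correlate one chunk of the series against the template
        return np.correlate(block, template, mode="same")
    
    # overlap chunks by the template length so matches spanning chunk edges are kept
    scores = series.map_overlap(match, depth=len(pattern), template=pattern, dtype=float)
    best = int(scores.argmax().compute())                    # index of the strongest match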
    Berk Gercek
    @berkgercek
    Heya, hate to add to the pile of questions but I'm currently going through dask-tutorial and I am on the weather example in Ch. 2: Arrays. The computation of mean in 500x500 chunks takes 14m31s of time (not wall time), with only 1m50s of that being User time and the rest is sys. Is this a normal result? The wall time for reference is ~37s, and I am using a 12-core processor
    Martin Durant
    @martindurant
    ^ many of these sound like stack-overflow questions or issues (on the tutorial repo, in the latter case). We discourage anything more than general conversation here, because we would like solutions to be searchable, so others can benefit too
    Fred Massin
    @FMassin
    ok
    Matthew Rocklin
    @mrocklin
    +1 to what @martindurant said. @xavArtley @FMassin @berkgercek those are all great and valid questions. Most of the Dask maintainers don't watch this channel often. See https://docs.dask.org/en/latest/support.html for suggestions on where to ask for help
    Martin Durant
    @martindurant
    (it may still take some time for replies, but the audience is much broader)
    lszyba1
    @lszyba1
    Hello,
    I'm trying to load a CSV file, but some of the columns have inconsistent types...
    So while a column initially looks like a bunch of 0s, later it turns into an object...
    is there a way to:
    a) parse through the whole CSV file (10 GB+) and work out the correct dtypes? (I have over 100 columns)
    or
    b) go through the file and summarize the type counts... for example, if there are 99,999 rows of int and 1 row of object, I can just clean that row?
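    One rough way to do (b), sketched here only as an illustration; the file pattern and some_column are placeholder names:

    import pandas as pd
    import dask.dataframe as dd
    
    # read everything as strings so mixed-type columns don't fail mid-read
    df = dd.read_csv("big_file_*.csv", dtype=str, blocksize="64MB")
    
    # per partition, count values in one suspect column that don't parse as numbers
    def count_bad(part):
        col = part["some_column"]
        parsed = pd.to_numeric(col, errors="coerce")   # non-numeric -> NaN
        return (parsed.isna() & col.notnull()).sum()
    
    bad_rows = df.map_partitions(count_bad, meta=("bad", "int64")).sum().compute()
    print(bad_rows)   # e.g. 1 -> just clean that one row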
    Dean Langsam
    @DeanLa
    Does dask-yarn (specifically on EMR) have any known issues?
    The cluster just dies on every take or persist call.
    Martin Durant
    @martindurant
    ^ you should probably file an issue with more details. I believe dask-yarn has been successfully used on EMR.
    xavArtley
    @xavArtley
    Hello, is it an anti-pattern to use dask.delayed to construct tasks which will be submitted to the cluster later?
    Davis Bennett
    @d-v-b
    quick question for any dask-jobqueue users / devs -- what happened to the cluster.start_workers() method, and what is the new way to get workers? (cluster.scale() doesn't use the right launch script)
    Davis Bennett
    @d-v-b
    ok, looks like the start_workers method died with the shift to SpecCluster
    Martin Durant
    @martindurant
    @xavArtley, if you mean having the delayed function return submitted work, then yes, this falls under tasks which operate on dask collections; don't do it. Since submit and/or delayed calls are almost instantaneous, there is no point in nesting them.
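    To illustrate the point with a toy sketch (my own example, not from the thread): compose the cheap delayed calls up front instead of creating more dask work inside a task.

    import dask
    from dask import delayed
    
    @delayed
    def load(i):
        return list(range(i))
    
    @delayed
    def process(chunk):
        return sum(chunk)
    
    # anti-pattern: a delayed (or submitted) function that itself builds and computes
    # more dask work, e.g. returning process(load(i)).compute() from inside a task.
    # instead, build the graph up front and compute once:
    results = dask.compute(*[process(load(i)) for i in range(10)])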
    Taleb Zeghmi
    @talebzeghmi
    Is anyone working on reading data from Hive tables without knowledge of what the underlying file format is or where the file is located? (i.e. Dask talks to the Hive metastore just like Spark SQL does)
    Davis Bennett
    @d-v-b
    Is this the intended behavior of the chunksize property of an array?
    In [3]: da.zeros((10,10), chunks=((1,9),(10,)))
    Out[3]: dask.array<zeros, shape=(10, 10), dtype=float64, chunksize=(9, 10), chunktype=numpy.ndarray>
    if not I can open an issue
    (to me it seems weird that chunksize ignores the uneven chunking of the array)
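    For comparison: as far as I can tell, chunksize reports a single representative (largest) size per axis, while .chunks keeps the full uneven layout:

    import dask.array as da
    
    x = da.zeros((10, 10), chunks=((1, 9), (10,)))
    print(x.chunksize)   # (9, 10): one representative (largest) chunk size per axis
    print(x.chunks)      # ((1, 9), (10,)): the full, uneven chunk layout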
    Jim Crist-Harif
    @jcrist
    @talebzeghmi, at one point in time @mariusvniekerk wrote this up: https://github.com/mariusvniekerk/dask-hivemetastore. I've never used it, and it's not being actively developed. We have no immediate plans to work on it ourselves.
    Matthias De Vriendt
    @matthiasdv

    Hello. I have an environment where I'm upgrading:

    dask==1.1.0 to dask==1.2.0
    distributed==1.25.3 to distributed==1.27.0
    dask-kubernetes==0.8.0 to dask-kubernetes==0.9.2

    And I notice that my worker pods start, but then fail to connect to the scheduler. Further inspection shows that the worker pods have DASK_SCHEDULER_ADDRESS set to a localhost address (i.e. tcp://127.0.0.1:44131), while the working version points to the correct remote IP (scheduler and worker pods are on separate clusters).

    I've spent some time digging through the codebase to find out why this is not being set correctly.

    Where should I begin looking?

    Matthew Fung
    @mattfung
    Hi, I've been playing around with running parallel tasks via dask.distributed using remote workers. One problem I've encountered is that when I do client.submit on a function that depends on a module I wrote, the worker nodes fail with ModuleNotFoundError. I'm wondering whether dask.distributed requires all worker nodes to have the dependencies installed beforehand, or whether I'm doing something wrong?
    Martin Durant
    @martindurant
    Yes, you should ensure the same environment for workers and client; although you can upload modules to workers if you must
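    For the upload route, a minimal sketch; the scheduler address, mymodule.py, and my_function are placeholders:

    from dask.distributed import Client
    
    client = Client("tcp://scheduler:8786")    # scheduler address is a placeholder
    client.upload_file("mymodule.py")          # ships the single-file module to current workers
    
    import mymodule                            # hypothetical module containing my_function
    futures = client.map(mymodule.my_function, range(10))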
    Matthew Fung
    @mattfung
    @martindurant thx
    Zhao Jun
    @zhaojun060708
    [screenshot attached: 企业咚咚20191030182118.png]
    Hi, I created a task graph which contains several small tasks. As you can see in the screenshot, deserialization of these tasks takes too much time (shown in grey). I don't know what causes this. Is there any way to optimize it? Many thanks.
    Martin Durant
    @martindurant
    You would do well to post the code you are using along with this question to stack overflow. We can only guess at what might be happening
    Zhao Jun
    @zhaojun060708
    Thank you for your reply. One question: the workers already have all the code, so can a worker import the function locally instead of deserializing the task code?
    Martin Durant
    @martindurant
    Pickle normally stores a reference to the in-module version and imports it when deserialising, but it depends on how you defined things
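    A sketch of that distinction (my own illustration; mymodule, heavy_task, and the scheduler address are placeholders): a function importable from a module installed on the workers is pickled by reference and re-imported there, whereas a function defined in __main__ or a notebook is serialized by value.

    # mymodule.py, installed on both client and workers (hypothetical module):
    # def heavy_task(x):
    #     return x * 2
    
    from dask.distributed import Client
    import mymodule
    
    client = Client("tcp://scheduler:8786")    # scheduler address is a placeholder
    # pickled by reference ("mymodule.heavy_task"); each worker re-imports it locally
    fut = client.submit(mymodule.heavy_task, 21)
    print(fut.result())                        # 42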
    Tom Augspurger
    @TomAugspurger
    Thanks for scheduling a Tweet for the pytorch example Matt, I was just about to do that :)
    Matthew Rocklin
    @mrocklin
    :)
    Zoltan C. Toth
    @zoltanctoth

    Hi,

    I'm looking for a simple resource manager. I have a small set of computers, each with different characteristics (when it comes to memory size and GPU count). I'm running ML training algorithms in the form of standalone python scripts on these machines. One script runs on a single node, and there is no distributed computation needed. I know beforehand how much memory and how many GPUs are required for a particular python script.

    All I want is to have a resource manager that helps me automatically execute these python scripts on a node that has enough free mem/gpu capacity at the time of execution.

    I've seen the Dask distributed resource manager, but can't figure out if I can use it as a Resource Manager for executing python scripts, and nothing more. Can you give me some guidance here?

    I have quite a bit of experience with Apache Spark, but as of Spark 2, GPU-aware scheduling doesn't seem to be possible. I've looked at Mesos+Chronos, but things get pretty complicated there compared to the simplicity of this use-case. K8s is a monster. So I'm checking whether there is a straightforward way to get this done in Dask.

    Martin Durant
    @martindurant
    This would be a nice stack-overflow question, so others can benefit from the answer too. For now, I would refer you to Dask's “label-based” resource system: https://distributed.dask.org/en/latest/resources.html
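    A minimal sketch of that label-based system (the addresses, resource names, and script names below are made up): each worker advertises abstract resources at start-up, and each submitted task declares what it needs.

    # each machine starts a worker advertising its capacity, e.g. (shell):
    #   dask-worker tcp://scheduler:8786 --resources "GPU=2,MEMORY=60e9"
    
    import subprocess
    from dask.distributed import Client
    
    client = Client("tcp://scheduler:8786")    # scheduler address is a placeholder
    
    def run_script(path):
        # run one standalone training script to completion
        subprocess.run(["python", path], check=True)
    
    # this only runs on a worker with enough free GPU/MEMORY labels
    fut = client.submit(run_script, "train_model_a.py",
                        resources={"GPU": 1, "MEMORY": 30e9})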
    thorstenhater
    @thorstenhater

    Hi,
    I have an application that I am not sure whether to use Dask for or not. The problem is roughly this:

    I have a set of points I want to index for use in spatial queries. The index must be distributed over several nodes to fit into main memory and for performance; for the moment, indexing random subsets is fine. Queries will be executed by all workers and merged into the final result.

    So I made a mock-up using dask and stumbled upon some issues

    from dask.delayed import delayed
    import numpy as np
    import dask.array as da
    
    @delayed
    def make(xs):
        """Create index"""
        return None
    
    @delayed
    def order(xs):
        """Sort points for search"""
        return np.sort(xs[0].compute()) # This feels bad
    
    @delayed
    def query(index, xs, ll, ur):
        """Bounding box query between lower left and upper right"""
        return xs[(ll[0] <= xs[:, 0]) & (xs[:, 0]< ur[0])]
    
    # Point count
    N = 10000
    
    # Random input: N random points in 3d
    array   = da.random.random((N, 3), chunks=(N//10, 3))
    # Split into Delayeds per chunk
    splits  = array.to_delayed()
    # Create two part index
    ordered = [order(split) for split in splits]
    indices = [make(split) for split in splits]
    # Execute a query
    queries = [query(index, points, (0, 0.5, 0.75), (1.0, 0.6, 0.85)) for index, points in zip(indices, ordered)]
    # Merge results
    results = [da.from_delayed(i, shape=(np.nan, 3), dtype=np.float64) for i in queries]
    final   = da.concatenate(results)
    final.compute()

    Note the line marked "This feels bad". I feel I should not call compute inside a delayed function. Also,
    xs[0] is necessary because xs is an array of Delayed objects. This does not happen if the array and/or chunks are one-dimensional.

    Any suggestions or ideas?

    Thanks,
    Thorsten

    Scott Sievert
    @stsievert
    @thorstenhater calling compute from workers is bad practice. I suspect choosing the correct chunk size for “array” will make this easier. Then, you can call “array.map_blocks(np.sort)”.
    Said another way, why isn’t the chunk size the size of “xs[0].compute()”? Those data need to be gathered on one machine anyways, and it can’t be too expensive to compute
    thorstenhater
    @thorstenhater
    @stsievert The core issue is this
    Running with two-dimensional chunks (k, 3) the make function receives something like
    [Delayed(('random_sample-4e3ea92e29a66a4b4a705044ec3c18a7', 0, 0))]
    Running with one-dimensional chunks (k,) it is
    [0.55253896 0.4182976 0.62587806 0.25000455 0.85405076]
    In the second, 1d case, everything works as expected, but I don't understand the difference in the 2d example.
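    As far as I can tell (my own sketch, not from the thread), to_delayed() on a 2-d-chunked array returns a 2-d object ndarray, so iterating it yields length-1 object arrays rather than Delayed objects; passing the Delayed element itself lets dask resolve it before the call, so no compute() is needed inside:

    import numpy as np
    import dask.array as da
    from dask import delayed
    
    @delayed
    def order(block):
        # block arrives as a concrete NumPy array here; no compute() needed
        return np.sort(block, axis=0)
    
    arr = da.random.random((10000, 3), chunks=(1000, 3))
    blocks = arr.to_delayed().ravel()        # flatten the (10, 1) grid of Delayed objects
    ordered = [order(b) for b in blocks]     # each b is a Delayed that dask resolves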
    Scott Sievert
    @stsievert
    @thorstenhater this test passes:
    def _sort(x):
        return np.sort(x, axis=1)
    
    N = 1000
    arr   = da.random.random((N, 3), chunks=(N//10, 3))
    y = arr.map_blocks(_sort)
    z = y.compute()
    
    for zi in z:
        assert (np.diff(zi) > 0).all()  # assert ordered
    There's no need to call to_delayed to pass each chunk to a function; map_blocks does the same thing.
    thorstenhater
    @thorstenhater
    Yes, that is true, but I need to change from an array type to an opaque Index type, which does not work with map_blocks. I found a work-around by reshaping to linear arrays and back. However, that is not ideal either.
    Scott Sievert
    @stsievert
    @thorstenhater this question is better suited for SO.
    mcguip
    @mcguipat
    client.scatter(dict) raises a tornado.gen.Result. My understanding is that this is a Python 2 coroutine convention which is no longer required in Python 3. What is the modern way of acquiring the associated futures from this call?
    Andrey Tatarinov
    @elephantum
    Hi, let's say I have a large computation graph that maps one parquet directory to another, and this graph fails in the middle for some external reason, but a number of output parts have already been computed and written to disk. Is it possible to mark the corresponding nodes as done and not recompute the subgraphs that lead to them?
    Martin Durant
    @martindurant
    There is not a simple way to achieve that, no. If the mapping of files is one-to-one, you can probably write a little logic to pull tasks out of the graph, or to build the computation only for the missing outputs in the first place
    Andrey Tatarinov
    @elephantum
    Is it possible to inspect and modify the graph once it's built? Where do I start?
    Martin Durant
    @martindurant
    Indeed, usually via the .dask attribute of your dask object. Not many people do this, though; it's not too simple.
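    A tiny sketch of what that inspection looks like (the array below is just an example; the keys and values you see will differ):

    import dask.array as da
    
    x = (da.ones((10,), chunks=(5,)) + 1).sum()
    graph = dict(x.__dask_graph__())      # same mapping the .dask attribute exposes
    for key in list(graph)[:3]:
        print(key, "->", graph[key])      # task key -> (function, *arguments)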
    Andrey Tatarinov
    @elephantum

    Thanks, I'll have a look.

    Speaking of 1-to-1 mapping, that's usually the case, so I can build many small subgraphs instead of one big one.

    Martin Durant
    @martindurant
    ^ probably the easier route
    fritz-clicktripz
    @fritz-clicktripz
    Is anyone available to briefly chat about the interplay between dask.dataframe, futures, and delayeds? I have a simple dataframe operation on parquet files and a custom delayed save. I'm worried about making sure the correct dataframe partition maps to the correct filename that I got in the beginning when I overwrite the file (I can't use to_parquet because of an external reading system).
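    One way to keep that mapping explicit, sketched under assumptions (the paths and the save body are placeholders for the real custom save): ddf.to_delayed() yields one Delayed per partition in partition order, so partitions and filenames can be zipped directly.

    import dask
    import dask.dataframe as dd
    
    ddf = dd.read_parquet("input/")        # placeholder input path
    # one output name per partition, in partition order (names are placeholders)
    filenames = [f"output/part-{i}.parquet" for i in range(ddf.npartitions)]
    
    @dask.delayed
    def save(part, path):
        part.to_parquet(path)              # stand-in for the custom save logic
        return path
    
    parts = ddf.to_delayed()               # one Delayed per partition, in order
    dask.compute(*[save(p, f) for p, f in zip(parts, filenames)])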