    Scott Sievert
    @stsievert
    Dean Langsam
    @DeanLa
    Not what I need.
    I need to expand a list to columns (on axis 1), not explode and create new rows.
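    A possible sketch of that pattern in dask.dataframe, assuming a column of fixed-length lists (the column names and dtypes here are hypothetical):

    import pandas as pd
    import dask.dataframe as dd

    pdf = pd.DataFrame({"vals": [[1, 2, 3], [4, 5, 6]]})
    ddf = dd.from_pandas(pdf, npartitions=1)

    # Turn the list column into real columns on axis 1, one partition at a time
    expanded = ddf.map_partitions(
        lambda df: pd.DataFrame(df["vals"].tolist(), index=df.index,
                                columns=["a", "b", "c"]),
        meta={"a": "i8", "b": "i8", "c": "i8"},
    )
    print(expanded.compute())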
    Sergio Meana
    @smeana
    Hello
    Is there any built-in function to do forward rolling on a time-series index?
    I have seen the same question asked by @AlbaFS on Aug 04 without a reply. Any solution at all? Thanks
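    One common workaround in plain pandas for positional (count-based) windows is to reverse, take a trailing rolling window, and reverse back; this is only a sketch and does not cover offset-based windows on a datetime index:

    import pandas as pd

    s = pd.Series(range(10))
    # Sum over the current value and the next two values (a forward-looking window of 3)
    forward_sum = s[::-1].rolling(3, min_periods=1).sum()[::-1]
    print(forward_sum)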
    mcguip
    @mcguipat
    Hi all, I have a problem which requires data to be loaded in several blocks which are then subdivided and operated on. Scatter seems like the most sensible way of accomplishing this; however, if I want loading to be executed on the cluster as well, this requires scatter from within another task. Is there a better way to accomplish this?
    Dario Vianello
    @dvianello
    Hey! I'm struggling a bit with S3 & IAM roles while using dask. We have a set of workers in EC2 with instance profiles authorised to assume a role in a different account and read from a bucket. I've tried to do the assume-role beforehand in boto3 and pass the session into dask, but the session object apparently can't be pickled (it fails to pickle the lock). Is there a way to pull this off in Dask? Sending creds along to workers isn't the best idea ever, and it would be cool if the system were able to do the assume-role on the workers before trying to access S3...
    Martin Durant
    @martindurant
    Whatever you do to assume the role, you could execute the same thing on the cluster workers using client.run
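    A minimal sketch of what that could look like, assuming boto3 is installed on the workers and that exporting the standard AWS environment variables is enough for s3fs to pick up the temporary credentials (the role ARN below is hypothetical):

    import os
    import boto3

    def assume_role_on_worker():
        # Runs locally on each worker: assume the cross-account role there
        sts = boto3.client("sts")
        creds = sts.assume_role(
            RoleArn="arn:aws:iam::123456789012:role/example-role",  # hypothetical ARN
            RoleSessionName="dask-worker",
        )["Credentials"]
        # Export the temporary credentials so boto3/s3fs on the worker can find them
        os.environ["AWS_ACCESS_KEY_ID"] = creds["AccessKeyId"]
        os.environ["AWS_SECRET_ACCESS_KEY"] = creds["SecretAccessKey"]
        os.environ["AWS_SESSION_TOKEN"] = creds["SessionToken"]

    client.run(assume_role_on_worker)  # client is an existing distributed.Client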
    Dario Vianello
    @dvianello
    right!
    Dean Langsam
    @DeanLa
    Running dask-yarn, my notebook crashed. I think the Cluster instance (scheduler?) is dangling, because I can't create a new one. How can I make sure?
    xavArtley
    @xavArtley

    Hello,
    I'm using

    dask.compute(*delayeds, scheduler='processes', num_workers=4)

    to run computations.
    I was wondering which function is used to serialize objects between processes, and whether it is possible to change it.
    Thanks
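    Not an authoritative answer, but my understanding is that the local 'processes' scheduler serializes functions with cloudpickle and data with pickle. A quick sketch for checking whether a troublesome object survives that round trip (make_my_object is hypothetical):

    import pickle
    import cloudpickle

    obj = make_my_object()                     # hypothetical: whatever fails to transfer
    pickle.loads(pickle.dumps(obj))            # data path
    cloudpickle.loads(cloudpickle.dumps(obj))  # function/closure path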

    Fred Massin
    @FMassin
    Hello,
    I would like to use template matching with time series. The objective is to look for many relatively short 1-D patterns in a relatively long time series. Any suggestion on how to do this in Dask? I mean to have something like https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.Series.corr with many other series and split_every equal to or lower than the length of other...
    Thanks!
    Berk Gercek
    @berkgercek
    Heya, hate to add to the pile of questions, but I'm currently going through dask-tutorial and I am on the weather example in Ch. 2: Arrays. The computation of the mean in 500x500 chunks takes 14m31s of CPU time (not wall time), with only 1m50s of that being user time and the rest being sys. Is this a normal result? The wall time for reference is ~37s, and I am using a 12-core processor.
    Martin Durant
    @martindurant
    ^ many of these sound like Stack Overflow questions or issues (against the tutorial in the latter case). We discourage anything more than general conversation here, because we would like solutions to be searchable, so others can benefit too.
    Fred Massin
    @FMassin
    ok
    Matthew Rocklin
    @mrocklin
    +1 to what @martindurant said. @xavArtley @FMassin @berkgercek those are all great and valid questions. Most of the Dask maintainers don't watch this channel often. See https://docs.dask.org/en/latest/support.html for suggestions on where to ask for help
    Martin Durant
    @martindurant
    (it may still take some time for replies, but the audience is much broader)
    lszyba1
    @lszyba1
    Hello,
    I'm trying to load a CSV file, but there are some inconsistent column types...
    So while initially a column looks like a bunch of 0s, later it's an object...
    Is there a way to:
    a) parse through the whole CSV file (10 GB+) and infer the correct dtypes? (I have over 100 columns)
    or
    b) go through the file and summarize the type counts... for example, if there are 99,999 rows of int and 1 row of object, I can just clean that row?
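    A possible sketch of option (b), reading the ambiguous columns as strings first and counting values that fail numeric conversion (the file pattern and column name are hypothetical):

    import pandas as pd
    import dask.dataframe as dd

    df = dd.read_csv("data-*.csv", dtype=str)  # read everything as object/str first

    # How many non-null values in this column are not parseable as numbers?
    bad = df["some_column"].map_partitions(
        lambda s: (pd.to_numeric(s, errors="coerce").isna() & s.notna()),
        meta=("some_column", "bool"),
    )
    print(bad.sum().compute())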
    Dean Langsam
    @DeanLa
    Is dask-yarn (specifically on EMR) having known issues?
    The cluster just dies on every take or persist action.
    Martin Durant
    @martindurant
    ^ you should probably file an issue with more details. I believe dask-yarn has been successfully used on EMR.
    xavArtley
    @xavArtley
    Hello, is it an anti-pattern to use dask.delayed to construct tasks which will be submitted to the cluster later?
    Davis Bennett
    @d-v-b
    quick question for any dask-jobqueue users / devs -- what happened to the cluster.start_workers() method, and what is the new way to get workers? (cluster.scale() doesn't use the right launch script)
    Davis Bennett
    @d-v-b
    OK, looks like the start_workers method died with the shift to SpecCluster.
    Martin Durant
    @martindurant
    @xavArtley , if you mean having the delayed function return submitted work, then yes, this falls under tasks-which-operate-on-dask-collections; don't do it. Since submit and/or delayed calls are almost instantaneous, there is no point in nesting them.
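    A rough illustration of that distinction, with hypothetical load/process stages (a sketch, not the code under discussion):

    import dask
    from dask import delayed

    @delayed
    def load(path):
        return [path]  # stand-in for real loading

    @delayed
    def process(block):
        return len(block)  # stand-in for real work

    # Anti-pattern: a delayed/submitted task that itself builds and computes more Dask work.
    # Preferred: build the whole graph up front; delayed calls are nearly free.
    paths = ["a.csv", "b.csv"]  # hypothetical inputs
    results = [process(load(p)) for p in paths]
    print(dask.compute(*results))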
    Taleb Zeghmi
    @talebzeghmi
    Is anyone working on reading data from Hive tables without knowledge of what the underlying file format is or where the file is located? (i.e. Dask talks to the Hive metastore just like Spark SQL)
    Davis Bennett
    @d-v-b
    Is this the intended behavior of the chunksize property of an array?

    In [3]: da.zeros((10,10), chunks=((1,9),(10,)))
    Out[3]: dask.array<zeros, shape=(10, 10), dtype=float64, chunksize=(9, 10), chunktype=numpy.ndarray>

    If not, I can open an issue.
    (To me it seems weird that chunksize ignores the uneven chunking of the array.)
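    For comparison, the full uneven chunking is still available via the chunks attribute; a quick check (not an authoritative answer on the intent of chunksize):

    import dask.array as da

    x = da.zeros((10, 10), chunks=((1, 9), (10,)))
    print(x.chunks)     # ((1, 9), (10,)) -- the uneven chunking is preserved here
    print(x.chunksize)  # a single representative tuple per axis, (9, 10) above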
    Jim Crist-Harif
    @jcrist
    @talebzeghmi , at one point in time @mariusvniekerk wrote this up: https://github.com/mariusvniekerk/dask-hivemetastore. I've never used it, and it's not being actively developed. We personally have no immediate plans to work on it.
    Matthias De Vriendt
    @matthiasdv

    Hello. I have an environment where I'm upgrading:

    dask==1.1.0 to dask==1.2.0
    distributed==1.25.3 to distributed==1.27.0
    dask-kubernetes==0.8.0 to dask-kubernetes==0.9.2

    And I notice that my worker pods start, but then fail to connect to the scheduler. Further inspection shows that the worker pods have DASK_SCHEDULER_ADDRESS set to a localhost address (i.e. tcp://127.0.0.1:44131), while the working version points to a correct remote IP (scheduler and worker pods are on separate clusters).

    I've spent some time digging through the codebase to find out why this is not being set correctly.

    Where should I begin looking?

    Matthew Fung
    @mattfung
    Hi, I've been playing around with running parallel tasks via dask distributed using remote workers. One problem I've encountered is that when I do client.submit on a function that depends on a module I wrote, the worker nodes would fail with ModuleNotFoundError. I'm wondering if dask.distributed requires all worker nodes to install dependencies beforehand, or am I doing something wrong?
    Martin Durant
    @martindurant
    Yes, you should ensure the same environment for workers and client; although you can upload modules to workers if you must
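    For the upload route, a minimal sketch (the module name and address are hypothetical; installing the same packages on every worker is still the usual approach):

    from dask.distributed import Client

    client = Client("tcp://scheduler-address:8786")  # hypothetical scheduler address
    client.upload_file("mymodule.py")                # ships the file to all current workers

    import mymodule                                  # hypothetical module
    future = client.submit(mymodule.my_function, 42) # workers can now resolve the import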
    Matthew Fung
    @mattfung
    @martindurant thx
    Zhao Jun
    @zhaojun060708
    (screenshot: 企业咚咚20191030182118.png)
    Hi, I created a task graph which contains several small tasks. As you can see, the deserialization of these several tasks takes too much time (shown in grey). I do not know what causes this. Is there any way to optimize this? Many thanks.
    Martin Durant
    @martindurant
    You would do well to post the code you are using along with this question to Stack Overflow. We can only guess at what might be happening.
    Zhao Jun
    @zhaojun060708
    Thank you for your reply. I have a question: the workers have all the code; can the workers import the function locally instead of deserializing the task code?
    Martin Durant
    @martindurant
    Pickle normally stores a reference to the in-module version and imports it when deserialising, but it depends on how you defined things.
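    A small illustration of that point (using a standard-library function so it is runnable anywhere):

    import pickle
    import math

    # A function that lives in an importable module pickles as a tiny reference;
    # unpickling it on a worker simply re-imports the module.
    payload = pickle.dumps(math.sqrt)
    print(len(payload), payload)

    # Functions defined interactively (a notebook cell, __main__) cannot be referenced
    # this way, so they get serialized by value (cloudpickle) and shipped with the task.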
    Tom Augspurger
    @TomAugspurger
    Thanks for scheduling a Tweet for the pytorch example Matt, I was just about to do that :)
    Matthew Rocklin
    @mrocklin
    :)
    Zoltan C. Toth
    @zoltanctoth

    Hi,

    I'm looking for a simple resource manager. I have a small set of computers, each with different characteristics (when it comes to memory size and GPU count). I'm running ML training algorithms in the form of standalone python scripts on these machines. One script runs on a single node, and there is no distributed computation needed. I know beforehand how much memory and how many GPUs are required for a particular python script.

    All I want is to have a resource manager that helps me automatically execute these python scripts on a node that has enough free mem/gpu capacity at the time of execution.

    I've seen the Dask distributed resource manager, but can't figure out if I can use it as a Resource Manager for executing python scripts, and nothing more. Can you give me some guidance here?

    I have quite a bit of experience with Apache Spark, but as of Spark 2, GPU-aware scheduling doesn't seem to be possible. I've checked Mesos + Chronos; things get pretty complicated there compared to the simplicity of this use case. K8S is a monster. So I'm checking whether there is a straightforward way to get this done in Dask.

    Martin Durant
    @martindurant
    This would be a nice Stack Overflow question, so others can benefit from the answer too. For now, I would refer you to Dask's “label-based” resource system: https://distributed.dask.org/en/latest/resources.html
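    A rough sketch of how the label-based resources could map onto this use case (the labels, addresses, and training script are hypothetical):

    # Start each worker advertising what the machine has, e.g.:
    #   dask-worker tcp://scheduler:8786 --resources "GPU=2 MEM=64e9"

    from dask.distributed import Client

    client = Client("tcp://scheduler:8786")  # hypothetical scheduler address

    def run_training(script_path):
        # Each script is a standalone, single-node job
        import subprocess
        return subprocess.run(["python", script_path]).returncode

    # Only runs on a worker with an unclaimed GPU and enough of the MEM label free
    fut = client.submit(run_training, "train_model.py",
                        resources={"GPU": 1, "MEM": 32e9})
    print(fut.result())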
    thorstenhater
    @thorstenhater

    Hi,
    I have an application I am not sure whether to use Dask for or not. The problem is roughly this:

    I have a set of points I want to index for use in spatial queries. The index must be distributed over several nodes to fit into main memory and for performance; for the moment, indexing into random subsets is fine. Queries will be executed by all workers and merged into the final result.

    So I made a mock-up using dask and stumbled upon some issues

    from dask.delayed import delayed
    import numpy as np
    import dask.array as da
    
    @delayed
    def make(xs):
        """Create index"""
        return None
    
    @delayed
    def order(xs):
        """Sort points for search"""
        return np.sort(xs[0].compute()) # This feels bad
    
    @delayed
    def query(index, xs, ll, ur):
        """Bounding box query between lower left and upper right"""
        return xs[(ll[0] <= xs[:, 0]) & (xs[:, 0]< ur[0])]
    
    # Point count
    N = 10000
    
    # Random input: N random points in 3d
    array   = da.random.random((N, 3), chunks=(N//10, 3))
    # Split into Delayeds per chunk
    splits  = array.to_delayed()
    # Create two part index
    ordered = [order(split) for split in splits]
    indices = [make(split) for split in splits]
    # Execute a query
    queries = [query(index, points, (0, 0.5, 0.75), (1.0, 0.6, 0.85)) for index, points in zip(indices, ordered)]
    # Merge results
    results = [da.from_delayed(i, shape=(np.nan, 3), dtype=np.float64) for i in queries]
    final   = da.concatenate(results)
    final.compute()

    Note the line marked "This feels bad". I feel I should not call compute in a delayed function. Also,
    xs[0] is necessary as xs is an array of Delayed objects. This does not happen if the array and/or chunks are one-dimensional.

    Any suggestions or ideas?

    Thanks,
    Thorsten

    Scott Sievert
    @stsievert
    @thorstenhater calling compute from workers is bad practice. I suspect choosing the correct chunk size for “array” will make this easier. Then, you can call “array.map_blocks(np.sort)”.
    Said another way, why isn’t the chunk size the size of “xs[0].compute()”? Those data need to be gathered on one machine anyways, and it can’t be too expensive to compute
    thorstenhater
    @thorstenhater
    @stsievert The core issue is this
    Running with two-dimensional chunks (k, 3) the make function receives something like
    [Delayed(('random_sample-4e3ea92e29a66a4b4a705044ec3c18a7', 0, 0))]
    Running with one-dimensional chunks (k,) it is
    [0.55253896 0.4182976 0.62587806 0.25000455 0.85405076]
    In the second, 1d case, everything works as expected, but I do understand the difference in the 2d example.
    Scott Sievert
    @stsievert
    @thorstenhater this test passes:
    def _sort(x):
        return np.sort(x, axis=1)
    
    N = 1000
    arr   = da.random.random((N, 3), chunks=(N//10, 3))
    y = arr.map_blocks(_sort)
    z = y.compute()
    
    for zi in z:
        assert (np.diff(zi) > 0).all()  # assert ordered
    There's no need to call to_delayed to pass each chunk to a function; map_blocks does the same thing.