    Martin Durant
    @martindurant

    I am confused that you are talking about arrays and then dataframes. pangeo-forge is mostly around arrays.

    I thought dask is able to store big data frames and allow parallel operations on them to be faster

    That’s certainly true (with caveats), but you must present your data in an understandable way. The docs give various ways to produce arrays and dataframes from, for example, delayed functions.
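    A minimal sketch of that pattern (the file names and meta schema here are hypothetical); each delayed call becomes one partition of the resulting dataframe:

    import dask
    import dask.dataframe as dd
    import pandas as pd

    @dask.delayed
    def load_chunk(path):
        # each call returns one pandas partition
        return pd.read_csv(path)

    paths = ["part-0.csv", "part-1.csv"]                     # hypothetical inputs
    meta = pd.DataFrame({"x": pd.Series(dtype="float64")})   # expected schema
    df = dd.from_delayed([load_chunk(p) for p in paths], meta=meta)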

    RGBYCP
    @RGBYCP_gitlab
    @martindurant I apologise; I'll try to explain it better. I raster-scan over a sample with an X-ray beam and measure a 2D image at each point of the scan. One HDF5 file contains 3 or more of such images as numpy arrays. With another Python library, I need to manipulate these images. Another HDF5 file contains the motor positions for each point.
    I would like to bring the motor positions and the manipulated data together in a dataframe, one dataframe per scan.
    I hope this is clearer now? Please let me know.
    Martin Durant
    @martindurant
    Look in the dask-delayed documentation for building up generic task graphs for parallel execution, and how you can make these into dataframes and arrays. You may also be interested in the nascent fsspec-reference-maker project, which can give you an ensemble view over many binary pieces of many HDF5 files via zarr (which is easily parallelised with dask) - but don’t do this first, as you will need to understand fairly well how the libraries work.
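    For the scan case above, a rough sketch of that delayed approach; the HDF5 dataset names and the image manipulation below are placeholders for whatever your files actually contain:

    import dask
    import h5py
    import pandas as pd

    @dask.delayed
    def load_scan(data_path, motor_path):
        with h5py.File(data_path, "r") as f:
            images = f["images"][:]           # (n_points, ny, nx); hypothetical key
        with h5py.File(motor_path, "r") as f:
            x = f["motor_x"][:]               # hypothetical keys
            y = f["motor_y"][:]
        intensity = images.sum(axis=(1, 2))   # stand-in for the real manipulation
        return pd.DataFrame({"motor_x": x, "motor_y": y, "intensity": intensity})

    # one dataframe per scan, built in parallel
    frames = dask.compute(*[load_scan("scan1_data.h5", "scan1_motors.h5")])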
    Kevin Sayers
    @KevinSayers
    I am trying to use dask, distributed, and dask-mpi to run an analysis across a couple of nodes, as the dataframe is larger than memory on a single node. I have a dask dataframe with a number of partitions, and then call map_partitions with a custom function that merges some timeseries data. I do client.persist() on the variable for the map_partitions step. The last step is then to run some calculations which should easily fit in memory, sums etc. The problem I keep running into is that I hit OOM on the first node. I have read the managing-memory page in the distributed docs, but was hoping someone could point me to some other ideas?
    On small test cases everything works fine, and it seems like work is happening across the nodes. I am a bit at a loss as to why the one node OOMs when the other nodes are nowhere near maxing out their memory.
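    One pattern that can help in situations like this (sketched with hypothetical names): skip persisting the large intermediate and compute only the small reductions, so each partition can be released once its contribution has been aggregated:

    # instead of: merged = client.persist(df.map_partitions(merge_timeseries))
    merged = df.map_partitions(merge_timeseries)   # merge_timeseries: your merge function
    result = merged.sum().compute()                # only the small aggregate is gathered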
    Adam Ginsburg
    @keflavich
    I just asked this question on SO; x-posting here for visibility. https://stackoverflow.com/questions/69429950/dask-what-does-memory-limit-control
    Yunus Talip Erol
    @yerol89
    Hi everyone. I am a complete beginner with Dask and have never even loaded a table from a database before. I think I have passed all the parameters correctly to read_sql_table(), but am most probably missing something important.
    df = dd.read_sql_table('Table', 'oracle://user:password.@host/sid', index_col='CST_ID', schema='A')
    When I pass the table name as A.Table, Dask throws the error NoSuchTable.
    Then I found a way to pass them as two different parameters, 'Table' and schema='A', and did not get an error.
    But this time I got KeyError: 'CST_ID'. By the way, CST_ID is a numeric column.
    As a result, I could not load the table from the database, and as a beginner I am looking for your help...
    Martin Durant
    @martindurant
    You should try to find the set of parameters that a) allows sqlalchemy to connect and b) allows pandas.read_sql to successfully pull something (perhaps just the first few rows). Then you’ll be in a better position to experiment.
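    A sketch of that two-step check, reusing the URI from above (the Oracle ROWNUM filter is just one way of grabbing a few rows):

    import pandas as pd
    from sqlalchemy import create_engine

    uri = 'oracle://user:password@host/sid'

    # a) can sqlalchemy connect at all?
    engine = create_engine(uri)
    conn = engine.connect()

    # b) can pandas pull a few rows from the schema-qualified table?
    sample = pd.read_sql('SELECT * FROM A.Table WHERE ROWNUM <= 5', conn)
    print(sample.dtypes)   # check that CST_ID is present with the expected type
    conn.close()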
    2 replies
    Yunus Talip Erol
    @yerol89
    @martindurant Thanks for your answers. I will let you know if I can figure out this problem.
    Yunus Talip Erol
    @yerol89
    Hello everyone again. I managed to figure out the former problem, but now I have one more issue. My table, which I loaded from the DB, has 3.5 million rows, but Dask loads only 900K of those rows. Am I doing something wrong? How can I load the whole table from the DB?
    Martin Durant
    @martindurant
    You should probably add debugging to the function to see what start and end values of CST_ID are being generated for each partition. You can also supply the boundary values yourself; see the docstring.
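    For example, the limits= argument pins the lower and upper index bounds used to build partitions, instead of letting dask infer them (the bounds below are made up):

    import dask.dataframe as dd

    df = dd.read_sql_table(
        'Table', 'oracle://user:password@host/sid',
        index_col='CST_ID', schema='A',
        limits=(0, 3_500_000),   # hypothetical min/max of CST_ID
    )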
    George Ciobanu
    @georgeciobanu
    Hi! Just a quick helpful announcement for anyone in need of a ready-to-use Dask cluster: free Dask cluster available to use at tcp://3.216.44.221:8786
    Full FAQ at https://docs.google.com/document/d/19wX6wJXgkxZuLNgSv-5NpqbiqhP4fP11lG1AlT98DZY/edit?usp=sharing
    Henrik Feldt
    @haf

    Hello everyone; I'm trying to spawn a new dask cluster for Prefect to use, but can't make it work:

    from dask_kubernetes import KubeCluster
    cluster = KubeCluster(
        pod_template="worker-spec.yaml",
        name="dask-cluster",
        namespace="flows",
        deploy_mode="remote",
    )

    It's just timing out with

    Creating scheduler pod on cluster. This may take some time.
    ERROR:asyncio:Unclosed client session
    client_session: <aiohttp.client.ClientSession object at 0x119a78e20>
    Could you tell me a little bit about how to sort this out?
    Ale 😡
    @aleperalta82_twitter
    Hello everyone! I'm trying to set up a simple cluster with EC2 instances. The dask scheduler and workers are in a docker image. The question is about security: what ports should be open so the dask scheduler and workers can communicate? I've only opened 8786 and 8787 in the security group, which probably isn't enough, because it stalls when gathering results. That is, the workers can connect to the scheduler.
    But I'm guessing that isn't enough, and I'm trying to find out what's missing.
    Martin Durant
    @martindurant
    Workers will open high-numbered ports so that the scheduler can contact them and they can contact each other. You probably want to keep those ports available within the cluster (but unreachable from the outside).
    Ale 😡
    @aleperalta82_twitter
    So, the worker will open ports where the scheduler connects? The worker will be listening?
    Is there a known port range I can open?
    Martin Durant
    @martindurant
    Yes, but I don’t know offhand what the range is. I think you can specify exactly which port when starting the worker (obviously, don’t run more than one worker process per container in that case).
    The worker kwarg seems to be just port= in the constructor, or --worker-port on the command line.
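    Something like the following, with made-up port numbers; pinning the worker and nanny ports means only known ports need opening in the security group:

    dask-worker tcp://<scheduler-ip>:8786 --worker-port 9000 --nanny-port 9001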
    Ale 😡
    @aleperalta82_twitter
    Great I'll try it out!
    Ale 😡
    @aleperalta82_twitter
    --listen-address and --contact-address did the magic
    I'm using 8786 as the port for the workers too, since it's the only one currently open.
    Is the nanny port only used internally, or should the scheduler know about it?
    Adesh
    @adeshsb
    Hello everyone! I'm trying to explain machine learning model results, i.e. global explainable AI. For that I'm using SHAP values (https://shap.readthedocs.io/), which support pandas DataFrames. Has anyone tried the SHAP calculation on a huge dataset in dask dataframe format?
    Martin Durant
    @martindurant

    Is the nanny port only used internally, or should the scheduler know about it?

    You may well not want a nanny at all in a container-based cluster
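    A sketch of that, with a made-up port: without a nanny each container runs a single worker process, and only one worker port has to be reachable:

    dask-worker tcp://<scheduler-ip>:8786 --no-nanny --worker-port 9000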

    Ale 😡
    @aleperalta82_twitter
    Ah! Ok! Thanks!
    Ivan Kabadzhov
    @ikabadzhov

    Hello, I was trying to set up a dask + slurm cluster and run some benchmarks. However, there are some unknowns that keep me from understanding my findings. I am attaching the report of my call - https://rawcdn.githack.com/ikabadzhov/DaskSlurmBenchmarks/24fb57d60c0cbf992d90278a0843af27d8933e3d/SET04/D03_1149_N8_f512_p1024_r3/dask-report_0.html. Here args.Nodes=8, args.cores=32, i.e. running on 32*8 cores total over the cluster.

    1. It is unclear to me where the number of workers in the report comes from. It is now 160, and I am not setting anything to 160; indeed everything that I set manually is a power of 2. If it is a default value, how can I change it?

    2. How can I guarantee that all workers start at the same time? I tried client.wait_for_workers(args.Nodes), but I have only 8 nodes and 160 workers, which is certainly far from desired. I tried with async -> await, and so far on my toy example it works as expected, but is that a guarantee?

    3. When going through the System tab of the report, I see very low CPU utilization. I entered several nodes where computations were being done, and htop showed me that all CPUs were full. Is the CPU report reliable?

    Thanks in advance for your time.
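    If this cluster is built with dask-jobqueue (the setup code isn't shown, so this is a guess), the worker count is jobs times processes rather than the node count, which may explain the 160; a sketch with illustrative numbers:

    from dask_jobqueue import SLURMCluster
    from dask.distributed import Client

    # each job's cores are split across 'processes' worker processes
    cluster = SLURMCluster(cores=32, processes=4, memory='64GB')
    cluster.scale(jobs=8)          # 8 jobs -> 8 * 4 = 32 workers
    client = Client(cluster)
    client.wait_for_workers(32)    # wait for every worker, not one per node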

    zhonglism
    @zhonglism
    Hello, I have set up a cluster of 2 nodes, each with 2 GPUs, through the CLI, and I implemented distributed xgboost training on this cluster; however, only one GPU is utilized on each node. Does anyone know how to utilize both GPUs on each node, 4 GPUs in total, to train the xgboost model?
    Christoffer Mollenhoff
    @cmollen
    Hey, I've just started dabbling with Dask, and have a distributed scheduler running on my Windows machine, with 10 local workers. This works beautifully. As soon as I add workers on a different machine, I start getting "distributed.client - WARNING - Couldn't gather 1 keys, rescheduling...". I have pip-sync:ed the environments on the different hosts, so they should match perfectly. I use client.submit and dask.distributed.as_completed. Any tips for what might be going on or how to debug? Thanks in advance.
    3 replies
    Martin Durant
    @martindurant
    To the people asking here, please be informed that not that many people watch this forum any more. The new discourse page ( https://dask.discourse.group/t/how-to-use-the-dask-discourse/21 ) was recently set up for these kinds of conversations. The main advantages of the move are: splitting conversations into topics, and making everything searchable (so we don’t end up repeating ourselves).
    1 reply
    Sunil Varma
    @TheSunilVarma
    for col1 in columns_1:
        for col2 in columns_2:
            df.loc[df['any_column_in_df'] == col2, col1] = 0

    What I want: an alternative way to get this done in dask (it works in pandas).
    Problem: I can't use assignment ( = ) with df.loc in dask, because inplace is not supported?
    Explanation: I want to assign 0 (or another value) where a condition is met, and return a dataframe (not a series!).
    I tried using mask, and map_partitions with df.replace (which works fine for a simple one-column value manipulation and returns a dataframe as required)...

    def replace(x: pd.DataFrame) -> pd.DataFrame:
        return x.replace(
            {'any_column_to_replace_value': [np.nan]},
            {'any_column_to_replace_value': [0]}
        )

    df = df.map_partitions(replace)

    How do I do this for the first piece of code, and return a dataframe?

    Thanks in advance. Please help me, Dask experts; I'm new to dask and exploring it.

    Martin Durant
    @martindurant
    This is a row-wise compute, so you can use apply or map_partitions
    def process(df):
        for col1 in columns_1:
            for col2 in columns_2:
                df.loc[df['any_column_in_df'] == col2, col1] = 0
        return df
    
    df2 = df.map_partitions(process)
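    If dask can't infer the output schema of process, you may also need to pass meta, e.g. df.map_partitions(process, meta=df); since the output here has the same columns as the input, the frame itself can serve as the template.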
    3 replies
    İbrahim Halil Koyuncu
    @menendes
    Hi everyone, I'm trying to run daskhub via helm on a local k8s cluster. On jupyterhub I can create a cluster, but when I try to connect to the cluster with "cluster.get_client" I get an error like this: "OSError: Timed out trying to connect to gateway://10.100.3.84:32515/dhub.ac2ac55dd9534e018d808aa21fa74ce9 after 10 s". The related IP address belongs to the proxy-public service external-IP address. Also, when I check the k8s environment I can see the worker pod. In the helm charts, the dask-gateway version is set to 0.9.0.
    Martin Durant
    @martindurant
    Sounds like you should post an issue to the repo about picking the internal versus external IP.
    2 replies
    İbrahim Halil Koyuncu
    @menendes
    Hi everyone, when I try to create a cluster over jupyterhub (I am using daskhub over a local k8s cluster), as in the first screenshot below, I get an error message like "GatewayClusterError: Cluster 'dhub.0055e9e5e66b4907bb8e19995a326dda' failed to start, see logs for more information"
    (screenshots attached)
    I hope someone has encountered this error :)
    İbrahim Halil Koyuncu
    @menendes
    When I check the controller logs for this error, the gateway successfully creates the cluster but cannot reconcile with it, and then the gateway deletes the related cluster. Does anyone have an idea about this problem?
    (screenshot attached)
    Martin Durant
    @martindurant
    There is not much traffic any more on this channel. You are welcome to try slack (where there is a gateway-specific channel), the dask discourse ( dask.discourse.group ), or a github issue (if you think you can isolate a specific bug).
    (Note that the discourse is still very new!) Also, github has “discussions”; it is worth searching around there.
    İbrahim Halil Koyuncu
    @menendes
    Thank you Martin :) Do you have links to join the things you mentioned?
    Martin Durant
    @martindurant
    (but gateway has its own github repo, of course)
    Ian
    @ionox0
    Is there any solution for these errors?
    WARNING:distributed.utils_perf:full garbage collections took 49% CPU time recently (threshold: 10%)
    WARNING:distributed.worker_memory:Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 5.34 GiB -- Worker memory limit: 7.50 GiB
    WARNING:distributed.utils_perf:full garbage collections took 48% CPU time recently (threshold: 10%)
    WARNING:distributed.utils_perf:full garbage collections took 48% CPU time recently (threshold: 10%)
    WARNING:distributed.worker_memory:Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 5.34 GiB -- Worker memory limit: 7.50 GiB
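    For what it's worth, the page linked in that warning covers this situation; one thing it suggests on Linux is manually trimming unmanaged memory on the workers, roughly:

    import ctypes

    def trim_memory() -> int:
        # ask glibc to return freed arenas to the OS
        libc = ctypes.CDLL('libc.so.6')
        return libc.malloc_trim(0)

    client.run(trim_memory)   # assumes a Client connected to the cluster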
    2 replies