Lura
@lura:veriny.tf
[m]
O_NONBLOCK sockets always close instantly
graingert
@graingert:matrix.org
[m]
My understanding was that calling sock.close() does a whole bunch of stuff that I don't want to wait for if I don't want to
Lura
@lura:veriny.tf
[m]
yeah and you don't wait because it's done in the background
graingert
@graingert:matrix.org
[m]
but with io_uring you can choose to wait if you want to?
Or are you saying if you submit a close and call the thing that gets 1 response or not without a syscall then you'll always get a close completion back?
Lura
@lura:veriny.tf
[m]
I am not very familiar with io_uring
presumably in a world with io_uring you would not be using raw socket.socket
graingert
@graingert:matrix.org
[m]
Oh no obviously I'd only use a trio.socket.SocketType
Lura
@lura:veriny.tf
[m]
(that is the same thing as a raw socket.socket)
graingert
@graingert:matrix.org
[m]
First, and most obviously, everything is made “Trio-style”: blocking methods become async methods, and the following attributes are not supported:
My understanding was that closing a non-blocking socket releases the GIL and makes a syscall, so Trio should have a deque of sockets to close, pushed to from here: https://github.com/python-trio/trio/blob/master/trio/_socket.py#L444
And then they would all get closed in the next call that waits for I/O
graingert
@graingert:matrix.org
[m]
If that were the case, Trio should provide an async function to wait for that to happen, especially if it were possible to consume close completions from the completion queue without issuing a syscall or releasing the GIL
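The batching idea above could look roughly like this. It is a hypothetical sketch, not part of Trio's API: `DeferredCloser`, `defer_close()` and `flush()` are made-up names, it uses a plain stdlib `socket.socket` for simplicity, and flushing still issues one `close(2)` per fd. It only moves those calls to a single point, such as just before the event loop blocks.

```python
import collections
import os
import socket


class DeferredCloser:
    """Hypothetical helper: queue sockets now, close their fds later in one batch."""

    def __init__(self):
        self._pending = collections.deque()

    def defer_close(self, sock: socket.socket) -> None:
        # detach() marks the Python socket object as closed and hands back
        # the raw fd without calling close(2) on it.
        self._pending.append(sock.detach())

    def flush(self) -> int:
        # Close every queued fd. Each one is still its own close(2) syscall,
        # just issued together at one point (e.g. before blocking on I/O).
        closed = 0
        while self._pending:
            os.close(self._pending.popleft())
            closed += 1
        return closed
```

Because the fd stays open until `flush()`, its number cannot be reused by another `open()` in the meantime; the Python object, however, is unusable as soon as `defer_close()` returns.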
Nathaniel J. Smith
@njsmith
I don't think socket close ever blocks, and there's nothing to wait for
after you close the socket the kernel might still keep the TCP state machine running for a while, but the socket object itself is still dead and gone
I guess you can use SO_LINGER to make close block until some further TCP state machine transitions have happened, but there isn't really any good reason to ever do that
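For reference, this is how `SO_LINGER` is set from Python. The option value is a C `struct linger` (an on/off flag plus a timeout in seconds), packed here as two native ints, which assumes the Linux layout; other platforms may differ. Per the Linux socket(7) man page, with linger enabled `close()` does not return until queued data has been sent or the timeout expires. `set_linger` and `get_linger` are illustrative helper names.

```python
import socket
import struct

# struct linger { int l_onoff; int l_linger; } as laid out on Linux.
LINGER_FMT = "ii"


def set_linger(sock: socket.socket, seconds: int) -> None:
    """Enable SO_LINGER: close() may then wait up to `seconds` for queued data."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                    struct.pack(LINGER_FMT, 1, seconds))


def get_linger(sock: socket.socket) -> tuple:
    """Return (l_onoff, l_linger) as reported by the kernel."""
    raw = sock.getsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                          struct.calcsize(LINGER_FMT))
    return struct.unpack(LINGER_FMT, raw)
```

The default, linger off, is what lets `close()` return immediately while the kernel finishes the TCP shutdown in the background.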
graingert
@graingert:matrix.org
[m]
Right but it's not about that, it's about not making a syscall at all
Lura
@lura:veriny.tf
[m]
The answer is that there is no io_uring-compatible SocketType as sockets and io_uring are different APIs
so none of this really matters
would close on the hypothetical io_uring-based stream block and wait? it's your api to design here
Nathaniel J. Smith
@njsmith
naw, io_uring still uses regular socket fds, and you can use it to replace epoll, or even submit send/recv calls directly and let it notify you when they're done
but I think it's OK if closing a socket requires a syscall
graingert
@graingert:matrix.org
[m]
I think there should be two closes so users could skip the syscall if they choose to
Lura
@lura:veriny.tf
[m]
If you're going all-in with io_uring you might well want to use the support for anonymous files where there's no user-side FD ever created
trio-bot
@trio-bot
🤖❓ New python-trio question on stackoverflow: Is there a solution for asynchronous access to sqlite3 with Trio? – stackoverflow.com
richardsheridan
@richardsheridan:matrix.org
[m]
I'm pretty sure nothing exists yet, but there is definitely an opportunity to make something that feels Trio-native using https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.set_progress_handler
I would have done it already if I had an actual use case
Lura
@lura:veriny.tf
[m]
SQLite is thread-safe and releases the GIL, so it's easiest to just run it in a thread pool
richardsheridan
@richardsheridan:matrix.org
[m]
Ya but you can use the progress handler to poll for cancellation
graingert
@graingert:matrix.org
[m]
it's got ktls and io_uring etc
looks like it crashes if you try to close too many fds at once
it fills up the submission queue and can't close things
graingert
@graingert:matrix.org
[m]
I couldn't find any docs on what to do if you try to enqueue something and it doesn't fit - is there an io_uring chat channel?
I assume in a dynamic language you should maintain an unbounded submission queue alongside the bounded io_uring queue, and then the next time a dynamic-language callback is called you make a new ring with double the size?
Nathaniel J. Smith
@njsmith
you submit what you have and then start enqueueing the new stuff
originally this didn't work b/c there was a limit on how many operations you could have in flight at once, but I complained about it to Jens and he added a feature flag to support unbounded operations in flight :-)
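The two strategies discussed here, submitting when the ring is full versus parking overflow in an unbounded user-space queue, can be contrasted in a plain-Python toy model. There is no real io_uring involved: `ToyRing`, `enqueue()` and `submit()` are hypothetical, and `submit()` simply stands in for an `io_uring_enter` call that hands every queued entry to the "kernel". The model does not include graingert's ring-resizing idea.

```python
import collections


class ToyRing:
    """Toy model: bounded submission queue (SQ) plus an unbounded overflow deque."""

    def __init__(self, sq_entries):
        self.sq_entries = sq_entries
        self.sq = []                         # bounded, like the shared SQ ring
        self.overflow = collections.deque()  # unbounded, user-space only
        self.submitted = []                  # everything handed to the "kernel"

    def submit(self):
        """Stand-in for io_uring_enter: drain the SQ, refilling it from overflow."""
        while self.sq or self.overflow:
            self.submitted.extend(self.sq)
            self.sq.clear()
            while self.overflow and len(self.sq) < self.sq_entries:
                self.sq.append(self.overflow.popleft())

    def enqueue(self, op, can_submit=True):
        if len(self.sq) < self.sq_entries and not self.overflow:
            self.sq.append(op)
        elif can_submit:
            # Ring is full: submit what we have, then enqueue the new entry.
            self.submit()
            self.sq.append(op)
        else:
            # No syscall allowed right now: park it until the next submit().
            self.overflow.append(op)
```

Either way FIFO order is preserved; the overflow deque only matters when the caller is somewhere it cannot make the submit syscall.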
graingert
@graingert:matrix.org
[m]
I thought it was bound by the queue depth?
kloop doesn't use submit - it uses SQPOLL
Nathaniel J. Smith
@njsmith
the docs might still only discuss the old semantics on in-flight-operation-limits. last I checked io_uring mostly followed a policy of documentation by "read the source/mailing list"
and using SQPOLL from python seems ... optimistic ... but I guess even then you can still block waiting for the queue to drain. you're only blocking waiting for the kernel to execute some CPU instructions, so it's not really "blocking"
graingert
@graingert:matrix.org
[m]
Ah elements in the submission queue are popped as soon as the kernel thread wakes up - not when they are handled?
What does the the kernel do if it has more completions than fit on the queue?
graingert
@graingert:matrix.org
[m]
https://github.com/torvalds/linux/blob/master/fs/io_uring.c#L2428 it goes on an overflow list and if kmalloc fails it just goes away
richardsheridan
@richardsheridan:matrix.org
[m]
just made PRs for two one-line changes to trio._threads.py, should be easy to review!
Quentin Pradet
@pquentin
✔️
richardsheridan
@richardsheridan:matrix.org
[m]
Thanks! 👍
AmericanY
@AmericanY

Hello,

I'm connected to an S3 bucket and I'm trying to combine 500 CSV files into a single file. Each file is around 1 GB, and I'm trying to speed up the process.

Below is my code. Let me know your suggestions, please:

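import sys

import boto3
import pandas as pd
import trio


AWS_ID = 'Hidden'
AWS_SECRET = 'Hidden'
Bucket_Name = 'Hidden'

# At most 10 files are downloaded/parsed (and held in memory) at once.
limiter = trio.CapacityLimiter(10)


def fetch_csv(client, key):
    # Runs in a worker thread: the S3 GET and the CSV parse both block,
    # so neither should run on the Trio event loop thread.
    body = client.get_object(Bucket=Bucket_Name, Key=key)['Body']
    return pd.read_csv(body)


async def read_object(client, object_csv, sender):
    async with limiter, sender:
        print(f'Reading {object_csv}')
        df = await trio.to_thread.run_sync(fetch_csv, client, object_csv)
        # Hand the frame to the receiver instead of just printing it.
        await sender.send(df)


def list_keys(client):
    # Blocking: pages through list_objects_v2 (up to 1,000 keys per page).
    paginator = client.get_paginator('list_objects_v2')
    return [
        obj['Key']
        for page in paginator.paginate(Bucket=Bucket_Name)
        for obj in page.get('Contents', [])
    ]


async def connect_bucket():
    # boto3 low-level clients are thread-safe (resources are not),
    # so one client can be shared by all the worker threads.
    client = boto3.client(
        service_name='s3',
        aws_access_key_id=AWS_ID,
        aws_secret_access_key=AWS_SECRET,
    )
    allfiles = await trio.to_thread.run_sync(list_keys, client)
    print(f'Collected: {len(allfiles)}')
    return client, allfiles


async def main():
    client, allfiles = await connect_bucket()
    sender, receiver = trio.open_memory_channel(0)
    async with trio.open_nursery() as nurse:
        nurse.start_soon(rec, receiver)
        async with sender:
            for f in allfiles:
                nurse.start_soon(read_object, client, f, sender.clone())


async def rec(receiver):
    async with receiver:
        async for df in receiver:
            # Placeholder: combine/write each DataFrame here.
            print(df.shape)


if __name__ == "__main__":
    try:
        trio.run(main)
    except KeyboardInterrupt:
        sys.exit('Job Cancelled!')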
AmericanY
@AmericanY
:S
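One common suggestion for this kind of job is to skip pandas entirely. Concatenating CSV files only needs the header from the first file and the data rows from every file, so the bytes can be streamed straight to the output without parsing 1 GB frames into memory. The sketch below shows that core step on file-like objects. `append_csv` and `combine_csvs` are hypothetical helpers, and they assume every file has the same single-line header and ends with a newline. The `Body` of a boto3 `get_object` response has a `read(amt)` method, so it could be passed in as `src`, typically from a worker thread via `trio.to_thread.run_sync`.

```python
def append_csv(src, dst, keep_header, chunk_size=1 << 20):
    """Copy CSV bytes from src to dst, optionally dropping the header line.

    src only needs read(n); dst only needs write().
    Assumes the header line ends with a newline and fits in memory.
    """
    if not keep_header:
        # Read until the first newline, then drop everything up to it.
        buf = b""
        while b"\n" not in buf:
            chunk = src.read(chunk_size)
            if not chunk:
                return  # empty file, or a header with no data rows
            buf += chunk
        dst.write(buf.split(b"\n", 1)[1])
    # Stream the rest in fixed-size chunks.
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            break
        dst.write(chunk)


def combine_csvs(sources, dst):
    """Write the header of the first source, then the data rows of all of them."""
    for i, src in enumerate(sources):
        append_csv(src, dst, keep_header=(i == 0))
```

Writes to a single output file have to happen one at a time, so downloads can still run concurrently (for example, each into a temporary local file) while one task appends the results in order.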