ugurcan377
@ugurcan377

Hello, I've been trying to use the receiver of a channel inside select! and I'm getting this error:

no method named `poll` found for struct `std::pin::Pin<&mut futures::channel::mpsc::UnboundedReceiver<()>>` in the current scope
method not found in `std::pin::Pin<&mut futures::channel::mpsc::UnboundedReceiver<()>>`

I am probably doing something wrong here but couldn't find out what.

let mut lock_rx = data.add_handle_to_queue(&req.key, &new_handle);
    let fu = async move {
        loop {
            let lock_expiry = time::delay_for(std::time::Duration::from_millis(
                data.get_lock_remaining_time(&req.key)as u64));
            tokio::select! {
                _ = lock_expiry => {println!("Lock expired")},
                _ = lock_rx => {},
            }

Thank you for any assistance.

Chris Allen
@bitemyapp
Hey y'all, I've been googling around to suss out best practice for my use-case but I've not found anything exactly fitting in the answers and docs so far. I want to make a background worker task that loops indefinitely to run scheduled (and occasionally, perhaps ad-hoc work as well) jobs. Should I use a blocking tokio task or a regular thread spawn? I have some idea of how the tokio runtime works but I'm not sure if there's some strong reason I shouldn't use a blocking task spawn I'm missing.
Chris Allen
@bitemyapp
Actually I'm pretty sure I want the timing facilities in core or tokio-timer for waking up at a scheduled point in time, so I'll go ahead with the blocking spawn.
inzanez
@inzanez
How would one obtain the Context to call poll_read on a BufStream? I'm trying to read from a BufStream<TcpStream> in a non-blocking way,...
Leonhard Weber
@lweberk

Hi everyone,

is there any way to spawn an async task so that it is thread-sticky, or to cheat my way around having a non-Send object in the task?

I'm currently just blocking on the task as a workaround, which is unfortunately far from ideal since the task is heavy on IO. More specifically, it is a long-running sub-process.

matrixbot
@matrixbot
tanriol Leonhard Weber (Gitter): In Tokio 0.2 that could be done by using LocalSet, IIRC. Not sure about 0.3, however.
tanriol inzanez (Gitter): Why are you trying to do that manually?
Leonhard Weber
@lweberk
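Thank you! LocalSet in a blocking task/thread worked beautifully:
// Assumed aliases: TokioRuntimeHandle = tokio::runtime::Handle,
// TokioLocalSet = tokio::task::LocalSet.
TokioRuntimeHandle::current().spawn_blocking(move || {
    let handle = TokioRuntimeHandle::current();
    let local = TokioLocalSet::new();

    // Drive the non-Send future to completion on this blocking thread.
    handle.block_on(
        local.run_until(
            async { my_async_task_fn().await }
        )
    );
});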
inzanez
@inzanez
tanriol: Well,...assuming I didn't want to split the Stream, how would I be reading non-blocking? I know it's a sync concept, but in my case I need to send a request, retrieve the answer (until there's nothing in the buffer anymore) and continue. Currently, I use time::timeout::timeout to set a timeout on read_buf. Otherwise, the last read_buf will block forever.
matrixbot
@matrixbot
tanriol inzanez (Gitter): Which read_buf are you talking about? AsyncReadExt::read_buf?
inzanez
@inzanez
tanriol: indeed, the very same. Using it on BufStream<TcpStream>
matrixbot
@matrixbot
tanriol I'd probably just use timeout as you do. Why not?
inzanez
@inzanez
tanriol: Yeah, it works alright, I just wondered if there was a more,...commonly accepted way of doing it :-)
Andrew Cann
@canndrew
Hi people. I'm upgrading some ancient tokio-0.1 code of mine to tokio-0.3, but I notice PollEvented isn't exposed anymore...
It's still there in the code, it's just private now. Any chance it can be exposed? (maybe behind a mio feature flag or something?)
I'm using it to do non-blocking stdio since tokio's stdio types are actually blocking. In my code this wasn't a problem since I could just use RawFds for stdio along with mio's EventedFd (now SourceFd) then wrap that up in a PollEvented.
matrixbot
@matrixbot
tanriol Andrew Cann (Gitter): This channel is deprecated and really unpopular, you may want to ask in the tokio discord instead.
Andrew Cann
@canndrew
ah, thanks. I'll ask there instead then...
Michael Zedeler
@mzedeler_twitter

I am having a hard time getting a struct passed into an async handler for hyper (which uses tokio). It seems that Arc should do the trick, but honestly, I am so new to rust that I don't quite get why passing a reference in doesn't work.

The basic setup is that I have an async handler that on every call should take a reference to the struct (called model here) and use it to process the calls. The model struct contains a leveldb instance which is said to be thread safe, so it can be passed around read only.

The handler setup looks like this:

pub async fn serve(config: config::Config, model: Model) {
    pretty_env_logger::init();
    info!("initialized");

    let model_arc = Arc::new(model);

    let make_svc = make_service_fn(|_conn| async {
        let model_clone = model_arc.clone();
        // service_fn converts our function into a `Service`
        Ok::<_, Infallible>(service_fn(move |request| {
            let context = Context {
                model: model_clone,
                request
            };
            main_handler(context)
        }))
    });

    let server = Server::bind(&config.server_socket).serve(make_svc);

    // Run this server for... forever!
    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

And I get complaints from the borrow checker:

error[E0507]: cannot move out of `model_clone`, a captured variable in an `FnMut` closure
  --> src/server.rs:38:24
   |
34 |         let model_clone = model_arc.clone();
   |             ----------- captured outer variable
...
38 |                 model: model_clone,
   |                        ^^^^^^^^^^^ move occurs because `model_clone` has type `std::sync::Arc<model::Model>`, which does not implement the `Copy` trait

error[E0597]: `model_arc` does not live long enough
  --> src/server.rs:34:27
   |
33 |       let make_svc = make_service_fn(|_conn| async {
   |  ____________________________________-------_-
   | |                                    |
   | |                                    value captured here
34 | |         let model_clone = model_arc.clone();
   | |                           ^^^^^^^^^ borrowed value does not live long enough
35 | |         // service_fn converts our function into a `Service`
36 | |         Ok::<_, Infallible>(service_fn(move |request| {
...  |
42 | |         }))
43 | |     });
   | |_____- returning this value requires that `model_arc` is borrowed for `'static`
...
51 |   }
   |   - `model_arc` dropped here while still borrowed

error: aborting due to 2 previous errors; 1 warning emitted

I've tried a bunch of things and I'm getting desperate. Any help will be appreciated.

Michael Zedeler
@mzedeler_twitter
I just noticed the reference to Discord and will try there.
matrixbot
@matrixbot
tanriol Michael Zedeler (Gitter): If you clone something for a closure, you need to clone outside the closure, not inside.
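
A minimal, std-only sketch of the "clone outside the closure" advice. `call_for_each_request` is a hypothetical stand-in for an API like `service_fn` that calls the closure once per request, and `Model`, `Context` and `main_handler` are simplified placeholders for the types in the question. In the hyper code the outer clone would presumably go before an `async move` block, which is not shown here.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the `Model` and `Context` types in the question.
struct Model {
    name: String,
}

struct Context {
    model: Arc<Model>,
    request: u32,
}

fn main_handler(context: Context) -> String {
    format!("{} handled request {}", context.model.name, context.request)
}

// Hypothetical stand-in for an API such as `service_fn`: it keeps the closure
// and calls it once per request, so the closure must be callable repeatedly.
fn call_for_each_request<F>(mut handler: F, requests: u32) -> Vec<String>
where
    F: FnMut(u32) -> String,
{
    (0..requests).map(|request| handler(request)).collect()
}

fn serve(model: Model, requests: u32) -> Vec<String> {
    let model_arc = Arc::new(model);

    // Clone outside the closure, then move that clone in.
    let model_for_handler = Arc::clone(&model_arc);

    call_for_each_request(
        move |request| {
            // Clone again per call: moving `model_for_handler` itself into
            // `Context` would consume it on the first call (E0507).
            let context = Context {
                model: Arc::clone(&model_for_handler),
                request,
            };
            main_handler(context)
        },
        requests,
    )
}
```

The closure owns one `Arc` handle for its whole lifetime and hands a fresh clone to each `Context`, so nothing borrowed from the enclosing function has to outlive it.
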
matrixbot
@matrixbot
tanriol octave99 (Gitter): You may want to ask on Discord, this room is almost abandoned and I don't know what to answer :-(
tanriol
@tanriol:matrix.org
[m]
(test)
zz
@zzhengzhuo

I'd like to create and maintain an infinite DelayQueue, and I want to reinsert an item when the task implemented by that item fails.
This is my code:

#[async_trait(?Send)]
pub trait Task{
    async fn run(&self) -> Result<(),ServiceError>;
}

pub struct InfiniteDelayQueue {
    queue:DelayQueue<Box::<dyn Task + 'static>>
}

impl InfiniteDelayQueue  {
    pub fn insert<T: 'static + Task >(&mut self,task:Box<T>,expire:u64) -> Result<(),ServiceError>{
        self.queue.insert(task, Duration::from_secs(expire));
        Ok(())
    }
    fn new() -> Self{
        InfiniteDelayQueue{
            queue:DelayQueue::new()
        }
    }

    pub async fn run(&mut self){
        futures::stream::poll_fn(|cx|{
            match self.queue.poll_expired(cx){
                Poll::Pending => {
                    Poll::Pending
                },
                Poll::Ready(Some(expired)) =>{
                    let expired = expired.unwrap();
                    Poll::Ready(Some(expired.into_inner()))
                },
                Poll::Ready(None) =>{
                    Poll::Pending
                }
            }
        }).for_each(|task| {
            async move {
                match task.run().await{
                    Err(e) => {
                        self.insert(task, 10);
                    },
                    Ok(_) =>{}
                };
            }
        }).await; 

    }
}

But the build fails with this error:

error[E0277]: the size for values of type `dyn delay_queue::Task` cannot be known at compilation time
  --> src\delay_queue.rs:52:37
   |
52 |                         self.insert(task, 10);
   |                                     ^^^^ doesn't have a size known at compile-time
   |
   = help: the trait `std::marker::Sized` is not implemented for `dyn delay_queue::Task`

error: aborting due to previous error; 13 warnings emitted

I have tried for some hours but kept failing with various errors, such as ownership errors. Do you have any advice? Thanks for any help!
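
One likely reading of the E0277 error above: `insert<T: 'static + Task>` carries an implicit `T: Sized` bound, but the expired item is already a `Box<dyn Task>`, so `T` would have to be the unsized `dyn Task`. A minimal std-only sketch of one way around it is to accept `Box<dyn Task>` directly. Here a `Vec` stands in for `DelayQueue`, the `Task` trait is a synchronous simplification, and `RetryQueue`, `AlwaysOk` and `AlwaysFails` are hypothetical names.

```rust
// Hypothetical synchronous stand-in for the async `Task` trait in the question.
trait Task {
    fn run(&self) -> Result<(), String>;
}

// A plain Vec stands in for `DelayQueue`; timing is left out on purpose.
struct RetryQueue {
    queue: Vec<Box<dyn Task>>,
}

impl RetryQueue {
    fn new() -> Self {
        RetryQueue { queue: Vec::new() }
    }

    // Taking `Box<dyn Task>` means no generic `T` (with its implicit `Sized`
    // bound) has to be inferred when an already-boxed trait object comes back.
    fn insert(&mut self, task: Box<dyn Task>) {
        self.queue.push(task);
    }

    // Runs every queued task once, reinserts the ones that failed,
    // and returns how many tasks are queued afterwards.
    fn run_once(&mut self) -> usize {
        // Take the tasks out first so `self` is free to be mutated below.
        let pending = std::mem::take(&mut self.queue);
        for task in pending {
            if task.run().is_err() {
                self.insert(task);
            }
        }
        self.queue.len()
    }
}

// Two trivial example tasks.
struct AlwaysOk;
struct AlwaysFails;

impl Task for AlwaysOk {
    fn run(&self) -> Result<(), String> {
        Ok(())
    }
}

impl Task for AlwaysFails {
    fn run(&self) -> Result<(), String> {
        Err("failed".to_string())
    }
}
```

Callers can still pass `Box::new(AlwaysOk)`, because a `Box` of a concrete type coerces to `Box<dyn Task>` at the call site.
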

Adam Chalmers
@adam_chal_twitter

Hello! I have a question about the philosophy of tokio. I have an immutable database pool struct and I want to share it across many tasks. Which is the more idiomatic tokio way?

A. Put the Pool into an Arc and give each task an Arc<Pool>
B. Make a long-running database pool task and let other tasks send messages to it via channels

I'm agnostic about either approach so I'm just curious which is more conventional Tokio.
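
For comparison, a rough std-only sketch of both options, with threads standing in for tokio tasks and `std::sync::mpsc` standing in for tokio's channels. `Pool`, `shared_arc` and `owner_with_channels` are hypothetical names; this only shows the shape of each approach and does not say which one is more conventional.

```rust
use std::sync::{mpsc, Arc};
use std::thread;

// Hypothetical stand-in for an immutable database pool.
struct Pool {
    name: String,
}

impl Pool {
    fn query(&self, id: u32) -> String {
        format!("{}: row {}", self.name, id)
    }
}

// Option A: every worker holds its own Arc<Pool> handle.
fn shared_arc(workers: u32) -> Vec<String> {
    let pool = Arc::new(Pool { name: "pool".into() });
    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let pool = Arc::clone(&pool);
            thread::spawn(move || pool.query(id))
        })
        .collect();
    // Join in spawn order so the results are ordered by worker id.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

// Option B: one owner thread holds the pool; callers send it a request
// together with a channel on which to receive the reply.
fn owner_with_channels(workers: u32) -> Vec<String> {
    let (req_tx, req_rx) = mpsc::channel::<(u32, mpsc::Sender<String>)>();
    let owner = thread::spawn(move || {
        let pool = Pool { name: "pool".into() };
        // The loop ends once every request sender has been dropped.
        for (id, reply) in req_rx {
            let _ = reply.send(pool.query(id));
        }
    });

    let mut results = Vec::new();
    for id in 0..workers {
        let (reply_tx, reply_rx) = mpsc::channel();
        req_tx.send((id, reply_tx)).unwrap();
        results.push(reply_rx.recv().unwrap());
    }
    drop(req_tx);
    owner.join().unwrap();
    results
}
```

Option A needs no extra task and no message types; option B keeps the pool behind a single owner at the cost of a request/reply round trip per call.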

madmaxio
@madmaxio
IMHO best idea is to pass messages via sync module from tokio.
Florentin DUBOIS
@FlorentinDUBOIS
Hello folks, I am looking for the DelayQueue struct in tokio@1.0.1 (previously https://docs.rs/tokio/0.2.24/tokio/time/delay_queue/struct.DelayQueue.html). It seems to have been replaced by a new implementation or dropped. Could someone point me to the new way to do this?
Simon Paitrault
@Freyskeyd

Hello !

I'm trying to implement an RFC which uses TCP to exchange some packets and then, if a certain packet arrives, accepts TLS.

I can't find a way to do this. I listen for TCP connections and split the stream into read/write halves to interact and read packets, but when I want to upgrade the stream to TLS I no longer have access to the stream. I don't know how to do it :/

madmaxio
@madmaxio
Try asking on Discord: https://discord.gg/tokio
habnabit
@habnabit:matrix.org
[m]
discord? bleh
Vitaly Shukela
@vi

"This trait is sealed and is intended to be opaque. The details of the trait will change. Stabilization is pending enhancements to the Rust language." (from the documentation of tokio::net::ToSocketAddrs)

Shall it link to some Rust issue or RFC where such enhancements are requested?

Vitaly Shukela
@vi
What is the difference between tokio::select! and futures::select!?
Should I in general expect things from the futures crate to work with Tokio (except for obviously incompatible things like AsyncRead)?
madmaxio
@madmaxio
Better to ask on Discord. But I guess it is better to use the tokio one because it is related to tokio's internal logic.
Vitaly Shukela
@vi
It is hard to access discord without disclosing a non-throwaway email address. There is no Github login.
jasl8r
@jasl8r
Is there a proper way to integrate with unit testing (asserts) so that failures are properly caught when testing code that has been tokio::spawn'd? Panics just log that they happened...
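
A std-only analog of the usual approach, using `std::thread` instead of tokio: keep the join handle and inspect its result, since a panic in the spawned work surfaces there as an `Err`. Awaiting a tokio `JoinHandle` similarly yields a `Result` whose error reports a panic, so a test can unwrap or assert on it. `check` and `run_in_background` are hypothetical names used only for this sketch.

```rust
use std::thread;

// Hypothetical unit under test: panics on out-of-range input.
fn check(value: u32) -> u32 {
    assert!(value < 10, "value out of range: {}", value);
    value * 2
}

// Runs `check` on another thread and turns a panic into an `Err`
// carrying the panic message, instead of letting it be merely logged.
fn run_in_background(value: u32) -> Result<u32, String> {
    let handle = thread::spawn(move || check(value));
    handle.join().map_err(|payload| {
        // Panic payloads are usually a `String` or a `&'static str`.
        payload
            .downcast_ref::<String>()
            .cloned()
            .or_else(|| payload.downcast_ref::<&str>().map(|s| s.to_string()))
            .unwrap_or_else(|| "unknown panic".to_string())
    })
}
```

A test can then assert on the returned `Result`, or call `unwrap()` on it so that the spawned panic fails the test itself.
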
Diggory Blake
@Diggsey
is there a way to measure load under tokio? eg. if the executor threads are spending 90% of their time idle, then the load would be 10%
I would then use that to decide whether the application should try to pick up more work, or (if the load is too high) to focus on the existing workload
Abdelmonem Mostafa
@abdelmonem17

Hello everyone, I am using a project called toshi from crates.io that uses tokio. It works well under Windows 10, but when I compile and run it in an 'Ubuntu 20.04.2 LTS' VM, the program always stops responding after a few seconds and gives me a panic like the following:

thread 'tokio-runtime-worker' panicked at 'assertion failed: now >= lock.elapsed', /home/my-user/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.2.0/src/time/driver/mod.rs:260:9

This error also exists in the old version, which uses an old version of tokio; the panic output there is:

thread 'tokio-runtime-worker' panicked at 'elapsed=71666; when=71665', /home/my-user/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.25/src/time/wheel/mod.rs:223:9
note: run with RUST_BACKTRACE=1 environment variable to display a backtrace

Simon Paitrault
@Freyskeyd

Hello,

I'm trying to create a struct that can hold either a tokio::io::WriteHalf<tokio_rustls::server::TlsStream<tokio::net::TcpStream>> or a tokio::io::WriteHalf<tokio::net::TcpStream>, but I can't find any way to do it...

Do you have any advice around this kind of issue?
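
One common shape for this is an enum with one variant per concrete writer that forwards the write trait to whichever variant is active. The sketch below is a std-only analog using `std::io::Write`; with tokio the forwarded trait would presumably be `tokio::io::AsyncWrite`, and a boxed trait object is the usual alternative. `PlainHalf`, `TlsHalf`, `Writer` and `Connection` are hypothetical stand-ins, not tokio types.

```rust
use std::io::{self, Write};

// Hypothetical stand-ins for the plain and TLS write halves;
// both simply buffer the bytes they are given.
struct PlainHalf(Vec<u8>);
struct TlsHalf(Vec<u8>);

impl Write for PlainHalf {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Write for TlsHalf {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// One field type that can hold either writer.
enum Writer {
    Plain(PlainHalf),
    Tls(TlsHalf),
}

// Forward every call to the active variant.
impl Write for Writer {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self {
            Writer::Plain(w) => w.write(buf),
            Writer::Tls(w) => w.write(buf),
        }
    }
    fn flush(&mut self) -> io::Result<()> {
        match self {
            Writer::Plain(w) => w.flush(),
            Writer::Tls(w) => w.flush(),
        }
    }
}

struct Connection {
    writer: Writer,
}

impl Connection {
    fn send(&mut self, msg: &str) -> io::Result<()> {
        self.writer.write_all(msg.as_bytes())?;
        self.writer.flush()
    }

    fn is_tls(&self) -> bool {
        matches!(self.writer, Writer::Tls(_))
    }
}
```

Upgrading then amounts to assigning a new variant to `writer`, while the rest of the struct keeps calling the same methods.
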
madmaxio
@madmaxio
Better to ask this on discord
Kelly Thomas Kline
@kellytk
@habnabit:matrix.org @vi as you can see, the Tokio team requires that chat be through a proprietary and predatory service, Discord. This is obviously harmful to the ecosystem. Would you or any others like to make a more accessible chat for Tokio users? You're welcome to PM me
For example, I'd like to discuss async fn run<F>(self, handler: F) -> Result<(), Error> -> async fn run<F, Fut>(self, handler: F) -> Result<(), Error> from https://tokio.rs/blog/2021-05-14-inventing-the-service-trait# however I currently have no way to
Vitaly Shukela
@vi
@kellytk Maybe Github Discussions would also be fine for that?
Kelly Thomas Kline
@kellytk
It's different from real-time, and it's yet another proprietary service run by M$, no doubt harvesting user data with reckless abandon
Kelly Thomas Kline
@kellytk
What I'd prefer is a Mattermost or Zulip instance. I'd offer to sponsor the VPS fee and perform the minimal sysadmin work, FWIW
Vitaly Shukela
@vi
What is the minimum Windows version that Tokio supports?
madmaxio
@madmaxio
Better ask on discord, but it is standard rust dependencies