gnaggnoyil
@gnaggnoyil
Hello! I saw this Stack Overflow answer (https://stackoverflow.com/a/30294037/2348830) saying "The worker guarantees that each schedulable completes before the next schedulable is started".
However, it seems that the implementation of struct new_thread::new_worker in rxcpp/schedulers/rx-newthread.hpp doesn't ensure that the previous schedulable has finished before the next schedulable is called. Is there anything I misunderstood?
IvaOi
@Carlos01689293_twitter

I'm trying to build something that makes a request to the API every second, gets the response, and appends it to a buffer.

For the requests I use cpr; for everything else I use rxcpp.

I don't know how to write it, but here is my attempt (I hope you can help me)

#include <cpr/cpr.h>

#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"

namespace Rx {
    using namespace rxcpp;
    using namespace rxcpp::sources;
    using namespace rxcpp::operators;
    using namespace rxcpp::util;
}
using namespace Rx;

#include <iostream>

int main() {

    std::vector<std::string> allData;

    auto getData = [](std::string URL) {
        return observable<>::defer([=]() {
            return cpr::Get(cpr::Url{ URL }) |
                map([](cpr::Response r) {
                    return r.text;
                }) |
                repeat() |
                timer<>(std::chrono::seconds(1));
        }); // do I need subscribe here?
    };

    // auto newData = getData("http//example.com//api")
    // allData.push(newData)

    return 0;
}

Any idea how to make this work?

Juan M Gómez
@jmgomez

Hey guys, is this the correct way to use subscriptions?
https://stackoverflow.com/questions/48724201/rxcpp-raii-observable-subscription

I read somewhere else that you don't need to unsubscribe from the observable, but I don't think that is the case. Which makes me wonder why subscription_base has no destructor that calls unsubscribe.

Kirk Shoop
@kirkshoop
I answered on SO
Kirk Shoop
@kirkshoop
@gnaggnoyil 😂 yeah that does seem off. I should have said that an instance of a worker provides the guarantee. When you make a worker, all copies of that worker satisfy the guarantee. Making multiple workers will not necessarily create a sequential order across them; that depends on the scheduler. Each individual worker maintains the guarantee regardless of the scheduler.
Juan M Gómez
@jmgomez
Great thank you!
Kirk Shoop
@kirkshoop
@Carlos01689293_twitter yes, you must subscribe to the observable returned from the lambda. Since you would like to store the results as they arrive, you will need to use map() on the result of the lambda and do the push_back in map(). The use of timer in the lambda will not cause the request to repeat once per second. One way to do that is to use interval() to get one item per second and then flat_map() that. Inside the flat_map return cpr::Get() | map(). This does not use repeat() or timer() and will make one request per second.
TC Wang
@tcw165
@kirkshoop Do you have a draft roadmap for evolving RxCpp with more features? I think people here, including me, would be happy to help :)
Kirk Shoop
@kirkshoop
I don’t have a roadmap. I have learned a lot working on the standardization of the rx pattern, that could be applied to improve compile times and with some more invasive work rxcpp could reduce some complexity. I have not had time to work on rxcpp in a while. If you have things that you would like to do I would be happy to coordinate with you. It would be great to have someone actively working on rxcpp
TC Wang
@tcw165
image.png
Got a newbie question: what is this pattern where a function's return type is auto and -> whatever at the same time? What is the return type in the end? What’s the benefit of using this pattern?
Kirk Shoop
@kirkshoop
This pattern allows the return type to be expressed using argument names in a decltype() expression.
Stylistically, this pattern puts the name of the function front and center and pushes the, sometimes complex and long, result type after the args.
This pattern will fully specify the function return type as the type that follows ->. auto just becomes a placeholder that makes language rules work, it is purely syntactic. auto does not do any work to deduce the return type in this pattern.
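A small sketch of the points above, using only the standard library. The names sum_of and sum_of_leading are made up for illustration and do not come from rxcpp. The first function uses the trailing form, where the return type can mention the parameter names. The second spells the same type in front of the function name, which needs std::declval because the parameters are not in scope yet.

```cpp
#include <type_traits>
#include <utility>

// Trailing return type: `auto` is only a placeholder. The declared return
// type is whatever follows `->`, and it can name the parameters lhs and rhs.
template <class L, class R>
constexpr auto sum_of(L lhs, R rhs) -> decltype(lhs + rhs) {
    return lhs + rhs;
}

// Leading form of the same declaration: lhs and rhs are not yet in scope
// where the return type is written, so std::declval has to stand in for them.
template <class L, class R>
constexpr decltype(std::declval<L>() + std::declval<R>()) sum_of_leading(L lhs, R rhs) {
    return lhs + rhs;
}

// Both spellings declare exactly the same return type.
static_assert(std::is_same<decltype(sum_of(1, 2.5)), double>::value,
              "int + double yields double");
static_assert(std::is_same<decltype(sum_of_leading(1, 2.5)), double>::value,
              "same type, written before the function name");
```

In both cases the return type is fixed by the declaration itself; nothing is deduced from the return statement.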
Aleksey Loginov
@victimsnino

@kirkshoop ,

I don’t have a roadmap. I have learned a lot working on the standardization of the rx pattern, that could be applied to improve compile times and with some more invasive work rxcpp could reduce some complexity. I have not had time to work on rxcpp in a while. If you have things that you would like to do I would be happy to coordinate with you. It would be great to have someone actively working on rxcpp

Honestly, I believe you need someone to continue the work on rxcpp: at least to merge pull requests, check the issue list, prepare new releases for package managers, etc. As an active user of rxcpp in my pet projects and my main work, I am really worried about the "staleness" of the awesome rxcpp repository =C

Kirk Shoop
@kirkshoop
Thank you for the feedback. I am putting some time into rxcpp again. I hope we can keep rxcpp vital and active.
Aleksey Loginov
@victimsnino
@kirkshoop, I'm going to prepare a fix to reduce the number of copies/moves inside rxcpp while values are transferred through the stream. What do I need to pay attention to? I understand that an observable must be ready to be subscribed to again at any time, so we must keep some state inside, but is there anything else?
And can I rely on your approval for these changes? I'm going to add "const ref" for "read-only" operators (for example, filter) or "universal ref" + forward where possible.
Kirk Shoop
@kirkshoop
That sounds like a good task. The key thing is to be sure that scopes don’t exit while a ref is in use and it should be fine.
Aleksey Loginov
@victimsnino
@kirkshoop, I've prepared a fix that reduces the number of copies. The copy count is reduced significantly. Please check it.
ReactiveX/RxCpp#562
Kirk Shoop
@kirkshoop
Thank you so much Aleksey!
@/all FYI the master branch has been renamed to main.
EF
@pminiszewski
Hello! I'm currently learning rxcpp and ran into an issue with this use case: how can I conditionally synchronize an operator with a subject? The operator should wait until something gets sent through the subject. See this gist for more context: https://gist.github.com/pminiszewski/cd6d388b356ade1ba097bfc14f71984a . Basically I want my GUI app to display a login popup whenever the user does something but hasn't logged in yet. After logging in, the app should complete the action requested in the first place.
Aleksey Loginov
@victimsnino
@pminiszewski, you could try the with_latest_from operator for this task. For example, the main flow of actions/requests can get a value from the authentication flow via "with_latest_from". If there are no values yet, it will not block (in the mutex sense), but it will not pass values on until the authentication flow has produced one. For example:
struct request {}; // placeholder request type
rxcpp::subjects::subject<request> main_flow;
rxcpp::subjects::subject<bool> is_logged_in;
main_flow.get_observable()
    .with_latest_from(is_logged_in.get_observable())
    .subscribe([](std::tuple<request, bool> pack) {
        // final subscriber
    });

main_flow.get_subscriber().on_next(request{}); // nothing reaches the final subscriber yet
// ...
is_logged_in.get_subscriber().on_next(true); // from now on, each new request reaches the final subscriber
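To make the behaviour concrete, here is a stand-alone model of the semantics described above, written with the standard library only. It is not rxcpp code: latest_from_gate and demo_with_latest_from are hypothetical names, and the model assumes the usual Rx rule that a main-stream value is dropped while the other stream has not produced anything yet.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical model of with_latest_from; it only mimics the pairing rule.
template <class Main, class Other>
class latest_from_gate {
public:
    using sink = std::function<void(const Main&, const Other&)>;
    explicit latest_from_gate(sink s) : sink_(std::move(s)) {}

    // Other stream: only remembers the newest value, never emits by itself.
    void on_other(Other v) {
        latest_ = std::move(v);
        has_latest_ = true;
    }

    // Main stream: emits paired with the cached value, or is dropped if
    // the other stream has not produced a value yet.
    void on_main(const Main& m) {
        if (has_latest_) sink_(m, latest_);
    }

private:
    sink sink_;
    Other latest_{};
    bool has_latest_ = false;
};

// Replays the scenario from the message: request, login, two more requests.
std::vector<std::pair<int, bool>> demo_with_latest_from() {
    std::vector<std::pair<int, bool>> out;
    latest_from_gate<int, bool> gate(
        [&out](const int& req, const bool& logged_in) { out.emplace_back(req, logged_in); });
    gate.on_main(1);      // dropped: no login value yet
    assert(out.empty());  // nothing is delivered before the first login value
    gate.on_other(true);  // cached, emits nothing by itself
    gate.on_main(2);      // delivered as (2, true)
    gate.on_main(3);      // delivered as (3, true); the cached value is reused
    return out;
}
```

Note that in this model the first request is lost rather than replayed after login, so a real login flow would still need to hold or retry the pending action.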
EF
@pminiszewski
Thank you @victimsnino, I think I understand the idea but how could I make this step conditional? I mean, I want to trigger login flow only once so user doesn't have to input login/password every time
Aleksey Loginov
@victimsnino

@pminiszewski with_latest_from uses the last available value by default. So once you have logged in, the value is stored inside. The conditional part should not be this step but the step that requests the login/password. I'm not sure about your original idea, but I would do it something like this:

rxcpp::observable<request> requests; // some stream of requests

auto multicasted = requests.publish();
auto credentials = multicasted
   .take(1)
   .flat_map([](const auto&){
         return get_observable_which_requests_and_returns_credentials();
   });

multicasted
   .ref_count()
   .with_latest_from(credentials)
   .subscribe([](const auto& request_with_credentials){}); // a std::tuple of the request and the credentials value

What do we do here? We split the original flow in two. The first one waits for the first request and starts the process of requesting credentials. Then we pass the result to the same flow via with_latest_from. So credentials are requested only once, on the first request, and the value is cached inside.

EF
@pminiszewski
Ok, looks like I have to rethink my approach to authentication. Definitely going to use with_latest_from. Thank you for your help @victimsnino !
Ryan Olson
@ryanolson
I was playing with rxcpp and tried to make an observable<std::unique_ptr<int>> and was able to subscribe to it using an empty subscriber, but all attempts to create an observer or subscriber with an on_next have failed due to move semantics not propagating to the observer/subscriber.
    auto observable = rxcpp::observable<>::create<std::unique_ptr<int>>([](rxcpp::subscriber<std::unique_ptr<int>> s) {
        s.on_next(std::make_unique<int>(1));
        s.on_next(std::make_unique<int>(2));
        s.on_completed();
    }).tap([](const type_t& int_ptr) { LOG(INFO) << *int_ptr; });

    observable.subscribe(); // works
    // observable.subscribe([](std::unique_ptr<int>) { });  // fails
    // observable.subscribe([](std::unique_ptr<int>&&) { });  // fails
Aleksey Loginov
@victimsnino
You can create the observer with a [](const std::unique_ptr<int>&){} lambda. It doesn't let you "steal" ownership of the object, but to me that is a "feature, not a bug". Imagine you broadcast a unique_ptr via publish() to several subscribers: if the first one took the value by move, the remaining observers would receive an empty pointer.
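The broadcast argument can be checked without rxcpp. The sketch below is a hypothetical stand-in for a multicast: it simply calls a list of observers in turn with the same unique_ptr, once by const reference and once by rvalue reference. The function names are made up for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a multicast; this is not rxcpp code.
// Every observer borrows the value. Returns how many saw a non-null pointer.
int deliver_by_const_ref(std::size_t observer_count) {
    std::unique_ptr<int> value(new int(42));
    int seen = 0;
    std::vector<std::function<void(const std::unique_ptr<int>&)>> observers(
        observer_count,
        [&seen](const std::unique_ptr<int>& p) { if (p) ++seen; });
    for (auto& observer : observers) observer(value);
    assert(value != nullptr);  // borrowing never empties the source
    return seen;
}

// The first observer takes ownership; later observers are handed the same,
// now empty, pointer. Returns how many saw a non-null pointer.
int deliver_by_move(std::size_t observer_count) {
    std::unique_ptr<int> value(new int(42));
    int seen = 0;
    std::vector<std::function<void(std::unique_ptr<int>&&)>> observers(
        observer_count,
        [&seen](std::unique_ptr<int>&& p) {
            std::unique_ptr<int> owned = std::move(p);  // steals the int
            if (owned) ++seen;
        });
    for (auto& observer : observers) observer(std::move(value));
    return seen;
}
```

With three observers, the const-reference version lets all three see the value, while the move version lets only the first one see it.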
Kirk Shoop
@kirkshoop
Yes, rxcpp is dependent on copyable types like shared_ptr
FYI - my parents need care and I am working through that transition.
Erric765
@Erric765
Hi, I am a bit new to RxCpp, and I am trying to figure out whether RxCpp can be used to queue events to be consumed within an event loop.
E.g.: various background threads enqueue events every so often to an Observable.
The main thread runs an event loop (like a UI rendering loop) and consumes these events, if available, within the loop.
The key here is that the shared observable needs to support publishing and consuming events from multiple threads.
Michal Fapso
@michalfapso
@kirkshoop, I'm praying for you and your parents. If there is anything more we could help you with remotely, please let us know.
wuyuanyi135
@wuyuanyi135
Hello! Thanks for creating this great project. I have been using the reactive pattern in various projects, and now I have decided to adopt it on my embedded MCU platform as well (ESP32 using the esp-idf framework). I plan to use this library to implement asynchronous functions for some deferred I/O operations, e.g., emitting the temperature measurement a while after the convert command is issued. I wonder whether anyone has researched the runtime overhead of using rxcpp in an embedded system. Is there any guideline that we can follow to avoid performance issues?
wuyuanyi135
@wuyuanyi135
Hello, what is the default scheduler (coordinator) supplied to the operators if unspecified? If an upstream operator has been assigned with a scheduler (coordinator), do the following operators without scheduler explicitly specified also use the supplied one or fall back to the default?
Kirk Shoop
@kirkshoop
I have not used rxcpp on embedded devices. It is dependent on shared_ptr allocations and this would be the first hurdle on an embedded device.

In almost all cases the default coordinator is current_thread. This is a trampoline scheduler that protects the stack of the current thread by queueing work until the current task unwinds the stack.

The operators that follow use the default.
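The trampoline idea mentioned above can be sketched in a few lines of standard C++. This is a hypothetical minimal model, not rxcpp's current_thread implementation: work scheduled while another task is running is queued and only runs after that task has returned, so nested scheduling does not grow the stack. It is single-threaded and ignores exceptions.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical minimal trampoline; single-threaded, no exception handling.
class trampoline {
public:
    // Runs `task` now if nothing is active; otherwise queues it until the
    // active task has returned.
    void schedule(std::function<void()> task) {
        queue_.push_back(std::move(task));
        if (running_) return;  // nested call: defer instead of recursing
        running_ = true;
        while (!queue_.empty()) {
            std::function<void()> next = std::move(queue_.front());
            queue_.pop_front();
            next();  // may call schedule() again
        }
        running_ = false;
    }

    bool idle() const { return !running_ && queue_.empty(); }

private:
    std::deque<std::function<void()>> queue_;
    bool running_ = false;
};

// Records the order in which nested work runs.
std::string trampoline_order() {
    trampoline t;
    std::string log;
    t.schedule([&] {
        log += "A-begin ";
        t.schedule([&] { log += "B "; });  // queued, not run inline
        log += "A-end ";
    });
    assert(t.idle());  // the outer schedule() returns only after the queue drained
    return log;
}
```

Task B is scheduled in the middle of task A but runs only after A has finished, which is the stack-protecting behaviour the message describes.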

wuyuanyi135
@wuyuanyi135
Thank you! I have tried to use rxcpp on esp32 with run loop as an alternative to compose async io operations. It has been smooth.
wuyuanyi135
@wuyuanyi135
I have a general rx question. I want to iterate through a vector in serial. i.e., the next element won't be emitted until the previous one has completed. I achieved the goal using the following code but it looks awkward somehow. Can someone suggest a more elegant way to compose this? Thanks
  std::vector<int> data{1, 2, 3, 4, 5};
  sources::iterate(data)
      .concat_map([](int v) {
        return sources::just(v)
            .tap([](int v) {
              std::cout << "Before delay: " << v << "\n";
            })
            .delay(std::chrono::seconds(1));
      })
      .subscribe([](int v) {
        std::cout << "Next " << v << "\n";
      });
wuyuanyi135
@wuyuanyi135
If I use identity_same_worker for all rx operations, does it mean there is no risk of a race condition and no need to use the serialize/observe_on coordinators? Sorry, after reading the development manual, I am still a little confused about the use cases of the various coordinators. Can you please give an example where identity fails and I have to use the serialize coordinator?
TC Wang
@tcw165
Does anyone know how the chart is generated?
image.png
Aleksey Loginov
@victimsnino
You need to set the HAVE_DOT = YES option in the Doxygen configuration and have graphviz installed. Without this option the graphs are generated by the default tools and look much simpler. For example, without the option:
image.png
And with option + graphviz:
image.png
TC Wang
@tcw165
👍
TC Wang
@tcw165

https://www.youtube.com/watch?v=FcQURwM806o

Referring to this presentation, where is the code that manages instance lifetime differently from the compiler?

image.png
I’m referring to “lifetime” at 32:32.
TC Wang
@tcw165

Hmm, is this subscribe a blocking wait?

    // print result
    lines |
        subscribe<string>(println(cout));

In RxJava/Kotlin, there's a CompositeSubscription as a group container to manage multiple subscriptions. Is there an equivalent thing in RxCpp?

TC Wang
@tcw165