Min RK
@minrk
ah, ok. You're using ZMQ_STREAM?
Johan Mabille
@JohanMabille
yes, for communicating with debugpy
sorry I realize my first message was unclear
Min RK
@minrk
ok, so this is ~expected behavior for sending a message to an unavailable routing id
Johan Mabille
@JohanMabille
yes, and thanks for suggesting to check these flags, I would not have found the bug otherwise
I just reset the routing id after connecting again, works like a charm!
thanks again!
Min RK
@minrk
:tada:
sivasai.sriramdas
@itsmesrds_twitter
Hello everyone, I'm exploring ZeroMQ for one of my use cases. The Titanic broker request-response pattern looks like it suits my requirements, but I see a bottleneck at the broker: how will it handle multiple parallel requests? Does anyone have any idea on this? It's a common issue with request-reply broker patterns.
Please help me understand.
sivasai.sriramdas
@itsmesrds_twitter
Does the async REQ/REP pattern with DEALER/ROUTER preserve data ordering?
Allie
@cite-reader
The example broker for Titanic in the guide keeps a one-to-one relationship between files on disk and saved messages, and fully processes one incoming message before moving on to the next; that's simple to implement, but it's a relatively low-performance implementation of the concept. That might be OK for your message volumes (you'd have to measure), but I wouldn't want to trust this exact code in production for other reasons: the IO does no error checking and neglects to call fsync or similar, so an accepted message can easily be lost, or in the event of a crash the files it leaves behind may be in some indeterminate, garbage state that may cause it to send arbitrary nonsense on to peers.

Even worse, the example uses the zmsg_save function from czmq to write the message to the file, which is explicitly not stable across library versions, preventing easy upgrades.

Definitely do the work to define a stable message serialization format, and pull in an actual database to do the actual saving. (A simple on-disk key-value store is likely fine; you just want it to treat your valuable data with the care it deserves.) At that point there's a variety of ways to trade off throughput for latency, but you do have to measure your implementation on your hardware.
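For the "stable serialization format" point, one simple option in Python (a sketch only; the versioned-JSON scheme and field names are my own, not anything from the guide) is to encode each multipart message explicitly before handing it to whatever store you choose:

```python
import json

def encode_frames(frames):
    """Turn a multipart ZMQ message (a list of bytes frames) into a versioned
    JSON document that stays readable across library upgrades."""
    return json.dumps({"version": 1, "frames": [f.hex() for f in frames]})

def decode_frames(blob):
    """Recover the original list of bytes frames from the stored document."""
    doc = json.loads(blob)
    if doc["version"] != 1:
        raise ValueError(f"unsupported format version: {doc['version']}")
    return [bytes.fromhex(f) for f in doc["frames"]]
```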
Allie
@cite-reader
Your other question is easier to answer: the messages will definitely not be reordered, and going asynchronous doesn't change that. Provided, of course, that you're not trying to access a single socket from multiple coroutines concurrently; that isn't supported, so don't do that.
Somesh Koli
@someshkoli1_gitlab
Hey there, I'm using zmq with tornado ioloops for communication between two hosts/processes in a DAG. The sender and receiver each use a pair of sockets: the sender has a publisher/pusher plus a REP, and the receiver has a subscriber/puller plus a REQ. All these sockets are used with ZMQStream and handled via callbacks. Now the problem is, I need to somehow collect data from N parents (one packet from each) and then pass it on to the next node.
It looks something like this:
image.png
What is the best way to achieve this?
Min RK
@minrk
The pipeline pattern in the guide may be a good fit. I'd recommend against using PAIR, REQ, or REP in any production setup. Instead, I'd always replace REQ and PAIR with DEALER and REP with ROUTER.
In your diagram, if the 'collector' needs to initiate collection, ROUTER-DEALER makes sense. If messages just flow from parents to collectors, then that's PUSH-PULL.
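For the plain parents-to-collector flow, a minimal PUSH-PULL sketch might look like the following (the endpoint, the number of parents, and the JSON payload shape are illustrative assumptions, not anything from the guide):

```python
import zmq

ENDPOINT = "tcp://127.0.0.1:5557"    # hypothetical endpoint
N_PARENTS = 3                        # hypothetical fan-in

def collector():
    """Bind a PULL socket and wait until one packet from each parent has arrived."""
    ctx = zmq.Context.instance()
    pull = ctx.socket(zmq.PULL)
    pull.bind(ENDPOINT)
    seen = {}
    while len(seen) < N_PARENTS:
        msg = pull.recv_json()             # PULL fair-queues whatever arrives
        seen[msg["parent"]] = msg["data"]  # keep the latest packet per parent
    pull.close()
    return seen                            # hand the collected batch to the next stage

def parent(parent_id):
    """Each parent connects a PUSH socket and sends its packet to the collector."""
    ctx = zmq.Context.instance()
    push = ctx.socket(zmq.PUSH)
    push.connect(ENDPOINT)
    push.send_json({"parent": parent_id, "data": [1.0, 2.0, 3.0]})
    push.close()
```

Because PUSH-PULL fair-queues across senders rather than tracking who sent what, the parent id travels inside the message so the collector knows when it has one packet from every parent.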
Somesh Koli
@someshkoli1_gitlab
It performs some task after collecting one data packet from each parent and then sends the result forward; it's not just a tunnel for incoming data.
sivasai.sriramdas
@itsmesrds_twitter
image.png
@cite-reader yeah, true. I'm replacing the file storage with Postgres to store the data.
The above is what I'm trying to achieve with the Titanic broker pattern. The client will not wait for the entire message to get processed; it just sends the message to the broker, which returns an ack as the reply, stores all the data in an in-memory buffer, and pushes it to the Titanic worker, which persists the data to Postgres.
The Titanic client then sends the message on to the actual worker with the reference id that was written to Postgres.
The worker will read the request id from Postgres and write to a Kafka topic.
sivasai.sriramdas
@itsmesrds_twitter
The Kafka topic here runs on a different machine. We are trying to collect high-frequency data from multiple clients as quickly as possible and eventually process it on a bigger machine after collecting.
Any other suggestions or patterns for this would be very helpful. Also, the data frequency is one sample every 10 milliseconds.
sivasai.sriramdas
@itsmesrds_twitter
Is there any built-in core ZeroMQ pattern that provides acknowledgement by default, or with some configuration? Thanks
Allie
@cite-reader
No, ZMQ treats acknowledgements as an application-level concern and requires you to build them into your own protocol.
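As a sketch of what that application-level ack can look like (the DEALER/ROUTER framing, endpoint, and b"ACK" convention here are assumptions for illustration, not part of ZeroMQ itself):

```python
import uuid
import zmq

ENDPOINT = "tcp://127.0.0.1:5560"    # hypothetical endpoint

def client():
    """Send a payload tagged with a request id and wait for the broker's ack."""
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.DEALER)
    sock.connect(ENDPOINT)
    req_id = uuid.uuid4().hex.encode()
    sock.send_multipart([req_id, b"sensor-sample"])   # frame 0: id, frame 1: payload
    ack_id, status = sock.recv_multipart()            # broker echoes the id with "ACK"
    return ack_id == req_id and status == b"ACK"

def broker():
    """Acknowledge each message as soon as it has been accepted for processing."""
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.ROUTER)
    sock.bind(ENDPOINT)
    while True:
        identity, req_id, payload = sock.recv_multipart()
        # ... enqueue `payload` for the Titanic-style worker here ...
        sock.send_multipart([identity, req_id, b"ACK"])
```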
Allie
@cite-reader
Is that 10ms period global, or is that per client?
sivasai.sriramdas
@itsmesrds_twitter
@cite-reader it's at the per-client level.
Allie
@cite-reader

Right, okay. That's an eyebrow-raising sample rate.

I'm going to suggest inverting your current flow between Postgres and Kafka. When using both, it's really unusual to hit Postgres first: its APIs are poorly suited to streaming workloads, and Kafka is highly tuned for persisting a bunch of messages quickly in ways Postgres's design priorities make difficult. You can then use a Kafka consumer application to put the events into a more structured database in larger chunks; hopefully large enough to make up for the work that needs to be done to write indices and whatever else.

Also, I don't know how many clients you have or what these events look like, but under modest assumptions at 100 Hz even a single client will need large amounts of storage relatively quickly. Depending on the shape of your events and how long you want to keep them, it may be beneficial to evaluate column stores sooner rather than later for better compression.

sivasai.sriramdas
@itsmesrds_twitter
True, Kafka would be the right fit for this scenario, but because of resource constraints we planned to use ZeroMQ to put the events in Postgres. We'll be using at most 2 vCPUs for the broker and Titanic, minimal RAM to buffer 1 to 10 minutes of data in memory, and maybe Postgres storage for one day.
As soon as we move the events from Postgres to Kafka, we delete them from Postgres; all of this is a temporary store.
Thanks for your suggestion. @cite-reader
Tushar Kolhe
@hyperparameters
Is there any way to know, on a PUSH/PULL socket (the one that I have bound), when a new socket connects to it?
Allie
@cite-reader
The socket monitor API can give you something like that, with some caveats.
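With pyzmq that looks roughly like the sketch below (the endpoint and the event selection are assumptions); the main caveat is that monitor events describe transport-level connections, not which application peer is on the other end:

```python
import zmq
from zmq.utils.monitor import recv_monitor_message

ctx = zmq.Context.instance()
pull = ctx.socket(zmq.PULL)

# Ask libzmq to report connection events for this socket on an internal monitor socket.
monitor = pull.get_monitor_socket(zmq.EVENT_ACCEPTED | zmq.EVENT_DISCONNECTED)
pull.bind("tcp://127.0.0.1:5558")    # hypothetical endpoint

while True:
    evt = recv_monitor_message(monitor)   # blocks until the next event
    if evt["event"] == zmq.EVENT_ACCEPTED:
        print("a peer connected on", evt["endpoint"])
    elif evt["event"] == zmq.EVENT_DISCONNECTED:
        print("a peer went away on", evt["endpoint"])
```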
Somesh Koli
@someshkoli1_gitlab
Does the connect method connect without verifying whether the host is valid or not?
Allie
@cite-reader
Basically yes. If ZMQ can't establish the underlying transport connection to a peer, it continuously retries without reporting an error because maybe it will show up later.
Somesh Koli
@someshkoli1_gitlab
And is there any way we can set a timeout on that, using sockopts? In our use case we need to know whether the host is up or not after a certain period of time, so that we can perform further actions.
Allie
@cite-reader
Not exactly. If you're using the request-response pattern you might want to time out on seeing a message from the peer rather than specifically trying to time out the connection phase.
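One way to do that is with a receive timeout rather than a connection timeout (a sketch; the endpoint, the 2-second limit, and the ping payload are assumptions for illustration):

```python
import zmq

ctx = zmq.Context.instance()
sock = ctx.socket(zmq.DEALER)
sock.setsockopt(zmq.RCVTIMEO, 2000)   # recv raises zmq.Again after 2 s with no message
sock.setsockopt(zmq.LINGER, 0)        # don't block on close if the peer never appeared
sock.connect("tcp://10.0.0.5:5559")   # hypothetical peer address

sock.send(b"ping")                    # queued locally even if the peer is down
try:
    reply = sock.recv()
    print("peer is up:", reply)
except zmq.Again:
    print("no reply within 2 s; treat the host as down")
finally:
    sock.close()
```

The peer has to actually send something back for this to mean anything; a timeout only tells you "no response within the window", not why.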
Ashok Kumar Karasala
@ashokchowdary863
Hi, I'm facing a crash with the XPUB/XSUB proxy when I add more subscriptions.
Is that expected behavior?
Do I have to unsubscribe to release memory?
Ashok Kumar Karasala
@ashokchowdary863
socket.close() doesn't seem to release memory.
Eoghan O'Connell
@PinkShnack

Hi all! I have what is (perhaps) a simple question. I am a complete newbie when it comes to sockets etc., so bear with me.
TLDR: How fast can a message be sent and received by the REP/REQ pattern?

Context:
I have a REQ socket that is asking for some data from a REP socket and vice versa. It does this in several send/recv steps. The client is currently simulating a real-time application. The most common messages being sent (after some initialisation) are some arrays of floating point numbers (of about length 50).
By using line-profiler on a test function, I can see that the socket.recv() method in the REP server is taking about 90-95% of the time of the entire process. This equates to ~ 4 ms. I have a "feeling" that this could be faster, seeing as it is a local operation. I have tried to speed this up with different approaches, but haven't come across anything that might help. These approaches:

  • Poller doesn't seem to have any effect (in fact, it seems to slow things down a little bit)
  • NOBLOCK doesn't have any effect.
  • NO_DELAY is set by zmq by default, is this true?

I am hoping for a transfer speed per message of about 0.5-1 ms. Is this possible with REP/REQ, or with another pattern? Or does it have something to do with my PC as well?

Also:
The REQ client function runs in a different thread. I read somewhere that this isn't good, so please let me know.

MWE:
As I am a newbie in this world of sockets, I can't think of a way to implement an MWE that is much simpler than the base code I have. Luckily, one can clone our repo and just run line-profiler (or any profiler) on the test function "tests/test_choose_minimum_function.py". This will send the above array of about 50 floats in each message.
For line-profiler, I tag the tests.test_choose_minimum_function.test_run_plugin_with_user_defined_features function and the shapelink.shapelink_plugin.ShapeLinkPlugin.handle_messages method with the @profile tag.
The repo: https://github.com/ZELLMECHANIK-DRESDEN/shapelink
I can also just copy the results of the line-profiler here.

Thanks for your time :)
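As a rough point of comparison, a bare REQ/REP round trip on a local machine is usually well under a millisecond, so several milliseconds often means the time is going to work around the recv rather than to ZeroMQ itself. A minimal timing sketch (the endpoint, message count, and payload size are assumptions; the payload approximates a 50-float array):

```python
import threading
import time
import zmq

ENDPOINT = "tcp://127.0.0.1:5599"   # hypothetical local endpoint
N = 1000
PAYLOAD = b"x" * (50 * 8)           # roughly the size of 50 doubles

def echo_server():
    ctx = zmq.Context.instance()
    rep = ctx.socket(zmq.REP)
    rep.bind(ENDPOINT)
    for _ in range(N):
        rep.send(rep.recv())        # echo each request straight back
    rep.close()

threading.Thread(target=echo_server, daemon=True).start()

ctx = zmq.Context.instance()
req = ctx.socket(zmq.REQ)
req.connect(ENDPOINT)

start = time.perf_counter()
for _ in range(N):
    req.send(PAYLOAD)
    req.recv()
elapsed = time.perf_counter() - start
print(f"{1000 * elapsed / N:.3f} ms per round trip")
req.close()
```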

François Tessier
@hephtaicie
Hi All! My pattern is pretty simple: CLIENT [DEALER] <----> [ROUTER] ORCHESTRATOR [ROUTER] <----> [DEALER] SERVER. All of this is implemented with {recv,send}_multipart.
I now need to send a simple message from the orchestrator to the server. As the server waits on recv_multipart, I feel like I cannot send a simple message without an identity. Is that correct? Am I supposed to create a fake identity?
Min RK
@minrk
you can use router identities to send any number of messages. If your servers are already making themselves known with an initial advertisement request ("hi orchestrator, I'm here"), you can record and remember those router identities.
However you are tracking the identities now, you can continue to do the same, and send multiple 'replies' to a single request's routing id.
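Concretely, a sketch of that bookkeeping on the orchestrator's ROUTER (the b"HELLO" advertisement framing, endpoint, and server naming are assumptions for illustration, not a fixed protocol):

```python
import zmq

ctx = zmq.Context.instance()
router = ctx.socket(zmq.ROUTER)
router.bind("tcp://*:6000")               # hypothetical endpoint

servers = {}                               # server name -> routing id

def handle_one_message():
    identity, *body = router.recv_multipart()
    if body and body[0] == b"HELLO":       # initial advertisement: [b"HELLO", name]
        servers[body[1]] = identity        # remember this server's routing id
    else:
        pass                               # normal request/reply handling goes here

def notify(server_name, payload):
    """Orchestrator-initiated send: reuse the routing id recorded at HELLO time."""
    router.send_multipart([servers[server_name], payload])
```

On the server's DEALER side, recv_multipart just yields the payload frames (the ROUTER strips the routing-id frame when sending), so no fake identity is needed there.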
François Tessier
@hephtaicie
Thanks! I'm still struggling a bit with identities. It's working now.