Ashok Kumar Karasala
@ashokchowdary863
Hi, I'm facing a crash with the XPUB/XSUB proxy when I add more subscriptions.
Is that expected behavior?
Do I have to unsubscribe to release memory?
Ashok Kumar Karasala
@ashokchowdary863
socket.close() doesn't seem to release memory.
Eoghan O'Connell
@PinkShnack

Hi all! I have what is (perhaps) a simple question. I am a complete newbie when it comes to sockets etc., so bear with me.
TLDR: How fast can a message be sent and received by the REP/REQ pattern?

Context:
I have a REQ socket that is asking for some data from a REP socket and vice versa. It does this in several send/recv steps. The client is currently simulating a real-time application. The most common messages being sent (after some initialisation) are some arrays of floating point numbers (of about length 50).
By using line-profiler on a test function, I can see that the socket.recv() method in the REP server is taking about 90-95% of the time of the entire process. This equates to ~ 4 ms. I have a "feeling" that this could be faster, seeing as it is a local operation. I have tried to speed this up with different approaches, but none of them helped:

  • Poller doesn't seem to have any effect (in fact, it seems to slow things down a little bit)
  • NOBLOCK doesn't have any effect.
  • NO_DELAY is set by zmq by default, is this true?

I am hoping for a transfer speed per message of about 0.5-1 ms. Is this possible with REP/REQ, or with another pattern? Or does it have something to do with my PC as well?

Also:
The REQ client function is running in a different thread. I read somewhere that this isn't good, so please let me know.

MWE:
As I am a newb in this world of sockets, I can't think of a way to implement an MWE that is much simpler than the base code I have. Luckily, one can clone our repo and just run line-profiler (or any profiler) on the test function "tests/test_choose_minimum_function.py". This will send the above array of about 50 floats in each message.
For line-profiler, I tag the tests.test_choose_minimum_function.test_run_plugin_with_user_defined_features function and the shapelink.shapelink_plugin.ShapeLinkPlugin.handle_messages method with the @profile tag.
The repo: https://github.com/ZELLMECHANIK-DRESDEN/shapelink
I can also just copy the results of the line-profiler here.

Thanks for your time :)
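For reference, a minimal round-trip timing sketch (endpoint, payload size, and iteration count are illustrative) can separate what raw REQ/REP over loopback costs from profiler overhead and application work:

```python
import time
import zmq

def time_roundtrips(n=1000):
    """Measure the mean REQ/REP round-trip latency over loopback TCP."""
    ctx = zmq.Context.instance()
    rep = ctx.socket(zmq.REP)
    port = rep.bind_to_random_port("tcp://127.0.0.1")
    req = ctx.socket(zmq.REQ)
    req.connect("tcp://127.0.0.1:%i" % port)

    payload = b"x" * 400  # roughly the size of 50 doubles
    start = time.perf_counter()
    for _ in range(n):
        req.send(payload)     # client -> server
        rep.send(rep.recv())  # server echoes the request back
        req.recv()            # client receives the reply
    elapsed = time.perf_counter() - start

    req.close()
    rep.close()
    return elapsed / n  # mean seconds per round trip

if __name__ == "__main__":
    print("mean round trip: %.1f us" % (time_roundtrips() * 1e6))
```

On typical hardware a local REQ/REP round trip is often well under a millisecond, so several milliseconds usually points at serialization, the profiler itself, or work done between recv and send rather than at the transport.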

François Tessier
@hephtaicie
Hi All! My pattern is pretty simple: CLIENT [DEALER] <----> [ROUTER] ORCHESTRATOR [ROUTER] <----> [DEALER] SERVER. All of this is implemented with {recv;send}_multipart calls.
I now need to send a simple message from the orchestrator to the server. As the server waits on recv_multipart, I feel like I cannot send a simple message without an identity. Is that correct? Am I supposed to create a fake identity?
Min RK
@minrk
you can use router identities to send any number of messages. If your servers are already making themselves known with an initial advertisement request ("hi orchestrator, I'm here"), you can record and remember those router identities.
However you are tracking the identities now, you can continue to do the same, and send multiple 'replies' to a single request routing id.
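A minimal sketch of the advertise-then-push flow described above (socket roles, endpoint, and message bodies are illustrative):

```python
import zmq

def advertise_and_push():
    ctx = zmq.Context.instance()
    orchestrator = ctx.socket(zmq.ROUTER)
    port = orchestrator.bind_to_random_port("tcp://127.0.0.1")

    server = ctx.socket(zmq.DEALER)
    server.connect("tcp://127.0.0.1:%i" % port)

    # 1. the server announces itself once
    server.send(b"hi orchestrator, I'm here")

    # 2. the ROUTER receives [routing_id, body]; remember the routing id
    routing_id, body = orchestrator.recv_multipart()

    # 3. any time later, push unsolicited messages to that routing id
    orchestrator.send_multipart([routing_id, b"task 1"])
    orchestrator.send_multipart([routing_id, b"task 2"])

    tasks = [server.recv() for _ in range(2)]
    server.close()
    orchestrator.close()
    return body, tasks
```

The routing id recorded in step 2 stays valid for the lifetime of the connection, so the orchestrator can send any number of messages without a matching request.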
François Tessier
@hephtaicie
Thanks! I'm still struggling a bit with identities. It's working now.
monkeyman192
@monkeyman192
Hi All, I have a question. I have a setup where I am using zmq with consul for service registration. A client gets the correct URI from consul, and uses a REQ socket to talk to a ROUTER on the service. This all works fine. To load balance, we unregister the ROUTER from the poller, so any requests that come in while all of the workers are busy just time out after a pre-defined amount of time.
However, I want a separate ping/pong connection from the client to the service, so the client can determine whether the service is in fact still running jobs, and just wait if that is the case. I created a second REQ socket on the client, so that even while the normal one is waiting on a result, we can ping at any time (and if there is no response, we know something is wrong...). But if I unregister the ROUTER, I obviously can't get any ping requests; and if I don't, I can't see any way of rejecting further non-ping connections and telling the client to just wait.
It feels like this should be possible, I just can't seem to figure out how. Any help would be greatly appreciated (hopefully I am just overlooking something simple!)
Allie
@cite-reader
Heartbeats like you're describing usually want to be in-band for a variety of reasons. There's a socket option, ZMQ_HEARTBEAT_IVL, that provides some level of this automatically, and you can use the socket monitor API on the client to detect disconnects. Those heartbeats don't get passed up to the application, so if your goal is to detect things like an application-level deadlock you need to do a bit of protocol design.
It's... kind of intricate, but there's really no way to get around that.
monkeyman192
@monkeyman192

Thanks for the response! I did have a look over all the options for setsockopt and that one did catch my eye, but I wasn't exactly sure how it would be used.
If I set the socket in the client like this:

sock = context.socket(zmq.REQ)
sock.setsockopt(zmq.HEARTBEAT_IVL, 1000)

Then how do I know that the 'pong' wasn't heard (for example)?
I had a quick google but couldn't find an example of it in use (other than explanations of what it does, c/o the docs)

Allie
@cite-reader
If the heartbeat doesn't receive a response in time the library disconnects the transport socket and starts its reconnect loop. You can't detect that specifically, but the socket monitor can detect disconnects in general.
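A sketch of the socket-monitor approach (endpoint and event handling are illustrative; `recv_monitor_message` lives in `zmq.utils.monitor`):

```python
import zmq
from zmq.utils.monitor import recv_monitor_message

def watch_connection():
    ctx = zmq.Context.instance()
    server = ctx.socket(zmq.REP)
    port = server.bind_to_random_port("tcp://127.0.0.1")

    client = ctx.socket(zmq.REQ)
    # get_monitor_socket() returns a PAIR socket that yields transport
    # events; create it before connecting so no events are missed
    monitor = client.get_monitor_socket()
    client.connect("tcp://127.0.0.1:%i" % port)

    # Block until the transport-level connect is reported.
    # A later EVENT_DISCONNECTED on this same monitor is how you would
    # observe the disconnect/reconnect loop described above.
    event = None
    while event != zmq.EVENT_CONNECTED:
        event = recv_monitor_message(monitor)["event"]

    client.disable_monitor()
    client.close()
    server.close()
    return event
```

These events are purely transport-level; detecting an application-level stall (an unanswered 'pong') still requires your own in-band protocol.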
Cassiel-girl
@Cassiel-girl
https://zguide.zeromq.org/docs/chapter3/
In the code example “rtreq: ROUTER-to-REQ in C”, why does the broker call the statement "s_sendmore(broker, identity)" to construct <identity,empty delimiter,data>? Doesn't the req socket only need empty delimiter?
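A small sketch of why the explicit identity frame is needed (names and payloads are illustrative): the ROUTER consumes the first frame for routing, so it never reaches the REQ socket, which sees only the empty delimiter and the data it expects.

```python
import zmq

def router_to_req():
    ctx = zmq.Context.instance()
    broker = ctx.socket(zmq.ROUTER)
    port = broker.bind_to_random_port("tcp://127.0.0.1")

    worker = ctx.socket(zmq.REQ)
    worker.connect("tcp://127.0.0.1:%i" % port)
    worker.send(b"ready")

    # the ROUTER receives [identity, empty delimiter, data]
    identity, empty, data = broker.recv_multipart()

    # to reply, the ROUTER must send the identity first: it is stripped
    # for routing, and the REQ socket receives only [empty, data]
    broker.send_multipart([identity, b"", b"work item"])
    reply = worker.recv()
    worker.close()
    broker.close()
    return data, reply
```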
Cassiel-girl
@Cassiel-girl
Why do we need zpoller when we already have zloop? What are the advantages, disadvantages and applicability of both?
Min RK
@minrk
Those are czmq APIs, not pyzmq, though pyzmq has equivalent features in e.g. asyncio integration vs zmq.Poller. But zloop gives you an event-driven API ("when there is a message on this socket, call this function"), while zpoller is for waiting for events ("block here until there's an event on a socket"). poll is a lower-level construct - you can implement zloop on top of zpoller, but since zloop exists, you don't have to.
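In pyzmq terms, the zpoller-style blocking wait looks like this (sockets and endpoint are illustrative):

```python
import zmq

def poll_once(timeout_ms=1000):
    ctx = zmq.Context.instance()
    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    a.bind("inproc://poll-demo")
    b.connect("inproc://poll-demo")
    b.send(b"wake up")

    # zpoller-style: block here until a registered socket is readable
    poller = zmq.Poller()
    poller.register(a, zmq.POLLIN)
    events = dict(poller.poll(timeout_ms))
    msg = a.recv() if a in events else None
    a.close()
    b.close()
    return msg
```

The zloop-style, callback-driven model in pyzmq is the asyncio integration, where you `await socket.recv()` inside coroutines instead of registering callbacks by hand.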
Cassiel-girl
@Cassiel-girl
thanks
Endogen
@Endogen
hey guys, i'd like to get a sub/pub going with encryption. is that possible? i saw the examples in here: https://github.com/zeromq/pyzmq/tree/main/examples/security but none of them is for PUB SUB
Endogen
@Endogen

i think what i need is CurveZMQ and since i want to use it with pub/sub i'd need it in the transport layer i guess.

To secure a single hop between client and server, which is the CurveCP use case. For this use case we would embed CurveZMQ in the transport layer so that it can work for all patterns (publish-subscribe, pipeline, and so on).

how do i embed that lib in my python application?
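A sketch of CURVE-encrypted PUB/SUB in pyzmq, using `zmq.curve_keypair`, the `curve_*` socket options, and a ThreadAuthenticator as in the security examples; the endpoint and retry loop are illustrative, and it assumes libzmq was built with CURVE support:

```python
import zmq
import zmq.auth
from zmq.auth.thread import ThreadAuthenticator

def curve_pubsub():
    ctx = zmq.Context.instance()

    # ZAP authenticator thread: accept any client with a valid CURVE key
    auth = ThreadAuthenticator(ctx)
    auth.start()
    auth.configure_curve(domain="*", location=zmq.auth.CURVE_ALLOW_ANY)

    server_public, server_secret = zmq.curve_keypair()
    client_public, client_secret = zmq.curve_keypair()

    pub = ctx.socket(zmq.PUB)
    pub.curve_secretkey = server_secret
    pub.curve_publickey = server_public
    pub.curve_server = True  # this side is the CURVE "server"
    port = pub.bind_to_random_port("tcp://127.0.0.1")

    sub = ctx.socket(zmq.SUB)
    sub.curve_secretkey = client_secret
    sub.curve_publickey = client_public
    sub.curve_serverkey = server_public  # client must know server's public key
    sub.connect("tcp://127.0.0.1:%i" % port)
    sub.subscribe(b"")

    # PUB/SUB drops messages sent before the subscription is in place
    # ("slow joiner"), so keep publishing until the subscriber sees one
    msg = None
    for _ in range(100):
        pub.send(b"encrypted hello")
        if sub.poll(100):
            msg = sub.recv()
            break

    sub.close()
    pub.close()
    auth.stop()
    return msg
```

CURVE lives in the transport layer, so nothing pattern-specific is needed: the same socket options work for PUB/SUB exactly as they do for the REQ/REP security examples.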
Fabian Beitler
@swamper123
Hey ho together,
I am pretty new to zmq and trying to get into the topic. To compare zmq with other frameworks, I want to make a simple "ping-pong" thing for IPC (just send a string "ping" and receive "pong").
I figured out how to build a REQ-REP version, but I am struggling with a DEALER-ROUTER example.
Intuitively, I would replace REQ-REP with DEALER-ROUTER and await send/recv_pyobj messages... but this doesn't seem to work. :/
Allie
@cite-reader
Are you handling the message envelopes in your DEALER-ROUTER code?
Fabian Beitler
@swamper123
Stupid question:
If I send a msg from DEALER to ROUTER, do I receive a list like [identifier, message]?
And if this is true, how should an answer from ROUTER back to DEALER look like (in an async context)?
Fabian Beitler
@swamper123
Ok, I am starting to understand how this whole stuff works. ^^
Indeed, since the ROUTER is an N-1 connection, it needs to know whom it has to send what to....
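A minimal DEALER-ROUTER ping-pong showing the envelope (the ROUTER does receive a list like `[identity, message]`, and must lead its reply with the same identity); endpoint and payloads are illustrative:

```python
import zmq

def ping_pong():
    ctx = zmq.Context.instance()
    router = ctx.socket(zmq.ROUTER)
    port = router.bind_to_random_port("tcp://127.0.0.1")

    dealer = ctx.socket(zmq.DEALER)
    dealer.connect("tcp://127.0.0.1:%i" % port)

    dealer.send(b"ping")
    # the ROUTER prepends the sender's routing id: [identity, message]
    identity, msg = router.recv_multipart()
    # the reply must lead with that identity, so the ROUTER knows
    # which of its possibly many peers to route it to
    router.send_multipart([identity, b"pong"])
    reply = dealer.recv()
    dealer.close()
    router.close()
    return msg, reply
```

With zmq.asyncio the calls become awaitables, but the envelope handling is identical.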
CPickens42
@CPickens42
Hello guys! I'm looking for a low-latency IPC mechanism right now. ZeroMQ on the top of my list. However, I was wondering... Is it possible to setup an inproc connection using the multiprocessing module in Python?
Since Python threads don't run in parallel (because of the GIL), I was thinking about doing some kind of multiprocessing instead.
import threading
from multiprocessing import Process
import zmq

def step1(context=None):
    """Step 1"""
    context = context or zmq.Context.instance()
    # Signal downstream to step 2
    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step2")

    sender.send(b"from step1")

def step2(context=None):
    """Step 2"""
    context = context or zmq.Context.instance()
    # Bind to inproc: endpoint, then start upstream thread
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step2")

    #thread = threading.Thread(target=step1)
    thread = Process(target=step1, args=(context,))
    thread.start()

    # Wait for signal
    msg = receiver.recv()
    print("%s" % msg)

    # Signal downstream to step 3
    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step3")
    sender.send(b"from step2")

def main():
    """ server routine """
    # Prepare our context and sockets
    context = zmq.Context.instance()

    # Bind to inproc: endpoint, then start upstream thread
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step3")

    #thread = threading.Thread(target=step2)
    thread = Process(target=step2, args=(context,))
    thread.start()

    # Wait for signal
    msg = receiver.recv()
    print("%s" % msg)

    print("Test successful!")

    receiver.close()
    context.term()

if __name__ == "__main__":
    main()

So far this example (changed with multiprocessing in mind) fails. I took it from here and modified it: https://github.com/companycy/cpp_by_example/blob/master/inproc_example.py

It gives this result:

Assertion failed: ok (src/mailbox.cpp:99)
Assertion failed: ok (src/mailbox.cpp:99)
[1]    3015255 abort (core dumped)  python test-pymq.py
Fabian Beitler
@swamper123
Hey ho,
I know that a project exists which implemented IPC socket communication with the python zeromq lib. I already tested it and it worked pretty well. If I find the link, I will let you know :)
Fabian Beitler
@swamper123
Keep in mind that Python is not the fastest language, if you really need low latency.
CPickens42
@CPickens42
I would love that! Yes, I know that Python is not the fastest language for that. But all our stack is in Python soooo :)
I think I just need to find a "fast enough" solution... And if I don't find it, I will try alternatives in other languages. But that will surely add more complexity to the rest of our code
Fabian Beitler
@swamper123
yeah, zmq via unix socket is pretty fast :) Hope your OS is unix-compatible?
CPickens42
@CPickens42
I'm on linux :p
Fabian Beitler
@swamper123
@CPickens42 Meh, can't find the project atm, haven't starred it. I just found this example in the main repo, but that wasn't what I was looking for:
https://github.com/zeromq/pyzmq/blob/29aded1bf017385866dcbf7b92a954f272360060/examples/gevent/simple.py
Min RK
@minrk

inproc connection using the multiprocessing module in Python?

inproc is in-process, it can't be used across processes, which includes forks created by multiprocessing. Contexts cannot be passed across forks, either. You'll need to use the ipc or tcp transports for inter-process communication, and start a Context in each process.
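A sketch of the same pipeline restructured along those lines: the tcp transport and a fresh Context created inside each process (endpoint discovery via `bind_to_random_port` is illustrative; `ipc://` endpoints work the same way on unix):

```python
import multiprocessing
import zmq

def worker(endpoint):
    # each process creates its OWN context -- contexts (and their
    # sockets) must never be shared across a fork
    ctx = zmq.Context()
    push = ctx.socket(zmq.PUSH)
    push.connect(endpoint)
    push.send(b"from child process")
    push.close()
    ctx.term()

def parent():
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    port = pull.bind_to_random_port("tcp://127.0.0.1")
    endpoint = "tcp://127.0.0.1:%i" % port

    # the child only receives the endpoint string, never the context
    proc = multiprocessing.Process(target=worker, args=(endpoint,))
    proc.start()
    msg = pull.recv()
    proc.join()
    pull.close()
    ctx.term()
    return msg

if __name__ == "__main__":
    print(parent())
```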

CPickens42
@CPickens42
Okay, very clear! Thank you for the help, guys
Cassiel-girl
@Cassiel-girl
Can zmq.Context() be called multiple times in the same process context?
Cassiel-girl
@Cassiel-girl
How does zeromq enable reliable transmission from one sender to multiple receivers? pub/sub sockets seem to have message-loss issues, right?
Cassiel-girl
@Cassiel-girl

import zmq
from zmq import Socket

mycontext = zmq.Context()

class MySocket(Socket):
    # how to write this class?

sock = MySocket()
sock.xx()

How do I implement MySocket so it can be used like the socket class defined in zmq?
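One way to answer the subclassing question, following the pattern used in pyzmq's serialization example: a `zmq.Socket` subclass paired with a `zmq.Context` subclass whose `_socket_class` points at it. The method names here are illustrative:

```python
import zmq

class MySocket(zmq.Socket):
    """zmq.Socket subclass adding a convenience method pair."""

    def send_tagged(self, tag, body):
        self.send_multipart([tag, body])

    def recv_tagged(self):
        tag, body = self.recv_multipart()
        return tag, body


class MyContext(zmq.Context):
    # context.socket(...) will now return MySocket instances
    _socket_class = MySocket


def tagged_pair_demo():
    ctx = MyContext()
    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    a.bind("inproc://tagged")
    b.connect("inproc://tagged")
    b.send_tagged(b"topic", b"payload")
    result = a.recv_tagged()
    a.close()
    b.close()
    ctx.term()
    return result
```

Creating sockets through the custom Context (rather than instantiating MySocket directly) keeps the usual `context.socket(zmq.XXX)` API working.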

Min RK
@minrk
'reliable messages' are more of an application-level feature
libzmq handles low-level disruptions like packet loss, so you won't ever get incomplete messages
The guide describes some patterns for reliability.
But the gist is usually something along the lines of
  1. store history (or state, depending on the nature of messages) on the sender, or somewhere else, and
  2. use counters in messages to enable receivers to see when they've missed something, and
  3. enable out-of-band bulk requests, so receivers can ask for what they've missed to catch up
Min RK
@minrk
exactly what and how much to make available for these 'catch-up' or 'start subscribing' requests will depend on the nature of your application. A generic version might be to keep the last N messages in an always-expiring buffer, and allow a request for retrieving some/all of those messages by counter.
The clone pattern in the guide is used for e.g. state reflection, where a 'resubscribe' is getting the whole state as a starting point.
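The counter-and-history bookkeeping from the numbered list above can be sketched without any sockets (class names and the pruning policy are illustrative; in a real system `publish` would feed a PUB socket and `replay` would answer an out-of-band REQ/REP catch-up request):

```python
class ReplayBuffer:
    """Sender side: number outgoing messages and keep the last N."""

    def __init__(self, n=1000):
        self.n = n
        self.history = {}  # seq -> payload
        self.seq = 0

    def publish(self, payload):
        self.seq += 1
        self.history[self.seq] = payload
        if len(self.history) > self.n:
            del self.history[min(self.history)]  # expire the oldest
        return self.seq, payload  # frames for the PUB socket

    def replay(self, start, end):
        """Answer a catch-up request for a range of counters."""
        return [(s, self.history[s])
                for s in range(start, end + 1) if s in self.history]


class GapDetector:
    """Receiver side: spot missing counters in the SUB stream."""

    def __init__(self):
        self.expected = 1

    def receive(self, seq):
        missing = list(range(self.expected, seq))  # counters we skipped
        self.expected = seq + 1
        return missing  # ask the sender to replay these
```

If the receiver sees counter 5 right after counter 2, `receive(5)` returns `[3, 4]`, and the sender's `replay(3, 4)` fills the gap.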
Cassiel-girl
@Cassiel-girl
thank you