bluca on master
Add conan badge Add relicense statement Merge pull request #4497 from M… (compare)
I'm trying to understand the PUB and SUB pattern in C++ and compiled the example codes in https://zeromq.org/socket-api/#publish-subscribe-pattern.
After I run both the publisher and the subscriber, the subscriber throws an assertion error:
$ ./sub
sub: sub.cpp:27: int main(): Assertion `*result == 2' failed.
Aborted (core dumped)
The function recv_multipart
returns 1 instead 2. What did I do wrong?
syncpub.cpp:17:26: warning: ‘void zmq::detail::socket_base::setsockopt(int, const void*, size_t)’ is deprecated: from 4.7.0, use `set` taking option from zmq::sockopt [-Wdeprecated-declarations]
17 | publisher.setsockopt (ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm));
| ~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Synchronized publisher in C++
//
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include <iostream>
#include <chrono>
#include <thread>
// We wait for 10 subscribers
#define SUBSCRIBERS_EXPECTED 1
int main () {
zmq::context_t context(1);
// Socket to talk to clients
zmq::socket_t publisher (context, zmq::socket_type::pub);
publisher.bind("tcp://*:5561");
// Socket to receive signals
zmq::socket_t syncservice (context, zmq::socket_type::rep);
syncservice.bind("tcp://*:5562");
// Get synchronization from subscribers
int subscribers = 0;
while (subscribers < SUBSCRIBERS_EXPECTED) {
zmq::message_t message;
// - wait for synchronization request
syncservice.recv(message, zmq::recv_flags::none);
// - send synchronization reply
zmq::message_t reply;
syncservice.send (reply, zmq::send_flags::none);
subscribers++;
}
// Give the subscribers a chance to connect, so they don't lose any messages
// std::this_thread::sleep_for(std::chrono::milliseconds(20));
std::cout << "Start broadcasting\n";
// Now broadcast exactly 1M updates followed by END
int update_nbr;
for (update_nbr = 0; update_nbr < 1000; update_nbr++) {
publisher.send (zmq::str_buffer("SYNC"), zmq::send_flags::sndmore);
publisher.send(zmq::str_buffer("Rhubarb"));
}
publisher.send (zmq::str_buffer(""), zmq::send_flags::sndmore);
publisher.send (zmq::str_buffer("END"));
// Give 0MQ time to flush output
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
//
// Synchronized subscriber in C++
//
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include <iostream>
#include <chrono>
using namespace std::chrono;
int main (int argc, char *argv[])
{
zmq::context_t context(1);
// First, connect our subscriber socket
zmq::socket_t subscriber (context, zmq::socket_type::sub);
subscriber.connect("tcp://localhost:5561");
subscriber.set(zmq::sockopt::subscribe, "");
// Second, synchronize with publisher
zmq::socket_t syncclient (context, zmq::socket_type::req);
syncclient.connect("tcp://localhost:5562");
std::cout << "Send\n";
// - send a synchronization request
zmq::message_t request;
syncclient.send (request, zmq::send_flags::none);
// - wait for synchronization reply
// Get the reply.
zmq::message_t reply;
syncclient.recv (reply, zmq::recv_flags::none);
std::cout << "Got reply\n";
// Third, get our updates and report how many we got
int update_nbr = 0;
auto start = high_resolution_clock::now();
while (1) {
// Receive all parts of the message
std::vector<zmq::message_t> recv_msgs;
zmq::recv_result_t result =
zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
assert(result && "recv failed");
assert(*result == 2);
if (recv_msgs[1].to_string().compare("END") == 0) {
break;
}
// std::cout << "Received " <<recv_msgs[0].to_string()<<" "<<update_nbr<<std::endl;
update_nbr++;
}
std::cout << "Received " << update_nbr << " updates" << std::endl;
auto stop = high_resolution_clock::now();
auto duration = duration_cast<microseconds>(stop - start);
// To get the value of duration use the count()
// member function on the duration object
std::cout << duration.count() << std::endl;
return 0;
}
Does anyone have working versions of the clone examples in C? In particular I am trying to get model 5 working and the use of subtrees, I have updated them to run, but I don't believe they are working correctly.
In clonecli5.c there are the lines:
zsocket_set_subscribe (subscriber, "");
zsocket_connect (subscriber, "tcp://localhost:5557");
zsocket_set_subscribe (subscriber, SUBTREE);
I have updated these to the following:
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
zmq_connect(subscriber, "tcp://localhost:5557");
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "SUBTREE", sizeof(SUBTREE));
The line: zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
means that all messages are received, and no filtering is done. This can be seen when I start multiple clients with different subtrees and all messages are received.
I assumed if I took out the subscribe to all topics line, and just left in: zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "SUBTREE", sizeof(SUBTREE));
then only the specific SUBTREE topic would be received. Instead, nothing is received.