"Socket operation on non-socket" ought to be totally impossible given Java's type safety. That's telling me your program has some kind of undefined behavior, probably a data race: uncoordinated access to a shared variable. Without seeing the code I can't guess where it is. I'm not primarily a Java developer so I can't suggest tools for automatically detecting them.
Alternatively, this means the jzmq wrapper is totally hosed. I doubt it but who knows.
org.zeromq.ZMQException: Invalid argument
at org.zeromq.ZMQ$Poller.run_poll(Native Method) ~[zmq-3.1.0.jar:?]
at org.zeromq.ZMQ$Poller.poll(ZMQ.java:2136) ~[zmq-3.1.0.jar:?]
at com.d.b(Unknown Source) ~[cx-dev.4.0.6.jar:?]
at com.s.b.run(Unknown Source) ~[cx-dev.4.0.6.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_265]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_265]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_265]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_265]
at java.lang.Thread.run(Thread.java:7
`class socketPool
{
    std::string endpoint;   // endpoint every per-thread socket connects to
public:
    socketPool(const std::string &ep) : endpoint(ep) {}
    // Returns the calling thread's socket, connecting it on first use.
    zmq::socket_t & operator()()
    {
        thread_local static zmq::socket_t socket(
            globalContext(),
            ZMQ_REQ);
        thread_local static bool connected;   // zero-initialized (false) in each thread
        if (!connected) {
            connected = true;
            socket.connect(endpoint);
        }
        return socket;
    }
};
// creating a pool for each endpoint
socketPool httpReqPool("ipc://http-concentrator");`
I found the code above at https://stackoverflow.com/questions/41941702/one-zeromq-socket-per-thread-or-per-call
I don't understand what `globalContext()` is.
Can you tell me? Thanks a lot!
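For reference, the snippet never defines `globalContext()`, so what follows is only a guess at its shape: a helper that hands every thread the same process-wide context. The sketch below shows that pattern with a function-local static. `Context` is a hypothetical stand-in so the example compiles without libzmq; with cppzmq the type would presumably be `zmq::context_t`.

```cpp
// Hypothetical stand-in for zmq::context_t (assumption: the real helper
// would return a zmq::context_t& instead).
struct Context {
    explicit Context(int ioThreads) : ioThreads(ioThreads) {}
    Context(const Context&) = delete;             // a context is shared, not copied
    Context& operator=(const Context&) = delete;
    int ioThreads;
};

// One context for the whole process, created the first time it is needed.
// Since C++11 the initialization of a function-local static is thread-safe.
Context& globalContext()
{
    static Context ctx{1};   // 1 I/O thread is an illustrative choice
    return ctx;
}
```

Every call returns a reference to the same object, which is what lets the `thread_local` sockets in `socketPool` share a single context.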
`IPv6=ON` in the .travis.yml has no effect. On the one hand, it seems that nothing actually uses this flag. What's worse, apparently since the end of 2018 there has been no support for IPv6 on any amd64 machine on travis-ci, according to https://docs.travis-ci.com/user/reference/overview/#virtualisation-environment-vs-operating-system. But there are now arm64/ppc64le/s390x machines, which do support IPv6. We should probably run tests there as well, including at least one coverage job, so we can actually see how well the IPv6 code is covered. I will try to make that work.
The `ZMQ_ROUTER_NOTIFY` socket option (set on the router side) can be set to `ZMQ_NOTIFY_CONNECT` to deliver "a zero-length message" when the ZMTP connection completes, with the usual routing frame prepended. This shows up in the stream of messages received from the socket, not on any associated monitor sockets.
The response is only sent to the browser when I press CTRL+C in the running ZeroMQ application, and I don't know why.
// Send Response
socket.send(frames[0], zmq::send_flags::sndmore);
socket.send(zmq::str_buffer("HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\nHello, World!"), zmq::send_flags::none);
// Close Connection with Browser
socket.send(frames[0], zmq::send_flags::sndmore);
socket.send(zmq::buffer(std::string_view("\0")), zmq::send_flags::none);
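A side note on the last line, assuming this is a `ZMQ_STREAM` socket, where a connection is closed by sending the identity frame followed by a zero-length frame. Whether the second frame really is zero-length depends on how `std::string_view` measures the literal. The helper names below are hypothetical and the check uses only the standard library (C++17):

```cpp
#include <cstddef>
#include <string_view>

// The const char* constructor stops at the first NUL byte, so the view
// built from "\0" is empty.
std::size_t closeFrameLength()
{
    return std::string_view("\0").size();
}

// Passing an explicit length keeps the NUL byte in the view.
std::size_t oneByteFrameLength()
{
    return std::string_view("\0", 1).size();
}
```

So the buffer in the snippet has size 0, which matches the zero-length frame that closes a connection on a stream socket.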
Can you tell me if Bitcoin still uses ZeroMQ?
Seems like it https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md
I've just noticed that libzmq SUB sockets perform prefix-match filtering when I use TCP. Reading spec 29, the section describing the SUB socket does allow it:
MAY, depending on the transport, filter messages according to subscriptions, using a prefix match algorithm.
My question is: how can I know which transports libzmq will apply prefix matching for? I can't find it in the documentation for setsockopt.
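To make the "prefix match algorithm" wording concrete, here is a minimal sketch of the check itself. It is not libzmq's implementation, and the function names are made up for illustration. It assumes the usual ZeroMQ convention that an empty subscription matches every message and that the comparison is byte-wise against the start of the message.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// True if `message` begins with `subscription`, compared byte by byte.
// An empty subscription is a prefix of everything.
bool prefixMatches(const std::string& subscription, const std::string& message)
{
    return message.size() >= subscription.size() &&
           std::equal(subscription.begin(), subscription.end(), message.begin());
}

// A message is delivered if at least one subscription is a prefix of it.
bool shouldDeliver(const std::vector<std::string>& subscriptions,
                   const std::string& message)
{
    return std::any_of(subscriptions.begin(), subscriptions.end(),
                       [&](const std::string& s) { return prefixMatches(s, message); });
}
```

With a subscription of `rate`, a message starting with `rate.usd` would pass while `rat` would not. With no subscriptions at all, nothing passes.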
Hello @sappo, @cite-reader,
could you please help me with this problem:
java.lang.IndexOutOfBoundsException: Index: 2, Size: 2
at java.util.ArrayList.rangeCheck(ArrayList.java:659)
at java.util.ArrayList.get(ArrayList.java:435)
at zmq.socket.pubsub.Dist.distribute(Dist.java:162)
at zmq.socket.pubsub.Dist.sendToMatching(Dist.java:138)
at zmq.socket.pubsub.XPub.xsend(XPub.java:253)
at zmq.SocketBase.send(SocketBase.java:718)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:3205)
at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:3131)
I am using the PUB/SUB pattern from multiple threads.
When I try to publish an event, I get this error.
I am using jeromq-0.5.2.
import org.json.JSONObject;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import java.math.BigDecimal;
import java.util.Arrays;
public class RatePublisher {
    private static final ZContext context = new ZContext();
    private static final ZMQ.Socket publisher = context.createSocket(SocketType.PUB);
    private static String zmqAddress = "tcp://localhost:5563";
    static {
        publisher.bind(zmqAddress);
    }
    public static void publish(String name, String ident, BigDecimal rate, BigDecimal changePercentage) throws InterruptedException {
        try {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("name", name);
            jsonObject.put("ident", ident);
            jsonObject.put("rate", rate);
            jsonObject.put("changed_percentage", changePercentage);
            publisher.sendMore("rate");
            publisher.send(jsonObject.toString());
        } catch (ZMQException e) {
            e.printStackTrace();
            throw new InterruptedException();
        }
    }
    public static void close() throws InterruptedException {
        try {
            context.destroy();
        } catch (Exception e) {
            throw new InterruptedException();
        }
    }
}
I call the `publish` method like this: `RatePublisher.publish("test", pair, rate, changePercentage);`
public class A
{
    public static void main(String[] args)
    {
        int i = 100;
        while(i>0) {
            RatePublisher.publish("test", pair, rate, changePercentage);
            i--;
        }
    }
}
public class B
{
    public static void main(String[] args)
    {
        int i = 100;
        while(i>0) {
            RatePublisher.publish("test", pair, rate, changePercentage);
            i--;
        }
    }
}
public class C
{
    public static void main(String[] args)
    {
        int i = 100;
        while(i>0) {
            RatePublisher.publish("test", pair, rate, changePercentage);
            i--;
        }
    }
}
public class D
{
    public static void main(String[] args)
    {
        int i = 100;
        while(i>0) {
            RatePublisher.publish("test", pair, rate, changePercentage);
            i--;
        }
    }
}