Yassine Benyahia
@Wronskia
my client looks like this
import json

import numpy as np
import zmq


class Client:
    def __init__(self, host, port):
        self.zmq_context = zmq.Context()
        self.socket = self.zmq_context.socket(zmq.DEALER)
        self.socket.connect(f"tcp://{host}:{port}")
        self.identity = "123"

    def recv_array(self, socket, flags=0, copy=False, track=False):
        """recv a numpy array"""
        # First frame: JSON metadata (dtype, shape); second frame: raw buffer.
        md = socket.recv_json(flags=flags)
        msg = socket.recv(flags=flags, copy=copy, track=track)
        buf = memoryview(msg)
        A = np.frombuffer(buf, dtype=md["dtype"])
        return A.reshape(md["shape"])

    def run(self, request_data):
        # Send through the socket; the class itself defines no send() method.
        self.socket.send_string(json.dumps(request_data))
        result = self.recv_array(self.socket)
        return result
and the worker
class Worker(threading.Thread):
    def __init__(self, zmq_context, encoder, _id):
        threading.Thread.__init__(self)
        self.zmq_context = zmq_context
        self.worker_id = _id
        self.encoder = encoder

    def run(self):
        socket = self.zmq_context.socket(zmq.DEALER)
        socket.connect("inproc://backend")

        while True:
            client_id = socket.recv()
            request = socket.recv()
            result = self.compute(request)
            flags = 0
            md = dict(dtype=str(result.dtype), shape=result.shape,)
            socket.send(client_id, zmq.SNDMORE)
            socket.send_json(md, flags | zmq.SNDMORE)
            socket.send(result, flags=0, copy=False)
and the server
class Server(object):
    def __init__(self, embedding, model, port, num_workers=4):
        self.zmq_context = zmq.Context()
        self.port = port
        self.num_workers = num_workers
        self.encoder = Encoder()

    def start(self):
        socket_front = self.zmq_context.socket(zmq.ROUTER)
        socket_front.bind(f"tcp://0.0.0.0:{self.port}")

        # Backend socket to distribute work.
        socket_back = self.zmq_context.socket(zmq.DEALER)
        socket_back.bind("inproc://backend")

        # Start workers.
        for i in range(0, self.num_workers):
            worker = Worker(self.zmq_context, self.encoder, i)
            worker.start()
            logger.info(f"[WORKER-{i}]: ready and listening!")

        # Use built in queue device to distribute requests among workers.
        # What queue device does internally is,
        #   1. Read a client's socket ID and request.
        #   2. Send socket ID and request to a worker.
        #   3. Read a client's socket ID and result from a worker.
        #   4. Route result back to the client using socket ID.
        zmq.device(zmq.QUEUE, socket_front, socket_back)
Min RK
@minrk
Add to your message protocol a status field (could be in your existing md dict) that defines the message structure for success or failure. For example:
try:
    result = self.compute(request)
except Exception as e:
    # or transform the error more deliberately
    md = dict(
        status="error",
        error=str(e),
    )
    # empty result to ensure message consistency
    result = b""
else:
    # success
    md = dict(
        status="ok",
        dtype=str(result.dtype),
        shape=result.shape,
    )

socket.send(client_id, flags | zmq.SNDMORE)
socket.send_json(md, flags | zmq.SNDMORE)
socket.send(result, flags, copy=False)
So you always get a reply, even on errors. Then it's up to the client to handle that error reply appropriately.
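A minimal sketch of how the client side might act on that status field. The names `RemoteError` and `check_reply` are hypothetical and not part of pyzmq; the metadata dicts simply mirror the `md` layout suggested above.

```python
import json


class RemoteError(RuntimeError):
    """Hypothetical exception for a reply whose status is "error"."""


def check_reply(md):
    """Return md unchanged on success, raise RemoteError on an error reply."""
    status = md.get("status")
    if status == "ok":
        return md
    if status == "error":
        raise RemoteError(md.get("error", "unknown error"))
    raise RemoteError(f"unexpected status: {status!r}")


# Simulate the two kinds of metadata frame a worker could send.
ok_frame = json.dumps({"status": "ok", "dtype": "float32", "shape": [2, 3]})
err_frame = json.dumps({"status": "error", "error": "division by zero"})

ok_md = check_reply(json.loads(ok_frame))
try:
    check_reply(json.loads(err_frame))
except RemoteError as exc:
    error_text = str(exc)
```

In `recv_array`, a check like this would run only after both frames have been received, so the empty payload frame of an error reply is drained and does not get mistaken for the start of the next message.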
Yassine Benyahia
@Wronskia
Great, Thanks a lot @minrk
Min RK
@minrk
you may also want to deal with timeouts to handle errors at a higher level that prevent replies (e.g. worker crashes)
Yassine Benyahia
@Wronskia
Makes sense, thanks! I guess I would have to do this in the client, using setsockopt
Min RK
@minrk
yes, or use a poller
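For reference, a sketch of the poller variant, assuming a DEALER client talking to a ROUTER. The helper `recv_with_timeout` and the `inproc://poller-demo` endpoint are made up for this illustration; `zmq.Poller`, `register`, and `poll` are the regular pyzmq calls.

```python
import zmq


def recv_with_timeout(socket, timeout_ms):
    """Return the next multipart message, or raise TimeoutError."""
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    # poll() returns a list of (socket, event) pairs; empty if nothing is ready.
    events = dict(poller.poll(timeout_ms))
    if socket not in events:
        raise TimeoutError(f"no reply within {timeout_ms} ms")
    return socket.recv_multipart()


ctx = zmq.Context()
server = ctx.socket(zmq.ROUTER)
server.bind("inproc://poller-demo")
client = ctx.socket(zmq.DEALER)
client.connect("inproc://poller-demo")

client.send(b"ping")
identity, request = server.recv_multipart()

# The server has not answered yet, so the client times out.
try:
    recv_with_timeout(client, 100)
    timed_out = False
except TimeoutError:
    timed_out = True

# Once a reply is sent, the same helper returns it.
server.send_multipart([identity, b"pong"])
reply = recv_with_timeout(client, 1000)

client.close(linger=0)
server.close(linger=0)
ctx.term()
```

Compared with a socket-level receive timeout, a poller leaves the socket options untouched and can wait on several sockets at once.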
Yassine Benyahia
@Wronskia
would something like this work
    def run(self, request_data):
        # RCVTIMEO is in milliseconds; recv raises zmq.error.Again when it expires.
        self.socket.setsockopt(zmq.RCVTIMEO, self.timeout)
        self.socket.send_string(json.dumps(request_data))
        try:
            result = self.recv_array(self.socket)
        except zmq.error.Again as e:
            # log e
            raise TimeoutError("timeout") from e

        return result
Min RK
@minrk
I expect so, yes
François Tessier
@hephtaicie
Hello everyone! I'm trying to build a simple scheduler using zmq (Python). On one side, there are multiple clients that submit requests to the scheduler (a single app). On the other side, there are servers that register with the scheduler and offer a list of available resources. When a client submits a request, the scheduler determines which resources will be allocated to this client. The scheduler then contacts the corresponding server to let it know that this set of resources is now allocated to a client. The server returns info about the resources to the scheduler, which forwards it to the calling client.
I can't figure out how to identify who is who. For instance, I took a look at the "extended request-reply" example using ROUTER and DEALER but I can't see which client sends which request and which worker replies to it.
François Tessier
@hephtaicie
Going through identities. That could answer my question.
Min RK
@minrk
ROUTER messages are all prefixed with an identity, which lets you identify the sender and route replies. You'll probably want ROUTER sockets for both directions in the scheduler, and DEALER sockets on the workers and clients. Workers can send an "I'm here" message when they connect, which the scheduler can then store along with the identity that came with the message.
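A rough sketch of the "I'm here" bookkeeping on the scheduler side, kept free of sockets so the logic is easy to follow. The JSON message layout (`type`, `resources`), the function `register_worker`, and the identity `b"worker-1"` are all assumptions for illustration; only the `[identity, payload]` frame shape comes from how a ROUTER socket delivers messages.

```python
import json


def register_worker(registry, frames):
    """Store a worker announcement received on the scheduler's ROUTER socket.

    frames is [worker_identity, payload], i.e. what recv_multipart() would
    return for a one-part message sent by a DEALER worker.
    """
    identity, payload = frames
    msg = json.loads(payload)
    if msg.get("type") != "register":
        return False
    # Key the registry by identity so replies can be routed to this worker.
    registry[identity] = msg.get("resources", [])
    return True


registry = {}
hello = json.dumps({"type": "register", "resources": ["gpu0", "gpu1"]}).encode()
registered = register_worker(registry, [b"worker-1", hello])
```

To reach that worker later, the scheduler would send `[b"worker-1", payload]` on the same ROUTER socket.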
François Tessier
@hephtaicie
Hmm... Interesting. Why not |CLIENTS|(REQ) <-> (ROUTER)|SCHEDULER|(ROUTER) <-> (REQ)|SERVER|?
Min RK
@minrk
why DEALER instead of REQ, you mean?
François Tessier
@hephtaicie
Yes
Min RK
@minrk
DEALER and REQ are really the same, but REQ adds unnecessary enforcement of send/recv/send/recv sequencing. The only thing it accomplishes is making it impossible to have messages without reply, retrying sends, etc.
REQ/REP are ~never right for production
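The sequencing difference can be seen in a small experiment, sketched here with a made-up `inproc://req-vs-dealer` endpoint: a second send on a REQ socket without an intervening recv is rejected, while a DEALER accepts both sends.

```python
import zmq

ctx = zmq.Context()
router = ctx.socket(zmq.ROUTER)
router.bind("inproc://req-vs-dealer")

req = ctx.socket(zmq.REQ)
req.connect("inproc://req-vs-dealer")
dealer = ctx.socket(zmq.DEALER)
dealer.connect("inproc://req-vs-dealer")

# REQ enforces send/recv alternation: the second send fails.
req.send(b"first")
try:
    req.send(b"second")
    req_second_send_ok = True
    req_errno = None
except zmq.ZMQError as exc:
    req_second_send_ok = False
    req_errno = exc.errno

# DEALER has no such state machine: both sends are queued.
dealer.send(b"first")
dealer.send(b"second")
dealer_sends_ok = True

for sock in (req, dealer, router):
    sock.close(linger=0)
ctx.term()
```

This is why retries and fire-and-forget messages are awkward with REQ: after a lost reply, the socket stays stuck in its "waiting for recv" state.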
François Tessier
@hephtaicie
I see! Thanks for your help. That's very helpful. I'm gonna try with DEALER <-> ROUTER + ROUTER <-> DEALER and see how it goes. The REQ/REP sequencing was making my code really heavy...
Min RK
@minrk
François Tessier
@hephtaicie
@minrk Thanks a lot!
Just a couple of questions to be sure I understand everything correctly:
1) In recv_msg, you use identities implying multiple identities for a single message. Is that correct?
François Tessier
@hephtaicie
2) What's the difference between client_socket.send_multipart([client_identity, encode_msg(msg)]) (L98) and client_socket.send_multipart(identities + [encode_msg(msg)]) (L122)?
Min RK
@minrk
1-many, yes
In particular, I kept the client identity in the worker message, meaning that worker replies are [worker identity, client identity, msg_content] (it would also have been sensible to add it to the msg body, but then it needs to be base64-encoded or something)
sending a reply with identities means "go back the way you came, no matter how many identity prefixes were stacked up"
whereas forwarding reply from worker to client requires manipulation, specifically removing the worker identity from the front
François Tessier
@hephtaicie
I see. So it also answers 2): you just merge two lists.
Min RK
@minrk
Yes, the difference is one is sending a message back the way it came, and the other is relaying it on a different socket
for any given socket, only the first identity is used for routing, the rest is opaque 'content' to be used at the application level
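The two cases can be written as plain list operations. This is only a sketch; the function names and the identities `b"client-1"` and `b"worker-7"` are hypothetical, and the frame lists stand in for what `recv_multipart()` would return on a ROUTER socket.

```python
def reply_the_way_it_came(identities, content):
    """Frames for answering on the same socket the request arrived on."""
    return list(identities) + [content]


def relay_worker_reply(frames):
    """Frames for forwarding a worker reply on the client-facing socket.

    frames is what the worker-facing ROUTER received:
    [worker_identity, client_identity, content].
    """
    # Drop the worker identity; the client identity now leads and is used
    # for routing on the other socket.
    worker_identity, *client_frames = frames
    return client_frames


request = [b"client-1", b"job"]
*identities, body = request
echo = reply_the_way_it_came(identities, b"ack")

from_worker = [b"worker-7", b"client-1", b"result"]
to_client = relay_worker_reply(from_worker)
```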
François Tessier
@hephtaicie
So what is returned by recv_multipart is something like [[identity_1, identity_2, ... , identity_n], empty, message], right?
Min RK
@minrk
recv_multipart returns a list of bytes, e.g. [identity1, msg] or just [msg] depending on which socket is receiving
in that example, there are 1-, 2-, and 3- part messages
(client recvs one-part, scheduler recvs two-part from client, three-part from worker, worker recvs two-part from scheduler)
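Those part counts can be checked with a single-threaded round trip over inproc sockets. This is a sketch and not the example discussed above: the endpoint names and the `READY`/`job`/`done` payloads are invented, and one process plays client, scheduler, and worker in turn.

```python
import zmq

ctx = zmq.Context()
frontend = ctx.socket(zmq.ROUTER)   # scheduler, client-facing
frontend.bind("inproc://clients")
backend = ctx.socket(zmq.ROUTER)    # scheduler, worker-facing
backend.bind("inproc://workers")
client = ctx.socket(zmq.DEALER)
client.connect("inproc://clients")
worker = ctx.socket(zmq.DEALER)
worker.connect("inproc://workers")

# The worker announces itself so the scheduler learns its identity.
worker.send(b"READY")
worker_id, hello = backend.recv_multipart()

client.send(b"job")
from_client = frontend.recv_multipart()          # [client_id, b"job"]

# Forward to the worker, keeping the client identity in the message.
backend.send_multipart([worker_id] + from_client)
at_worker = worker.recv_multipart()              # [client_id, b"job"]

worker.send_multipart([at_worker[0], b"done"])
from_worker = backend.recv_multipart()           # [worker_id, client_id, b"done"]

# Relay to the client: strip the worker identity from the front.
frontend.send_multipart(from_worker[1:])
at_client = client.recv_multipart()              # [b"done"]

part_counts = [len(from_client), len(at_worker), len(from_worker), len(at_client)]

for sock in (client, worker, frontend, backend):
    sock.close(linger=0)
ctx.term()
```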
François Tessier
@hephtaicie
Oh ok. And the last part is always msg I suppose
Min RK
@minrk
yes
François Tessier
@hephtaicie
@minrk Thanks again! I have been working actively on my project these past few days, and your example and our discussion here have been very helpful.
Fogang Fokoa
@fokoa
Hello
I'm installing Jupyter Notebook and the installation stops when pip tries to install pyzmq-21.0.1.
I get this error :
Downloading https://files.pythonhosted.org/packages/29/84/4b0e1f95a9305861507e7d60bd2368c8939766335d716b7cb05d0200a5ed/pyzmq-21.0.1.tar.gz (1.2MB)
  100% |████████████████████████████████| 1.2MB 420kB/s
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/tmp/pip-build-btceje2b/pyzmq/setup.py", line 687
    warn(f"platform={get_platform()}, vcvars=")
                                             ^
SyntaxError: invalid syntax
And when I ask for a version <21.0.1, pip automatically tries to install 21.0.1.
I need help. Thank you!
Min RK
@minrk
@fokoa what platform, Python version, and pip version?
That syntax error suggests you are on an unsupported version of Python, which in turn suggests you are using a pip that's too old to respect the python version requirement metadata
Fogang Fokoa
@fokoa
@minrk I use Ubuntu 16.04 with Python 3 (3.5.2) and pip3 (version 8.1.1).
Min RK
@minrk
pyzmq 21 requires Python 3.6. You need pip >=9.0 for pip to respect Python version requirements. It's always a good idea to start with: python3 -m pip install --upgrade pip to make sure you have pip up-to-date.