class Client:
    """Request/reply client that sends JSON requests and receives numpy arrays.

    A DEALER socket is connected to ``tcp://{host}:{port}`` on construction.
    """

    def __init__(self, host, port):
        """Create the ZeroMQ context and connect a DEALER socket.

        Args:
            host: Host name or address of the server.
            port: TCP port of the server.
        """
        self.zmq_context = zmq.Context()
        self.socket = self.zmq_context.socket(zmq.DEALER)
        self.socket.connect(f"tcp://{host}:{port}")
        # Plain attribute only: nothing in this class applies it to the socket.
        self.identity = "123"

    def recv_array(self, socket, flags=0, copy=False, track=False):
        """Receive a numpy array sent as a JSON metadata frame plus a data frame.

        Args:
            socket: Socket to read from.
            flags: Flags passed to both receive calls.
            copy: Passed to ``socket.recv`` for the data frame.
            track: Passed to ``socket.recv`` for the data frame.

        Returns:
            An array built from the data frame with ``md["dtype"]`` and
            reshaped to ``md["shape"]``.

        Raises:
            RuntimeError: If the metadata carries ``status == "error"``.
                Metadata without a ``status`` key is treated as success.
        """
        md = socket.recv_json(flags=flags)
        # Always consume the data frame, even for an error reply, so the next
        # receive starts at the beginning of the next message.
        msg = socket.recv(flags=flags, copy=copy, track=track)
        if md.get("status") == "error":
            raise RuntimeError(md.get("error", "server reported an error"))
        buf = memoryview(msg)
        flat = np.frombuffer(buf, dtype=md["dtype"])
        return flat.reshape(md["shape"])

    def run(self, request_data):
        """Send ``request_data`` as JSON and return the array reply.

        Args:
            request_data: JSON-serializable request payload.

        Returns:
            The array produced by :meth:`recv_array`.
        """
        # The class has no ``send`` method of its own; send on the socket.
        self.socket.send_string(json.dumps(request_data))
        return self.recv_array(self.socket)
class Worker(threading.Thread):
    """Thread that answers requests arriving on ``inproc://backend``.

    NOTE(review): ``compute`` is called on ``self`` but is not defined in this
    class as shown; presumably a subclass or another part of the file
    provides it -- confirm.
    """

    def __init__(self, zmq_context, encoder, _id):
        """Store the shared context, the encoder and the worker id.

        Args:
            zmq_context: ZeroMQ context used to create this worker's socket.
            encoder: Encoder object kept on the instance as ``self.encoder``.
            _id: Identifier kept on the instance as ``self.worker_id``.
        """
        super().__init__()
        self.zmq_context = zmq_context
        self.worker_id = _id
        self.encoder = encoder

    def _build_reply(self, request):
        """Run ``compute`` on ``request`` and return ``(metadata, payload)``.

        On success the metadata holds ``status="ok"`` plus the result's dtype
        and shape, and the payload is the result itself. If anything raises,
        the metadata holds ``status="error"`` and the error text, and the
        payload is empty so the reply keeps the same number of frames.
        """
        try:
            result = self.compute(request)
            md = {
                "status": "ok",
                "dtype": str(result.dtype),
                "shape": result.shape,
            }
        except Exception as exc:
            # Report the failure to the client instead of letting it kill
            # this thread, which would leave the client waiting for a reply.
            return {"status": "error", "error": str(exc)}, b""
        return md, result

    def run(self):
        """Serve requests forever: receive, compute, reply.

        Each request is read as two frames (client id, then request body) and
        answered with three frames (client id, JSON metadata, raw payload).
        """
        socket = self.zmq_context.socket(zmq.DEALER)
        socket.connect("inproc://backend")
        try:
            while True:
                client_id = socket.recv()
                request = socket.recv()
                md, payload = self._build_reply(request)
                socket.send(client_id, zmq.SNDMORE)
                socket.send_json(md, zmq.SNDMORE)
                socket.send(payload, flags=0, copy=False)
        finally:
            # Release the socket if the loop exits through an exception.
            socket.close()
class Server:
    """Front-end that distributes client requests among worker threads."""

    def __init__(self, embedding, model, port, num_workers=4):
        """Create the ZeroMQ context and the encoder.

        Args:
            embedding: Accepted for interface compatibility; not used in the
                code shown here.
            model: Accepted for interface compatibility; not used in the code
                shown here.
            port: TCP port the front-end socket binds to.
            num_workers: Number of worker threads started by :meth:`start`.
        """
        self.zmq_context = zmq.Context()
        self.port = port
        self.num_workers = num_workers
        self.encoder = Encoder()

    def start(self):
        """Bind the sockets, start the workers and forward messages.

        Blocks in ``zmq.proxy`` until it returns or raises; both sockets are
        closed on the way out.
        """
        socket_front = self.zmq_context.socket(zmq.ROUTER)
        # Backend socket to distribute work.
        socket_back = self.zmq_context.socket(zmq.DEALER)
        try:
            socket_front.bind(f"tcp://0.0.0.0:{self.port}")
            socket_back.bind("inproc://backend")

            # Start workers.
            for i in range(self.num_workers):
                worker = Worker(self.zmq_context, self.encoder, i)
                worker.start()
                logger.info(f"[WORKER-{i}]: ready and listening!")

            # Forward messages between the two sockets. ``zmq.proxy`` is the
            # replacement for the deprecated ``zmq.device(zmq.QUEUE, ...)``.
            # What it does internally is,
            # 1. Read a client's socket ID and request.
            # 2. Send socket ID and request to a worker.
            # 3. Read a client's socket ID and result from a worker.
            # 4. Route result back to the client using socket ID.
            zmq.proxy(socket_front, socket_back)
        finally:
            socket_front.close()
            socket_back.close()
# Run the computation; a failure becomes an error reply rather than an
# uncaught exception.
try:
    result = self.compute(request)
except Exception as exc:
    # or transform the error more deliberately
    md = {"status": "error", "error": str(exc)}
    # empty payload so the reply keeps the same number of frames
    result = b""
else:
    # success: record dtype and shape alongside the status
    md = {
        "status": "ok",
        "dtype": str(result.dtype),
        "shape": result.shape,
    }

# Reply frames, in order: client id, JSON metadata, raw payload.
more = flags | zmq.SNDMORE
socket.send(client_id, more)
socket.send_json(md, more)
socket.send(result, flags, copy=False)
In `recv_msg`, you use `identities`, implying multiple identities for a single message. Is that correct?
[worker identity, client identity, msg_content]
(it would also have been sensible to add it to the msg body, but then it needs to be base64-encoded or something)
`identities` means "go back the way you came, no matter how many identity prefixes were stacked up".
Downloading https://files.pythonhosted.org/packages/29/84/4b0e1f95a9305861507e7d60bd2368c8939766335d716b7cb05d0200a5ed/pyzmq-21.0.1.tar.gz (1.2MB)
100% |████████████████████████████████| 1.2MB 420kB/s
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/pip-build-btceje2b/pyzmq/setup.py", line 687
warn(f"platform={get_platform()}, vcvars=")
^
SyntaxError: invalid syntax