hey guys
super stressed out student here. my master's cloud coursework is to build a scalable, fault-tolerant application to solve an 'embarrassingly parallel task'. I spent the last week gingerly digging myself into a hole of distributed systems and am experiencing severe conceptual overload. I reach out to you, kind people.
for the task I settled on Mandelbrot sets to make cool pictures.
the architecture I arrived at (lifted from the ZeroMQ book) is as follows:
as you can see there are a number of faults:
I'm not married to either the task I picked or the architecture at this point; it was really fun learning this stuff. If anybody has any advice on any of these topics, pleaseeeee help me. Cheers!
@cite-reader we had some teaching in AWS, but not a lot beyond starting an EC2 instance and running minikube on it; the course was mostly conceptual, so yeah, I'm going in with one eye closed, kind of
the brief is very unspecific. I thought I'd use ZeroMQ for messaging; to be frank I wasn't sure AWS offered that capability
I'd probably deploy workers to a spot fleet. With platform messaging systems disfavored, Lambda doesn't have the complexity advantage of being invoked for you, and with a source as noisy as Twitter the workers will likely spend no time idle, probably negating the cost advantage. Spot instances can be deeply discounted compared to equivalent on-demand pricing at the cost of being preemptible, but you're already building fault tolerance, right? A spot fleet is like an autoscaling group, but can have spot instances in it.
Choosing an instance type can be tricky. Generally you want the newest class of instances that fits the workload because, as a rule, they provide equal or better performance than similarly-sized instances of previous generations at lower prices. The absolute newest generation (the `t4g`, `m6g`, etc. instance classes) makes that a little trickier because they're arm64 machines and pyzmq only provides wheels for x86 and amd64, but that likely isn't insurmountable. If you don't want to bother, look at the `t3` or `m5` or etc. classes. (Maybe the `a` variants, which run AMD chips rather than Intel and are therefore somewhat cheaper. Not the `a1` series, which is the first-generation arm instances; you want e.g. `t3a` for AMD chips.) The common wisdom is that `t` instances are mostly good for making you sad, but eeehhh... they're cheap and they come smaller than the `m` series does, so if the workload isn't super CPU-intensive I've found they're totally fine. If the workload does use lots of CPU, measured in terms of "the instance consumes burst credits," the `m` series is a better default option. You want your thing to scale out, so don't worry too much about choosing the right size of instance: going up one size is like taking two of the previous size and welding them together, and priced exactly accordingly, so choosing an instance type larger than your workers need in order to function is of little benefit.
If my understanding of the Twitter API is correct there's no performance benefit to having multiple ventilators, since each client associated with the same principal will see exactly the same stream. You could implement a hot standby in various ways but I might not bother; deploying the ventilator into its own autoscaling group means AWS will bring it back automatically if it becomes unhealthy, so the hot standby can be a stretch goal for you if there's time remaining. This I wouldn't use a spot fleet for; preemption is fine for workers, which have small working sets and whose tasks can be easily retried, but the ventilator going down and needing recovery should be extraordinary.
import json

import numpy as np
import zmq


class Client:
    def __init__(self, host, port):
        self.zmq_context = zmq.Context()
        self.identity = b"123"
        self.socket = self.zmq_context.socket(zmq.DEALER)
        # identity must be set before connect to be picked up by the ROUTER
        self.socket.setsockopt(zmq.IDENTITY, self.identity)
        self.socket.connect(f"tcp://{host}:{port}")

    def recv_array(self, socket, flags=0, copy=False, track=False):
        """recv a numpy array: a JSON metadata frame, then the raw buffer"""
        md = socket.recv_json(flags=flags)
        msg = socket.recv(flags=flags, copy=copy, track=track)
        buf = memoryview(msg)
        A = np.frombuffer(buf, dtype=md["dtype"])
        return A.reshape(md["shape"])

    def run(self, request_data):
        # was self.send(...), which doesn't exist on Client
        self.socket.send_string(json.dumps(request_data))
        result = self.recv_array(self.socket)
        return result
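The two-frame protocol `recv_array` expects (JSON metadata, then the raw buffer) can be exercised without any sockets. This is a sketch with helper names of my own (`encode_array`/`decode_array`), following the array-serialization pattern from the pyzmq docs:

```python
import json

import numpy as np


def encode_array(A):
    """Serialize an array as the (metadata, buffer) pair recv_array expects."""
    md = {"dtype": str(A.dtype), "shape": A.shape}
    return json.dumps(md).encode(), A.tobytes()


def decode_array(md_bytes, payload):
    """Mirror of Client.recv_array: rebuild the array from the two frames."""
    md = json.loads(md_bytes)
    return np.frombuffer(payload, dtype=md["dtype"]).reshape(md["shape"])


original = np.arange(12, dtype=np.float64).reshape(3, 4)
md_bytes, payload = encode_array(original)
restored = decode_array(md_bytes, payload)
assert np.array_equal(original, restored)
```

Round-tripping an array like this is a cheap sanity check before you put real sockets in between.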
class Worker(threading.Thread):
    def __init__(self, zmq_context, encoder, _id):
        threading.Thread.__init__(self)
        self.zmq_context = zmq_context
        self.worker_id = _id
        self.encoder = encoder

    def run(self):
        socket = self.zmq_context.socket(zmq.DEALER)
        socket.connect("inproc://backend")
        while True:
            # the ROUTER prepends the client identity as the first frame
            client_id = socket.recv()
            request = socket.recv()
            result = self.compute(request)  # compute() defined elsewhere
            flags = 0
            md = dict(dtype=str(result.dtype), shape=result.shape)
            socket.send(client_id, flags | zmq.SNDMORE)
            socket.send_json(md, flags | zmq.SNDMORE)
            socket.send(result, flags, copy=False)
class Server:
    def __init__(self, embedding, model, port, num_workers=4):
        self.zmq_context = zmq.Context()
        self.port = port
        self.num_workers = num_workers
        self.encoder = Encoder()  # Encoder defined elsewhere

    def start(self):
        # Frontend socket that clients connect to.
        socket_front = self.zmq_context.socket(zmq.ROUTER)
        socket_front.bind(f"tcp://0.0.0.0:{self.port}")
        # Backend socket to distribute work.
        socket_back = self.zmq_context.socket(zmq.DEALER)
        socket_back.bind("inproc://backend")
        # Start workers.
        for i in range(self.num_workers):
            worker = Worker(self.zmq_context, self.encoder, i)
            worker.start()
            logger.info(f"[WORKER-{i}]: ready and listening!")
        # Use the built-in queue device to distribute requests among workers.
        # What the queue device does internally is:
        # 1. Read a client's socket ID and request.
        # 2. Send socket ID and request to a worker.
        # 3. Read a client's socket ID and result from a worker.
        # 4. Route result back to the client using socket ID.
        zmq.device(zmq.QUEUE, socket_front, socket_back)
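The four-step queue-device flow above can be seen end-to-end in a self-contained inproc sketch. I'm using `zmq.proxy` here, which newer pyzmq recommends over `zmq.device`, and a REQ/REP pair so the envelope handling stays implicit; the endpoint names are illustrative:

```python
import threading

import zmq

ctx = zmq.Context.instance()

# Frontend (clients) and backend (workers), bridged by a proxy,
# exactly like the ROUTER/DEALER pair in Server.start.
frontend = ctx.socket(zmq.ROUTER)
frontend.bind("inproc://demo_frontend")
backend = ctx.socket(zmq.DEALER)
backend.bind("inproc://demo_backend")


def echo_worker():
    sock = ctx.socket(zmq.REP)
    sock.connect("inproc://demo_backend")
    # "work": uppercase the request; REP echoes the routing envelope back
    sock.send(sock.recv().upper())


threading.Thread(target=lambda: zmq.proxy(frontend, backend), daemon=True).start()
threading.Thread(target=echo_worker, daemon=True).start()

client = ctx.socket(zmq.REQ)
client.connect("inproc://demo_frontend")
client.send(b"mandelbrot")
# poll so a wiring mistake fails fast instead of blocking forever
reply = client.recv() if client.poll(2000) else None
print(reply)
```

The proxy forwards the identity frame the ROUTER prepends, the REP socket returns it untouched, and the ROUTER uses it to route the reply back, which is steps 1 through 4 of the comment above.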
            try:
                result = self.compute(request)
            except Exception as e:
                # or transform the error more deliberately
                md = dict(
                    status="error",
                    error=str(e),
                )
                # empty result to ensure message consistency
                result = b""
            else:
                # success
                md = dict(
                    status="ok",
                    dtype=str(result.dtype),
                    shape=result.shape,
                )
            socket.send(client_id, flags | zmq.SNDMORE)
            socket.send_json(md, flags | zmq.SNDMORE)
            socket.send(result, flags, copy=False)
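Once the worker sends a `status` field, the client's `recv_array` needs a matching change, or it will try to reshape the empty error payload. A sketch, where the function name and exception type are my own:

```python
import numpy as np


class ComputeError(Exception):
    """Raised when the worker reports a failure instead of a result."""


def recv_array_checked(socket, flags=0, copy=False, track=False):
    """Like Client.recv_array, but honours the worker's status field."""
    md = socket.recv_json(flags=flags)
    msg = socket.recv(flags=flags, copy=copy, track=track)
    if md.get("status") == "error":
        # the payload frame is the empty b"" the worker sent
        raise ComputeError(md["error"])
    buf = memoryview(msg)
    return np.frombuffer(buf, dtype=md["dtype"]).reshape(md["shape"])
```

Both frames are always received before raising, so the socket is left in a clean state for the next request.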