From: François Fleuret Date: Mon, 28 Oct 2024 12:16:09 +0000 (+0100) Subject: Update. X-Git-Url: https://ant.fleuret.org/cgi-bin/gitweb/gitweb.cgi?a=commitdiff_plain;h=5ca289a1d537177b1d1b85159891c81a0de91cbc;p=pytorch.git Update. --- diff --git a/distributed.py b/distributed.py new file mode 100755 index 0000000..adaa36f --- /dev/null +++ b/distributed.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python + +import time, socket, threading, struct, pickle + +import math, sys, argparse + +###################################################################### + +parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter, +) + +parser.add_argument("--server_host", type=str, default=None) + +parser.add_argument("--port", type=int, default=12021) + +args = parser.parse_args() + +###################################################################### + + +class SocketConnection: + def __init__(self, established_socket, read_len=16384): + self.read_len = read_len + self.socket = established_socket + self.socket.setblocking(1) + self.buffer = b"" + self.SEND_LOCK = threading.Lock() + self.RECEIVE_LOCK = threading.Lock() + self.failed = False + + def send(self, x): + with self.SEND_LOCK: + data = pickle.dumps(x) + self.socket.send(struct.pack("!i", len(data))) + self.socket.sendall(data) + + def raw_read(self, l): + while len(self.buffer) < l: + d = self.socket.recv(self.read_len) + if d: + self.buffer += d + else: + raise EOFError() + + x = self.buffer[:l] + self.buffer = self.buffer[l:] + return x + + def receive(self): + with self.RECEIVE_LOCK: + l = struct.unpack("!i", self.raw_read(4))[0] + return pickle.loads(self.raw_read(l)) + + +###################################################################### + + +class CultureServer: + def __init__(self, port): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(("0.0.0.0", port)) + s.listen(5) + self.nb_accepts = 0 + + while True: + client_socket, ip_and_port = s.accept() + link = SocketConnection(client_socket) + + threading.Thread( + target=self.client_loop, + kwargs={ + "link": link, + "nb": self.nb_accepts, + }, + daemon=True, + ).start() + + self.nb_accepts += 1 + + def client_loop(self, link, nb): + link.send(f"HELLO #{nb}") + try: + while True: + r = link.receive() + print(f'from #{nb} receive "{r}"') + link.send(f"ACK {r}") + if r == "BYE": + break + except EOFError: + print(f"closing #{nb} on EOF") + + +###################################################################### + + +class CultureClient: + def __init__(self, server_hostname, port): + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.connect((server_hostname, port)) + self.link = SocketConnection(server_socket) + + threading.Thread(target=self.receive, daemon=True).start() + + self.send() + # threading.Thread(target=self.send, daemon=True).start() + + def receive(self): + try: + while True: + x = self.link.receive() + print(f'CultureClient receive "{x}"') + except EOFError: + print(f"closing connection on EOF") + + def send(self): + try: + self.link.send(f"HELLO") + x = 0 + while True: + time.sleep(5) + print(f'CultureClient send "{x}"') + self.link.send(x) + x += 1 + except BrokenPipeError: + print(f"closing connection on broken pipe") + + +###################################################################### + +if args.server_host is None: + print(f"Starting server port {args.port}") + CultureServer(args.port) +else: + print(f"Starting client connecting to {args.server_host}:{args.port}") + CultureClient(args.server_host, args.port) + +######################################################################