From: François Fleuret Date: Tue, 29 Oct 2024 08:21:05 +0000 (+0100) Subject: Update. X-Git-Url: https://ant.fleuret.org/cgi-bin/gitweb/gitweb.cgi?a=commitdiff_plain;h=97f71dd09a28d1fd3a7d62a323169e48ea6a38f7;p=pytorch.git Update. --- diff --git a/distributed.py b/distributed.py index 6fe3f6b..91d33ee 100755 --- a/distributed.py +++ b/distributed.py @@ -10,7 +10,7 @@ parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) -parser.add_argument("--server_host", type=str, default=None) +parser.add_argument("--server", type=str, default=None) parser.add_argument("--port", type=int, default=12021) @@ -56,44 +56,50 @@ class SocketConnection: ###################################################################### -def create_server(port, reader, writer): +def start_server(port, core, reader): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(("0.0.0.0", port)) s.listen(5) - self.nb_accepts = 0 + nb_accepts = 0 - def reading_loop(self, link, client_nb): + def reader_thread(reader, link, client_nb): try: while True: - reader(link.receive()) + reader(link.receive(), client_nb) except EOFError: - print(f"closing #{client_nb} on EOF") + print(f"closing reader #{client_nb} on EOFError") + + def core_thread(writer, client_nb): + try: + core(writer, client_nb) + except BrokenPipeError: + print(f"closing core #{client_nb} on BrokenPipeError") while True: client_socket, ip_and_port = s.accept() link = SocketConnection(client_socket) threading.Thread( - target=writer, - kwargs={"link": link.send, "client_nb": self.nb_accepts}, + target=core_thread, + kwargs={"writer": link.send, "client_nb": nb_accepts}, daemon=True, ).start() threading.Thread( - target=reading_loop, - kwargs={"link": link, "client_nb": self.nb_accepts}, + target=reader_thread, + kwargs={"reader": reader, "link": link, "client_nb": nb_accepts}, daemon=True, ).start() - self.nb_accepts += 1 + nb_accepts += 1 ###################################################################### -def create_client(server_hostname, port, reader): +def create_client(servername, port, reader): server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server_socket.connect((server_hostname, port)) + server_socket.connect((servername, port)) link = SocketConnection(server_socket) def reader_thread(reader): @@ -101,7 +107,7 @@ def create_client(server_hostname, port, reader): reader(link.receive()) def writer(x): - self.link.send(x) + link.send(x) threading.Thread( target=reader_thread, kwargs={"reader": reader}, daemon=True @@ -112,11 +118,30 @@ def create_client(server_hostname, port, reader): ###################################################################### -if args.server_host is None: +if args.server is None: print(f"Starting server port {args.port}") - CultureServer(args.port) + + def reader(x, nb): + print(f'Server received from client #{nb} "{x}"') + + def core(writer, client_nb): + writer(f"HELLO {client_nb}") + while True: + writer(f"PONG {time.localtime().tm_sec}") + time.sleep(3) + + start_server(port=args.port, core=core, reader=reader) + else: - print(f"Starting client connecting to {args.server_host}:{args.port}") - CultureClient(args.server_host, args.port) + print(f"Starting client connecting to {args.server}:{args.port}") + + def reader(x): + print(f'Client received "{x}"') + + writer = create_client(args.server, args.port, reader) + + while True: + writer(f"PING {time.localtime().tm_sec}") + time.sleep(3) ######################################################################