From: François Fleuret Date: Tue, 29 Oct 2024 08:34:41 +0000 (+0100) Subject: Update. X-Git-Url: https://ant.fleuret.org/cgi-bin/gitweb/gitweb.cgi?a=commitdiff_plain;ds=sidebyside;p=pytorch.git Update. --- diff --git a/distributed.py b/distributed.py index 91d33ee..648756e 100755 --- a/distributed.py +++ b/distributed.py @@ -2,7 +2,7 @@ import time, socket, threading, struct, pickle -import math, sys, argparse +import argparse ###################################################################### @@ -62,32 +62,36 @@ def start_server(port, core, reader): s.listen(5) nb_accepts = 0 - def reader_thread(reader, link, client_nb): + def threadable_reader(reader, receiver, client_id): try: while True: - reader(link.receive(), client_nb) + reader(link.receive(), client_id) except EOFError: - print(f"closing reader #{client_nb} on EOFError") + print(f"** closing reader #{client_id} on EOFError **") - def core_thread(writer, client_nb): + def threadable_core(sender, client_id): try: - core(writer, client_nb) + core(sender, client_id) except BrokenPipeError: - print(f"closing core #{client_nb} on BrokenPipeError") + print(f"** closing core #{client_id} on BrokenPipeError **") while True: client_socket, ip_and_port = s.accept() link = SocketConnection(client_socket) threading.Thread( - target=core_thread, - kwargs={"writer": link.send, "client_nb": nb_accepts}, + target=threadable_core, + kwargs={"sender": link.send, "client_id": nb_accepts}, daemon=True, ).start() threading.Thread( - target=reader_thread, - kwargs={"reader": reader, "link": link, "client_nb": nb_accepts}, + target=threadable_reader, + kwargs={ + "reader": reader, + "receiver": link.receive, + "client_id": nb_accepts, + }, daemon=True, ).start() @@ -102,7 +106,7 @@ def create_client(servername, port, reader): server_socket.connect((servername, port)) link = SocketConnection(server_socket) - def reader_thread(reader): + def threadable_reader(reader): while True: reader(link.receive()) @@ -110,7 +114,7 @@ def create_client(servername, port, reader): link.send(x) threading.Thread( - target=reader_thread, kwargs={"reader": reader}, daemon=True + target=threadable_reader, kwargs={"reader": reader}, daemon=True ).start() return writer @@ -121,11 +125,11 @@ def create_client(servername, port, reader): if args.server is None: print(f"Starting server port {args.port}") - def reader(x, nb): - print(f'Server received from client #{nb} "{x}"') + def reader(obj, client_id): + print(f'Server received from client #{client_id} "{obj}"') - def core(writer, client_nb): - writer(f"HELLO {client_nb}") + def core(writer, client_id): + writer(f"HELLO {client_id}") while True: writer(f"PONG {time.localtime().tm_sec}") time.sleep(3) @@ -135,8 +139,8 @@ if args.server is None: else: print(f"Starting client connecting to {args.server}:{args.port}") - def reader(x): - print(f'Client received "{x}"') + def reader(obj): + print(f'Client received from server "{obj}"') writer = create_client(args.server, args.port, reader)