Я пишу несложую p2p сеть, но высвечивается ошибка "OSError: [Errno 98] Address already in use" на этапе бинда в node3 (видимо из-за того, что после первого запуска создается доп поток, который прослушивает)
Я понимаю, что это скорее всего из-за того, что какой-то поток не завершился, но ни
htop, ни
lsof -i :5003 не показывают ничего о том, что запущен какой-то процесс. Приходится ждать минуты 2, потом можно запустить и все нормально будет.
И непонятно почему так, ибо из-за смены флага все 4 создающихся потока завершаются
import threading
import socket
import time
class InboundConnection(threading.Thread):
def __init__(self, conn, ip, port):
super(InboundConnection, self).__init__()
self.ip = ip
self.port = port
self.conn = conn
self.conn.settimeout(1.0)
self.STOP_FLAG = threading.Event()
print(self)
def run(self):
buff = b''
while not self.STOP_FLAG.is_set():
print(9999)
try: # print(12)
chunk = self.conn.recv(1024)
print(chunk)
buff += chunk
except socket.timeout:
print(12)
continue
except socket.error as e:
raise e
print("ALMOST KILLED")
self.conn.settimeout(None)
self.conn.close()
def stop(self):
print("STOPPED")
self.STOP_FLAG.set()
def __repr__(self):
return f'''
node info
node_ip: {self.ip}
node_port: {self.port}
----------------------------'''
class OutboundConnection():
def __init__(self, sock, ip, port):
self.ip = ip
self.port = port
self.sock = sock
def send(self, data):
self.sock.send(data)
def __repr__(self):
return f'''
node info
node_ip: {self.ip}
node_port: {self.port}
----------------------------'''
class Node(threading.Thread):
def __init__(self, ip, port):
super(Node, self).__init__()
self.ip = ip
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__init_server()
self.inbound_connections = []
self.outbound_connections = []
self.MAX_CONNECTIONS = 8
self.STOP_FLAG = threading.Event()
def __init_server(self):
self.sock.bind((self.ip, self.port))
self.sock.settimeout(1.0)
self.sock.listen(1)
def connectWithNode(self, ip, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, port))
self.outbound_connections.append(OutboundConnection(sock, ip, port))
def sendMsgToAllNodes(self, data):
for conn in self.outbound_connections:
conn.send(data)
def stop(self):
self.STOP_FLAG.set()
def run(self):
while not self.STOP_FLAG.is_set():
try:
connection, client_address = self.sock.accept()
# print(connection)
conn_ip = client_address[0] # backward compatibilty
conn_port = client_address[1] # backward compatibilty
print((client_address[0], client_address[1]))
print(self.port)
conn = InboundConnection(connection, conn_ip, conn_port)
conn.start()
self.inbound_connections.append(conn)
# Basic information exchange (not secure) of the id's of the nodes!
except socket.timeout:
# print(12)
continue
except Exception as e:
raise e
for node in self.inbound_connections:
node.stop()
self.sock.settimeout(None)
self.sock.close()
def __repr__(self):
return f'''
HOST INFO
host_ip: {self.ip}
host_port: {self.port}
inbound_connections: {self.inbound_connections}
outbound_connections: {self.outbound_connections}'''
# inbound_connections: {', '.join(self.inbound_connections)}
# outbound_connections: {', '.join(self.outbound_connections)}'''
node1 = Node("127.0.0.1", 5001)
node1.start()
node2 = Node("127.0.0.1", 5002)
node2.start()
node3 = Node("127.0.0.1", 5003)
node3.start()
node2.connectWithNode("127.0.0.1", 5003)
# node2.connectWithNode("127.0.0.1", 5003)
# node1.connectWithNode("127.0.0.1", 5003)
node2.sendMsgToAllNodes(b'asd')
node1.stop()
node2.stop()
node3.stop()