Как организовать параллельную работу по сети с несколькими устройствами?

Есть примерно 100 устройств. По функционалу - пейджер(получили данные, отобразили). Все общаются по сети с сервером по одному IP и порту.
Сервер ловит коннект. Формирует для каждого индивидуальную строку. И отправляет на устройство. Спустя минуту формируется новая строка и так же отсылается на устройство.
В процессе возникло пара вопросов.
1. С помощью чего лучше будет организовать работу сервера. Перелопатил кучу статей по различным модулям и фреймворкам, но так и не разобрался что лучше использовать.
2. Для начала пробовал реализовать с помощью тредов, но возник вопрос с завершением потока.
Суть в том, что устройство периодически уходит в реконнект. В результате чего остается на сервере не закрытый сокет и ставший не нужным активный поток. Не могу собразить, как отследить дисконнект и закрыть сокет с потоком.
#!/usr/bin/env python
# -*- coding: utf-8 -*-



"""
An echo server that uses threads to handle multiple clients at a time.
Entering any line of input at the terminal will exit the server.
"""

import select, socket, sys, threading,  datetime, random, time
from tablo import Tablo

class Server:
    def __init__(self):
        self.host = ''
        self.port = 33335
        self.backlog = 5
        self.size = 1024
        self.server = None
        self.threads = []

    def open_socket(self):
        try:
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.bind((self.host,self.port))
            self.server.listen(self.backlog)
            print(u'Слушаем: %s' % self.port)
        except socket.error as e:
            if self.server:
                self.server.close()
            print ("Could not open socket: %s" % e.message)
            sys.exit(1)

    def run(self):
        self.open_socket()
        running = 1
        while running:
            #cli, addr = self.server.accept()
            inputready,_,_ = select.select([self.server],[],[])
            #inputready, _, _ = select.select([addr, ], [], [])

            for s in inputready:
                if s == self.server:
                    # handle the server socket
                    cli, addr = self.server.accept()
                    c = Client(cli, addr)
                    c.start()
                    self.threads.append(c)

                elif s == sys.stdin:
                    # handle standard input
                    sys.stdin.readline()
                    running = 0 
                    print('running', running)

        # close all threads

        self.server.close()
        print('Server Close')
        for c in self.threads:
            c.join()

class Client(threading.Thread):
    def __init__(self, client, address):
        threading.Thread.__init__(self)
        self.client = client
        self.address = address
        self.size = 1024
        print(u'Поймали', address)
        print(u'Активных потоков', threading.active_count())

    def run(self):
        running = 1
        while running:
            data = self.client.recv(self.size)
            if data:
                if data[:3] == b'REG':
                    IMEI = str(data[3:18], 'ascii');
                    print(IMEI)
                    if IMEI[:10] == 'Call Ready':
                        print(IMEI[:10])
                        break
                    #print(u'Получено: %s : %s' % (self.addr, data))
                    pack = Tablo.setDynamicString(1, 'KZN', 'Happy first Winter Day', random.randint(5,20))
                    #pack = Tablo.setTemperature('+93')
                    #pack = Tablo.setTime()
                    self.client.send(pack)
                    print(u'Отправлена строка %s %s' % (threading.currentThread().name, IMEI), datetime.datetime.now())
                        
                elif data[0] == 89:
                    print(u'Команда принята %s %s' % (threading.currentThread().name, IMEI), datetime.datetime.now())
                    time.sleep(60)
                    pack = Tablo.setDynamicString(1, 'KZN', 'Dooms Day is coming', random.randint(5,20))
                    self.client.send(pack)
                    #print(u'Отправлено %s %s: %s' % (threading.currentThread().name, IMEI, pack))
                    print(u'Отправлена строка %s %s' % (threading.currentThread().name, IMEI), datetime.datetime.now())
                elif data == b'SetTemp':
                    print(u'Установлена температура',datetime.datetime.now())
                elif data == b'TDSet':
                    print(u'Установлено время',datetime.datetime.now())
                else:
                    print(u'Неизвестный ответ', data)
            else:
                self.client.close()
                print('Socket Close')
                running = 0

if __name__ == "__main__":
    s = Server()
    s.run()
  • Вопрос задан
  • 374 просмотра
Решения вопроса 1
@nirvimel
  1. Подобные сервера лучше писать не на потоках threading (которые все равно упираются в GIL), а на "зеленых" потоках, например gevent, который создан для решения задач, подобной вашей, с максимальной производительностью.
  2. Проблему зависания потоков в вечном ожидании решает настройка keepalive: клиент обязуется передавать пустые пакеты раз в n секунд, если ему больше нечего передавать. А сервер делает socket.settimeout(n+10) и при вызове recv отлавливает исключение socket.timeout, которое сигнализирует, что клиент "отвалился".
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы