Задать вопрос
@5cp5

Как доработать программу для сканирования сети?

Я пытаюсь создать программу для сканирования сети на угрозы и уязвимости, но у меня не очень получается.

Вот на чём я остановился:

670a7ec7b88b2400523368.png

Проблемы:
1. Программа перестаёт работать после завершения сканирования.

После завершения сканирования программа перестаёт выходить на связь, не предоставляя никаких сообщений об ошибках или предупреждениях.

2. Функционал ограничен.

Функционал программы ограничен и не позволяет проводить более сложный анализ сети, например, сканирование на наличие чужих подключений к WI FI или мониторинг сетевой активности в реальном времени.

Код:

import socket
import ipaddress
import os
import concurrent.futures
import tkinter as tk
from tkinter import messagebox, scrolledtext, filedialog
import threading

class NetworkScannerApp:
    def __init__(self, root):
        self.root = root
        self.root.title("Сканер сети")
        self.local_ip = self.get_local_ip()
        self.ip_range = self.get_ip_range(self.local_ip)

        # Кнопка запуска
        self.start_button = tk.Button(root, text="Сканировать сеть", command=self.start_scan)
        self.start_button.grid(row=0, column=0, padx=10, pady=10, sticky='ew')

        # Кнопка отмены
        self.cancel_button = tk.Button(root, text="Отменить сканирование", command=self.cancel_scan, state=tk.DISABLED)
        self.cancel_button.grid(row=0, column=1, padx=10, pady=10, sticky='ew')

        # Полоса загрузки
        self.progress_var = tk.DoubleVar()
        self.progress_bar = tk.ttk.Progressbar(root, variable=self.progress_var, maximum=100)
        self.progress_bar.grid(row=1, column=0, columnspan=2, padx=10, pady=10, sticky='ew')

        # Текстовое поле
        self.output_text = scrolledtext.ScrolledText(root, width=60, height=20, wrap=tk.WORD)
        self.output_text.grid(row=2, column=0, columnspan=2, padx=10, pady=10, sticky='nsew')

        # Настройка строк
        root.grid_rowconfigure(2, weight=1)  # Увеличение веса строки с текстом для динамического масштабирования
        root.grid_columnconfigure(0, weight=1)  # Увеличение веса столбца

        self.devices = []
        self.open_ports_info = []
        self.is_scanning = False
        self.cancelled = False

    def get_local_ip(self):
        hostname = socket.gethostname()
        local_ip = socket.gethostbyname(hostname)
        print(f"Ваш локальный IP-адрес: {local_ip}")  # IP
        return local_ip

    def get_ip_range(self, local_ip):
        ip = ipaddress.ip_interface(local_ip + '/24')
        return str(ip.network)

    def ping_ip(self, ip):
        response = os.system(f"ping -n 1 -w 1000 {ip} > NUL")  
        print(f"Пинг {ip}: {'доступен' if response == 0 else 'недоступен'}")  
        return response == 0

    def scan_network(self):
        self.devices.clear()
        self.open_ports_info.clear()
        total_ips = sum(1 for _ in ipaddress.ip_network(self.ip_range).hosts())
        scanned_ips = 0

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {executor.submit(self.ping_ip, str(ip)): ip for ip in ipaddress.ip_network(self.ip_range).hosts()}
            for future in concurrent.futures.as_completed(futures):
                if self.cancelled:  
                    self.update_output("Сканирование отменено.\n")
                    break
                ip = futures[future]
                if future.result():
                    self.devices.append(str(ip))
                    self.update_output(f"{ip} доступен.\n")  
                scanned_ips += 1
                progress_percentage = (scanned_ips / total_ips) * 100
                self.update_progress(progress_percentage)

        self.finalize_scan()  

    def check_open_ports(self, ip, ports):
        open_ports = []
        for port in ports:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                if sock.connect_ex((ip, port)) == 0:
                    open_ports.append(port)
        return open_ports

    def evaluate_results(self):
        if not self.devices:
            return []

        for device in self.devices:
            open_ports = self.check_open_ports(device, range(1, 224))  
            if open_ports:
                self.open_ports_info.append((device, open_ports))
                self.update_output(f"{device} имеет открытые порты: {open_ports}\n")  
            print(f"{device}: открытые порты - {open_ports}") 

    def update_progress(self, percentage):
        self.progress_var.set(percentage)
        if percentage >= 100:
            self.finalize_scan()

    def finalize_scan(self):
        self.cancelled = False  
        self.evaluate_results()
        self.save_results()
        self.show_summary()
        self.is_scanning = False
        self.start_button.config(text="Сканировать заново", state=tk.NORMAL)
        self.cancel_button.config(state=tk.DISABLED)

    def save_results(self):
        file_path = filedialog.asksaveasfilename(defaultextension=".txt", filetypes=[("Text files", "*.txt")])
        if file_path:
            with open(file_path, 'w') as f:
                if not self.open_ports_info:
                    f.write("Все устройства в сети безопасны. Открытых портов не найдено.\n")
                else:
                    for device, ports in self.open_ports_info:
                        f.write(f"{device} имеет открытые порты: {ports}\n")
            self.update_output(f"\nРезультаты сохранены в {file_path}\n")

    def show_summary(self):
        if not self.open_ports_info:
            messagebox.showinfo("Результаты сканирования", "Все устройства в сети безопасны. Открытых портов не найдено.")
        else:
            summary = "Итоги сканирования:\n\n"
            for device, ports in self.open_ports_info:
                summary += f"{device} имеет открытые порты: {ports}\n"
            summary += "\nСоветы:\n1. Закройте ненужные порты для повышения безопасности.\n2. Убедитесь, что ваши устройства защищены брандмауэром."
            messagebox.showinfo("Результаты сканирования", summary)

    def update_output(self, message):
        self.output_text.insert(tk.END, message)
        self.output_text.see(tk.END)  # Прокрутка текста вниз

    def start_scan(self):
        if not self.is_scanning:  
            self.is_scanning = True
            self.cancelled = False  
            self.start_button.config(state=tk.DISABLED)
            self.cancel_button.config(state=tk.NORMAL)
            threading.Thread(target=self.run_scan, daemon=True).start()

    def cancel_scan(self):
        self.cancelled = True  
        self.start_button.config(text="Сканировать заново", state=tk.NORMAL)
        self.cancel_button.config(state=tk.DISABLED)

    def run_scan(self):
        self.scan_network()

if __name__ == "__main__":
    import tkinter.ttk as ttk
    root = tk.Tk()
    app = NetworkScannerApp(root)
    root.mainloop()


Если у вас есть какие-либо предложения, буду рад выслушать вас.
  • Вопрос задан
  • 103 просмотра
Подписаться 1 Средний Комментировать
Пригласить эксперта
Ответы на вопрос 1
Подебажила предоставленный код:
1. программа не зависает, а "не быстро" проверяет открытые порты
попробуйте подебажить/вывести лог в методе check_open_ports и станет ясно, что после обнаружения все доступных ip адресов она опрашивает все порты и это занимает время, много времени
2. + так же в коде увидела двойную "финализацию" `self.finalize_scan()`. Она вызывается в update_progress (после >= 100%) и после выхода из цикла for в методе scan_network, что приводит к двойному сканированию портов
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы
SpectrumData Екатеринбург
от 200 000 до 300 000 ₽
Akronix Санкт-Петербург
от 150 000 до 200 000 ₽
19 янв. 2025, в 02:12
70000 руб./за проект
19 янв. 2025, в 01:58
20000 руб./за проект
18 янв. 2025, в 23:27
50000 руб./за проект