CLI>: add 192.168.0.16
Traceback (most recent call last):
File "main.py", line 22, in <module>
main()
File "main.py", line 10, in main
command = input("CLI>: ")
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd0 in position 2: invalid continuation byte
Error in sys.excepthook:
Traceback (most recent call last):
File "/usr/lib/python3/dist-packages/apport_python_hook.py", line 63, in apport_excepthook
from apport.fileutils import likely_packaged, get_recent_crashes
File "/usr/lib/python3/dist-packages/apport/__init__.py", line 5, in <module>
from apport.report import Report
File "/usr/lib/python3/dist-packages/apport/report.py", line 30, in <module>
import apport.fileutils
File "/usr/lib/python3/dist-packages/apport/fileutils.py", line 23, in <module>
from apport.packaging_impl import impl as packaging
File "/usr/lib/python3/dist-packages/apport/packaging_impl.py", line 23, in <module>
import apt
File "/usr/lib/python3/dist-packages/apt/__init__.py", line 23, in <module>
import apt_pkg
ModuleNotFoundError: No module named 'apt_pkg'
Original exception was:
Traceback (most recent call last):
File "main.py", line 22, in <module>
main()
File "main.py", line 10, in main
command = input("CLI>: ")
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd0 in position 2: invalid continuation byte
seven@testrsync:~/ICMP_Ping_Monitor_True_CLI$
seven@testrsync:~/ICMP_Ping_Monitor_True_CLI$ python3.6 -V
Python 3.6.7
obsd# python3.6 main.py
The program is not designed to work in your OS openbsd6
The program will be terminated in 5 seconds, Sorry...
Bye
obsd#
obsd# python3.6 -V
Python 3.6.6
obsd#
CLI>: add 192.168.0.16
192.168.0.16 was added to monitoring
CLI>: ping: permission denied (are you root?)
ping: permission denied (are you root?)
ping: permission denied (are you root?)
ping: permission denied (are you root?)
192.168.0.16 is not reachable.
admin@AS1002T:/volume1/home/admin/ICMP_Ping_Monitor_True_CLI $ python3.7 -V
Python 3.7.0
CLI>: add 192.168.0.16
192.168.0.16 was added to monitoring
CLI>: File "pingsubprocess.py", line 39
sys.stderr.write(f"{ip} session crushed.")
^
SyntaxError: invalid syntax
5 REM Read two numbers, print their sum, then jump back and repeat forever.
10 INPUT a
20 INPUT b
30 LET c=a+b
40 PRINT c
50 GOTO 10
Добавил возможность задать интервал пинга отдельно для каждого узла.
Команда show показывает только список адресов, без интервала; если нужно, думаю, это можно сделать.
Что касается отображения не по IP-адресу: поскольку программа строится вокруг консольной программы ping, на самом деле всё равно, что добавлять — адрес или доменное имя; если имя не удаётся разрешить через DNS, то пинг считается провальным.
Чтобы в сообщениях отображался хост, можно пойти двумя путями. Можно усложнить команду add — будет что-то вроде add 1.1.1.1 10 myhostname; это сложный вариант.
У меня на данный момент вообще есть желание приделать к программе базу SQLite.
CLI>: add 10.10.40.10 180
10.10.40.10 was added to monitoring
CLI>: File "pingsubprocess.py", line 39
sys.stderr.write(f"{ip} session crushed.\n\n")
^
SyntaxError: invalid syntax
10.10.40.10 is removed from monitoring
CLI>:
elif sys.platform == 'linux':
ip_in_monitoring_dict[ip] = subprocess.Popen(["python3", "pingsubprocess.py", ip, interval],
stdout=subprocess.DEVNULL)
CLI>: add 10.10.40.10 180
10.10.40.10 was added to monitoring
CLI>: File "pingsubprocess.py", line 68
sys.stderr.write(f"{ip} is not reachable.\n\n")
^
SyntaxError: invalid syntax
10.10.40.10 is removed from monitoring
CLI>:
CLI>: exit
Traceback (most recent call last):
File "main.py", line 34, in <module>
main()
File "main.py", line 21, in main
command = input("CLI>: ")
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 2: invalid continuation byte
import threading
import sys
import mail_activity
import ping_op
from time_lib import MyTime
def upload_error_notification_msg():
    """Return the contents of ``error_notification.txt`` from the working directory.

    Returns:
        The whole file as a string, or ``None`` when the file cannot be read.
        In the failure case a message is written to stderr first.
    """
    try:
        with open(file="error_notification.txt", mode='r') as f:
            return f.read()
    # The message below promises to cover a missing *or corrupted* file, so
    # handle every OS-level failure (missing file, directory in its place,
    # permission denied) and undecodable bytes, not only FileNotFoundError.
    except (OSError, UnicodeDecodeError):
        sys.stderr.write("The file error_notification.txt does not exist or corrupted. Create the file.\n\n")
        sys.stderr.flush()
        return None
def upload_recovery_notification_msg():
    """Return the contents of ``recovery_notification.txt`` from the working directory.

    Returns:
        The whole file as a string, or ``None`` when the file cannot be read.
        In the failure case a message is written to stderr first.
    """
    try:
        with open(file="recovery_notification.txt", mode='r') as f:
            return f.read()
    # The message below promises to cover a missing *or corrupted* file, so
    # handle every OS-level failure (missing file, directory in its place,
    # permission denied) and undecodable bytes, not only FileNotFoundError.
    except (OSError, UnicodeDecodeError):
        sys.stderr.write("The file recovery_notification.txt does not exist or corrupted. Create the file.\n\n")
        sys.stderr.flush()
        return None
def upload_smtp_settings():
    """Read ``settings.py`` from the working directory as a list of text lines.

    Each line has every leading and trailing backslash and newline character
    removed (``str.strip("\\\\\\n")``); nothing else is parsed.

    Returns:
        A list with one string per line of the file, in file order.

    Raises:
        SystemExit: when the file cannot be read. Diagnostics are written to
            stderr first.

    NOTE(review): the failure message reads the module-level name ``ip``,
    which this function does not receive as a parameter; it must already be
    defined at module scope when the failure path runs.
    """
    try:
        with open(file="settings.py", mode="r") as f:
            settings = [line.strip("\\\n") for line in f]
    # The message says "does not exist or corrupted", so route every OS-level
    # failure and undecodable content through the same shutdown path instead
    # of letting them escape as a raw traceback.
    except (OSError, UnicodeDecodeError):
        sys.stderr.write("Settings file does not exist or corrupted.\n\n")
        sys.stderr.write("Delete the file if it exists and do setup command\n\n")
        sys.stderr.write("{} session crushed.\n\n".format(ip))
        sys.stderr.write("To restore session to the ip, add it again!\n")
        sys.stderr.flush()
        sys.exit()
    return settings
def upload_recipients_list():
    """Read ``email_recipient_list.py`` from the working directory as a list of lines.

    Each line has every leading and trailing backslash and newline character
    removed (``str.strip("\\\\\\n")``); nothing else is parsed.

    Returns:
        A list with one string per line of the file, in file order.

    Raises:
        SystemExit: when the file cannot be read. Diagnostics are written to
            stderr first.

    NOTE(review): the failure message reads the module-level name ``ip``,
    which this function does not receive as a parameter; it must already be
    defined at module scope when the failure path runs.
    """
    try:
        with open(file="email_recipient_list.py", mode="r") as f:
            recipients = [line.strip("\\\n") for line in f]
    # The message says "does not exist or corrupted", so route every OS-level
    # failure and undecodable content through the same shutdown path instead
    # of letting them escape as a raw traceback.
    except (OSError, UnicodeDecodeError):
        sys.stderr.write("email_recipient_list.py file does not exist or corrupted.\n\n")
        sys.stderr.write("Delete the file if it exists and do recipients command\n\n")
        sys.stderr.write("{} session crushed.\n\n".format(ip))
        sys.stderr.write("To restore session to the ip, add it again!\n")
        sys.stderr.flush()
        sys.exit()
    return recipients
def notificator(pingFailedLetterWasSent, negativePingsInRow, positivePingsInRow,
                email_sender_box, email_recepient_list, error_mail_message, recovery_mail_message,
                email_sender_password, smtp_settings, smtp_port, mode):
    """Announce a reachability change and return the updated "failure mail sent" flag.

    A failure is announced when ``negativePingsInRow`` is exactly 4 and no
    failure mail is outstanding; a recovery is announced when
    ``positivePingsInRow`` is exactly 10 while a failure mail is outstanding.
    Each announcement writes a line to stderr, records the new state through
    ``ping_op.write_ping_result_to_file_short_version`` when ``mode`` is
    ``"short"``, and runs the matching ``mail_activity`` sender in a thread
    that is joined before continuing.

    The host being reported is the module-level name ``ip``; it is not a
    parameter of this function.

    Returns:
        ``True`` after a failure announcement, ``False`` after a recovery
        announcement, otherwise ``pingFailedLetterWasSent`` unchanged.
    """

    def _mail_and_wait(sender, body):
        # Run the mail call in its own thread and block until it finishes.
        worker = threading.Thread(
            target=sender,
            args=(f"{ip}", email_sender_box, email_recepient_list, body,
                  email_sender_password, smtp_settings, smtp_port),
        )
        worker.start()
        worker.join()

    alert_sent = pingFailedLetterWasSent

    if negativePingsInRow == 4 and not alert_sent:
        alert_sent = True
        sys.stderr.write(f"{ip} is not reachable.\n\n")
        sys.stderr.flush()
        if mode == "short":
            ping_op.write_ping_result_to_file_short_version(alert_sent, ip)
        _mail_and_wait(mail_activity.send_negative_mail, error_mail_message)

    # Deliberately a second independent check (not ``elif``): it sees the flag
    # as possibly updated by the branch above.
    if positivePingsInRow == 10 and alert_sent:
        alert_sent = False
        sys.stderr.write(f"{ip} is reachable again now.\n\n")
        sys.stderr.flush()
        if mode == "short":
            ping_op.write_ping_result_to_file_short_version(alert_sent, ip)
        _mail_and_wait(mail_activity.send_positive_mail, recovery_mail_message)

    return alert_sent
def main(ip, interval):
    """Ping ``ip`` in an endless loop and hand the running streaks to ``notificator``.

    Start-up: loads both notification texts (exiting after a stderr message if
    either is ``None``), then the settings lines and the recipient list.
    Settings entries 0-4 are used, in order, as sender address, sender
    password, SMTP host, SMTP port and log mode.

    Loop: calls ``ping_op.ping`` with ``interval`` as its second argument, or
    with ``2`` while a failure mail is outstanding. In ``"long"`` log mode every
    result is also written through ``ping_op`` and per-period counters are
    flushed whenever ``MyTime.compare_dates`` returns a truthy value.

    Args:
        ip: host passed to ``ping_op.ping`` and the logging helpers.
        interval: second argument for ``ping_op.ping`` while the host is not
            flagged as failed.

    This function does not return normally.
    """
    error_mail_message = upload_error_notification_msg()
    recovery_mail_message = upload_recovery_notification_msg()
    if any(text is None for text in (error_mail_message, recovery_mail_message)):
        sys.stderr.write(f"{ip} session crushed.\n")
        sys.stderr.flush()
        sys.exit()

    settings = upload_smtp_settings()
    email_recepient_list = upload_recipients_list()
    email_sender_box, email_sender_password, smtp_settings, smtp_port, log_mode = (
        settings[0], settings[1], settings[2], settings[3], settings[4]
    )

    alert_sent = False
    ok_streak = 0
    failed_streak = 0

    long_log = log_mode == "long"
    if long_log:
        period_ok = 0
        period_failed = 0
        last_attempt = MyTime()
        previous_path = ping_op.write_ping_result_to_file(ip=ip, pingresult=None)

    while True:
        # While a failure mail is outstanding, the configured interval is
        # replaced by the fixed value 2.
        result = ping_op.ping(ip, 2 if alert_sent else interval)

        if long_log:
            current_path = ping_op.write_ping_result_to_file(result, ip)
            now = MyTime()
            if now.compare_dates(last_attempt):
                # Flush the counters against the path recorded before this
                # result, then start a fresh counting period.
                ping_op.write_ping_stats_to_file(ip, period_ok, period_failed, previous_path)
                previous_path = current_path
                last_attempt = now
                period_ok = 0
                period_failed = 0
            period_ok += result[0]
            period_failed += result[1]

        # A streak grows only when its component of the result increases it;
        # growth of one streak resets the other.
        grown_ok = ok_streak + result[0]
        if ok_streak < grown_ok:
            ok_streak = grown_ok
            failed_streak = 0
        grown_failed = failed_streak + result[1]
        if failed_streak < grown_failed:
            ok_streak = 0
            failed_streak = grown_failed

        alert_sent = notificator(alert_sent, failed_streak, ok_streak,
                                 email_sender_box, email_recepient_list, error_mail_message,
                                 recovery_mail_message, email_sender_password, smtp_settings,
                                 smtp_port, log_mode)
if __name__ == '__main__':
    # Both names are bound at module scope on purpose: other functions in this
    # module read ``ip`` as a global rather than taking it as a parameter.
    # argv[1] is the host to monitor, argv[2] the integer ping interval.
    ip, interval = str(sys.argv[1]), int(sys.argv[2])
    main(ip, interval)
def exit_program(ip_in_monitoring_dict: dict) -> None:
    """Kill every monitored process, print a farewell and exit the interpreter.

    Args:
        ip_in_monitoring_dict: mapping whose values expose ``kill()``; each
            value is killed before the shutdown messages are printed.

    Raises:
        SystemExit: always, after roughly five seconds of pauses.
    """
    if len(ip_in_monitoring_dict) > 0:
        for worker in ip_in_monitoring_dict.values():
            worker.kill()

    # (message, pause in seconds) pairs; the pauses add up to the announced 5 s.
    farewell = (
        ("The program will be terminated in 5 seconds.\n", 4),
        ("Bye...\n", 1),
    )
    for message, pause in farewell:
        print(message)
        time.sleep(pause)

    sys.exit()