import pymysql
from pymysql.constants import CLIENT
import csv
import re
def create_connection_to_server_and_create_db(host, user, passwd, sql_script_path):
try:
connection = pymysql.connect(
host = host,
user = user,
passwd = passwd,
client_flag = CLIENT.MULTI_STATEMENTS
)
try:
with connection.cursor() as cursor:
with open(sql_script_path) as file:
script = file.read()
cursor.execute(script)
connection.commit()
print('db was create')
except Error as e:
print(f"The error '{e}' occurred")
except Error as e:
print(f"The error '{e}' occurred")
def get_connection_to_db(host, user, passwd, db_name):
connection = None
try:
connection = pymysql.connect(
host = host,
user = user,
passwd = passwd,
database = db_name,
client_flag = CLIENT.MULTI_STATEMENTS
)
print('Connection to MySQl_db successful')
except Error as e:
print(f"The error '{e}' occurred")
return connection
def create_table(connection, sql_script_path):
try:
with connection.cursor() as cursor:
with open(sql_script_path) as file:
script = file.read()
cursor.execute(script)
connection.commit()
except Error as e:
print(f"The error '{e}' occurred")
def get_list_from_csv_file(csv_file_path):
data = []
with open(csv_file_path, encoding='utf-8') as file:
reader = csv.reader(file, delimiter=',')
for row in reader:
data.append(row)
result_data = data[1:]
return result_data
def fill_movies_table(connection, movies_data, sql_script_path):
for movie_id, title, genres in movies_data:
if re.search(r'\(\d{4}\)', title) is not None:
search_year = re.search(r'\(\d{4}\)', title)
result_year = search_year.group(0)[1:-1]
year = int(result_year)
else:
year = 0
title = title.replace("'", '`')
title = title.replace('"', '`')
with connection.cursor() as cursor:
with open(sql_script_path) as file:
script = file.read()
cursor.execute(script.format(int(movie_id), title[:-6], int(year), genres))
connection.commit()
def main():
create_connection_to_server_and_create_db('localhost', 'root', '12345678', './files/sql/CREATE_DATABASE_my_movies_db.sql')
connection = get_connection_to_db('localhost', 'root', '12345678', 'my_movies_db')
create_table(connection, './files/sql/CREATE_TABLE_movies.sql')
create_table(connection, './files/sql/CREATE_TABLE_rating.sql')
movies = get_list_from_csv_file('./files/csv/movies.csv')
rating = get_list_from_csv_file('./files/csv/ratings.csv')
fill_movies_table(connection, movies, './files/sql/FILL_MOVIES_TABLE.sql')
main()
import mysql.connector
from mysql.connector import Error
def create_connection(host_name, user_name, user_password, db_name):
connection = None
try:
connection = mysql.connector.connect(
host=host_name,
user=user_name,
passwd=user_password,
database=db_name
)
print("Connection to MySQL DB successful")
except Error as e:
print(f"The error '{e}' occurred")
return connection
connection = create_connection("localhost", "root", "12345678", "movies")
def create_database(connection, query):
cursor = connection.cursor()
try:
cursor.execute(query)
print("Database created successfully")
except Error as e:
print(f"The error '{e}' occurred")
create_database_query = "CREATE DATABASE movies"
create_database(connection, create_database_query)
def execute_query(connection, query):
cursor = connection.cursor()
try:
cursor.execute(query)
connection.commit()
print("Query executed successfully")
except Error as e:
print(f"The error '{e}' occurred")
create_movies_table = """
CREATE TABLE IF NOT EXISTS movies (
id INT AUTO_INCREMENT,
title TEXT NOT NULL,
genres TEXT NOT NULL,
PRIMARY KEY (id)
) ENGINE = InnoDB
"""
create_rating_table = """
CREATE TABLE IF NOT EXISTS rating (
id INT AUTO_INCREMENT,
movie_id INT NOT NULL,
rating INT NOT NULL,
timestamp INT NOT NULL
FOREIGN KEY (movie_id) REFERENCES movies (id),
PRIMARY KEY (id)
) ENGINE = InnoDB
"""
execute_query(connection, create_movies_table)
execute_query(connection, create_rating_table)
import argparse
import csv
from collections import defaultdict
import re
parser = argparse.ArgumentParser()
parser.add_argument('-N',
type=int,
help='the number of the highest rated films for each genre'
)
parser.add_argument('-genres',
type=str,
help='filter by genre'
)
parser.add_argument('-year_from',
type=int,
help='filter by year (FROM YEAR)',
default=1800
)
parser.add_argument('-year_to',
type=int,
help='filter by year (TO YEAR)',
default=2025
)
parser.add_argument('-regexp',
type=str,
help='filter on the movie name'
)
args = parser.parse_args()
# read movies.csv
def read_movies_file():
data_m = []
with open('files/movies.csv', encoding='utf-8') as file:
reader = csv.reader(file, delimiter=',')
for row in reader:
data_m.append(row)
data_movies = data_m[1:]
return data_movies
data_movies = read_movies_file()
# read ratings.csv
def read_rating_file():
data_r = []
with open('files/ratings.csv', encoding='utf-8') as file:
reader = csv.reader(file, delimiter=',')
for row in reader:
data_r.append(row)
data_rating = data_r[1:]
return data_rating
data_rating = read_rating_file()
# get averages, is a dict in which: key=str(movieID), values=middle rating.
# and sort sort list by rating (reverse=True)
def get_averages():
total = defaultdict(float)
count = defaultdict(int)
for line in data_rating:
total[line[1]] += float(line[2])
count[line[1]] += 1
# middle rating
averages = { id: total[id]/count[id] for id in count }
# sort list by reverse=True
data_movies.sort(key=lambda item: averages.get(item[0], 0), reverse=True)
return averages
averages = get_averages()
# make a list from geners
def make_list_from_geners_in_data_movies():
for i in range(len(data_movies)):
geners_list = data_movies[i][2].split('|')
data_movies[i][2] = geners_list
return data_movies
data_movies = make_list_from_geners_in_data_movies()
"""
Короче, сейчас есть таблица data_movies, которая отсортирована по рейтингу (Reverse=True)
Далее, я планирую написать функции сортировки для каждого аргумента командной строки
И ппосле, просто применять эти функции для сортировки data_movies и выводить в консоль результат.
"""
def filter_by_year(year_from, year_to):
result = []
pattern = r'\(\d{4}\)'
for i in range(len(data_movies)):
string = data_movies[i][1]
if re.search(pattern, string) is not None:
year = re.search(pattern, string)
a = year.group(0)[1:-1]
int_year = int(a)
if year_from <= int_year <= year_to:
result.append(data_movies[i])
return result
#filter_by_year(1999, 2001)
def filter_by_regexp(name, data):
result = []
pattern = name
for i in range(len(data)):
string = data[i][1]
if re.search(pattern, string) is not None:
result.append(data[i])
return result
def filter_by_genres(geners, data):
result = []
if '|' in geners:
geners_list = geners.split('|')
elif '&' in geners:
geners_list = geners.split('&')
else:
geners_list = geners.split()
for i in range(len(geners_list)):
gener = geners_list[i]
for i in range(len(data)):
if gener in data[i][2]:
if data[i] not in result:
result.append(data[i])
return result
def print_result_to_console(data, N, geners):
path
def main_condition():
result = []
result = filter_by_year(args.year_from, args.year_to)
if args.regexp is not None:
result = filter_by_regexp(args.regexp, result)
if args.genres is not None:
result = filter_by_genres(args.genres, result)
# !!! сейчас у нас есть отфильтрованная таблица result, но нужно вывести в консоль правильные значения
if __name__ == "__main__":
read_movies_file()
read_rating_file()
get_averages()
main_condition()
print(args)