@SergeyBondarenko

Python CSV Upgrade?

Как реализовать метод upgrade (замена елемента по id)?

from tabulate import tabulate
from random import randint


def FileManager(db_path):
    """
    :param db_path: string
    :return:
    """
    def parse_data(text=''):
        rows = text.split('\n')
        result = []
        for row in rows:
            result.append(row.split(';'))
        return result

    def write(data):
        try:
            with open(db_path,'a') as f:
                f.write(data)
                f.write("\n")
        except FileNotFoundError:
            print(f"File is not found {db_path}")

        return f"Write: {str(data)}"

    def read():
        try:
            file = open(db_path)
            data = file.read()
            file.close()
            return parse_data(data)
        except Exception as e:
            print(e)

    def update(id):
        # get id: integer or string
        # retrieve from file all data
        # use parsed_data to serialize
        # iterate for parsed list and compare two id's
        # [email, id, first_name, last_name] -> row[2] == value1 && row[3] == value2
        # serialize from list of lists to simple string
        # mode='w'
        return "Update"

    def delete():
        return "Delete"

    return {
        'write': write,
        'read': read,
        'update': update,
        'delete': delete
    }
# ORM - own system
def UserDB(fm = FileManager("email.csv")):
    """
    :param fm: {
        'write': function,
        'read': function,
        'update': function,
        'delete': function
    }
    :return:
    """
    required_fields = ['login_email', 'first_name', 'last_name']
    def add(*data, **kwargs):
        # if all fields are exist (validation process)
        # open file stream for writing
        #
        try:
            # validation step start
            counter = 0
            for x in kwargs:
                if kwargs[x] == "":
                    raise Exception(f"{x} is required! Please enter an value!")
                if x in required_fields:
                    counter += 1
            if counter != 3:
                raise Exception(f"You must include all required fields: {','.join(required_fields)}\n")
            # validation step end
            user_entity = ''
            kwargs['identifier'] = randint(1_000, 1_0000)
            payload = {
                0: kwargs['login_email'],
                1: kwargs['identifier'],
                2: kwargs['first_name'],
                3: kwargs['last_name']
            }
            for field in payload:
                user_entity += str(payload[field]) + ";"
            fm['write'](user_entity)
            return "You successfully added new user to database..."
        except Exception as e:
            print(e)

    def update(*args,**kwargs):
        return "Upgrade"



    def get_all(*data, **kwargs):
        return fm['read']()

    def get_by_id(id):
        rows = fm['read']()
        result = []
        for row in rows[1:]:
            if int(row[1]) == int(id):
                result.append(row)
                break
        result.insert(0, rows[0])
        return result

    def remove_by_id(id):
        rows = fm['read']()[1:-1]

        def filter_cb(row):
            return int(row[1]) != int(id)

        updated_list = list(filter(filter_cb, rows))

        result = ''
        for row in updated_list:
            result += ";".join(row)

        print('res', result.strip())


    return {
        'add': add,
        'update': update,
        'get_all': get_all,
        'remove_by_id': remove_by_id,
        'get_by_id': get_by_id
    }


def render_table(data):
    print(tabulate(data[1:], headers=data[0]))


db = UserDB()
# Get user by id or get all users from database
# render_table(db['get_all']())
#
# render_table(db['get_by_id'](2070))
#
# render_table(db['get_by_id'](9346))

# Add user
# db['add'](login_email = "foo@gmail.com", first_name="John", last_name="Bradshaw")
# render_table(db['get_all']())

# db['remove_by_id'](4081)
print(db['update'](2070))
  • Вопрос задан
  • 51 просмотр
Пригласить эксперта
Ответы на вопрос 2
justhabrauser
@justhabrauser
IT specialist
1. Сначала надо определиться - update или Upgrade
2. Если новое значение _гарантировано_ влезет в ту же длину строки, то нужна низкоуровневая работа с файлом (все эти seek() и т.д.). Если не гарантировано, то только запись в новый файл.
(ну или "раздвигание" текущего файла, но это уже к области извращений относится)
Чуда нет.
Ответ написан
Комментировать
@o5a
Не нужно выдумывать "обновление" содержимого файла, так не работает. Для замены значения в CSV полностью его считываете во вложенный список/словарь, находите нужную строку (по id или еще чему нужно), заменяете в значение в этом списке. И уже после этого полностью перезаписываете файл этим новым списком. Т.е. условно
data = get_all()
затем аналогично get_by_id() ищем нужную запись в этом массиве и заменяем ее на новую, получаем обновленный вложенный список data
обновленный массив записываем назад в файл: write(data)

Для работы с CSV вместо ручного разбиения по символам есть родная библиотека csv, там есть уже более специализированные методы (с выбором различных параметров: символ разделителя, экранирование кавычек и т.п.)
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы