добавил закрытие бд для пресечения утечки данных
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2026-04-11 12:14:01 +10:00
parent 8c790b018c
commit a813a3805e

View File

@@ -12,169 +12,229 @@ conn = psycopg2.connect(
options="-c statement_timeout=30000" # таймаут запроса 30 сек options="-c statement_timeout=30000" # таймаут запроса 30 сек
) )
conn.autocommit = True conn.autocommit = True
# работа с базой данных показывания задач work_parser
def close_connection():
"""Закрывает подключение к БД"""
global conn
if conn:
conn.close()
conn = None
def create_table(): def create_table():
with conn.cursor() as cur: try:
cur.execute(""" with conn.cursor() as cur:
CREATE TABLE IF NOT EXISTS work_parser ( cur.execute("""
id SERIAL PRIMARY KEY, CREATE TABLE IF NOT EXISTS work_parser (
status VARCHAR(20) NOT NULL CHECK (status IN ('queued', 'in_progress', 'completed', 'failed')), id SERIAL PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), status VARCHAR(20) NOT NULL CHECK (status IN ('queued', 'in_progress', 'completed', 'failed')),
started_at TIMESTAMPTZ, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
finished_at TIMESTAMPTZ, started_at TIMESTAMPTZ,
source_url TEXT, finished_at TIMESTAMPTZ,
error_message TEXT, source_url TEXT,
attempts INTEGER DEFAULT 0, error_message TEXT,
priority INTEGER DEFAULT 0 attempts INTEGER DEFAULT 0,
); priority INTEGER DEFAULT 0
""") );
print("Таблица work_parser создана или уже существует") """)
print("Таблица work_parser создана или уже существует")
finally:
if conn:
conn.close()
def insert_task(status, source_url=None, source_id=None, priority=0): def insert_task(status, source_url=None, source_id=None, priority=0):
with conn.cursor() as cur: try:
cur.execute(""" with conn.cursor() as cur:
INSERT INTO work_parser (status, source_url, priority) cur.execute("""
VALUES (%s, %s, %s) INSERT INTO work_parser (status, source_url, priority)
RETURNING id; VALUES (%s, %s, %s)
""", (status, source_url, priority)) RETURNING id;
task_id = cur.fetchone()[0] """, (status, source_url, priority))
return task_id task_id = cur.fetchone()[0]
return task_id
finally:
if conn:
conn.close()
def get_tasks_offset(limit, offset): def get_tasks_offset(limit, offset):
with conn.cursor(cursor_factory=RealDictCursor) as cur: try:
cur.execute(""" with conn.cursor(cursor_factory=RealDictCursor) as cur:
SELECT * FROM work_parser cur.execute("""
ORDER BY created_at DESC SELECT * FROM work_parser
LIMIT %s OFFSET %s ORDER BY created_at DESC
""", (limit, offset)) LIMIT %s OFFSET %s
tasks = cur.fetchall() """, (limit, offset))
return tasks tasks = cur.fetchall()
return tasks
finally:
if conn:
conn.close()
def delete_task(task_id: int): def delete_task(task_id: int):
with conn.cursor(cursor_factory=RealDictCursor) as cur: try:
cur.execute("DELETE FROM work_parser WHERE id = %s RETURNING *;", (task_id,)) with conn.cursor(cursor_factory=RealDictCursor) as cur:
deleted_task = cur.fetchone() cur.execute("DELETE FROM work_parser WHERE id = %s RETURNING *;", (task_id,))
if deleted_task: deleted_task = cur.fetchone()
return {"message": f"Задача {task_id} удалена", "deleted_task": dict(deleted_task)} if deleted_task:
else: return {"message": f"Задача {task_id} удалена", "deleted_task": dict(deleted_task)}
return {"message": f"Задача с id {task_id} не найдена"} else:
return {"message": f"Задача с id {task_id} не найдена"}
finally:
if conn:
conn.close()
def update_task(task_id, **fields): def update_task(task_id, **fields):
# dynamic update query generator try:
allowed_fields = ['status', 'started_at', 'finished_at', 'source_url', 'error_message', 'attempts', 'priority'] allowed_fields = ['status', 'started_at', 'finished_at', 'source_url', 'error_message', 'attempts', 'priority']
set_parts = [] set_parts = []
values = [] values = []
for key, value in fields.items(): for key, value in fields.items():
if key in allowed_fields: if key in allowed_fields:
set_parts.append(f"{key} = %s") set_parts.append(f"{key} = %s")
values.append(value) values.append(value)
if not set_parts: if not set_parts:
return False return False
values.append(task_id) values.append(task_id)
set_sql = ", ".join(set_parts) set_sql = ", ".join(set_parts)
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute(f"UPDATE work_parser SET {set_sql} WHERE id = %s;", values) cur.execute(f"UPDATE work_parser SET {set_sql} WHERE id = %s;", values)
return True return True
finally:
if conn:
conn.close()
# Создание и работа с таблицей по созданию и редактированию промтов
def create_table_config_gpt(): def create_table_config_gpt():
with conn.cursor() as cur: try:
cur.execute(""" with conn.cursor() as cur:
CREATE TABLE IF NOT EXISTS config_gpt ( cur.execute("""
url TEXT PRIMARY KEY, CREATE TABLE IF NOT EXISTS config_gpt (
name VARCHAR(20), url TEXT PRIMARY KEY,
promt TEXT name VARCHAR(20),
); promt TEXT
""") );
print("Таблица config_gpt создана или уже существует") """)
print("Таблица config_gpt создана или уже существует")
finally:
if conn:
conn.close()
def update_promt(url: str, name: str, promt: str): def update_promt(url: str, name: str, promt: str):
with conn.cursor() as cur: try:
cur.execute(""" with conn.cursor() as cur:
INSERT INTO config_gpt (url, name, promt) cur.execute("""
VALUES (%s, %s, %s) INSERT INTO config_gpt (url, name, promt)
ON CONFLICT (url) DO UPDATE SET VALUES (%s, %s, %s)
name = EXCLUDED.name, ON CONFLICT (url) DO UPDATE SET
promt = EXCLUDED.promt name = EXCLUDED.name,
""", (url, name, promt)) promt = EXCLUDED.promt
conn.commit() """, (url, name, promt))
conn.commit()
finally:
if conn:
conn.close()
def get_promt(promt_name_url): def get_promt(promt_name_url):
with conn.cursor(cursor_factory=RealDictCursor) as cur: try:
cur.execute("SELECT promt FROM config_gpt WHERE url = %s", (promt_name_url,)) with conn.cursor(cursor_factory=RealDictCursor) as cur:
promt = cur.fetchone() cur.execute("SELECT promt FROM config_gpt WHERE url = %s", (promt_name_url,))
return promt['promt'] promt = cur.fetchone()
return promt['promt']
finally:
if conn:
conn.close()
def get_all_promt(): def get_all_promt():
with conn.cursor(cursor_factory=RealDictCursor) as cur: try:
cur.execute("SELECT * FROM config_gpt") with conn.cursor(cursor_factory=RealDictCursor) as cur:
rows = cur.fetchall() cur.execute("SELECT * FROM config_gpt")
rows = cur.fetchall()
sources = [{"url": row["url"], "name": row["name"], "promt": row["promt"]} for row in rows]
return {"sources": sources} sources = [{"url": row["url"], "name": row["name"], "promt": row["promt"]} for row in rows]
return {"sources": sources}
finally:
if conn:
conn.close()
# Возвращает список всех значений поля name из таблицы config_gpt
def get_all_categories_promt(): def get_all_categories_promt():
with conn.cursor(cursor_factory=RealDictCursor) as cur: try:
cur.execute("SELECT name FROM config_gpt") with conn.cursor(cursor_factory=RealDictCursor) as cur:
rows = cur.fetchall() cur.execute("SELECT name FROM config_gpt")
rows = cur.fetchall()
return [row["name"] for row in rows]
return [row["name"] for row in rows]
finally:
if conn:
conn.close()
# Создание, сохранение и работа с таблицей ошибочных ссылок (error_url)
def create_table_error_url(): def create_table_error_url():
with conn.cursor() as cur: try:
cur.execute(""" with conn.cursor() as cur:
CREATE TABLE IF NOT EXISTS error_url ( cur.execute("""
id SERIAL PRIMARY KEY, CREATE TABLE IF NOT EXISTS error_url (
source_url TEXT NOT NULL, id SERIAL PRIMARY KEY,
error_sources_url TEXT NOT NULL source_url TEXT NOT NULL,
); error_sources_url TEXT NOT NULL
""") );
print("Таблица error_url создана или уже существует") """)
print("Таблица error_url создана или уже существует")
finally:
if conn:
conn.close()
def add_error_url(source_url: str, error_sources_url: str): def add_error_url(source_url: str, error_sources_url: str):
"""Добавляет запись об ошибке URL""" try:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute(""" cur.execute("""
INSERT INTO error_url (source_url, error_sources_url) INSERT INTO error_url (source_url, error_sources_url)
VALUES (%s, %s) VALUES (%s, %s)
RETURNING id; RETURNING id;
""", (source_url, error_sources_url)) """, (source_url, error_sources_url))
return cur.fetchone()[0] return cur.fetchone()[0]
finally:
if conn:
conn.close()
def check_error_url(error_sources_url: str) -> bool: def check_error_url(error_sources_url: str) -> bool:
"""Проверяет, есть ли запись в таблице error_url с таким URL""" try:
with conn.cursor(cursor_factory=RealDictCursor) as cur: with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(""" cur.execute("""
SELECT 1 FROM error_url SELECT 1 FROM error_url
WHERE error_sources_url = %s WHERE error_sources_url = %s
LIMIT 1; LIMIT 1;
""", (error_sources_url,)) """, (error_sources_url,))
row = cur.fetchone() row = cur.fetchone()
return row is None return row is None
finally:
if conn:
conn.close()
# Создание и работа с таблицей источников add_sources
def create_table_add_sourse(): def create_table_add_sourse():
with conn.cursor() as cur: try:
cur.execute(""" with conn.cursor() as cur:
CREATE TABLE IF NOT EXISTS sourse ( cur.execute("""
url TEXT PRIMARY KEY, CREATE TABLE IF NOT EXISTS sourse (
promt TEXT url TEXT PRIMARY KEY,
promt TEXT
); );
""") """)
print("Таблица sourse создана или уже существует") print("Таблица sourse создана или уже существует")
finally:
if conn:
conn.close()
def add_sources(url: str, promt: str): def add_sources(url: str, promt: str):
with conn.cursor() as cur: try:
cur.execute(""" with conn.cursor() as cur:
INSERT INTO config_gpt (url, promt) cur.execute("""
VALUES (%s, %s) INSERT INTO config_gpt (url, promt)
ON CONFLICT (url) DO UPDATE SET VALUES (%s, %s)
promt = EXCLUDED.promt ON CONFLICT (url) DO UPDATE SET
""", (url, promt)) promt = EXCLUDED.promt
conn.commit() """, (url, promt))
conn.commit()
finally:
if conn:
conn.close()
# Пример использования # Пример использования
# if __name__ == "__main__": # if __name__ == "__main__":