import psycopg2 from psycopg2.extras import RealDictCursor # Параметры подключения к БД DB_CONFIG = { "dbname": "parsed_url", "user": "postgres", "password": "qwertyqwerty123123", "host": "45.129.78.228", "connect_timeout": 10, "options": "-c statement_timeout=30000" } # Глобальное подключение к БД conn = None def get_connection(): """Получает подключение к БД, создавая новое при необходимости""" global conn if conn is None or conn.closed: conn = psycopg2.connect(**DB_CONFIG) conn.autocommit = True return conn # Проверяет, есть ли указанный URL в базе данных. def check_url_exists(url: str): conn = get_connection() try: conn = get_connection() with conn.cursor() as cursor: cursor.execute( "SELECT 1 FROM url WHERE url = %s LIMIT 1", (url,) ) result = cursor.fetchone() return {"exists": bool(result)} except Exception as e: raise HTTPException(status_code=500, detail=f"Ошибка при проверке: {e}") finally: pass # работа с базой данных показывания задач work_parser def create_table(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS work_parser ( id SERIAL PRIMARY KEY, status VARCHAR(20) NOT NULL CHECK (status IN ('queued', 'in_progress', 'completed', 'failed')), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), started_at TIMESTAMPTZ, finished_at TIMESTAMPTZ, source_url TEXT, error_message TEXT, attempts INTEGER DEFAULT 0, priority INTEGER DEFAULT 0 ); """) print("Таблица work_parser создана или уже существует") finally: pass def insert_task(status, source_url=None, source_id=None, priority=0): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO work_parser (status, source_url, priority) VALUES (%s, %s, %s) RETURNING id; """, (status, source_url, priority)) task_id = cur.fetchone()[0] return task_id finally: pass def get_tasks_offset(limit, offset): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT * FROM work_parser ORDER BY created_at DESC LIMIT %s OFFSET %s """, (limit, offset)) tasks = cur.fetchall() return tasks finally: pass def delete_task(task_id: int): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("DELETE FROM work_parser WHERE id = %s RETURNING *;", (task_id,)) deleted_task = cur.fetchone() if deleted_task: return {"message": f"Задача {task_id} удалена", "deleted_task": dict(deleted_task)} else: return {"message": f"Задача с id {task_id} не найдена"} finally: pass def update_task(task_id, **fields): conn = get_connection() try: allowed_fields = ['status', 'started_at', 'finished_at', 'source_url', 'error_message', 'attempts', 'priority'] set_parts = [] values = [] for key, value in fields.items(): if key in allowed_fields: set_parts.append(f"{key} = %s") values.append(value) if not set_parts: return False values.append(task_id) set_sql = ", ".join(set_parts) with conn.cursor() as cur: cur.execute(f"UPDATE work_parser SET {set_sql} WHERE id = %s;", values) return True finally: pass # Создание и работа с таблицей по созданию и редактированию промтов def create_table_config_gpt(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS config_gpt ( name VARCHAR(20) PRIMARY KEY, promt TEXT ); """) print("Таблица config_gpt создана или уже существует") finally: pass def update_promt(name: str, promt: str): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO config_gpt (name, promt) VALUES ( %s, %s) ON CONFLICT (name) DO UPDATE SET promt = EXCLUDED.promt """, (name, promt)) conn.commit() finally: pass def get_promt(promt_name_url): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT promt FROM config_gpt WHERE name = %s", (promt_name_url,)) promt = cur.fetchone() return promt['promt'] finally: pass def get_all_promt(): # Возвращает список всех значений поля name из таблицы config_gpt conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT * FROM config_gpt") rows = cur.fetchall() sources = [{"name": row["name"], "promt": row["promt"]} for row in rows] return {"sources": sources} finally: pass def get_all_categories_promt(): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT name FROM config_gpt") rows = cur.fetchall() return [row["name"] for row in rows] finally: pass # Создание, сохранение и работа с таблицей ошибочных ссылок (error_url) def create_table_error_url(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS error_url ( id SERIAL PRIMARY KEY, source_url TEXT NOT NULL, error_sources_url TEXT NOT NULL ); """) print("Таблица error_url создана или уже существует") finally: pass def add_error_url(source_url: str, error_sources_url: str): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO error_url (source_url, error_sources_url) VALUES (%s, %s) RETURNING id; """, (source_url, error_sources_url)) return cur.fetchone()[0] finally: pass def check_error_url(error_sources_url: str) -> bool: conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT 1 FROM error_url WHERE error_sources_url = %s LIMIT 1; """, (error_sources_url,)) row = cur.fetchone() return row is None finally: pass # Создание и работа с таблицей источников add_sources def create_table_add_sourse(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS sourse ( url TEXT PRIMARY KEY, promt TEXT ); """) print("Таблица sourse создана или уже существует") finally: pass def add_sources(url: str, promt: str): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO sourse (url, promt) VALUES (%s, %s) ON CONFLICT (url) DO UPDATE SET promt = EXCLUDED.promt """, (url, promt)) conn.commit() finally: pass # Пример использования # if __name__ == "__main__": # # create_table_config_gpt() # <-- раскомментировать эту строку # update_promt({ # "url": "http://korei", # "name": "Корея", # "promt": "Задача: Перевод на русский язык и тематическая фильтрация новостных статей из китайской прессы. \n Необходимо переводить текст статьи и определять, относится ли она к КНР по указанным темам: \n 1. Перевод\n Переведи предоставленный китайский текст на русский язык, сохранив оригинальный смысл, стиль и структуру.\n Текст:\n {content}\n -------------------------------------\n 2. Отбирай исключительно новости, прямо относящиеся к Китаю, его безопасности, соседним странам и территориям, влияющим на интересы Китая.\n Если не относится к Китаю — считаем, что статья НЕ подходит, и отдаем пустой JSON:\n {\"text\": \"\", \"pereskas\": \"\", \"title\": \"\", \"topics\": []}\n Если привязка есть — переходи к шагу 3. \n -------------------------------------\n 3. Тематическая классификация\n Определи, относится ли статья к одной или нескольким темам из списка:\n 1) Военные новости — конфликты, учения, мобилизация, закупки вооружений. \n 2) Пограничная деятельность — охрана границы, пограничные учения, строительство или модернизация пограничной инфраструктуры, техника для пограничников. \n 3) Пункты пропуска на границе с РФ — изменения режима работы, строительство, реконструкция, оборудование, логистика. \n 4) Пограничные реки — состояние рек, экология, инфраструктурные проекты, мониторинг. \n 5) Чрезвычайные ситуации — природные и техногенные происшествия, особенно затрагивающие пограничные реки и прилегающие земли. \n 6) Санитарно-эпидемиологическая обстановка — эпидемии, эпизоотии, эпифитотии, угрозы и меры предотвращения. \n 7) Индустриальные проекты (арктическое/антарктическое направление). \n 8) Индустриальные проекты в приграничных районах — заводы, производства, технопарки, новые технологии. \n 9) Инфраструктурные проекты в приграничных районах — дороги, мосты, транспорт, логистика. \n 10) Культура малочисленных народностей (нанайцы, монголы, уйгуры, нанайцы и хэчжэ) — политика, традиции, бытовая жизнь нанайцев, монголов, уйгуров, и хэчжэ (малочисленных народов).\n\n Отметь только те темы, которым статья действительно соответствует.\n\n -------------------------------------\n 4. Формат ответа \n Вернуть строго JSON без пояснений и дополнительных слов:\n {\n \"translation_text\": \"<перевод текста статьи на русский язык (дословный, точный и без сокращений ) >\",\n \"short_text\": \"<пересказ переведённого текста>\",\n \"title\": \"<краткая суть новости (1–2 предложения)>\",\n \"category\": \"<названий категорий, которым соответствует статья>\"\n }\n Если статья не относится ни к одной теме или не привязана к нужным регионам — вернуть:\n {\"translation_text\": \"\", \"short_text\": \"\", \"title\": \"\", \"category\": \"\"}" # }) # # print(get_promt("japan")) # # create_table_error_url() # create_table_add_sourse()