import psycopg2 from psycopg2.extras import RealDictCursor # Параметры подключения к БД DB_CONFIG = { "dbname": "parsed_url", "user": "postgres", "password": "qwertyqwerty123123", "host": "45.129.78.228", "connect_timeout": 10, "options": "-c statement_timeout=30000" } # Модель для данных, которые приходят в POST class ParsedData(BaseModel): url: str parsed_at: str title: str original_text: str article_date: str status: Optional[bool] = False viewed: Optional[bool] = False tematik: Optional[bool] = False svodka: Optional[bool] = False donesenie: Optional[bool] = False bilutene: Optional[bool] = False other: str category: str translation_text: str short_text: str # Функции для работы с БД (без эндпоинтов) def save_parsed_data_to_db(data: ParsedData): conn = None try: conn = get_connection() with conn.cursor() as cur: cur.execute(""" INSERT INTO url (url, parsed_at, title, original_text, article_date, status, viewed, tematik, svodka, donesenie, bilutene, other, category, translation_text, short_text) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (url) DO UPDATE SET parsed_at = EXCLUDED.parsed_at, title = EXCLUDED.title, original_text = EXCLUDED.original_text, article_date = EXCLUDED.article_date, status = EXCLUDED.status, viewed = EXCLUDED.viewed, tematik = EXCLUDED.tematik, svodka = EXCLUDED.svodka, donesenie = EXCLUDED.donesenie, bilutene = EXCLUDED.bilutene, other = EXCLUDED.other, category = EXCLUDED.category, translation_text = EXCLUDED.translation_text, short_text = EXCLUDED.short_text; """, (data.url, data.parsed_at, data.title, data.original_text, data.article_date, data.status, data.viewed, data.tematik, data.svodka, data.donesenie, data.bilutene, data.other, data.category, data.translation_text, data.short_text)) conn.commit() return {"status": "success", "message": "Данные успешно сохранены"} except Exception as e: if conn: conn.rollback() raise e finally: if conn: conn.close() # Глобальное подключение к БД conn = None def get_connection(): """Получает подключение к БД, создавая новое при необходимости""" global conn if conn is None or conn.closed: conn = psycopg2.connect(**DB_CONFIG) conn.autocommit = True return conn # Проверяет, есть ли указанный URL в базе данных. def check_url_exists(url: str): conn = get_connection() try: conn = get_connection() with conn.cursor() as cursor: cursor.execute( "SELECT 1 FROM url WHERE url = %s LIMIT 1", (url,) ) result = cursor.fetchone() return {"exists": bool(result)} except Exception as e: raise HTTPException(status_code=500, detail=f"Ошибка при проверке: {e}") finally: pass # работа с базой данных показывания задач work_parser def create_table(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS work_parser ( id SERIAL PRIMARY KEY, status VARCHAR(20) NOT NULL CHECK (status IN ('queued', 'in_progress', 'completed', 'failed')), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), started_at TIMESTAMPTZ, finished_at TIMESTAMPTZ, source_url TEXT, error_message TEXT, attempts INTEGER DEFAULT 0, priority INTEGER DEFAULT 0 ); """) print("Таблица work_parser создана или уже существует") finally: pass def insert_task(status, source_url=None, source_id=None, priority=0): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO work_parser (status, source_url, priority) VALUES (%s, %s, %s) RETURNING id; """, (status, source_url, priority)) task_id = cur.fetchone()[0] return task_id finally: pass def get_tasks_offset(limit, offset): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT * FROM work_parser ORDER BY created_at DESC LIMIT %s OFFSET %s """, (limit, offset)) tasks = cur.fetchall() return tasks finally: pass def delete_task(task_id: int): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("DELETE FROM work_parser WHERE id = %s RETURNING *;", (task_id,)) deleted_task = cur.fetchone() if deleted_task: return {"message": f"Задача {task_id} удалена", "deleted_task": dict(deleted_task)} else: return {"message": f"Задача с id {task_id} не найдена"} finally: pass def update_task(task_id, **fields): conn = get_connection() try: allowed_fields = ['status', 'started_at', 'finished_at', 'source_url', 'error_message', 'attempts', 'priority'] set_parts = [] values = [] for key, value in fields.items(): if key in allowed_fields: set_parts.append(f"{key} = %s") values.append(value) if not set_parts: return False values.append(task_id) set_sql = ", ".join(set_parts) with conn.cursor() as cur: cur.execute(f"UPDATE work_parser SET {set_sql} WHERE id = %s;", values) return True finally: pass # Создание и работа с таблицей по созданию и редактированию промтов def create_table_config_gpt(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS config_gpt ( name VARCHAR(20) PRIMARY KEY, promt TEXT ); """) print("Таблица config_gpt создана или уже существует") finally: pass def update_promt(name: str, promt: str): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO config_gpt (name, promt) VALUES ( %s, %s) ON CONFLICT (name) DO UPDATE SET promt = EXCLUDED.promt """, (name, promt)) conn.commit() finally: pass def get_promt(promt_name_url): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT promt FROM config_gpt WHERE name = %s", (promt_name_url,)) promt = cur.fetchone() return promt['promt'] finally: pass def get_all_promt(): # Возвращает список всех значений поля name из таблицы config_gpt conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT * FROM config_gpt") rows = cur.fetchall() sources = [{"name": row["name"], "promt": row["promt"]} for row in rows] return {"sources": sources} finally: pass def get_all_categories_promt(): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT name FROM config_gpt") rows = cur.fetchall() return [row["name"] for row in rows] finally: pass # Создание, сохранение и работа с таблицей ошибочных ссылок (error_url) def create_table_error_url(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS error_url ( id SERIAL PRIMARY KEY, source_url TEXT NOT NULL, error_sources_url TEXT NOT NULL ); """) print("Таблица error_url создана или уже существует") finally: pass def add_error_url(source_url: str, error_sources_url: str): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO error_url (source_url, error_sources_url) VALUES (%s, %s) RETURNING id; """, (source_url, error_sources_url)) return cur.fetchone()[0] finally: pass def check_error_url(error_sources_url: str) -> bool: conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT 1 FROM error_url WHERE error_sources_url = %s LIMIT 1; """, (error_sources_url,)) row = cur.fetchone() return row is None finally: pass # Создание и работа с таблицей источников sources def create_table_add_sourse(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS sourse ( url TEXT PRIMARY KEY, promt TEXT ); """) print("Таблица sourse создана или уже существует") finally: pass def add_sources(url: str, promt: str): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO sourse (url, promt) VALUES (%s, %s) ON CONFLICT (url) DO UPDATE SET promt = EXCLUDED.promt """, (url, promt)) conn.commit() finally: pass def get_all_sources(): """Возвращает все записи из таблицы sourse""" conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT * FROM sourse") rows = cur.fetchall() sources = [{"url": row["url"], "promt": row["promt"]} for row in rows] return {"sources": sources} finally: pass def delete_sources(url: str): """Удаляет источник по URL из таблицы sourse""" conn = get_connection() try: with conn.cursor() as cur: cur.execute("DELETE FROM sourse WHERE url=%s RETURNING *", (url,)) deleted_task = cur.fetchone() conn.commit() if deleted_task: return {"message": f"Источник {url} удалён", "deleted": True} else: return {"message": f"Источник с url {url} не найден", "deleted": False} except Exception as e: print(f"Ошибка при удалении источника: {e}") return {"error": str(e), "deleted": False} finally: pass # Пример использования # if __name__ == "__main__": # # create_table_config_gpt() # <-- раскомментировать эту строку # update_promt({ # "url": "http://korei", # "name": "Корея", # "promt": "Задача: Перевод на русский язык и тематическая фильтрация новостных статей из китайской прессы. \n Необходимо переводить текст статьи и определять, относится ли она к КНР по указанным темам: \n 1. Перевод\n Переведи предоставленный китайский текст на русский язык, сохранив оригинальный смысл, стиль и структуру.\n Текст:\n {content}\n -------------------------------------\n 2. Отбирай исключительно новости, прямо относящиеся к Китаю, его безопасности, соседним странам и территориям, влияющим на интересы Китая.\n Если не относится к Китаю — считаем, что статья НЕ подходит, и отдаем пустой JSON:\n {\"text\": \"\", \"pereskas\": \"\", \"title\": \"\", \"topics\": []}\n Если привязка есть — переходи к шагу 3. \n -------------------------------------\n 3. Тематическая классификация\n Определи, относится ли статья к одной или нескольким темам из списка:\n 1) Военные новости — конфликты, учения, мобилизация, закупки вооружений. \n 2) Пограничная деятельность — охрана границы, пограничные учения, строительство или модернизация пограничной инфраструктуры, техника для пограничников. \n 3) Пункты пропуска на границе с РФ — изменения режима работы, строительство, реконструкция, оборудование, логистика. \n 4) Пограничные реки — состояние рек, экология, инфраструктурные проекты, мониторинг. \n 5) Чрезвычайные ситуации — природные и техногенные происшествия, особенно затрагивающие пограничные реки и прилегающие земли. \n 6) Санитарно-эпидемиологическая обстановка — эпидемии, эпизоотии, эпифитотии, угрозы и меры предотвращения. \n 7) Индустриальные проекты (арктическое/антарктическое направление). \n 8) Индустриальные проекты в приграничных районах — заводы, производства, технопарки, новые технологии. \n 9) Инфраструктурные проекты в приграничных районах — дороги, мосты, транспорт, логистика. \n 10) Культура малочисленных народностей (нанайцы, монголы, уйгуры, нанайцы и хэчжэ) — политика, традиции, бытовая жизнь нанайцев, монголов, уйгуров, и хэчжэ (малочисленных народов).\n\n Отметь только те темы, которым статья действительно соответствует.\n\n -------------------------------------\n 4. Формат ответа \n Вернуть строго JSON без пояснений и дополнительных слов:\n {\n \"translation_text\": \"<перевод текста статьи на русский язык (дословный, точный и без сокращений ) >\",\n \"short_text\": \"<пересказ переведённого текста>\",\n \"title\": \"<краткая суть новости (1–2 предложения)>\",\n \"category\": \"<названий категорий, которым соответствует статья>\"\n }\n Если статья не относится ни к одной теме или не привязана к нужным регионам — вернуть:\n {\"translation_text\": \"\", \"short_text\": \"\", \"title\": \"\", \"category\": \"\"}" # }) # # print(get_promt("japan")) # # create_table_error_url() # create_table_add_sourse() # delete_sources("https://www.taipeitimes.com/")