import psycopg2 from psycopg2.extras import RealDictCursor from pydantic import BaseModel from typing import List, Optional from fastapi import HTTPException # Параметры подключения к БД DB_CONFIG = { "dbname": "parsed_url", "user": "postgres", "password": "qwertyqwerty123123", "host": "45.129.78.228", "connect_timeout": 10, "options": "-c statement_timeout=30000" } # Модель для данных, которые приходят в POST class ParsedData(BaseModel): url: str parsed_at: str title: str original_text: str article_date: str status: Optional[bool] = False viewed: Optional[bool] = False tematik: Optional[bool] = False svodka: Optional[bool] = False donesenie: Optional[bool] = False bilutene: Optional[bool] = False other: str category: str translation_text: str short_text: str # Функции для работы с БД (без эндпоинтов) def save_parsed_data_to_db(data: ParsedData): conn = None try: conn = get_connection() with conn.cursor() as cur: cur.execute(""" INSERT INTO url (url, parsed_at, title, original_text, article_date, status, viewed, tematik, svodka, donesenie, bilutene, other, category, translation_text, short_text) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (url) DO UPDATE SET parsed_at = EXCLUDED.parsed_at, title = EXCLUDED.title, original_text = EXCLUDED.original_text, article_date = EXCLUDED.article_date, status = EXCLUDED.status, viewed = EXCLUDED.viewed, tematik = EXCLUDED.tematik, svodka = EXCLUDED.svodka, donesenie = EXCLUDED.donesenie, bilutene = EXCLUDED.bilutene, other = EXCLUDED.other, category = EXCLUDED.category, translation_text = EXCLUDED.translation_text, short_text = EXCLUDED.short_text; """, (data.url, data.parsed_at, data.title, data.original_text, data.article_date, data.status, data.viewed, data.tematik, data.svodka, data.donesenie, data.bilutene, data.other, data.category, data.translation_text, data.short_text)) conn.commit() return {"status": "success", "message": "Данные успешно сохранены"} except Exception as e: if conn: conn.rollback() raise e finally: if conn: conn.close() def get_articles_by_filter(field_name: str, start_date: str, finish_date: str): """ Возвращает список заголовков статей по полю и диапазону дат Args: field_name: имя поля (tematik, svodka, donesenie, bilutene, status) start_date: дата начала в формате YYYY-MM-DD finish_date: дата окончания в формате YYYY-MM-DD Returns: List[str]: список заголовков (title) """ conn = get_connection() try: # Проверка валидности поля allowed_fields = ['tematik', 'svodka', 'donesenie', 'bilutene', 'status'] if field_name not in allowed_fields: raise ValueError(f"Недопустимое поле: {field_name}. Разрешено: {allowed_fields}") with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(f""" SELECT title FROM url WHERE {field_name} = TRUE AND article_date BETWEEN %s AND %s ORDER BY article_date DESC; """, (start_date, finish_date)) rows = cur.fetchall() return [row['title'] for row in rows] except Exception as e: print(f"Ошибка в get_articles_by_filter: {e}") raise # Глобальное подключение к БД conn = None def get_connection(): """Получает подключение к БД, создавая новое при необходимости""" global conn try: # Проверяем, активно ли подключение if conn is None or conn.closed: conn = psycopg2.connect(**DB_CONFIG) conn.autocommit = True # Дополнительная проверка на валидность elif conn.info.transaction_status == 2: # TRANSACTION_IN_TRANS # Подключение активно, но в транзакции — закроем и создадим новое try: conn.close() except: pass conn = psycopg2.connect(**DB_CONFIG) conn.autocommit = True return conn except Exception as e: print(f"Ошибка при получении подключения: {e}") # Сбрасываем подключение и пробуем заново conn = None conn = psycopg2.connect(**DB_CONFIG) conn.autocommit = True return conn def close_connection(): """Закрывает глобальное подключение к БД""" global conn if conn and not conn.closed: conn.close() conn = None # Проверяет, есть ли указанный URL в базе данных. def check_url_exists(url: str): conn = get_connection() try: with conn.cursor() as cursor: cursor.execute( "SELECT 1 FROM url WHERE url = %s LIMIT 1", (url,) ) result = cursor.fetchone() return {"exists": bool(result)} except Exception as e: raise HTTPException(status_code=500, detail=f"Ошибка при проверке: {e}") # работа с базой данных показывания задач work_parser def create_table(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS work_parser ( id SERIAL PRIMARY KEY, status VARCHAR(20) NOT NULL CHECK (status IN ('queued', 'in_progress', 'completed', 'failed')), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), started_at TIMESTAMPTZ, finished_at TIMESTAMPTZ, source_url TEXT, error_message TEXT, attempts INTEGER DEFAULT 0, priority INTEGER DEFAULT 0 ); """) print("Таблица work_parser создана или уже существует") except Exception as e: print(f"Ошибка при создании таблицы work_parser: {e}") def insert_task(status, source_url=None, source_id=None, priority=0): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO work_parser (status, source_url, priority) VALUES (%s, %s, %s) RETURNING id; """, (status, source_url, priority)) task_id = cur.fetchone()[0] return task_id except Exception as e: print(f"Ошибка при создании задачи: {e}") raise def get_tasks_offset(limit, offset): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT * FROM work_parser ORDER BY created_at DESC LIMIT %s OFFSET %s """, (limit, offset)) tasks = cur.fetchall() return tasks except Exception as e: print(f"Ошибка при получении задач: {e}") raise def delete_task(task_id: int): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("DELETE FROM work_parser WHERE id = %s RETURNING *;", (task_id,)) deleted_task = cur.fetchone() if deleted_task: return {"message": f"Задача {task_id} удалена", "deleted_task": dict(deleted_task)} else: return {"message": f"Задача с id {task_id} не найдена"} except Exception as e: print(f"Ошибка при удалении задачи: {e}") raise def update_task(task_id, **fields): conn = get_connection() try: allowed_fields = ['status', 'started_at', 'finished_at', 'source_url', 'error_message', 'attempts', 'priority'] set_parts = [] values = [] for key, value in fields.items(): if key in allowed_fields: set_parts.append(f"{key} = %s") values.append(value) if not set_parts: return False values.append(task_id) set_sql = ", ".join(set_parts) with conn.cursor() as cur: cur.execute(f"UPDATE work_parser SET {set_sql} WHERE id = %s;", values) return True except Exception as e: print(f"Ошибка при обновлении задачи: {e}") raise # Создание и работа с таблицей по созданию и редактированию промтов def create_table_config_gpt(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS config_gpt ( name VARCHAR(20) PRIMARY KEY, promt TEXT ); """) print("Таблица config_gpt создана или уже существует") except Exception as e: print(f"Ошибка при создании таблицы config_gpt: {e}") def update_promt(name: str, promt: str): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO config_gpt (name, promt) VALUES ( %s, %s) ON CONFLICT (name) DO UPDATE SET promt = EXCLUDED.promt """, (name, promt)) conn.commit() except Exception as e: print(f"Ошибка при обновлении промта: {e}") raise def get_promt(promt_name_url): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT promt FROM config_gpt WHERE name = %s", (promt_name_url,)) promt = cur.fetchone() if promt: return promt['promt'] return None except Exception as e: print(f"Ошибка при получении промта: {e}") raise def get_all_promt(): # Возвращает список всех значений поля name из таблицы config_gpt conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT * FROM config_gpt") rows = cur.fetchall() sources = [{"name": row["name"], "promt": row["promt"]} for row in rows] return {"sources": sources} except Exception as e: print(f"Ошибка при получении всех промтов: {e}") raise def get_all_categories_promt(): conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT name FROM config_gpt") rows = cur.fetchall() return [row["name"] for row in rows] except Exception as e: print(f"Ошибка при получении категорий: {e}") raise # Создание, сохранение и работа с таблицей ошибочных ссылок (error_url) def create_table_error_url(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS error_url ( id SERIAL PRIMARY KEY, source_url TEXT NOT NULL, error_sources_url TEXT NOT NULL ); """) print("Таблица error_url создана или уже существует") except Exception as e: print(f"Ошибка при создании таблицы error_url: {e}") def add_error_url(source_url: str, error_sources_url: str): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO error_url (source_url, error_sources_url) VALUES (%s, %s) RETURNING id; """, (source_url, error_sources_url)) return cur.fetchone()[0] except Exception as e: print(f"Ошибка при добавлении error_url: {e}") raise def check_error_url(error_sources_url: str) -> bool: conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT 1 FROM error_url WHERE error_sources_url = %s LIMIT 1; """, (error_sources_url,)) row = cur.fetchone() return row is None except Exception as e: print(f"Ошибка при проверке error_url: {e}") return True # Создание и работа с таблицей источников sources def create_table_add_sourse(): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS sourse ( url TEXT PRIMARY KEY, promt TEXT, status BOOLEAN DEFAULT FALSE ); """) print("Таблица sourse создана или уже существует") except Exception as e: print(f"Ошибка при создании таблицы sourse: {e}") def add_sources(url: str, promt: str, status: bool = False): conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" INSERT INTO sourse (url, promt, status) VALUES (%s, %s, %s) ON CONFLICT (url) DO UPDATE SET promt = EXCLUDED.promt, status = EXCLUDED.status """, (url, promt, status)) conn.commit() except Exception as e: print(f"Ошибка при добавлении источника: {e}") raise def get_all_sources(category: str): """Возвращает все записи из таблицы sourse. Сначала показываются записи со status=false""" conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: if category == "all": cur.execute(""" SELECT * FROM sourse ORDER BY status ASC, url ASC """) else: cur.execute(""" SELECT * FROM sourse WHERE promt = %s ORDER BY status ASC, url ASC """, (category,)) rows = cur.fetchall() sources = [{"url": row["url"], "promt": row["promt"], "status": row["status"]} for row in rows] return {"sources": sources} except Exception as e: print(f"Ошибка при получении источников: {e}") return {"error": str(e), "sources": []} def get_true_sources(): """Возвращает все записи из таблицы sourse. Сначала показываются записи со status=true""" conn = get_connection() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT * FROM sourse WHERE status = true """) rows = cur.fetchall() sources = {} for row in rows: sources.update({row["url"]: row["promt"]}) return sources except Exception as e: print(f"Ошибка при получении источников: {e}") return {"error": str(e), "sources": []} def update_source_status(url: str, status: bool = True): """Обновляет статус источника по URL""" conn = get_connection() try: with conn.cursor() as cur: cur.execute(""" UPDATE sourse SET status = %s WHERE url = %s """, (status, url)) updated = cur.rowcount conn.commit() return {"message": f"Статус обновлён для {url}", "updated_rows": updated} except Exception as e: print(f"Ошибка при обновлении статуса: {e}") return {"error": str(e), "updated_rows": 0} def delete_sources(url: str): """Удаляет источник по URL из таблицы sourse""" conn = get_connection() try: with conn.cursor() as cur: cur.execute("DELETE FROM sourse WHERE url=%s RETURNING *", (url,)) deleted_task = cur.fetchone() conn.commit() if deleted_task: return {"message": f"Источник {url} удалён", "deleted": True} else: return {"message": f"Источник с url {url} не найден", "deleted": False} except Exception as e: print(f"Ошибка при удалении источника: {e}") return {"error": str(e), "deleted": False} # Пример использования # if __name__ == "__main__": # # create_table_config_gpt() # <-- раскомментировать эту строку # update_promt({ # "url": "http://korei", # "name": "Корея", # "promt": "Задача: Перевод на русский язык и тематическая фильтрация новостных статей из китайской прессы. \n Необходимо переводить текст статьи и определять, относится ли она к КНР по указанным темам: \n 1. Перевод\n Переведи предоставленный китайский текст на русский язык, сохранив оригинальный смысл, стиль и структуру.\n Текст:\n {content}\n -------------------------------------\n 2. Отбирай исключительно новости, прямо относящиеся к Китаю, его безопасности, соседним странам и территориям, влияющим на интересы Китая.\n Если не относится к Китаю — считаем, что статья НЕ подходит, и отдаем пустой JSON:\n {\"text\": \"\", \"pereskas\": \"\", \"title\": \"\", \"topics\": []}\n Если привязка есть — переходи к шагу 3. \n -------------------------------------\n 3. Тематическая классификация\n Определи, относится ли статья к одной или нескольким темам из списка:\n 1) Военные новости — конфликты, учения, мобилизация, закупки вооружений. \n 2) Пограничная деятельность — охрана границы, пограничные учения, строительство или модернизация пограничной инфраструктуры, техника для пограничников. \n 3) Пункты пропуска на границе с РФ — изменения режима работы, строительство, реконструкция, оборудование, логистика. \n 4) Пограничные реки — состояние рек, экология, инфраструктурные проекты, мониторинг. \n 5) Чрезвычайные ситуации — природные и техногенные происшествия, особенно затрагивающие пограничные реки и прилегающие земли. \n 6) Санитарно-эпидемиологическая обстановка — эпидемии, эпизоотии, эпифитотии, угрозы и меры предотвращения. \n 7) Индустриальные проекты (арктическое/антарктическое направление). \n 8) Индустриальные проекты в приграничных районах — заводы, производства, технопарки, новые технологии. \n 9) Инфраструктурные проекты в приграничных районах — дороги, мосты, транспорт, логистика. \n 10) Культура малочисленных народностей (нанайцы, монголы, уйгуры, нанайцы и хэчжэ) — политика, традиции, бытовая жизнь нанайцев, монголов, уйгуров, и хэчжэ (малочисленных народов).\n\n Отметь только те темы, которым статья действительно соответствует.\n\n -------------------------------------\n 4. Формат ответа \n Вернуть строго JSON без пояснений и дополнительных слов:\n {\n \"translation_text\": \"<перевод текста статьи на русский язык (дословный, точный и без сокращений ) >\",\n \"short_text\": \"<пересказ переведённого текста>\",\n \"title\": \"<краткая суть новости (1–2 предложения)>\",\n \"category\": \"<названий категорий, которым соответствует статья>\"\n }\n Если статья не относится ни к одной теме или не привязана к нужным регионам — вернуть:\n {\"translation_text\": \"\", \"short_text\": \"\", \"title\": \"\", \"category\": \"\"}" # }) # # print(get_promt("japan")) # # create_table_error_url() # create_table_add_sourse() # delete_sources("https://www.taipeitimes.com/")