"""Database access helpers for the ``url`` table of parsed articles.

The functions here contain no HTTP endpoint logic; they open a PostgreSQL
connection, run one statement and return plain Python values.
"""
import os
from contextlib import contextmanager
from typing import List, Optional

import psycopg2
import psycopg2.extras
from fastapi import HTTPException
from pydantic import BaseModel


# Model for the data that arrives in POST requests.
class ParsedData(BaseModel):
    url: str
    parsed_at: str
    title: str
    original_text: str
    article_date: str
    status: Optional[bool] = False
    viewed: Optional[bool] = False
    other: str
    category: str
    translation_text: str
    short_text: str


# Columns inspected by the free-text search functions.
_SEARCH_COLUMNS = (
    "title",
    "original_text",
    "translation_text",
    "short_text",
    "url",
    "category",
    "other",
)

# SQL conditions selected by the ``item`` argument of the count/search helpers.
_FLAG_CONDITIONS = {
    "viewed": "viewed = true",
    "status": "status = true",
}


# Database connection.
def get_connection():
    """Open and return a new psycopg2 connection to the ``parsed_url`` database.

    Each setting can be overridden through an environment variable
    (``PARSED_DB_NAME``, ``PARSED_DB_USER``, ``PARSED_DB_PASSWORD``,
    ``PARSED_DB_HOST``). When a variable is unset the previous hard-coded
    value is used, so existing deployments keep working unchanged.
    """
    # SECURITY: the fallback credentials below are committed to source control.
    # They are kept only for backward compatibility; rotate the password and
    # supply it through the environment instead.
    return psycopg2.connect(
        dbname=os.environ.get("PARSED_DB_NAME", "parsed_url"),
        user=os.environ.get("PARSED_DB_USER", "postgres"),
        password=os.environ.get("PARSED_DB_PASSWORD", "qwertyqwerty123123"),
        host=os.environ.get("PARSED_DB_HOST", "45.129.78.228"),
    )


@contextmanager
def _db_connection(rollback_on_error: bool = False):
    """Yield a fresh connection and always close it afterwards.

    With ``rollback_on_error=True`` the transaction is rolled back before the
    exception is re-raised (used by the write helpers).
    """
    # Connecting happens outside the ``try`` so a failed connect propagates
    # directly and there is nothing to close.
    conn = get_connection()
    try:
        yield conn
    except Exception:
        if rollback_on_error:
            conn.rollback()
        raise
    finally:
        conn.close()


def _fetch_dicts(statement: str, params) -> List[dict]:
    """Run a SELECT and return every row as a plain ``dict``."""
    with _db_connection() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute(statement, params)
            return [dict(row) for row in cur.fetchall()]


def _fetch_count(statement: str, params=None) -> dict:
    """Run a ``SELECT COUNT(*)`` statement and return ``{"count": n}``."""
    with _db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(statement, params)
            row = cur.fetchone()
            return {"count": row[0]}


def _update_flag(statement: str, value: bool, url: str) -> dict:
    """Run a single-row flag UPDATE; report whether any row matched."""
    with _db_connection(rollback_on_error=True) as conn:
        with conn.cursor() as cursor:
            cursor.execute(statement, (value, url))
            if cursor.rowcount == 0:
                # Nothing was changed, so there is nothing to commit.
                return {"found": False}
        conn.commit()
        return {"found": True}


def _escape_like(text: str) -> str:
    """Escape LIKE/ILIKE metacharacters so *text* is matched literally.

    Backslash is PostgreSQL's default LIKE escape character, so it is
    escaped first, followed by the ``%`` and ``_`` wildcards.
    """
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _search_filter(query: str, item: str):
    """Build the WHERE clause and parameters shared by the search helpers.

    Returns a ``(clause, params)`` pair: the clause ORs an ILIKE test over
    every column in ``_SEARCH_COLUMNS`` and, for ``item`` equal to
    ``"viewed"`` or ``"status"``, additionally requires that flag to be true.
    """
    pattern = f"%{_escape_like(query)}%"
    clause = "(" + " OR ".join(f"{col} ILIKE %s" for col in _SEARCH_COLUMNS) + ")"
    condition = _FLAG_CONDITIONS.get(item)
    if condition:
        clause += f" AND {condition}"
    return clause, [pattern] * len(_SEARCH_COLUMNS)


# Functions for working with the DB (no endpoints).
def save_parsed_data_to_db(data: ParsedData):
    """Insert *data* into ``url``, or overwrite the row with the same URL.

    Returns a status dict on success. On any error the transaction is rolled
    back and the exception is re-raised.
    """
    statement = """
        INSERT INTO url
        (url, parsed_at, title, original_text, article_date, status, viewed,
         other, category, translation_text, short_text)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (url) DO UPDATE SET
            parsed_at = EXCLUDED.parsed_at,
            title = EXCLUDED.title,
            original_text = EXCLUDED.original_text,
            article_date = EXCLUDED.article_date,
            status = EXCLUDED.status,
            viewed = EXCLUDED.viewed,
            other = EXCLUDED.other,
            category = EXCLUDED.category,
            translation_text = EXCLUDED.translation_text,
            short_text = EXCLUDED.short_text;
    """
    values = (
        data.url,
        data.parsed_at,
        data.title,
        data.original_text,
        data.article_date,
        data.status,
        data.viewed,
        data.other,
        data.category,
        data.translation_text,
        data.short_text,
    )
    with _db_connection(rollback_on_error=True) as conn:
        with conn.cursor() as cur:
            cur.execute(statement, values)
        conn.commit()
        return {"status": "success", "message": "Данные успешно сохранены"}


def update_viewed_status_in_db(url: str, viewed: bool):
    """Set the ``viewed`` flag for *url*.

    Returns ``{"found": False}`` when no row has that URL, otherwise commits
    and returns ``{"found": True}``.
    """
    return _update_flag("UPDATE url SET viewed = %s WHERE url = %s", viewed, url)


def update_status_status_in_db(url: str, status: bool):
    """Set the ``status`` flag for *url*.

    Returns ``{"found": False}`` when no row has that URL, otherwise commits
    and returns ``{"found": True}``.
    """
    return _update_flag("UPDATE url SET status = %s WHERE url = %s", status, url)


def check_url_exists_in_db(url: str):
    """Return ``{"exists": bool}`` telling whether a row with *url* is stored."""
    with _db_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT 1 FROM url WHERE url = %s LIMIT 1", (url,))
            return {"exists": bool(cursor.fetchone())}


def get_records_from_db(offset: int = 0, limit: int = 10):
    """Return one page of records as dicts, most recently parsed first."""
    statement = """
        SELECT url, parsed_at, title, original_text, article_date, status,
               viewed, other, category, translation_text, short_text
        FROM url
        ORDER BY parsed_at DESC NULLS LAST
        OFFSET %s LIMIT %s
    """
    return _fetch_dicts(statement, (offset, limit))


def get_records_count_from_db(item: str = "default"):
    """Return ``{"count": n}`` for the rows selected by *item*.

    ``"viewed"`` and ``"status"`` count only rows with that flag set; every
    other value (including ``"time"``) counts all rows.
    """
    # NOTE(review): get_records_all_from_db filters by country for some items,
    # but those items get the unfiltered total here - confirm that is intended.
    statement = "SELECT COUNT(*) FROM url"
    condition = _FLAG_CONDITIONS.get(item)
    if condition:
        statement += f" WHERE {condition}"
    return _fetch_count(statement)


def get_poisk_count_from_db(query: str, item: str = "default"):
    """Return ``{"count": n}`` for rows whose text columns contain *query*.

    The match is a case-insensitive substring test; ``%``, ``_`` and ``\\``
    in *query* are matched literally. *item* may further restrict the count
    to ``"viewed"`` or ``"status"`` rows.
    """
    clause, params = _search_filter(query, item)
    return _fetch_count(f"SELECT COUNT(*) FROM url WHERE {clause}", params)


def poisk_from_db(query: str, offset: int = 0, limit: int = 10, item: str = "default"):
    """Return one page of rows whose text columns contain *query*.

    Uses the same matching rules as ``get_poisk_count_from_db``; results are
    ordered by ``article_date`` descending and returned as dicts.
    """
    clause, params = _search_filter(query, item)
    statement = (
        f"SELECT * FROM url WHERE {clause}"
        " ORDER BY article_date DESC OFFSET %s LIMIT %s"
    )
    return _fetch_dicts(statement, params + [offset, limit])


def get_records_all_from_db(item: str = "default", offset: int = 0, limit: int = 10):
    """Return one page of full rows, filtered and ordered according to *item*.

    * ``"time"`` - all rows, unviewed first, then newest ``parsed_at``.
    * ``"viewed"`` - viewed rows, newest ``article_date`` first.
    * ``"status"`` - rows with ``status`` set, newest ``parsed_at`` first.
    * ``"Япония"`` / ``"Корея"`` - rows whose ``other`` equals *item*.
    * ``"Китай"`` - rows whose ``other`` is ``source1``, ``source2`` or
      ``Китай``.
    * anything else - all rows, unviewed first, then newest ``article_date``.
    """
    where = ""
    filter_params: list = []
    order = "viewed ASC, article_date DESC"

    if item == "time":
        order = "viewed ASC, parsed_at DESC"
    elif item == "viewed":
        where = " WHERE viewed = true"
        order = "article_date DESC"
    elif item == "status":
        where = " WHERE status = true"
        order = "parsed_at DESC"
    elif item in ("Япония", "Корея"):
        where = " WHERE other = %s"
        filter_params = [item]
    elif item == "Китай":
        where = " WHERE other IN (%s, %s, %s)"
        filter_params = ["source1", "source2", "Китай"]

    statement = f"SELECT * FROM url{where} ORDER BY {order} OFFSET %s LIMIT %s"
    return _fetch_dicts(statement, filter_params + [offset, limit])