Files
parser/work_parser.py
Игорь Бандурист e894e7f9f5
All checks were successful
continuous-integration/drone/push Build is passing
исправил косяки
2026-05-01 20:22:40 +10:00

357 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import psycopg2
from psycopg2.extras import RealDictCursor
from pydantic import BaseModel
from typing import List, Optional
from fastapi import HTTPException
# Параметры подключения к БД
DB_CONFIG = {
"dbname": "parsed_url",
"user": "postgres",
"password": "qwertyqwerty123123",
"host": "45.129.78.228",
"connect_timeout": 10,
"options": "-c statement_timeout=30000"
}
# Модель для данных, которые приходят в POST
class ParsedData(BaseModel):
url: str
parsed_at: str
title: str
original_text: str
article_date: str
status: Optional[bool] = False
viewed: Optional[bool] = False
tematik: Optional[bool] = False
svodka: Optional[bool] = False
donesenie: Optional[bool] = False
bilutene: Optional[bool] = False
other: str
category: str
translation_text: str
short_text: str
# Функции для работы с БД (без эндпоинтов)
def save_parsed_data_to_db(data: ParsedData):
conn = None
try:
conn = get_connection()
with conn.cursor() as cur:
cur.execute("""
INSERT INTO url (url, parsed_at, title, original_text, article_date, status, viewed, tematik, svodka, donesenie, bilutene, other, category, translation_text, short_text)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (url) DO UPDATE SET
parsed_at = EXCLUDED.parsed_at,
title = EXCLUDED.title,
original_text = EXCLUDED.original_text,
article_date = EXCLUDED.article_date,
status = EXCLUDED.status,
viewed = EXCLUDED.viewed,
tematik = EXCLUDED.tematik,
svodka = EXCLUDED.svodka,
donesenie = EXCLUDED.donesenie,
bilutene = EXCLUDED.bilutene,
other = EXCLUDED.other,
category = EXCLUDED.category,
translation_text = EXCLUDED.translation_text,
short_text = EXCLUDED.short_text;
""", (data.url, data.parsed_at, data.title, data.original_text, data.article_date, data.status, data.viewed, data.tematik, data.svodka, data.donesenie, data.bilutene, data.other, data.category, data.translation_text, data.short_text))
conn.commit()
return {"status": "success", "message": "Данные успешно сохранены"}
except Exception as e:
if conn:
conn.rollback()
raise e
finally:
if conn:
conn.close()
# Глобальное подключение к БД
conn = None
def get_connection():
"""Получает подключение к БД, создавая новое при необходимости"""
global conn
if conn is None or conn.closed:
conn = psycopg2.connect(**DB_CONFIG)
conn.autocommit = True
return conn
# Проверяет, есть ли указанный URL в базе данных.
def check_url_exists(url: str):
conn = get_connection()
try:
conn = get_connection()
with conn.cursor() as cursor:
cursor.execute(
"SELECT 1 FROM url WHERE url = %s LIMIT 1",
(url,)
)
result = cursor.fetchone()
return {"exists": bool(result)}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при проверке: {e}")
finally:
pass
# работа с базой данных показывания задач work_parser
def create_table():
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS work_parser (
id SERIAL PRIMARY KEY,
status VARCHAR(20) NOT NULL CHECK (status IN ('queued', 'in_progress', 'completed', 'failed')),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
source_url TEXT,
error_message TEXT,
attempts INTEGER DEFAULT 0,
priority INTEGER DEFAULT 0
);
""")
print("Таблица work_parser создана или уже существует")
finally:
pass
def insert_task(status, source_url=None, source_id=None, priority=0):
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO work_parser (status, source_url, priority)
VALUES (%s, %s, %s)
RETURNING id;
""", (status, source_url, priority))
task_id = cur.fetchone()[0]
return task_id
finally:
pass
def get_tasks_offset(limit, offset):
conn = get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT * FROM work_parser
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""", (limit, offset))
tasks = cur.fetchall()
return tasks
finally:
pass
def delete_task(task_id: int):
conn = get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("DELETE FROM work_parser WHERE id = %s RETURNING *;", (task_id,))
deleted_task = cur.fetchone()
if deleted_task:
return {"message": f"Задача {task_id} удалена", "deleted_task": dict(deleted_task)}
else:
return {"message": f"Задача с id {task_id} не найдена"}
finally:
pass
def update_task(task_id, **fields):
conn = get_connection()
try:
allowed_fields = ['status', 'started_at', 'finished_at', 'source_url', 'error_message', 'attempts', 'priority']
set_parts = []
values = []
for key, value in fields.items():
if key in allowed_fields:
set_parts.append(f"{key} = %s")
values.append(value)
if not set_parts:
return False
values.append(task_id)
set_sql = ", ".join(set_parts)
with conn.cursor() as cur:
cur.execute(f"UPDATE work_parser SET {set_sql} WHERE id = %s;", values)
return True
finally:
pass
# Создание и работа с таблицей по созданию и редактированию промтов
def create_table_config_gpt():
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS config_gpt (
name VARCHAR(20) PRIMARY KEY,
promt TEXT
);
""")
print("Таблица config_gpt создана или уже существует")
finally:
pass
def update_promt(name: str, promt: str):
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO config_gpt (name, promt)
VALUES ( %s, %s)
ON CONFLICT (name) DO UPDATE SET
promt = EXCLUDED.promt
""", (name, promt))
conn.commit()
finally:
pass
def get_promt(promt_name_url):
conn = get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT promt FROM config_gpt WHERE name = %s", (promt_name_url,))
promt = cur.fetchone()
return promt['promt']
finally:
pass
def get_all_promt():
# Возвращает список всех значений поля name из таблицы config_gpt
conn = get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT * FROM config_gpt")
rows = cur.fetchall()
sources = [{"name": row["name"], "promt": row["promt"]} for row in rows]
return {"sources": sources}
finally:
pass
def get_all_categories_promt():
conn = get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT name FROM config_gpt")
rows = cur.fetchall()
return [row["name"] for row in rows]
finally:
pass
# Создание, сохранение и работа с таблицей ошибочных ссылок (error_url)
def create_table_error_url():
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS error_url (
id SERIAL PRIMARY KEY,
source_url TEXT NOT NULL,
error_sources_url TEXT NOT NULL
);
""")
print("Таблица error_url создана или уже существует")
finally:
pass
def add_error_url(source_url: str, error_sources_url: str):
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO error_url (source_url, error_sources_url)
VALUES (%s, %s)
RETURNING id;
""", (source_url, error_sources_url))
return cur.fetchone()[0]
finally:
pass
def check_error_url(error_sources_url: str) -> bool:
conn = get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT 1 FROM error_url
WHERE error_sources_url = %s
LIMIT 1;
""", (error_sources_url,))
row = cur.fetchone()
return row is None
finally:
pass
# Создание и работа с таблицей источников sources
def create_table_add_sourse():
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS sourse (
url TEXT PRIMARY KEY,
promt TEXT
);
""")
print("Таблица sourse создана или уже существует")
finally:
pass
def add_sources(url: str, promt: str):
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO sourse (url, promt)
VALUES (%s, %s)
ON CONFLICT (url) DO UPDATE SET
promt = EXCLUDED.promt
""", (url, promt))
conn.commit()
finally:
pass
def get_all_sources():
"""Возвращает все записи из таблицы sourse"""
conn = get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT * FROM sourse")
rows = cur.fetchall()
sources = [{"url": row["url"], "promt": row["promt"]} for row in rows]
return {"sources": sources}
finally:
pass
def delete_sources(url: str):
"""Удаляет источник по URL из таблицы sourse"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute("DELETE FROM sourse WHERE url=%s RETURNING *", (url,))
deleted_task = cur.fetchone()
conn.commit()
if deleted_task:
return {"message": f"Источник {url} удалён", "deleted": True}
else:
return {"message": f"Источник с url {url} не найден", "deleted": False}
except Exception as e:
print(f"Ошибка при удалении источника: {e}")
return {"error": str(e), "deleted": False}
finally:
pass
# Пример использования
# if __name__ == "__main__":
# # create_table_config_gpt() # <-- раскомментировать эту строку
# update_promt({
# "url": "http://korei",
# "name": "Корея",
# "promt": "Задача: Перевод на русский язык и тематическая фильтрация новостных статей из китайской прессы. \n Необходимо переводить текст статьи и определять, относится ли она к КНР по указанным темам: \n 1. Перевод\n Переведи предоставленный китайский текст на русский язык, сохранив оригинальный смысл, стиль и структуру.\n Текст:\n {content}\n -------------------------------------\n 2. Отбирай исключительно новости, прямо относящиеся к Китаю, его безопасности, соседним странам и территориям, влияющим на интересы Китая.\n Если не относится к Китаю — считаем, что статья НЕ подходит, и отдаем пустой JSON:\n {\"text\": \"\", \"pereskas\": \"\", \"title\": \"\", \"topics\": []}\n Если привязка есть — переходи к шагу 3. \n -------------------------------------\n 3. Тематическая классификация\n Определи, относится ли статья к одной или нескольким темам из списка:\n 1) Военные новости — конфликты, учения, мобилизация, закупки вооружений. \n 2) Пограничная деятельность — охрана границы, пограничные учения, строительство или модернизация пограничной инфраструктуры, техника для пограничников. \n 3) Пункты пропуска на границе с РФ — изменения режима работы, строительство, реконструкция, оборудование, логистика. \n 4) Пограничные реки — состояние рек, экология, инфраструктурные проекты, мониторинг. \n 5) Чрезвычайные ситуации — природные и техногенные происшествия, особенно затрагивающие пограничные реки и прилегающие земли. \n 6) Санитарно-эпидемиологическая обстановка — эпидемии, эпизоотии, эпифитотии, угрозы и меры предотвращения. \n 7) Индустриальные проекты (арктическое/антарктическое направление). \n 8) Индустриальные проекты в приграничных районах — заводы, производства, технопарки, новые технологии. \n 9) Инфраструктурные проекты в приграничных районах — дороги, мосты, транспорт, логистика. \n 10) Культура малочисленных народностей (нанайцы, монголы, уйгуры, нанайцы и хэчжэ) — политика, традиции, бытовая жизнь нанайцев, монголов, уйгуров, и хэчжэ (малочисленных народов).\n\n Отметь только те темы, которым статья действительно соответствует.\n\n -------------------------------------\n 4. Формат ответа \n Вернуть строго JSON без пояснений и дополнительных слов:\n {\n \"translation_text\": \"<перевод текста статьи на русский язык (дословный, точный и без сокращений ) >\",\n \"short_text\": \"<пересказ переведённого текста>\",\n \"title\": \"<краткая суть новости (12 предложения)>\",\n \"category\": \"<названий категорий, которым соответствует статья>\"\n }\n Если статья не относится ни к одной теме или не привязана к нужным регионам — вернуть:\n {\"translation_text\": \"\", \"short_text\": \"\", \"title\": \"\", \"category\": \"\"}"
# })
# # print(get_promt("japan"))
# # create_table_error_url()
# create_table_add_sourse()
# delete_sources("https://www.taipeitimes.com/")