Files
parser/work_parser.py

189 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import psycopg2
from psycopg2.extras import RealDictCursor
# Подключение к БД (укажи свои параметры)
conn = psycopg2.connect(
dbname="parsed_url",
user="postgres",
password="qwertyqwerty123123",
host="45.129.78.228",
# host="127.0.0.1"
connect_timeout=10,
options="-c statement_timeout=30000" # таймаут запроса 30 сек
)
conn.autocommit = True
# работа с базой данных показывания задач work_parser
def create_table():
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS work_parser (
id SERIAL PRIMARY KEY,
status VARCHAR(20) NOT NULL CHECK (status IN ('queued', 'in_progress', 'completed', 'failed')),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
source_url TEXT,
error_message TEXT,
attempts INTEGER DEFAULT 0,
priority INTEGER DEFAULT 0
);
""")
print("Таблица work_parser создана или уже существует")
def insert_task(status, source_url=None, source_id=None, priority=0):
with conn.cursor() as cur:
cur.execute("""
INSERT INTO work_parser (status, source_url, priority)
VALUES (%s, %s, %s)
RETURNING id;
""", (status, source_url, priority))
task_id = cur.fetchone()[0]
return task_id
def get_tasks_offset(limit, offset):
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT * FROM work_parser
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""", (limit, offset))
tasks = cur.fetchall()
return tasks
def delete_task(task_id: int):
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("DELETE FROM work_parser WHERE id = %s RETURNING *;", (task_id,))
deleted_task = cur.fetchone()
if deleted_task:
return {"message": f"Задача {task_id} удалена", "deleted_task": dict(deleted_task)}
else:
return {"message": f"Задача с id {task_id} не найдена"}
def update_task(task_id, **fields):
# dynamic update query generator
allowed_fields = ['status', 'started_at', 'finished_at', 'source_url', 'error_message', 'attempts', 'priority']
set_parts = []
values = []
for key, value in fields.items():
if key in allowed_fields:
set_parts.append(f"{key} = %s")
values.append(value)
if not set_parts:
return False
values.append(task_id)
set_sql = ", ".join(set_parts)
with conn.cursor() as cur:
cur.execute(f"UPDATE work_parser SET {set_sql} WHERE id = %s;", values)
return True
# Создание и работа с таблицей по созданию и редактированию промтов
def create_table_config_gpt():
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS config_gpt (
url TEXT PRIMARY KEY,
name VARCHAR(20),
promt TEXT
);
""")
print("Таблица config_gpt создана или уже существует")
def update_promt(url: str, name: str, promt: str):
with conn.cursor() as cur:
cur.execute("""
INSERT INTO config_gpt (url, name, promt)
VALUES (%s, %s, %s)
ON CONFLICT (url) DO UPDATE SET
name = EXCLUDED.name,
promt = EXCLUDED.promt
""", (url, name, promt))
conn.commit()
def get_promt(promt_name_url):
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT promt FROM config_gpt WHERE url = %s", (promt_name_url,))
promt = cur.fetchone()
return promt['promt']
def get_all_promt():
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT * FROM config_gpt")
rows = cur.fetchall()
sources = [{"url": row["url"], "name": row["name"], "promt": row["promt"]} for row in rows]
return {"sources": sources}
# Возвращает список всех значений поля name из таблицы config_gpt
def get_all_categories_promt():
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT name FROM config_gpt")
rows = cur.fetchall()
return [row["name"] for row in rows]
# Создание, сохранение и работа с таблицей ошибочных ссылок (error_url)
def create_table_error_url():
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS error_url (
id SERIAL PRIMARY KEY,
source_url TEXT NOT NULL,
error_sources_url TEXT NOT NULL
);
""")
print("Таблица error_url создана или уже существует")
def add_error_url(source_url: str, error_sources_url: str):
"""Добавляет запись об ошибке URL"""
with conn.cursor() as cur:
cur.execute("""
INSERT INTO error_url (source_url, error_sources_url)
VALUES (%s, %s)
RETURNING id;
""", (source_url, error_sources_url))
return cur.fetchone()[0]
def check_error_url(error_sources_url: str) -> bool:
"""Проверяет, есть ли запись в таблице error_url с таким URL"""
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT 1 FROM error_url
WHERE error_sources_url = %s
LIMIT 1;
""", (error_sources_url,))
row = cur.fetchone()
return row is None
# Создание и работа с таблицей источников add_sources
def create_table_add_sourse():
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS sourse (
url TEXT PRIMARY KEY,
promt TEXT
);
""")
print("Таблица sourse создана или уже существует")
def add_sources(url: str, promt: str):
with conn.cursor() as cur:
cur.execute("""
INSERT INTO config_gpt (url, promt)
VALUES (%s, %s)
ON CONFLICT (url) DO UPDATE SET
promt = EXCLUDED.promt
""", (url, promt))
conn.commit()
# Пример использования
# if __name__ == "__main__":
# # create_table_config_gpt() # <-- раскомментировать эту строку
# update_promt({
# "url": "http://korei",
# "name": "Корея",
# "promt": "Задача: Перевод на русский язык и тематическая фильтрация новостных статей из китайской прессы. \n Необходимо переводить текст статьи и определять, относится ли она к КНР по указанным темам: \n 1. Перевод\n Переведи предоставленный китайский текст на русский язык, сохранив оригинальный смысл, стиль и структуру.\n Текст:\n {content}\n -------------------------------------\n 2. Отбирай исключительно новости, прямо относящиеся к Китаю, его безопасности, соседним странам и территориям, влияющим на интересы Китая.\n Если не относится к Китаю — считаем, что статья НЕ подходит, и отдаем пустой JSON:\n {\"text\": \"\", \"pereskas\": \"\", \"title\": \"\", \"topics\": []}\n Если привязка есть — переходи к шагу 3. \n -------------------------------------\n 3. Тематическая классификация\n Определи, относится ли статья к одной или нескольким темам из списка:\n 1) Военные новости — конфликты, учения, мобилизация, закупки вооружений. \n 2) Пограничная деятельность — охрана границы, пограничные учения, строительство или модернизация пограничной инфраструктуры, техника для пограничников. \n 3) Пункты пропуска на границе с РФ — изменения режима работы, строительство, реконструкция, оборудование, логистика. \n 4) Пограничные реки — состояние рек, экология, инфраструктурные проекты, мониторинг. \n 5) Чрезвычайные ситуации — природные и техногенные происшествия, особенно затрагивающие пограничные реки и прилегающие земли. \n 6) Санитарно-эпидемиологическая обстановка — эпидемии, эпизоотии, эпифитотии, угрозы и меры предотвращения. \n 7) Индустриальные проекты (арктическое/антарктическое направление). \n 8) Индустриальные проекты в приграничных районах — заводы, производства, технопарки, новые технологии. \n 9) Инфраструктурные проекты в приграничных районах — дороги, мосты, транспорт, логистика. \n 10) Культура малочисленных народностей (нанайцы, монголы, уйгуры, нанайцы и хэчжэ) — политика, традиции, бытовая жизнь нанайцев, монголов, уйгуров, и хэчжэ (малочисленных народов).\n\n Отметь только те темы, которым статья действительно соответствует.\n\n -------------------------------------\n 4. Формат ответа \n Вернуть строго JSON без пояснений и дополнительных слов:\n {\n \"translation_text\": \"<перевод текста статьи на русский язык (дословный, точный и без сокращений ) >\",\n \"short_text\": \"<пересказ переведённого текста>\",\n \"title\": \"<краткая суть новости (12 предложения)>\",\n \"category\": \"<названий категорий, которым соответствует статья>\"\n }\n Если статья не относится ни к одной теме или не привязана к нужным регионам — вернуть:\n {\"translation_text\": \"\", \"short_text\": \"\", \"title\": \"\", \"category\": \"\"}"
# })
# # print(get_promt("japan"))
# # create_table_error_url()
# create_table_add_sourse()