Files
parser/work_parser.py

89 lines
2.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import psycopg2
from psycopg2.extras import RealDictCursor
# Подключение к БД (укажи свои параметры)
conn = psycopg2.connect(
dbname="parsed_url",
user="postgres",
password="qwertyqwerty123123",
# host="45.129.78.228",
host="127.0.0.1"
)
conn.autocommit = True
def create_table():
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS work_parser (
id SERIAL PRIMARY KEY,
status VARCHAR(20) NOT NULL CHECK (status IN ('queued', 'in_progress', 'completed', 'failed')),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
source_url TEXT,
error_message TEXT,
attempts INTEGER DEFAULT 0,
priority INTEGER DEFAULT 0
);
""")
print("Таблица work_parser создана или уже существует")
def insert_task(status, source_url=None, source_id=None, priority=0):
with conn.cursor() as cur:
cur.execute("""
INSERT INTO work_parser (status, source_url, priority)
VALUES (%s, %s, %s)
RETURNING id;
""", (status, source_url, priority))
task_id = cur.fetchone()[0]
return task_id
def get_task(task_id):
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT * FROM work_parser WHERE id = %s;", (task_id,))
task = cur.fetchone()
return task
def get_tasks_offset(limit, offset):
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT * FROM work_parser
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""", (limit, offset))
tasks = cur.fetchall()
return tasks
def update_task(task_id, **fields):
# dynamic update query generator
allowed_fields = ['status', 'started_at', 'finished_at', 'source_url', 'error_message', 'attempts', 'priority']
set_parts = []
values = []
for key, value in fields.items():
if key in allowed_fields:
set_parts.append(f"{key} = %s")
values.append(value)
if not set_parts:
return False
values.append(task_id)
set_sql = ", ".join(set_parts)
with conn.cursor() as cur:
cur.execute(f"UPDATE work_parser SET {set_sql} WHERE id = %s;", values)
return True
# Пример использования
# if __name__ == "__main__":
# create_table()
# task_id = insert_task(status='queued', source_url='http://example.com', priority=5)
# print("Создана задача с id:", task_id)
# task = get_task(task_id)
# print("Получена задача:", task)
# update_task(task_id, status='in_progress', started_at=datetime.utcnow(), attempts=1)
# task = get_task(task_id)
# print("Обновленная задача:", task)