91 lines
3.2 KiB
Python
91 lines
3.2 KiB
Python
import os

import psycopg2
from psycopg2.extras import RealDictCursor
|
||
|
||
# Database connection (adjust the parameters for your environment).
# Every setting can be overridden through an environment variable; the
# fallbacks are the values that used to be hard-coded here, so behavior is
# unchanged when no variable is set.
# NOTE(review): credentials should not live in source control - rotate this
# password and drop the fallback once every deployment sets
# PARSER_DB_PASSWORD.
conn = psycopg2.connect(
    dbname=os.environ.get("PARSER_DB_NAME", "parsed_url"),
    user=os.environ.get("PARSER_DB_USER", "postgres"),
    password=os.environ.get("PARSER_DB_PASSWORD", "qwertyqwerty123123"),
    # Set PARSER_DB_HOST=127.0.0.1 to work against a local database.
    host=os.environ.get("PARSER_DB_HOST", "45.129.78.228"),
)
# Autocommit: each statement is committed as soon as it executes, which is
# why none of the helpers in this module call conn.commit().
conn.autocommit = True
|
||
|
||
def create_table():
    """Create the ``work_parser`` table unless it is already present.

    Runs a ``CREATE TABLE IF NOT EXISTS`` statement on the module-level
    connection and then prints a confirmation message.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS work_parser (
            id SERIAL PRIMARY KEY,
            status VARCHAR(20) NOT NULL CHECK (status IN ('queued', 'in_progress', 'completed', 'failed')),
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            started_at TIMESTAMPTZ,
            finished_at TIMESTAMPTZ,
            source_url TEXT,
            error_message TEXT,
            attempts INTEGER DEFAULT 0,
            priority INTEGER DEFAULT 0
        );
    """
    with conn.cursor() as cursor:
        cursor.execute(ddl)
        print("Таблица work_parser создана или уже существует")
|
||
|
||
def insert_task(status, source_url=None, source_id=None, priority=0):
    """Insert one row into ``work_parser`` and return its generated id.

    Args:
        status: Value stored in the ``status`` column.
        source_url: Value stored in the ``source_url`` column.
        source_id: Accepted for the caller's convenience but not used by
            this function; nothing is stored for it.
        priority: Value stored in the ``priority`` column.

    Returns:
        The ``id`` reported by the statement's ``RETURNING`` clause.
    """
    query = """
        INSERT INTO work_parser (status, source_url, priority)
        VALUES (%s, %s, %s)
        RETURNING id;
    """
    params = (status, source_url, priority)
    with conn.cursor() as cursor:
        cursor.execute(query, params)
        new_row = cursor.fetchone()
    # The default cursor yields plain tuples; the id is the only column.
    return new_row[0]
|
||
|
||
|
||
def get_tasks_offset(limit, offset):
    """Return one page of ``work_parser`` rows, newest first.

    Rows are ordered by ``created_at`` descending, with ``id`` descending as
    a tie-breaker. Without the tie-breaker, rows sharing the same timestamp
    have no defined relative order, so consecutive LIMIT/OFFSET pages could
    repeat or skip them.

    Args:
        limit: Maximum number of rows to return.
        offset: Number of rows to skip before the first returned row.

    Returns:
        A list of rows as produced by ``RealDictCursor`` (each row maps
        column names to values); empty when the page is past the end.
    """
    query = """
        SELECT * FROM work_parser
        ORDER BY created_at DESC, id DESC
        LIMIT %s OFFSET %s
    """
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(query, (limit, offset))
        return cur.fetchall()
|
||
|
||
def delete_task(task_id: int):
    """Delete the ``work_parser`` row with the given id and report the result.

    Args:
        task_id: Primary key of the row to remove.

    Returns:
        A dict with a ``"message"`` key. When a row was deleted, the dict
        also carries the removed row as a plain dict under
        ``"deleted_task"``.
    """
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(
            "DELETE FROM work_parser WHERE id = %s RETURNING *;",
            (task_id,),
        )
        removed = cursor.fetchone()

    # Nothing came back from RETURNING: no row had this id.
    if not removed:
        return {"message": f"Задача с id {task_id} не найдена"}

    return {
        "message": f"Задача {task_id} удалена",
        "deleted_task": dict(removed),
    }
|
||
|
||
def update_task(task_id, **fields):
    """Update selected columns of one ``work_parser`` row.

    Only whitelisted column names are written; any other keyword argument is
    silently ignored. Values are always passed as query parameters.

    Args:
        task_id: Primary key of the row to update.
        **fields: Column name / new value pairs. Recognised columns are
            ``status``, ``started_at``, ``finished_at``, ``source_url``,
            ``error_message``, ``attempts`` and ``priority``.

    Returns:
        ``True`` if a row with ``task_id`` was updated. ``False`` if no
        recognised column was supplied (no query is run) or if no row has
        that id.
    """
    allowed_fields = frozenset({
        'status', 'started_at', 'finished_at', 'source_url',
        'error_message', 'attempts', 'priority',
    })
    updates = {
        column: value
        for column, value in fields.items()
        if column in allowed_fields
    }
    if not updates:
        return False

    # Column names are interpolated into the SQL text, which is safe only
    # because each one has just been checked against the whitelist above.
    set_sql = ", ".join(f"{column} = %s" for column in updates)
    values = [*updates.values(), task_id]
    with conn.cursor() as cur:
        cur.execute(f"UPDATE work_parser SET {set_sql} WHERE id = %s;", values)
        # rowcount is 0 when the WHERE clause matched nothing, i.e. the task
        # does not exist; report that instead of claiming success.
        return cur.rowcount > 0
|
||
|
||
# Usage example
# NOTE: this sample is stale - get_task() is not defined in this module
# (get_tasks_offset() is the only reader) and datetime is not imported.
# if __name__ == "__main__":
#     create_table()
#
#     task_id = insert_task(status='queued', source_url='http://example.com', priority=5)
#     print("Создана задача с id:", task_id)
#
#     task = get_task(task_id)
#     print("Получена задача:", task)
#
#     update_task(task_id, status='in_progress', started_at=datetime.utcnow(), attempts=1)
#     task = get_task(task_id)
#     print("Обновленная задача:", task)
|