Files
parser_bd/main.py
admin db21911652
Some checks failed
continuous-integration/drone/push Build is failing
+
2026-04-04 14:08:22 +10:00

323 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware # <-- добавлено
from pydantic import BaseModel
from typing import List, Optional
import psycopg2
import psycopg2.extras
import uvicorn
import json
app = FastAPI(title="Parser Data API",
description="API для записи данных парсинга в базу данных",
version="1.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # или список разрешенных адресов, например ["http://localhost:8080"]
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Модель для данных, которые приходят в POST
class ParsedData(BaseModel):
url: str
parsed_at: str
title: str
original_text: str
article_date: str
status: Optional[bool] = False
viewed: Optional[bool] = False
other: str
category: str
translation_text: str
short_text: str
# Подключение к БД
def get_connection():
return psycopg2.connect(
dbname="parsed_url",
user="postgres",
password="qwertyqwerty123123",
# host="127.0.0.1"
host ="45.129.78.228"
)
@app.post("/save_parsed_data", summary="Сохранить данные парсинга")
def save_parsed_data(data: ParsedData):
conn = None
try:
conn = get_connection()
with conn.cursor() as cur:
cur.execute("""
INSERT INTO url (url, parsed_at, title, original_text, article_date, status, viewed, other, category, translation_text, short_text)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (url) DO UPDATE SET
parsed_at = EXCLUDED.parsed_at,
title = EXCLUDED.title,
original_text = EXCLUDED.original_text,
article_date = EXCLUDED.article_date,
status = EXCLUDED.status,
viewed = EXCLUDED.viewed,
other = EXCLUDED.other,
category = EXCLUDED.category,
translation_text = EXCLUDED.translation_text,
short_text = EXCLUDED.short_text;
""", (data.url, data.parsed_at, data.title, data.original_text, data.article_date, data.status, data.viewed, data.other, data.category, data.translation_text, data.short_text))
conn.commit()
return {"status": "success", "message": "Данные успешно сохранены"}
except Exception as e:
if conn:
conn.rollback()
raise HTTPException(status_code=500, detail=f"Ошибка при сохранении данных: {e}")
finally:
if conn:
conn.close()
@app.post("/update_viewed_status", summary="Обновляет поле viewed")
def update_viewed_status(url: str, viewed: bool):
"""
Обновляет поле 'viewed' записи в БД по заданному URL.
"""
conn = None
try:
conn = get_connection()
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE url
SET viewed = %s
WHERE url = %s
""",
(viewed, url)
)
if cursor.rowcount == 0:
# Если запись не найдена
raise HTTPException(status_code=404, detail="Запись с указанным URL не найдена")
conn.commit()
except Exception as e:
if conn:
conn.rollback()
raise HTTPException(status_code=500, detail=f"Ошибка при сохранении данных: {e}")
finally:
if conn:
conn.close()
return {"status": "success", "message": "Статус просмотра успешно обновлен"}
@app.post("/update_status_status", summary="Обновляет поле status")
def update_status_status(url: str, status: bool):
"""
Обновляет поле 'status' записи в БД по заданному URL.
"""
conn = None
try:
conn = get_connection()
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE url
SET status = %s
WHERE url = %s
""",
(status, url)
)
if cursor.rowcount == 0:
# Если запись не найдена
raise HTTPException(status_code=404, detail="Запись с указанным URL не найдена")
conn.commit()
except Exception as e:
if conn:
conn.rollback()
raise HTTPException(status_code=500, detail=f"Ошибка при сохранении данных: {e}")
finally:
if conn:
conn.close()
return {"status": "success", "message": "Статус просмотра успешно обновлен"}
@app.get("/check_url_exists", summary="Проверяет url")
def check_url_exists(url: str):
"""
Проверяет, есть ли указанный URL в базе данных.
Возвращает true, если есть, иначе false.
"""
conn = None
try:
conn = get_connection()
with conn.cursor() as cursor:
cursor.execute(
"SELECT 1 FROM url WHERE url = %s LIMIT 1",
(url,)
)
result = cursor.fetchone()
return {"exists": bool(result)}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при проверке: {e}")
finally:
if conn:
conn.close()
@app.get("/records", summary="Получить записи из БД с пагинацией", response_model=List[ParsedData])
def get_records(offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100)):
"""
Возвращает записи из таблицы url с учетом offset и limit.
"""
conn = None
try:
conn = get_connection()
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute("""
SELECT url, parsed_at, title, original_text, article_date, status, viewed, other, category, translation_text, short_text
FROM url
ORDER BY parsed_at DESC NULLS LAST
OFFSET %s LIMIT %s
""", (offset, limit))
rows = cur.fetchall()
results = [dict(row) for row in rows]
return results
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении записей из БД: {e}")
finally:
if conn:
conn.close()
@app.get("/records_all/count", summary="Получить общее количество записей")
def get_records_count(item: str = "default"):
"""
Возвращает общее количество записей в таблице.
"""
conn = None
try:
conn = get_connection()
with conn.cursor() as cur:
if item == "viewed":
cur.execute("SELECT COUNT(*) FROM url WHERE viewed = true")
elif item == "status":
cur.execute("SELECT COUNT(*) FROM url WHERE status = true")
elif item == "time":
cur.execute("SELECT COUNT(*) FROM url")
else:
cur.execute("SELECT COUNT(*) FROM url")
result = cur.fetchone()
return {"count": result[0]}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении количества: {e}")
finally:
if conn:
conn.close()
@app.get("/poisk/count", summary="Получить количество результатов поиска")
def get_poisk_count(query: str, item: str = "default"):
conn = None
try:
conn = get_connection()
with conn.cursor() as cur:
search_pattern = f"%{query}%"
base_query = """SELECT COUNT(*) FROM url WHERE (
title ILIKE %s OR original_text ILIKE %s OR translation_text ILIKE %s
OR short_text ILIKE %s OR url ILIKE %s OR category ILIKE %s OR other ILIKE %s
)"""
params = [search_pattern] * 7
if item == "viewed":
base_query += " AND viewed = true"
elif item == "status":
base_query += " AND status = true"
cur.execute(base_query, params)
result = cur.fetchone()
return {"count": result[0]}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении количества: {e}")
finally:
if conn:
conn.close()
@app.get("/poisk", summary="Поиск с пагинацией")
def poisk(query: str, offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100), item: str = "default"):
conn = None
try:
conn = get_connection()
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
search_pattern = f"%{query}%"
base_query = """SELECT * FROM url WHERE (
title ILIKE %s OR original_text ILIKE %s OR translation_text ILIKE %s
OR short_text ILIKE %s OR url ILIKE %s OR category ILIKE %s OR other ILIKE %s
)"""
params = [search_pattern] * 7
if item == "viewed":
base_query += " AND viewed = true"
elif item == "status":
base_query += " AND status = true"
base_query += " ORDER BY article_date DESC OFFSET %s LIMIT %s"
params.extend([offset, limit])
cur.execute(base_query, params)
rows = cur.fetchall()
results = [dict(row) for row in rows]
return results
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при поиске: {e}")
finally:
if conn:
conn.close()
@app.get("/records_all", summary="Получить все записи из БД + сортирует + пагинация", response_model=List[ParsedData])
def get_records(item: str = "default", offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100)):
"""
Возвращает записи из таблицы url с учетом.
"""
conn = None
try:
conn = get_connection()
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
print(item)
if item == "time":
cur.execute("""
SELECT * FROM url
ORDER BY viewed ASC, parsed_at DESC
OFFSET %s LIMIT %s
""", (offset, limit))
elif item == "viewed":
cur.execute("""
SELECT * FROM url
WHERE viewed = true
ORDER BY article_date DESC
OFFSET %s LIMIT %s
""", (offset, limit))
elif item == "status":
cur.execute("""
SELECT * FROM url
WHERE status = true
ORDER BY parsed_at DESC
OFFSET %s LIMIT %s
""", (offset, limit))
else:
cur.execute("""
SELECT * FROM url
ORDER BY viewed ASC, article_date DESC
OFFSET %s LIMIT %s
""", (offset, limit))
rows = cur.fetchall()
results = [dict(row) for row in rows]
# urls = [item['parsed_at'] for item in results]
return results
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении записей из БД: {e}")
finally:
if conn:
conn.close()
# Запуск сервера для теста
# if __name__ == "__main__":
# uvicorn.run("main:app", port=8002, reload=True)