Files
parser_bd/main.py
Игорь Бандурист 6682abff92
All checks were successful
continuous-integration/drone/push Build is passing
CORS
2026-05-31 16:11:24 +10:00

388 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware # <-- добавлено
from pydantic import BaseModel
from typing import List, Optional
import psycopg2
import psycopg2.extras
import uvicorn
import json
app = FastAPI(title="Parser Data API",
description="API для записи данных парсинга в базу данных",
version="1.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "https://allowlgroup.ru"], # или список разрешенных адресов, например ["http://localhost:8080"]
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Модель для данных, которые приходят в POST
class ParsedData(BaseModel):
url: str
parsed_at: str
title: str
original_text: str
article_date: str
status: Optional[bool] = False
viewed: Optional[bool] = False
tematik: Optional[bool] = False
svodka: Optional[bool] = False
donesenie: Optional[bool] = False
bilutene: Optional[bool] = False
other: str
category: str
translation_text: str
short_text: str
# Подключение к БД
def get_connection():
return psycopg2.connect(
dbname="parsed_url",
user="postgres",
password="qwertyqwerty123123",
# host="127.0.0.1"
host ="45.129.78.228"
)
@app.post("/save_parsed_data", summary="Сохранить данные парсинга")
def save_parsed_data(data: ParsedData):
conn = None
try:
conn = get_connection()
with conn.cursor() as cur:
cur.execute("""
INSERT INTO url (url, parsed_at, title, original_text, article_date, status, viewed, tematik, svodka, donesenie, bilutene, other, category, translation_text, short_text)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (url) DO UPDATE SET
parsed_at = EXCLUDED.parsed_at,
title = EXCLUDED.title,
original_text = EXCLUDED.original_text,
article_date = EXCLUDED.article_date,
status = EXCLUDED.status,
viewed = EXCLUDED.viewed,
tematik = EXCLUDED.tematik,
svodka = EXCLUDED.svodka,
donesenie = EXCLUDED.donesenie,
bilutene = EXCLUDED.bilutene,
other = EXCLUDED.other,
category = EXCLUDED.category,
translation_text = EXCLUDED.translation_text,
short_text = EXCLUDED.short_text;
""", (data.url, data.parsed_at, data.title, data.original_text, data.article_date, data.status, data.viewed, data.tematik, data.svodka, data.donesenie, data.bilutene, data.other, data.category, data.translation_text, data.short_text))
conn.commit()
return {"status": "success", "message": "Данные успешно сохранены"}
except Exception as e:
if conn:
conn.rollback()
raise HTTPException(status_code=500, detail=f"Ошибка при сохранении данных: {e}")
finally:
if conn:
conn.close()
def _update_field_by_url(field_name: str, url: str, value: bool) -> dict:
"""
Внутренняя функция для обновления любого поля в таблице url по URL.
"""
conn = None
try:
conn = get_connection()
with conn.cursor() as cursor:
cursor.execute(
f"UPDATE url SET {field_name} = %s WHERE url = %s",
(value, url)
)
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Запись с указанным URL не найдена")
conn.commit()
return {"status": "success", "message": f"Поле {field_name} успешно обновлено"}
except HTTPException:
raise
except Exception as e:
if conn:
conn.rollback()
raise HTTPException(status_code=500, detail=f"Ошибка при сохранении данных: {e}")
finally:
if conn:
conn.close()
@app.post("/update_viewed_status", summary="Обновляет поле viewed")
def update_viewed_status(url: str, viewed: bool):
"""
Обновляет поле 'viewed' записи в БД по заданному URL.
"""
return _update_field_by_url("viewed", url, viewed)
@app.post("/update_status_status", summary="Обновляет поле status")
def update_status_status(url: str, status: bool):
"""
Обновляет поле 'status' записи в БД по заданному URL.
"""
return _update_field_by_url("status", url, status)
@app.post("/update_tematik_status", summary="Обновляет поле tematik")
def update_tematik_status(url: str, tematik: bool):
"""
Обновляет поле 'tematik' записи в БД по заданному URL.
"""
return _update_field_by_url("tematik", url, tematik)
@app.post("/update_svodka_status", summary="Обновляет поле svodka")
def update_svodka_status(url: str, svodka: bool):
"""
Обновляет поле 'svodka' записи в БД по заданному URL.
"""
return _update_field_by_url("svodka", url, svodka)
@app.post("/update_donesenie_status", summary="Обновляет поле donesenie")
def update_donesenie_status(url: str, donesenie: bool):
"""
Обновляет поле 'donesenie' записи в БД по заданному URL.
"""
return _update_field_by_url("donesenie", url, donesenie)
@app.post("/update_bilutene_status", summary="Обновляет поле bilutene")
def update_bilutene_status(url: str, bilutene: bool):
"""
Обновляет поле 'bilutene' записи в БД по заданному URL.
"""
return _update_field_by_url("bilutene", url, bilutene)
@app.get("/records", summary="Получить записи из БД с пагинацией", response_model=List[ParsedData])
def get_records(offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100)):
"""
Возвращает записи из таблицы url с учетом offset и limit.
"""
conn = None
try:
conn = get_connection()
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute("""
SELECT url, parsed_at, title, original_text, article_date, status, viewed, tematik, svodka, donesenie, bilutene, other, category, translation_text, short_text
FROM url
ORDER BY parsed_at DESC NULLS LAST
OFFSET %s LIMIT %s
""", (offset, limit))
rows = cur.fetchall()
results = [dict(row) for row in rows]
return results
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении записей из БД: {e}")
finally:
if conn:
conn.close()
@app.get("/records_all/count", summary="Получить общее количество записей")
def get_records_count(item: str = "default"):
"""
Возвращает общее количество записей в таблице.
"""
conn = None
try:
conn = get_connection()
with conn.cursor() as cur:
if item == "viewed":
cur.execute("SELECT COUNT(*) FROM url WHERE viewed = true")
elif item == "status":
cur.execute("SELECT COUNT(*) FROM url WHERE status = true")
elif item == "tematik":
cur.execute("SELECT COUNT(*) FROM url WHERE tematik = true")
elif item == "svodka":
cur.execute("SELECT COUNT(*) FROM url WHERE svodka = true")
elif item == "donesenie":
cur.execute("SELECT COUNT(*) FROM url WHERE donesenie = true")
elif item == "bilutene":
cur.execute("SELECT COUNT(*) FROM url WHERE bilutene = true")
elif item == "status":
cur.execute("SELECT COUNT(*) FROM url WHERE status = true")
elif item == "time":
cur.execute("SELECT COUNT(*) FROM url")
else:
cur.execute("SELECT COUNT(*) FROM url")
result = cur.fetchone()
return {"count": result[0]}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении количества: {e}")
finally:
if conn:
conn.close()
@app.get("/poisk/count", summary="Получить количество результатов поиска")
def get_poisk_count(query: str, item: str = "default"):
conn = None
try:
conn = get_connection()
with conn.cursor() as cur:
search_pattern = f"%{query}%"
base_query = """SELECT COUNT(*) FROM url WHERE (
title ILIKE %s OR original_text ILIKE %s OR translation_text ILIKE %s
OR short_text ILIKE %s OR url ILIKE %s OR category ILIKE %s OR other ILIKE %s
)"""
params = [search_pattern] * 7
if item == "viewed":
base_query += " AND viewed = true"
elif item == "status":
base_query += " AND status = true"
elif item == "tematik":
base_query += " AND tematik = true"
elif item == "svodka":
base_query += " AND svodka = true"
elif item == "donesenie":
base_query += " AND donesenie = true"
elif item == "bilutene":
base_query += " AND bilutene = true"
cur.execute(base_query, params)
result = cur.fetchone()
return {"count": result[0]}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении количества: {e}")
finally:
if conn:
conn.close()
@app.get("/poisk", summary="Поиск с пагинацией")
def poisk(query: str, offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100), item: str = "default"):
conn = None
try:
conn = get_connection()
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
search_pattern = f"%{query}%"
base_query = """SELECT * FROM url WHERE (
title ILIKE %s OR original_text ILIKE %s OR translation_text ILIKE %s
OR short_text ILIKE %s OR url ILIKE %s OR category ILIKE %s OR other ILIKE %s
)"""
params = [search_pattern] * 7
if item == "viewed":
base_query += " AND viewed = true"
elif item == "status":
base_query += " AND status = true"
elif item == "tematik":
base_query += " AND tematik = true"
elif item == "svodka":
base_query += " AND svodka = true"
elif item == "donesenie":
base_query += " AND donesenie = true"
elif item == "bilutene":
base_query += " AND bilutene = true"
base_query += " ORDER BY article_date DESC OFFSET %s LIMIT %s"
params.extend([offset, limit])
cur.execute(base_query, params)
rows = cur.fetchall()
results = [dict(row) for row in rows]
return results
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при поиске: {e}")
finally:
if conn:
conn.close()
@app.get("/records_all", summary="Получить все записи из БД + сортирует + пагинация", response_model=List[ParsedData])
def get_records(item: str = "default", offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100)):
"""
Возвращает записи из таблицы url с учетом.
"""
conn = None
try:
conn = get_connection()
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
print(item)
if item == "time":
cur.execute("""
SELECT * FROM url
ORDER BY viewed ASC, parsed_at DESC
OFFSET %s LIMIT %s
""", (offset, limit))
elif item == "viewed":
cur.execute("""
SELECT * FROM url
WHERE viewed = true
ORDER BY article_date DESC
OFFSET %s LIMIT %s
""", (offset, limit))
elif item == "status":
cur.execute("""
SELECT * FROM url
WHERE status = true
ORDER BY parsed_at DESC
OFFSET %s LIMIT %s
""", (offset, limit))
elif item == "tematik":
cur.execute("""
SELECT * FROM url
WHERE tematik = true
ORDER BY parsed_at DESC
OFFSET %s LIMIT %s
""", (offset, limit))
elif item == "svodka":
cur.execute("""
SELECT * FROM url
WHERE svodka = true
ORDER BY parsed_at DESC
OFFSET %s LIMIT %s
""", (offset, limit))
elif item == "donesenie":
cur.execute("""
SELECT * FROM url
WHERE donesenie = true
ORDER BY parsed_at DESC
OFFSET %s LIMIT %s
""", (offset, limit))
elif item == "bilutene":
cur.execute("""
SELECT * FROM url
WHERE bilutene = true
ORDER BY parsed_at DESC
OFFSET %s LIMIT %s
""", (offset, limit))
elif item == "Япония" or item == "Корея":
cur.execute("""
SELECT * FROM url
WHERE other = %s
ORDER BY viewed ASC, article_date DESC
OFFSET %s LIMIT %s
""", (item, offset, limit))
elif item == "Китай":
cur.execute("""
SELECT * FROM url
WHERE other IN (%s, %s, %s)
ORDER BY viewed ASC, article_date DESC
OFFSET %s LIMIT %s
""", ("source1", "source2", "Китай", offset, limit))
else:
cur.execute("""
SELECT * FROM url
ORDER BY viewed ASC, article_date DESC
OFFSET %s LIMIT %s
""", (offset, limit))
rows = cur.fetchall()
results = [dict(row) for row in rows]
# urls = [item['parsed_at'] for item in results]
return results
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении записей из БД: {e}")
finally:
if conn:
conn.close()
# Запуск сервера для теста
# if __name__ == "__main__":
# uvicorn.run("main:app", port=8002, reload=True)