Files
parser/api/routes.py
Игорь Бандурист 25f2c09064
All checks were successful
continuous-integration/drone/push Build is passing
сделал ревью системы
2026-04-28 22:13:47 +10:00

259 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
API эндпоинты приложения
"""
import os
import zipfile
from datetime import datetime, timedelta
from typing import List
from fastapi import BackgroundTasks, FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from config import DOCUMENTS_DIR, APP_TITLE, APP_DESCRIPTION, APP_VERSION
from utils import logger
from api.schemas import ParserOneRequest, Parserall, Source, DownloadRange
from parsers import start_pars_one_istochnik, start_pars_two_istochnik, start_pars_all_istochnik
import work_parser as wp
import parser_bd as pbd
def setup_routes(app: FastAPI) -> None:
"""
Настройка всех API маршрутов
"""
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ==================== Парсеры ====================
@app.post("/parser_1", summary="Запуск процесса парсинга первого источника")
async def process_parser_one_ist(data: ParserOneRequest, background_tasks: BackgroundTasks):
istochnik = data.time.split("-")
background_tasks.add_task(start_pars_one_istochnik, istochnik)
return {"message": "Процесс парсинга 1 источника запущен"}
@app.post("/parser_2", summary="Запуск процесса парсинга второго источника")
async def process_parser_two_ist(background_tasks: BackgroundTasks):
background_tasks.add_task(start_pars_two_istochnik)
return {"message": "Процесс парсинга 2 источника запущен"}
@app.post("/add_sources", summary="Добавление парсинга любого источника")
async def add_sources_all_ist(sources: Parserall):
result = wp.add_sources(str(sources.url), sources.promt)
return {"status": "success", "message": "Источник добавлен", "data": result}
@app.get("/all_sources", summary="Метод получения всех источников")
async def get_all_sources():
return wp.get_all_sources()
@app.delete("/delete_sources", summary="Метод удаления источника")
async def delete_sources(url: str):
return print(wp.delete_sources(url))
@app.post("/parser_all", summary="Запуск процесса парсинга любого источника")
async def process_parser_all_ist(url: Parserall, background_tasks: BackgroundTasks):
background_tasks.add_task(start_pars_all_istochnik, str(url.url), url.promt)
return {"message": "Процесс парсинга любого источника запущен"}
@app.get("/get_tasks_offset", summary="Метод получения задач парсинга")
async def get_tasks_offset(limit: int = Query(10, gt=0), offset: int = Query(0, ge=0)):
return wp.get_tasks_offset(limit, offset)
# ==================== Настройки ====================
@app.get("/settings", summary="Метод получения настроек парсера")
async def get_settings():
return wp.get_all_promt()
@app.get("/categories_promt", summary="Метод получения categories_promt")
async def get_categories_promt():
return wp.get_all_categories_promt()
@app.post("/settings", summary="Метод сохранения настроек парсера")
async def set_settings(settings: Source):
return wp.update_promt(settings.name, settings.promt)
# ==================== Задачи ====================
@app.delete("/delete_task/{task_id}", summary="Метод удаления задачи")
async def delete_task(task_id: int):
return print(wp.delete_task(task_id))
# ==================== Файлы ====================
@app.get("/file_download", summary="Метод для скачивания файла")
async def download_file(path: str, title: str):
file_name = f"{title}.docx"
file_path = os.path.join(DOCUMENTS_DIR, path, file_name)
logger.warning(f"Файл: {file_path}")
if not os.path.exists(file_path):
logger.warning(f"Файл не найден: {file_path}")
return {"error": "Файл не найден", "path": file_path}
response = FileResponse(
path=file_path,
filename=file_name,
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document"
)
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "Content-Type"
logger.warning(response)
return response
@app.post("/download_all", summary="Скачать все файлы за период")
async def download_all(dates: DownloadRange, background_tasks: BackgroundTasks):
date_start = dates.data_start
date_finish = dates.data_finish
try:
start_date = datetime.strptime(date_start, "%Y-%m-%d")
finish_date = datetime.strptime(date_finish, "%Y-%m-%d")
except ValueError:
return {"error": "Неверный формат даты. Используйте YYYY-MM-DD"}
if start_date > finish_date:
return {"error": "Дата начала не может быть позже даты окончания"}
all_files = []
current_date = start_date
while current_date <= finish_date:
date_path = current_date.strftime("%Y/%m/%d")
full_dir_path = os.path.join(DOCUMENTS_DIR, date_path)
if os.path.exists(full_dir_path):
for file in os.listdir(full_dir_path):
if file.endswith('.docx'):
all_files.append(os.path.join(full_dir_path, file))
current_date += timedelta(days=1)
if not all_files:
return {"error": "Файлы не найдены за указанный период", "date_start": date_start, "date_finish": date_finish}
archive_name = f"documents_{date_start}_{date_finish}.zip"
archive_path = os.path.join(DOCUMENTS_DIR, archive_name)
try:
with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file_path in all_files:
zipf.write(file_path, os.path.basename(file_path))
except Exception as e:
logger.error(f"Ошибка создания архива: {e}")
return {"error": f"Ошибка создания архива: {e}"}
def cleanup_archive():
try:
if os.path.exists(archive_path):
os.remove(archive_path)
logger.info(f"Архив удалён: {archive_path}")
except Exception as e:
logger.warning(f"Не удалось удалить архив: {e}")
response = FileResponse(
path=archive_path,
filename=archive_name,
media_type="application/zip"
)
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization"
response.headers["Access-Control-Expose-Headers"] = "Content-Disposition"
background_tasks.add_task(cleanup_archive)
return response
@app.get("/logs", summary="Показать логи")
async def get_logs():
with open("app.log", "r") as file:
lines = file.readlines()[-10:]
return {"logs": lines}
# ==================== Эндпоинты из parser_bd.py ====================
@app.post("/save_parsed_data", summary="Сохранить данные парсинга")
def save_parsed_data(data: pbd.ParsedData):
try:
pbd.save_parsed_data_to_db(data)
return {"status": "success", "message": "Данные успешно сохранены"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при сохранении данных: {e}")
@app.post("/update_viewed_status", summary="Обновляет поле viewed")
def update_viewed_status(url: str, viewed: bool):
try:
result = pbd.update_viewed_status_in_db(url, viewed)
if not result.get("found"):
raise HTTPException(status_code=404, detail="Запись с указанным URL не найдена")
except Exception as e:
if isinstance(e, HTTPException):
raise e
raise HTTPException(status_code=500, detail=f"Ошибка при сохранении данных: {e}")
return {"status": "success", "message": "Статус просмотра успешно обновлен"}
@app.post("/update_status_status", summary="Обновляет поле status")
def update_status_status(url: str, status: bool):
try:
result = pbd.update_status_status_in_db(url, status)
if not result.get("found"):
raise HTTPException(status_code=404, detail="Запись с указанным URL не найдена")
except Exception as e:
if isinstance(e, HTTPException):
raise e
raise HTTPException(status_code=500, detail=f"Ошибка при сохранении данных: {e}")
return {"status": "success", "message": "Статус просмотра успешно обновлен"}
@app.get("/check_url_exists", summary="Проверяет url")
def check_url_exists(url: str):
try:
return pbd.check_url_exists_in_db(url)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при проверке: {e}")
@app.get("/records", summary="Получить записи из БД с пагинацией", response_model=List[pbd.ParsedData])
def get_records(offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100)):
try:
return pbd.get_records_from_db(offset, limit)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении записей из БД: {e}")
@app.get("/records_all/count", summary="Получить общее количество записей")
def get_records_count(item: str = Query("default")):
try:
return pbd.get_records_count_from_db(item)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении количества: {e}")
@app.get("/poisk/count", summary="Получить количество результатов поиска")
def get_poisk_count(query: str, item: str = Query("default")):
try:
return pbd.get_poisk_count_from_db(query, item)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении количества: {e}")
@app.get("/poisk", summary="Поиск с пагинацией")
def poisk(query: str, offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100), item: str = Query("default")):
try:
return pbd.poisk_from_db(query, offset, limit, item)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при поиске: {e}")
@app.get("/records_all", summary="Получить все записи из БД + сортирует + пагинация", response_model=List[pbd.ParsedData])
def get_records_all(item: str = Query("default"), offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100)):
try:
return pbd.get_records_all_from_db(item, offset, limit)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка при получении записей из БД: {e}")