Files
parser/api/routes.py
Игорь Бандурист 707c523b53
All checks were successful
continuous-integration/drone/push Build is passing
abrc cj[hfytybz
2026-05-07 20:59:25 +10:00

197 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
API эндпоинты приложения
"""
import os
import zipfile
from datetime import datetime, timedelta
from typing import List
from fastapi import BackgroundTasks, FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from config import DOCUMENTS_DIR, APP_TITLE, APP_DESCRIPTION, APP_VERSION
from utils import logger
from api.schemas import ParserOneRequest, Parserall, Source, DownloadRange
from parsers import start_pars_one_istochnik, start_pars_two_istochnik, start_pars_all_istochnik
import work_parser as wp
def setup_routes(app: FastAPI) -> None:
"""
Настройка всех API маршрутов
"""
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ==================== Парсеры ====================
@app.post("/parser_1", summary="Запуск процесса парсинга первого источника")
async def process_parser_one_ist(data: ParserOneRequest, background_tasks: BackgroundTasks):
istochnik = data.time.split("-")
background_tasks.add_task(start_pars_one_istochnik, istochnik)
return {"message": "Процесс парсинга 1 источника запущен"}
@app.post("/parser_2", summary="Запуск процесса парсинга второго источника")
async def process_parser_two_ist(background_tasks: BackgroundTasks):
background_tasks.add_task(start_pars_two_istochnik)
return {"message": "Процесс парсинга 2 источника запущен"}
@app.post("/add_sources", summary="Добавление парсинга любого источника")
async def add_sources_all_ist(sources: Parserall):
result = wp.add_sources(str(sources.url), sources.promt)
return {"status": "success", "message": "Источник добавлен", "data": result}
@app.get("/all_sources", summary="Метод получения всех источников")
async def get_all_sources(category: str = "all"):
return wp.get_all_sources(category)
@app.delete("/delete_sources", summary="Метод удаления источника")
async def delete_sources(url: str):
return print(wp.delete_sources(url))
@app.post("/parser_all", summary="Запуск процесса парсинга любого источника")
async def process_parser_all_ist(url: Parserall, background_tasks: BackgroundTasks):
background_tasks.add_task(start_pars_all_istochnik, str(url.url), url.promt)
return {"message": "Процесс парсинга любого источника запущен"}
@app.get("/get_tasks_offset", summary="Метод получения задач парсинга")
async def get_tasks_offset(limit: int = Query(10, gt=0), offset: int = Query(0, ge=0)):
return wp.get_tasks_offset(limit, offset)
# ==================== Настройки ====================
@app.get("/settings", summary="Метод получения настроек парсера")
async def get_settings():
return wp.get_all_promt()
@app.get("/categories_promt", summary="Метод получения categories_promt")
async def get_categories_promt():
return wp.get_all_categories_promt()
@app.post("/settings", summary="Метод сохранения настроек парсера")
async def set_settings(settings: Source):
return wp.update_promt(settings.name, settings.promt)
# ==================== Задачи ====================
@app.delete("/delete_task/{task_id}", summary="Метод удаления задачи")
async def delete_task(task_id: int):
return print(wp.delete_task(task_id))
# ==================== Файлы ====================
@app.get("/file_download", summary="Метод для скачивания файла")
async def download_file(path: str, title: str):
file_name = f"{title}.docx"
file_path = os.path.join(DOCUMENTS_DIR, path, file_name)
logger.warning(f"Файл: {file_path}")
if not os.path.exists(file_path):
logger.warning(f"Файл не найден: {file_path}")
return {"error": "Файл не найден", "path": file_path}
response = FileResponse(
path=file_path,
filename=file_name,
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document"
)
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "Content-Type"
logger.warning(response)
return response
@app.post("/download_all", summary="Скачать все файлы за период")
async def download_all(dates: DownloadRange, background_tasks: BackgroundTasks):
date_start = dates.data_start
date_finish = dates.data_finish
field_name = getattr(dates, 'field_name', 'status') # Поле для фильтрации (по умолчанию 'status')
try:
start_date = datetime.strptime(date_start, "%Y-%m-%d")
finish_date = datetime.strptime(date_finish, "%Y-%m-%d")
except ValueError:
return {"error": "Неверный формат даты. Используйте YYYY-MM-DD"}
if start_date > finish_date:
return {"error": "Дата начала не может быть позже даты окончания"}
# 1. Получаем список заголовков из БД
try:
titles_from_db = wp.get_articles_by_filter(field_name, date_start, date_finish)
except Exception as e:
return {"error": f"Ошибка при получении данных из БД: {e}"}
if not titles_from_db:
return {"error": "Нет статей с выбранным фильтром за указанный период", "field_name": field_name}
# 2. Собираем все файлы .docx за период
all_files = []
current_date = start_date
while current_date <= finish_date + timedelta(days=1):
date_path = current_date.strftime("%Y/%m/%d")
full_dir_path = os.path.join(DOCUMENTS_DIR, date_path)
if os.path.exists(full_dir_path):
for file in os.listdir(full_dir_path):
if file.endswith('.docx'):
file_title = file[:-5] # убираем расширение .docx
if file_title in titles_from_db:
all_files.append(os.path.join(full_dir_path, file))
current_date += timedelta(days=1)
if not all_files:
return {"error": "Файлы не найдены за указанный период",
"date_start": date_start,
"date_finish": date_finish,
"titles_found": len(titles_from_db)}
archive_name = f"documents_{date_start}_{date_finish}.zip"
archive_path = os.path.join(DOCUMENTS_DIR, archive_name)
try:
with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file_path in all_files:
zipf.write(file_path, os.path.basename(file_path))
except Exception as e:
logger.error(f"Ошибка создания архива: {e}")
return {"error": f"Ошибка создания архива: {e}"}
def cleanup_archive():
try:
if os.path.exists(archive_path):
os.remove(archive_path)
logger.info(f"Архив удалён: {archive_path}")
except Exception as e:
logger.warning(f"Не удалось удалить архив: {e}")
response = FileResponse(
path=archive_path,
filename=archive_name,
media_type="application/zip"
)
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization"
response.headers["Access-Control-Expose-Headers"] = "Content-Disposition"
background_tasks.add_task(cleanup_archive)
return response
@app.get("/logs", summary="Показать логи")
async def get_logs():
with open("app.log", "r") as file:
lines = file.readlines()[-10:]
return {"logs": lines}