""" API эндпоинты приложения """ import os import zipfile from datetime import datetime, timedelta from typing import List from fastapi import BackgroundTasks, FastAPI, Query, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from config import DOCUMENTS_DIR, APP_TITLE, APP_DESCRIPTION, APP_VERSION from utils import logger from api.schemas import ParserOneRequest, Parserall, Source, DownloadRange, DownloadCountsResponse from parsers import start_pars_one_istochnik, start_pars_two_istochnik, start_pars_all_istochnik import work_parser as wp def setup_routes(app: FastAPI) -> None: """ Настройка всех API маршрутов """ # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:5173", "https://allowlgroup.ru"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ==================== Парсеры ==================== @app.post("/parser_1", summary="Запуск процесса парсинга первого источника") async def process_parser_one_ist(data: ParserOneRequest, background_tasks: BackgroundTasks): istochnik = data.time.split("-") background_tasks.add_task(start_pars_one_istochnik, istochnik) return {"message": "Процесс парсинга 1 источника запущен"} @app.post("/parser_2", summary="Запуск процесса парсинга второго источника") async def process_parser_two_ist(background_tasks: BackgroundTasks): background_tasks.add_task(start_pars_two_istochnik) return {"message": "Процесс парсинга 2 источника запущен"} @app.post("/add_sources", summary="Добавление парсинга любого источника") async def add_sources_all_ist(sources: Parserall): result = wp.add_sources(str(sources.url), sources.promt) return {"status": "success", "message": "Источник добавлен", "data": result} @app.get("/all_sources", summary="Метод получения всех источников") async def get_all_sources(category: str = "all"): return wp.get_all_sources(category) @app.delete("/delete_sources", summary="Метод удаления источника") async def delete_sources(url: str): return print(wp.delete_sources(url)) @app.post("/parser_all", summary="Запуск процесса парсинга любого источника") async def process_parser_all_ist(url: Parserall, background_tasks: BackgroundTasks): background_tasks.add_task(start_pars_all_istochnik, str(url.url), url.promt) return {"message": "Процесс парсинга любого источника запущен"} @app.get("/get_tasks_offset", summary="Метод получения задач парсинга") async def get_tasks_offset(limit: int = Query(10, gt=0), offset: int = Query(0, ge=0)): return wp.get_tasks_offset(limit, offset) # ==================== Настройки ==================== @app.get("/settings", summary="Метод получения настроек парсера") async def get_settings(): return wp.get_all_promt() @app.get("/categories_promt", summary="Метод получения categories_promt") async def get_categories_promt(): return wp.get_all_categories_promt() @app.post("/settings", summary="Метод сохранения настроек парсера") async def set_settings(settings: Source): return wp.update_promt(settings.name, settings.promt) # ==================== Задачи ==================== @app.delete("/delete_task/{task_id}", summary="Метод удаления задачи") async def delete_task(task_id: int): return print(wp.delete_task(task_id)) # ==================== Файлы ==================== @app.get("/file_download", summary="Метод для скачивания файла") async def download_file(path: str, title: str): file_name = f"{title}.docx" file_path = os.path.join(DOCUMENTS_DIR, path, file_name) logger.warning(f"Файл: {file_path}") if not os.path.exists(file_path): logger.warning(f"Файл не найден: {file_path}") return {"error": "Файл не найден", "path": file_path} response = FileResponse( path=file_path, filename=file_name, media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document" ) response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS" response.headers["Access-Control-Allow-Headers"] = "Content-Type" logger.warning(response) return response @app.post("/download_all", summary="Скачать все файлы за период") async def download_all(dates: DownloadRange, background_tasks: BackgroundTasks): date_start_str = dates.data_start date_finish_str = dates.data_finish field_name = getattr(dates, 'field_name', 'status') # Поле для фильтрации (по умолчанию 'status') try: start_date = datetime.strptime(date_start_str, "%Y-%m-%d") finish_date = datetime.strptime(date_finish_str, "%Y-%m-%d") + timedelta(days=1) except ValueError: return {"error": "Неверный формат даты. Используйте YYYY-MM-DD"} if start_date > finish_date: return {"error": "Дата начала не может быть позже даты окончания"} # 1. Получаем список заголовков из БД start_date_str = start_date.strftime("%Y-%m-%d") finish_date_str = finish_date.strftime("%Y-%m-%d") try: titles_from_db = wp.get_articles_by_filter(field_name, start_date_str, finish_date_str) except Exception as e: return {"error": f"Ошибка при получении данных из БД: {e}"} if not titles_from_db: return {"error": "Нет статей с выбранным фильтром за указанный период", "field_name": field_name} # 2. Собираем все файлы .docx за период all_files = [] current_date = start_date while current_date <= finish_date : date_path = current_date.strftime("%Y/%m/%d") full_dir_path = os.path.join(DOCUMENTS_DIR, date_path) if os.path.exists(full_dir_path): for file in os.listdir(full_dir_path): if file.endswith('.docx'): file_title = file[:-5] # убираем расширение .docx if file_title in titles_from_db: all_files.append(os.path.join(full_dir_path, file)) current_date += timedelta(days=1) if not all_files: return {"error": "Файлы не найдены за указанный период", "date_start": date_start_str, "date_finish": date_finish_str, "titles_found": len(titles_from_db)} archive_name = f"documents_{date_start_str}_{date_finish_str}.zip" archive_path = os.path.join(DOCUMENTS_DIR, archive_name) try: with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for file_path in all_files: zipf.write(file_path, os.path.basename(file_path)) except Exception as e: logger.error(f"Ошибка создания архива: {e}") return {"error": f"Ошибка создания архива: {e}"} def cleanup_archive(): try: if os.path.exists(archive_path): os.remove(archive_path) logger.info(f"Архив удалён: {archive_path}") except Exception as e: logger.warning(f"Не удалось удалить архив: {e}") def mark_as_downloaded(): try: wp.mark_articles_as_downloaded(titles_from_db) logger.info(f"Статьи помечены как скачанные: {len(titles_from_db)} записей") except Exception as e: logger.error(f"Ошибка при обновлении download: {e}") response = FileResponse( path=archive_path, filename=archive_name, media_type="application/zip" ) response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS" response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization" response.headers["Access-Control-Expose-Headers"] = "Content-Disposition" background_tasks.add_task(cleanup_archive) background_tasks.add_task(mark_as_downloaded) return response # ==================== Выгрузка (download) ==================== @app.get("/download_counts", summary="Получить количество статей для выгрузки", response_model=DownloadCountsResponse) async def get_download_counts(): """ Возвращает количество статей для каждого поля (tematik, svodka, donesenie, bilutene, status), где значение поля = TRUE и download = FALSE """ return wp.get_download_counts() # @app.post("/mark_downloaded", summary="Отметить статьи как скачанные") # async def mark_articles_as_downloaded(titles: List[str]): # """ # Обновляет поле download = TRUE для списка заголовков статей # """ # return wp.mark_articles_as_downloaded(titles) @app.get("/logs", summary="Показать логи") async def get_logs(): with open("app.log", "r") as file: lines = file.readlines()[-10:] return {"logs": lines}