""" API эндпоинты приложения """ import os import zipfile from datetime import datetime, timedelta from typing import List from fastapi import BackgroundTasks, FastAPI, Query, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from config import DOCUMENTS_DIR, APP_TITLE, APP_DESCRIPTION, APP_VERSION from utils import logger from api.schemas import ParserOneRequest, Parserall, Source, DownloadRange from parsers import start_pars_one_istochnik, start_pars_two_istochnik, start_pars_all_istochnik import work_parser as wp import parser_bd as pbd def setup_routes(app: FastAPI) -> None: """ Настройка всех API маршрутов """ # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ==================== Парсеры ==================== @app.post("/parser_1", summary="Запуск процесса парсинга первого источника") async def process_parser_one_ist(data: ParserOneRequest, background_tasks: BackgroundTasks): istochnik = data.time.split("-") background_tasks.add_task(start_pars_one_istochnik, istochnik) return {"message": "Процесс парсинга 1 источника запущен"} @app.post("/parser_2", summary="Запуск процесса парсинга второго источника") async def process_parser_two_ist(background_tasks: BackgroundTasks): background_tasks.add_task(start_pars_two_istochnik) return {"message": "Процесс парсинга 2 источника запущен"} @app.post("/add_sources", summary="Добавление парсинга любого источника") async def add_sources_all_ist(sources: Parserall): result = wp.add_sources(str(sources.url), sources.promt) return {"status": "success", "message": "Источник добавлен", "data": result} @app.get("/all_sources", summary="Метод получения всех источников") async def get_all_sources(): return wp.get_all_sources() @app.delete("/delete_sources", summary="Метод удаления источника") async def delete_sources(url: str): return print(wp.delete_sources(url)) @app.post("/parser_all", summary="Запуск процесса парсинга любого источника") async def process_parser_all_ist(url: Parserall, background_tasks: BackgroundTasks): background_tasks.add_task(start_pars_all_istochnik, str(url.url), url.promt) return {"message": "Процесс парсинга любого источника запущен"} @app.get("/get_tasks_offset", summary="Метод получения задач парсинга") async def get_tasks_offset(limit: int = Query(10, gt=0), offset: int = Query(0, ge=0)): return wp.get_tasks_offset(limit, offset) # ==================== Настройки ==================== @app.get("/settings", summary="Метод получения настроек парсера") async def get_settings(): return wp.get_all_promt() @app.get("/categories_promt", summary="Метод получения categories_promt") async def get_categories_promt(): return wp.get_all_categories_promt() @app.post("/settings", summary="Метод сохранения настроек парсера") async def set_settings(settings: Source): return wp.update_promt(settings.name, settings.promt) # ==================== Задачи ==================== @app.delete("/delete_task/{task_id}", summary="Метод удаления задачи") async def delete_task(task_id: int): return print(wp.delete_task(task_id)) # ==================== Файлы ==================== @app.get("/file_download", summary="Метод для скачивания файла") async def download_file(path: str, title: str): file_name = f"{title}.docx" file_path = os.path.join(DOCUMENTS_DIR, path, file_name) logger.warning(f"Файл: {file_path}") if not os.path.exists(file_path): logger.warning(f"Файл не найден: {file_path}") return {"error": "Файл не найден", "path": file_path} response = FileResponse( path=file_path, filename=file_name, media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document" ) response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS" response.headers["Access-Control-Allow-Headers"] = "Content-Type" logger.warning(response) return response @app.post("/download_all", summary="Скачать все файлы за период") async def download_all(dates: DownloadRange, background_tasks: BackgroundTasks): date_start = dates.data_start date_finish = dates.data_finish try: start_date = datetime.strptime(date_start, "%Y-%m-%d") finish_date = datetime.strptime(date_finish, "%Y-%m-%d") except ValueError: return {"error": "Неверный формат даты. Используйте YYYY-MM-DD"} if start_date > finish_date: return {"error": "Дата начала не может быть позже даты окончания"} all_files = [] current_date = start_date while current_date <= finish_date: date_path = current_date.strftime("%Y/%m/%d") full_dir_path = os.path.join(DOCUMENTS_DIR, date_path) if os.path.exists(full_dir_path): for file in os.listdir(full_dir_path): if file.endswith('.docx'): all_files.append(os.path.join(full_dir_path, file)) current_date += timedelta(days=1) if not all_files: return {"error": "Файлы не найдены за указанный период", "date_start": date_start, "date_finish": date_finish} archive_name = f"documents_{date_start}_{date_finish}.zip" archive_path = os.path.join(DOCUMENTS_DIR, archive_name) try: with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for file_path in all_files: zipf.write(file_path, os.path.basename(file_path)) except Exception as e: logger.error(f"Ошибка создания архива: {e}") return {"error": f"Ошибка создания архива: {e}"} def cleanup_archive(): try: if os.path.exists(archive_path): os.remove(archive_path) logger.info(f"Архив удалён: {archive_path}") except Exception as e: logger.warning(f"Не удалось удалить архив: {e}") response = FileResponse( path=archive_path, filename=archive_name, media_type="application/zip" ) response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS" response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization" response.headers["Access-Control-Expose-Headers"] = "Content-Disposition" background_tasks.add_task(cleanup_archive) return response @app.get("/logs", summary="Показать логи") async def get_logs(): with open("app.log", "r") as file: lines = file.readlines()[-10:] return {"logs": lines} # ==================== Эндпоинты из parser_bd.py ==================== @app.post("/save_parsed_data", summary="Сохранить данные парсинга") def save_parsed_data(data: pbd.ParsedData): try: pbd.save_parsed_data_to_db(data) return {"status": "success", "message": "Данные успешно сохранены"} except Exception as e: raise HTTPException(status_code=500, detail=f"Ошибка при сохранении данных: {e}") @app.post("/update_viewed_status", summary="Обновляет поле viewed") def update_viewed_status(url: str, viewed: bool): try: result = pbd.update_viewed_status_in_db(url, viewed) if not result.get("found"): raise HTTPException(status_code=404, detail="Запись с указанным URL не найдена") except Exception as e: if isinstance(e, HTTPException): raise e raise HTTPException(status_code=500, detail=f"Ошибка при сохранении данных: {e}") return {"status": "success", "message": "Статус просмотра успешно обновлен"} @app.post("/update_status_status", summary="Обновляет поле status") def update_status_status(url: str, status: bool): try: result = pbd.update_status_status_in_db(url, status) if not result.get("found"): raise HTTPException(status_code=404, detail="Запись с указанным URL не найдена") except Exception as e: if isinstance(e, HTTPException): raise e raise HTTPException(status_code=500, detail=f"Ошибка при сохранении данных: {e}") return {"status": "success", "message": "Статус просмотра успешно обновлен"} @app.get("/check_url_exists", summary="Проверяет url") def check_url_exists(url: str): try: return pbd.check_url_exists_in_db(url) except Exception as e: raise HTTPException(status_code=500, detail=f"Ошибка при проверке: {e}") @app.get("/records", summary="Получить записи из БД с пагинацией", response_model=List[pbd.ParsedData]) def get_records(offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100)): try: return pbd.get_records_from_db(offset, limit) except Exception as e: raise HTTPException(status_code=500, detail=f"Ошибка при получении записей из БД: {e}") @app.get("/records_all/count", summary="Получить общее количество записей") def get_records_count(item: str = Query("default")): try: return pbd.get_records_count_from_db(item) except Exception as e: raise HTTPException(status_code=500, detail=f"Ошибка при получении количества: {e}") @app.get("/poisk/count", summary="Получить количество результатов поиска") def get_poisk_count(query: str, item: str = Query("default")): try: return pbd.get_poisk_count_from_db(query, item) except Exception as e: raise HTTPException(status_code=500, detail=f"Ошибка при получении количества: {e}") @app.get("/poisk", summary="Поиск с пагинацией") def poisk(query: str, offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100), item: str = Query("default")): try: return pbd.poisk_from_db(query, offset, limit, item) except Exception as e: raise HTTPException(status_code=500, detail=f"Ошибка при поиске: {e}") @app.get("/records_all", summary="Получить все записи из БД + сортирует + пагинация", response_model=List[pbd.ParsedData]) def get_records_all(item: str = Query("default"), offset: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100)): try: return pbd.get_records_all_from_db(item, offset, limit) except Exception as e: raise HTTPException(status_code=500, detail=f"Ошибка при получении записей из БД: {e}")