Files
parser/main.py

448 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Стандартные библиотеки (stdlib)
import json
import logging
import os
import subprocess
import time
from datetime import datetime as dt
from datetime import datetime
import random
# Сторонние библиотеки (third-party)
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from bs4 import BeautifulSoup
from contextlib import asynccontextmanager
from docx import Document
from fastapi import BackgroundTasks, FastAPI, Query, Request, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel
from urllib.parse import urljoin
import uvicorn
import requests
# Локальные импорты
import settings_work as sw
import work_parser as wp
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Управление жизненным циклом приложения"""
# Startup
scheduler.add_job(scheduled_parser_1, "cron", hour=10, minute=0)
scheduler.add_job(scheduled_parser_2, "cron", hour=11, minute=0)
scheduler.start()
yield
# Shutdown
scheduler.shutdown()
app = FastAPI(title="Parser API",
description="API для запуска парсинга в базу данных",
version="1.0",
lifespan=lifespan)
# Инициализация планировщика
scheduler = AsyncIOScheduler()
# Настройка логгера
logging.basicConfig(filename="app.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
@app.get("/logs")
def get_logs():
with open("app.log", "r") as file:
lines = file.readlines()[-10:] # последние 10 строк
return {"logs": lines}
# Инициализация таблицы статуса парсинга
wp.create_table()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # или список разрешенных адресов, например ["https://allowlgroup.ru","http://localhost:5173", "http://45.129.78.228:8000"]
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
PROXIES_URL = "https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/http.txt"
def download_proxies(url):
response = requests.get(url)
if response.status_code == 200:
proxies = response.text.splitlines() # список прокси по строкам
return proxies
else:
return []
def fetch_with_proxy(url, proxy, verify, timeout):
proxies = {
'http': f'http://{proxy}',
'https': f'http://{proxy}',
}
try:
response = requests.get(url, proxies=proxies, timeout=timeout, verify=verify)
response.encoding = 'utf-8'
if response.status_code == 200:
# Проверяем содержимое - если это ошибка от прокси
if '"message":"Request failed' in response.text or '403' in response.text[:500]:
print(f"Proxy {proxy} - Site returned 403 (inside response)")
return None
print(f"Proxy {proxy} - SUCCESS")
return response.text
elif response.status_code == 403:
print(f"Proxy {proxy} - 403 Forbidden")
return None # Прокси работает, но сайт блокирует
else:
print(f"Proxy {proxy} - Status {response.status_code}")
return None
except:
return None
# Перемешивает список прокси для случайного начала
def get_shuffled_proxies(proxies_list):
shuffled = proxies_list.copy()
random.shuffle(shuffled)
return shuffled
# Общие функции нахождения ссылок
def extract_map_area_hrefs(url, verify=True, ist_number=1):
headers = {
"User-Agent": "Mozilla/5.0 (compatible; MyScraper/1.0; +https://example.com)"
}
resp = requests.get(url, headers=headers, verify=verify)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
hrefs = []
if ist_number == 1:
for map_tag in soup.find_all("li", attrs={"data-page": "1"}):
for a in map_tag.find_all("a", href=True):
href = a["href"]
abs_url = urljoin(url, href)
print(abs_url)
hrefs.append(abs_url)
else:
for map_tag in soup.find_all("map"):
for area in map_tag.find_all("area", href=True):
href = area["href"]
abs_url = urljoin(url, href)
hrefs.append(abs_url)
return hrefs
# функции парсера первого источника (газета)
def extract_text_from_url_one(url, timeout=10, verify=True):
proxies_list = download_proxies(PROXIES_URL)
proxies_list = get_shuffled_proxies(proxies_list)
response = ""
for proxy in proxies_list:
response = fetch_with_proxy(url, proxy=proxy, timeout=timeout, verify=verify)
if response:
break
else:
response = ""
soup = BeautifulSoup(response, "html.parser")
title_div = soup.find('div', class_='newsdetatit')
title_text = ''
if title_div:
h3_tag = title_div.find('h3')
if h3_tag:
title_text = h3_tag.get_text(strip=True)
content_div = soup.find('div', class_='newsdetatext')
content_text = ''
if content_div:
founder_content = content_div.find('founder-content')
if founder_content:
p_tags = founder_content.find_all('p')
content_text = '\n'.join(p.get_text(strip=True) for p in p_tags)
text = title_text + content_text
if len(text) > 4500:
text = text[:4500]
print(len(text))
return text
#Функции парсера второго источника (военного)
def extract_text_from_url(url, timeout=10, verify=True):
proxies_list = download_proxies(PROXIES_URL)
proxies_list = get_shuffled_proxies(proxies_list)
response = ""
for proxy in proxies_list:
response = fetch_with_proxy(url, proxy=proxy, timeout=timeout, verify=verify)
if response:
break
else:
response = ""
soup = BeautifulSoup(response, 'html.parser')
# Находим контейнер div.whitecon.article
container = soup.find("div", class_="whitecon article")
if not container:
return "", ""
# Получение заголовка <time> внутри контейнера
time_text = container.find('span')
if time_text:
time_t= time_text.get_text(strip=True)
# Получение всех <p> внутри контейнера, исключая те с class="before_ir"
paragraphs = container.find_all('p')
# Возвращаем текстовую сводку
content_text = []
for p in paragraphs:
if p.get('class') != ['before_ir'] :
content_text.append(p.get_text(strip=True))
return "\n".join(content_text), time_t
# Общий запрос на GPT
def gpt_response_message(content, ist_number=1):
Promts = sw.read_settings().sources
if ist_number == 1:
contentGPT = Promts[0].prompt.replace('{content}', content)
else:
contentGPT = Promts[1].prompt.replace('{content}', content)
url = 'http://45.129.78.228:8484' #10.8.0.14:5500
params = {'text': contentGPT}
# Ограничение количества попыток
max_retries = 3
retries = 0
while retries < max_retries:
try:
response = requests.get(url, params=params, timeout=15)
return response.text
except Exception as ex:
print(f"Ошибка при запросе к GPT: {ex}")
logger.info(f"gpt_response_message: {ex}")
retries += 1
else:
restart_service('work_gpt.service')
print(f"\n\n\tПерезапуск GPT\n\n")
try:
response = requests.get(url, params=params, timeout=15)
return response.text
except Exception as ex:
print(f"Ошибка при запросе к GPT: {ex}")
logger.info(f"gpt_response_message: {ex}")
retries += 1
# перезапуск сервиса GPT при неудачных попытках запроса
def restart_service(service_name):
try:
subprocess.run(['sudo', 'systemctl', 'restart', service_name], check=True)
time.sleep(30)
print(f"Сервис {service_name} успешно перезапущен")
except subprocess.CalledProcessError:
print(f"Не удалось перезапустить сервес {service_name}")
# Общие функции проверки ссылок
def check_url(url):
print(url)
response = requests.get('http://45.129.78.228:8002/check_url_exists', params={'url': url})
if response.status_code == 200:
result = response.json()
print(result["exists"])
return result["exists"]
else:
print(f"Ошибка: {response.status_code}")
return False
# функции даты первого источника (газета)
def create_folder(num):
if int(num) // 10 == 0:
num = f"0{num}"
else:
num = str(num)
return num
# Функция формирования документа
def update_bd_and_create_document(response_text, article_date, url, parsed_at, original_text, other):
clean_response = ''
if not response_text:
print(f"Пустой ответ от GPT для URL: {url}")
logger.info(f"Пустой ответ от GPT для URL: {url}")
return
try:
clean_response = response_text.strip().replace('```json', '').replace('```', '').strip()
data = json.loads(clean_response)
if data['category']:
data['article_date'] = article_date
data['url'] = url
data['parsed_at'] = parsed_at
data['original_text'] = original_text
data['status'] = False
data['viewed'] = False
data['other'] = other
print(requests.post('http://45.129.78.228:8002/save_parsed_data', json=data))
path_day = article_date.split()[0]
if not os.path.exists(path_day):
os.makedirs(path_day)
print(f"Создана папка: {path_day}")
doc = Document()
doc.add_heading('Ссылка на статью', level=1)
doc.add_paragraph(other)
doc.add_heading('Дата и время', level=1)
doc.add_paragraph(article_date)
doc.add_heading('Обноруженные тематики текста', level=1)
doc.add_paragraph(data["category"])
doc.add_heading('Заголовок', level=1)
doc.add_paragraph(data["title"])
doc.add_heading('Краткий пересказ', level=1)
doc.add_paragraph(data["short_text"])
doc.add_heading('Переведенный текст статьи в газете', level=1)
doc.add_paragraph(data["translation_text"])
doc.add_heading('Оригинальный текст', level=1)
doc.add_paragraph(original_text)
doc_name = f"{data['title']}.docx"
doc_path = os.path.join(path_day, doc_name)
doc.save(doc_path)
print(f"Сохранен документ: {doc_path}")
except Exception as ex:
print(f"Ошибка при обработке ответа GPT: {ex}")
logger.info(f"Ошибка при обработке ответа GPT: {ex}")
#Функции start первого источника (газета)
def start_pars_one_istochnik(data_init):
if data_init != ['']:
current_day = data_init[0]
current_month = data_init[1]
current_year = data_init[2]
else:
datetime_now = dt.now()
current_day = create_folder(datetime_now.day)
current_month = create_folder(datetime_now.month)
current_year = f"{datetime_now.year}"
task_id = wp.insert_task(status='queued', source_url=f'http://epaper.hljnews.cn/hljrb/pc/layout/{current_year}{current_month}/{current_day}/node_0X.html')
print("Создана задача с id:", task_id)
for page_number in range(1, 9):
url = f'http://epaper.hljnews.cn/hljrb/pc/layout/{current_year}{current_month}/{current_day}/node_0{page_number}.html'
wp.update_task(task_id, status='in_progress', source_url=url, started_at=datetime.utcnow())
print(f"Сбор href из: {url}")
try:
hrefs = extract_map_area_hrefs(url, ist_number=2)
except Exception as e:
print(f"Ошибка при извлечении ссылок: {e}")
logger.info(f"extract_map_area_hrefs: {e}")
continue
for i, link in enumerate(hrefs, 1):
if check_url(link) == False:
print(f"Страница {page_number} [{i}/{len(hrefs)}] parsing {link}")
text = extract_text_from_url_one(link)
if len(text) >= 100:
response_text = gpt_response_message(text, ist_number=2)
print(response_text)
if response_text:
update_bd_and_create_document(response_text=response_text, article_date=f"{current_year}/{current_month}/{current_day}", url=link, parsed_at=str(dt.now()), original_text=text, other=url)
wp.update_task(task_id, status='completed', finished_at=datetime.utcnow())
#Функции start второго источника (военного)
def start_pars_two_istochnik():
task_id = wp.insert_task(status='queued', source_url=f'https://def.ltn.com.tw/')
istochnik = ['https://def.ltn.com.tw/breakingnewslist', 'https://def.ltn.com.tw/list/11', 'https://def.ltn.com.tw/list/19', 'https://def.ltn.com.tw/list/17','https://def.ltn.com.tw/list/16']
all_links = []
for url in istochnik:
try:
print(f"Сбор href из: {url}")
all_links += extract_map_area_hrefs(url)
except Exception as e:
print(f"Ошибка при извлечении ссылок: {e}")
logger.info(f"Ошибка при извлечении ссылок: {e}")
continue
for hrefs in all_links:
if check_url(hrefs) == False:
try:
text, time_text = extract_text_from_url(hrefs)
if len(text) >= 100:
response_text = gpt_response_message(text)
print(response_text)
if response_text:
update_bd_and_create_document(response_text=response_text, article_date=time_text, url=hrefs, parsed_at=str(dt.now()), original_text=text, other=url)
except:
continue
wp.update_task(task_id, status='completed', finished_at=datetime.utcnow())
# Функции для автоматического запуска
def scheduled_parser_1():
"""Планировщик для первого парсера"""
istochnik = "" # пустая строка = текущая дата
start_pars_one_istochnik(istochnik.split("."))
def scheduled_parser_2():
"""Планировщик для второго парсера"""
start_pars_two_istochnik()
class ParserOneRequest(BaseModel):
time: str
@app.post("/parser_1", summary="Запуск процесса парсинга первого источника")
async def process_data(data: ParserOneRequest, background_tasks: BackgroundTasks):
istochnik = data.time.split(".")
background_tasks.add_task(start_pars_one_istochnik, istochnik)
return {"message": "Процесс парсинга 1 источника запущен"}
@app.post("/parser_2" , summary="Запуск процеса парсинга второго источника")
async def process_data_gpt(background_tasks: BackgroundTasks):
background_tasks.add_task(start_pars_two_istochnik)
return {"message": "Процесс парсинга 2 источника запущен"}
# GET метод для получения
@app.get("/get_tasks_offset", summary="Метод получения задач парсинга")
def get_tasks_offset(limit: int = Query(10, gt=0), offset: int = Query(0, ge=0)):
return wp.get_tasks_offset(limit, offset)
# GET метод для получения настроек
@app.get("/settings", summary="Метод получения настроек парсера")
def get_settings():
return sw.read_settings()
# POST метод для установки настроек
@app.post("/settings", summary="Метод сохранения настроек парсера")
def set_settings(settings: sw.Source):
return sw.update_source(settings)
@app.delete("/delete_task/{task_id}", summary="Метод удаления задачи")
def delete_task(task_id: int):
return print(wp.delete_task(task_id))
@app.get("/file_download", summary="Метод для скачивания файла")
async def download_file(path: str, title: str):
path = f"./{path}/{title}.docx" #os.path.abspath(path)
return FileResponse(path=path, filename=f'{title}.docx', media_type='multipart/form-data')
# if __name__ == "__main__":
# uvicorn.run("main:app", port=8001, reload=True)