# parser/main.py
import json
import logging
import subprocess
import time
from datetime import datetime
from datetime import datetime as dt
from urllib.parse import urljoin

import requests
import uvicorn
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from bs4 import BeautifulSoup
from fastapi import FastAPI, BackgroundTasks, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

import settings_work as sw
import work_parser as wp
app = FastAPI(
    title="Parser API",
    description="API for launching parsing jobs into the database",
    version="1.0",
)

# Scheduler initialisation
scheduler = AsyncIOScheduler()

# Logger configuration
logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


@app.get("/logs")
def get_logs():
    with open("app.log", "r") as file:
        lines = file.readlines()[-10:]  # last 10 lines
    return {"logs": lines}


# Initialise the parsing-status table
wp.create_table()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://45.129.78.228:8000"],  # or a list of allowed origins, e.g. ["*"]
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

PROXIES_URL = "https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/http.txt"


def download_proxies(url):
    response = requests.get(url)
    if response.status_code == 200:
        return response.text.splitlines()  # one proxy per line
    return []


def fetch_with_proxy(url, proxy, verify, timeout):
    proxies = {
        "http": f"http://{proxy}",  # use 'socks5://' here for SOCKS5 proxies, etc.
        "https": f"http://{proxy}",
    }
    try:
        response = requests.get(url, proxies=proxies, timeout=timeout, verify=verify)
        response.encoding = "utf-8"
        response.raise_for_status()
        return response.text
    except requests.RequestException:
        return None
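
# Rotation pattern used by the extractors below (a sketch of the existing
# logic, not a separate entry point): walk the free proxy list until one
# proxy returns a body.
#
#     for proxy in download_proxies(PROXIES_URL):
#         html = fetch_with_proxy(url, proxy=proxy, verify=True, timeout=10)
#         if html:
#             break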


# Shared link-discovery helpers
def extract_map_area_hrefs(url, verify=True, ist_number=1):
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; MyScraper/1.0; +https://example.com)"
    }
    resp = requests.get(url, headers=headers, verify=verify)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    hrefs = []
    if ist_number == 1:
        for map_tag in soup.find_all("li", attrs={"data-page": "1"}):
            for a in map_tag.find_all("a", href=True):
                abs_url = urljoin(url, a["href"])
                print(abs_url)
                hrefs.append(abs_url)
    else:
        for map_tag in soup.find_all("map"):
            for area in map_tag.find_all("area", href=True):
                abs_url = urljoin(url, area["href"])
                hrefs.append(abs_url)
    return hrefs
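
# Note: ist_number == 1 walks <li data-page="1"> anchors (used for the
# def.ltn.com.tw list pages below), while any other value walks <map>/<area>
# image maps (used for the epaper.hljnews.cn layout pages).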


# Parser helpers for the first source (the newspaper)
def extract_text_from_url_one(url, timeout=10, verify=True):
    proxies_list = download_proxies(PROXIES_URL)
    response = ""
    for proxy in proxies_list:
        response = fetch_with_proxy(url, proxy=proxy, timeout=timeout, verify=verify)
        if response:
            break
    else:
        response = ""  # every proxy failed
    soup = BeautifulSoup(response, "html.parser")
    title_div = soup.find("div", class_="newsdetatit")
    title_text = ""
    if title_div:
        h3_tag = title_div.find("h3")
        if h3_tag:
            title_text = h3_tag.get_text(strip=True)
    content_div = soup.find("div", class_="newsdetatext")
    content_text = ""
    if content_div:
        founder_content = content_div.find("founder-content")
        if founder_content:
            p_tags = founder_content.find_all("p")
            content_text = "\n".join(p.get_text(strip=True) for p in p_tags)
    text = title_text + content_text
    if len(text) > 4500:
        text = text[:4500]
    print(len(text))
    return text
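
# Note: the 4500-character cap above is inherited from the original code;
# presumably it keeps the article within the GPT service's prompt budget.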


# Parser helpers for the second source (the military one)
def extract_text_from_url(url, timeout=10, verify=True):
    proxies_list = download_proxies(PROXIES_URL)
    response = ""
    for proxy in proxies_list:
        response = fetch_with_proxy(url, proxy=proxy, timeout=timeout, verify=verify)
        if response:
            break
    else:
        response = ""  # every proxy failed
    soup = BeautifulSoup(response, "html.parser")
    # Find the div.whitecon.article container
    container = soup.find("div", class_="whitecon article")
    if not container:
        return "", ""
    # Grab the article timestamp from the first <span> in the container
    time_t = ""
    time_text = container.find("span")
    if time_text:
        time_t = time_text.get_text(strip=True)
    # Collect every <p> in the container, skipping those with class="before_ir"
    paragraphs = container.find_all("p")
    content_text = []
    for p in paragraphs:
        if p.get("class") != ["before_ir"]:
            content_text.append(p.get_text(strip=True))
    # Return the text digest plus the timestamp
    return "\n".join(content_text), time_t
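
# Unlike extract_text_from_url_one, this helper returns a (text, article_date)
# pair, which is why the not-found branch above returns two empty strings.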


# Shared GPT request
def gpt_response_message(content, ist_number=1):
    prompts = sw.read_settings().sources
    if ist_number == 1:
        content_gpt = prompts[0].prompt.replace("{content}", content)
    else:
        content_gpt = prompts[1].prompt.replace("{content}", content)
    url = "http://45.129.78.228:8484"  # 10.8.0.14:5500
    params = {"text": content_gpt}
    # Cap the number of attempts
    max_retries = 3
    for _ in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=15)
            return response.text
        except Exception as ex:
            print(f"GPT request error: {ex}")
            logger.info(f"gpt_response_message: {ex}")
    # Every attempt failed: restart the GPT service and try one last time
    restart_service("work_gpt.service")
    print("\n\n\tRestarting GPT\n\n")
    try:
        response = requests.get(url, params=params, timeout=15)
        return response.text
    except Exception as ex:
        print(f"GPT request error: {ex}")
        logger.info(f"gpt_response_message: {ex}")
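
# If even the post-restart attempt fails, gpt_response_message falls through
# and returns None; both callers guard against this by wrapping json.loads()
# of the response in a try/except.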


def restart_service(service_name):
    try:
        subprocess.run(["sudo", "systemctl", "restart", service_name], check=True)
        time.sleep(30)  # give the service time to come back up
        print(f"Service {service_name} restarted successfully")
    except subprocess.CalledProcessError:
        print(f"Failed to restart service {service_name}")
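
# Assumption: the account running this service can execute
# `sudo systemctl restart work_gpt.service` without a password (i.e. a
# matching NOPASSWD sudoers rule exists); otherwise the call blocks or fails.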


# Shared URL-deduplication check
def check_url(url):
    print(url)
    response = requests.get("http://45.129.78.228:8002/check_url_exists", params={"url": url})
    if response.status_code == 200:
        result = response.json()
        print(result["exists"])
        return result["exists"]
    print(f"Error: {response.status_code}")
    return False


# Date helper for the first source (the newspaper)
def create_folder(num):
    # Zero-pad single-digit day/month numbers
    if int(num) // 10 == 0:
        num = f"0{num}"
    else:
        num = str(num)
    return num
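
# Example: create_folder(5) -> "05", create_folder(12) -> "12"; despite the
# name, this just zero-pads day/month numbers for the e-paper URL layout.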


# start() for the first source (the newspaper)
def start_pars_one_istochnik(data_init):
    if data_init != [""]:
        current_day = data_init[0]
        current_month = data_init[1]
        current_year = data_init[2]
    else:
        datetime_now = dt.now()
        current_day = create_folder(datetime_now.day)
        current_month = create_folder(datetime_now.month)
        current_year = f"{datetime_now.year}"
    task_id = wp.insert_task(
        status="queued",
        source_url=f"http://epaper.hljnews.cn/hljrb/pc/layout/{current_year}{current_month}/{current_day}/node_0X.html",
    )
    print("Created task with id:", task_id)
    for page_number in range(1, 9):
        start_url = f"http://epaper.hljnews.cn/hljrb/pc/layout/{current_year}{current_month}/{current_day}/node_0{page_number}.html"
        wp.update_task(task_id, status="in_progress", source_url=start_url, started_at=datetime.utcnow())
        print(f"Collecting hrefs from: {start_url}")
        try:
            hrefs = extract_map_area_hrefs(start_url, ist_number=2)
        except Exception as e:
            print(f"Error extracting links: {e}")
            logger.info(f"extract_map_area_hrefs: {e}")
            continue
        for i, link in enumerate(hrefs, 1):
            if not check_url(link):
                print(f"Page {page_number} [{i}/{len(hrefs)}] parsing {link}")
                text = extract_text_from_url_one(link)
                if len(text) >= 100:
                    response_text = gpt_response_message(text, ist_number=2)
                    print(response_text)
                    try:
                        clean_response = response_text.strip().replace("```json", "").replace("```", "").strip()
                        data = json.loads(clean_response)
                        data["article_date"] = f"{current_day}/{current_month}/{current_year}"
                        data["url"] = link
                        data["parsed_at"] = str(dt.now())
                        data["original_text"] = text
                        data["status"] = False
                        data["viewed"] = False
                        data["other"] = start_url
                        if data["category"]:
                            print(requests.post("http://45.129.78.228:8002/save_parsed_data", json=data))
                    except Exception as ex:
                        print(f"Error processing GPT response: {ex}")
                        logger.info(f"gpt_response_message: {ex}")
                        continue
    wp.update_task(task_id, status="completed", finished_at=datetime.utcnow())
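
# Both start_* functions expect the GPT service to reply with a JSON object
# (optionally wrapped in a ```json fence, which is stripped before parsing),
# containing at least a "category" field; records whose category is empty
# are silently skipped.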


# start() for the second source (the military one)
def start_pars_two_istochnik():
    task_id = wp.insert_task(status="queued", source_url="https://def.ltn.com.tw/")
    istochnik = [
        "https://def.ltn.com.tw/breakingnewslist",
        "https://def.ltn.com.tw/list/11",
        "https://def.ltn.com.tw/list/19",
        "https://def.ltn.com.tw/list/17",
        "https://def.ltn.com.tw/list/16",
    ]
    all_links = []
    for url in istochnik:
        try:
            print(f"Collecting hrefs from: {url}")
            all_links += extract_map_area_hrefs(url)
        except Exception as e:
            print(f"Error extracting links: {e}")
            logger.info(f"Error extracting links: {e}")
            continue
    for hrefs in all_links:
        if not check_url(hrefs):
            try:
                text, time_text = extract_text_from_url(hrefs)
                if len(text) >= 100:
                    response_text = gpt_response_message(text)
                    print(response_text)
                    try:
                        clean_response = response_text.strip().replace("```json", "").replace("```", "").strip()
                        data = json.loads(clean_response)
                        data["article_date"] = time_text
                        data["url"] = hrefs
                        data["parsed_at"] = str(dt.now())
                        data["original_text"] = text
                        data["status"] = False
                        data["viewed"] = False
                        data["other"] = url  # last URL from the list loop above
                        if data["category"]:
                            print(requests.post("http://45.129.78.228:8002/save_parsed_data", json=data))
                    except Exception as ex:
                        print(f"Error processing GPT response: {ex}")
                        logger.info(f"Error processing GPT response: {ex}")
                        continue
            except Exception:
                continue
    wp.update_task(task_id, status="completed", finished_at=datetime.utcnow())


# Entry points for scheduled runs
def scheduled_parser_1():
    """Scheduler entry point for the first parser"""
    istochnik = ""  # empty string = current date
    start_pars_one_istochnik(istochnik.split("."))


def scheduled_parser_2():
    """Scheduler entry point for the second parser"""
    start_pars_two_istochnik()


@app.on_event("startup")
async def start_scheduler():
    """Start the scheduler when the application starts"""
    scheduler.add_job(scheduled_parser_1, "cron", hour=10, minute=0)
    scheduler.add_job(scheduled_parser_2, "cron", hour=11, minute=0)
    scheduler.start()


@app.on_event("shutdown")
async def stop_scheduler():
    """Stop the scheduler on shutdown"""
    scheduler.shutdown()
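
# Both jobs fire daily (10:00 for the newspaper source, 11:00 for the military
# one, server time). Newer FastAPI releases deprecate @app.on_event in favour
# of lifespan handlers, but these hooks still work.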


class ParserOneRequest(BaseModel):
    time: str


@app.post("/parser_1", summary="Start parsing the first source")
async def process_data(data: ParserOneRequest, background_tasks: BackgroundTasks):
    istochnik = data.time.split(".")
    background_tasks.add_task(start_pars_one_istochnik, istochnik)
    return {"message": "Parsing of source 1 started"}


@app.post("/parser_2", summary="Start parsing the second source")
async def process_data_gpt(background_tasks: BackgroundTasks):
    background_tasks.add_task(start_pars_two_istochnik)
    return {"message": "Parsing of source 2 started"}
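
# Example requests, assuming the service listens on port 8001 (see the
# commented-out uvicorn block at the bottom of this file):
#
#   curl -X POST http://localhost:8001/parser_1 \
#        -H "Content-Type: application/json" -d '{"time": "05.03.2024"}'
#   curl -X POST http://localhost:8001/parser_2
#
# "time" is a day.month.year string; an empty string means "today".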


# GET endpoint for fetching parsing tasks
@app.get("/get_tasks_offset", summary="Fetch parsing tasks with pagination")
def get_tasks_offset(limit: int = Query(10, gt=0), offset: int = Query(0, ge=0)):
    return wp.get_tasks_offset(limit, offset)


# GET endpoint for fetching parser settings
@app.get("/settings", summary="Fetch parser settings")
def get_settings():
    return sw.read_settings()


# POST endpoint for saving parser settings
@app.post("/settings", summary="Save parser settings")
def set_settings(settings: sw.Source):
    return sw.update_source(settings)


@app.delete("/delete_task/{task_id}", summary="Delete a parsing task")
def delete_task(task_id: int):
    # `return print(...)` always returned None; return the result itself
    result = wp.delete_task(task_id)
    print(result)
    return result


# if __name__ == "__main__":
#     uvicorn.run("main:app", port=8001, reload=True)
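
# To run the service directly, uncomment the block above and execute
# `python main.py`, or start it with `uvicorn main:app --port 8001`.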