All checks were successful
continuous-integration/drone/push Build is passing
76 lines
2.6 KiB
Python
76 lines
2.6 KiB
Python
"""
|
||
Менеджер прокси - управление загрузкой и использованием прокси
|
||
"""
|
||
import random
|
||
import requests
|
||
from config import PROXIES_URL
|
||
|
||
|
||
def download_proxies(url: str = PROXIES_URL) -> list[str]:
|
||
"""
|
||
Загружает список прокси из удаленного источника
|
||
"""
|
||
response = requests.get(url)
|
||
if response.status_code == 200:
|
||
proxies = response.text.splitlines()
|
||
return proxies
|
||
else:
|
||
return []
|
||
|
||
|
||
def get_shuffled_proxies(proxies_list: list[str]) -> list[str]:
|
||
"""
|
||
Перемешивает список прокси для случайного начала
|
||
"""
|
||
shuffled = proxies_list.copy()
|
||
random.shuffle(shuffled)
|
||
return shuffled
|
||
|
||
|
||
def fetch_with_proxy(url: str, proxy: str, verify: bool = True, timeout: int = 10) -> str | None:
|
||
"""
|
||
Выполняет запрос к URL через прокси
|
||
Возвращает текст ответа или None при ошибке
|
||
"""
|
||
proxies = {
|
||
'http': f'http://{proxy}',
|
||
'https': f'http://{proxy}',
|
||
}
|
||
try:
|
||
response = requests.get(url, proxies=proxies, timeout=timeout, verify=verify)
|
||
response.encoding = 'utf-8'
|
||
if response.status_code == 200:
|
||
# Проверяем содержимое - если это ошибка от прокси
|
||
if '"message":"Request failed' in response.text or '403' in response.text[:500]:
|
||
print(f"Proxy {proxy} - Site returned 403 (inside response)")
|
||
return None
|
||
print(f"Proxy {proxy} - SUCCESS")
|
||
return response.text
|
||
elif response.status_code == 403:
|
||
print(f"Proxy {proxy} - 403 Forbidden")
|
||
return None # Прокси работает, но сайт блокирует
|
||
else:
|
||
print(f"Proxy {proxy} - Status {response.status_code}")
|
||
return None
|
||
except:
|
||
return None
|
||
|
||
|
||
def fetch_with_proxy_retry(url: str, timeout: int = 10, verify: bool = True) -> str:
|
||
"""
|
||
Выполняет запрос с перебором прокси до успешного
|
||
Возвращает пустую строку если все прокси не сработали
|
||
"""
|
||
proxies_list = download_proxies(PROXIES_URL)
|
||
proxies_list = get_shuffled_proxies(proxies_list)
|
||
|
||
response = ""
|
||
for proxy in proxies_list:
|
||
response = fetch_with_proxy(url, proxy=proxy, timeout=timeout, verify=verify)
|
||
if response:
|
||
break
|
||
else:
|
||
response = ""
|
||
|
||
return response
|