All checks were successful
continuous-integration/drone/push Build is passing
48 lines
1.4 KiB
Python
48 lines
1.4 KiB
Python
"""
|
||
Базовый класс парсера
|
||
"""
|
||
from abc import ABC, abstractmethod
|
||
from typing import List
|
||
import work_parser as wp
|
||
|
||
|
||
class BaseParser(ABC):
|
||
"""
|
||
Базовый класс для всех парсеров
|
||
"""
|
||
|
||
def __init__(self, source_name: str):
|
||
self.source_name = source_name
|
||
self.task_id = None
|
||
|
||
def start_task(self, source_url: str) -> int:
|
||
"""
|
||
Создаёт задачу парсинга и возвращает её ID
|
||
"""
|
||
self.task_id = wp.insert_task(status='queued', source_url=source_url)
|
||
print(f"Создана задача с id: {self.task_id}")
|
||
return self.task_id
|
||
|
||
def complete_task(self) -> None:
|
||
"""
|
||
Завершает задачу парсинга
|
||
"""
|
||
if self.task_id:
|
||
from datetime import datetime
|
||
wp.update_task(self.task_id, status='completed', finished_at=datetime.utcnow())
|
||
|
||
def fail_task(self) -> None:
|
||
"""
|
||
Отмечает задачу как неудачную
|
||
"""
|
||
if self.task_id:
|
||
from datetime import datetime
|
||
wp.update_task(self.task_id, status='failed', finished_at=datetime.utcnow())
|
||
|
||
@abstractmethod
|
||
def parse(self) -> None:
|
||
"""
|
||
Основной метод парсинга - должен быть реализован в наследниках
|
||
"""
|
||
pass
|