Task Handlers

Task Handlers

Обзор

Обработчики задач (task_handlers) отвечают за обработку асинхронных сообщений, поступающих из RabbitMQ. Каждый обработчик специализируется на определенном типе задач.

Архитектура

graph TD
    subgraph "Система"
        A[RabbitMQ] --> B{Task Worker};
        B --> C{Task Handlers};
    end

    subgraph "Обработчики"
        C --> D[A/B Testing Handler];
        C --> E[Backup Handler];
        C --> F[Clean Work Status Handler];
    end

    D --> G[AB Optimizer];
    E --> H[Backup Manager];
    F --> I[StateStorage];
  • Task Worker: Получает сообщения из RabbitMQ и передает их соответствующему обработчику.
  • Task Handlers: Содержат бизнес-логику для обработки конкретных типов задач.

Типы обработчиков

1. A/B Testing Handler

  • Файл: ab_testing.go
  • Назначение: Обрабатывает задачи для проведения A/B-тестирования.
  • Логика:
    1. Декодирует параметры теста из base64.
    2. Запускает ab_optimizer с полученными параметрами.
    3. Обновляет статус выполнения задачи в StateStorage (worker_progress).
    4. Возвращает результат с лучшими параметрами.

2. Backup Handler

  • Файл: backup_handler.go
  • Назначение: Обрабатывает задачи, связанные с резервным копированием.
  • Логика:
    1. Определяет тип операции (create_backup, restore_backup и т.д.).
    2. Вызывает соответствующий метод в backup.Manager.
    3. Возвращает статус операции.

3. Clean Work Status Handler

  • Файл: clean_work_status.go
  • Назначение: Выполняет периодическую очистку устаревших записей о статусе задач.
  • Логика:
    1. Запускается по расписанию (например, раз в день).
    2. Удаляет из worker_progress записи старше определенного срока (например, 7 дней).

Использование

Обработчики автоматически вызываются TaskWorker‘ом в зависимости от типа задачи, указанного в сообщении RabbitMQ.

Пример сообщения для A/B-теста

Внешний формат сообщения (RabbitMQ):

{
  "type": "ab_test",
  "model_type": "dqn",
  "payload": "..." // base64-encoded параметры
}

Внутренний формат payload (после декодирования base64):

{
  "params": {
    "alpha": [0.1, 0.2, 0.3],
    "beta": [0.5, 0.6]
  },
  "exchange": "binance",
  "pair": "TONUSDT",
  "data_period": "24h"
}

Обязательные поля в payload:

  • exchange (string) - название биржи (например, “binance”)
  • pair (string) - торговая пара (например, “TONUSDT”)
  • data_period (string) - период данных в формате Go duration (например, “24h”, “1h”, “30m”)
    • Минимальное значение: min_data_period из конфига (по умолчанию “1h”)
    • Максимальное значение: 30 дней (720h)
    • Формат: Go duration format (https://pkg.go.dev/time#ParseDuration)

Валидация:

  • Если поле отсутствует, задача будет помечена как failed с соответствующим сообщением об ошибке
  • Ошибки валидации логируются с контекстом (message_id, exchange, pair)

Пример сообщения для бэкапа

{
  "type": "create_backup",
  "payload": {
    "scope": "database",
    "database": "quotes"
    // ...
  }
}

Настройка

Обработчики не требуют отдельной конфигурации. Их зависимости (например, ab_optimizer, backup.Manager) настраиваются на уровне ServiceContainer.