Task Handlers
Task Handlers
Обзор
Обработчики задач (task_handlers) отвечают за обработку асинхронных сообщений, поступающих из RabbitMQ. Каждый обработчик специализируется на определенном типе задач.
Архитектура
graph TD
subgraph "Система"
A[RabbitMQ] --> B{Task Worker};
B --> C{Task Handlers};
end
subgraph "Обработчики"
C --> D[A/B Testing Handler];
C --> E[Backup Handler];
C --> F[Clean Work Status Handler];
end
D --> G[AB Optimizer];
E --> H[Backup Manager];
F --> I[StateStorage];- Task Worker: Получает сообщения из RabbitMQ и передает их соответствующему обработчику.
- Task Handlers: Содержат бизнес-логику для обработки конкретных типов задач.
Типы обработчиков
1. A/B Testing Handler
- Файл:
ab_testing.go - Назначение: Обрабатывает задачи для проведения A/B-тестирования.
- Логика:
- Декодирует параметры теста из
base64. - Запускает
ab_optimizerс полученными параметрами. - Обновляет статус выполнения задачи в
StateStorage(worker_progress). - Возвращает результат с лучшими параметрами.
- Декодирует параметры теста из
2. Backup Handler
- Файл:
backup_handler.go - Назначение: Обрабатывает задачи, связанные с резервным копированием.
- Логика:
- Определяет тип операции (
create_backup,restore_backupи т.д.). - Вызывает соответствующий метод в
backup.Manager. - Возвращает статус операции.
- Определяет тип операции (
3. Clean Work Status Handler
- Файл:
clean_work_status.go - Назначение: Выполняет периодическую очистку устаревших записей о статусе задач.
- Логика:
- Запускается по расписанию (например, раз в день).
- Удаляет из
worker_progressзаписи старше определенного срока (например, 7 дней).
Использование
Обработчики автоматически вызываются TaskWorker‘ом в зависимости от типа задачи, указанного в сообщении RabbitMQ.
Пример сообщения для A/B-теста
Внешний формат сообщения (RabbitMQ):
{
"type": "ab_test",
"model_type": "dqn",
"payload": "..." // base64-encoded параметры
}
Внутренний формат payload (после декодирования base64):
{
"params": {
"alpha": [0.1, 0.2, 0.3],
"beta": [0.5, 0.6]
},
"exchange": "binance",
"pair": "TONUSDT",
"data_period": "24h"
}
Обязательные поля в payload:
exchange(string) - название биржи (например, “binance”)pair(string) - торговая пара (например, “TONUSDT”)data_period(string) - период данных в формате Go duration (например, “24h”, “1h”, “30m”)- Минимальное значение:
min_data_periodиз конфига (по умолчанию “1h”) - Максимальное значение: 30 дней (720h)
- Формат: Go duration format (https://pkg.go.dev/time#ParseDuration)
- Минимальное значение:
Валидация:
- Если поле отсутствует, задача будет помечена как
failedс соответствующим сообщением об ошибке - Ошибки валидации логируются с контекстом (message_id, exchange, pair)
Пример сообщения для бэкапа
{
"type": "create_backup",
"payload": {
"scope": "database",
"database": "quotes"
// ...
}
}
Настройка
Обработчики не требуют отдельной конфигурации. Их зависимости (например, ab_optimizer, backup.Manager) настраиваются на уровне ServiceContainer.