StateStorage - Управление состояниями
StateStorage - Управление состояниями
Обзор
StateStorage — это отдельная база данных PostgreSQL для хранения истории состояний системы. Она обеспечивает:
- Отдельную БД PostgreSQL для истории (независимо от БД котировок)
- Хранение истории состояний Meta Learning, Transfer Learning и checkpoint’ов Genetic Algorithm
- Хранение истории прогресса воркеров (worker_progress)
- Поддержку схем PostgreSQL для изоляции данных
- Автоматические миграции через golang-migrate
- Защиту от потери данных при перезапусках подов
- Глобальное хранилище знаний, доступное всем воркерам
Важно: StateStorage использует отдельную БД PostgreSQL, которая не связана с БД для хранения котировок (PostgreSQL или ClickHouse).
Архитектура
graph TB
subgraph "StateStorage"
SS[StateStorage]
MIG[Migrator]
REPO[MetaLearningStateRepository]
WPR[WorkerProgressRepository]
end
subgraph "База данных истории (StateStorage)"
DB[(PostgreSQL<br/>ml_states)]
SCHEMA[Схема: public/ml_states]
TABLES[Таблицы:<br/>worker_progress<br/>meta_learning_state]
end
subgraph "База данных котировок"
PG[(PostgreSQL<br/>quotes)]
CH[(ClickHouse<br/>quotes)]
end
subgraph "Компоненты системы"
GA[Genetic Algorithm]
ML[Meta Learning]
TL[Transfer Learning]
WP[Worker Progress]
end
SS --> MIG
SS --> DB
MIG --> SCHEMA
SCHEMA --> TABLES
REPO --> SS
WPR --> SS
GA --> REPO
ML --> REPO
TL --> REPO
WP --> WPRКонфигурация
Базовая конфигурация
Важно: StateStorage использует отдельную БД PostgreSQL, отличную от БД для котировок.
# БД для котировок (PostgreSQL или ClickHouse)
storage:
type: "postgres" # или "clickhouse"
postgres:
db_name: "quotes" # БД для котировок
clickhouse:
db_name: "quotes" # БД для котировок
# БД для истории состояний (отдельный PostgreSQL)
state_storage:
db_user: "states" # Имя пользователя БД истории
db_password: "password" # Пароль пользователя БД истории
db_host: "localhost:5432" # Адрес и порт БД истории
db_name: "ml_states" # Имя базы данных истории (отдельная БД!)
schema: "public" # Схема БД (по умолчанию "public")
migrations_path: "migrations" # Путь к миграциям
auto_migrate: true # Автоматическое применение миграций при старте
Переменные окружения
| Переменная | Описание | По умолчанию |
|---|---|---|
STATE_STORAGE_DB_USER | Пользователь БД состояний | states |
STATE_STORAGE_DB_PASSWORD | Пароль БД состояний | password |
STATE_STORAGE_DB_HOST | Хост БД состояний | localhost:5432 |
STATE_STORAGE_DB_NAME | Имя БД состояний | ml_states |
STATE_STORAGE_SCHEMA | Схема БД состояний | public |
STATE_STORAGE_MIGRATIONS_PATH | Путь к миграциям | migrations |
STATE_STORAGE_AUTO_MIGRATE | Автоматическое применение миграций | true |
Использование схем
StateStorage поддерживает использование различных схем PostgreSQL для изоляции данных:
state_storage:
schema: "ml_states" # Использование отдельной схемы
Преимущества использования схем:
- Изоляция данных — состояния отделены от других данных
- Управление правами — можно настроить права доступа на уровне схемы
- Организация — логическое разделение данных в одной БД
Миграции
StateStorage использует golang-migrate для управления схемой БД.
Формат миграций
Миграции находятся в папке migrations/ и следуют формату:
000001_<name>.up.sql— применение миграции000001_<name>.down.sql— откат миграции
Автоматическое применение
При auto_migrate: true миграции применяются автоматически при старте приложения:
stateStorage, err := storage.NewStateStorage(ctx, log, &cfg.StateStorage)
// Миграции применяются автоматически
Ручное управление миграциями
Если auto_migrate: false, можно управлять миграциями вручную:
# Применить все миграции
migrate -path migrations -database "postgres://user:pass@host/db?sslmode=disable" up
# Откатить последнюю миграцию
migrate -path migrations -database "postgres://user:pass@host/db?sslmode=disable" down 1
# Проверить версию
migrate -path migrations -database "postgres://user:pass@host/db?sslmode=disable" version
Таблицы
worker_progress
Таблица для отслеживания статуса обработки заданий и промежуточных результатов.
Основные поля:
message_id(UUID, PRIMARY KEY) — уникальный идентификатор сообщенияstatus(VARCHAR) — статус: pending, processing, completed, failedworker_id(VARCHAR) — идентификатор воркераprogress_percent(DECIMAL) — процент выполненияcurrent_generation(INT) — текущее поколение GAintermediate_results(JSONB) — промежуточные результаты
Индексы:
idx_worker_progress_status— по статусу и времени обновленияidx_worker_progress_worker— по воркеру и статусуidx_worker_progress_updated— по времени обновления
meta_learning_state
Таблица для хранения состояний Meta Learning, Transfer Learning и GA checkpoints.
Основные поля:
id(UUID, PRIMARY KEY) — уникальный идентификаторtest_id(VARCHAR) — ID конкретного теста (NULL для глобальных знаний)message_id(UUID) — связь с worker_progressstate_type(VARCHAR) — тип: meta_learning, transfer_learning, genetic_algorithmexchange,pair,model_type— контекст задачиknowledge_key(VARCHAR) — составной ключ для глобальных знанийis_global(BOOLEAN) — флаг глобального состоянияstate_data(JSONB) — полное состояниеgeneration(INT) — текущее поколение GAcheckpoint_type(VARCHAR) — тип checkpoint: full, incremental, finalexpires_at(TIMESTAMP) — время истечения (для автоочистки)version(INT) — версия для оптимистичных блокировок
Индексы:
idx_meta_learning_knowledge_key— для поиска глобальных знанийidx_meta_learning_test_id— для поиска checkpoint’ов по test_ididx_meta_learning_message_id— для поиска по message_ididx_meta_learning_exchange_pair_model— по комбинации exchange+pair+model_typeidx_meta_learning_expires— для очистки истекших состоянийidx_meta_learning_global_unique— уникальный индекс для глобальных знаний
Репозитории
MetaLearningStateRepository
Репозиторий для работы с состояниями Meta Learning, Transfer Learning и GA checkpoints.
Основные методы:
SaveGlobalKnowledge()— сохранение глобальных знанийLoadGlobalKnowledge()— загрузка глобальных знанийUpdateGlobalKnowledge()— обновление глобальных знанийSaveState()— сохранение состояния (универсальный метод)LoadState()— загрузка состояния по test_idSaveCheckpoint()— сохранение checkpoint GALoadLatestCheckpoint()— загрузка последнего checkpointFindSimilarTasks()— поиск похожих задач для Transfer LearningCleanupExpiredStates()— очистка истекших состояний
WorkerProgressRepository
Репозиторий для работы с прогрессом воркеров.
Основные методы:
GetProgress()— получение текущего статусаCreateProgress()— создание записи о прогрессеUpdateStatus()— обновление статусаUpdateProgress()— обновление прогресса и метрикMarkCompleted()— пометка как завершенноеMarkFailed()— пометка как завершенное с ошибкойGetStuckTasks()— получение зависших задачGetActiveTasks()— получение активных задач воркера
Использование в коде
Инициализация
// В ServiceContainer
stateStorage, err := storage.NewStateStorage(ctx, log, &cfg.StateStorage)
if err != nil {
log.Warn("StateStorage не инициализирован", slog.String("error", err.Error()))
}
Создание репозиториев
// MetaLearningStateRepository
stateRepo := storage.NewMetaLearningStateRepository(stateStorage, log)
// WorkerProgressRepository
workerProgressRepo := storage.NewWorkerProgressRepository(stateStorage, log)
Сохранение checkpoint GA
stateManager := ab_optimizer.NewStateManager(stateRepo, log)
err := stateManager.SaveGACheckpoint(
ctx,
testID,
messageID,
generation,
population,
ab_optimizer.CheckpointTypeIncremental,
exchange,
pair,
modelType,
)
Восстановление checkpoint
checkpoint, err := stateManager.RestoreGACheckpoint(ctx, testID)
if err == nil && checkpoint != nil {
// Восстановить популяцию и продолжить с generation
population = checkpoint.StateData.GAPopulation
currentGeneration = *checkpoint.Generation
}
Очистка истекших состояний
StateStorage предоставляет встроенный сервис очистки:
// Запуск фоновой задачи очистки
go stateStorage.StartCleanupService(ctx, 1*time.Hour)
Или вручную:
deleted, err := stateRepo.CleanupExpiredStates(ctx)
Масштабирование
StateStorage поддерживает работу нескольких воркеров с одной БД:
- Глобальные знания (Meta Learning, Transfer Learning) доступны всем воркерам
- Checkpoint’ы привязаны к конкретному
test_idиmessage_id - Оптимистичные блокировки через поле
versionпредотвращают конфликты
Разделение БД
БД для котировок
- PostgreSQL или ClickHouse — хранилище котировок
- Используется для хранения исторических данных котировок
- Настраивается в секции
storageконфигурации
БД для истории (StateStorage)
- PostgreSQL — отдельная БД для истории состояний
- Используется для хранения:
- Истории состояний Meta Learning, Transfer Learning
- Checkpoint’ов Genetic Algorithm
- Прогресса воркеров (worker_progress)
- Настраивается в секции
state_storageконфигурации
Рекомендация: Используйте разные БД для котировок и истории, чтобы разделить нагрузку и обеспечить изоляцию данных.
Рекомендации
- Production: Используйте отдельную БД PostgreSQL для истории состояний
- Разделение: Не используйте одну БД для котировок и истории
- Схемы: Используйте отдельную схему для дополнительной изоляции данных
- Миграции: Включите
auto_migrate: trueдля автоматического применения - Очистка: Настройте периодическую очистку истекших состояний
- Мониторинг: Отслеживайте размер таблиц и производительность запросов
Troubleshooting
Миграции не применяются
Проверьте:
- Путь к миграциям в конфигурации
- Права доступа к БД
- Логи приложения при старте
Ошибки подключения
Проверьте:
- Параметры подключения в конфигурации
- Доступность БД
- Правильность схемы (если используется)
Конфликты при конкурентном доступе
StateStorage использует оптимистичные блокировки через поле version. При конфликтах операция повторяется автоматически.