StateStorage - Управление состояниями

StateStorage - Управление состояниями

Обзор

StateStorage — это отдельная база данных PostgreSQL для хранения истории состояний системы. Она обеспечивает:

  • Отдельную БД PostgreSQL для истории (независимо от БД котировок)
  • Хранение истории состояний Meta Learning, Transfer Learning и checkpoint’ов Genetic Algorithm
  • Хранение истории прогресса воркеров (worker_progress)
  • Поддержку схем PostgreSQL для изоляции данных
  • Автоматические миграции через golang-migrate
  • Защиту от потери данных при перезапусках подов
  • Глобальное хранилище знаний, доступное всем воркерам

Важно: StateStorage использует отдельную БД PostgreSQL, которая не связана с БД для хранения котировок (PostgreSQL или ClickHouse).

Архитектура

graph TB
    subgraph "StateStorage"
        SS[StateStorage]
        MIG[Migrator]
        REPO[MetaLearningStateRepository]
        WPR[WorkerProgressRepository]
    end
    
    subgraph "База данных истории (StateStorage)"
        DB[(PostgreSQL<br/>ml_states)]
        SCHEMA[Схема: public/ml_states]
        TABLES[Таблицы:<br/>worker_progress<br/>meta_learning_state]
    end
    
    subgraph "База данных котировок"
        PG[(PostgreSQL<br/>quotes)]
        CH[(ClickHouse<br/>quotes)]
    end
    
    subgraph "Компоненты системы"
        GA[Genetic Algorithm]
        ML[Meta Learning]
        TL[Transfer Learning]
        WP[Worker Progress]
    end
    
    SS --> MIG
    SS --> DB
    MIG --> SCHEMA
    SCHEMA --> TABLES
    REPO --> SS
    WPR --> SS
    
    GA --> REPO
    ML --> REPO
    TL --> REPO
    WP --> WPR

Конфигурация

Базовая конфигурация

Важно: StateStorage использует отдельную БД PostgreSQL, отличную от БД для котировок.

# БД для котировок (PostgreSQL или ClickHouse)
storage:
  type: "postgres"  # или "clickhouse"
  postgres:
    db_name: "quotes"  # БД для котировок
  clickhouse:
    db_name: "quotes"  # БД для котировок

# БД для истории состояний (отдельный PostgreSQL)
state_storage:
  db_user: "states"                          # Имя пользователя БД истории
  db_password: "password"                    # Пароль пользователя БД истории
  db_host: "localhost:5432"                  # Адрес и порт БД истории
  db_name: "ml_states"                       # Имя базы данных истории (отдельная БД!)
  schema: "public"                           # Схема БД (по умолчанию "public")
  migrations_path: "migrations"              # Путь к миграциям
  auto_migrate: true                         # Автоматическое применение миграций при старте

Переменные окружения

ПеременнаяОписаниеПо умолчанию
STATE_STORAGE_DB_USERПользователь БД состоянийstates
STATE_STORAGE_DB_PASSWORDПароль БД состоянийpassword
STATE_STORAGE_DB_HOSTХост БД состоянийlocalhost:5432
STATE_STORAGE_DB_NAMEИмя БД состоянийml_states
STATE_STORAGE_SCHEMAСхема БД состоянийpublic
STATE_STORAGE_MIGRATIONS_PATHПуть к миграциямmigrations
STATE_STORAGE_AUTO_MIGRATEАвтоматическое применение миграцийtrue

Использование схем

StateStorage поддерживает использование различных схем PostgreSQL для изоляции данных:

state_storage:
  schema: "ml_states"  # Использование отдельной схемы

Преимущества использования схем:

  • Изоляция данных — состояния отделены от других данных
  • Управление правами — можно настроить права доступа на уровне схемы
  • Организация — логическое разделение данных в одной БД

Миграции

StateStorage использует golang-migrate для управления схемой БД.

Формат миграций

Миграции находятся в папке migrations/ и следуют формату:

  • 000001_<name>.up.sql — применение миграции
  • 000001_<name>.down.sql — откат миграции

Автоматическое применение

При auto_migrate: true миграции применяются автоматически при старте приложения:

stateStorage, err := storage.NewStateStorage(ctx, log, &cfg.StateStorage)
// Миграции применяются автоматически

Ручное управление миграциями

Если auto_migrate: false, можно управлять миграциями вручную:

# Применить все миграции
migrate -path migrations -database "postgres://user:pass@host/db?sslmode=disable" up

# Откатить последнюю миграцию
migrate -path migrations -database "postgres://user:pass@host/db?sslmode=disable" down 1

# Проверить версию
migrate -path migrations -database "postgres://user:pass@host/db?sslmode=disable" version

Таблицы

worker_progress

Таблица для отслеживания статуса обработки заданий и промежуточных результатов.

Основные поля:

  • message_id (UUID, PRIMARY KEY) — уникальный идентификатор сообщения
  • status (VARCHAR) — статус: pending, processing, completed, failed
  • worker_id (VARCHAR) — идентификатор воркера
  • progress_percent (DECIMAL) — процент выполнения
  • current_generation (INT) — текущее поколение GA
  • intermediate_results (JSONB) — промежуточные результаты

Индексы:

  • idx_worker_progress_status — по статусу и времени обновления
  • idx_worker_progress_worker — по воркеру и статусу
  • idx_worker_progress_updated — по времени обновления

meta_learning_state

Таблица для хранения состояний Meta Learning, Transfer Learning и GA checkpoints.

Основные поля:

  • id (UUID, PRIMARY KEY) — уникальный идентификатор
  • test_id (VARCHAR) — ID конкретного теста (NULL для глобальных знаний)
  • message_id (UUID) — связь с worker_progress
  • state_type (VARCHAR) — тип: meta_learning, transfer_learning, genetic_algorithm
  • exchange, pair, model_type — контекст задачи
  • knowledge_key (VARCHAR) — составной ключ для глобальных знаний
  • is_global (BOOLEAN) — флаг глобального состояния
  • state_data (JSONB) — полное состояние
  • generation (INT) — текущее поколение GA
  • checkpoint_type (VARCHAR) — тип checkpoint: full, incremental, final
  • expires_at (TIMESTAMP) — время истечения (для автоочистки)
  • version (INT) — версия для оптимистичных блокировок

Индексы:

  • idx_meta_learning_knowledge_key — для поиска глобальных знаний
  • idx_meta_learning_test_id — для поиска checkpoint’ов по test_id
  • idx_meta_learning_message_id — для поиска по message_id
  • idx_meta_learning_exchange_pair_model — по комбинации exchange+pair+model_type
  • idx_meta_learning_expires — для очистки истекших состояний
  • idx_meta_learning_global_unique — уникальный индекс для глобальных знаний

Репозитории

MetaLearningStateRepository

Репозиторий для работы с состояниями Meta Learning, Transfer Learning и GA checkpoints.

Основные методы:

  • SaveGlobalKnowledge() — сохранение глобальных знаний
  • LoadGlobalKnowledge() — загрузка глобальных знаний
  • UpdateGlobalKnowledge() — обновление глобальных знаний
  • SaveState() — сохранение состояния (универсальный метод)
  • LoadState() — загрузка состояния по test_id
  • SaveCheckpoint() — сохранение checkpoint GA
  • LoadLatestCheckpoint() — загрузка последнего checkpoint
  • FindSimilarTasks() — поиск похожих задач для Transfer Learning
  • CleanupExpiredStates() — очистка истекших состояний

WorkerProgressRepository

Репозиторий для работы с прогрессом воркеров.

Основные методы:

  • GetProgress() — получение текущего статуса
  • CreateProgress() — создание записи о прогрессе
  • UpdateStatus() — обновление статуса
  • UpdateProgress() — обновление прогресса и метрик
  • MarkCompleted() — пометка как завершенное
  • MarkFailed() — пометка как завершенное с ошибкой
  • GetStuckTasks() — получение зависших задач
  • GetActiveTasks() — получение активных задач воркера

Использование в коде

Инициализация

// В ServiceContainer
stateStorage, err := storage.NewStateStorage(ctx, log, &cfg.StateStorage)
if err != nil {
    log.Warn("StateStorage не инициализирован", slog.String("error", err.Error()))
}

Создание репозиториев

// MetaLearningStateRepository
stateRepo := storage.NewMetaLearningStateRepository(stateStorage, log)

// WorkerProgressRepository
workerProgressRepo := storage.NewWorkerProgressRepository(stateStorage, log)

Сохранение checkpoint GA

stateManager := ab_optimizer.NewStateManager(stateRepo, log)

err := stateManager.SaveGACheckpoint(
    ctx,
    testID,
    messageID,
    generation,
    population,
    ab_optimizer.CheckpointTypeIncremental,
    exchange,
    pair,
    modelType,
)

Восстановление checkpoint

checkpoint, err := stateManager.RestoreGACheckpoint(ctx, testID)
if err == nil && checkpoint != nil {
    // Восстановить популяцию и продолжить с generation
    population = checkpoint.StateData.GAPopulation
    currentGeneration = *checkpoint.Generation
}

Очистка истекших состояний

StateStorage предоставляет встроенный сервис очистки:

// Запуск фоновой задачи очистки
go stateStorage.StartCleanupService(ctx, 1*time.Hour)

Или вручную:

deleted, err := stateRepo.CleanupExpiredStates(ctx)

Масштабирование

StateStorage поддерживает работу нескольких воркеров с одной БД:

  • Глобальные знания (Meta Learning, Transfer Learning) доступны всем воркерам
  • Checkpoint’ы привязаны к конкретному test_id и message_id
  • Оптимистичные блокировки через поле version предотвращают конфликты

Разделение БД

БД для котировок

  • PostgreSQL или ClickHouse — хранилище котировок
  • Используется для хранения исторических данных котировок
  • Настраивается в секции storage конфигурации

БД для истории (StateStorage)

  • PostgreSQL — отдельная БД для истории состояний
  • Используется для хранения:
    • Истории состояний Meta Learning, Transfer Learning
    • Checkpoint’ов Genetic Algorithm
    • Прогресса воркеров (worker_progress)
  • Настраивается в секции state_storage конфигурации

Рекомендация: Используйте разные БД для котировок и истории, чтобы разделить нагрузку и обеспечить изоляцию данных.

Рекомендации

  1. Production: Используйте отдельную БД PostgreSQL для истории состояний
  2. Разделение: Не используйте одну БД для котировок и истории
  3. Схемы: Используйте отдельную схему для дополнительной изоляции данных
  4. Миграции: Включите auto_migrate: true для автоматического применения
  5. Очистка: Настройте периодическую очистку истекших состояний
  6. Мониторинг: Отслеживайте размер таблиц и производительность запросов

Troubleshooting

Миграции не применяются

Проверьте:

  • Путь к миграциям в конфигурации
  • Права доступа к БД
  • Логи приложения при старте

Ошибки подключения

Проверьте:

  • Параметры подключения в конфигурации
  • Доступность БД
  • Правильность схемы (если используется)

Конфликты при конкурентном доступе

StateStorage использует оптимистичные блокировки через поле version. При конфликтах операция повторяется автоматически.