Advanced ML Pipeline Patterns

Learn advanced patterns for building complex ML workflows with multi-service orchestration and dependency management.

Complex Pipeline Architectures

ModelKnife supports sophisticated ML pipeline patterns that go beyond simple linear workflows. These patterns enable you to build production-grade ML systems with proper dependency management and multi-service coordination.

Dependency Management

ModelKnife automatically handles complex dependencies between pipeline modules across different AWS services.

Complex dependency patterns in ML workflows
# Complex dependency management
#
# Four modules chained linearly through depends_on:
#   data_extraction -> feature_engineering -> model_training -> batch_inference
# The ${executors.*} references point at executor definitions that are not
# part of this snippet.
modules:
  # Root of the chain: depends_on is explicitly empty.
  data_extraction:
    executor: "${executors.glue_etl}"
    entry_point: "extract_data.py"
    repository: "./src"
    job_parameters:
      input_path: "s3://raw-data-bucket/"
      output_path: "s3://processed-data-bucket/extracted/"
    depends_on: []

  # Its input_path is the same S3 prefix that data_extraction uses as
  # output_path, so the data hand-off mirrors the declared dependency.
  feature_engineering:
    executor: "${executors.python_processor}"
    entry_point: "create_features.py"
    repository: "./src"
    job_parameters:
      input_path: "s3://processed-data-bucket/extracted/"
      output_path: "s3://processed-data-bucket/features/"
    depends_on: ["data_extraction"]

  # Declares no job_parameters; only its place in the chain is configured.
  model_training:
    executor: "${executors.python_processor}"
    entry_point: "train_model.py"
    repository: "./src"
    depends_on: ["feature_engineering"]

  # Last link in the chain and the only module here that uses the
  # sagemaker_batch executor.
  batch_inference:
    executor: "${executors.sagemaker_batch}"
    entry_point: "batch_predict.py"
    repository: "./src"
    depends_on: ["model_training"]

Dependency Benefits

Multi-Service Orchestration

Coordinate complex workflows across Glue, SageMaker, Spark, and Bedrock.

Advanced pipeline with multiple AWS services
# Advanced pipeline with multiple AWS services
#
# Executor definitions. Modules select one with "${executors.<name>}".
# Two declaration forms appear below: `type` (glue_etl, bedrock_batch) and
# `class` (a dotted Python class path).
executors:
  # Glue ETL job "data-preparation" with 10 workers of type G.2X.
  glue_etl:
    type: glue_etl
    job_name: "data-preparation"
    worker_type: "G.2X"
    number_of_workers: 10

  # Spark processing through sagemaker.spark.processing.PySparkProcessor on
  # three ml.r5.2xlarge instances.
  spark_processor:
    class: sagemaker.spark.processing.PySparkProcessor
    instance_type: "ml.r5.2xlarge"
    instance_count: 3
    framework_version: "3.1"  # quoted so YAML keeps it a string, not a float

  # Bedrock batch executor bound to the Claude 3 Haiku model ID.
  bedrock_batch:
    type: bedrock_batch
    model_id: "anthropic.claude-3-haiku-20240307-v1:0"
    batch_size: 1000

  # Training through sagemaker.tensorflow.TensorFlow on a single
  # ml.p3.2xlarge instance.
  sagemaker_training:
    class: sagemaker.tensorflow.TensorFlow
    instance_type: "ml.p3.2xlarge"
    instance_count: 1

# Four stages wired in sequence: each module's depends_on names the stage
# before it, and each stage uses a different executor.
modules:
  # Stage 1: Large-scale ETL with Glue
  # First stage: declares no depends_on.
  # NOTE(review): the job_parameters keys below carry a leading "--", and this
  # module has no `repository`, unlike the other modules in this example —
  # confirm both are what the glue_etl executor expects.
  raw_data_processing:
    executor: "${executors.glue_etl}"
    entry_point: "process_raw_data.py"
    job_parameters:
      --input_path: "s3://data-lake/raw/"
      --output_path: "s3://data-lake/processed/"

  # Stage 2: Feature engineering with Spark
  feature_engineering:
    executor: "${executors.spark_processor}"
    entry_point: "engineer_features.py"
    repository: "./src"
    depends_on: ["raw_data_processing"]

  # Stage 3: Text enrichment with Bedrock
  # prompt_template contains a literal "{text}" placeholder; presumably it is
  # filled in per input record — confirm against the bedrock_batch executor.
  text_enrichment:
    executor: "${executors.bedrock_batch}"
    entry_point: "enrich_text_data.py"
    repository: "./src"
    job_parameters:
      prompt_template: "Extract key insights from: {text}"
    depends_on: ["feature_engineering"]

  # Stage 4: Model training with SageMaker
  # Uses `hyperparameters` rather than `job_parameters` for its settings.
  model_training:
    executor: "${executors.sagemaker_training}"
    entry_point: "train_model.py"
    repository: "./src"
    hyperparameters:
      epochs: 100
      batch_size: 32
      learning_rate: 0.001
    depends_on: ["text_enrichment"]

Dynamic Dependencies & Parallel Execution

Build flexible pipelines with runtime-determined dependencies and parallel processing.

Dynamic pipeline with parallel feature engineering
# Dynamic pipeline with parallel feature engineering
#
# Fan-out / fan-in layout: one partitioner, one feature-engineering module
# per partition, and a single merger that waits for all of them.
modules:
  # Fan-out source. Configured with num_partitions: 5 and exposes a
  # partition_paths output that the downstream modules reference.
  data_partitioner:
    executor: "${executors.python_processor}"
    entry_point: "partition_data.py"
    repository: "./src"
    job_parameters:
      num_partitions: 5
    outputs:
      partition_paths: "/opt/ml/processing/output/partitions.json"

  # Parallel feature engineering - one per partition.
  # These modules depend only on data_partitioner, never on each other.
  # partition_id counts from 1; the partition_paths index counts from 0.
  feature_eng_1:
    executor: "${executors.python_processor}"
    entry_point: "engineer_features.py"
    repository: "./src"
    job_parameters:
      partition_id: 1
      input_path: "${modules.data_partitioner.outputs.partition_paths[0]}"
    depends_on:
      - "data_partitioner"

  feature_eng_2:
    executor: "${executors.python_processor}"
    entry_point: "engineer_features.py"
    repository: "./src"
    job_parameters:
      partition_id: 2
      input_path: "${modules.data_partitioner.outputs.partition_paths[1]}"
    depends_on:
      - "data_partitioner"

  # Continue for all partitions...

  # Fan-in: merge results from parallel processing. Lists every
  # feature-engineering module as a dependency.
  feature_merger:
    executor: "${executors.spark_processor}"
    entry_point: "merge_features.py"
    repository: "./src"
    depends_on:
      - "feature_eng_1"
      - "feature_eng_2"
      # ... all feature engineering modules

Error Handling & Retry Logic

Build resilient pipelines with automatic retry, fallback strategies, and error recovery.

Robust pipeline with error handling
# Robust pipeline with error handling
#
# Two recovery mechanisms are shown: retry plus a fallback module for
# ingestion, and a checkpoint resume for training.
modules:
  # Retry settings: max_attempts 3, 300 s between attempts, limited to the
  # error names listed in retry_on. on_failure names use_cached_data as the
  # fallback module.
  data_ingestion:
    executor: "${executors.python_processor}"
    entry_point: "ingest_data.py"
    repository: "./src"
    retry_config:
      max_attempts: 3
      retry_delay: 300  # 5 minutes
      retry_on: ["DataSourceUnavailable", "NetworkTimeout"]
    on_failure:
      action: "continue_with_fallback"
      fallback_module: "use_cached_data"

  # Fallback target referenced by data_ingestion.on_failure.fallback_module.
  # It is disabled, so it is not part of the normal run.
  use_cached_data:
    executor: "${executors.python_processor}"
    entry_point: "load_cached_data.py"
    repository: "./src"
    job_parameters:
      cache_age_limit: "24h"
    enabled: false  # Only runs as fallback

  # Checkpoints are written under checkpoint_s3_uri. On failure the action
  # is resume_from_checkpoint, with max_resume_attempts set to 2.
  model_training:
    executor: "${executors.sagemaker_training}"
    entry_point: "train_model.py"
    repository: "./src"
    timeout: 7200  # 2 hours
    checkpoint_config:
      enabled: true
      checkpoint_s3_uri: "s3://ml-checkpoints/training/"
    depends_on: ["data_ingestion"]
    on_failure:
      action: "resume_from_checkpoint"
      max_resume_attempts: 2

Best Practices for Error Handling

Advanced Deployment Strategies

Version Management

# Deploy a new version of the pipeline using the production profile
mk p deploy --profile production

# Check the version history
mk p version

# View deployment details (verbose output)
mk p status --verbose

# Roll back if needed (uses the service rollback, `mk s`, for infrastructure;
# here targeting the api_endpoint service)
mk s rollback --service api_endpoint

Safe Deployment Practices

# Order of operations: validate, deploy infrastructure (`mk s`), deploy the
# pipeline (`mk p`), then check the runs.

# Validate the configuration before deploying
mk s validate

# Deploy infrastructure first (stable)
mk s deploy

# Then deploy pipeline updates (frequent iteration)
mk p deploy

# Monitor pipeline execution (listing limited to 5 runs)
mk p runs --limit 5

A/B Testing Pipelines

A/B test different model architectures
# A/B test different model architectures
#
# Fork/join layout: traffic_splitter feeds two independent training modules,
# and performance_comparison depends on both of them.
modules:
  # split_ratio is quoted so YAML keeps "50:50" as a string (an unquoted
  # digits-and-colon value can be read as a base-60 integer by YAML 1.1
  # parsers).
  traffic_splitter:
    executor: "${executors.python_processor}"
    entry_point: "split_traffic.py"
    job_parameters:
      split_ratio: "50:50"
      experiment_id: "model_arch_comparison_v1"

  # Variant A: same executor as variant B, different entry point.
  model_a_training:
    executor: "${executors.sagemaker_training}"
    entry_point: "train_model_a.py"
    depends_on: ["traffic_splitter"]

  # Variant B: depends only on traffic_splitter, not on variant A.
  model_b_training:
    executor: "${executors.sagemaker_training}"
    entry_point: "train_model_b.py"
    depends_on: ["traffic_splitter"]

  # Join point: lists both training modules as dependencies.
  performance_comparison:
    executor: "${executors.python_processor}"
    entry_point: "compare_models.py"
    depends_on: ["model_a_training", "model_b_training"]

Performance Optimization

Resource Optimization

Data Pipeline Optimization

Monitoring and Observability

# Check the overall pipeline status
mk p status

# View the pipeline execution history (listing limited to 10 runs)
mk p runs --limit 10

# Visualize the pipeline's module dependencies
mk p visualize

Next Steps

Ready to implement these advanced patterns? Check out: