# Advanced ML Pipeline Patterns
Learn advanced patterns for building complex ML workflows with multi-service orchestration and dependency management.
## Complex Pipeline Architectures
ModelKnife supports sophisticated ML pipeline patterns that go beyond simple linear workflows. These patterns enable you to build production-grade ML systems with proper dependency management and multi-service coordination.
## Dependency Management
ModelKnife automatically handles complex dependencies between pipeline modules across different AWS services.
```yaml
# Complex dependency management
modules:
  data_extraction:
    executor: "${executors.glue_etl}"
    entry_point: "extract_data.py"
    repository: "./src"
    job_parameters:
      input_path: "s3://raw-data-bucket/"
      output_path: "s3://processed-data-bucket/extracted/"
    depends_on: []

  feature_engineering:
    executor: "${executors.python_processor}"
    entry_point: "create_features.py"
    repository: "./src"
    job_parameters:
      input_path: "s3://processed-data-bucket/extracted/"
      output_path: "s3://processed-data-bucket/features/"
    depends_on: ["data_extraction"]

  model_training:
    executor: "${executors.python_processor}"
    entry_point: "train_model.py"
    repository: "./src"
    depends_on: ["feature_engineering"]

  batch_inference:
    executor: "${executors.sagemaker_batch}"
    entry_point: "batch_predict.py"
    repository: "./src"
    depends_on: ["model_training"]
```
### Dependency Benefits

- Automatic execution ordering across different AWS services
- Built-in error handling and retry logic
- Step Functions orchestration generation
- Parallel execution where dependencies allow (see the scheduling sketch below)
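Under the hood, this scheduling problem is a topological sort of the `depends_on` graph. As a mental model only (ModelKnife generates the actual Step Functions state machine for you; this is not its internal API), here is a minimal Python sketch of how execution order and parallel groups fall out of the declarations above:

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Mirrors the depends_on edges declared in the YAML above.
modules = {
    "data_extraction": [],
    "feature_engineering": ["data_extraction"],
    "model_training": ["feature_engineering"],
    "batch_inference": ["model_training"],
}

ts = TopologicalSorter(modules)
ts.prepare()
while ts.is_active():
    ready = list(ts.get_ready())        # modules whose dependencies are all satisfied
    print(f"run in parallel: {ready}")  # a real orchestrator launches these AWS jobs
    ts.done(*ready)                     # mark complete once the jobs succeed
```

For this linear chain, each group contains a single module; in the fan-out example later on this page, every `feature_eng_*` module would land in the same parallel group.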
## Multi-Service Orchestration
Coordinate workflows that span AWS Glue, SageMaker (Spark processing and model training), and Bedrock.
```yaml
# Advanced pipeline with multiple AWS services
executors:
  glue_etl:
    type: glue_etl
    job_name: "data-preparation"
    worker_type: "G.2X"
    number_of_workers: 10

  spark_processor:
    class: sagemaker.spark.processing.PySparkProcessor
    instance_type: "ml.r5.2xlarge"
    instance_count: 3
    framework_version: "3.1"

  bedrock_batch:
    type: bedrock_batch
    model_id: "anthropic.claude-3-haiku-20240307-v1:0"
    batch_size: 1000

  sagemaker_training:
    class: sagemaker.tensorflow.TensorFlow
    instance_type: "ml.p3.2xlarge"
    instance_count: 1

modules:
  # Stage 1: Large-scale ETL with Glue
  raw_data_processing:
    executor: "${executors.glue_etl}"
    entry_point: "process_raw_data.py"
    job_parameters:
      "--input_path": "s3://data-lake/raw/"
      "--output_path": "s3://data-lake/processed/"

  # Stage 2: Feature engineering with Spark
  feature_engineering:
    executor: "${executors.spark_processor}"
    entry_point: "engineer_features.py"
    repository: "./src"
    depends_on: ["raw_data_processing"]

  # Stage 3: Text enrichment with Bedrock
  text_enrichment:
    executor: "${executors.bedrock_batch}"
    entry_point: "enrich_text_data.py"
    repository: "./src"
    job_parameters:
      prompt_template: "Extract key insights from: {text}"
    depends_on: ["feature_engineering"]

  # Stage 4: Model training with SageMaker
  model_training:
    executor: "${executors.sagemaker_training}"
    entry_point: "train_model.py"
    repository: "./src"
    hyperparameters:
      epochs: 100
      batch_size: 32
      learning_rate: 0.001
    depends_on: ["text_enrichment"]
```
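What `enrich_text_data.py` does with each record is up to you; ModelKnife only delivers the `job_parameters`. As an illustrative sketch (the record-level plumbing here is an assumption, not a ModelKnife API), the Bedrock stage might apply the prompt template through the standard `bedrock-runtime` client:

```python
import json

import boto3

bedrock = boto3.client("bedrock-runtime")

def enrich(text: str, prompt_template: str) -> str:
    """Apply the configured prompt template to one record via Claude on Bedrock."""
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 512,
        "messages": [{"role": "user", "content": prompt_template.format(text=text)}],
    }
    response = bedrock.invoke_model(
        modelId="anthropic.claude-3-haiku-20240307-v1:0",  # same model as the executor config
        body=json.dumps(body),
    )
    result = json.loads(response["body"].read())
    return result["content"][0]["text"]
```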
## Dynamic Dependencies & Parallel Execution
Build flexible pipelines with runtime-determined dependencies and parallel processing.
```yaml
# Dynamic pipeline with parallel feature engineering
modules:
  data_partitioner:
    executor: "${executors.python_processor}"
    entry_point: "partition_data.py"
    repository: "./src"
    job_parameters:
      num_partitions: 5
    outputs:
      partition_paths: "/opt/ml/processing/output/partitions.json"

  # Parallel feature engineering - one module per partition
  feature_eng_1:
    executor: "${executors.python_processor}"
    entry_point: "engineer_features.py"
    repository: "./src"
    job_parameters:
      partition_id: 1
      input_path: "${modules.data_partitioner.outputs.partition_paths[0]}"
    depends_on: ["data_partitioner"]

  feature_eng_2:
    executor: "${executors.python_processor}"
    entry_point: "engineer_features.py"
    repository: "./src"
    job_parameters:
      partition_id: 2
      input_path: "${modules.data_partitioner.outputs.partition_paths[1]}"
    depends_on: ["data_partitioner"]

  # ...continue for all partitions

  # Merge results from parallel processing
  feature_merger:
    executor: "${executors.spark_processor}"
    entry_point: "merge_features.py"
    repository: "./src"
    depends_on: [
      "feature_eng_1",
      "feature_eng_2",
      # ...all feature engineering modules
    ]
```
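Hand-writing one module per partition gets tedious. If you produce your pipeline definition programmatically (this generator is a hypothetical convenience, not a ModelKnife feature; whether your setup emits YAML or builds the config in code is up to you), the fan-out and fan-in can be derived from the partition count:

```python
# Hypothetical generator for the repetitive per-partition modules above.
NUM_PARTITIONS = 5

modules = {
    f"feature_eng_{i}": {
        "executor": "${executors.python_processor}",
        "entry_point": "engineer_features.py",
        "repository": "./src",
        "job_parameters": {
            "partition_id": i,
            # partition_id 1 reads partition_paths[0], and so on
            "input_path": f"${{modules.data_partitioner.outputs.partition_paths[{i - 1}]}}",
        },
        "depends_on": ["data_partitioner"],
    }
    for i in range(1, NUM_PARTITIONS + 1)
}

# Fan-in: the merger depends on every generated feature_eng_* module.
modules["feature_merger"] = {
    "executor": "${executors.spark_processor}",
    "entry_point": "merge_features.py",
    "repository": "./src",
    "depends_on": [f"feature_eng_{i}" for i in range(1, NUM_PARTITIONS + 1)],
}
```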
## Error Handling & Retry Logic
Build resilient pipelines with automatic retry, fallback strategies, and error recovery.
```yaml
# Robust pipeline with error handling
modules:
  data_ingestion:
    executor: "${executors.python_processor}"
    entry_point: "ingest_data.py"
    repository: "./src"
    retry_config:
      max_attempts: 3
      retry_delay: 300  # seconds (5 minutes)
      retry_on: ["DataSourceUnavailable", "NetworkTimeout"]
    on_failure:
      action: "continue_with_fallback"
      fallback_module: "use_cached_data"

  use_cached_data:
    executor: "${executors.python_processor}"
    entry_point: "load_cached_data.py"
    repository: "./src"
    job_parameters:
      cache_age_limit: "24h"
    enabled: false  # only runs as a fallback

  model_training:
    executor: "${executors.sagemaker_training}"
    entry_point: "train_model.py"
    repository: "./src"
    timeout: 7200  # seconds (2 hours)
    checkpoint_config:
      enabled: true
      checkpoint_s3_uri: "s3://ml-checkpoints/training/"
    depends_on: ["data_ingestion"]
    on_failure:
      action: "resume_from_checkpoint"
      max_resume_attempts: 2
```
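The `retry_config` above amounts to: retry only the listed error types, wait `retry_delay` seconds between attempts, and give up after `max_attempts`. A minimal local sketch of those semantics (ModelKnife and Step Functions implement this for you; matching on exception class names is an assumption about how `retry_on` classifies errors):

```python
import time

RETRYABLE = {"DataSourceUnavailable", "NetworkTimeout"}

def run_with_retries(job, max_attempts=3, retry_delay=300):
    """Retry listed error types with a fixed delay; fail fast on anything else."""
    for attempt in range(1, max_attempts + 1):
        try:
            return job()
        except Exception as exc:
            retryable = type(exc).__name__ in RETRYABLE
            if not retryable or attempt == max_attempts:
                raise  # non-retryable error, or attempts exhausted
            time.sleep(retry_delay)
```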
### Best Practices for Error Handling

- **Fail Fast**: Set appropriate timeouts to catch issues early
- **Graceful Degradation**: Fall back to cached or secondary data sources when the primary fails
- **Checkpointing**: Save intermediate results for long-running processes
- **Notification**: Set up alerts for critical pipeline failures
- **Circuit Breaker**: Stop processing when the error rate exceeds a threshold (see the sketch after this list)
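ModelKnife does not necessarily ship a circuit-breaker primitive; the pattern is simple enough to carry inside your own processing code. A minimal sketch (the thresholds are illustrative):

```python
# Minimal circuit-breaker sketch: trip once the error rate crosses a threshold.
class CircuitBreaker:
    def __init__(self, max_failure_rate: float = 0.5, min_calls: int = 20):
        self.calls = 0
        self.failures = 0
        self.max_failure_rate = max_failure_rate
        self.min_calls = min_calls  # don't trip on a tiny sample

    def record(self, success: bool) -> None:
        self.calls += 1
        if not success:
            self.failures += 1

    @property
    def open(self) -> bool:
        """True once enough calls have failed; callers should stop processing."""
        return (self.calls >= self.min_calls
                and self.failures / self.calls > self.max_failure_rate)
```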
## Advanced Deployment Strategies

### Version Management
```bash
# Deploy new version of pipeline
mk p deploy --profile production

# Check version history
mk p version

# View deployment details
mk p status --verbose

# Rollback if needed (using service rollback for infrastructure)
mk s rollback --service api_endpoint
```
### Safe Deployment Practices
```bash
# Validate configuration before deploy
mk s validate

# Deploy infrastructure first (stable)
mk s deploy

# Then deploy pipeline updates (frequent iteration)
mk p deploy

# Monitor pipeline execution
mk p runs --limit 5
```
### A/B Testing Pipelines
```yaml
# A/B test different model architectures
modules:
  traffic_splitter:
    executor: "${executors.python_processor}"
    entry_point: "split_traffic.py"
    job_parameters:
      split_ratio: "50:50"
      experiment_id: "model_arch_comparison_v1"

  model_a_training:
    executor: "${executors.sagemaker_training}"
    entry_point: "train_model_a.py"
    depends_on: ["traffic_splitter"]

  model_b_training:
    executor: "${executors.sagemaker_training}"
    entry_point: "train_model_b.py"
    depends_on: ["traffic_splitter"]

  performance_comparison:
    executor: "${executors.python_processor}"
    entry_point: "compare_models.py"
    depends_on: ["model_a_training", "model_b_training"]
```
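What `compare_models.py` does is up to you. As an illustrative sketch only (the paths follow the `/opt/ml/processing` convention used earlier on this page, and the `auc` metric name is hypothetical):

```python
# Illustrative compare_models.py: pick a winner from held-out evaluation metrics.
import json

def load_metrics(path: str) -> dict:
    with open(path) as f:
        return json.load(f)

metrics_a = load_metrics("/opt/ml/processing/input/model_a/metrics.json")
metrics_b = load_metrics("/opt/ml/processing/input/model_b/metrics.json")

winner = "model_a" if metrics_a["auc"] >= metrics_b["auc"] else "model_b"
report = {
    "experiment_id": "model_arch_comparison_v1",
    "winner": winner,
    "auc": {"model_a": metrics_a["auc"], "model_b": metrics_b["auc"]},
}

# Downstream steps (e.g., a deployment gate) can read this report.
with open("/opt/ml/processing/output/comparison.json", "w") as f:
    json.dump(report, f, indent=2)
```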
## Performance Optimization
### Resource Optimization

- **Right-sizing**: Use appropriate instance types for each workload
- **Spot Instances**: Leverage spot capacity for fault-tolerant workloads (see the sketch after this list)
- **Auto-scaling**: Configure dynamic scaling based on queue depth
- **Resource Pooling**: Share compute resources across similar modules
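For example, SageMaker's managed spot training is controlled by the estimator parameters `use_spot_instances`, `max_run`, and `max_wait`. Assuming executor keys are passed through to the underlying SageMaker class (an assumption based on the executor examples above, not a documented ModelKnife guarantee), a spot-backed training executor might look like:

```yaml
executors:
  spot_training:
    class: sagemaker.tensorflow.TensorFlow
    instance_type: "ml.p3.2xlarge"
    instance_count: 1
    use_spot_instances: true  # SageMaker managed spot capacity
    max_run: 3600             # max training seconds
    max_wait: 7200            # total wait incl. interruptions; must be >= max_run
```

Combine this with the checkpointing configuration shown earlier so an interrupted spot job can resume rather than restart.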
### Data Pipeline Optimization

- **Partitioning**: Partition large datasets for parallel processing
- **Caching**: Cache frequently accessed intermediate results
- **Compression**: Use appropriate compression for data transfer
- **Incremental Processing**: Process only changed data when possible (see the watermark sketch below)
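A common way to implement incremental processing is a watermark: record when the last successful run finished and only pick up objects modified since. A minimal sketch using plain boto3 (the bucket, prefix, and watermark key are hypothetical):

```python
# Sketch of incremental processing: handle only S3 objects newer than the
# last successful run's watermark.
from datetime import datetime, timezone

import boto3

s3 = boto3.client("s3")
BUCKET = "processed-data-bucket"
WATERMARK_KEY = "state/last_processed_at.txt"

def read_watermark() -> datetime:
    try:
        obj = s3.get_object(Bucket=BUCKET, Key=WATERMARK_KEY)
        return datetime.fromisoformat(obj["Body"].read().decode())
    except s3.exceptions.NoSuchKey:
        return datetime.min.replace(tzinfo=timezone.utc)  # first run: process everything

watermark = read_watermark()
new_keys = [
    item["Key"]
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=BUCKET, Prefix="extracted/")
    for item in page.get("Contents", [])
    if item["LastModified"] > watermark
]

# ...process new_keys only, then advance the watermark:
s3.put_object(Bucket=BUCKET, Key=WATERMARK_KEY,
              Body=datetime.now(timezone.utc).isoformat().encode())
```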
## Monitoring and Observability
```bash
# Check overall pipeline status
mk p status

# View pipeline execution history
mk p runs --limit 10

# Visualize pipeline dependencies
mk p visualize
```
## Next Steps
Ready to implement these advanced patterns? Check out: