Pipelines Reference

Complete reference for ML pipeline executors, modules, and orchestration

Module Configuration

Individual processing units that make up your ML pipelines

Key Concept

Modules are the individual processing units that perform specific ML tasks. Each module uses an executor (compute environment) and contains your ML processing code.

Basic Module Structure
modules:
  module_name:
    executor: ${executors.executor_name}
    repository: "../modules"
    entry_point: "script.py"
    job_parameters:
      input_path: "s3://bucket/input/"
      output_path: "s3://bucket/output/"
      custom_param: "value"
    depends_on: ["other_module"]
    description: "Description of what this module does"

Module Parameters

  • executor - Reference to executor configuration (required)
  • repository - Path to module code repository
  • entry_point - Main script or class to execute
  • job_parameters - Custom parameters passed to your code (for script-based executors, as --key value command-line arguments; see the sketch after this list)
  • depends_on - List of modules that must complete first
  • description - Human-readable description
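
For script-based executors, job_parameters are passed to the entry point as --key value command-line arguments (this convention is described per executor below). A minimal sketch, assuming a plain Python entry point and the parameter names from the example above:

# script.py -- minimal sketch of reading job_parameters as --key value arguments.
# Parameter names mirror the Basic Module Structure example; your module's
# job_parameters keys determine the actual flags.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--input_path", required=True)
parser.add_argument("--output_path", required=True)
parser.add_argument("--custom_param", default=None)
args = parser.parse_args()

print(f"reading from {args.input_path}, writing to {args.output_path}")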

Module Dependencies and Execution Order

Pipeline with Module Dependencies
modules:
  data_ingestion:
    executor: ${executors.glue_etl}
    entry_point: "ingest_data.py"
    job_parameters:
      source_database: "raw_data"
      output_path: "s3://bucket/ingested/"
    # No dependencies - runs first
    
  data_cleaning:
    executor: ${executors.python_processor}
    entry_point: "clean_data.py"
    job_parameters:
      input_path: "s3://bucket/ingested/"
      output_path: "s3://bucket/cleaned/"
    depends_on: ["data_ingestion"]
    
  feature_engineering:
    executor: ${executors.python_processor}
    entry_point: "create_features.py"
    job_parameters:
      input_path: "s3://bucket/cleaned/"
      output_path: "s3://bucket/features/"
    depends_on: ["data_cleaning"]
    
  model_training:
    executor: ${executors.training_job}
    entry_point: "train_model.py"
    job_parameters:
      training_data: "s3://bucket/features/"
      model_output: "s3://bucket/models/"
      epochs: 50
    depends_on: ["feature_engineering"]

Executor Configuration Overrides

Override executor parameters at the module level for fine-tuned resource allocation

Key Concept

You can override specific executor parameters at the module level while still using the base executor configuration. This is useful when you need different compute resources or settings for specific modules.

Module-level executor overrides
executors:
  python_processor:
    type: sagemaker_processor
    class: sagemaker.sklearn.processing.SKLearnProcessor
    instance_type: "ml.m5.large"  # Base configuration
    instance_count: 1
    framework_version: "1.0-1"

modules:
  # Uses base executor configuration
  data_cleaning:
    executor: ${executors.python_processor}
    entry_point: "clean_data.py"
    
  # Override instance type for memory-intensive task
  feature_engineering:
    executor: ${executors.python_processor}
    entry_point: "create_features.py"
    instance_type: "ml.m5.2xlarge"  # Override: bigger instance
    instance_count: 2               # Override: more instances
    
  # Override for GPU processing
  model_training:
    executor: ${executors.python_processor}
    entry_point: "train_model.py"
    instance_type: "ml.p3.2xlarge"  # Override: GPU instance
    max_runtime_in_seconds: 7200    # Override: longer timeout

Override Benefits

  • Cost Optimization: Use smaller instances for lightweight tasks, larger for heavy computation
  • Performance Tuning: Adjust timeouts, memory, and CPU for specific workloads
  • Resource Management: Scale compute resources per module without duplicating executor definitions
  • Maintainability: Keep base configurations simple while customizing where needed

Executor Types

Compute environments that run your modules

EMR Spark Jar

Serverless Spark jobs using EMR Serverless for cost-effective big data processing

Best For

Large-scale Scala/Java Spark applications, complex ETL with custom JARs, cost-effective big data processing with automatic scaling, and batch processing workloads.

EMR Serverless Benefits

  • Cost Optimization: Pay only for compute used, automatic start/stop
  • Serverless: No cluster management, automatic scaling
  • Performance: Optimized Spark runtime with faster startup
  • Integration: Native AWS service integration
Configuration Example
executors:
  emr_spark_processor:
    type: emr_spark

    # Optional: all parameters below can be customized (defaults are listed under Configuration Parameters)
    application_id: "00abc123xyz456789"  # Custom EMR Serverless application ID
    executor_num: 4                      # Number of executor instances
    executor_cores: 2
    executor_memory: "4g"
    executor_disk: "20g"                 # Disk space per executor
    driver_cores: 2
    driver_memory: "4g"
    driver_disk: "20g"                   # Disk space for driver
    timeout: 1440                        # 24 hours timeout (in minutes)

    # Optional: All parameters below can be customized
    dependent_jars:                      # Executor-level dependencies
      - "org.apache.spark:spark-sql_2.12:3.3.0"
    config:
      - spark.sql.adaptive.enabled=true
      - spark.sql.adaptive.coalescePartitions.enabled=true
      - spark.serializer=org.apache.spark.serializer.KryoSerializer

Configuration Parameters

  • type - Use emr_spark for Scala/Java Spark applications
  • role_arn - IAM execution role ARN (optional, uses default EMR execution role)
  • application_id - Custom EMR Serverless application ID (optional, uses shared application by default)
  • executor_num - Number of executor instances (optional, default: 2)
  • executor_cores - Number of CPU cores per executor (optional, default: 2)
  • executor_memory - Memory per executor (optional, default: 4g)
  • executor_disk - Disk space per executor (optional, default: 20g)
  • driver_cores - Number of CPU cores for driver (optional, default: 2)
  • driver_memory - Memory for driver (optional, default: 4g)
  • driver_disk - Disk space for driver (optional, default: 20g)
  • timeout - Job timeout in minutes (optional, default: 720 = 12 hours)
  • dependent_jars - List of JAR files or Maven packages for executor-level dependencies
  • config - List of Spark configuration parameters

Module Usage

Note: All executor-level parameters can be overridden at the module level for fine-grained control per job.

Module Configuration
modules:
  data_transformation:
    executor: ${executors.emr_spark_processor}
    repository: "../spark-app"
    entry_point: "com.company.DataProcessor"
    build_command: "mvn clean package -DskipTests"
    # Override executor-level settings for this specific job
    executor_num: 8                      # Override executor instances
    executor_memory: "8g"                # Override memory for large job
    timeout: 1440                        # Override timeout to 24 hours
    job_parameters:
      input_path: "s3://data-bucket/input/"
      output_path: "s3://data-bucket/output/"
      config_param: "production"
    submit_jar: "target/data-processor-1.0.jar"
    dependent_jars:                      # Override/extend executor JARs
      # Local JAR files (uploaded to S3)
      - "lib/custom-library.jar"
      # Maven packages (downloaded automatically)
      - "org.apache.spark:spark-sql_2.12:3.3.0"
      - "com.databricks:spark-xml_2.12:0.14.0"
      # S3 JAR files
      - "s3://shared-jars/delta-core_2.12-2.3.0.jar"

Module Configuration Parameters

  • executor - Reference to the configured EMR Spark executor
  • repository - Local path to directory containing Spark application project
  • entry_point - Main class name to execute (must have main method)
  • build_command - Shell command to build the project (e.g., Maven, SBT)
  • Executor Overrides - Any executor parameter can be overridden at module level:
    • executor_num, executor_cores, executor_memory, executor_disk
    • driver_cores, driver_memory, driver_disk
    • timeout, application_id, role_arn
    • dependent_jars, config
  • job_parameters - Custom parameters passed to your Spark application as arguments:
    • input_path - S3 path for input data to process
    • output_path - S3 path where processed data will be written
    • Custom parameters passed as --key value arguments
  • submit_jar - Path to the main JAR file (relative to repository, auto-detected if not specified)
  • dependent_jars - List of JAR files or Maven packages (overrides executor-level):
    • JAR files: Local paths or S3 URIs (added to spark.jars)
    • Maven packages: Format groupId:artifactId:version (added to spark.jars.packages)

EMR Application Management

Shared Application (Default):

  • Uses team-shared EMR Serverless application created during mk setup init
  • Cost-efficient for typical workloads
  • Automatic start/stop management
  • No configuration required

Custom Application:

  • Specify application_id for dedicated EMR application
  • Useful for resource isolation, different environments, or custom configurations
  • Application must exist before job submission
  • Full control over application lifecycle and settings

EMR PySpark

Serverless PySpark jobs using EMR Serverless for Python-based big data processing

Best For

Large-scale Python data processing, ETL pipelines with Python libraries, machine learning feature engineering, and cost-effective distributed computing for Python workloads.

Configuration Example
executors:
  emr_pyspark_processor:
    type: emr_pyspark

    # Optional: all parameters below can be customized (defaults are listed under Configuration Parameters)
    application_id: "00def456uvw789123"  # Custom application ID
    executor_num: 3                      # Number of executor instances
    executor_cores: 2
    executor_memory: "4g"
    executor_disk: "20g"                 # Disk space per executor
    driver_cores: 2
    driver_memory: "4g"
    driver_disk: "20g"                   # Disk space for driver
    timeout: 720                         # 12 hours timeout (in minutes)

    # Optional: All parameters below can be customized
    dependent_jars:                      # Executor-level JAR dependencies
      - "io.delta:delta-core_2.12:2.3.0"
    extra_py_files:                      # Executor-level Python files
      - "src/utils:utils"               # add utils package to the root 
      - "common/helpers.py"
    additional_python_modules:           # Auto-built Python packages
      - delta-spark==2.4.0
      - s3fs==2025.5.1
      - numpy==1.24.4
    config:
      - spark.sql.adaptive.enabled=true
      - spark.sql.adaptive.coalescePartitions.enabled=true
      - spark.python.worker.memory=2g

Configuration Parameters

  • type - Use emr_pyspark for Python Spark applications
  • role_arn - IAM execution role ARN (optional, uses default EMR execution role)
  • application_id - Custom EMR Serverless application ID (optional, uses shared application by default)
  • executor_num - Number of executor instances (optional, default: 2)
  • executor_cores - Number of CPU cores per executor (optional, default: 2)
  • executor_memory - Memory per executor (optional, default: 4g)
  • executor_disk - Disk space per executor (optional, default: 20g)
  • driver_cores - Number of CPU cores for driver (optional, default: 2)
  • driver_memory - Memory for driver (optional, default: 4g)
  • driver_disk - Disk space for driver (optional, default: 20g)
  • timeout - Job timeout in minutes (optional, default: 720 = 12 hours)
  • dependent_jars - List of JAR files or Maven packages for executor-level dependencies
  • extra_py_files - List of additional Python files/directories for executor-level distribution
  • additional_python_modules - List of Python packages to install and build automatically
  • config - List of Spark configuration parameters

Module Usage

Note: All executor-level parameters can be overridden at the module level for fine-grained control per job.

Module Configuration
modules:
  feature_engineering:
    executor: ${executors.emr_pyspark_processor}
    repository: "../pyspark_jobs"
    entry_point: "feature_engineering.py"
    # Override executor-level settings for this specific job
    executor_num: 6                      # Override executor instances
    executor_memory: "8g"                # Override memory for large dataset
    driver_memory: "4g"                  # Override driver memory
    timeout: 360                         # Override timeout to 6 hours
    job_parameters:
      input_path: "s3://data-bucket/raw/"
      output_path: "s3://data-bucket/features/"
      feature_columns: "age,income,location"
      target_column: "conversion"
    submit_py_files:
      - "utils/data_helpers.py"
      - "transformers/"
    dependent_jars:                      # Override/extend executor JARs
      # Maven packages (downloaded automatically)
      - "io.delta:delta-core_2.12:2.3.0"
      - "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0"
      # S3 JAR files
      - "s3://jars-bucket/custom-transformers.jar"
    extra_py_files:                      # Override/extend executor Python files
      - "module_specific_utils/"
    additional_python_modules:           # Add module-specific packages
      - scikit-learn==1.3.0
      - matplotlib==3.7.1

Module Configuration Parameters

  • executor - Reference to the configured EMR PySpark executor
  • repository - Local path to directory containing PySpark job scripts
  • entry_point - Main PySpark script to execute (becomes spark-submit application)
  • job_parameters - Custom parameters passed to your PySpark script as arguments:
    • input_path - S3 path for input data to process
    • output_path - S3 path where processed data will be written
    • Custom parameters passed as --key value arguments
  • submit_py_files - List of additional Python files/directories to include in PySpark job (zipped and distributed to executors)
  • dependent_jars - List of JAR files or Maven packages:
    • JAR files: Local paths or S3 URIs (e.g., Delta Lake, Iceberg JARs)
    • Maven packages: Format groupId:artifactId:version for automatic download
  • extra_py_files - Additional Python files/directories distributed with the job (overrides/extends executor-level)
  • additional_python_modules - Python packages installed for this module (extends executor-level)
  • Executor Overrides - Any executor parameter (executor_num, executor_memory, driver_memory, timeout, etc.) can be overridden at module level
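
A minimal sketch of what the feature_engineering.py entry point above might look like, assuming Parquet input/output and the --key value argument convention (the formats and column handling are illustrative):

# feature_engineering.py -- minimal sketch of an EMR PySpark entry point.
# job_parameters arrive as --key value arguments; data formats are illustrative.
import argparse
from pyspark.sql import SparkSession

parser = argparse.ArgumentParser()
parser.add_argument("--input_path", required=True)
parser.add_argument("--output_path", required=True)
parser.add_argument("--feature_columns", required=True)  # e.g. "age,income,location"
parser.add_argument("--target_column", required=True)
args = parser.parse_args()

spark = SparkSession.builder.appName("feature_engineering").getOrCreate()

df = spark.read.parquet(args.input_path)
selected = df.select(*args.feature_columns.split(","), args.target_column)
selected.write.mode("overwrite").parquet(args.output_path)

spark.stop()
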

Cost Optimization

EMR PySpark uses the same shared EMR Serverless application as EMR Spark, providing:

  • Zero idle costs: Application stops after 15 minutes of inactivity
  • Fast startup: Applications start automatically when jobs are submitted
  • Shared resources: Team members share the same EMR application
  • Automatic scaling: Executors scale based on workload

Glue Job

ETL jobs using AWS Glue for large-scale data processing with Delta Lake support

Best For

Large-scale ETL operations, data transformation, Spark-based processing with automatic scaling, Delta Lake operations, and serverless execution with built-in data lake capabilities.

Delta Lake Integration

All Glue jobs come pre-configured with Delta Lake support, including:

  • Automatic Delta Core: io.delta:delta-core_2.12:2.3.0 JAR included by default
  • Delta Catalog: Spark SQL Delta catalog configured automatically
  • S3 LogStore: Optimized Delta log store for S3 storage
  • Iceberg Support: AWS Glue catalog integration for Iceberg tables
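
Because the Delta Lake JAR and catalog settings are applied automatically, a Glue job script can read and write Delta tables without extra configuration. A minimal sketch (the paths and filter column are placeholders):

# Sketch: using the pre-configured Delta Lake support in a Glue job script.
# No extra JARs or Spark settings are required; paths are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.format("delta").load("s3://data-bucket/delta/events/")
df.filter("event_date >= '2024-01-01'") \
  .write.format("delta").mode("overwrite").save("s3://data-bucket/delta/recent_events/")
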
Configuration Example
executors:
  etl_processor:
    type: glue_job

    # Optional: All parameters below show defaults - can be customized
    role_arn: "mlknife-glue-job"
    runtime: "python3.9"
    glue_version: "5.0"
    executor_type: "G.1X"
    executor_num: 2
    timeout: 2880  # 48 hours in minutes

    # Optional: All parameters below can be customized
    dependent_jars:                      # Executor-level JAR dependencies
      - "io.delta:delta-core_2.12:2.3.0"
    extra_py_files:                      # Executor-level Python files
      - "src/utils:utils"               # add utils package to the root 
      - "common/helpers.py"
    additional_python_modules:           # Auto-built Python packages
      - delta-spark==2.4.0
      - s3fs==2025.5.1
      - numpy==1.24.4
    config:
      - spark.sql.adaptive.enabled=true
      - spark.sql.adaptive.coalescePartitions.enabled=true
      - spark.python.worker.memory=2g

Configuration Parameters

  • role_arn - IAM role ARN for Glue job execution (defaults to mlknife-glue-job)
  • runtime - Python runtime version (default: "python3.9")
  • glue_version - AWS Glue version (default: "5.0")
  • executor_type - Worker node type: G.1X, G.2X, G.4X, G.8X (default: "G.1X")
  • executor_num - Number of worker nodes (default: 2)
  • timeout - Job timeout in minutes (default: 2880 = 48 hours)
  • dependent_jars - Additional JAR files or Maven coordinates (default: includes Delta Lake)
  • extra_py_files - Additional Python files to include
  • additional_python_modules - Python packages to install
  • config - Custom Spark configuration overrides

Default Spark Configuration

Every Glue job includes optimized Spark settings:

  • Hive Integration: AWS Glue Data Catalog as Hive metastore
  • Delta Lake: Delta catalog and S3 log store configured
  • Iceberg Support: Glue catalog integration for Iceberg tables
  • Arrow Optimization: PyArrow enabled with 500 records per batch
  • Performance: 300 shuffle partitions, dynamic partition overwrite
  • Memory: 2GB Python worker memory allocation
  • Monitoring: Metrics and Spark UI enabled by default

Module Usage

Module Configuration
modules:
  etl_transformation:
    executor: "${executors.etl_processor}"
    repository: "../glue_jobs"
    entry_point: "transform_data.py"
    job_parameters:
      input_path: "s3://data-bucket/raw/"
      output_path: "s3://data-bucket/processed/"

Module Configuration Parameters

  • executor - Reference to the configured Glue job executor
  • repository - Local path to directory containing Glue job scripts
  • entry_point - Main Python script to execute as Glue job
  • Executor Overrides - Any executor parameter can be overridden at module level:
    • runtime, glue_version, executor_type, executor_num
    • timeout, role_arn
    • dependent_jars, extra_py_files, additional_python_modules
    • config - Custom Spark configurations
  • job_parameters - Custom parameters passed to your Glue job script as job arguments:
    • input_path - S3 path for source data to process
    • output_path - S3 path where processed data will be written
    • Custom parameters passed as --key value arguments
  • dependent_jars - List of JAR files or Maven packages (overrides executor-level):
    • Maven coordinates: "org.apache.spark:spark-avro_2.12:3.3.0"
    • S3 URIs: "s3://bucket/path/to/custom.jar"
    • Note: Delta Lake JAR is included automatically
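
A minimal sketch of the transform_data.py entry point above, assuming the standard Glue pattern of reading job arguments with getResolvedOptions (the data formats are illustrative):

# transform_data.py -- minimal sketch of a Glue job entry point.
# job_parameters become Glue job arguments, read here with getResolvedOptions.
import sys

from awsglue.utils import getResolvedOptions
from pyspark.sql import SparkSession

args = getResolvedOptions(sys.argv, ["input_path", "output_path"])

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(args["input_path"])
df.write.mode("overwrite").parquet(args["output_path"])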

SageMaker PySpark Processor

Distributed data processing using PySpark on SageMaker

Best For

Large-scale feature engineering with Python, distributed data transformations, complex aggregations and joins, and ML data preprocessing pipelines.

Configuration Example
executors:
  spark_processor:
    type: sagemaker_processor
    class: sagemaker.spark.processing.PySparkProcessor
    role: "${pipeline.role}"
    instance_type: "ml.m5.xlarge"
    instance_count: 2
    max_runtime_in_seconds: 3600
    spark_config:
      spark.executor.memory: "4g"
      spark.executor.cores: "2"
      spark.sql.adaptive.enabled: "true"

Configuration Parameters

  • class - Use sagemaker.spark.processing.PySparkProcessor
  • instance_type - EC2 instance type for Spark cluster
  • instance_count - Number of instances in the cluster
  • spark_config - Spark configuration parameters
  • max_runtime_in_seconds - Maximum job runtime

Module Usage

Module Configuration
modules:
  feature_engineering:
    executor: "${executors.spark_processor}"
    repository: "../spark_jobs"
    entry_point: "feature_engineering.py"
    job_parameters:
      input_path: "s3://data-bucket/raw/"
      output_path: "s3://data-bucket/processed/"
    submit_py_files:
      - "utils/spark_helpers.py"
      - "transformers/"
    additional_python_modules:
      - "delta-spark==2.4.0"

Module Configuration Parameters

  • executor - Reference to the configured SageMaker PySpark processor executor
  • repository - Local path to directory containing Spark job scripts
  • entry_point - Main PySpark script to execute (becomes spark-submit application)
  • job_parameters - Custom parameters passed to your Spark script as arguments:
    • input_path - S3 path for input data to process
    • output_path - S3 path where processed data will be written
  • submit_py_files - List of additional Python files/directories to include in Spark job (zipped and distributed to executors)
  • additional_python_modules - List of Python packages to download as wheels and include in Spark job

SageMaker Spark JAR Processor

Run Java/Scala Spark applications on SageMaker

Best For

High-performance Scala/Java Spark applications, complex business logic in JVM languages, integration with existing Java/Scala codebases, and performance-critical data processing.

Configuration Example
executors:
  spark_jar_processor:
    type: sagemaker_processor
    class: sagemaker.spark.processing.SparkJarProcessor
    role: "${pipeline.role}"
    instance_type: "ml.m5.2xlarge"
    instance_count: 3
    max_runtime_in_seconds: 7200
    spark_config:
      spark.executor.memory: "8g"
      spark.executor.cores: "4"
      spark.serializer: "org.apache.spark.serializer.KryoSerializer"

Configuration Parameters

  • class - Use sagemaker.spark.processing.SparkJarProcessor
  • instance_type - EC2 instance type for Spark cluster
  • instance_count - Number of instances in the cluster
  • spark_config - Spark configuration parameters
  • max_runtime_in_seconds - Maximum job runtime

Module Usage

Module Configuration
modules:
  data_transformation:
    executor: "${executors.spark_jar_processor}"
    repository: "../spark-app"
    entry_point: "com.company.DataProcessor"
    build_command: "mvn clean package -DskipTests"
    job_parameters:
      input_path: "s3://data-bucket/input/"
      output_path: "s3://data-bucket/output/"
    submit_jar: "target/data-processor-1.0.jar"
    dependent_jars:
      - "lib/custom-library.jar"

Module Configuration Parameters

  • executor - Reference to the configured SageMaker Spark JAR processor executor
  • repository - Local path to directory containing Spark application project
  • entry_point - Main class name to execute (must have main method)
  • build_command - Shell command to build the project (e.g., Maven, SBT)
  • job_parameters - Custom parameters passed to your Spark application as arguments:
    • input_path - S3 path for input data to process
    • output_path - S3 path where processed data will be written
  • submit_jar - Path to the main JAR file (relative to repository, auto-detected if not specified)
  • dependent_jars - List of additional JAR files to include in Spark classpath

Build Process

The Spark JAR processor automatically builds your project using the specified build_command before execution. Ensure your project structure follows Maven or SBT conventions.

SageMaker Processor

Managed compute for Python/R-based ML processing jobs

Best For

Data preprocessing, feature engineering, model evaluation, and custom ML processing tasks using Python or R.

Configuration Example
executors:
  data_processor:
    type: sagemaker_processor
    class: sagemaker.sklearn.processing.SKLearnProcessor
    role: "${pipeline.role}"
    instance_type: "ml.m5.xlarge"
    instance_count: 1
    framework_version: "1.0-1"
    py_version: "py3"

Configuration Parameters

  • class - SageMaker processor class to use
  • instance_type - EC2 instance type for processing
  • instance_count - Number of instances to use
  • framework_version - Framework version
  • py_version - Python version

Module Usage

Module Configuration
modules:
  data_preprocessing:
    executor: "${executors.data_processor}"
    repository: "../processing_jobs"
    entry_point: "preprocess_data.py"
    input_names: "raw_data"
    output_names: "processed_data"
    job_parameters:
      raw_data: "s3://data-bucket/raw/"
      processed_data: "s3://data-bucket/processed/"

Module Configuration Parameters

  • executor - Reference to the configured SageMaker processor executor
  • repository - Local path to directory containing processing scripts
  • entry_point - Main Python script to execute for processing
  • input_names - Comma-separated list of input channel names. Each name must match a key in job_parameters. For each name, a SageMaker ProcessingInput is created that mounts the S3 data to /opt/ml/processing/input/{name}/ and passes --{name} /opt/ml/processing/input/{name}/ as an argument to your script
  • output_names - Comma-separated list of output channel names. Each name must match a key in job_parameters. For each name, a SageMaker ProcessingOutput is created that uploads data from /opt/ml/processing/output/{name}/ to S3 and passes --{name} /opt/ml/processing/output/{name}/ as an argument to your script
  • job_parameters - Custom parameters passed to your processing script:
    • raw_data - S3 path for input data (matches input_names, mounted into the container)
    • processed_data - S3 path for output data (matches output_names, uploaded from the container)
    • Any additional keys are passed to your script as --key value arguments (e.g., --feature_columns, --target_column)
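
A minimal sketch of the preprocess_data.py entry point above under those conventions: the script receives container-local directories for each named channel and works with local files, while SageMaker handles the S3 transfers (the CSV file names are illustrative):

# preprocess_data.py -- minimal sketch of a SageMaker processing entry point.
# --raw_data and --processed_data arrive as container paths
# (/opt/ml/processing/input/raw_data/ and /opt/ml/processing/output/processed_data/);
# SageMaker copies the S3 data in and uploads the results afterwards.
import argparse
import os

import pandas as pd

parser = argparse.ArgumentParser()
parser.add_argument("--raw_data", required=True)
parser.add_argument("--processed_data", required=True)
args = parser.parse_args()

df = pd.read_csv(os.path.join(args.raw_data, "data.csv"))  # illustrative file name
df = df.dropna()

os.makedirs(args.processed_data, exist_ok=True)
df.to_csv(os.path.join(args.processed_data, "cleaned.csv"), index=False)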

SageMaker Training

Model training jobs using SageMaker Training with built-in or custom algorithms

Best For

Training machine learning models with distributed computing, hyperparameter tuning, and automatic model artifacts management.

Configuration Example
executors:
  model_trainer:
    type: sagemaker_training
    role: "${pipeline.role}"
    instance_type: "ml.m5.2xlarge"
    instance_count: 1
    image_uri: "382416733822.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest"
    hyperparameters:
      max_depth: 5
      eta: 0.2
      objective: "multi:softmax"

Configuration Parameters

  • instance_type - EC2 instance type for training
  • instance_count - Number of instances to use
  • image_uri - Docker image for training algorithm
  • hyperparameters - Algorithm hyperparameters

Module Usage

Module Configuration
modules:
  model_training:
    executor: "${executors.model_trainer}"
    repository: "../training_jobs"
    entry_point: "train_model.py"
    input_names: "training_data,validation_data"
    output_names: "model_artifacts"
    job_parameters:
      training_data: "s3://data-bucket/train/"
      validation_data: "s3://data-bucket/validation/"
      model_artifacts: "s3://model-bucket/artifacts/"

Module Configuration Parameters

  • executor - Reference to the configured SageMaker training executor
  • repository - Local path to directory containing training scripts
  • entry_point - Main Python script to execute for training
  • input_names - Comma-separated list of input channel names. Each name must match a key in job_parameters. For each name, a training input channel is created that mounts the S3 data to /opt/ml/input/data/{name}/ and passes --{name} /opt/ml/input/data/{name}/ as an argument to your script
  • output_names - Comma-separated list of output channel names. Each name must match a key in job_parameters. For each name, a training output channel is created that uploads artifacts from /opt/ml/model/{name}/ to S3 and passes --{name} /opt/ml/model/{name}/ as an argument to your script
  • job_parameters - Custom parameters passed to your training script:
    • training_data - S3 path for the training dataset (matches input_names, mounted into the container)
    • validation_data - S3 path for the validation dataset (matches input_names, mounted into the container)
    • model_artifacts - S3 path where the trained model will be saved (matches output_names, uploaded from the container)
    • Any additional keys are passed to your script as --key value arguments (e.g., --epochs, --batch_size)
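
A minimal sketch of the train_model.py entry point above under the same channel conventions (the training logic and artifact format are placeholders):

# train_model.py -- minimal sketch of a SageMaker training entry point.
# Channel names become --key arguments pointing at container directories.
import argparse
import json
import os

parser = argparse.ArgumentParser()
parser.add_argument("--training_data", required=True)    # /opt/ml/input/data/training_data/
parser.add_argument("--validation_data", required=True)  # /opt/ml/input/data/validation_data/
parser.add_argument("--model_artifacts", required=True)  # /opt/ml/model/model_artifacts/
args = parser.parse_args()

# ... load data from args.training_data, fit a model, evaluate on args.validation_data ...

os.makedirs(args.model_artifacts, exist_ok=True)
with open(os.path.join(args.model_artifacts, "model.json"), "w") as f:
    json.dump({"note": "placeholder for trained model artifacts"}, f)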

Bedrock Batch Inference (Managed)

Queues jobs to a management service (Lambda + DynamoDB) that calls Bedrock on your behalf

Best For

Large-scale batch inference with foundation models, processing thousands of prompts efficiently using AWS Bedrock's batch inference capabilities.

Why This Executor?

This is not the native Bedrock batch API wrapper. It integrates with a queue-based management service (Lambdas + DynamoDB) to provide throttling, retries, scheduling, and cost/concurrency control for large batch runs. Use it when you need operational safeguards around Bedrock at scale.

How It Differs From Native

  • Queue + Control: Jobs are queued; concurrency is capped to avoid quota errors.
  • Resilience: Managed retries/status checks via Step Functions and Lambdas.
  • Separation: Decouples submission from execution; safer for spikes.
  • Observability: Status stored in DynamoDB; periodic polling (wait_seconds).
Configuration Example
executors:
  bedrock_batch_infer:
    # Alias: you can also use type: foundation_batch_infer  (future provider support)
    type: bedrock_batch_infer
    service_name: bedrock  # optional; defaults to 'bedrock'
    # Lambda ARNs for the batch manager service (configure per env)
    queue_lambda_arn: arn:aws:lambda:REGION:ACCOUNT:function:bedrock-batch-inference-create-batch-queue
    process_lambda_arn: arn:aws:lambda:REGION:ACCOUNT:function:bedrock-batch-inference-process-batch-job
    status_lambda_arn: arn:aws:lambda:REGION:ACCOUNT:function:bedrock_batch_inference_status_checker
    # Optional: polling wait between status checks (seconds)
    wait_seconds: 300

How Bedrock Batch Works

  • Enqueue: A Lambda creates/queues batch jobs in a DynamoDB-backed queue
  • Process: A Lambda starts Bedrock batch jobs and updates job state
  • Status: A Lambda checks progress; Step Functions polls every wait_seconds
  • Results: Outputs are written to the configured S3 directory

Required Parameters (in modules)

  • task_name - Unique identifier for the batch job
  • prompt_input_dir - S3 path containing input prompts (JSONL format)
  • result_output_dir - S3 path for batch inference results
  • model_id - Bedrock foundation model identifier (e.g., Claude, Titan)
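
A sketch of preparing the contents of prompt_input_dir. The record shape below assumes the native Bedrock batch inference format (recordId plus a modelInput body matching the model's invoke schema, here for a Claude model); confirm the exact schema your batch management service expects:

# Sketch: writing prompts as JSONL for prompt_input_dir.
# Record shape assumes the native Bedrock batch-inference format; verify it
# against what the management service expects before submitting jobs.
import json

prompts = ["Summarize this document ...", "Classify the sentiment of this review ..."]

with open("prompts.jsonl", "w") as f:
    for i, prompt in enumerate(prompts):
        record = {
            "recordId": f"REC{i:08d}",
            "modelInput": {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 512,
                "messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}],
            },
        }
        f.write(json.dumps(record) + "\n")

# Upload prompts.jsonl to the S3 prefix configured as prompt_input_dir.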

Module Usage

Module Configuration
modules:
  llm_batch_processing:
    executor: "${executors.bedrock_batch_infer}"
    job_parameters:
      task_name: "content-analysis-batch"
      prompt_input_dir: "s3://data-bucket/prompts/"
      result_output_dir: "s3://data-bucket/results/"
      model_id: "anthropic.claude-3-sonnet-20240229-v1:0"
    description: "Process large batches of prompts using Bedrock"
    depends_on: []

Module Configuration Parameters

  • executor - Reference to the configured Bedrock batch inference executor
  • job_parameters - Required parameters for Bedrock batch inference:
    • task_name - Unique identifier for the batch inference job
    • prompt_input_dir - S3 path containing input prompts in JSONL format
    • result_output_dir - S3 path where batch inference results will be written
    • model_id - Amazon Bedrock foundation model identifier (e.g., Claude, Titan models)
  • description - Human-readable description of the batch processing task
  • depends_on - List of modules that must complete before this batch job runs

Service Dependencies

This executor requires the following AWS resources (configured via the executor):

  • queue_lambda_arn: ARN of the queue-creation Lambda
  • process_lambda_arn: ARN of the batch-processing Lambda
  • status_lambda_arn: ARN of the status-checker Lambda
  • IAM roles for Bedrock and S3 access