Pipelines Reference
Complete reference for ML pipeline executors, modules, and orchestration
Pipeline Components
Advanced Configuration
Available Executor Types
Data Processing
ML Compute
AI & Inference
Module Configuration
Individual processing units that make up your ML pipelines
Key Concept
Modules are the individual processing units that perform specific ML tasks. Each module uses an executor (compute environment) and contains your ML processing code.
modules:
  module_name:
    executor: ${executors.executor_name}
    repository: "../modules"
    entry_point: "script.py"
    job_parameters:
      input_path: "s3://bucket/input/"
      output_path: "s3://bucket/output/"
      custom_param: "value"
    depends_on: ["other_module"]
    description: "Description of what this module does"
Module Parameters
executor - Reference to executor configuration (required)
repository - Path to module code repository
entry_point - Main script or class to execute
job_parameters - Custom parameters passed to your code
depends_on - List of modules that must complete first
description - Human-readable description
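The job_parameters block is ultimately handed to your entry point. As a minimal sketch (assuming, as described in the executor sections below, that custom parameters arrive as --key value command-line arguments), a script.py entry point might read them like this:

# minimal sketch of an entry-point script receiving job_parameters
# assumes parameters arrive as --key value command-line arguments
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_path", required=True)   # e.g. s3://bucket/input/
    parser.add_argument("--output_path", required=True)  # e.g. s3://bucket/output/
    parser.add_argument("--custom_param", default="value")
    args = parser.parse_args()
    # read from args.input_path, process, write to args.output_path
    print(f"Processing {args.input_path} -> {args.output_path}")

if __name__ == "__main__":
    main()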
Module Dependencies and Execution Order
modules:
  data_ingestion:
    executor: ${executors.glue_etl}
    entry_point: "ingest_data.py"
    job_parameters:
      source_database: "raw_data"
      output_path: "s3://bucket/ingested/"
    # No dependencies - runs first
  data_cleaning:
    executor: ${executors.python_processor}
    entry_point: "clean_data.py"
    job_parameters:
      input_path: "s3://bucket/ingested/"
      output_path: "s3://bucket/cleaned/"
    depends_on: ["data_ingestion"]
  feature_engineering:
    executor: ${executors.python_processor}
    entry_point: "create_features.py"
    job_parameters:
      input_path: "s3://bucket/cleaned/"
      output_path: "s3://bucket/features/"
    depends_on: ["data_cleaning"]
  model_training:
    executor: ${executors.training_job}
    entry_point: "train_model.py"
    job_parameters:
      training_data: "s3://bucket/features/"
      model_output: "s3://bucket/models/"
      epochs: 50
    depends_on: ["feature_engineering"]
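For illustration only (this is not the orchestrator's internal code), the depends_on declarations above form a dependency graph that resolves to a single valid order: data_ingestion, data_cleaning, feature_engineering, model_training. A sketch of deriving that order with a plain topological sort:

# illustrative only: derive an execution order from depends_on declarations
from graphlib import TopologicalSorter

modules = {
    "data_ingestion": [],
    "data_cleaning": ["data_ingestion"],
    "feature_engineering": ["data_cleaning"],
    "model_training": ["feature_engineering"],
}
order = list(TopologicalSorter(modules).static_order())
print(order)  # ['data_ingestion', 'data_cleaning', 'feature_engineering', 'model_training']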
Executor Configuration Overrides
Override executor parameters at the module level for fine-tuned resource allocation
Key Concept
You can override specific executor parameters at the module level while still using the base executor configuration. This is useful when you need different compute resources or settings for specific modules.
executors:
  python_processor:
    type: sagemaker_processor
    class: sagemaker.sklearn.processing.SKLearnProcessor
    instance_type: "ml.m5.large"   # Base configuration
    instance_count: 1
    framework_version: "1.0-1"
modules:
  # Uses base executor configuration
  data_cleaning:
    executor: ${executors.python_processor}
    entry_point: "clean_data.py"
  # Override instance type for memory-intensive task
  feature_engineering:
    executor: ${executors.python_processor}
    entry_point: "create_features.py"
    instance_type: "ml.m5.2xlarge"   # Override: bigger instance
    instance_count: 2                # Override: more instances
  # Override for GPU processing
  model_training:
    executor: ${executors.python_processor}
    entry_point: "train_model.py"
    instance_type: "ml.p3.2xlarge"        # Override: GPU instance
    max_runtime_in_seconds: 7200          # Override: longer timeout
Override Benefits
- Cost Optimization: Use smaller instances for lightweight tasks, larger for heavy computation
- Performance Tuning: Adjust timeouts, memory, and CPU for specific workloads
- Resource Management: Scale compute resources per module without duplicating executor definitions
- Maintainability: Keep base configurations simple while customizing where needed
Executor Types
Compute environments that run your modules
EMR Spark Jar
Serverless Spark jobs using EMR Serverless for cost-effective big data processing
Best For
Large-scale Scala/Java Spark applications, complex ETL with custom JARs, cost-effective big data processing with automatic scaling, and batch processing workloads.
EMR Serverless Benefits
- Cost Optimization: Pay only for compute used, automatic start/stop
- Serverless: No cluster management, automatic scaling
- Performance: Optimized Spark runtime with faster startup
- Integration: Native AWS service integration
executors:
  emr_spark_processor:
    type: emr_spark
    # Optional: All parameters below show example values - can be customized
    application_id: "<your-application-id>"  # Custom application ID (optional; omit to use the shared default application)
    executor_num: 4        # Number of executor instances
    executor_cores: 2
    executor_memory: "4g"
    executor_disk: "20g"   # Disk space per executor
    driver_cores: 2
    driver_memory: "4g"
    driver_disk: "20g"     # Disk space for driver
    timeout: 1440          # 24-hour timeout (in minutes)
    # Optional: All parameters below can be customized
    dependent_jars:        # Executor-level dependencies
      - "org.apache.spark:spark-sql_2.12:3.3.0"
    config:
      - spark.sql.adaptive.enabled=true
      - spark.sql.adaptive.coalescePartitions.enabled=true
      - spark.serializer=org.apache.spark.serializer.KryoSerializer
Configuration Parameters
type - Use emr_spark for Scala/Java Spark applications
role_arn - IAM execution role ARN (optional, uses default EMR execution role)
application_id - Custom EMR Serverless application ID (optional, uses shared application by default)
executor_num - Number of executor instances (optional, default: 2)
executor_cores - Number of CPU cores per executor (optional, default: 2)
executor_memory - Memory per executor (optional, default: 4g)
executor_disk - Disk space per executor (optional, default: 20g)
driver_cores - Number of CPU cores for driver (optional, default: 2)
driver_memory - Memory for driver (optional, default: 4g)
driver_disk - Disk space for driver (optional, default: 20g)
timeout - Job timeout in minutes (optional, default: 720 = 12 hours)
dependent_jars - List of JAR files or Maven packages for executor-level dependencies
config - List of Spark configuration parameters
Module Usage
Note: All executor-level parameters can be overridden at the module level for fine-grained control per job.
modules:
  data_transformation:
    executor: ${executors.emr_spark_processor}
    repository: "../spark-app"
    entry_point: "com.company.DataProcessor"
    build_command: "mvn clean package -DskipTests"
    # Override executor-level settings for this specific job
    executor_num: 8        # Override executor instances
    executor_memory: "8g"  # Override memory for large job
    timeout: 1440          # Override timeout to 24 hours
    job_parameters:
      input_path: "s3://data-bucket/input/"
      output_path: "s3://data-bucket/output/"
      config_param: "production"
    submit_jar: "target/data-processor-1.0.jar"
    dependent_jars:        # Override/extend executor JARs
      # Local JAR files (uploaded to S3)
      - "lib/custom-library.jar"
      # Maven packages (downloaded automatically)
      - "org.apache.spark:spark-sql_2.12:3.3.0"
      - "com.databricks:spark-xml_2.12:0.14.0"
      # S3 JAR files
      - "s3://shared-jars/delta-core_2.12-2.3.0.jar"
Module Configuration Parameters
executor - Reference to the configured EMR Spark executor
repository - Local path to directory containing the Spark application project
entry_point - Main class name to execute (must have a main method)
build_command - Shell command to build the project (e.g., Maven, SBT)
Executor Overrides - Any executor parameter can be overridden at module level:
  executor_num, executor_cores, executor_memory, executor_disk
  driver_cores, driver_memory, driver_disk
  timeout, application_id, role_arn
  dependent_jars, config
job_parameters - Custom parameters passed to your Spark application as arguments:
  input_path - S3 path for input data to process
  output_path - S3 path where processed data will be written
  Custom parameters passed as --key value arguments
submit_jar - Path to the main JAR file (relative to repository, auto-detected if not specified)
dependent_jars - List of JAR files or Maven packages (overrides executor-level):
  JAR files: Local paths or S3 URIs (added to spark.jars)
  Maven packages: Format groupId:artifactId:version (added to spark.jars.packages)
EMR Application Management
Shared Application (Default):
- Uses the team-shared EMR Serverless application created during mk setup init
- Cost-efficient for typical workloads
- Automatic start/stop management
- No configuration required
Custom Application:
- Specify application_id for a dedicated EMR application
- Useful for resource isolation, different environments, or custom configurations
- Application must exist before job submission
- Full control over application lifecycle and settings
EMR PySpark
Serverless PySpark jobs using EMR Serverless for Python-based big data processing
Best For
Large-scale Python data processing, ETL pipelines with Python libraries, machine learning feature engineering, and cost-effective distributed computing for Python workloads.
executors:
  emr_pyspark_processor:
    type: emr_pyspark
    # Optional: All parameters below show example values - can be customized
    application_id: "00def456uvw789123"  # Custom application ID
    executor_num: 3        # Number of executor instances
    executor_cores: 2
    executor_memory: "4g"
    executor_disk: "20g"   # Disk space per executor
    driver_cores: 2
    driver_memory: "4g"
    driver_disk: "20g"     # Disk space for driver
    timeout: 720           # 12-hour timeout (in minutes)
    # Optional: All parameters below can be customized
    dependent_jars:        # Executor-level JAR dependencies
      - "io.delta:delta-core_2.12:2.3.0"
    extra_py_files:        # Executor-level Python files
      - "src/utils:utils"  # add the utils package to the root
      - "common/helpers.py"
    additional_python_modules:  # Auto-built Python packages
      - delta-spark==2.4.0
      - s3fs==2025.5.1
      - numpy==1.24.4
    config:
      - spark.sql.adaptive.enabled=true
      - spark.sql.adaptive.coalescePartitions.enabled=true
      - spark.python.worker.memory=2g
Configuration Parameters
type - Use emr_pyspark for Python Spark applications
role_arn - IAM execution role ARN (optional, uses default EMR execution role)
application_id - Custom EMR Serverless application ID (optional, uses shared application by default)
executor_num - Number of executor instances (optional, default: 2)
executor_cores - Number of CPU cores per executor (optional, default: 2)
executor_memory - Memory per executor (optional, default: 4g)
executor_disk - Disk space per executor (optional, default: 20g)
driver_cores - Number of CPU cores for driver (optional, default: 2)
driver_memory - Memory for driver (optional, default: 4g)
driver_disk - Disk space for driver (optional, default: 20g)
timeout - Job timeout in minutes (optional, default: 720 = 12 hours)
dependent_jars - List of JAR files or Maven packages for executor-level dependencies
extra_py_files - List of additional Python files/directories for executor-level distribution
additional_python_modules - List of Python packages to install and build automatically
config - List of Spark configuration parameters
Module Usage
Note: All executor-level parameters can be overridden at the module level for fine-grained control per job.
modules:
  feature_engineering:
    executor: ${executors.emr_pyspark_processor}
    repository: "../pyspark_jobs"
    entry_point: "feature_engineering.py"
    # Override executor-level settings for this specific job
    executor_num: 6        # Override executor instances
    executor_memory: "8g"  # Override memory for large dataset
    driver_memory: "4g"    # Override driver memory
    timeout: 360           # Override timeout to 6 hours
    job_parameters:
      input_path: "s3://data-bucket/raw/"
      output_path: "s3://data-bucket/features/"
      feature_columns: "age,income,location"
      target_column: "conversion"
    submit_py_files:
      - "utils/data_helpers.py"
      - "transformers/"
    dependent_jars:        # Override/extend executor JARs
      # Maven packages (downloaded automatically)
      - "io.delta:delta-core_2.12:2.3.0"
      - "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0"
      # S3 JAR files
      - "s3://jars-bucket/custom-transformers.jar"
    extra_py_files:        # Override/extend executor Python files
      - "module_specific_utils/"
    additional_python_modules:  # Add module-specific packages
      - scikit-learn==1.3.0
      - matplotlib==3.7.1
Module Configuration Parameters
executor - Reference to the configured EMR PySpark executor
repository - Local path to directory containing PySpark job scripts
entry_point - Main PySpark script to execute (becomes the spark-submit application)
job_parameters - Custom parameters passed to your PySpark script as arguments:
  input_path - S3 path for input data to process
  output_path - S3 path where processed data will be written
  Custom parameters passed as --key value arguments
submit_py_files - List of additional Python files/directories to include in the PySpark job (zipped and distributed to executors)
dependent_jars - List of JAR files or Maven packages:
  JAR files: Local paths or S3 URIs (e.g., Delta Lake, Iceberg JARs)
  Maven packages: Format groupId:artifactId:version for automatic download
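To make the parameter flow concrete, here is a minimal sketch of what feature_engineering.py could look like; it only assumes the --key value argument convention described above, and the feature logic itself is illustrative:

# minimal sketch of a PySpark entry point (feature_engineering.py)
# assumes job_parameters are passed as --key value command-line arguments
import argparse
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_path", required=True)
    parser.add_argument("--output_path", required=True)
    parser.add_argument("--feature_columns", required=True)  # e.g. "age,income,location"
    parser.add_argument("--target_column", required=True)
    args = parser.parse_args()

    spark = SparkSession.builder.appName("feature_engineering").getOrCreate()
    df = spark.read.parquet(args.input_path)

    features = args.feature_columns.split(",")
    df = df.select(*features, args.target_column).dropna(subset=features)
    df = df.withColumn("ingest_date", F.current_date())  # illustrative derived feature

    df.write.mode("overwrite").parquet(args.output_path)
    spark.stop()

if __name__ == "__main__":
    main()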
Cost Optimization
EMR PySpark uses the same shared EMR Serverless application as EMR Spark, providing:
- Zero idle costs: Application stops after 15 minutes of inactivity
- Fast startup: Applications start automatically when jobs are submitted
- Shared resources: Team members share the same EMR application
- Automatic scaling: Executors scale based on workload
Glue Job
ETL jobs using AWS Glue for large-scale data processing with Delta Lake support
Best For
Large-scale ETL operations, data transformation, Spark-based processing with automatic scaling, Delta Lake operations, and serverless execution with built-in data lake capabilities.
Delta Lake Integration
All Glue jobs come pre-configured with Delta Lake support, including:
- Automatic Delta Core: io.delta:delta-core_2.12:2.3.0 JAR included by default
- Delta Catalog: Spark SQL Delta catalog configured automatically
- S3 LogStore: Optimized Delta log store for S3 storage
- Iceberg Support: AWS Glue catalog integration for Iceberg tables
executors:
  etl_processor:
    type: glue_job
    # Optional: All parameters below show defaults - can be customized
    role_arn: "mlknife-glue-job"
    runtime: "python3.9"
    glue_version: "5.0"
    executor_type: "G.1X"
    executor_num: 2
    timeout: 2880          # 48 hours in minutes
    # Optional: All parameters below can be customized
    dependent_jars:        # Executor-level JAR dependencies
      - "io.delta:delta-core_2.12:2.3.0"
    extra_py_files:        # Executor-level Python files
      - "src/utils:utils"  # add the utils package to the root
      - "common/helpers.py"
    additional_python_modules:  # Auto-built Python packages
      - delta-spark==2.4.0
      - s3fs==2025.5.1
      - numpy==1.24.4
    config:
      - spark.sql.adaptive.enabled=true
      - spark.sql.adaptive.coalescePartitions.enabled=true
      - spark.python.worker.memory=2g
Configuration Parameters
role_arn - IAM role ARN for Glue job execution (defaults to mlknife-glue-job)
runtime - Python runtime version (default: "python3.9")
glue_version - AWS Glue version (default: "5.0")
executor_type - Worker node type: G.1X, G.2X, G.4X, G.8X (default: "G.1X")
executor_num - Number of worker nodes (default: 2)
timeout - Job timeout in minutes (default: 2880 = 48 hours)
dependent_jars - Additional JAR files or Maven coordinates (default: includes Delta Lake)
extra_py_files - Additional Python files to include
additional_python_modules - Python packages to install
config - Custom Spark configuration overrides
Default Spark Configuration
Every Glue job includes optimized Spark settings:
- Hive Integration: AWS Glue Data Catalog as Hive metastore
- Delta Lake: Delta catalog and S3 log store configured
- Iceberg Support: Glue catalog integration for Iceberg tables
- Arrow Optimization: PyArrow enabled with 500 records per batch
- Performance: 300 shuffle partitions, dynamic partition overwrite
- Memory: 2GB Python worker memory allocation
- Monitoring: Metrics and Spark UI enabled by default
Module Usage
modules:
  etl_transformation:
    executor: "${executors.etl_processor}"
    repository: "../glue_jobs"
    entry_point: "transform_data.py"
    job_parameters:
      input_path: "s3://data-bucket/raw/"
      output_path: "s3://data-bucket/processed/"
Module Configuration Parameters
executor - Reference to the configured Glue job executor
repository - Local path to directory containing Glue job scripts
entry_point - Main Python script to execute as the Glue job
Executor Overrides - Any executor parameter can be overridden at module level:
  runtime, glue_version, worker_type, number_of_workers
  timeout, role_arn
  dependent_jars, extra_py_files, additional_python_modules
  config - Custom Spark configurations
job_parameters - Custom parameters passed to your Glue job script as job arguments:
  input_path - S3 path for source data to process
  output_path - S3 path where processed data will be written
  Custom parameters passed as --key value arguments
dependent_jars - List of JAR files or Maven packages (overrides executor-level):
  Maven coordinates: "org.apache.spark:spark-avro_2.12:3.3.0"
  S3 URIs: "s3://bucket/path/to/custom.jar"
  Note: Delta Lake JAR is included automatically
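For orientation, a minimal sketch of a Glue job script such as transform_data.py. It assumes the job arguments can be read with the standard awsglue getResolvedOptions helper and uses the preconfigured Delta Lake support described above; the transformation itself is illustrative:

# minimal sketch of a Glue job script (transform_data.py)
# assumes job_parameters arrive as --key value Glue job arguments
import sys
from awsglue.utils import getResolvedOptions
from pyspark.sql import SparkSession

args = getResolvedOptions(sys.argv, ["input_path", "output_path"])

spark = SparkSession.builder.appName("etl_transformation").getOrCreate()

# read raw data, apply a simple transformation, write as Delta
# (Delta support is preconfigured on Glue executors per the section above)
df = spark.read.parquet(args["input_path"])
df = df.dropDuplicates()
df.write.format("delta").mode("overwrite").save(args["output_path"])

spark.stop()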
SageMaker PySpark Processor
Distributed data processing using PySpark on SageMaker
Best For
Large-scale feature engineering with Python, distributed data transformations, complex aggregations and joins, and ML data preprocessing pipelines.
executors:
  spark_processor:
    type: sagemaker_processor
    class: sagemaker.spark.processing.PySparkProcessor
    role: "${pipeline.role}"
    instance_type: "ml.m5.xlarge"
    instance_count: 2
    max_runtime_in_seconds: 3600
    spark_config:
      spark.executor.memory: "4g"
      spark.executor.cores: "2"
      spark.sql.adaptive.enabled: "true"
Configuration Parameters
class - Use sagemaker.spark.processing.PySparkProcessor
instance_type - EC2 instance type for the Spark cluster
instance_count - Number of instances in the cluster
spark_config - Spark configuration parameters
max_runtime_in_seconds - Maximum job runtime
Module Usage
modules:
  feature_engineering:
    executor: "${executors.spark_processor}"
    repository: "../spark_jobs"
    entry_point: "feature_engineering.py"
    job_parameters:
      input_path: "s3://data-bucket/raw/"
      output_path: "s3://data-bucket/processed/"
    submit_py_files:
      - "utils/spark_helpers.py"
      - "transformers/"
    additional_python_modules:
      - "delta-spark==2.4.0"
Module Configuration Parameters
executor - Reference to the configured SageMaker PySpark processor executor
repository - Local path to directory containing Spark job scripts
entry_point - Main PySpark script to execute (becomes the spark-submit application)
job_parameters - Custom parameters passed to your Spark script as arguments:
  input_path - S3 path for input data to process
  output_path - S3 path where processed data will be written
submit_py_files - List of additional Python files/directories to include in the Spark job (zipped and distributed to executors)
additional_python_modules - List of Python packages to download as wheels and include in the Spark job
SageMaker Spark JAR Processor
Run Java/Scala Spark applications on SageMaker
Best For
High-performance Scala/Java Spark applications, complex business logic in JVM languages, integration with existing Java/Scala codebases, and performance-critical data processing.
executors:
  spark_jar_processor:
    type: sagemaker_processor
    class: sagemaker.spark.processing.SparkJarProcessor
    role: "${pipeline.role}"
    instance_type: "ml.m5.2xlarge"
    instance_count: 3
    max_runtime_in_seconds: 7200
    spark_config:
      spark.executor.memory: "8g"
      spark.executor.cores: "4"
      spark.serializer: "org.apache.spark.serializer.KryoSerializer"
Configuration Parameters
class - Use sagemaker.spark.processing.SparkJarProcessor
instance_type - EC2 instance type for the Spark cluster
instance_count - Number of instances in the cluster
spark_config - Spark configuration parameters
max_runtime_in_seconds - Maximum job runtime
Module Usage
modules:
  data_transformation:
    executor: "${executors.spark_jar_processor}"
    repository: "../spark-app"
    entry_point: "com.company.DataProcessor"
    build_command: "mvn clean package -DskipTests"
    job_parameters:
      input_path: "s3://data-bucket/input/"
      output_path: "s3://data-bucket/output/"
    submit_jar: "target/data-processor-1.0.jar"
    dependent_jars:
      - "lib/custom-library.jar"
Module Configuration Parameters
executor - Reference to the configured SageMaker Spark JAR processor executor
repository - Local path to directory containing the Spark application project
entry_point - Main class name to execute (must have a main method)
build_command - Shell command to build the project (e.g., Maven, SBT)
job_parameters - Custom parameters passed to your Spark application as arguments:
  input_path - S3 path for input data to process
  output_path - S3 path where processed data will be written
submit_jar - Path to the main JAR file (relative to repository, auto-detected if not specified)
dependent_jars - List of additional JAR files to include in the Spark classpath
Build Process
The Spark JAR processor automatically builds your project using the specified build_command before execution. Ensure your project structure follows Maven or SBT conventions.
SageMaker Processor
Managed compute for Python/R-based ML processing jobs
Best For
Data preprocessing, feature engineering, model evaluation, and custom ML processing tasks using Python or R.
executors:
  data_processor:
    type: sagemaker_processor
    class: sagemaker.sklearn.processing.SKLearnProcessor
    role: "${pipeline.role}"
    instance_type: "ml.m5.xlarge"
    instance_count: 1
    framework_version: "1.0-1"
    py_version: "py3"
Configuration Parameters
class - SageMaker processor class to use
instance_type - EC2 instance type for processing
instance_count - Number of instances to use
framework_version - Framework version
py_version - Python version
Module Usage
modules:
  data_preprocessing:
    executor: "${executors.data_processor}"
    repository: "../processing_jobs"
    entry_point: "preprocess_data.py"
    input_names: "raw_data"
    output_names: "processed_data"
    job_parameters:
      raw_data: "s3://data-bucket/raw/"
      processed_data: "s3://data-bucket/processed/"
Module Configuration Parameters
executor - Reference to the configured SageMaker processor executor
repository - Local path to directory containing processing scripts
entry_point - Main Python script to execute for processing
input_names - Comma-separated list of input channel names. Each name must match a key in job_parameters. Creates a SageMaker ProcessingInput that mounts S3 data to /opt/ml/processing/input/{name}/ and passes --{name} /opt/ml/processing/input/{name}/ as arguments to your script
output_names - Comma-separated list of output channel names. Each name must match a key in job_parameters. Creates a SageMaker ProcessingOutput that uploads data from /opt/ml/processing/output/{name}/ to S3 and passes --{name} /opt/ml/processing/output/{name}/ as arguments to your script
job_parameters - Custom parameters passed to your processing script:
  raw_data - S3 path for input data (matches input_names, mounted into the container)
  processed_data - S3 path for output data (matches output_names, uploaded from the container)
  feature_columns - Custom parameter passed as --feature_columns
  target_column - Custom parameter passed as --target_column
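Putting the channel mapping together, a minimal sketch of preprocess_data.py (the pandas-based cleanup and CSV inputs are illustrative; only the channel arguments and container paths come from the configuration above):

# minimal sketch of a processing script (preprocess_data.py)
# channel arguments point at container paths mounted/uploaded by SageMaker:
#   --raw_data       /opt/ml/processing/input/raw_data/
#   --processed_data /opt/ml/processing/output/processed_data/
import argparse
from pathlib import Path
import pandas as pd

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--raw_data", required=True)
    parser.add_argument("--processed_data", required=True)
    args = parser.parse_args()

    # assumes CSV inputs were downloaded from S3 into the input channel
    frames = [pd.read_csv(f) for f in Path(args.raw_data).glob("*.csv")]
    df = pd.concat(frames, ignore_index=True).dropna()

    out_dir = Path(args.processed_data)
    out_dir.mkdir(parents=True, exist_ok=True)
    df.to_csv(out_dir / "processed.csv", index=False)  # uploaded back to S3 after the job

if __name__ == "__main__":
    main()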
SageMaker Training
Model training jobs using SageMaker Training with built-in or custom algorithms
Best For
Training machine learning models with distributed computing, hyperparameter tuning, and automatic model artifacts management.
executors:
  model_trainer:
    type: sagemaker_training
    role: "${pipeline.role}"
    instance_type: "ml.m5.2xlarge"
    instance_count: 1
    image_uri: "382416733822.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest"
    hyperparameters:
      max_depth: 5
      eta: 0.2
      objective: "multi:softmax"
Configuration Parameters
instance_type - EC2 instance type for training
instance_count - Number of instances to use
image_uri - Docker image for the training algorithm
hyperparameters - Algorithm hyperparameters
Module Usage
modules:
  model_training:
    executor: "${executors.model_trainer}"
    repository: "../training_jobs"
    entry_point: "train_model.py"
    input_names: "training_data,validation_data"
    output_names: "model_artifacts"
    job_parameters:
      training_data: "s3://data-bucket/train/"
      validation_data: "s3://data-bucket/validation/"
      model_artifacts: "s3://model-bucket/artifacts/"
Module Configuration Parameters
executor - Reference to the configured SageMaker training executor
repository - Local path to directory containing training scripts
entry_point - Main Python script to execute for training
input_names - Comma-separated list of input channel names. Each name must match a key in job_parameters. Creates SageMaker training input channels that mount S3 data to /opt/ml/input/data/{name}/ and passes --{name} /opt/ml/input/data/{name}/ as arguments to your script
output_names - Comma-separated list of output channel names. Each name must match a key in job_parameters. Creates SageMaker training output channels that upload artifacts from /opt/ml/model/{name}/ to S3 and passes --{name} /opt/ml/model/{name}/ as arguments to your script
job_parameters - Custom parameters passed to your training script:
  training_data - S3 path for the training dataset (matches input_names, mounted into the container)
  validation_data - S3 path for the validation dataset (matches input_names, mounted into the container)
  model_artifacts - S3 path where the trained model will be saved (matches output_names, uploaded from the container)
  epochs - Custom parameter passed as --epochs
  batch_size - Custom parameter passed as --batch_size
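As a rough sketch of the training side (the scikit-learn model, CSV inputs, and the "label" column are assumptions made only for illustration; the channel arguments and container paths follow the mapping above), train_model.py might look like:

# minimal sketch of a training script (train_model.py)
# channel arguments point at container paths managed by SageMaker:
#   --training_data   /opt/ml/input/data/training_data/
#   --validation_data /opt/ml/input/data/validation_data/
#   --model_artifacts /opt/ml/model/model_artifacts/
import argparse
import pickle
from pathlib import Path

import pandas as pd
from sklearn.linear_model import LogisticRegression  # illustrative model choice

def load_dir(path: str) -> pd.DataFrame:
    return pd.concat([pd.read_csv(f) for f in Path(path).glob("*.csv")], ignore_index=True)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--training_data", required=True)
    parser.add_argument("--validation_data", required=True)
    parser.add_argument("--model_artifacts", required=True)
    args = parser.parse_args()

    train = load_dir(args.training_data)
    valid = load_dir(args.validation_data)

    model = LogisticRegression(max_iter=1000)
    model.fit(train.drop(columns=["label"]), train["label"])  # assumes a 'label' column
    print("validation accuracy:", model.score(valid.drop(columns=["label"]), valid["label"]))

    out_dir = Path(args.model_artifacts)
    out_dir.mkdir(parents=True, exist_ok=True)
    with open(out_dir / "model.pkl", "wb") as f:  # uploaded to S3 after the job
        pickle.dump(model, f)

if __name__ == "__main__":
    main()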
Bedrock Batch Inference (Managed)
Queues jobs to a management service (Lambda + DynamoDB) that calls Bedrock on your behalf
Best For
Large-scale batch inference with foundation models, processing thousands of prompts efficiently using AWS Bedrock's batch inference capabilities.
Why This Executor?
This is not the native Bedrock batch API wrapper. It integrates with a queue-based management service (Lambdas + DynamoDB) to provide throttling, retries, scheduling, and cost/concurrency control for large batch runs. Use it when you need operational safeguards around Bedrock at scale.
How It Differs From Native
- Queue + Control: Jobs are queued; concurrency is capped to avoid quota errors.
- Resilience: Managed retries/status checks via Step Functions and Lambdas.
- Separation: Decouples submission from execution; safer for spikes.
- Observability: Status stored in DynamoDB; periodic polling (wait_seconds).
executors:
  bedrock_batch_infer:
    # Alias: you can also use type: foundation_batch_infer (future provider support)
    type: bedrock_batch_infer
    service_name: bedrock  # optional; defaults to 'bedrock'
    # Lambda ARNs for the batch manager service (configure per env)
    queue_lambda_arn: arn:aws:lambda:REGION:ACCOUNT:function:bedrock-batch-inference-create-batch-queue
    process_lambda_arn: arn:aws:lambda:REGION:ACCOUNT:function:bedrock-batch-inference-process-batch-job
    status_lambda_arn: arn:aws:lambda:REGION:ACCOUNT:function:bedrock_batch_inference_status_checker
    # Optional: polling wait between status checks (seconds)
    wait_seconds: 300
How Bedrock Batch Works
- Enqueue: A Lambda creates/queues batch jobs in a DynamoDB-backed queue
- Process: A Lambda starts Bedrock batch jobs and updates job state
- Status: A Lambda checks progress; Step Functions polls every wait_seconds
- Results: Outputs are written to the configured S3 directory
Required Parameters (in modules)
task_name - Unique identifier for the batch job
prompt_input_dir - S3 path containing input prompts (JSONL format)
result_output_dir - S3 path for batch inference results
model_id - Bedrock foundation model identifier (e.g., Claude, Titan)
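The JSONL layout under prompt_input_dir is ultimately defined by the batch manager service and the chosen model. As a hedged illustration only, the common Bedrock batch input format pairs a recordId with a model-specific modelInput body, which you could generate locally like this before uploading to S3:

# hedged illustration: writing a JSONL prompt file for prompt_input_dir
# (the exact record schema depends on the batch manager service and the model;
#  the recordId/modelInput layout shown here follows the common Bedrock batch format)
import json

prompts = [
    "Summarize the following support ticket ...",
    "Classify the sentiment of this review ...",
]

with open("prompts.jsonl", "w") as f:
    for i, text in enumerate(prompts):
        record = {
            "recordId": f"rec-{i:06d}",
            "modelInput": {  # model-specific request body (Claude messages shown as an example)
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 512,
                "messages": [{"role": "user", "content": [{"type": "text", "text": text}]}],
            },
        }
        f.write(json.dumps(record) + "\n")
# upload prompts.jsonl to the S3 prefix used as prompt_input_dir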
Module Usage
modules:
  llm_batch_processing:
    executor: "${executors.bedrock_batch_infer}"
    job_parameters:
      task_name: "content-analysis-batch"
      prompt_input_dir: "s3://data-bucket/prompts/"
      result_output_dir: "s3://data-bucket/results/"
      model_id: "anthropic.claude-3-sonnet-20240229-v1:0"
    description: "Process large batches of prompts using Bedrock"
    depends_on: []
Module Configuration Parameters
executor - Reference to the configured Bedrock batch inference executor
job_parameters - Required parameters for Bedrock batch inference:
  task_name - Unique identifier for the batch inference job
  prompt_input_dir - S3 path containing input prompts in JSONL format
  result_output_dir - S3 path where batch inference results will be written
  model_id - Amazon Bedrock foundation model identifier (e.g., Claude, Titan models)
description - Human-readable description of the batch processing task
depends_on - List of modules that must complete before this batch job runs
Service Dependencies
This executor requires the following AWS resources (configured via the executor):
queue_lambda_arn - ARN of the queue-creation Lambda
process_lambda_arn - ARN of the batch-processing Lambda
status_lambda_arn - ARN of the status-checker Lambda
- IAM roles for Bedrock and S3 access