Build pipelines with AWS EMR
This article covers how to use Dagster Pipes with AWS EMR.
The dagster-aws integration library provides the pipes.PipesEMRClient resource, which can be used to launch AWS EMR jobs from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs.
Prerequisites
To run the examples, you'll need to:
- Create a new Dagster project:
uvx create-dagster@latest project <project-name>
- Install the necessary Python libraries, using either uv:
uv add dagster-aws
or pip:
pip install dagster-aws
- Configure AWS authentication credentials. If you don't have this set up already, refer to the boto3 quickstart.
- In AWS, you'll need:
- An existing AWS account
- Prepared infrastructure such as S3 buckets, IAM roles, and other resources required for your EMR job
 
Step 1: Install the dagster-pipes module in your EMR environment
Choose one of the available options for installing Python packages on EMR, and use it to install dagster-pipes in the EMR environment.
For example, this Dockerfile can be used to package all required dependencies into a single PEX file (in practice, the most straightforward way to package Python dependencies for EMR jobs):
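# this Dockerfile can be used to create a PEX file with the dependencies for PySpark on AWS EMR
FROM amazonlinux:2 AS builder
RUN yum install -y python3
WORKDIR /build
# copy the uv binary from the official uv image (pin a specific tag in practice)
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv
ENV VIRTUAL_ENV=/build/.venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
# install a uv-managed Python interpreter and pin it for this build
RUN uv python install --python-preference only-managed 3.9.16 && uv python pin 3.9.16
RUN uv venv .venv
RUN uv pip install pex dagster-pipes boto3 pyspark
# bundle the dependencies into a single executable PEX file
RUN pex dagster-pipes boto3 pyspark -o /output/venv.pex && chmod +x /output/venv.pex
# test imports
RUN /output/venv.pex -c "import dagster_pipes, pyspark, boto3;"
# export only the PEX file from the builder stage
FROM scratch AS export
COPY --from=builder /output/venv.pex /venv.pex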
The build can be launched with:
DOCKER_BUILDKIT=1 docker build --output type=local,dest=./output .
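The Dockerfile already fails the build if any of the imports breaks. If you also want a reusable check that lists every missing module at once (for example, when trying the PEX on another machine), one option is a small script run through the PEX itself. This is a minimal sketch that assumes the PEX was built without an entry point, so it behaves like a Python interpreter (as the -c usage in the Dockerfile suggests) and can be run as ./venv.pex check_deps.py; the file name check_deps.py and the REQUIRED_MODULES tuple are hypothetical.

```python
import importlib.util
import sys
from typing import Iterable, List

# Hypothetical list: the top-level modules the EMR job script imports.
REQUIRED_MODULES = ("dagster_pipes", "pyspark", "boto3")


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be found on sys.path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("missing modules:", ", ".join(missing))
        sys.exit(1)
    # Also report the interpreter version, since the PEX targets a specific Python.
    print(f"all modules found (Python {sys.version.split()[0]})")
```

Using find_spec rather than importing each module keeps the check cheap and avoids running any module-level code.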
Then, upload the produced output/venv.pex file to an S3 bucket:
aws s3 cp output/venv.pex s3://your-bucket/venv.pex
Finally, use the --files and spark.pyspark.python options to specify the path to the PEX file in the spark-submit command:
spark-submit ... --files s3://your-bucket/venv.pex --conf spark.pyspark.python=./venv.pex
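On EMR, spark-submit is usually run as a cluster step through command-runner.jar. As a sketch, the helper below builds such a step as a plain dict in the shape of one entry of the Steps list accepted by boto3's EMR run_job_flow call. The helper name build_spark_submit_step and the example S3 paths are hypothetical, and the --deploy-mode cluster flag is an assumption: it keeps the driver inside a YARN container, where the file shipped with --files is available in the working directory as ./venv.pex.

```python
from typing import Any, Dict, List


def build_spark_submit_step(
    name: str,
    script_uri: str,
    pex_uri: str,
    action_on_failure: str = "TERMINATE_CLUSTER",
) -> Dict[str, Any]:
    """Build an EMR step that runs a PySpark script with a PEX interpreter.

    Hypothetical helper: the returned dict follows the shape of one entry in
    the Steps list accepted by boto3's EMR run_job_flow.
    """
    # Files passed with --files are localized into each YARN container's
    # working directory under their base name.
    pex_name = pex_uri.rsplit("/", 1)[-1]
    args: List[str] = [
        "spark-submit",
        # Assumption: cluster mode, so the driver can also use ./<pex_name>.
        "--deploy-mode",
        "cluster",
        "--files",
        pex_uri,
        "--conf",
        f"spark.pyspark.python=./{pex_name}",
        script_uri,
    ]
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
    }


if __name__ == "__main__":
    step = build_spark_submit_step(
        name="dagster-pipes-job",
        script_uri="s3://your-bucket/script.py",
        pex_uri="s3://your-bucket/venv.pex",
    )
    print(step["HadoopJarStep"]["Args"])
```

Deriving the spark.pyspark.python value from the uploaded file name keeps the two options in sync if the PEX is renamed.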
Step 2: Add dagster-pipes to the EMR job script
Call open_dagster_pipes in the EMR script to create a context that can be used to send messages to Dagster:
import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from pyspark.sql import SparkSession
def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3"))
    ) as pipes:
        pipes.log.info("Hello from AWS EMR!")
        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )
        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])
        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )
        spark.stop()
        print("Hello from stdout!")
if __name__ == "__main__":
    main()
The metadata format shown above ({"raw_value": value, "type": type}) is part of Dagster Pipes' special syntax for specifying rich Dagster metadata. For a complete reference of all supported metadata types and their formats, see the Dagster Pipes metadata reference.
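Writing these dicts by hand for every value gets repetitive. A minimal sketch of one way to build them from plain Python values follows; the helper names to_pipes_metadata and _pipes_type and the mapping from Python types to type strings are assumptions for illustration. "bool", "int", "float", "text", and "json" are among the type names in the Dagster Pipes metadata reference, which lists the full set.

```python
from typing import Any, Dict


def _pipes_type(value: Any) -> str:
    """Map a Python value to a Pipes metadata type string (hypothetical mapping)."""
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "bool"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "float"
    if isinstance(value, str):
        return "text"
    if isinstance(value, (dict, list)):
        return "json"
    raise TypeError(f"no Pipes metadata type mapping for {type(value).__name__}")


def to_pipes_metadata(**values: Any) -> Dict[str, Dict[str, Any]]:
    """Wrap plain values in the {"raw_value": ..., "type": ...} Pipes format."""
    return {
        key: {"raw_value": value, "type": _pipes_type(value)}
        for key, value in values.items()
    }
```

In the EMR script, the hand-written dict could then become metadata=to_pipes_metadata(average_age=avg_age), which produces the same {"raw_value": avg_age, "type": "float"} entry.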
Step 3: Create an asset using the PipesEMRClient to launch the job
You can scaffold assets from the command line by running dg scaffold defs dagster.asset <path/to/asset_file.py>. For more information, see the dg CLI docs.
In the Dagster asset/op code, use the PipesEMRClient resource to launch the job:
from dagster_aws.pipes import PipesEMRClient
import dagster as dg
@dg.asset
def emr_pipes_asset(
    context: dg.AssetExecutionContext, pipes_emr_client: PipesEMRClient
):
    return pipes_emr_client.run(
        context=context,
        # placeholder: fill in the run_job_flow arguments for your cluster, such as
        # Name, ReleaseLabel, Instances, and Steps; see the full reference here:
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr/client/run_job_flow.html#EMR.Client.run_job_flow
        run_job_flow_params={},
    ).get_materialize_result()
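The empty run_job_flow_params dict in the asset is a placeholder. As a rough sketch of what it might contain, the function below assembles a minimal set of run_job_flow arguments (Name, ReleaseLabel, Applications, LogUri, Instances, the IAM roles, and Steps). The function name, the default instance type and count, and the default EMR role names are assumptions; adjust them to the infrastructure prepared in the prerequisites.

```python
from typing import Any, Dict, List


def build_run_job_flow_params(
    name: str,
    release_label: str,
    log_uri: str,
    steps: List[Dict[str, Any]],
    instance_type: str = "m5.xlarge",
    core_instance_count: int = 1,
) -> Dict[str, Any]:
    """Assemble keyword arguments for boto3's EMR run_job_flow (hypothetical helper)."""
    return {
        "Name": name,
        "ReleaseLabel": release_label,
        # Spark must be installed on the cluster for spark-submit steps.
        "Applications": [{"Name": "Spark"}],
        # EMR writes cluster and step logs under this S3 prefix.
        "LogUri": log_uri,
        "Instances": {
            "InstanceGroups": [
                {"InstanceRole": "MASTER", "InstanceType": instance_type, "InstanceCount": 1},
                {
                    "InstanceRole": "CORE",
                    "InstanceType": instance_type,
                    "InstanceCount": core_instance_count,
                },
            ],
            # Shut the cluster down once all steps have finished.
            "KeepJobFlowAliveWhenNoSteps": False,
        },
        # Assumption: the default EMR roles exist in the account
        # (for example, created with `aws emr create-default-roles`).
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
        "Steps": steps,
    }
```

The result could be passed as run_job_flow_params=build_run_job_flow_params(...), where steps is a list of EMR step dicts, such as a spark-submit step that runs the Step 2 script with the PEX file.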
This will launch the AWS EMR job and wait for it to complete. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated.
The stdout and stderr of the EMR application steps will be forwarded to the Dagster process.
Step 4: Create Dagster definitions
You can scaffold resources from the command line by running dg scaffold defs dagster.resources <path/to/resources_file.py>. For more information, see the dg CLI docs.
Next, add the PipesEMRClient resource to your project's Definitions object:
import boto3
from dagster_aws.pipes import PipesEMRClient, PipesS3MessageReader
import dagster as dg
@dg.definitions
def resources():
    return dg.Definitions(
        resources={
            "pipes_emr_client": PipesEMRClient(
                message_reader=PipesS3MessageReader(
                    client=boto3.client("s3"),
                    bucket=dg.EnvVar("DAGSTER_PIPES_BUCKET"),
                )
            )
        },
    )
Dagster will now be able to launch the AWS EMR job from the emr_pipes_asset asset and receive logs and events from the job.