Build pipelines with PySpark
This tutorial is focused on using Dagster Pipes to launch & monitor general PySpark jobs. The Spark integration page provides more information on using Pipes with specific Spark providers, such as AWS EMR or Databricks.
Spark is often used with object stores such as Amazon S3. We are going to simulate this setup locally with MinIO, which has an API compatible with AWS S3. All communication between Dagster and the Spark job (metadata and logs collection) will happen through a MinIO bucket.
Prerequisites
To run the examples, you'll need to:
- Create a new Dagster project:
uvx create-dagster@latest project <project-name>
- Install the necessary Python libraries:
- uv
- pip
Install the required dependencies:
uv add dagster-aws
Install the required dependencies:
pip install dagster-aws
- 
In the PySpark environment, you'll need to install the dagster-pipesPython package, and typically the Java AWS S3 SDK packages. For example:curl -fSL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.5.1/hadoop-aws-3.5.1.jar" \
 -o /opt/bitnami/spark/jars/hadoop-aws-3.5.1.jar
 curl -fSL "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.780/aws-java-sdk-bundle-1.12.780.jar" \
 -o /opt/bitnami/spark/jars/aws-java-sdk-bundle-1.12.780.jar
- 
We are going to upload the PySpark script to the S3 bucket directly inside the Dagster asset. In production, consider doing this via CI/CD instead. 
Step 1: Write the Dagster code
You can scaffold assets from the command line by running dg scaffold defs dagster.asset <path/to/asset_file.py>. For more information, see the dg CLI docs.
We will set up a few non-default Pipes components to streamline the otherwise challenging problem of orchestrating Spark jobs.
- Let's start by creating the asset and opening a Pipes session. We will be using S3 to pass Pipes messages from the Spark job to Dagster, so we will create PipesS3MessageReaderandPipesS3ContextInjectorobjects. (Technically, it's not strictly required to use S3 for passing the Dagster context, but storing it there will decrease the CLI arguments size).
import os
import subprocess
from pathlib import Path
import boto3
from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader
import dagster as dg
LOCAL_SCRIPT_PATH = Path(__file__).parent / "script.py"
@dg.asset
def pipes_spark_asset(context: dg.AssetExecutionContext):
    s3_client = boto3.client("s3")
    bucket = os.environ["DAGSTER_PIPES_BUCKET"]
    # upload the script to S3
    # ideally, this should be done via CI/CD processes and not in the asset body
    # but for the sake of this example we are doing it here
    s3_script_path = f"{context.dagster_run.run_id}/pyspark_script.py"
    s3_client.upload_file(LOCAL_SCRIPT_PATH, bucket, s3_script_path)
    context_injector = PipesS3ContextInjector(
        client=s3_client,
        bucket=bucket,
    )
    message_reader = PipesS3MessageReader(
        client=s3_client,
        bucket=bucket,
        # the following setting will configure the Spark job to collect logs from the driver
        # and send them to Dagster via Pipes
        include_stdio_in_messages=True,
    )
Notice how PipesS3MessageReader has include_stdio_in_messages=True. This setting will configure the Pipes message writer in the Spark job to collect logs from the Spark driver and send them to Dagster via Pipes messages.
- We will be using CLI arguments to pass the bootstrap information from Dagster to the Spark job. We will fetch them from the session.get_bootstrap_cli_argumentsmethod. We pass these arguments tospark-submitalong with a few other settings.
    # pipes_spark_asset body continues below
    with dg.open_pipes_session(
        context=context,
        message_reader=message_reader,
        context_injector=context_injector,
    ) as session:
        dagster_pipes_args = " ".join(
            # prepare Pipes bootstrap CLI arguments
            [
                f"{key} {value}"
                for key, value in session.get_bootstrap_cli_arguments().items()
            ]
        )
        cmd = " ".join(
            [
                "spark-submit",
                # change --master and --deploy-mode according to specific Spark setup
                "--master",
                "local[*]",
                "--conf",
                "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem",
                # custom S3 endpoint for MinIO
                "--conf",
                "spark.hadoop.fs.s3a.endpoint=http://minio:9000",
                "--conf",
                "spark.hadoop.fs.s3a.path.style.access=true",
                f"s3a://{bucket}/{s3_script_path}",
                dagster_pipes_args,
            ]
        )
        subprocess.run(
            # we do not forward stdio on purpose to demonstrate how Pipes collect logs from the driver
            cmd,
            shell=True,
            check=True,
        )
    return session.get_results()
In other Pipes workflows, passing the bootstrap information from Dagster to the remote Pipes session is typically done via environment variables, but setting environment variables for Spark jobs can be complicated (the exact way of doing this depends on the Spark deployment) or not possible at all. CLI arguments are a convenient alternative.
Step 2: Use Pipes in the PySpark job (script.py)
First, create a new file named script.py, then add the following code to create a context that can be used to send messages to Dagster:
import boto3
from dagster_pipes import (
    PipesCliArgsParamsLoader,
    PipesS3ContextLoader,
    PipesS3MessageWriter,
    open_dagster_pipes,
)
from pyspark.sql import SparkSession
def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3")),
        context_loader=PipesS3ContextLoader(client=boto3.client("s3")),
        params_loader=PipesCliArgsParamsLoader(),
    ) as pipes:
        print("Hello from the Spark driver!")
        pipes.log.info("I am logging a Dagster message from the Spark driver!")
        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )
        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])
        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )
        spark.stop()
Note how PipesCliArgsParamsLoader is used to load the CLI arguments passed by Dagster. This information will be used to automatically configure PipesS3MessageWriter and PipesS3ContextLoader.
Because the Dagster code has include_stdio_in_messages=True, the message writer will collect logs from the driver and send them to Dagster via Pipes messages.
Step 3: Running the pipeline with Docker
- 
Place the PySpark code for script.pyand the Dagster orchestration code fordagster_code.pyin the same directory.
- 
Create a Dockerfile:
ARG SPARK_VERSION=3.5.1
FROM bitnami/spark:${SPARK_VERSION}
USER root
ARG SPARK_VERSION=3.4.1
COPY  /uv /uvx /bin/
RUN install_packages curl
RUN curl -fSL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/$SPARK_VERSION/hadoop-aws-$SPARK_VERSION.jar" \
    -o /opt/bitnami/spark/jars/hadoop-aws-$SPARK_VERSION.jar
RUN curl -fSL "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.780/aws-java-sdk-bundle-1.12.780.jar" \
    -o /opt/bitnami/spark/jars/aws-java-sdk-bundle-1.12.780.jar
# install Python dependencies
RUN  uv pip install --system dagster dagster-webserver dagster-aws pyspark
WORKDIR /src
ENV DAGSTER_HOME=/dagster_home
COPY dagster_code.py script.py ./
- Create a docker-compose.yml:
# this docker compose file creates a mini Spark cluster with 1 master and 2 workers to simulate a distributed environment
volumes:
    spark-logs:
    spark-data:
    minio-data:
    dagster_home:
networks:
    spark:
services:
    minio:
        image: bitnami/minio
        ports:
            - "9000:9000"
            - "9001:9001"
        environment:
            MINIO_ROOT_USER: minio
            MINIO_ROOT_PASSWORD: minio123
            MINIO_DEFAULT_BUCKETS: "dagster-pipes:public"
        volumes:
            - minio-data:/data
        networks:
            - spark
    dagster-dev:
        develop:
            watch:
                - action: sync
                  path: .
                  target: /src
        build:
            context: .
            dockerfile: Dockerfile
        command:
            - "dagster"
            - "dev"
            - "-f"
            - "/src/dagster_code.py"
            - "--host"
            - "0.0.0.0"
            - "--port"
            - "3000"
        ports:
            - "3000:3000"
        volumes:
            - spark-logs:/spark/logs
            - spark-data:/spark/data
            - dagster_home:/dagster_home
        environment:
            AWS_ACCESS_KEY_ID: minio
            AWS_SECRET_ACCESS_KEY: minio123
            AWS_ENDPOINT_URL: http://minio:9000
            DAGSTER_PIPES_BUCKET: dagster-pipes
        depends_on:
            - minio
        networks:
            - spark
- Start the Dagster dev instance inside Docker:
docker compose up --build --watch
--watch will automatically sync the changes in the local filesystem to the
Docker container, which is useful for live code updates without volume mounts.
- Navigate to http://localhost:3000 to open the Dagster UI and materialize the asset. Metadata and stdio logs emitted in the PySpark job will become available in Dagster.