Modal Application
For this example, we will create multiple pipelines that do the following:
- Download audio files
- Transcribe the audio
- Summarize the audio
- Send an email summary
Many of these steps can be done in Dagster, but the transcription step is better suited to a different environment.
An advantage of Dagster is that you are not limited to executing code inside Dagster itself. In this case, we will use Modal. Modal makes it easy to manage and scale the infrastructure needed to perform distributed computation while maintaining a Pythonic workflow. It works especially well with Dagster: Dagster manages and orchestrates the various components in our pipeline, while Modal spins up auto-scaling infrastructure in a serverless way.
We will start by explaining the Modal application of our pipeline and then show how we can use it within Dagster.
Modal application
Within Modal, we need to define the image that will be used by the Modal infrastructure. As mentioned, Modal allows you to define an image and the desired dependencies in a Pythonic way so you can avoid defining a separate Dockerfile. The app image is then supplied to `modal.App`, which we will use later on to decorate our Modal functions:
app_image = (
    modal.Image.debian_slim(python_version="3.10")
    .apt_install("git")
    .pip_install(
        "git+https://github.com/openai/whisper.git",
        "dacite",
        "jiwer",
        "ffmpeg-python",
        "gql[all]~=3.0.0a5",
        "python-multipart~=0.0.9",
        "pandas",
        "loguru==0.6.0",
        "torchaudio==2.1.0",
        "python-dotenv",
    )
    .apt_install("ffmpeg")
)
app = modal.App(
    "whisper-pod-transcriber",
    image=app_image,
)
Another benefit of Modal is that it allows us to mount a Cloudflare R2 bucket as if it were a file system. R2 will serve as the staging ground between Dagster and Modal:
cloud_bucket_mount = modal.CloudBucketMount(
    "dagster-modal-demo",
    bucket_endpoint_url=os.environ.get("CLOUDFLARE_R2_API"),
    secret=modal.Secret.from_dict(
        {
            "AWS_ACCESS_KEY_ID": os.environ.get("CLOUDFLARE_R2_ACCESS_KEY_ID"),
            "AWS_SECRET_ACCESS_KEY": os.environ.get("CLOUDFLARE_R2_SECRET_ACCESS_KEY"),
            "AWS_REGION": "auto",
        }
    ),
)
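Because the mount reads its endpoint and credentials with os.environ.get, a missing variable silently becomes None rather than failing at definition time. The following is a minimal sketch of a pre-flight check, assuming you want to fail early with a clear message. The helper name load_r2_settings and the REQUIRED_R2_VARS tuple are hypothetical and not part of the original example; only the three environment variable names come from the code above.

```python
import os

# Environment variable names taken from the mount definition above.
REQUIRED_R2_VARS = (
    "CLOUDFLARE_R2_API",
    "CLOUDFLARE_R2_ACCESS_KEY_ID",
    "CLOUDFLARE_R2_SECRET_ACCESS_KEY",
)


def load_r2_settings(env=None):
    """Hypothetical helper: return the R2 settings or raise if any are unset.

    `env` defaults to os.environ; passing a dict makes the check easy to test.
    """
    env = os.environ if env is None else env
    # Treat both unset and empty values as missing.
    missing = [name for name in REQUIRED_R2_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Missing R2 environment variables: " + ", ".join(missing)
        )
    return {name: env[name] for name in REQUIRED_R2_VARS}
```

Calling load_r2_settings() before building the mount would surface a configuration problem when the module is loaded instead of later, when Modal first tries to reach the bucket.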
With the image and R2 mount ready, we can define our Modal functions. The first function transcribes a segment of a podcast. Because Modal scales to fit the needs of the application and allows for parallel processing, we can speed up transcription by splitting podcasts that may be several hours long into smaller pieces. Modal manages all of the infrastructure provisioning as needed. As the decorator shows, all we need to give Modal is our image, the R2 bucket mount, a timeout, and the number of CPUs we require (we could use GPUs, but the OpenAI Whisper model is relatively small and does not require GPU processing like some other models):
@app.function(
    image=app_image,
    cpu=2,
    timeout=400,
    volumes={
        "/mount": cloud_bucket_mount,
    },
)
def transcribe_segment(
    start: float,
    end: float,
    audio_filepath: pathlib.Path,
    model: config.ModelSpec,
):
    import tempfile
    import time
    import ffmpeg
    import torch
    import whisper
    t0 = time.time()
    with tempfile.NamedTemporaryFile(suffix=".mp3") as f:
        (
            ffmpeg.input(str(audio_filepath))
            .filter("atrim", start=start, end=end)
            .output(f.name)
            .overwrite_output()
            .run(quiet=True)
        )
        use_gpu = torch.cuda.is_available()
        device = "cuda" if use_gpu else "cpu"
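        # Use a separate name so the `model` argument (a config.ModelSpec) is not shadowed.
        whisper_model = whisper.load_model(model.name, device=device, download_root=config.MODEL_DIR)
        result = whisper_model.transcribe(f.name, language="en", fp16=use_gpu)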
    logger.info(
        f"Transcribed segment {start:.2f} to {end:.2f} ({end - start:.2f}s duration) in {time.time() - t0:.2f} seconds."
    )
    # Add back offsets.
    for segment in result["segments"]:
        segment["start"] += start
        segment["end"] += start
    return result
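The offset step at the end of transcribe_segment matters because each clip is transcribed on its own, so the timestamps it reports start at zero. The sketch below isolates that step on a hand-written result dictionary. The function name add_back_offsets and the sample data are illustrative assumptions; the dictionary only mimics the "text" and "segments" keys used in the code above.

```python
def add_back_offsets(result, start):
    """Shift clip-relative timestamps so they are relative to the full episode."""
    for segment in result["segments"]:
        segment["start"] += start
        segment["end"] += start
    return result


# Hand-written stand-in for a transcription of a clip that begins 60 seconds
# into the episode. Timestamps are relative to the clip, not the episode.
clip_result = {
    "text": " hello world",
    "segments": [
        {"start": 0.0, "end": 1.5, "text": " hello"},
        {"start": 1.5, "end": 3.0, "text": " world"},
    ],
}

shifted = add_back_offsets(clip_result, start=60.0)
print(shifted["segments"][0]["start"], shifted["segments"][1]["end"])  # 60.0 63.0
```

Note that the segments are modified in place, which is the same behavior as the loop in transcribe_segment.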
The next function, transcribe_episode, splits an audio file into smaller segments and applies the transcribe_segment function to each one. After all the segments have been processed, it writes the transcribed text into JSON files within the R2 bucket. The excerpt below shows the core of the function, where the segments are fanned out and the results are collected:
    segment_gen = split_silences(str(audio_file))
    output_text = ""
    output_segments = []
    for result in transcribe_segment.starmap(
        segment_gen, kwargs=dict(audio_filepath=audio_file, model=model)
    ):
        output_text += result["text"]
        output_segments += result["segments"]
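To make the collect-and-write pattern concrete without a Modal account, here is a local sketch that runs the same accumulation loop sequentially and then writes one JSON file. Everything in it is an assumption made for illustration: fake_transcribe_segment stands in for the remote transcribe_segment call, the (start, end) tuples stand in for the output of split_silences, and the JSON layout with "text" and "segments" keys is a guess, since the original write step is not shown.

```python
import json
import pathlib
import tempfile


def fake_transcribe_segment(start, end):
    """Stand-in for the remote function: returns a dict with text and segments."""
    label = f" [{start:.0f}-{end:.0f}]"
    return {"text": label, "segments": [{"start": start, "end": end, "text": label}]}


def transcribe_episode_locally(segments, result_path):
    """Accumulate per-segment results, then write them to a single JSON file."""
    output_text = ""
    output_segments = []
    # In the Modal version, starmap distributes these tuples to remote containers;
    # here they are simply processed one after another.
    for start, end in segments:
        result = fake_transcribe_segment(start, end)
        output_text += result["text"]
        output_segments += result["segments"]
    payload = {"text": output_text, "segments": output_segments}
    result_path.write_text(json.dumps(payload, indent=2))
    return result_path


with tempfile.TemporaryDirectory() as tmp:
    path = transcribe_episode_locally(
        [(0.0, 30.0), (30.0, 75.0)], pathlib.Path(tmp) / "episode.json"
    )
    saved = json.loads(path.read_text())
```

In the real pipeline the result path would point into the mounted bucket, so writing the file is what makes the transcript visible to Dagster.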
With the Modal functions in place, we can define the entry point main. This is what Dagster will use to interact with Modal:
@app.local_entrypoint()
def main():
    from dagster_pipes import open_dagster_pipes
    model = config.DEFAULT_MODEL
    with open_dagster_pipes() as context:
        audio_path = context.extras.get("audio_file_path")
        if not audio_path:
            raise Exception("Missing `audio_file_path` extras parameter")
        audio_path = "/mount/" + audio_path
        transcription_path = audio_path.replace(".mp3", ".json")
        transcribe_episode.remote(
            audio_file=Path(audio_path),
            result_path=Path(transcription_path),
            model=model,
        )
        context.report_asset_materialization(
            metadata={
                "audio_file": audio_path,
                "transcription_file": transcription_path,
            }
        )
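The entry point derives both paths with string operations, and str.replace swaps every occurrence of ".mp3" in the path, including any that appear in a directory name. As a hedged alternative, the sketch below uses pathlib so that only the final suffix changes. The helper name derive_paths and the sample keys are hypothetical; the "/mount" prefix is the mount point used in the function decorator earlier.

```python
from pathlib import PurePosixPath

# Mount point used for the R2 bucket inside the Modal container.
MOUNT_ROOT = PurePosixPath("/mount")


def derive_paths(audio_key):
    """Hypothetical helper: map a bucket key to its audio and transcript paths."""
    # Strip leading slashes so the key cannot replace the mount root when joined.
    audio_path = MOUNT_ROOT / audio_key.lstrip("/")
    # with_suffix changes only the final extension of the file name.
    transcription_path = audio_path.with_suffix(".json")
    return audio_path, transcription_path


audio, transcript = derive_paths("data/episode-001.mp3")
print(audio)       # /mount/data/episode-001.mp3
print(transcript)  # /mount/data/episode-001.json
```

PurePosixPath is used here so the result has forward slashes regardless of the operating system running the local entry point.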
Next steps
- Continue this example with RSS assets