Dagster & OpenAI
The dagster-openai library provides an OpenAIResource that lets you call the OpenAI REST API through the OpenAI Python API, so you can build AI steps into your Dagster pipelines.
When used with Dagster's asset definitions, the resource automatically logs OpenAI usage metadata in asset metadata. This usage metadata can also be surfaced in Dagster Insights, giving you detailed observability on API call credit consumption.
Getting started
Before you get started with the dagster-openai library, we recommend familiarizing yourself with the OpenAI Python API library, which this integration uses to interact with the OpenAI REST API.
Prerequisites
To get started, install the dagster and dagster-openai Python packages:
With uv:
uv add dagster-openai
With pip:
pip install dagster-openai
You will also need an OpenAI API key to use the resource. You can generate one in your OpenAI account.
Connecting to OpenAI
The first step in using OpenAI with Dagster is to tell Dagster how to connect to an OpenAI client using an OpenAI resource. This resource contains the credentials needed to interact with the OpenAI API.
We will supply our credentials as environment variables by adding them to a .env file. For more information on setting environment variables in a production setting, see Using environment variables and secrets.
# .env
OPENAI_API_KEY=...
Then, we can instruct Dagster to authenticate the OpenAI resource using this environment variable:
from dagster_openai import OpenAIResource
from dagster import EnvVar
# Pull API key from environment variables
openai = OpenAIResource(
    api_key=EnvVar("OPENAI_API_KEY"),
)
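Because the key is read from the environment, a missing or empty OPENAI_API_KEY only becomes visible when the resource is actually used. As a minimal sketch, a small standard-library check can surface the problem earlier, for example in a local setup script. The helper name require_env is hypothetical and is not part of dagster or dagster-openai.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise a clear error.

    Hypothetical helper for illustration; not part of dagster-openai.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


# Example usage (uncomment once the variable is defined in your environment):
# require_env("OPENAI_API_KEY")
```

The check only confirms that the variable is present and non-empty; it does not validate the key against the OpenAI API.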
Using the OpenAI resource with assets
The OpenAI resource can be used in assets to interact with the OpenAI API. Note that in this example, we supply our credentials as an environment variable directly when instantiating the Definitions object.
from dagster_openai import OpenAIResource
from dagster import AssetExecutionContext, Definitions, EnvVar, asset, define_asset_job
@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
    with openai.get_client(context) as client:
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Say this is a test."}],
        )
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
defs = Definitions(
    assets=[openai_asset],
    jobs=[openai_asset_job],
    resources={
        "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
    },
)
After materializing your asset, your OpenAI API usage metadata will be available in the Events and Plots tabs of your asset in the Dagster UI. If you are using Dagster+, your usage metadata will also be available in Dagster Insights.
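The asset example discards the value returned by client.chat.completions.create. In practice you will usually want the generated text. The sketch below assumes the response has the shape used by chat completions in the OpenAI Python library (choices[0].message.content); the helper name first_message_text is hypothetical, and a SimpleNamespace stand-in replaces a real API response so the snippet runs without a key.

```python
from types import SimpleNamespace
from typing import Any


def first_message_text(response: Any) -> str:
    """Return the text of the first choice in a chat completion response."""
    if not response.choices:
        raise ValueError("response contains no choices")
    content = response.choices[0].message.content
    # content can be None (for example, when the model returns no text).
    return content or ""


# Stand-in object mimicking the assumed response shape; not a real API call.
fake_response = SimpleNamespace(
    choices=[SimpleNamespace(message=SimpleNamespace(content="This is a test."))]
)
print(first_message_text(fake_response))
```

Inside the asset, the same helper could be applied to the value returned by client.chat.completions.create, and the resulting string returned from the asset function.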
Using the OpenAI resource with ops
The OpenAI resource can also be used in ops.
Currently, the OpenAI resource does not log OpenAI usage metadata out of the box when used in ops.
from dagster_openai import OpenAIResource
from dagster import Definitions, EnvVar, GraphDefinition, OpExecutionContext, op
@op
def openai_op(context: OpExecutionContext, openai: OpenAIResource):
    with openai.get_client(context) as client:
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Say this is a test"}],
        )
openai_op_job = GraphDefinition(name="openai_op_job", node_defs=[openai_op]).to_job()
defs = Definitions(
    jobs=[openai_op_job],
    resources={
        "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
    },
)
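Since usage metadata is not logged automatically in ops, one option is to read the token counts from the response yourself. The sketch below assumes the response exposes usage.prompt_tokens, usage.completion_tokens, and usage.total_tokens, as chat completion responses in the OpenAI Python library do. The helper name usage_metadata and the dictionary keys are hypothetical, and a SimpleNamespace stand-in is used instead of a real API response.

```python
from types import SimpleNamespace
from typing import Any, Dict


def usage_metadata(response: Any) -> Dict[str, int]:
    """Collect token counts from a chat completion response into a plain dict."""
    usage = getattr(response, "usage", None)
    if usage is None:
        # Some responses may not carry usage information.
        return {}
    return {
        "prompt_tokens": usage.prompt_tokens,
        "completion_tokens": usage.completion_tokens,
        "total_tokens": usage.total_tokens,
    }


# Stand-in object mimicking the assumed response shape; not a real API call.
fake_response = SimpleNamespace(
    usage=SimpleNamespace(prompt_tokens=12, completion_tokens=5, total_tokens=17)
)
print(usage_metadata(fake_response))
```

Inside an op, the resulting dictionary could then be attached to the op's output with context.add_output_metadata(usage_metadata(response)). This is a manual workaround rather than the automatic logging that the resource performs for assets.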
About OpenAI
OpenAI is a U.S.-based artificial intelligence (AI) research organization with the goal of developing "safe and beneficial" artificial general intelligence, which it defines as "highly autonomous systems that outperform humans at most economically valuable work".