Max dml 7e5abd504f feat: add DBOS skills for TypeScript, Python, and Go (#94)
Add three DBOS SDK skills with reference documentation for building
reliable, fault-tolerant applications with durable workflows.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 23:26:51 +01:00

title: Use Streams for Real-Time Data
impact: MEDIUM
impactDescription: Enables real-time progress and LLM streaming
tags: streaming, write_stream, read_stream, realtime

Use Streams for Real-Time Data

Workflows can stream data to clients in real time. This is useful for LLM responses, progress reporting, or incremental results from long-running workflows.

Incorrect (returning all data at the end):

@DBOS.workflow()
def llm_workflow(prompt):
    # Client waits for entire response
    response = call_llm(prompt)
    return response

Correct (streaming results):

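from dbos import DBOS
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@DBOS.workflow()
def llm_workflow(prompt):
    # call_llm_streaming is a placeholder for your LLM client
    for chunk in call_llm_streaming(prompt):
        DBOS.write_stream("response", chunk)
    DBOS.close_stream("response")  # Readers stop iterating once the stream closes
    return "complete"

# Client reads stream
@app.get("/stream/{workflow_id}")
def stream_response(workflow_id: str):
    def generate():
        # Yields values in order as they are written
        for value in DBOS.read_stream(workflow_id, "response"):
            yield value
    return StreamingResponse(generate())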
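
The endpoint above yields stream values as-is, which works when every chunk is a string. If a workflow writes structured values such as dicts, the reader needs to serialize them before handing them to the HTTP response. The following is a minimal sketch of one way to do that with Server-Sent Events framing. The helper name `to_sse` and the trailing `done` event are assumptions made for illustration, not part of DBOS or FastAPI.

```python
import json
from typing import Any, Iterable, Iterator


def to_sse(values: Iterable[Any]) -> Iterator[str]:
    """Encode each stream value as one Server-Sent Events message.

    Hypothetical helper: `values` would typically be the iterator
    returned by DBOS.read_stream(workflow_id, key).
    """
    for value in values:
        # json.dumps keeps each payload on a single line, which the
        # SSE "data:" field expects, and handles str and dict alike.
        yield f"data: {json.dumps(value)}\n\n"
    # Assumed convention: a final "done" event tells the client to stop listening.
    yield "event: done\ndata: {}\n\n"
```

Under these assumptions, the endpoint body could become `return StreamingResponse(to_sse(DBOS.read_stream(workflow_id, "response")), media_type="text/event-stream")`.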

Stream characteristics:

  • Streams are immutable and append-only
  • Writes from workflows happen exactly once
  • Writes from steps happen at least once (a retried step may write duplicates)
  • Streams close automatically when the workflow terminates
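
Because writes from steps may be duplicated on retry, a reader that cannot tolerate repeats has to filter them out itself. One possible approach, sketched below, is for the step to attach a sequence number to every value and for the reader to skip numbers it has already seen. The `seq` field and the `dedupe_by_seq` helper are hypothetical conventions for this example, not something DBOS provides.

```python
from typing import Any, Dict, Iterable, Iterator


def dedupe_by_seq(values: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Drop stream values whose "seq" number was already seen.

    Assumes the writing step tags each value with a unique "seq"
    (a hypothetical convention), e.g. {"seq": 3, "text": "..."}.
    """
    seen = set()
    for value in values:
        seq = value["seq"]
        if seq in seen:
            # Duplicate produced by a retried step; skip it.
            continue
        seen.add(seq)
        yield value
```

A reader would then wrap the stream iterator, for example `for value in dedupe_by_seq(DBOS.read_stream(workflow_id, "data")): ...`. The set of seen numbers grows with the stream, so for very long streams it may be preferable to track only the highest number seen, provided values arrive in order.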

Close a stream explicitly as soon as its data is complete, so readers are not left waiting until the workflow terminates:

@DBOS.workflow()
def producer():
    DBOS.write_stream("data", {"step": 1})
    DBOS.write_stream("data", {"step": 2})
    DBOS.close_stream("data")  # Signal completion

Reference: Workflow Streaming