Max dml 7e5abd504f feat: add DBOS skills for TypeScript, Python, and Go (#94)
Add three DBOS SDK skills with reference documentation for building
reliable, fault-tolerant applications with durable workflows.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 23:26:51 +01:00


| title | impact | impactDescription | tags |
| --- | --- | --- | --- |
| Partition Queues for Per-Entity Limits | HIGH | Enables per-user or per-entity flow control | queue, partition, per-user, flow-control |

Partition Queues for Per-Entity Limits

Partitioned queues apply flow-control limits per partition key rather than globally, which makes them useful for enforcing per-user or per-entity concurrency limits.

Incorrect (global limit affects all users):

```python
from dbos import Queue

queue = Queue("user_tasks", concurrency=1)  # Only 1 task runs at a time, globally

def handle_user_task(user_id, task):
    # One user's task blocks all other users!
    queue.enqueue(process_task, task)
```

Correct (per-user limits with partitioning):

```python
from dbos import DBOS, Queue, SetEnqueueOptions

# Partitioned queue: concurrency=1 applies per partition key, not globally
queue = Queue("user_tasks", partition_queue=True, concurrency=1)

@DBOS.workflow()
def process_task(task):
    pass

def handle_user_task(user_id: str, task):
    # Each user gets their own "subqueue" with concurrency=1
    with SetEnqueueOptions(queue_partition_key=user_id):
        queue.enqueue(process_task, task)
```
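To build intuition for what per-partition flow control buys you, here is a minimal plain-Python sketch (a toy model built on per-key semaphores, not the DBOS implementation; `ToyPartitionedQueue` is a hypothetical name): tasks sharing a partition key serialize, while tasks with different keys never block each other.

```python
import threading
import time

class ToyPartitionedQueue:
    """Toy model of per-partition flow control (NOT the DBOS implementation):
    each partition key lazily gets its own semaphore with the given concurrency."""

    def __init__(self, concurrency: int):
        self._concurrency = concurrency
        self._lock = threading.Lock()
        self._sems: dict[str, threading.Semaphore] = {}

    def run(self, partition_key: str, fn, *args):
        with self._lock:
            sem = self._sems.setdefault(
                partition_key, threading.Semaphore(self._concurrency)
            )
        with sem:  # blocks only tasks that share this partition key
            return fn(*args)

queue = ToyPartitionedQueue(concurrency=1)
intervals = []
ilock = threading.Lock()

def task(user_id: str):
    start = time.monotonic()
    time.sleep(0.05)  # simulated work
    with ilock:
        intervals.append((user_id, start, time.monotonic()))

# Two tasks for "alice" must serialize; "bob" is unaffected
threads = [
    threading.Thread(target=queue.run, args=(uid, task, uid))
    for uid in ["alice", "alice", "bob"]
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After the run, the two "alice" intervals never overlap, while "bob" was free to run alongside either of them.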

For both per-partition AND global limits, use two-level queueing:

```python
from dbos import DBOS, Queue, SetEnqueueOptions

# Global limit of 5 concurrent tasks
global_queue = Queue("global_queue", concurrency=5)
# Per-user limit of 1 concurrent task
user_queue = Queue("user_queue", partition_queue=True, concurrency=1)

def handle_task(user_id: str, task):
    with SetEnqueueOptions(queue_partition_key=user_id):
        user_queue.enqueue(concurrency_manager, task)

@DBOS.workflow()
def concurrency_manager(task):
    # Holds the per-user slot while enforcing the global limit
    return global_queue.enqueue(process_task, task).get_result()

@DBOS.workflow()
def process_task(task):
    pass
```
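The two-level pattern can likewise be sketched as a toy model (plain semaphores, not DBOS; all names here are illustrative): an outer per-user semaphore plays the role of the partitioned user queue, and an inner shared semaphore plays the role of the global queue.

```python
import threading
import time
from collections import defaultdict

GLOBAL_LIMIT = 5
PER_USER_LIMIT = 1

global_sem = threading.Semaphore(GLOBAL_LIMIT)
user_sems = defaultdict(lambda: threading.Semaphore(PER_USER_LIMIT))

stats_lock = threading.Lock()
running = 0
max_running = 0
user_running = defaultdict(int)
max_user_running = 0

def run_task(user_id: str) -> None:
    global running, max_running, max_user_running
    with stats_lock:
        user_sem = user_sems[user_id]  # defaultdict mutation under the lock
    with user_sem:        # per-user limit (outer, like the partitioned queue)
        with global_sem:  # global limit (inner, like concurrency_manager)
            with stats_lock:
                running += 1
                user_running[user_id] += 1
                max_running = max(max_running, running)
                max_user_running = max(max_user_running, user_running[user_id])
            time.sleep(0.02)  # simulated work
            with stats_lock:
                running -= 1
                user_running[user_id] -= 1

# 20 tasks spread over 5 users compete for both limits
threads = [
    threading.Thread(target=run_task, args=(f"user{i % 5}",)) for i in range(20)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The observed maxima stay within both limits: never more than 5 tasks overall and never more than 1 per user.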

Reference: Partitioning Queues