feat: Add Official Microsoft & Gemini Skills (845+ Total)

🚀 Impact

Significantly expands the capabilities of **Antigravity Awesome Skills** by integrating official skill collections from **Microsoft** and **Google Gemini**. This update increases the total skill count to **845+**, making the library even more comprehensive for AI coding assistants.

 Key Changes

1. New Official Skills

- **Microsoft Skills**: Added a massive collection of official skills from [microsoft/skills](https://github.com/microsoft/skills).
  - Includes Azure, .NET, Python, TypeScript, and Semantic Kernel skills.
  - Preserves the original directory structure under `skills/official/microsoft/`.
  - Includes plugin skills from the `.github/plugins` directory.
- **Gemini Skills**: Added official Gemini API development skills under `skills/gemini-api-dev/`.

2. New Scripts & Tooling

- **`scripts/sync_microsoft_skills.py`**: A robust synchronization script that:
  - Clones the official Microsoft repository.
  - Preserves the original directory heirarchy.
  - Handles symlinks and plugin locations.
  - Generates attribution metadata.
- **`scripts/tests/inspect_microsoft_repo.py`**: Debug tool to inspect the remote repository structure.
- **`scripts/tests/test_comprehensive_coverage.py`**: Verification script to ensure 100% of skills are captured during sync.

3. Core Improvements

- **`scripts/generate_index.py`**: Enhanced frontmatter parsing to safely handle unquoted values containing `@` symbols and commas (fixing issues with some Microsoft skill descriptions).
- **`package.json`**: Added `sync:microsoft` and `sync:all-official` scripts for easy maintenance.

4. Documentation

- Updated `README.md` to reflect the new skill counts (845+) and added Microsoft/Gemini to the provider list.
- Updated `CATALOG.md` and `skills_index.json` with the new skills.

🧪 Verification

- Ran `scripts/tests/test_comprehensive_coverage.py` to verify all Microsoft skills are detected.
- Validated `generate_index.py` fixes by successfully indexing the new skills.
This commit is contained in:
Ahmed Rehan
2026-02-11 20:16:23 +05:00
parent 167d7c97c7
commit 17bce709de
145 changed files with 44081 additions and 72 deletions

View File

@@ -0,0 +1,168 @@
---
name: azure-eventgrid-py
description: |
Azure Event Grid SDK for Python. Use for publishing events, handling CloudEvents, and event-driven architectures.
Triggers: "event grid", "EventGridPublisherClient", "CloudEvent", "EventGridEvent", "publish events".
package: azure-eventgrid
---
# Azure Event Grid SDK for Python
Event routing service for building event-driven applications with pub/sub semantics.
## Installation
```bash
pip install azure-eventgrid azure-identity
```
## Environment Variables
```bash
EVENTGRID_TOPIC_ENDPOINT=https://<topic-name>.<region>.eventgrid.azure.net/api/events
EVENTGRID_NAMESPACE_ENDPOINT=https://<namespace>.<region>.eventgrid.azure.net
```
## Authentication
```python
from azure.identity import DefaultAzureCredential
from azure.eventgrid import EventGridPublisherClient
credential = DefaultAzureCredential()
endpoint = "https://<topic-name>.<region>.eventgrid.azure.net/api/events"
client = EventGridPublisherClient(endpoint, credential)
```
## Event Types
| Format | Class | Use Case |
|--------|-------|----------|
| Cloud Events 1.0 | `CloudEvent` | Standard, interoperable (recommended) |
| Event Grid Schema | `EventGridEvent` | Azure-native format |
## Publish CloudEvents
```python
from azure.eventgrid import EventGridPublisherClient, CloudEvent
from azure.identity import DefaultAzureCredential
client = EventGridPublisherClient(endpoint, DefaultAzureCredential())
# Single event
event = CloudEvent(
type="MyApp.Events.OrderCreated",
source="/myapp/orders",
data={"order_id": "12345", "amount": 99.99}
)
client.send(event)
# Multiple events
events = [
CloudEvent(
type="MyApp.Events.OrderCreated",
source="/myapp/orders",
data={"order_id": f"order-{i}"}
)
for i in range(10)
]
client.send(events)
```
## Publish EventGridEvents
```python
from azure.eventgrid import EventGridEvent
from datetime import datetime, timezone
event = EventGridEvent(
subject="/myapp/orders/12345",
event_type="MyApp.Events.OrderCreated",
data={"order_id": "12345", "amount": 99.99},
data_version="1.0"
)
client.send(event)
```
## Event Properties
### CloudEvent Properties
```python
event = CloudEvent(
type="MyApp.Events.ItemCreated", # Required: event type
source="/myapp/items", # Required: event source
data={"key": "value"}, # Event payload
subject="items/123", # Optional: subject/path
datacontenttype="application/json", # Optional: content type
dataschema="https://schema.example", # Optional: schema URL
time=datetime.now(timezone.utc), # Optional: timestamp
extensions={"custom": "value"} # Optional: custom attributes
)
```
### EventGridEvent Properties
```python
event = EventGridEvent(
subject="/myapp/items/123", # Required: subject
event_type="MyApp.ItemCreated", # Required: event type
data={"key": "value"}, # Required: event payload
data_version="1.0", # Required: schema version
topic="/subscriptions/.../topics/...", # Optional: auto-set
event_time=datetime.now(timezone.utc) # Optional: timestamp
)
```
## Async Client
```python
from azure.eventgrid.aio import EventGridPublisherClient
from azure.identity.aio import DefaultAzureCredential
async def publish_events():
credential = DefaultAzureCredential()
async with EventGridPublisherClient(endpoint, credential) as client:
event = CloudEvent(
type="MyApp.Events.Test",
source="/myapp",
data={"message": "hello"}
)
await client.send(event)
import asyncio
asyncio.run(publish_events())
```
## Namespace Topics (Event Grid Namespaces)
For Event Grid Namespaces (pull delivery):
```python
from azure.eventgrid.aio import EventGridPublisherClient
# Namespace endpoint (different from custom topic)
namespace_endpoint = "https://<namespace>.<region>.eventgrid.azure.net"
topic_name = "my-topic"
async with EventGridPublisherClient(
endpoint=namespace_endpoint,
credential=DefaultAzureCredential()
) as client:
await client.send(
event,
namespace_topic=topic_name
)
```
## Best Practices
1. **Use CloudEvents** for new applications (industry standard)
2. **Batch events** when publishing multiple events
3. **Include meaningful subjects** for filtering
4. **Use async client** for high-throughput scenarios
5. **Handle retries** — Event Grid has built-in retry
6. **Set appropriate event types** for routing and filtering

View File

@@ -0,0 +1,240 @@
---
name: azure-eventhub-py
description: |
Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing.
Triggers: "event hubs", "EventHubProducerClient", "EventHubConsumerClient", "streaming", "partitions".
package: azure-eventhub
---
# Azure Event Hubs SDK for Python
Big data streaming platform for high-throughput event ingestion.
## Installation
```bash
pip install azure-eventhub azure-identity
# For checkpointing with blob storage
pip install azure-eventhub-checkpointstoreblob-aio
```
## Environment Variables
```bash
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENT_HUB_NAME=my-eventhub
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net
CHECKPOINT_CONTAINER=checkpoints
```
## Authentication
```python
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient
credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"
# Producer
producer = EventHubProducerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
credential=credential
)
# Consumer
consumer = EventHubConsumerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
consumer_group="$Default",
credential=credential
)
```
## Client Types
| Client | Purpose |
|--------|---------|
| `EventHubProducerClient` | Send events to Event Hub |
| `EventHubConsumerClient` | Receive events from Event Hub |
| `BlobCheckpointStore` | Track consumer progress |
## Send Events
```python
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
producer = EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=DefaultAzureCredential()
)
with producer:
# Create batch (handles size limits)
event_data_batch = producer.create_batch()
for i in range(10):
try:
event_data_batch.add(EventData(f"Event {i}"))
except ValueError:
# Batch is full, send and create new one
producer.send_batch(event_data_batch)
event_data_batch = producer.create_batch()
event_data_batch.add(EventData(f"Event {i}"))
# Send remaining
producer.send_batch(event_data_batch)
```
### Send to Specific Partition
```python
# By partition ID
event_data_batch = producer.create_batch(partition_id="0")
# By partition key (consistent hashing)
event_data_batch = producer.create_batch(partition_key="user-123")
```
## Receive Events
### Simple Receive
```python
from azure.eventhub import EventHubConsumerClient
def on_event(partition_context, event):
print(f"Partition: {partition_context.partition_id}")
print(f"Data: {event.body_as_str()}")
partition_context.update_checkpoint(event)
consumer = EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
)
with consumer:
consumer.receive(
on_event=on_event,
starting_position="-1", # Beginning of stream
)
```
### With Blob Checkpoint Store (Production)
```python
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential
checkpoint_store = BlobCheckpointStore(
blob_account_url="https://<account>.blob.core.windows.net",
container_name="checkpoints",
credential=DefaultAzureCredential()
)
consumer = EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential(),
checkpoint_store=checkpoint_store
)
def on_event(partition_context, event):
print(f"Received: {event.body_as_str()}")
# Checkpoint after processing
partition_context.update_checkpoint(event)
with consumer:
consumer.receive(on_event=on_event)
```
## Async Client
```python
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
import asyncio
async def send_events():
credential = DefaultAzureCredential()
async with EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=credential
) as producer:
batch = await producer.create_batch()
batch.add(EventData("Async event"))
await producer.send_batch(batch)
async def receive_events():
async def on_event(partition_context, event):
print(event.body_as_str())
await partition_context.update_checkpoint(event)
async with EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
) as consumer:
await consumer.receive(on_event=on_event)
asyncio.run(send_events())
```
## Event Properties
```python
event = EventData("My event body")
# Set properties
event.properties = {"custom_property": "value"}
event.content_type = "application/json"
# Read properties (on receive)
print(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)
```
## Get Event Hub Info
```python
with producer:
info = producer.get_eventhub_properties()
print(f"Name: {info['name']}")
print(f"Partitions: {info['partition_ids']}")
for partition_id in info['partition_ids']:
partition_info = producer.get_partition_properties(partition_id)
print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")
```
## Best Practices
1. **Use batches** for sending multiple events
2. **Use checkpoint store** in production for reliable processing
3. **Use async client** for high-throughput scenarios
4. **Use partition keys** for ordered delivery within a partition
5. **Handle batch size limits** — catch ValueError when batch is full
6. **Use context managers** (`with`/`async with`) for proper cleanup
7. **Set appropriate consumer groups** for different applications
## Reference Files
| File | Contents |
|------|----------|
| [references/checkpointing.md](references/checkpointing.md) | Checkpoint store patterns, blob checkpointing, checkpoint strategies |
| [references/partitions.md](references/partitions.md) | Partition management, load balancing, starting positions |
| [scripts/setup_consumer.py](scripts/setup_consumer.py) | CLI for Event Hub info, consumer setup, and event sending/receiving |

View File

@@ -0,0 +1,267 @@
---
name: azure-servicebus-py
description: |
Azure Service Bus SDK for Python messaging. Use for queues, topics, subscriptions, and enterprise messaging patterns.
Triggers: "service bus", "ServiceBusClient", "queue", "topic", "subscription", "message broker".
package: azure-servicebus
---
# Azure Service Bus SDK for Python
Enterprise messaging for reliable cloud communication with queues and pub/sub topics.
## Installation
```bash
pip install azure-servicebus azure-identity
```
## Environment Variables
```bash
SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
SERVICEBUS_QUEUE_NAME=myqueue
SERVICEBUS_TOPIC_NAME=mytopic
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription
```
## Authentication
```python
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient
credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
client = ServiceBusClient(
fully_qualified_namespace=namespace,
credential=credential
)
```
## Client Types
| Client | Purpose | Get From |
|--------|---------|----------|
| `ServiceBusClient` | Connection management | Direct instantiation |
| `ServiceBusSender` | Send messages | `client.get_queue_sender()` / `get_topic_sender()` |
| `ServiceBusReceiver` | Receive messages | `client.get_queue_receiver()` / `get_subscription_receiver()` |
## Send Messages (Async)
```python
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential
async def send_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
sender = client.get_queue_sender(queue_name="myqueue")
async with sender:
# Single message
message = ServiceBusMessage("Hello, Service Bus!")
await sender.send_messages(message)
# Batch of messages
messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
await sender.send_messages(messages)
# Message batch (for size control)
batch = await sender.create_message_batch()
for i in range(100):
try:
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
except ValueError: # Batch full
await sender.send_messages(batch)
batch = await sender.create_message_batch()
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
await sender.send_messages(batch)
asyncio.run(send_messages())
```
## Receive Messages (Async)
```python
async def receive_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
receiver = client.get_queue_receiver(queue_name="myqueue")
async with receiver:
# Receive batch
messages = await receiver.receive_messages(
max_message_count=10,
max_wait_time=5 # seconds
)
for msg in messages:
print(f"Received: {str(msg)}")
await receiver.complete_message(msg) # Remove from queue
asyncio.run(receive_messages())
```
## Receive Modes
| Mode | Behavior | Use Case |
|------|----------|----------|
| `PEEK_LOCK` (default) | Message locked, must complete/abandon | Reliable processing |
| `RECEIVE_AND_DELETE` | Removed immediately on receive | At-most-once delivery |
```python
from azure.servicebus import ServiceBusReceiveMode
receiver = client.get_queue_receiver(
queue_name="myqueue",
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE
)
```
## Message Settlement
```python
async with receiver:
messages = await receiver.receive_messages(max_message_count=1)
for msg in messages:
try:
# Process message...
await receiver.complete_message(msg) # Success - remove from queue
except ProcessingError:
await receiver.abandon_message(msg) # Retry later
except PermanentError:
await receiver.dead_letter_message(
msg,
reason="ProcessingFailed",
error_description="Could not process"
)
```
| Action | Effect |
|--------|--------|
| `complete_message()` | Remove from queue (success) |
| `abandon_message()` | Release lock, retry immediately |
| `dead_letter_message()` | Move to dead-letter queue |
| `defer_message()` | Set aside, receive by sequence number |
## Topics and Subscriptions
```python
# Send to topic
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
await sender.send_messages(ServiceBusMessage("Topic message"))
# Receive from subscription
receiver = client.get_subscription_receiver(
topic_name="mytopic",
subscription_name="mysubscription"
)
async with receiver:
messages = await receiver.receive_messages(max_message_count=10)
```
## Sessions (FIFO)
```python
# Send with session
message = ServiceBusMessage("Session message")
message.session_id = "order-123"
await sender.send_messages(message)
# Receive from specific session
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id="order-123"
)
# Receive from next available session
from azure.servicebus import NEXT_AVAILABLE_SESSION
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id=NEXT_AVAILABLE_SESSION
)
```
## Scheduled Messages
```python
from datetime import datetime, timedelta, timezone
message = ServiceBusMessage("Scheduled message")
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)
# Schedule message
sequence_number = await sender.schedule_messages(message, scheduled_time)
# Cancel scheduled message
await sender.cancel_scheduled_messages(sequence_number)
```
## Dead-Letter Queue
```python
from azure.servicebus import ServiceBusSubQueue
# Receive from dead-letter queue
dlq_receiver = client.get_queue_receiver(
queue_name="myqueue",
sub_queue=ServiceBusSubQueue.DEAD_LETTER
)
async with dlq_receiver:
messages = await dlq_receiver.receive_messages(max_message_count=10)
for msg in messages:
print(f"Dead-lettered: {msg.dead_letter_reason}")
await dlq_receiver.complete_message(msg)
```
## Sync Client (for simple scripts)
```python
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential
with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=DefaultAzureCredential()
) as client:
with client.get_queue_sender("myqueue") as sender:
sender.send_messages(ServiceBusMessage("Sync message"))
with client.get_queue_receiver("myqueue") as receiver:
for msg in receiver:
print(str(msg))
receiver.complete_message(msg)
```
## Best Practices
1. **Use async client** for production workloads
2. **Use context managers** (`async with`) for proper cleanup
3. **Complete messages** after successful processing
4. **Use dead-letter queue** for poison messages
5. **Use sessions** for ordered, FIFO processing
6. **Use message batches** for high-throughput scenarios
7. **Set `max_wait_time`** to avoid infinite blocking
## Reference Files
| File | Contents |
|------|----------|
| [references/patterns.md](references/patterns.md) | Competing consumers, sessions, retry patterns, request-response, transactions |
| [references/dead-letter.md](references/dead-letter.md) | DLQ handling, poison messages, reprocessing strategies |
| [scripts/setup_servicebus.py](scripts/setup_servicebus.py) | CLI for queue/topic/subscription management and DLQ monitoring |

View File

@@ -0,0 +1,245 @@
---
name: azure-messaging-webpubsubservice-py
description: |
Azure Web PubSub Service SDK for Python. Use for real-time messaging, WebSocket connections, and pub/sub patterns.
Triggers: "azure-messaging-webpubsubservice", "WebPubSubServiceClient", "real-time", "WebSocket", "pub/sub".
package: azure-messaging-webpubsubservice
---
# Azure Web PubSub Service SDK for Python
Real-time messaging with WebSocket connections at scale.
## Installation
```bash
# Service SDK (server-side)
pip install azure-messaging-webpubsubservice
# Client SDK (for Python WebSocket clients)
pip install azure-messaging-webpubsubclient
```
## Environment Variables
```bash
AZURE_WEBPUBSUB_CONNECTION_STRING=Endpoint=https://<name>.webpubsub.azure.com;AccessKey=...
AZURE_WEBPUBSUB_HUB=my-hub
```
## Service Client (Server-Side)
### Authentication
```python
from azure.messaging.webpubsubservice import WebPubSubServiceClient
# Connection string
client = WebPubSubServiceClient.from_connection_string(
connection_string=os.environ["AZURE_WEBPUBSUB_CONNECTION_STRING"],
hub="my-hub"
)
# Entra ID
from azure.identity import DefaultAzureCredential
client = WebPubSubServiceClient(
endpoint="https://<name>.webpubsub.azure.com",
hub="my-hub",
credential=DefaultAzureCredential()
)
```
### Generate Client Access Token
```python
# Token for anonymous user
token = client.get_client_access_token()
print(f"URL: {token['url']}")
# Token with user ID
token = client.get_client_access_token(
user_id="user123",
roles=["webpubsub.sendToGroup", "webpubsub.joinLeaveGroup"]
)
# Token with groups
token = client.get_client_access_token(
user_id="user123",
groups=["group1", "group2"]
)
```
### Send to All Clients
```python
# Send text
client.send_to_all(message="Hello everyone!", content_type="text/plain")
# Send JSON
client.send_to_all(
message={"type": "notification", "data": "Hello"},
content_type="application/json"
)
```
### Send to User
```python
client.send_to_user(
user_id="user123",
message="Hello user!",
content_type="text/plain"
)
```
### Send to Group
```python
client.send_to_group(
group="my-group",
message="Hello group!",
content_type="text/plain"
)
```
### Send to Connection
```python
client.send_to_connection(
connection_id="abc123",
message="Hello connection!",
content_type="text/plain"
)
```
### Group Management
```python
# Add user to group
client.add_user_to_group(group="my-group", user_id="user123")
# Remove user from group
client.remove_user_from_group(group="my-group", user_id="user123")
# Add connection to group
client.add_connection_to_group(group="my-group", connection_id="abc123")
# Remove connection from group
client.remove_connection_from_group(group="my-group", connection_id="abc123")
```
### Connection Management
```python
# Check if connection exists
exists = client.connection_exists(connection_id="abc123")
# Check if user has connections
exists = client.user_exists(user_id="user123")
# Check if group has connections
exists = client.group_exists(group="my-group")
# Close connection
client.close_connection(connection_id="abc123", reason="Session ended")
# Close all connections for user
client.close_all_connections(user_id="user123")
```
### Grant/Revoke Permissions
```python
from azure.messaging.webpubsubservice import WebPubSubServiceClient
# Grant permission
client.grant_permission(
permission="joinLeaveGroup",
connection_id="abc123",
target_name="my-group"
)
# Revoke permission
client.revoke_permission(
permission="joinLeaveGroup",
connection_id="abc123",
target_name="my-group"
)
# Check permission
has_permission = client.check_permission(
permission="joinLeaveGroup",
connection_id="abc123",
target_name="my-group"
)
```
## Client SDK (Python WebSocket Client)
```python
from azure.messaging.webpubsubclient import WebPubSubClient
client = WebPubSubClient(credential=token["url"])
# Event handlers
@client.on("connected")
def on_connected(e):
print(f"Connected: {e.connection_id}")
@client.on("server-message")
def on_message(e):
print(f"Message: {e.data}")
@client.on("group-message")
def on_group_message(e):
print(f"Group {e.group}: {e.data}")
# Connect and send
client.open()
client.send_to_group("my-group", "Hello from Python!")
```
## Async Service Client
```python
from azure.messaging.webpubsubservice.aio import WebPubSubServiceClient
from azure.identity.aio import DefaultAzureCredential
async def broadcast():
credential = DefaultAzureCredential()
client = WebPubSubServiceClient(
endpoint="https://<name>.webpubsub.azure.com",
hub="my-hub",
credential=credential
)
await client.send_to_all("Hello async!", content_type="text/plain")
await client.close()
await credential.close()
```
## Client Operations
| Operation | Description |
|-----------|-------------|
| `get_client_access_token` | Generate WebSocket connection URL |
| `send_to_all` | Broadcast to all connections |
| `send_to_user` | Send to specific user |
| `send_to_group` | Send to group members |
| `send_to_connection` | Send to specific connection |
| `add_user_to_group` | Add user to group |
| `remove_user_from_group` | Remove user from group |
| `close_connection` | Disconnect client |
| `connection_exists` | Check connection status |
## Best Practices
1. **Use roles** to limit client permissions
2. **Use groups** for targeted messaging
3. **Generate short-lived tokens** for security
4. **Use user IDs** to send to users across connections
5. **Handle reconnection** in client applications
6. **Use JSON** content type for structured data
7. **Close connections** gracefully with reasons