feat: add temporal-golang-pro skill

This commit is contained in:
Khanh
2026-02-27 09:00:05 +07:00
parent 3a0dbf6cfe
commit c4df829d6b
3 changed files with 602 additions and 0 deletions

View File

@@ -0,0 +1,215 @@
---
name: temporal-golang-pro
description: "Use when building durable distributed systems with Temporal Go SDK. Covers deterministic workflow rules, mTLS worker configs, and advanced patterns."
risk: safe
source: self
---
# Temporal Go SDK (temporal-golang-pro)
## Overview
Expert-level guide for building resilient, scalable, and deterministic distributed systems using the Temporal Go SDK. This skill transforms vague orchestration requirements into production-grade Go implementations, focusing on durable execution, strict determinism, and enterprise-scale worker configuration.
## When to Use This Skill
- **Designing Distributed Systems**: When building microservices that require durable state and reliable orchestration.
- **Implementing Complex Workflows**: Using the Go SDK to handle long-running processes (days/months) or complex Saga patterns.
- **Optimizing Performance**: When workers need fine-tuned concurrency, mTLS security, or custom interceptors.
- **Ensuring Reliability**: Implementing idempotent activities, graceful error handling, and sophisticated retry policies.
- **Maintenance & Evolution**: Versioning running workflows or performing zero-downtime worker updates.
## Do not use this skill when
- Using Temporal with other SDKs (Python, Java, TypeScript) - refer to their specific `-pro` skills.
- The task is a simple request/response without durability or coordination needs.
- High-level design without implementation (use `workflow-orchestration-patterns`).
## Step-by-Step Guide
1. **Gather Context**: Proactively ask for:
- Target **Temporal Cluster** (Cloud vs. Self-hosted) and **Namespace**.
- **Task Queue** names and expected throughput.
- **Security requirements** (mTLS paths, authentication).
- **Failure modes** and desired retry/timeout policies.
2. **Verify Determinism**: Before suggesting workflow code, verify against these **5 Rules**:
- No native Go concurrency (goroutines).
- No native time (`time.Now`, `time.Sleep`).
- No non-deterministic map iteration (must sort keys).
- No direct external I/O or network calls.
- No non-deterministic random numbers.
3. **Implement Incrementally**: Start with shared Protobuf/Data classes, then Activities, then Workflows, and finally Workers.
4. **Leverage Resources**: If the implementation requires advanced patterns (Sagas, Interceptors, Replay Testing), explicitly refer to the implementation playbook and testing strategies.
## Capabilities
### Go SDK Implementation
- **Worker Management**: Deep knowledge of `worker.Options`, including `MaxConcurrentActivityTaskPollers`, `WorkerStopTimeout`, and `StickyScheduleToStartTimeout`.
- **Interceptors**: Implementing Client, Worker, and Workflow interceptors for cross-cutting concerns (logging, tracing, auth).
- **Custom Data Converters**: Integrating Protobuf, encrypted payloads, or custom JSON marshaling.
### Advanced Workflow Patterns
- **Durable Concurrency**: Using `workflow.Go`, `workflow.Channel`, and `workflow.Selector` instead of native primitives.
- **Versioning**: Implementing safe code evolution using `workflow.GetVersion` and `workflow.GetReplaySafeLogger`.
- **Large-scale Processing**: Pattern for `ContinueAsNew` to manage history size limits (defaults: 50MB or 50K events).
- **Child Workflows**: Managing lifecycle, cancellation, and parent-child signal propagation.
### Testing & Observability
- **Testsuite Mastery**: Using `WorkflowTestSuite` for unit and functional testing with deterministic time control.
- **Mocking**: Sophisticated activity and child workflow mocking strategies.
- **Replay Testing**: Validating code changes against production event histories.
- **Metrics**: Configuring Prometheus/OpenTelemetry exporters for worker performance tracking.
## Examples
### Example 1: Versioned Workflow (Deterministic)
```go
// Note: imports omitted. Requires 'go.temporal.io/sdk/workflow', 'go.temporal.io/sdk/temporal', and 'time'.
func SubscriptionWorkflow(ctx workflow.Context, userID string) error {
// 1. Versioning for logic evolution (v1 = DefaultVersion)
v := workflow.GetVersion(ctx, "billing_logic", workflow.DefaultVersion, 2)
for i := 0; i < 12; i++ {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 3},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// 2. Activity Execution (Always handle errors)
err := workflow.ExecuteActivity(ctx, ChargePaymentActivity, userID).Get(ctx, nil)
if err != nil {
workflow.GetLogger(ctx).Error("Payment failed", "Error", err)
return err
}
// 3. Durable Sleep (Time-skipping safe)
sleepDuration := 30 * 24 * time.Hour
if v >= 2 {
sleepDuration = 28 * 24 * time.Hour
}
if err := workflow.Sleep(ctx, sleepDuration); err != nil {
return err
}
}
return nil
}
```
### Example 2: Full mTLS Worker Setup
```go
func RunSecureWorker() error {
// 1. Load Client Certificate and Key
cert, err := tls.LoadX509KeyPair("client.pem", "client.key")
if err != nil {
return fmt.Errorf("failed to load client keys: %w", err)
}
// 2. Load CA Certificate for Server verification (Proper mTLS)
caPem, err := os.ReadFile("ca.pem")
if err != nil {
return fmt.Errorf("failed to read CA cert: %w", err)
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPem) {
return fmt.Errorf("failed to parse CA cert")
}
// 3. Dial Cluster with full TLS config
c, err := client.Dial(client.Options{
HostPort: "temporal.example.com:7233",
Namespace: "production",
ConnectionOptions: client.ConnectionOptions{
TLS: &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: certPool,
},
},
})
if err != nil {
return fmt.Errorf("failed to dial temporal: %w", err)
}
defer c.Close()
w := worker.New(c, "payment-queue", worker.Options{})
w.RegisterWorkflow(SubscriptionWorkflow)
if err := w.Run(worker.InterruptCh()); err != nil {
return fmt.Errorf("worker run failed: %w", err)
}
return nil
}
```
### Example 3: Selector & Signal Integration
```go
func ApprovalWorkflow(ctx workflow.Context) (string, error) {
var approved bool
signalCh := workflow.GetSignalChannel(ctx, "approval-signal")
// Use Selector to wait for multiple async events
s := workflow.NewSelector(ctx)
s.AddReceive(signalCh, func(c workflow.ReceiveChannel, _ bool) {
c.Receive(ctx, &approved)
})
// Add 72-hour timeout timer
s.AddReceive(workflow.NewTimer(ctx, 72*time.Hour).GetChannel(), func(c workflow.ReceiveChannel, _ bool) {
approved = false
})
s.Select(ctx)
if !approved {
return "rejected", nil
}
return "approved", nil
}
```
## Best Practices
-**Do:** Always handle errors from `ExecuteActivity` and `client.Dial`.
-**Do:** Use `workflow.Go` and `workflow.Channel` for concurrency.
-**Do:** Sort map keys before iteration to maintain determinism.
-**Do:** Use `activity.RecordHeartbeat` for activities lasting > 1 minute.
-**Do:** Test logic compatibility using `replayer.ReplayWorkflowHistoryFromJSON`.
-**Don't:** Swallow errors with `_` or `log.Fatal` in production workers.
-**Don't:** Perform direct Network/Disk I/O inside a Workflow function.
-**Don't:** Rely on native `time.Now()` or `rand.Int()`.
-**Don't:** Apply this to simple cron jobs that don't require durability.
## Troubleshooting
- **Panic: Determinism Mismatch**: Usually caused by logic changes without `workflow.GetVersion` or non-deterministic code (e.g., native maps).
- **Error: History Size Exceeded**: History limit reached (default 50K events). Ensure `ContinueAsNew` is implemented.
- **Worker Hang**: Check `WorkerStopTimeout` and ensure all activities handle context cancellation.
## Limitations
- Does not cover Temporal Cloud UI navigation or TLS certificate provisioning workflows.
- Does not cover Temporal Java, Python, or TypeScript SDKs; refer to their dedicated `-pro` skills.
- Assumes Temporal Server v1.20+ and Go SDK v1.25+; older SDK versions may have different APIs.
- Does not cover experimental Temporal features (e.g., Nexus, Multi-cluster Replication).
- Does not address global namespace configuration or multi-region failover setup.
- Does not cover Temporal Worker versioning via the `worker-versioning` feature flag (experimental).
## Resources
- [Implementation Playbook](resources/implementation-playbook.md) - Deep dive into Go SDK patterns.
- [Testing Strategies](resources/testing-strategies.md) - Unit, Replay, and Integration testing for Go.
- [Temporal Go SDK Reference](https://pkg.go.dev/go.temporal.io/sdk)
- [Temporal Go Samples](https://github.com/temporalio/samples-go)
## Related Skills
- `grpc-golang` - Internal transport protocol and Protobuf design.
- `golang-pro` - General Go performance tuning and advanced syntax.
- `workflow-orchestration-patterns` - Language-agnostic orchestration strategy.

View File

@@ -0,0 +1,242 @@
# Temporal Go Implementation Playbook
This playbook provides production-ready patterns and deep technical guidance for implementing durable orchestration with the Temporal Go SDK.
## Table of Contents
1. [The Deterministic Commandments](#the-deterministic-commandments)
2. [Workflow Versioning](#workflow-versioning)
3. [Activity Design & Idempotency](#activity-design--idempotency)
4. [Worker Configuration for Scale](#worker-configuration-for-scale)
5. [Context & Heartbeating](#context--heartbeating)
6. [Interceptors & Observability](#interceptors--observability)
---
## 1. The Deterministic Commandments
In Go, workflows are state machines that must replay identically. Violating these rules causes "Determinism Mismatch" errors.
### ❌ Never Use Native Go Concurrency
- **Wrong:** `go myFunc()`
- **Right:** `workflow.Go(ctx, func(ctx workflow.Context) { ... })`
- **Why:** `workflow.Go` allows the Temporal orchestrator to track and pause goroutines during replay.
### ❌ Never Use Native Time
- **Wrong:** `time.Now()`, `time.Sleep(d)`, `time.After(d)`
- **Right:** `workflow.Now(ctx)`, `workflow.Sleep(ctx, d)`, `workflow.NewTimer(ctx, d)`
### ❌ Never Use Non-Deterministic Map Iteration
- **Wrong:** `for k, v := range myMap { ... }`
- **Right:** Collect keys, sort them, then iterate.
- ```go
keys := make([]string, 0, len(myMap))
for k := range myMap { keys = append(keys, k) }
sort.Strings(keys)
for _, k := range keys { v := myMap[k]; ... }
```
### ❌ Never Perform Direct External I/O
- **Wrong:** `http.Get("https://api.example.com")` or `os.ReadFile("data.txt")` inside a workflow.
- **Right:** Wrap all I/O in an Activity and call it with `workflow.ExecuteActivity`.
- **Why:** External calls are non-deterministic; their results change between replays.
### ❌ Never Use Non-Deterministic Random Numbers
- **Wrong:** `rand.Int()`, `uuid.New()` inside a workflow.
- **Right:** Pass random seeds or UUIDs as workflow input arguments, or generate them inside an Activity.
- **Why:** `rand.Int()` produces different values on each replay, causing a determinism mismatch.
---
## 2. Workflow Versioning
When you need to change logic in a running workflow, you MUST use `workflow.GetVersion`.
### Pattern: Safe Logic Update
```go
const VersionV2 = 1
func MyWorkflow(ctx workflow.Context) error {
v := workflow.GetVersion(ctx, "ChangePaymentStep", workflow.DefaultVersion, VersionV2)
if v == workflow.DefaultVersion {
// Old logic: kept alive until all pre-existing workflow runs complete.
return workflow.ExecuteActivity(ctx, OldActivity).Get(ctx, nil)
}
// New logic: all new and resumed workflow runs use this path.
return workflow.ExecuteActivity(ctx, NewActivity).Get(ctx, nil)
}
```
### Pattern: Cleanup After Full Migration
Once you have confirmed **no running workflow instances** are on `DefaultVersion` (verify via Temporal Web UI or `tctl`), you can safely remove the old branch:
```go
func MyWorkflow(ctx workflow.Context) error {
// Pin minimum version to V2; histories from before the migration will
// fail the determinism check (replay error) if they replay against this code.
// Only remove the old branch after confirming zero running instances on DefaultVersion.
workflow.GetVersion(ctx, "ChangePaymentStep", VersionV2, VersionV2)
return workflow.ExecuteActivity(ctx, NewActivity).Get(ctx, nil)
}
```
---
## 3. Activity Design & Idempotency
Activities can execute multiple times. They must be idempotent.
### Pattern: Upsert instead of Insert
Instead of a simple `INSERT`, use `UPSERT` or "Check-then-Act" with an idempotency key (like `WorkflowID` or `RunID`).
```go
func (a *Activities) ProcessPayment(ctx context.Context, req PaymentRequest) error {
info := activity.GetInfo(ctx)
// Use info.WorkflowExecution.ID as part of your idempotency key in DB
return a.db.UpsertPayment(req, info.WorkflowExecution.ID)
}
```
---
## 4. Worker Configuration for Scale
### Optimized Worker Options
```go
w := worker.New(c, "task-queue", worker.Options{
MaxConcurrentActivityExecutionSize: 100, // Limit based on resource constraints
MaxConcurrentWorkflowTaskExecutionSize: 50,
WorkerActivitiesPerSecond: 200, // Rate limit for this worker cluster
WorkerStopTimeout: time.Minute, // Allow activities to finish
})
```
---
## 5. Context & Heartbeating
### Propagating Metadata
Use `Workflow Interceptors` or custom `Header` propagation to pass tracing ID or user identity along the call chain.
### Activity Heartbeating
Mandatory for long-running activities to detect worker crashes before the `StartToCloseTimeout` expires.
```go
func LongRunningActivity(ctx context.Context) error {
for i := 0; i < 100; i++ {
activity.RecordHeartbeat(ctx, i) // Report progress
select {
case <-ctx.Done():
return ctx.Err() // Handle cancellation
default:
// Do work
}
}
return nil
}
```
---
## 6. Interceptors & Observability
### Custom Workflow Interceptors
Use interceptors to inject structured logging (Zap/Slog) or perform global error classification.
The interceptor must be wired via a root `WorkerInterceptor` that Temporal instantiates per workflow task.
```go
// Step 1: Implement the root WorkerInterceptor (registered on worker.Options)
type MyWorkerInterceptor struct {
interceptor.WorkerInterceptorBase
}
func (w *MyWorkerInterceptor) InterceptWorkflow(
ctx workflow.Context,
next interceptor.WorkflowInboundInterceptor,
) interceptor.WorkflowInboundInterceptor {
return &myWorkflowInboundInterceptor{next: next}
}
// Step 2: Implement the per-workflow inbound interceptor
type myWorkflowInboundInterceptor struct {
interceptor.WorkflowInboundInterceptorBase
next interceptor.WorkflowInboundInterceptor
}
func (i *myWorkflowInboundInterceptor) ExecuteWorkflow(
ctx workflow.Context,
input *interceptor.ExecuteWorkflowInput,
) (interface{}, error) {
workflow.GetLogger(ctx).Info("Workflow started", "type", workflow.GetInfo(ctx).WorkflowType.Name)
result, err := i.next.ExecuteWorkflow(ctx, input)
if err != nil {
workflow.GetLogger(ctx).Error("Workflow failed", "error", err)
}
return result, err
}
// Step 3: Register on the worker
w := worker.New(c, "task-queue", worker.Options{
Interceptors: []interceptor.WorkerInterceptor{&MyWorkerInterceptor{}},
})
```
---
## Anti-Patterns to Avoid
1. **Massive Workflows:** Keeping too much state in a single workflow. Use `ContinueAsNew` if event history exceeds 50K events.
2. **Fat Activities:** Doing orchestration inside an activity. Activities should be unit-of-work.
3. **Global Variables:** Using global variables in workflows. They will not be preserved across worker restarts.
4. **Native Concurrency in Workflows:** Using `go` routines, `mutexes`, or `channels` will cause race conditions and determinism errors during replay.
---
## 7. SideEffect and MutableSideEffect
Use `workflow.SideEffect` when you need a **single non-deterministic value** captured once and replayed identically — for example, generating a UUID or reading a one-time config snapshot inside a workflow.
```go
// SideEffect: called only on first execution; result is recorded in history and
// replayed deterministically on all subsequent replays.
// Requires: "go.temporal.io/sdk/workflow"
encodedID := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.NewString()
})
var requestID string
if err := encodedID.Get(&requestID); err != nil {
return err
}
```
**When to use `MutableSideEffect`**: When the value may change across workflow tasks but must still be deterministic per history event (e.g., a feature flag that updates while the workflow is running).
```go
// MutableSideEffect: re-evaluated on each workflow task, but only recorded in
// history when the value changes from the previous recorded value.
encodedFlag := workflow.MutableSideEffect(ctx, "feature-flag-v2",
func(ctx workflow.Context) interface{} {
return featureFlagEnabled // read from workflow-local state, NOT an external call
},
func(a, b interface{}) bool { return a.(bool) == b.(bool) },
)
var enabled bool
encodedFlag.Get(&enabled)
```
> **Warning:** Do NOT use `SideEffect` as a workaround to call external APIs (HTTP, DB) inside a workflow. All external I/O must still go through Activities.

View File

@@ -0,0 +1,145 @@
# Temporal Go Testing Strategies
Testing workflows and activities in Go requires a deep understanding of the `testsuite` package, which provides a mocked environment with deterministic time-skipping.
## The Test Suite Setup
Always use the `WorkflowTestSuite` to maintain state across multiple tests in a file.
```go
// Requires: "github.com/stretchr/testify/suite", "go.temporal.io/sdk/testsuite"
type MyTestSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
env *testsuite.TestWorkflowEnvironment
}
func (s *MyTestSuite) SetupTest() {
s.env = s.NewTestWorkflowEnvironment()
}
func TestMyTestSuite(t *testing.T) {
suite.Run(t, new(MyTestSuite))
}
```
## 1. Unit Testing Workflows
The most powerful feature of the Go SDK is **Time-skipping**. A workflow that sleeps for 30 days will finish in milliseconds in your test.
### Mocking Activities
You must register activity mocks before running the workflow.
```go
func (s *MyTestSuite) Test_SuccessfulWorkflow() {
// Mock the activity
s.env.OnActivity(MyActivity, mock.Anything, "input").Return("output", nil)
s.env.ExecuteWorkflow(MyWorkflow, "input")
s.True(s.env.IsWorkflowCompleted())
s.NoError(s.env.GetWorkflowError())
var result string
s.env.GetWorkflowResult(&result)
s.Equal("Completed", result)
}
```
### Mocking Child Workflows
Similar to activities, use `OnChildWorkflow`.
```go
s.env.OnChildWorkflow(MyChildWorkflow, mock.Anything, "args").Return("result", nil)
```
## 2. Unit Testing Activities
Use `TestActivityEnvironment` to test activities in isolation.
```go
// Requires: "go.temporal.io/sdk/testsuite", "github.com/stretchr/testify/assert"
func Test_Activity(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestActivityEnvironment()
env.RegisterActivity(MyActivity)
val, err := env.ExecuteActivity(MyActivity, "input")
assert.NoError(t, err)
var result string
val.Get(&result)
assert.Equal(t, "expected", result)
}
```
## 3. Replay Testing (Determinism Check)
Replay testing ensures that new code changes won't break existing, running workflows.
```go
func Test_ReplayStaticHistory(t *testing.T) {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(MyWorkflow)
// Load history from JSON file (exported from Temporal Web UI or CLI).
// Web UI: Workflow Detail -> Download History (JSON)
// CLI: temporal workflow show --workflow-id <id> --namespace <ns> --output json > history.json
err := replayer.ReplayWorkflowHistoryFromJSONFile(
worker.ReplayWorkflowHistoryFromJSONFileOptions{},
"history.json",
)
assert.NoError(t, err)
}
```
## 4. Testing Signals and Queries
You can send signals during a test at specific points in time.
```go
func (s *MyTestSuite) Test_WorkflowWithSignal() {
// Delayed signal
s.env.RegisterDelayedCallback(func() {
s.env.SignalWorkflow("my-signal", "data")
}, time.Hour) // This hour passes instantly!
s.env.ExecuteWorkflow(MyWorkflow)
// Query state after signal
res, err := s.env.QueryWorkflow("get-state")
s.NoError(err)
var state string
res.Get(&state)
s.Equal("SignalReceived", state)
}
```
## Best Practices for Testing
- **>=80% Coverage**: Aim for high coverage on workflow logic since activities are often just wrappers around DB/API calls.
- **Assertion-based**: Use `testify/assert` or `testify/suite` for clean assertions.
- **Mock Everything External**: Never call a real database or API in a unit test.
- **Test Failure Paths**: Explicitly test what happens when an activity returns an error or when a heartbeat times out.
### Example: Testing an Activity Failure Path
```go
func (s *MyTestSuite) Test_WorkflowHandlesActivityError() {
// Mock the activity to return a non-retryable error
s.env.OnActivity(ChargePaymentActivity, mock.Anything, mock.Anything).
Return("", temporal.NewNonRetryableApplicationError("card declined", "PaymentError", nil))
s.env.ExecuteWorkflow(SubscriptionWorkflow, "user-123")
s.True(s.env.IsWorkflowCompleted())
// Verify the workflow correctly surfaces the error
err := s.env.GetWorkflowError()
s.Error(err)
s.Contains(err.Error(), "card declined")
}
```