diff --git a/skills/temporal-golang-pro/SKILL.md b/skills/temporal-golang-pro/SKILL.md
new file mode 100644
index 00000000..9d31cf62
--- /dev/null
+++ b/skills/temporal-golang-pro/SKILL.md
@@ -0,0 +1,215 @@
+---
+name: temporal-golang-pro
+description: "Use when building durable distributed systems with Temporal Go SDK. Covers deterministic workflow rules, mTLS worker configs, and advanced patterns."
+risk: safe
+source: self
+---
+
+# Temporal Go SDK (temporal-golang-pro)
+
+## Overview
+
+Expert-level guide for building resilient, scalable, and deterministic distributed systems using the Temporal Go SDK. This skill transforms vague orchestration requirements into production-grade Go implementations, focusing on durable execution, strict determinism, and enterprise-scale worker configuration.
+
+## When to Use This Skill
+
+- **Designing Distributed Systems**: When building microservices that require durable state and reliable orchestration.
+- **Implementing Complex Workflows**: Using the Go SDK to handle long-running processes (days/months) or complex Saga patterns.
+- **Optimizing Performance**: When workers need fine-tuned concurrency, mTLS security, or custom interceptors.
+- **Ensuring Reliability**: Implementing idempotent activities, graceful error handling, and sophisticated retry policies.
+- **Maintenance & Evolution**: Versioning running workflows or performing zero-downtime worker updates.
+
+## Do not use this skill when
+
+- Using Temporal with other SDKs (Python, Java, TypeScript) - refer to their specific `-pro` skills.
+- The task is a simple request/response without durability or coordination needs.
+- High-level design without implementation (use `workflow-orchestration-patterns`).
+
+## Step-by-Step Guide
+
+1. **Gather Context**: Proactively ask for:
+   - Target **Temporal Cluster** (Cloud vs. Self-hosted) and **Namespace**.
+   - **Task Queue** names and expected throughput.
+   - **Security requirements** (mTLS paths, authentication).
+   - **Failure modes** and desired retry/timeout policies.
+2. **Verify Determinism**: Before suggesting workflow code, verify against these **5 Rules**:
+   - No native Go concurrency (goroutines).
+   - No native time (`time.Now`, `time.Sleep`).
+   - No non-deterministic map iteration (must sort keys).
+   - No direct external I/O or network calls.
+   - No non-deterministic random numbers.
+3. **Implement Incrementally**: Start with shared Protobuf messages or data structs, then Activities, then Workflows, and finally Workers.
+4. **Leverage Resources**: If the implementation requires advanced patterns (Sagas, Interceptors, Replay Testing), explicitly refer to the implementation playbook and testing strategies.
+
+## Capabilities
+
+### Go SDK Implementation
+
+- **Worker Management**: Deep knowledge of `worker.Options`, including `MaxConcurrentActivityTaskPollers`, `WorkerStopTimeout`, and `StickyScheduleToStartTimeout`.
+- **Interceptors**: Implementing Client, Worker, and Workflow interceptors for cross-cutting concerns (logging, tracing, auth).
+- **Custom Data Converters**: Integrating Protobuf, encrypted payloads, or custom JSON marshaling.
+
+### Advanced Workflow Patterns
+
+- **Durable Concurrency**: Using `workflow.Go`, `workflow.Channel`, and `workflow.Selector` instead of native primitives.
+- **Versioning**: Implementing safe code evolution using `workflow.GetVersion`, with replay-aware logging through `workflow.GetLogger`.
+- **Large-scale Processing**: Pattern for `ContinueAsNew` to manage history size limits (defaults: 50MB or 50K events).
+- **Child Workflows**: Managing lifecycle, cancellation, and parent-child signal propagation.
+
+### Testing & Observability
+
+- **Testsuite Mastery**: Using `WorkflowTestSuite` for unit and functional testing with deterministic time control.
+- **Mocking**: Sophisticated activity and child workflow mocking strategies.
+- **Replay Testing**: Validating code changes against production event histories.
+- **Metrics**: Configuring Prometheus/OpenTelemetry exporters for worker performance tracking.
+
+## Examples
+
+### Example 1: Versioned Workflow (Deterministic)
+
+```go
+// Note: imports omitted. Requires 'go.temporal.io/sdk/workflow', 'go.temporal.io/sdk/temporal', and 'time'.
+func SubscriptionWorkflow(ctx workflow.Context, userID string) error {
+	// 1. Versioning for logic evolution (v1 = DefaultVersion)
+	v := workflow.GetVersion(ctx, "billing_logic", workflow.DefaultVersion, 2)
+
+	for i := 0; i < 12; i++ {
+		ao := workflow.ActivityOptions{
+			StartToCloseTimeout: 5 * time.Minute,
+			RetryPolicy:         &temporal.RetryPolicy{MaximumAttempts: 3},
+		}
+		ctx = workflow.WithActivityOptions(ctx, ao)
+
+		// 2. Activity Execution (Always handle errors)
+		err := workflow.ExecuteActivity(ctx, ChargePaymentActivity, userID).Get(ctx, nil)
+		if err != nil {
+			workflow.GetLogger(ctx).Error("Payment failed", "Error", err)
+			return err
+		}
+
+		// 3. Durable Sleep (Time-skipping safe)
+		sleepDuration := 30 * 24 * time.Hour
+		if v >= 2 {
+			sleepDuration = 28 * 24 * time.Hour
+		}
+
+		if err := workflow.Sleep(ctx, sleepDuration); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+```
+
+### Example 2: Full mTLS Worker Setup
+
+```go
+// Note: imports omitted. Requires 'crypto/tls', 'crypto/x509', 'fmt', 'os',
+// 'go.temporal.io/sdk/client', and 'go.temporal.io/sdk/worker'.
+func RunSecureWorker() error {
+	// 1. Load Client Certificate and Key
+	cert, err := tls.LoadX509KeyPair("client.pem", "client.key")
+	if err != nil {
+		return fmt.Errorf("failed to load client keys: %w", err)
+	}
+
+	// 2. Load CA Certificate for Server verification (Proper mTLS)
+	caPem, err := os.ReadFile("ca.pem")
+	if err != nil {
+		return fmt.Errorf("failed to read CA cert: %w", err)
+	}
+	certPool := x509.NewCertPool()
+	if !certPool.AppendCertsFromPEM(caPem) {
+		return fmt.Errorf("failed to parse CA cert")
+	}
+
+	// 3. Dial Cluster with full TLS config
+	c, err := client.Dial(client.Options{
+		HostPort:  "temporal.example.com:7233",
+		Namespace: "production",
+		ConnectionOptions: client.ConnectionOptions{
+			TLS: &tls.Config{
+				Certificates: []tls.Certificate{cert},
+				RootCAs:      certPool,
+			},
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("failed to dial temporal: %w", err)
+	}
+	defer c.Close()
+
+	w := worker.New(c, "payment-queue", worker.Options{})
+	w.RegisterWorkflow(SubscriptionWorkflow)
+	// The workflow calls ChargePaymentActivity, so the worker must register it too.
+	w.RegisterActivity(ChargePaymentActivity)
+
+	if err := w.Run(worker.InterruptCh()); err != nil {
+		return fmt.Errorf("worker run failed: %w", err)
+	}
+	return nil
+}
+```
+
+### Example 3: Selector & Signal Integration
+
+```go
+func ApprovalWorkflow(ctx workflow.Context) (string, error) {
+	var approved bool
+	signalCh := workflow.GetSignalChannel(ctx, "approval-signal")
+
+	// Use Selector to wait for multiple async events
+	s := workflow.NewSelector(ctx)
+	s.AddReceive(signalCh, func(c workflow.ReceiveChannel, _ bool) {
+		c.Receive(ctx, &approved)
+	})
+
+	// Add 72-hour timeout timer (NewTimer returns a Future, so register it with AddFuture)
+	s.AddFuture(workflow.NewTimer(ctx, 72*time.Hour), func(f workflow.Future) {
+		approved = false
+	})
+
+	s.Select(ctx)
+
+	if !approved {
+		return "rejected", nil
+	}
+	return "approved", nil
+}
+```
+
+## Best Practices
+
+- ✅ **Do:** Always handle errors from `ExecuteActivity` and `client.Dial`.
+- ✅ **Do:** Use `workflow.Go` and `workflow.Channel` for concurrency.
+- ✅ **Do:** Sort map keys before iteration to maintain determinism.
+- ✅ **Do:** Use `activity.RecordHeartbeat` for activities lasting > 1 minute.
+- ✅ **Do:** Test logic compatibility using `ReplayWorkflowHistoryFromJSONFile` on a replayer created with `worker.NewWorkflowReplayer()`.
+- ❌ **Don't:** Swallow errors with `_` or abort with `log.Fatal` in production workers.
+- ❌ **Don't:** Perform direct Network/Disk I/O inside a Workflow function.
+- ❌ **Don't:** Rely on native `time.Now()` or `rand.Int()` inside workflow code.
+- ❌ **Don't:** Apply this to simple cron jobs that don't require durability.
+
+## Troubleshooting
+
+- **Panic: Determinism Mismatch**: Usually caused by logic changes without `workflow.GetVersion` or by non-deterministic code (e.g., ranging over a map without sorting its keys).
+- **Error: History Size Exceeded**: History limit reached (default 50K events). Ensure `ContinueAsNew` is implemented.
+- **Worker Hang**: Check `WorkerStopTimeout` and ensure all activities handle context cancellation.
+
+## Limitations
+
+- Does not cover Temporal Cloud UI navigation or TLS certificate provisioning workflows.
+- Does not cover Temporal Java, Python, or TypeScript SDKs; refer to their dedicated `-pro` skills.
+- Assumes Temporal Server v1.20+ and Go SDK v1.25+; older SDK versions may have different APIs.
+- Does not cover experimental Temporal features (e.g., Nexus, Multi-cluster Replication).
+- Does not address global namespace configuration or multi-region failover setup.
+- Does not cover Temporal Worker versioning via the `worker-versioning` feature flag (experimental).
+
+## Resources
+
+- [Implementation Playbook](resources/implementation-playbook.md) - Deep dive into Go SDK patterns.
+- [Testing Strategies](resources/testing-strategies.md) - Unit, Replay, and Integration testing for Go.
+- [Temporal Go SDK Reference](https://pkg.go.dev/go.temporal.io/sdk)
+- [Temporal Go Samples](https://github.com/temporalio/samples-go)
+
+## Related Skills
+
+- `grpc-golang` - Internal transport protocol and Protobuf design.
+- `golang-pro` - General Go performance tuning and advanced syntax.
+- `workflow-orchestration-patterns` - Language-agnostic orchestration strategy.
diff --git a/skills/temporal-golang-pro/resources/implementation-playbook.md b/skills/temporal-golang-pro/resources/implementation-playbook.md
new file mode 100644
index 00000000..8c973bbb
--- /dev/null
+++ b/skills/temporal-golang-pro/resources/implementation-playbook.md
@@ -0,0 +1,242 @@
+# Temporal Go Implementation Playbook
+
+This playbook provides production-ready patterns and deep technical guidance for implementing durable orchestration with the Temporal Go SDK.
+
+## Table of Contents
+
+1. [The Deterministic Commandments](#1-the-deterministic-commandments)
+2. [Workflow Versioning](#2-workflow-versioning)
+3. [Activity Design & Idempotency](#3-activity-design--idempotency)
+4. [Worker Configuration for Scale](#4-worker-configuration-for-scale)
+5. [Context & Heartbeating](#5-context--heartbeating)
+6. [Interceptors & Observability](#6-interceptors--observability)
+7. [SideEffect and MutableSideEffect](#7-sideeffect-and-mutablesideeffect)
+
+---
+
+## 1. The Deterministic Commandments
+
+In Go, workflows are state machines that must replay identically. Violating these rules causes "Determinism Mismatch" errors.
+
+### ❌ Never Use Native Go Concurrency
+
+- **Wrong:** `go myFunc()`
+- **Right:** `workflow.Go(ctx, func(ctx workflow.Context) { ... })`
+- **Why:** `workflow.Go` runs the function as a coroutine under the SDK's deterministic scheduler, so the execution order is the same on every replay.
+
+### ❌ Never Use Native Time
+
+- **Wrong:** `time.Now()`, `time.Sleep(d)`, `time.After(d)`
+- **Right:** `workflow.Now(ctx)`, `workflow.Sleep(ctx, d)`, `workflow.NewTimer(ctx, d)`
+
+### ❌ Never Use Non-Deterministic Map Iteration
+
+- **Wrong:** `for k, v := range myMap { ... }`
+- **Right:** Collect keys, sort them, then iterate.
+- ```go
+  keys := make([]string, 0, len(myMap))
+  for k := range myMap { keys = append(keys, k) }
+  sort.Strings(keys)
+  for _, k := range keys { v := myMap[k]; ... }
+  ```
+
+### ❌ Never Perform Direct External I/O
+
+- **Wrong:** `http.Get("https://api.example.com")` or `os.ReadFile("data.txt")` inside a workflow.
+- **Right:** Wrap all I/O in an Activity and call it with `workflow.ExecuteActivity`.
+- **Why:** External calls are non-deterministic; their results change between replays.
+
+### ❌ Never Use Non-Deterministic Random Numbers
+
+- **Wrong:** `rand.Int()`, `uuid.New()` inside a workflow.
+- **Right:** Pass random seeds or UUIDs as workflow input arguments, generate them inside an Activity, or capture them once with `workflow.SideEffect`.
+- **Why:** `rand.Int()` produces different values on each replay, causing a determinism mismatch.
+
+---
+
+## 2. Workflow Versioning
+
+When you change workflow logic while executions are still running, you MUST guard the change with `workflow.GetVersion`.
+
+### Pattern: Safe Logic Update
+
+```go
+const VersionV2 = 1 // version number recorded in history for the new code path
+
+func MyWorkflow(ctx workflow.Context) error {
+	v := workflow.GetVersion(ctx, "ChangePaymentStep", workflow.DefaultVersion, VersionV2)
+
+	if v == workflow.DefaultVersion {
+		// Old logic: kept alive until all pre-existing workflow runs complete.
+		return workflow.ExecuteActivity(ctx, OldActivity).Get(ctx, nil)
+	}
+	// New logic: runs that reach this call for the first time take this path.
+	return workflow.ExecuteActivity(ctx, NewActivity).Get(ctx, nil)
+}
+```
+
+### Pattern: Cleanup After Full Migration
+
+Once you have confirmed **no running workflow instances** are on `DefaultVersion` (verify via the Temporal Web UI or the `temporal` CLI), you can safely remove the old branch:
+
+```go
+func MyWorkflow(ctx workflow.Context) error {
+	// Pin minimum version to V2; histories from before the migration will
+	// fail the determinism check (replay error) if they replay against this code.
+	// Only remove the old branch after confirming zero running instances on DefaultVersion.
+	workflow.GetVersion(ctx, "ChangePaymentStep", VersionV2, VersionV2)
+	return workflow.ExecuteActivity(ctx, NewActivity).Get(ctx, nil)
+}
+```
+
+---
+
+## 3. Activity Design & Idempotency
+
+Activities can execute multiple times (retries, timeouts, worker restarts), so they must be idempotent.
+
+### Pattern: Upsert instead of Insert
+
+Instead of a simple `INSERT`, use `UPSERT` or "Check-then-Act" with an idempotency key (like `WorkflowID` or `RunID`).
+
+```go
+func (a *Activities) ProcessPayment(ctx context.Context, req PaymentRequest) error {
+	info := activity.GetInfo(ctx)
+	// Use info.WorkflowExecution.ID as part of your idempotency key in DB
+	return a.db.UpsertPayment(req, info.WorkflowExecution.ID)
+}
+```
+
+---
+
+## 4. Worker Configuration for Scale
+
+### Optimized Worker Options
+
+```go
+w := worker.New(c, "task-queue", worker.Options{
+	MaxConcurrentActivityExecutionSize:     100, // Limit based on resource constraints
+	MaxConcurrentWorkflowTaskExecutionSize: 50,
+	WorkerActivitiesPerSecond:              200, // Rate limit for this worker only; TaskQueueActivitiesPerSecond limits the whole queue
+	WorkerStopTimeout:                      time.Minute, // Allow activities to finish
+})
+```
+
+---
+
+## 5. Context & Heartbeating
+
+### Propagating Metadata
+
+Use workflow interceptors or a custom context propagator (header propagation) to pass tracing IDs or user identity along the call chain.
+
+### Activity Heartbeating
+
+Mandatory for long-running activities: with a `HeartbeatTimeout` set in the activity options, a missed heartbeat lets the server detect a crashed worker before the `StartToCloseTimeout` expires.
+
+```go
+func LongRunningActivity(ctx context.Context) error {
+	for i := 0; i < 100; i++ {
+		activity.RecordHeartbeat(ctx, i) // Report progress
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err() // Handle cancellation
+		default:
+			// Do work
+		}
+	}
+	return nil
+}
+```
+
+---
+
+## 6. Interceptors & Observability
+
+### Custom Workflow Interceptors
+
+Use interceptors to inject structured logging (Zap/Slog) or perform global error classification.
+The interceptor must be wired via a root `WorkerInterceptor`, whose `InterceptWorkflow` method the SDK calls to build the per-workflow interceptor chain.
+
+```go
+// Step 1: Implement the root WorkerInterceptor (registered on worker.Options)
+type MyWorkerInterceptor struct {
+	interceptor.WorkerInterceptorBase
+}
+
+func (w *MyWorkerInterceptor) InterceptWorkflow(
+	ctx workflow.Context,
+	next interceptor.WorkflowInboundInterceptor,
+) interceptor.WorkflowInboundInterceptor {
+	return &myWorkflowInboundInterceptor{interceptor.WorkflowInboundInterceptorBase{Next: next}}
+}
+
+// Step 2: Implement the per-workflow inbound interceptor
+type myWorkflowInboundInterceptor struct {
+	// The embedded base delegates every method to its Next field, so Next must be set.
+	interceptor.WorkflowInboundInterceptorBase
+}
+
+func (i *myWorkflowInboundInterceptor) ExecuteWorkflow(
+	ctx workflow.Context,
+	input *interceptor.ExecuteWorkflowInput,
+) (interface{}, error) {
+	workflow.GetLogger(ctx).Info("Workflow started", "type", workflow.GetInfo(ctx).WorkflowType.Name)
+	result, err := i.Next.ExecuteWorkflow(ctx, input)
+	if err != nil {
+		workflow.GetLogger(ctx).Error("Workflow failed", "error", err)
+	}
+	return result, err
+}
+
+// Step 3: Register on the worker
+w := worker.New(c, "task-queue", worker.Options{
+	Interceptors: []interceptor.WorkerInterceptor{&MyWorkerInterceptor{}},
+})
+```
+
+---
+
+## Anti-Patterns to Avoid
+
+1. **Massive Workflows:** Keeping too much state in a single workflow. Use `ContinueAsNew` before the event history approaches 50K events.
+2. **Fat Activities:** Doing orchestration inside an activity. Each activity should be a single unit of work.
+3. **Global Variables:** Using global variables in workflows. They will not be preserved across worker restarts.
+4. **Native Concurrency in Workflows:** Using native goroutines, `sync.Mutex`, or native channels will cause race conditions and determinism errors during replay.
+
+---
+
+## 7. SideEffect and MutableSideEffect
+
+Use `workflow.SideEffect` when you need a **single non-deterministic value** captured once and replayed identically, for example generating a UUID or reading a one-time config snapshot inside a workflow.
+
+```go
+// SideEffect: called only on first execution; result is recorded in history and
+// replayed deterministically on all subsequent replays.
+// Requires: "go.temporal.io/sdk/workflow", "github.com/google/uuid"
+encodedID := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
+	return uuid.NewString()
+})
+
+var requestID string
+if err := encodedID.Get(&requestID); err != nil {
+	return err
+}
+```
+
+**When to use `MutableSideEffect`**: When the value may change across workflow tasks but must still be deterministic per history event (e.g., a feature flag that updates while the workflow is running).
+
+```go
+// MutableSideEffect: re-evaluated on each workflow task, but only recorded in
+// history when the value changes from the previous recorded value.
+encodedFlag := workflow.MutableSideEffect(ctx, "feature-flag-v2",
+	func(ctx workflow.Context) interface{} {
+		return featureFlagEnabled // read from workflow-local state, NOT an external call
+	},
+	func(a, b interface{}) bool { return a.(bool) == b.(bool) },
+)
+var enabled bool
+if err := encodedFlag.Get(&enabled); err != nil {
+	return err
+}
+```
+
+> **Warning:** Do NOT use `SideEffect` as a workaround to call external APIs (HTTP, DB) inside a workflow. All external I/O must still go through Activities.
diff --git a/skills/temporal-golang-pro/resources/testing-strategies.md b/skills/temporal-golang-pro/resources/testing-strategies.md
new file mode 100644
index 00000000..d5962f8a
--- /dev/null
+++ b/skills/temporal-golang-pro/resources/testing-strategies.md
@@ -0,0 +1,145 @@
+# Temporal Go Testing Strategies
+
+Testing workflows and activities in Go requires a deep understanding of the `testsuite` package, which provides a mocked environment with deterministic time-skipping.
+
+## The Test Suite Setup
+
+Embed `WorkflowTestSuite` in a testify suite and create a fresh `TestWorkflowEnvironment` in `SetupTest`, so every test in the file starts from clean state.
+
+```go
+// Requires: "testing", "github.com/stretchr/testify/suite", "go.temporal.io/sdk/testsuite"
+type MyTestSuite struct {
+	suite.Suite
+	testsuite.WorkflowTestSuite
+	env *testsuite.TestWorkflowEnvironment
+}
+
+func (s *MyTestSuite) SetupTest() {
+	s.env = s.NewTestWorkflowEnvironment()
+}
+
+func TestMyTestSuite(t *testing.T) {
+	suite.Run(t, new(MyTestSuite))
+}
+```
+
+## 1. Unit Testing Workflows
+
+The most powerful feature of the Go SDK test environment is **Time-skipping**. A workflow that sleeps for 30 days will finish in milliseconds in your test.
+
+### Mocking Activities
+
+You must register activity mocks before running the workflow.
+
+```go
+// Requires: "github.com/stretchr/testify/mock"
+func (s *MyTestSuite) Test_SuccessfulWorkflow() {
+	// Mock the activity
+	s.env.OnActivity(MyActivity, mock.Anything, "input").Return("output", nil)
+
+	s.env.ExecuteWorkflow(MyWorkflow, "input")
+
+	s.True(s.env.IsWorkflowCompleted())
+	s.NoError(s.env.GetWorkflowError())
+
+	var result string
+	s.NoError(s.env.GetWorkflowResult(&result))
+	s.Equal("Completed", result)
+}
+```
+
+### Mocking Child Workflows
+
+Similar to activities, use `OnChildWorkflow`.
+
+```go
+s.env.OnChildWorkflow(MyChildWorkflow, mock.Anything, "args").Return("result", nil)
+```
+
+## 2. Unit Testing Activities
+
+Use `TestActivityEnvironment` to test activities in isolation.
+
+```go
+// Requires: "testing", "go.temporal.io/sdk/testsuite", "github.com/stretchr/testify/assert"
+func Test_Activity(t *testing.T) {
+	testSuite := &testsuite.WorkflowTestSuite{}
+	env := testSuite.NewTestActivityEnvironment()
+
+	env.RegisterActivity(MyActivity)
+
+	val, err := env.ExecuteActivity(MyActivity, "input")
+	assert.NoError(t, err)
+
+	var result string
+	assert.NoError(t, val.Get(&result))
+	assert.Equal(t, "expected", result)
+}
+```
+
+## 3. Replay Testing (Determinism Check)
+
+Replay testing ensures that new code changes won't break existing, running workflows.
+
+```go
+// Requires: "testing", "go.temporal.io/sdk/worker", "github.com/stretchr/testify/assert"
+func Test_ReplayStaticHistory(t *testing.T) {
+	replayer := worker.NewWorkflowReplayer()
+
+	replayer.RegisterWorkflow(MyWorkflow)
+
+	// Load history from JSON file (exported from Temporal Web UI or CLI).
+	// Web UI: Workflow Detail -> Download History (JSON)
+	// CLI: temporal workflow show --workflow-id --namespace --output json > history.json
+	// The first argument is a logger; passing nil lets the replayer use its default.
+	err := replayer.ReplayWorkflowHistoryFromJSONFile(nil, "history.json")
+	assert.NoError(t, err)
+}
+```
+
+## 4. Testing Signals and Queries
+
+You can send signals during a test at specific points in time.
+
+```go
+func (s *MyTestSuite) Test_WorkflowWithSignal() {
+	// Delayed signal
+	s.env.RegisterDelayedCallback(func() {
+		s.env.SignalWorkflow("my-signal", "data")
+	}, time.Hour) // This hour passes instantly!
+
+	s.env.ExecuteWorkflow(MyWorkflow)
+
+	// Query state after signal
+	res, err := s.env.QueryWorkflow("get-state")
+	s.NoError(err)
+	var state string
+	s.NoError(res.Get(&state))
+	s.Equal("SignalReceived", state)
+}
+```
+
+## Best Practices for Testing
+
+- **>=80% Coverage**: Aim for high coverage on workflow logic since activities are often just wrappers around DB/API calls.
+- **Assertion-based**: Use `testify/assert` or `testify/suite` for clean assertions.
+- **Mock Everything External**: Never call a real database or API in a unit test.
+- **Test Failure Paths**: Explicitly test what happens when an activity returns an error or when a heartbeat times out.
+
+### Example: Testing an Activity Failure Path
+
+```go
+func (s *MyTestSuite) Test_WorkflowHandlesActivityError() {
+	// Mock the activity to return a non-retryable error
+	s.env.OnActivity(ChargePaymentActivity, mock.Anything, mock.Anything).
+		Return("", temporal.NewNonRetryableApplicationError("card declined", "PaymentError", nil))
+
+	s.env.ExecuteWorkflow(SubscriptionWorkflow, "user-123")
+
+	s.True(s.env.IsWorkflowCompleted())
+	// Verify the workflow correctly surfaces the error
+	err := s.env.GetWorkflowError()
+	s.Error(err)
+	s.Contains(err.Error(), "card declined")
+}
+```