Go Concurrency Patterns

Master Go concurrency patterns with goroutines, channels, and sync primitives



Master Go concurrency with goroutines, channels, sync primitives, and context. Use when building concurrent Go applications, implementing worker pools, or debugging race conditions.

Tags: go, concurrency, goroutines, channels, sync, worker-pool, pipeline, race-detection


User Prompt

Show me how to implement a worker pool in Go that processes 100 jobs with 5 workers and handles graceful shutdown


Agent Response

Complete worker pool implementation with context cancellation, WaitGroup synchronization, and proper channel management

Quick Start (3 Steps)

Get up and running in minutes

1. Install

claude-code skill install go-concurrency-patterns

2. Config

3. First Trigger

@go-concurrency-patterns help

Commands

Command | Description | Required Args
@go-concurrency-patterns worker-pool-implementation | Build a scalable worker pool to process jobs concurrently with proper resource management | None
@go-concurrency-patterns race-condition-debugging | Identify and fix race conditions in concurrent Go applications | None
@go-concurrency-patterns pipeline-processing | Create fan-out/fan-in pipelines for parallel data processing | None

Typical Use Cases

Worker Pool Implementation

Build a scalable worker pool to process jobs concurrently with proper resource management

Race Condition Debugging

Identify and fix race conditions in concurrent Go applications

Pipeline Processing

Create fan-out/fan-in pipelines for parallel data processing

Overview

Go Concurrency Patterns

Production patterns for Go concurrency including goroutines, channels, synchronization primitives, and context management.

When to Use This Skill

  • Building concurrent Go applications
  • Implementing worker pools and pipelines
  • Managing goroutine lifecycles
  • Using channels for communication
  • Debugging race conditions
  • Implementing graceful shutdown

Core Concepts

1. Go Concurrency Primitives

Primitive | Purpose
goroutine | Lightweight concurrent execution
channel | Communication between goroutines
select | Multiplex channel operations
sync.Mutex | Mutual exclusion
sync.WaitGroup | Wait for goroutines to complete
context.Context | Cancellation and deadlines

2. Go Concurrency Mantra

Don't communicate by sharing memory;
share memory by communicating.

Quick Start

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    results := make(chan string, 10)
    var wg sync.WaitGroup

    // Spawn workers
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(ctx, i, results, &wg)
    }

    // Close results when done
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    for result := range results {
        fmt.Println(result)
    }
}

func worker(ctx context.Context, id int, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()

    select {
    case <-ctx.Done():
        return
    case results <- fmt.Sprintf("Worker %d done", id):
    }
}

Patterns

Pattern 1: Worker Pool

package main

import (
    "context"
    "fmt"
    "sync"
)

type Job struct {
    ID   int
    Data string
}

type Result struct {
    JobID  int
    Output string
    Err    error
}

func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
    // Buffer by capacity, not len(jobs): len reports how many jobs happen
    // to be queued right now, which may be zero at call time.
    results := make(chan Result, cap(jobs))

    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for job := range jobs {
                select {
                case <-ctx.Done():
                    return
                default:
                    results <- processJob(job)
                }
            }
        }(i)
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

func processJob(job Job) Result {
    // Simulate work
    return Result{
        JobID:  job.ID,
        Output: fmt.Sprintf("Processed: %s", job.Data),
    }
}

// Usage
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    jobs := make(chan Job, 100)

    // Send jobs
    go func() {
        for i := 0; i < 50; i++ {
            jobs <- Job{ID: i, Data: fmt.Sprintf("job-%d", i)}
        }
        close(jobs)
    }()

    // Process with 5 workers
    results := WorkerPool(ctx, 5, jobs)

    for result := range results {
        fmt.Printf("Result: %+v\n", result)
    }
}

Pattern 2: Fan-Out/Fan-In Pipeline

package main

import (
    "context"
    "fmt"
    "sync"
)

// Stage 1: Generate numbers
func generate(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case <-ctx.Done():
                return
            case out <- n:
            }
        }
    }()
    return out
}

// Stage 2: Square numbers (can run multiple instances)
func square(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case <-ctx.Done():
                return
            case out <- n * n:
            }
        }
    }()
    return out
}

// Fan-in: Merge multiple channels into one
func merge(ctx context.Context, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start output goroutine for each input channel
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case <-ctx.Done():
                return
            case out <- n:
            }
        }
    }

    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Close out after all inputs are done
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Generate input
    in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // Fan out to multiple squarers
    c1 := square(ctx, in)
    c2 := square(ctx, in)
    c3 := square(ctx, in)

    // Fan in results
    for result := range merge(ctx, c1, c2, c3) {
        fmt.Println(result)
    }
}

Pattern 3: Bounded Concurrency with Semaphore

package main

import (
    "context"
    "sync"

    "golang.org/x/sync/semaphore"
)

type RateLimitedWorker struct {
    sem *semaphore.Weighted
}

func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
    return &RateLimitedWorker{
        sem: semaphore.NewWeighted(maxConcurrent),
    }
}

func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
    var (
        wg     sync.WaitGroup
        mu     sync.Mutex
        errors []error
    )

    for _, task := range tasks {
        // Acquire semaphore (blocks if at limit)
        if err := w.sem.Acquire(ctx, 1); err != nil {
            return []error{err}
        }

        wg.Add(1)
        go func(t func() error) {
            defer wg.Done()
            defer w.sem.Release(1)

            if err := t(); err != nil {
                mu.Lock()
                errors = append(errors, err)
                mu.Unlock()
            }
        }(task)
    }

    wg.Wait()
    return errors
}

// Alternative: Channel-based semaphore
type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore {
    return make(chan struct{}, n)
}

func (s Semaphore) Acquire() {
    s <- struct{}{}
}

func (s Semaphore) Release() {
    <-s
}

Pattern 4: Graceful Shutdown

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

type Server struct {
    wg sync.WaitGroup
}

func NewServer() *Server {
    return &Server{}
}

func (s *Server) Start(ctx context.Context) {
    // Start workers; they stop when ctx is canceled
    for i := 0; i < 5; i++ {
        s.wg.Add(1)
        go s.worker(ctx, i)
    }
}

func (s *Server) worker(ctx context.Context, id int) {
    defer s.wg.Done()
    defer fmt.Printf("Worker %d stopped\n", id)

    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            // Cleanup
            fmt.Printf("Worker %d cleaning up...\n", id)
            time.Sleep(500 * time.Millisecond) // Simulated cleanup
            return
        case <-ticker.C:
            fmt.Printf("Worker %d working...\n", id)
        }
    }
}

// Shutdown waits for all workers to finish, up to timeout.
// Cancel the context passed to Start first to signal them to stop.
func (s *Server) Shutdown(timeout time.Duration) {
    done := make(chan struct{})
    go func() {
        s.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        fmt.Println("Clean shutdown completed")
    case <-time.After(timeout):
        fmt.Println("Shutdown timed out, forcing exit")
    }
}

func main() {
    // Setup signal handling
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    server := NewServer()
    server.Start(ctx)

    // Wait for signal
    sig := <-sigCh
    fmt.Printf("\nReceived signal: %v\n", sig)

    // Cancel context to stop workers
    cancel()

    // Wait for graceful shutdown
    server.Shutdown(5 * time.Second)
}

Pattern 5: Error Group with Cancellation

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)

    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url // Capture loop variables (not needed in Go 1.22+)

        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return fmt.Errorf("creating request for %s: %w", url, err)
            }

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetching %s: %w", url, err)
            }
            defer resp.Body.Close()

            // Each goroutine writes a distinct index, so no mutex is needed
            results[i] = fmt.Sprintf("%s: %d", url, resp.StatusCode)
            return nil
        })
    }

    // Wait for all goroutines to complete or one to fail
    if err := g.Wait(); err != nil {
        return nil, err // First error cancels all others
    }

    return results, nil
}

// fetchURL fetches one URL and reports its status line
func fetchURL(ctx context.Context, url string) (string, error) {
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return "", fmt.Errorf("creating request for %s: %w", url, err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return "", fmt.Errorf("fetching %s: %w", url, err)
    }
    defer resp.Body.Close()

    return fmt.Sprintf("%s: %d", url, resp.StatusCode), nil
}

// With concurrency limit
func fetchWithLimit(ctx context.Context, urls []string, limit int) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(limit) // Max concurrent goroutines

    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url

        g.Go(func() error {
            result, err := fetchURL(ctx, url)
            if err != nil {
                return err
            }

            // Distinct index per goroutine: safe without a lock
            results[i] = result
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }

    return results, nil
}

Pattern 6: Concurrent Map with sync.Map

package main

import (
    "sync"
)

// For frequent reads, infrequent writes
type Cache struct {
    m sync.Map
}

func (c *Cache) Get(key string) (interface{}, bool) {
    return c.m.Load(key)
}

func (c *Cache) Set(key string, value interface{}) {
    c.m.Store(key, value)
}

func (c *Cache) GetOrSet(key string, value interface{}) (interface{}, bool) {
    return c.m.LoadOrStore(key, value)
}

func (c *Cache) Delete(key string) {
    c.m.Delete(key)
}

// For write-heavy workloads, use sharded map
type ShardedMap struct {
    shards    []*shard
    numShards int
}

type shard struct {
    sync.RWMutex
    data map[string]interface{}
}

func NewShardedMap(numShards int) *ShardedMap {
    m := &ShardedMap{
        shards:    make([]*shard, numShards),
        numShards: numShards,
    }
    for i := range m.shards {
        m.shards[i] = &shard{data: make(map[string]interface{})}
    }
    return m
}

func (m *ShardedMap) getShard(key string) *shard {
    // Simple hash; unsigned so the shard index can never be negative
    // (a signed int would overflow negative on long keys)
    var h uint32
    for _, c := range key {
        h = 31*h + uint32(c)
    }
    return m.shards[h%uint32(m.numShards)]
}

func (m *ShardedMap) Get(key string) (interface{}, bool) {
    shard := m.getShard(key)
    shard.RLock()
    defer shard.RUnlock()
    v, ok := shard.data[key]
    return v, ok
}

func (m *ShardedMap) Set(key string, value interface{}) {
    shard := m.getShard(key)
    shard.Lock()
    defer shard.Unlock()
    shard.data[key] = value
}

Pattern 7: Select with Timeout and Default

// Snippet: assumes "fmt" and "time" are imported
func selectPatterns() {
    ch := make(chan int)

    // Timeout pattern
    select {
    case v := <-ch:
        fmt.Println("Received:", v)
    case <-time.After(time.Second):
        fmt.Println("Timeout!")
    }

    // Non-blocking send/receive: ch is unbuffered with no receiver here,
    // so the send cannot proceed and the default case runs
    select {
    case ch <- 42:
        fmt.Println("Sent")
    default:
        fmt.Println("Channel full, skipping")
    }

    // Priority select (check high priority first);
    // this loop runs forever and is purely illustrative
    highPriority := make(chan int)
    lowPriority := make(chan int)

    for {
        select {
        case msg := <-highPriority:
            fmt.Println("High priority:", msg)
        default:
            // No high-priority message ready: block on either channel
            select {
            case msg := <-highPriority:
                fmt.Println("High priority:", msg)
            case msg := <-lowPriority:
                fmt.Println("Low priority:", msg)
            }
        }
    }
}

Race Detection

# Run tests with race detector
go test -race ./...

# Build with race detector
go build -race .

# Run with race detector
go run -race main.go

Best Practices

Do’s

  • Use context - For cancellation and deadlines
  • Close channels - From sender side only
  • Use errgroup - For concurrent operations with errors
  • Buffer channels - When you know the count
  • Prefer channels - Over mutexes when possible

Don’ts

  • Don’t leak goroutines - Always have exit path
  • Don’t close from receiver - Causes panic
  • Don’t use shared memory - Unless necessary
  • Don’t ignore context cancellation - Check ctx.Done()
  • Don’t use time.Sleep for sync - Use proper primitives


Environment Matrix

Dependencies

Go 1.18+
golang.org/x/sync/semaphore (for advanced patterns)
golang.org/x/sync/errgroup (for error handling)

Framework Support

Standard Go library ✓ (recommended)
golang.org/x/sync ✓

Context Window

Token usage: ~3K-8K tokens, depending on pattern complexity


Information

Author
wshobson
Updated
2026-01-30
Category
debugging