Building a Concurrent Log Aggregation Service with Go Goroutines and Channels
Introduction
Concurrent programming is not optional in backend systems. A log aggregation service receiving thousands of events per second from multiple application nodes cannot process them sequentially — the queue grows without bound, latency spikes, and eventually the service becomes the bottleneck it was meant to relieve. The work must be distributed, executed in parallel, and its results collected cleanly.
Go was designed from the ground up for this kind of work. Its goroutine scheduler and channel primitives are not bolted-on concurrency features; they are the primary abstraction for expressing parallel workloads. But the gap between knowing those primitives exist and using them correctly is wider than most tutorials acknowledge. Goroutines that are never cleaned up leak memory. A channel closed while a sender is still active makes that sender's next send panic. Fan-out pipelines that are not properly coordinated drop data silently.
This tutorial builds a log aggregation service that fans work across multiple goroutines and collects results through channels. Every pattern introduced — buffered channels, select, done channels, sync.WaitGroup, fan-out/fan-in — addresses a specific failure mode that appears in production systems, not in toy examples.
Background
A goroutine is a lightweight thread managed by the Go runtime. Starting one costs a few kilobytes of stack space, and the scheduler multiplexes thousands of them onto a small number of OS threads. The go keyword launches a goroutine; the caller does not wait for it.
A channel is a typed conduit for passing values between goroutines. Channels can be unbuffered (a send blocks until a receiver is ready) or buffered (sends proceed without blocking until the buffer is full). Channels enforce synchronization — they are the mechanism by which goroutines coordinate rather than compete.
The select statement waits on multiple channel operations simultaneously and executes the first one that is ready. It is the primary tool for multiplexing — responding to whichever of several concurrent events arrives first.
sync.WaitGroup tracks a count of running goroutines and provides a Wait method that blocks until the count reaches zero. It is the standard way to wait for a group of goroutines to finish before proceeding.
The fan-out/fan-in pattern distributes a stream of work across multiple goroutines (fan-out) and merges their output back into a single channel (fan-in). It is the backbone of parallel data pipelines.
Practical Scenario
An infrastructure team operates a fleet of application servers, each emitting structured log events at high volume. A central aggregation service receives these events from multiple sources, parses them, classifies them by severity, and writes the results to a reporting store. During business hours, the service sees bursts that overwhelm sequential processing.
The team wants the aggregation service to dispatch each incoming log batch to a pool of worker goroutines, process batches in parallel, and merge the parsed results into a single output stream for the reporter. The service must also handle cancellation cleanly when a shutdown signal arrives — partially processed batches should not be silently discarded, and goroutines should not linger after the service exits.
Without proper coordination, naive parallel implementations either drop results when channels fill, block indefinitely waiting for goroutines that have already exited, or leak goroutines that continue consuming resources after the service has shut down. Each failure mode is invisible in unit tests but catastrophic in a 24/7 production deployment.
The Problem
A first attempt runs each log source in its own goroutine and prints results directly, with no coordination between goroutines and no structured result collection.
touch main.go
go run main.go
package main
import (
	"fmt"
	"math/rand"
	"time"
)
type LogEvent struct {
	Source   string
	Severity string
	Message  string
}
func processSource(source string) {
	severities := []string{"INFO", "WARN", "ERROR"}
	for i := 0; i < 3; i++ {
		time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
		event := LogEvent{
			Source:   source,
			Severity: severities[rand.Intn(len(severities))],
			Message:  fmt.Sprintf("event-%d from %s", i, source),
		}
		fmt.Printf("[%s] %s: %s\n", event.Source, event.Severity, event.Message)
	}
}
func main() {
	sources := []string{"app-server-1", "app-server-2", "app-server-3"}
	for _, src := range sources {
		go processSource(src)
	}
	// A guess: nothing guarantees the goroutines finish within 500 ms
	time.Sleep(500 * time.Millisecond)
	fmt.Println("Done")
}
[app-server-2] INFO: event-0 from app-server-2
[app-server-1] WARN: event-0 from app-server-1
[app-server-3] ERROR: event-0 from app-server-3
[app-server-1] INFO: event-1 from app-server-1
[app-server-2] ERROR: event-1 from app-server-2
[app-server-3] WARN: event-1 from app-server-3
[app-server-2] INFO: event-2 from app-server-2
[app-server-1] WARN: event-2 from app-server-1
[app-server-3] INFO: event-2 from app-server-3
Done
The goroutines run in parallel, but the coordination is a time.Sleep guess. If processing takes longer than expected, the program exits before goroutines finish, silently dropping events. Results cannot be collected or aggregated — they are printed and lost. There is no way to signal cancellation, and no structured mechanism for the reporter to receive parsed events. This approach cannot be made correct by tuning the sleep value.
Goroutines with the go Keyword
Each call to go processSource(src) launches a goroutine that runs processSource independently of main. Goroutines are cheap — the Go runtime can run hundreds of thousands of them concurrently. The critical insight is that main itself is also a goroutine, and when it returns, the program exits, regardless of what other goroutines are doing.
Replace the entire content of main.go with the following to verify that goroutine scheduling is genuinely concurrent, not sequential:
package main
import (
	"fmt"
	"time"
)
type LogEvent struct {
	Source   string
	Severity string
	Message  string
}
func processSource(source string, index int) {
	time.Sleep(time.Duration(index*20) * time.Millisecond)
	fmt.Printf("Goroutine started: %s\n", source)
}
func main() {
	sources := []string{"app-server-1", "app-server-2", "app-server-3"}
	fmt.Println("Launching goroutines...")
	for i, src := range sources {
		go processSource(src, i)
	}
	fmt.Println("All go statements executed — goroutines may still be running")
	time.Sleep(200 * time.Millisecond)
}
Launching goroutines...
All go statements executed — goroutines may still be running
Goroutine started: app-server-1
Goroutine started: app-server-2
Goroutine started: app-server-3
Why this is better: The output makes the non-blocking nature of go concrete — main proceeds immediately after the go statements, and goroutines run asynchronously. This also shows exactly why time.Sleep is the wrong coordination mechanism: the runtime gives no guarantee about when each goroutine runs relative to main.
Unbuffered Channels: Send, Receive, and Blocking Behavior
An unbuffered channel requires that a sender and receiver are both ready before any value passes. This is the simplest way to collect results from a goroutine without using shared memory.
Replace the entire content of main.go with the following:
package main
import (
	"fmt"
	"math/rand"
	"time"
)
type LogEvent struct {
	Source   string
	Severity string
	Message  string
}
// out is send-only inside processSource
func processSource(source string, out chan<- LogEvent) {
	severities := []string{"INFO", "WARN", "ERROR"}
	for i := 0; i < 3; i++ {
		time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
		// Blocks until main is ready to receive
		out <- LogEvent{
			Source:   source,
			Severity: severities[rand.Intn(len(severities))],
			Message:  fmt.Sprintf("event-%d from %s", i, source),
		}
	}
}
func main() {
	out := make(chan LogEvent) // unbuffered
	go processSource("app-server-1", out)
	// Exactly three receives to match the three sends
	for i := 0; i < 3; i++ {
		event := <-out
		fmt.Printf("[%s] %s: %s\n", event.Source, event.Severity, event.Message)
	}
	fmt.Println("All events received")
}
[app-server-1] INFO: event-0 from app-server-1
[app-server-1] WARN: event-1 from app-server-1
[app-server-1] ERROR: event-2 from app-server-1
All events received
The channel replaces the time.Sleep guess with a real synchronization point. Each <-out in main blocks until processSource sends an event, so events are never dropped and main never races ahead of the goroutine. The chan<- LogEvent parameter type restricts processSource to send-only access — a compile-time guarantee that the goroutine cannot accidentally read from its own output channel.
Note: Receiving more times than the goroutine sends causes main to block indefinitely, producing a deadlock. Receiving fewer times than the goroutine sends causes the goroutine to block on its next send forever. Unbuffered channels require the sender and receiver counts to be coordinated explicitly.
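A common way to avoid coordinating receive counts by hand is to split the job between the two sides. The sender closes the channel after its last send, and the receiver ranges over the channel. The standalone sketch below assumes a fixed number of events per source. The helper names emit and collectAll are hypothetical and not part of the service.

```go
package main

import "fmt"

// LogEvent mirrors the type used throughout this tutorial.
type LogEvent struct {
	Source   string
	Severity string
	Message  string
}

// emit sends n events and then closes out. The close tells the receiver
// "no more values", so the receiver never needs to know n in advance.
func emit(source string, n int, out chan<- LogEvent) {
	defer close(out) // only the sending side closes the channel
	for i := 0; i < n; i++ {
		out <- LogEvent{
			Source:   source,
			Severity: "INFO",
			Message:  fmt.Sprintf("event-%d from %s", i, source),
		}
	}
}

// collectAll ranges over the channel until emit closes it.
func collectAll(source string, n int) []LogEvent {
	out := make(chan LogEvent) // unbuffered, as in the example above
	go emit(source, n, out)
	var events []LogEvent
	for event := range out {
		events = append(events, event)
	}
	return events
}

func main() {
	for _, e := range collectAll("app-server-1", 3) {
		fmt.Printf("[%s] %s: %s\n", e.Source, e.Severity, e.Message)
	}
}
```

The receiver no longer encodes the sender's count anywhere. Too many receives cannot deadlock, because the loop ends at close. Too few receives cannot strand the sender, because the loop keeps reading until the channel is drained.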
Buffered Channels and When to Use Them
When a producer sends faster than the consumer can process, an unbuffered channel creates back-pressure that slows the producer to the consumer's rate. A buffered channel allows the producer to continue sending up to the buffer's capacity before it blocks, absorbing short bursts without stalling.
Replace the entire content of main.go with the following. This version launches three sources simultaneously and uses a buffered channel large enough to hold all results:
package main
import (
	"fmt"
	"math/rand"
	"time"
)
type LogEvent struct {
	Source   string
	Severity string
	Message  string
}
func processSource(source string, out chan<- LogEvent) {
	severities := []string{"INFO", "WARN", "ERROR"}
	for i := 0; i < 3; i++ {
		time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
		out <- LogEvent{
			Source:   source,
			Severity: severities[rand.Intn(len(severities))],
			Message:  fmt.Sprintf("event-%d from %s", i, source),
		}
	}
}
func main() {
	sources := []string{"app-server-1", "app-server-2", "app-server-3"}
	// Buffer large enough to hold all events so sources are never blocked
	out := make(chan LogEvent, len(sources)*3)
	for _, src := range sources {
		go processSource(src, out)
	}
	// Drain results after a fixed wait. Still not ideal: if any sender were
	// still running when close(out) executes, its next send would panic.
	time.Sleep(300 * time.Millisecond)
	close(out)
	// range drains every buffered value, then stops because out is closed
	for event := range out {
		fmt.Printf("[%s] %s: %s\n", event.Source, event.Severity, event.Message)
	}
}
[app-server-1] INFO: event-0 from app-server-1
[app-server-3] WARN: event-0 from app-server-3
[app-server-2] ERROR: event-0 from app-server-2
[app-server-1] WARN: event-1 from app-server-1
[app-server-2] INFO: event-1 from app-server-2
[app-server-3] INFO: event-1 from app-server-3
[app-server-1] ERROR: event-2 from app-server-1
[app-server-2] WARN: event-2 from app-server-2
[app-server-3] ERROR: event-2 from app-server-3
Three goroutines now run in parallel without blocking each other on the channel. The buffer absorbs all nine events as they arrive. Ranging over a closed channel drains every value in the buffer before stopping — a clean idiom for consuming a finite stream. This version still uses time.Sleep for synchronization, which the next sections will fix.
Note: A buffered channel is not a queue that absorbs unlimited load. When the buffer is full, sends block just like an unbuffered channel. Sizing the buffer requires knowing the burst characteristics of the producer and the processing rate of the consumer.
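You can see the capacity rule in isolation by sending into a buffered channel while nothing is receiving. In this standalone sketch, every send up to the capacity returns immediately. The built-ins len and cap then report how full the buffer is. The helper name fillBuffer is hypothetical.

```go
package main

import "fmt"

// fillBuffer sends `size` values into a buffered channel while nothing is
// receiving. Every send completes immediately because the buffer has room.
// It returns the number of queued values and the channel's capacity.
func fillBuffer(size int) (queued, capacity int) {
	ch := make(chan string, size)
	for i := 0; i < size; i++ {
		ch <- fmt.Sprintf("event-%d", i) // no receiver needed while space remains
	}
	// One more send here (ch <- "overflow") would block indefinitely,
	// because the buffer is full and no goroutine is receiving.
	return len(ch), cap(ch)
}

func main() {
	queued, capacity := fillBuffer(3)
	fmt.Printf("queued=%d capacity=%d\n", queued, capacity) // prints "queued=3 capacity=3"
}
```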
The select Statement for Multiplexing Channel Operations
select lets a goroutine wait on multiple channel operations simultaneously. The first operation that is ready executes; if multiple are ready at the same time, one is chosen at random. A default case makes select non-blocking. select is the correct tool for scenarios where a goroutine must respond to whichever of several events arrives first — incoming work, a timeout, or a shutdown signal.
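The default case can turn a blocking send into a non-blocking attempt. That gives a producer one way to detect a full buffer instead of stalling. The sketch below is minimal, and its names and policy are assumptions:

- trySend and offerAll are hypothetical helpers.
- Dropping events when the buffer is full is an assumed load-shedding policy. The service in this tutorial does not do this.

```go
package main

import "fmt"

// trySend attempts a send without blocking. The default case runs when no
// other case can proceed immediately, i.e. the buffer is full and no
// receiver is waiting.
func trySend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

// offerAll tries to enqueue every message and counts how many were rejected.
func offerAll(capacity int, msgs []string) (accepted, dropped int) {
	ch := make(chan string, capacity)
	for _, m := range msgs {
		if trySend(ch, m) {
			accepted++
		} else {
			dropped++
		}
	}
	return accepted, dropped
}

func main() {
	a, d := offerAll(2, []string{"e0", "e1", "e2", "e3"})
	fmt.Printf("accepted=%d dropped=%d\n", a, d) // prints "accepted=2 dropped=2"
}
```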
Replace the entire content of main.go with the following, adding a timeout channel alongside the event channel:
package main
import (
	"fmt"
	"math/rand"
	"time"
)
type LogEvent struct {
	Source   string
	Severity string
	Message  string
}
func processSource(source string, out chan<- LogEvent) {
	severities := []string{"INFO", "WARN", "ERROR"}
	for i := 0; i < 3; i++ {
		time.Sleep(time.Duration(rand.Intn(80)) * time.Millisecond)
		out <- LogEvent{
			Source:   source,
			Severity: severities[rand.Intn(len(severities))],
			Message:  fmt.Sprintf("event-%d from %s", i, source),
		}
	}
}
func main() {
	out := make(chan LogEvent, 9)
	// time.After returns a channel that receives one value after 200 ms.
	// A source can need up to about 240 ms for its three events, so the
	// timeout may fire before all nine arrive.
	timeout := time.After(200 * time.Millisecond)
	sources := []string{"app-server-1", "app-server-2", "app-server-3"}
	for _, src := range sources {
		go processSource(src, out)
	}
	collected := 0
loop:
	for {
		select {
		case event := <-out:
			fmt.Printf("[%s] %s: %s\n", event.Source, event.Severity, event.Message)
			collected++
		case <-timeout:
			fmt.Printf("Timeout reached after collecting %d events\n", collected)
			break loop // a plain break would only exit the select
		}
	}
}
[app-server-1] WARN: event-0 from app-server-1
[app-server-3] INFO: event-0 from app-server-3
[app-server-2] ERROR: event-0 from app-server-2
[app-server-1] INFO: event-1 from app-server-1
[app-server-2] WARN: event-1 from app-server-2
[app-server-3] ERROR: event-1 from app-server-3
[app-server-1] INFO: event-2 from app-server-1
[app-server-2] WARN: event-2 from app-server-2
[app-server-3] INFO: event-2 from app-server-3
Timeout reached after collecting 9 events
The aggregation loop no longer commits to a fixed number of receives or a blind sleep. It responds to whichever event arrives first: a log event or a timeout. This is the pattern a real service uses to enforce processing deadlines without blocking indefinitely. break loop targets the labeled for loop because break inside a select case only breaks out of the select, not the enclosing loop.
Done Channels and Context Cancellation for Goroutine Cleanup
A service needs a way to tell running goroutines to stop. The standard idiom is a done channel: a channel that goroutines select on alongside their normal work. When the done channel is closed, all goroutines reading it receive the zero value immediately and can exit cleanly. In production code this is typically a context.Context, which wraps the same mechanism with deadline and cancellation support.
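Before switching to context.Context, you can check the broadcast behavior of a closed done channel in a standalone sketch, separate from main.go. Here waitForDone, a hypothetical helper, starts several goroutines that all block on the same done channel. A single close releases every one of them, and each receive reports ok == false.

```go
package main

import "fmt"

// waitForDone starts n goroutines blocked on done, closes done once, and
// returns how many goroutines observed the close.
func waitForDone(n int) int {
	done := make(chan struct{})      // struct{} carries no data; only the close matters
	sawClose := make(chan bool, n) // buffered so no goroutine blocks on reporting
	for i := 0; i < n; i++ {
		go func() {
			_, ok := <-done // ok is false once done is closed
			sawClose <- !ok
		}()
	}
	close(done) // a single close wakes every receiver
	count := 0
	for i := 0; i < n; i++ {
		if <-sawClose {
			count++
		}
	}
	return count
}

func main() {
	fmt.Println(waitForDone(3)) // prints 3
}
```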
Replace the entire content of main.go with the following:
package main
import (
	"context"
	"fmt"
	"math/rand"
	"time"
)
type LogEvent struct {
	Source   string
	Severity string
	Message  string
}
func processSource(ctx context.Context, source string, out chan<- LogEvent) {
	severities := []string{"INFO", "WARN", "ERROR"}
	for i := 0; ; i++ {
		// Non-blocking check: stop before doing more work once shutdown has begun
		select {
		case <-ctx.Done():
			fmt.Printf("[%s] shutting down after %d events\n", source, i)
			return
		default:
		}
		time.Sleep(time.Duration(rand.Intn(60)) * time.Millisecond)
		// Blocking send that still gives up if the context is cancelled
		select {
		case out <- LogEvent{
			Source:   source,
			Severity: severities[rand.Intn(len(severities))],
			Message:  fmt.Sprintf("event-%d from %s", i, source),
		}:
		case <-ctx.Done():
			fmt.Printf("[%s] cancelled while sending\n", source)
			return
		}
	}
}
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	out := make(chan LogEvent, 16)
	sources := []string{"app-server-1", "app-server-2", "app-server-3"}
	for _, src := range sources {
		go processSource(ctx, src, out)
	}
	for {
		select {
		case event := <-out:
			fmt.Printf("[%s] %s: %s\n", event.Source, event.Severity, event.Message)
		case <-ctx.Done():
			fmt.Println("Context cancelled — aggregator exiting")
			return
		}
	}
}
[app-server-1] INFO: event-0 from app-server-1
[app-server-2] WARN: event-0 from app-server-2
[app-server-3] ERROR: event-0 from app-server-3
[app-server-1] INFO: event-1 from app-server-1
[app-server-3] INFO: event-1 from app-server-3
[app-server-2] WARN: event-1 from app-server-2
Context cancelled — aggregator exiting
[app-server-1] shutting down after 2 events
[app-server-2] shutting down after 2 events
[app-server-3] shutting down after 2 events
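Every goroutine checks the context on each iteration. The send itself also selects on the context, so a goroutine never blocks on a full channel after a shutdown signal.

This version still has a gap. main returns as soon as it observes ctx.Done(), and returning from main ends the process. Workers that are still sleeping at that moment never print their shutdown lines. The last three lines of the sample output are therefore not guaranteed, and nothing confirms the workers actually exited.

context.WithTimeout composes naturally with other cancellation sources, such as an HTTP handler's cancellation, an OS signal handler, or a parent deadline, without changing the goroutine code.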
Note: Goroutines that ignore the done channel continue running after the caller considers the work complete. In a long-running service, those goroutines accumulate over the lifetime of the process. Every goroutine launched must have a clear path to termination.
sync.WaitGroup for Waiting on a Group of Goroutines
context.WithTimeout lets the aggregator exit when the deadline passes, but the worker goroutines may still be shutting down. sync.WaitGroup provides a counter-based barrier: Add increments the counter before launching a goroutine, Done decrements it when the goroutine exits, and Wait blocks until the counter reaches zero. It is the standard pattern for ensuring all goroutines have finished before cleaning up shared resources.
Replace the entire content of main.go with the following:
package main
import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)
type LogEvent struct {
	Source   string
	Severity string
	Message  string
}
func processSource(ctx context.Context, source string, out chan<- LogEvent, wg *sync.WaitGroup) {
	defer wg.Done() // runs on every return path
	severities := []string{"INFO", "WARN", "ERROR"}
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			fmt.Printf("[%s] worker exiting\n", source)
			return
		default:
		}
		time.Sleep(time.Duration(rand.Intn(60)) * time.Millisecond)
		select {
		case out <- LogEvent{
			Source:   source,
			Severity: severities[rand.Intn(len(severities))],
			Message:  fmt.Sprintf("event-%d from %s", i, source),
		}:
		case <-ctx.Done():
			fmt.Printf("[%s] worker exiting during send\n", source)
			return
		}
	}
}
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	out := make(chan LogEvent, 16)
	var wg sync.WaitGroup
	sources := []string{"app-server-1", "app-server-2", "app-server-3"}
	for _, src := range sources {
		wg.Add(1) // before the go statement, never inside the goroutine
		go processSource(ctx, src, out, &wg)
	}
	// Close the output channel once all workers have exited. The message is
	// printed before close so it always appears before "Aggregator done".
	go func() {
		wg.Wait()
		fmt.Println("All workers done, closing channel")
		close(out)
	}()
	for event := range out {
		fmt.Printf("[%s] %s: %s\n", event.Source, event.Severity, event.Message)
	}
	fmt.Println("Aggregator done")
}
[app-server-2] INFO: event-0 from app-server-2
[app-server-1] WARN: event-0 from app-server-1
[app-server-3] ERROR: event-0 from app-server-3
[app-server-1] INFO: event-1 from app-server-1
[app-server-2] WARN: event-1 from app-server-2
[app-server-3] INFO: event-1 from app-server-3
[app-server-1] worker exiting
[app-server-2] worker exiting
[app-server-3] worker exiting
All workers done, closing channel
Aggregator done
The output channel is closed only after every worker goroutine has called wg.Done(). Ranging over the closed channel in main drains any buffered events and then stops cleanly. There is no sleep, no fixed receive count, and no race between the range loop and the goroutines. The pattern generalizes: exactly one goroutine closes the shared channel, and only after waiting on the WaitGroup that counts every sender, so ownership is never ambiguous.
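Note: wg.Add(1) must be called before the goroutine is launched, not inside it. If Add runs inside the goroutine, the closer goroutine can reach wg.Wait() before any worker has incremented the counter. Wait then sees zero and returns immediately, so out is closed while workers may still send to it, which panics.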
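For reference, here is the safe ordering in a minimal standalone form. runAll is a hypothetical helper, the per-source work is a placeholder, and atomic.Int64 requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll launches one goroutine per source and returns how many finished.
// wg.Add runs in the launching goroutine before each go statement, so every
// increment has happened before Wait is called.
func runAll(sources []string) int64 {
	var wg sync.WaitGroup
	var finished atomic.Int64 // requires Go 1.19+
	for _, src := range sources {
		// Correct: increment before the goroutine exists
		wg.Add(1)
		go func(source string) {
			// Runs even if the work below returns early
			defer wg.Done()
			// Placeholder for real per-source work
			_ = source
			finished.Add(1)
		}(src)
	}
	wg.Wait() // returns only after every Done has run
	return finished.Load()
}

func main() {
	fmt.Println(runAll([]string{"app-server-1", "app-server-2", "app-server-3"})) // prints 3
}
```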
Fan-Out/Fan-In Pattern
Fan-out distributes work from a single input channel to multiple worker goroutines running in parallel. Fan-in merges the output channels of those workers back into a single result channel. Together they form the backbone of parallel pipelines: the input rate is decoupled from the processing rate, and the number of workers can be tuned independently of the source and sink.
Replace the entire content of main.go with the following, which implements the complete log aggregation pipeline:
package main
import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)
type LogEvent struct {
	Source   string
	Severity string
	Message  string
}
type ParsedEvent struct {
	LogEvent
	Priority int
}
// producer emits raw log events from multiple sources
func producer(ctx context.Context, sources []string) <-chan LogEvent {
	out := make(chan LogEvent, 16)
	var wg sync.WaitGroup
	severities := []string{"INFO", "WARN", "ERROR"}
	for _, src := range sources {
		wg.Add(1)
		go func(source string) {
			defer wg.Done()
			for i := 0; ; i++ {
				select {
				case <-ctx.Done():
					return
				default:
				}
				time.Sleep(time.Duration(rand.Intn(40)) * time.Millisecond)
				select {
				case out <- LogEvent{
					Source:   source,
					Severity: severities[rand.Intn(len(severities))],
					Message:  fmt.Sprintf("event-%d", i),
				}:
				case <-ctx.Done():
					return
				}
			}
		}(src)
	}
	// The producer owns out, so it closes it once every source goroutine has returned
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
// worker parses raw events and assigns a priority; it is one of the fan-out workers
func worker(ctx context.Context, id int, in <-chan LogEvent, out chan<- ParsedEvent, wg *sync.WaitGroup) {
	defer wg.Done()
	priorities := map[string]int{"INFO": 1, "WARN": 2, "ERROR": 3}
	for {
		select {
		case event, ok := <-in:
			if !ok {
				fmt.Printf("worker-%d: input closed, exiting\n", id)
				return
			}
			// A plain send is safe here because main drains out until it is closed
			out <- ParsedEvent{LogEvent: event, Priority: priorities[event.Severity]}
		case <-ctx.Done():
			fmt.Printf("worker-%d: context cancelled, exiting\n", id)
			return
		}
	}
}
// fanOut distributes work to numWorkers parallel workers and fans their output back in
func fanOut(ctx context.Context, in <-chan LogEvent, numWorkers int) <-chan ParsedEvent {
	merged := make(chan ParsedEvent, 16)
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(ctx, i+1, in, merged, &wg)
	}
	// All workers write to merged, so it is closed only after the last one exits
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	sources := []string{"app-server-1", "app-server-2", "app-server-3"}
	// Stage 1: produce raw events from every source
	rawEvents := producer(ctx, sources)
	// Stage 2: fan out to 4 parser workers and fan their results back in
	parsedEvents := fanOut(ctx, rawEvents, 4)
	// Stage 3: aggregate and print results until parsedEvents is closed
	counts := map[string]int{"INFO": 0, "WARN": 0, "ERROR": 0}
	for event := range parsedEvents {
		counts[event.Severity]++
		fmt.Printf("[priority=%d] [%s] %s: %s\n",
			event.Priority, event.Source, event.Severity, event.Message)
	}
	fmt.Println("\n--- Aggregation Summary ---")
	fmt.Printf("INFO: %d events\n", counts["INFO"])
	fmt.Printf("WARN: %d events\n", counts["WARN"])
	fmt.Printf("ERROR: %d events\n", counts["ERROR"])
}
[priority=2] [app-server-1] WARN: event-0
[priority=1] [app-server-2] INFO: event-0
[priority=3] [app-server-3] ERROR: event-0
[priority=1] [app-server-1] INFO: event-1
[priority=2] [app-server-3] WARN: event-1
[priority=3] [app-server-2] ERROR: event-1
[priority=1] [app-server-1] INFO: event-2
[priority=2] [app-server-2] WARN: event-2
worker-1: context cancelled, exiting
worker-2: context cancelled, exiting
worker-3: context cancelled, exiting
worker-4: context cancelled, exiting
--- Aggregation Summary ---
INFO: 3 events
WARN: 3 events
ERROR: 2 events
The pipeline has three independent stages (production, parsing, and aggregation), each connected by channels. Adding worker goroutines in fanOut can raise throughput when parsing is the bottleneck, without touching the producer or aggregator code. Replacing the parser with a different implementation requires changing only the worker function. Context propagation delivers the same shutdown signal to every stage. The general shape, with stages as functions that accept and return channels and are coordinated by context and WaitGroup, is a common architecture for parallel data processing in Go.
Summary
The log aggregation service built in this tutorial demonstrates Go's core primitives for coordinating goroutines (channels, select, context, and sync.WaitGroup), applied in the order that real problems demand them.
- Goroutines are started with go and are non-blocking: main does not wait for them unless explicitly coordinated, and a returning main terminates all goroutines regardless of their state.
- Unbuffered channels block both sender and receiver until both are ready, making them a synchronization point as well as a data conduit; the directional types chan<- and <-chan enforce send-only and receive-only access at compile time.
- Buffered channels absorb bursts by allowing sends to proceed without a ready receiver, up to the buffer capacity; sizing the buffer requires knowing the burst profile and processing rate of the system.
- select multiplexes multiple channel operations and executes the first ready case, enabling a goroutine to respond to work, timeouts, and cancellation signals without committing to any one event source.
- Done channels and context.Context signal shutdown to goroutines; every goroutine must select on the context alongside its work channel so it can exit when the system shuts down rather than blocking forever.
- sync.WaitGroup tracks in-flight goroutines and provides a barrier at Wait; Add must be called before launching the goroutine, and the goroutine that closes a shared output channel must own the corresponding WaitGroup.
- The fan-out/fan-in pattern connects pipeline stages through channels, allowing work distribution and result merging to scale independently of the source and sink; each stage is a function that accepts and returns channels, making the pipeline composable and testable in isolation.