Limit goroutines with buffered channels

Redowan Delowar August 23, 2023

I was cobbling together a long-running Go script to send webhook messages to a system when some events occur. The initial script would continuously poll a Kafka topic for events and spawn worker goroutines in a fire-and-forget manner to make HTTP requests to the destination. This had two problems:

  • It could create unlimited goroutines if many events arrived quickly (no backpressure)
  • It might overload the destination system by making many concurrent requests (no concurrency control)

In Python, I'd just use asyncio.Semaphore to limit concurrency. I've previously written about limiting concurrency with semaphores. Turns out, in Go, you can do the same with a buffered channel. Here's how the naive version without any concurrency control looks:

Running it gives you this:

The main function runs an infinite loop that continuously polls the upstream message queue for new messages. Once a new message arrives, it spawns a new worker goroutine in a fire-and-forget manner, and that worker sends the HTTP request to the destination endpoint.
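A minimal sketch of that fire-and-forget shape, not the original script. The names `worker` and `spawnNaive` are hypothetical, the HTTP request is simulated with a sleep, and the infinite polling loop is replaced by a single burst of events so the program terminates. It reports how many goroutines are still alive right after the burst.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// worker is a hypothetical stand-in for the HTTP request to the destination.
func worker(id int) {
	time.Sleep(200 * time.Millisecond) // simulated request latency
}

// spawnNaive launches one goroutine per event with nothing holding it back
// and returns how many extra goroutines exist right after the burst.
func spawnNaive(events int) int {
	before := runtime.NumGoroutine()
	for i := 0; i < events; i++ {
		go worker(i) // fire and forget: no limit, no backpressure
	}
	return runtime.NumGoroutine() - before
}

func main() {
	// The real script loops forever; a single burst keeps this sketch finite.
	fmt.Println("goroutines alive after a burst of 1000 events:", spawnNaive(1000))
}
```

Since nothing makes the loop wait, the count grows with the size of the burst rather than with any limit the program chooses.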

The problem here is that this setup creates an unbounded number of goroutines. If Kafka produces messages faster than the workers can process them, the system will keep spawning new goroutines, eventually consuming all available memory and CPU. It can also overwhelm the destination system by sending too many requests at once if the destination has no throttling mechanism of its own. So we need a way to limit how many workers can run at once.

To fix this, we can use a buffered channel as a semaphore. The idea is to block before launching a new worker if too many are already running. This applies backpressure naturally and prevents unbounded spawning. Observe:

Here, the buffered channel sem works as a semaphore that limits concurrency. Its capacity defines how many goroutines can run at the same time. Before spawning a worker, we try to send an empty struct into the channel. If the channel is full, that line blocks until a running worker finishes and releases its spot by reading from the channel. This ensures that only maxConcurrency workers run at once and prevents goroutine buildup.
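The acquire-and-release steps described here can be sketched as follows. This is an illustration under assumptions, not the original listing: `dispatch` and `peakConcurrency` are hypothetical names, the request is simulated with a sleep, and the loop is bounded. Because the sketch has to terminate, it ends by filling the semaphore to capacity, which only succeeds once every worker has released its slot; an infinite polling loop would not need that step.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// dispatch runs work(id) for ids 0..jobs-1 with at most limit goroutines
// in flight at any moment.
func dispatch(jobs, limit int, work func(id int)) {
	sem := make(chan struct{}, limit) // capacity = concurrency limit
	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // acquire: blocks while limit workers are running
		go func(id int) {
			defer func() { <-sem }() // release the slot when the worker returns
			work(id)
		}(i)
	}
	// Wait for stragglers: these sends can only all succeed after every
	// worker has received from sem.
	for i := 0; i < limit; i++ {
		sem <- struct{}{}
	}
}

// peakConcurrency pushes simulated requests through dispatch and reports the
// highest number of workers observed running at the same moment.
func peakConcurrency(jobs, limit int) int {
	var mu sync.Mutex
	running, peak := 0, 0
	dispatch(jobs, limit, func(id int) {
		mu.Lock()
		running++
		if running > peak {
			peak = running
		}
		mu.Unlock()

		time.Sleep(20 * time.Millisecond) // simulated HTTP request

		mu.Lock()
		running--
		mu.Unlock()
	})
	mu.Lock()
	defer mu.Unlock()
	return peak
}

func main() {
	fmt.Println("peak concurrent workers:", peakConcurrency(10, 2))
}
```

The observed peak can never exceed the channel's capacity, because a worker only starts after its send into `sem` has succeeded.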

The closure around the worker is intentional: it keeps concurrency management out of the worker itself. The worker only focuses on processing messages, while the outer function handles synchronization and throttling. This separation allows the caller to call the worker synchronously if needed. It also makes testing the worker function much easier. In general, it's a good practice to push concurrency to the outer edge of your system so that the caller has the choice of leveraging concurrency or not.
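To make the testing point concrete, here is a hedged sketch of a worker that knows nothing about goroutines or semaphores and is exercised synchronously. Everything in it is an assumption for illustration: `sendWebhook`, `stubTransport`, `trySend`, and the destination URL are hypothetical, and the stub transport answers with a fixed status code so no network is involved.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// sendWebhook is a hypothetical worker. It POSTs one payload and contains no
// goroutines, channels, or semaphores, so it can be called like any function.
func sendWebhook(client *http.Client, url string, payload []byte) error {
	resp, err := client.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("webhook rejected: %s", resp.Status)
	}
	return nil
}

// stubTransport answers every request with a fixed status code without
// touching the network.
type stubTransport struct{ status int }

func (s stubTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	if req.Body != nil {
		req.Body.Close() // a RoundTripper is expected to close the request body
	}
	return &http.Response{
		StatusCode: s.status,
		Status:     fmt.Sprintf("%d %s", s.status, http.StatusText(s.status)),
		Body:       http.NoBody,
		Header:     make(http.Header),
		Request:    req,
	}, nil
}

// trySend calls the worker synchronously against the stubbed destination.
func trySend(status int) error {
	client := &http.Client{Transport: stubTransport{status: status}}
	return sendWebhook(client, "http://destination.invalid/hook", []byte(`{"event":"created"}`))
}

func main() {
	fmt.Println("204:", trySend(http.StatusNoContent))
	fmt.Println("500:", trySend(http.StatusInternalServerError))
}
```

The caller stays free to wrap `sendWebhook` in a goroutine guarded by a semaphore, or to call it inline as the checks here do.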

The optional batch delay isn't required for correctness, but it helps spread out requests so the downstream system isn't flooded. Running the script shows that even though the loop is infinite, only two workers run at once, and there's a short pause between each batch.
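One way such a delay could be wired in, as a sketch only. It assumes a "batch" means as many consecutive launches as the concurrency limit and that the limit is positive; `dispatchBatched` is a hypothetical name, and the loop is bounded so the program ends.

```go
package main

import (
	"fmt"
	"time"
)

// dispatchBatched runs work for ids 0..jobs-1 with at most limit workers in
// flight and sleeps for delay after every limit launches. It returns the
// number of pauses taken. limit is assumed to be positive.
func dispatchBatched(jobs, limit int, delay time.Duration, work func(id int)) int {
	sem := make(chan struct{}, limit)
	pauses := 0
	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // acquire a slot
		go func(id int) {
			defer func() { <-sem }() // release the slot
			work(id)
		}(i)
		if (i+1)%limit == 0 { // one full batch has been launched
			time.Sleep(delay)
			pauses++
		}
	}
	for i := 0; i < limit; i++ { // wait for the remaining workers
		sem <- struct{}{}
	}
	return pauses
}

func main() {
	start := time.Now()
	pauses := dispatchBatched(6, 2, 100*time.Millisecond, func(id int) {
		elapsed := time.Since(start).Round(10 * time.Millisecond)
		fmt.Printf("worker %d running at +%v\n", id, elapsed)
		time.Sleep(50 * time.Millisecond) // simulated HTTP request
	})
	fmt.Println("pauses:", pauses)
}
```

The semaphore alone already caps concurrency; the sleep only spaces the batches out in time.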

The workers in this version are still getting spawned in a fire-and-forget manner but without leaking goroutines. When the concurrency limit is reached, the main loop blocks instead of spawning more workers. This applies natural backpressure to the producer, keeping the system stable even under heavy load.

Now, you might want to add extra abstractions over the core behavior to make it more ergonomic. Here's a pointer on how to do so. Effective Go also mentions this pattern briefly.
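As one possible shape for such an abstraction, the semaphore can be hidden behind a small type. This is a sketch under assumptions rather than an established API: `Limiter`, `NewLimiter`, `Go`, `Wait`, and `runTasks` are hypothetical names, and the work is simulated with a sleep.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Limiter is a hypothetical wrapper around the buffered-channel semaphore.
type Limiter struct {
	sem chan struct{}
	wg  sync.WaitGroup
}

// NewLimiter returns a Limiter that allows at most limit functions at once.
func NewLimiter(limit int) *Limiter {
	return &Limiter{sem: make(chan struct{}, limit)}
}

// Go blocks until a slot is free, then runs fn in a new goroutine.
func (l *Limiter) Go(fn func()) {
	l.sem <- struct{}{} // acquire
	l.wg.Add(1)
	go func() {
		defer func() {
			<-l.sem // release
			l.wg.Done()
		}()
		fn()
	}()
}

// Wait blocks until every function passed to Go has returned.
func (l *Limiter) Wait() { l.wg.Wait() }

// runTasks pushes simulated tasks through a Limiter and reports how many
// completed and the highest number seen running at once.
func runTasks(tasks, limit int) (done, peak int) {
	var mu sync.Mutex
	running := 0
	l := NewLimiter(limit)
	for i := 0; i < tasks; i++ {
		l.Go(func() {
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(10 * time.Millisecond) // simulated work

			mu.Lock()
			running--
			done++
			mu.Unlock()
		})
	}
	l.Wait()
	return done, peak
}

func main() {
	done, peak := runTasks(8, 3)
	fmt.Println("completed:", done, "peak:", peak)
}
```

Callers then write `l.Go(func() { ... })` and never touch the channel directly, while `Go` still blocks when the limit is reached, so the backpressure on the producer is preserved.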
