{
"$type": "site.standard.document",
"canonicalUrl": "https://rednafi.com/go/limit-goroutines-with-buffered-channels/",
"description": "Control goroutine concurrency with buffered channels as semaphores. Prevent resource exhaustion with backpressure patterns in Go workers.",
"path": "/go/limit-goroutines-with-buffered-channels/",
"publishedAt": "2023-08-23T00:00:00.000Z",
"site": "at://did:plc:fgtm2c26vfcj74rfmeggbyqj/site.standard.publication/3mnl6f7ob462z",
"tags": [
"Go",
"TIL",
"Concurrency"
],
"textContent": "I was cobbling together a long-running Go script to send webhook messages to a system when\nsome events occur. The initial script would continuously poll a Kafka topic for events and\nspawn worker goroutines in a fire-and-forget manner to make HTTP requests to the\ndestination. This had two problems:\n\n- It could create unlimited goroutines if many events arrived quickly (no backpressure)\n- It might overload the destination system by making many concurrent requests (no\n concurrency control)\n\nIn Python, I'd use just asyncio.Semaphore to limit concurrency. I've previously [written\nabout limiting concurrency with semaphores]. Turns out, in Go, you could do the same with a\nbuffered channel. Here's how the naive version without any concurrency control looks:\n\nRunning it gives you this:\n\nThe main function runs an infinite loop where it polls the upstream message queue\ncontinuously to collect new message. Once a new message arrives, it spawns a new worker\ngoroutine in a fire-and-forget manner that actually sends the HTTP request to the desination\nendpoint.\n\nThe problem here is that this setup creates an unbounded number of goroutines. If Kafka\nproduces messages faster than the workers can process them, the system will keep spawning\nnew goroutines, eventually consuming all available memory and CPU. Also, it can overwhelm\nthe destination system by sending too many requests at once if that doesn't have any\nthrottling mechanism in place. So we need a way to limit how many workers can run at once.\n\nTo fix this, we can use a buffered channel as a semaphore. The idea is to block before\nlaunching a new worker if too many are already running. This applies backpressure naturally\nand prevents unbounded spawning. Observe:\n\nHere, the buffered channel sem works as a semaphore that limits concurrency. Its capacity\ndefines how many goroutines can run at the same time. Before spawning a worker, we try to\nsend an empty struct into the channel. If the channel is full, that line blocks until a\nrunning worker finishes and releases its spot by reading from the channel. This ensures that\nonly maxConcurrency workers run at once and prevents goroutine buildup.\n\nThe closure around the worker is intentional: it keeps concurrency management out of the\nworker itself. The worker only focuses on processing messages, while the outer function\nhandles synchronization and throttling. This separation allows the caller to call the worker\nsynchronously if needed. It also makes testing the worker function much easier. In general,\nit's a good practice to push concurrency to the outer edge of your system so that the caller\nhas the choice of leveraging concurrency or not.\n\nThe optional batch delay isn't required for correctness, but it helps spread out requests so\nthe downstream system isn't flooded. Running the script shows that even though the loop is\ninfinite, only two workers run at once, and there's a short pause between each batch.\n\nThe workers in this version are still getting spawned in a fire-and-forget manner but\nwithout leaking goroutines. When the concurrency limit is reached, the main loop blocks\ninstead of spawning more workers. This applies natural backpressure to the producer, keeping\nthe system stable even under heavy load.\n\nNow, you might want to add extra abstractions over the core behavior to make it more\nergonomic. Here's a [pointer] on how to do so. [Effective Go also mentions] this pattern\nbriefly.\n\nFurther reading\n\n- [How to wait until buffered channel semaphore is empty]\n\n\n\n\n[written about limiting concurrency with semaphores]:\n /python/limit-concurrency-with-semaphore/\n\n\n[pointer]:\n https://levelup.gitconnected.com/go-concurrency-pattern-semaphore-9587d45f058d\n\n\n[effective go also mentions]:\n https://go.dev/doc/effective_go#channels\n\n[how to wait until buffered channel semaphore is empty]:\n https://stackoverflow.com/questions/39776481/how-to-wait-until-buffered-channel-semaphore-is-empty",
"title": "Limit goroutines with buffered channels"
}