In Go I cannot figure out how to get the following setup. If runGoroutine is not in a loop (I don't want it to loop), then how would the goroutine exit when a message gets sent to the channel? If there was no loop, then case <-stopChan: would only get called once at the very start. What if runGoroutine calls other async functions? Would those get exited too when the return is hit?

Maybe I am approaching this wrong. In my case I have x task groups that the user can start. When the user starts a task group, a goroutine is made (1st level) to return CL (command-line) access to the user. This task group (1st level) is a collection of y API requests (y is 200-300 in practice). For each API request I make a new goroutine (2nd level). I am targeting speed: if I am performing hundreds of API requests a second, is it faster/more efficient to put each request into its own goroutine? Or is it the same to do it on one thread?
           Parent/Main process
          |                   |
   Child goroutine      Child goroutine
          |                   |
X bottom goroutines    X bottom goroutines (API level)
package main

import (
    "fmt"
    "time"
)

func runGoroutine(stopChan <-chan struct{}) {
    for {
        select {
        case <-stopChan:
            return // Exit the goroutine
        default:
            fmt.Println("Goroutine is working...")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    stopChan := make(chan struct{})

    // Start the goroutine
    go runGoroutine(stopChan)

    // User should be able to close the routines they want
    close(stopChan)
}
New Code
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "sync"
)

func main() {
    // Initialize context with cancellation
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // release the context when main returns

    // Start the worker pool in a separate goroutine so the main goroutine stays free
    // We can start any task group like this
    go startWorkerPool(ctx, 20, 100)

    // User has CL access on the main goroutine; `cancel` is still in scope,
    // so the task group can be cancelled later if needed
    log.Println("Worker pool started; you can cancel it by calling cancel()")
    waitForNine()
}

func waitForNine() {
    var input int
    for input != 9 {
        fmt.Print("Enter a number: ")
        fmt.Scan(&input)
    }
}

// startWorkerPool starts a fixed-size worker pool to handle `numRequests` requests.
func startWorkerPool(ctx context.Context, numWorkers, numRequests int) {
    var wg sync.WaitGroup
    work := make(chan string)

    // Launch the specified number of workers
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(ctx context.Context) {
            defer wg.Done()
            client := http.Client{}
            for url := range work {
                if req, err := http.NewRequestWithContext(ctx, "GET", url, nil); err != nil {
                    log.Println("failed to create new request:", err)
                } else if resp, err := client.Do(req); err != nil {
                    log.Println("failed to make request:", err)
                } else {
                    resp.Body.Close()
                }
            }
        }(ctx)
    }

    // Enqueue URLs for workers and close the channel when done
    go func() {
        for i := 0; i < numRequests; i++ {
            work <- "https://httpbin.org/get"
        }
        close(work)
    }()

    // Wait for all workers to finish
    wg.Wait()
    log.Println("All requests processed")
}
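One detail this version leaves open is how the user actually triggers the cancellation. Here is a minimal sketch of how main and waitForNine could be rewired, reusing startWorkerPool and the imports from the code above; the idea that entering 9 should stop the task group is only an assumption for the example.

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start the task group; its requests keep running until ctx is cancelled
    go startWorkerPool(ctx, 20, 100)

    log.Println("Worker pool started; enter 9 to cancel it")
    waitForNine(cancel)
}

// waitForNine blocks until the user enters 9 (or input ends), then cancels the context
func waitForNine(cancel context.CancelFunc) {
    var input int
    for input != 9 {
        fmt.Print("Enter a number: ")
        if _, err := fmt.Scan(&input); err != nil {
            break // stop on EOF or unreadable input instead of looping forever
        }
    }
    cancel() // in-flight requests made with ctx are interrupted
}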
A shared context is an easy way to cancel multiple goroutines at once. Consider this simple example:
package main

import (
    "context"
    "log"
    "net/http"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())

    // Cancel the shared context one second after the program starts
    go func() {
        time.Sleep(1 * time.Second)
        log.Print("Cancelling")
        cancel()
    }()

    // Start 100 goroutines, one per request, all sharing the same context
    for range 100 {
        wg.Add(1)
        go func(ctx context.Context) {
            defer wg.Done()
            if req, err := http.NewRequestWithContext(ctx, "GET", "https://httpbin.org/get", nil); err != nil {
                log.Println("failed to create new request: ", err.Error())
            } else if resp, err := new(http.Client).Do(req); err != nil {
                log.Println("failed to make request: ", err.Error())
            } else {
                resp.Body.Close()
            }
        }(ctx)
    }

    log.Print("Waiting")
    wg.Wait()
}
This code starts 100 concurrent goroutines to request https://httpbin.org/get, a convenient testing URL. One second after the program starts, their shared context is cancelled, causing the HTTP requests to be interrupted. You could use a signal or a different event to cancel the shared context.
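For example, os/signal provides signal.NotifyContext, which returns a context that is cancelled when the process receives one of the listed signals. Here is a minimal sketch of the same 100-request example, cancelled by pressing Ctrl-C instead of by a timer:

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "sync"
)

func main() {
    // ctx is cancelled automatically when the process receives SIGINT (Ctrl-C)
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()

    var wg sync.WaitGroup
    for range 100 {
        wg.Add(1)
        go func(ctx context.Context) {
            defer wg.Done()
            req, err := http.NewRequestWithContext(ctx, "GET", "https://httpbin.org/get", nil)
            if err != nil {
                log.Println("failed to create new request:", err)
                return
            }
            if resp, err := new(http.Client).Do(req); err != nil {
                log.Println("failed to make request:", err)
            } else {
                resp.Body.Close()
            }
        }(ctx)
    }

    log.Print("Waiting")
    wg.Wait()
}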
If I am performing hundreds of API requests a second, is it faster/more efficient to put each request into its own goroutine?
Starting a goroutine does have more overhead than passing a value through a channel, so it makes sense to start a worker pool of HTTP goroutines and distribute the work to them.
This next example creates 20 worker goroutines, and those 20 goroutines handle all 100 requests. The worker pool pattern is more practical when workloads are very high.
package main

import (
    "context"
    "log"
    "net/http"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())

    // Cancel the shared context one second after the program starts
    go func() {
        time.Sleep(1 * time.Second)
        log.Print("Cancelling")
        cancel()
    }()

    // Start a fixed pool of 20 workers that pull URLs from the work channel
    var work = make(chan string)
    for range 20 {
        wg.Add(1)
        go func(ctx context.Context) {
            defer wg.Done()
            client := http.Client{}
            for url := range work {
                if req, err := http.NewRequestWithContext(ctx, "GET", url, nil); err != nil {
                    log.Println("failed to create new request: ", err.Error())
                } else if resp, err := client.Do(req); err != nil {
                    log.Println("failed to make request: ", err.Error())
                } else {
                    resp.Body.Close()
                }
            }
        }(ctx)
    }

    // Enqueue 100 requests, then close the channel so the workers can exit
    go func() {
        for range 100 {
            work <- "https://httpbin.org/get"
        }
        close(work)
    }()

    log.Print("Waiting")
    wg.Wait()
}
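The same pattern extends to the two-level layout from the question: derive a child context per task group from one parent context, so the user can cancel a single task group (1st level) without touching the others, or cancel the parent to stop everything. In this sketch, runTaskGroup is a hypothetical stand-in for a task group's worker pool:

package main

import (
    "context"
    "log"
    "time"
)

// runTaskGroup stands in for one 1st-level task group: a real one would run
// its worker pool of API requests until ctx is cancelled.
func runTaskGroup(ctx context.Context, name string) {
    for {
        select {
        case <-ctx.Done():
            log.Printf("%s: cancelled", name)
            return
        case <-time.After(200 * time.Millisecond):
            log.Printf("%s: doing work", name)
        }
    }
}

func main() {
    parent, cancelAll := context.WithCancel(context.Background())
    defer cancelAll()

    // Each task group gets its own child context, so it can be cancelled on its own
    groupACtx, cancelA := context.WithCancel(parent)
    groupBCtx, cancelB := context.WithCancel(parent)
    defer cancelB()

    go runTaskGroup(groupACtx, "group A")
    go runTaskGroup(groupBCtx, "group B")

    time.Sleep(500 * time.Millisecond)
    cancelA() // stops only group A; group B keeps running

    time.Sleep(500 * time.Millisecond)
    cancelAll() // cancelling the parent also cancels group B
    time.Sleep(100 * time.Millisecond)
}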
Update
Now that we've got our worker pool down, let's have it pull the list of URLs from stdin. The user can provide URLs by typing them, or by redirecting file contents or other sources to stdin.
All we have to do is read the URLs from stdin and pass them to our work queue for retrieval in the background.
We can only take in so much work at once. Beyond that point, we can either overwhelm the host computer or pause the intake of new work; generally, the latter is preferable. In this implementation, we limit the amount of work we enqueue by giving work a limited buffer size.
package main

import (
    "bufio"
    "context"
    "log"
    "net/http"
    "net/url"
    "os"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())

    // A buffered channel limits how much work we queue up at once
    var work = make(chan string, 1000)

    // Start a fixed pool of 20 workers that pull URLs from the work channel
    for range 20 {
        wg.Add(1)
        go func(ctx context.Context) {
            defer wg.Done()
            client := http.Client{}
            for url := range work {
                if req, err := http.NewRequestWithContext(ctx, "GET", url, nil); err != nil {
                    log.Println("failed to create new request: ", err.Error())
                } else if resp, err := client.Do(req); err != nil {
                    log.Println("failed to make request: ", err.Error())
                } else {
                    resp.Body.Close()
                }
            }
        }(ctx)
    }

    // Read URLs from standard input and enqueue them for the workers
    wg.Add(1)
    go func() {
        defer wg.Done()
        var scanner = bufio.NewScanner(os.Stdin)
        for scanner.Scan() {
            if scanner.Text() == "cancel" {
                // User could also just press Ctrl-C to kill this program and all its goroutines
                cancel()
                break
            }
            if url, err := url.Parse(scanner.Text()); err != nil {
                log.Printf("Not a valid url: %s", scanner.Text())
            } else {
                work <- url.String()
            }
        }
        if err := scanner.Err(); err != nil {
            log.Println("failed reading standard input: ", err.Error())
        }
        close(work)
    }()

    log.Print("Waiting")
    wg.Wait()
}
You can run the program interactively, or provide the inputs from a file or other source. Here's an example of providing 3 URLs at the command line and immediately cancelling.
% echo -e "https://httpbin.org/get\nhttps://httpbin.org/get\nhttps://httpbin.org/get\ncancel" | ./tgo
2024/11/02 22:50:31 Waiting
2024/11/02 22:50:31 failed to make request: Get "https://httpbin.org/get": context canceled
2024/11/02 22:50:31 failed to make request: Get "https://httpbin.org/get": context canceled
2024/11/02 22:50:31 failed to make request: Get "https://httpbin.org/get": context canceled
If you omit the cancel line, the program still ends once the list of URLs has been processed: scanner reaches end-of-file, the work channel is closed, and the workers finish their remaining requests and return. You can signal end-of-file at the command line with Ctrl-D.
% ./tgo <<< "https://httpbin.org"
2024/11/02 22:53:33 Waiting
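One last performance note, since the goal is hundreds of requests per second: the workers above close each response body without reading it. The net/http client generally needs the body read to EOF before it is closed in order to reuse a keep-alive connection, so draining it first tends to help throughput, as does sharing one http.Client between workers (the connection pool lives in the client's transport). A small sketch of that pattern; the fetch helper is just for illustration:

package main

import (
    "context"
    "io"
    "log"
    "net/http"
)

// fetch performs one GET and drains the response body so the underlying
// connection can go back into the client's keep-alive pool.
func fetch(ctx context.Context, client *http.Client, url string) error {
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return err
    }
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    _, err = io.Copy(io.Discard, resp.Body) // read to EOF before closing
    return err
}

func main() {
    client := &http.Client{}
    if err := fetch(context.Background(), client, "https://httpbin.org/get"); err != nil {
        log.Println("request failed:", err)
    }
}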