Preface
One of the projects I'm handling at work requires reading a lot of data from an internal API and then writing the results to a CSV file. Since this will run as an AWS Lambda function, I chose Go as the go-to language (pun intended, not sorry 😂).
The constraints:
- Overall process must finish below Lambda max execution time, which is currently 15 minutes.
- Data count under one million records.
- No need to care about the order of writing to CSV.
How shall we handle this if we want to achieve maximum efficiency?
👉 PSA: The link to the entire code gist is at the end of this article
Sequentially
First, let's try the most common approach: we process the API calls sequentially, then write the results line by line to the CSV file. Most of the time, sequential is good enough if we don't have complex requirements. KISS applies as always.
We can write it like this:
type data struct {
	ID          string
	Name        string
	Description string
}

func testSequential(records int, w *csv.Writer) {
	allData := make([]data, records)
	for i := 0; i < records; i++ {
		// Wait a randomized 0~100ms to simulate an API call
		time.Sleep(getRandomSleepTime(100))
		// Assign dummy data
		allData[i] = getDummyData(i)
		fmt.Printf("[Write %s] ", allData[i].Name)
	}
	// Write CSV body
	writeCSVBody(allData, w)
}
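The snippet above references a few helper functions that aren't shown. Their full versions live in the gist linked at the end of the article; here is a minimal sketch of what they might look like (the field values are chosen to match the CSV samples shown later, but the actual implementations may differ):

import (
	"encoding/csv"
	"fmt"
	"math/rand"
	"strconv"
	"time"
)

// getRandomSleepTime returns a random duration between 0 and maxMs milliseconds.
func getRandomSleepTime(maxMs int) time.Duration {
	return time.Duration(rand.Intn(maxMs)) * time.Millisecond
}

// getDummyData builds a fake record for index i.
func getDummyData(i int) data {
	return data{
		ID:          strconv.Itoa(i),
		Name:        fmt.Sprintf("Name%d", i),
		Description: "Desc",
	}
}

// writeCSVBody writes every record as one CSV row, then flushes the writer.
func writeCSVBody(allData []data, w *csv.Writer) {
	for _, d := range allData {
		w.Write([]string{d.ID, d.Name, d.Description})
	}
	w.Flush()
}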
This is very straightforward: we call the API and write the data to the file in one go. However, with this approach, the total execution time grows linearly with the number of API calls. Each mocked API call sleeps a random 0~100ms (about 50ms on average), so 100 sequential calls take roughly 5 seconds. For a small number of API calls, this is still OK. But what if we have 100,000 records? Or millions?
Note: Of course, in practice, it does not make sense to have 100,000 API calls in a short burst of time in production. We'd probably chunk the data to something like 5000 records per API call. In this example, for argument's sake, we'll pretend a data record is equal to an API call.
Look at the table below. Even to process a mere 10,000 records, we spend a whopping 8 minutes.
| No. of API calls | Total time |
|---|---|
| 10 | 516.792ms |
| 100 | 5.183s |
| 1,000 | 50.207s |
| 10,000 | 8m21.169s |
| 100,000 | 1 hour++ |
| 1,000,000 | eternity 😱 |
Surely there is a better way to do this.
Concurrently
Enter concurrency.
Concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in a partial order, without affecting the outcome.
Go has some of the best concurrency support baked into its core, but understanding how it handles concurrency is not as easy as it seems. When using concurrency to write to a file, we should be careful to avoid the so-called race condition, because something weird might happen if we neglect it. We may be lucky and get the right number of lines, or we may be missing some lines because one goroutine overwrote another's work. It is unpredictable.
Note: To understand concurrency, we first need to understand what a goroutine is. If the concept is new to you, it's worth reading an introduction or two; a minimal illustration follows below.
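For readers new to goroutines: a goroutine is a lightweight thread managed by the Go runtime, started by putting the go keyword in front of a function call. A tiny self-contained illustration (not part of the project code):

import (
	"fmt"
	"time"
)

func main() {
	// Launch a function as a goroutine; it runs concurrently with main
	go fmt.Println("hello from a goroutine")
	fmt.Println("hello from main")
	// Crude wait so the goroutine gets a chance to run before main exits
	time.Sleep(10 * time.Millisecond)
}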
To solve this, there are two concurrency options that we can try here.
1. Use WaitGroups
Simply put, a WaitGroup is a mechanism for coordinating multiple goroutines by making the program wait for them. We can use a WaitGroup to wait for all goroutines to finish before moving on to the next step.
We can write it like below. For each iteration of the loop, we spawn a goroutine to call the API concurrently, and then make them all wait, like well-behaved toddlers, before writing to the CSV file.
// Previously defined data struct goes here

func testConcurrencyWaitGroup(records int, w *csv.Writer) {
	allData := make([]data, records)
	// Define WaitGroup
	var wg sync.WaitGroup
	wg.Add(records)
	for i := 0; i < records; i++ {
		go func(i int) {
			// Mark goroutine as finished once data is assigned
			defer wg.Done()
			// Wait a randomized 0~100ms to simulate an API call
			time.Sleep(getRandomSleepTime(100))
			// Assign dummy data
			allData[i] = getDummyData(i)
			fmt.Printf("[Write %s] ", allData[i].Name)
		}(i)
	}
	// Make WaitGroup wait for all goroutines to finish
	wg.Wait()
	// Write CSV body
	writeCSVBody(allData, w)
}
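Before looking at the numbers, here is a minimal harness for running the function above. This is illustrative, not from the gist; it simply opens the output file (named concurrency.csv to match the sample shown later) and wires up the csv.Writer:

import (
	"encoding/csv"
	"log"
	"os"
)

func main() {
	f, err := os.Create("concurrency.csv")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	w := csv.NewWriter(f)
	defer w.Flush()

	// Run the WaitGroup variant for 100 simulated API calls
	testConcurrencyWaitGroup(100, w)
}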
Total time
As you can see from the numbers alone, implementing concurrency gives you a much, much faster total execution time.
| No. of API calls | Total time |
|---|---|
| 10 | 100.186ms |
| 100 | 103.523ms |
| 1,000 | 167.081ms |
| 10,000 | 752.808ms |
| 100,000 | 13.567s |
| 1,000,000 | theoretically fast ⚡ |
Side effect
Using WaitGroup in the above fashion will give you CSV lines ordered exactly as the original data obtained from the API. This is the method to choose if you care about order. You can see the result from concurrency.csv below:
...
91,Name91,Desc
92,Name92,Desc
93,Name93,Desc
94,Name94,Desc
...
Notes
- While the above example works, it actually violates one of Go's principles:
Do not communicate by sharing memory. Instead, share memory by communicating.
Each goroutine shares the same memory, the []data slice allData. As we spawn more and more goroutines, there's no guarantee that the program will behave exactly as we want it to. Tread carefully.
- In addition, we simply spawn one goroutine per API call, so it's no surprise that we get the errors below when we try to make a million calls. Take memory usage seriously into consideration if you wish to employ concurrency in production: optimize the process and test rigorously. One common way to bound memory is sketched after the error output.
fatal error: runtime: out of memory
fatal error: runtime: cannot allocate memory
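One common mitigation, not shown in the original code, is to cap the number of goroutines that run at once using a buffered channel as a semaphore. A rough sketch under that assumption (maxInFlight is an illustrative value; note that this still writes into the shared allData slice, so the principle quoted above still applies):

func testConcurrencyBounded(records int, w *csv.Writer) {
	allData := make([]data, records)
	var wg sync.WaitGroup
	wg.Add(records)
	// Buffered channel used as a semaphore: at most maxInFlight goroutines run at once
	const maxInFlight = 1000
	sem := make(chan struct{}, maxInFlight)
	for i := 0; i < records; i++ {
		sem <- struct{}{} // acquire a slot (blocks while maxInFlight goroutines are running)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			time.Sleep(getRandomSleepTime(100))
			allData[i] = getDummyData(i)
		}(i)
	}
	wg.Wait()
	writeCSVBody(allData, w)
}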
If we wish to follow the above Go principle, we can...
2. Use Channel
Just as the name implies, a channel is a mechanism in Go for sending and receiving data between goroutines, as if through a pipe. We can use channels to avoid the aforementioned race condition by ensuring that the process is 'blocked' during data sending (the API calls) and data receiving (writing to the CSV). In this way, we are promoting safety in processing our data.
An excellent article called 'The Nature Of Channels In Go' explains this concept visually. Do read that to understand more about Channels.
We can use a channel like below:
// Previously defined data struct goes here

func testConcurrencyChannel(records int, w *csv.Writer) {
	// Define buffered channel
	ch := make(chan data, records)
	done := make(chan bool)
	// Close channel only when sending is finished
	defer close(ch)
	for i := 0; i < records; i++ {
		go func(i int) {
			// Wait a randomized 0~100ms to simulate an API call
			time.Sleep(getRandomSleepTime(100))
			// Send data to channel
			d := getDummyData(i)
			ch <- d
			fmt.Printf("[Write %s] ", d.Name)
		}(i)
	}
	// Write CSV body
	go writeCSVBodyWithChannel(ch, done, records, w)
	// Block until the writer goroutine signals that it is finished
	<-done
}
Note: It's OK to leave a channel open without an explicit close; channels are garbage collected like any other value once nothing references them.
We need to make sure we send a finished notification to the done channel inside the function that writes the CSV body. Otherwise, the program will not know when to continue after writing.
func writeCSVBodyWithChannel(ch chan data, done chan bool, records int, w *csv.Writer) {
	// Write data from channel to CSV
	for d := range ch {
		// Write to CSV here, e.g. something like w.Write([]string{d.ID, d.Name, d.Description})
		records--
		// Check if all records are processed; if yes, notify the done channel
		if records == 0 {
			done <- true
		}
	}
	// The range loop ends when the caller's deferred close(ch) runs
}
Total time
Using a channel gives you results similar to the WaitGroup approach.
| No. of API calls | Total time |
|---|---|
| 10 | 91.547ms |
| 100 | 106.366ms |
| 1,000 | 169.152ms |
| 10,000 | 751.443ms |
| 100,000 | 12.369s |
| 1,000,000 | theoretically fast ⚡ |
Side effect
Using a channel in the above fashion will give you unordered CSV lines. The writer receives and processes data as soon as it is sent to the channel, so there is no way to guarantee the order. This is the method to choose if you do NOT care about order, which is exactly what my project needs.
...
35,Name35,Desc
66,Name66,Desc
12,Name12,Desc
13,Name13,Desc
...
Notes
- Using a channel is a way to share memory by communicating, thus following the Go principle.
- Similar to WaitGroup, a million API calls will still result in an out of memory error, so take precautions. One way to bound the number of goroutines while sticking with channels is sketched below.
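If we want to keep a million records from spawning a million goroutines while still sharing memory by communicating, a fixed-size worker pool is a common pattern. A hedged sketch (jobs, numWorkers, and the overall shape are illustrative, not from the gist; it reuses writeCSVBodyWithChannel from above):

func testConcurrencyWorkerPool(records int, w *csv.Writer) {
	const numWorkers = 100 // fixed number of goroutines, regardless of record count
	jobs := make(chan int, numWorkers)
	ch := make(chan data, numWorkers)
	done := make(chan bool)

	// Workers receive indices, simulate the API call, and send results on ch
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for n := 0; n < numWorkers; n++ {
		go func() {
			defer wg.Done()
			for i := range jobs {
				time.Sleep(getRandomSleepTime(100))
				ch <- getDummyData(i)
			}
		}()
	}

	// Single writer goroutine, same as in the channel example
	go writeCSVBodyWithChannel(ch, done, records, w)

	// Feed the jobs, then let the workers drain and finish
	for i := 0; i < records; i++ {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
	<-done
	close(ch)
}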
Closing
This was supposed to be a short read, but I had fun writing up the different approaches and explaining how they work. Writing large amounts of data to a file is a recurring requirement in software engineering projects, so it's always good to know the options and best practices for doing it right in your programming language of choice. 😃