Sort a 42 GB CSV quickly
Sort a 42 GB CSV quickly
Say you're, I don't know, building a website that shows the most popular emojis on Bluesky in real time. Further assume that you want to move the MVP architecture, which is mostly just Redis, to Postgres + Timescale, which unlocks a lot of potential for resilience and features.
Of course, you want to validate and test your architecture, and for that, you create a bunch of mock data, a hypertable of about 550m emojis, which is about 5x more than what's currently on Bluesky. You gather two days' worth of data from the firehose, then run
-- Double the table: append a copy of every existing row of emojis to itself.
BEGIN;
INSERT INTO emojis
SELECT * FROM emojis;
COMMIT;
on repeat until you have enough test data.
And for something quick and dirty, this does the job. But oh! You're working with time series data. Bluesky has been around for two years, so it'd be worth stretching all this data out, time-wise, from 2 days to 2 years.
So you dump your table in a CSV:
-- Export every row of emojis as comma-delimited CSV, with a header row.
COPY (SELECT * from emojis) TO '/path/to/emojis.csv' WITH (FORMAT CSV, DELIMITER ',', HEADER TRUE);
And now you have a file that looks something like this:
$ head emojis.csv
did,rkey,emoji,lang,created_at
did:plc:pfgs64jdrbznui4qktarwvvz,3l7aowcsbgj23,🫶,en,2024-01-31T13:47:33Z
did:plc:4ojtgthev5iexawd7k63ri43,3l7aowe6e3x2z,๐ฅ,en,2024-06-06T21:18:01Z
did:plc:4ojtgthev5iexawd7k63ri43,3l7aowe6e3x2z,๐ฅ,en,2024-05-10T02:16:11Z
did:plc:4ojtgthev5iexawd7k63ri43,3l7aowe6e3x2z,๐ฅ,en,2023-06-19T16:46:21Z
did:plc:oemwnvl3bdhbglqsu4vtutg4,3l7aowe6tvz2y,โ๏ธ,ja,2024-03-17T23:13:22Z
did:plc:enbbvjkhuzg3bzmlxjjdko2a,3l7aowejtcc24,🥶,en,2024-05-16T03:36:28Z
did:plc:sdx2znbevuvup2osq5ng24yf,3l7aowd44ts2a,โ๏ธ,ja,2022-12-22T10:52:09Z
did:plc:yfozqgqtw7shp23rgj2lb57o,3l7aowf2mqv27,🥰,es,2024-10-14T02:14:23Z
did:plc:yfcrcqqhqzt7h6aikn4ehypj,3l7aowfa3tk2j,😵‍💫,zh,2023-04-04T10:53:03Z
Now, how do you stretch this out to two years? You ask o1-mini to write a Go program that does it as fast as possible for you.
package main
import (
"bufio"
"encoding/csv"
"fmt"
"io"
"log"
"math/rand"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"
)
// Tunables for the timestamp-rewriting run. All of them are untyped
// constants, so they adapt to whatever integer or string type the use site
// needs.

// INPUT_CSV is the path of the exported CSV that is read.
const INPUT_CSV = "/path/to/emojis_dump.csv"

// OUTPUT_CSV is the path the modified CSV is written to.
const OUTPUT_CSV = "/path/to/emojis_modified.csv"

// NUM_WORKERS is the number of concurrent worker goroutines.
const NUM_WORKERS = 8

// CHANNEL_BUFFER is the buffer size used for the pipeline channels.
const CHANNEL_BUFFER = 10000

// PROGRESS_INTERVAL is how many records pass between progress log lines
// (one line every 1,000,000 records).
const PROGRESS_INTERVAL = 1000000
// Pipeline payload types. Both carry the same data; the distinct names mark
// which stage of the pipeline a value belongs to.
type (
	// Record is one CSV row as read from the input, before processing.
	Record struct {
		// Fields holds the row's column values.
		Fields []string
		// Line is the input line number, kept for log messages only.
		Line int
	}

	// ProcessedRecord is one CSV row after processing, ready to be written.
	ProcessedRecord struct {
		// Fields holds the row's column values.
		Fields []string
		// Line is the input line number, kept for log messages only.
		Line int
	}
)
// main rewrites the 'created_at' column of INPUT_CSV with random timestamps
// from the last two years and writes the result to OUTPUT_CSV.
//
// The work is a three-stage pipeline: this goroutine reads records and feeds
// recordChan, NUM_WORKERS worker goroutines replace the timestamp and feed
// processedChan, and a single writer goroutine drains processedChan into the
// output file. With more than one worker the output row order is not
// guaranteed to match the input order.
func main() {
	startTime := time.Now()
	fmt.Println("Starting processing...")

	// Ensure output directory exists
	outputDir := "/path/to/tmp/" // Adjust based on OUTPUT_CSV path
	if err := os.MkdirAll(outputDir, os.ModePerm); err != nil {
		log.Fatalf("Error creating output directory: %v", err)
	}

	// Open input CSV file
	inputFile, err := os.Open(INPUT_CSV)
	if err != nil {
		log.Fatalf("Error opening input CSV file: %v", err)
	}
	defer inputFile.Close()

	// Create output CSV file. It is closed explicitly at the end of main
	// (not deferred) so that an error reported by Close is not dropped.
	outputFile, err := os.Create(OUTPUT_CSV)
	if err != nil {
		log.Fatalf("Error creating output CSV file: %v", err)
	}

	// Initialize CSV reader and writer with 16 MiB buffered I/O
	reader := csv.NewReader(bufio.NewReaderSize(inputFile, 16*1024*1024))
	writer := csv.NewWriter(bufio.NewWriterSize(outputFile, 16*1024*1024))

	// Read header
	header, err := reader.Read()
	if err != nil {
		log.Fatalf("Error reading CSV header: %v", err)
	}

	// Identify the index of 'created_at' column
	createdAtIdx := -1
	for idx, col := range header {
		if strings.TrimSpace(col) == "created_at" {
			createdAtIdx = idx
			break
		}
	}
	if createdAtIdx == -1 {
		log.Fatalf("'created_at' column not found in CSV header.")
	}

	// Write header to output CSV
	if err := writer.Write(header); err != nil {
		log.Fatalf("Error writing header to output CSV: %v", err)
	}
	writer.Flush()

	// Channels for pipeline
	recordChan := make(chan Record, CHANNEL_BUFFER)
	processedChan := make(chan ProcessedRecord, CHANNEL_BUFFER)

	// WaitGroups to synchronize goroutines
	var wgWorkers sync.WaitGroup
	var wgWriter sync.WaitGroup

	// Start worker goroutines
	for i := 0; i < NUM_WORKERS; i++ {
		wgWorkers.Add(1)
		go worker(&wgWorkers, recordChan, processedChan, createdAtIdx, len(header))
	}

	// Start writer goroutine
	wgWriter.Add(1)
	go writerGoroutine(&wgWriter, writer, processedChan)

	// Read and dispatch records. lineNumber counts records (the header is
	// number 1); it equals the physical line number as long as no quoted
	// field contains a newline.
	lineNumber := 1
	for {
		recordFields, err := reader.Read()
		if err == io.EOF {
			break
		}
		// Advance for every record, including skipped ones, so the numbers
		// reported for later records do not drift.
		lineNumber++
		if err != nil {
			// csv.Reader reports malformed records as an unwrapped
			// *csv.ParseError; those are logged and skipped.
			if _, ok := err.(*csv.ParseError); ok {
				log.Printf("Error reading record at line %d: %v", lineNumber, err)
				continue
			}
			// Anything else is an I/O failure on the input. Retrying would
			// just loop on the same error, so give up.
			log.Fatalf("Error reading input CSV at line %d: %v", lineNumber, err)
		}
		recordChan <- Record{
			Fields: recordFields,
			Line:   lineNumber,
		}
	}

	// Close the record channel to signal workers no more records are coming
	close(recordChan)

	// Wait for all workers to finish processing
	wgWorkers.Wait()

	// Close the processed channel to signal writer no more records are coming
	close(processedChan)

	// Wait for writer to finish
	wgWriter.Wait()

	// Flush any remaining data in the writer buffer; Error also surfaces any
	// write failure the writer goroutine only logged.
	writer.Flush()
	if err := writer.Error(); err != nil {
		log.Fatalf("Error flushing writer: %v", err)
	}
	if err := outputFile.Close(); err != nil {
		log.Fatalf("Error closing output CSV file: %v", err)
	}

	elapsed := time.Since(startTime)
	fmt.Printf("Processing completed in %s.\n", elapsed)
}
// worker consumes records from recordChan until it is closed, overwrites the
// field at createdAtIdx with a random timestamp from the last two years, and
// forwards the result on processedChan. wg.Done is called on return.
//
// Records whose field count differs from expectedFields are logged and
// dropped. Every PROGRESS_INTERVAL forwarded records a progress line is
// logged.
func worker(wg *sync.WaitGroup, recordChan <-chan Record, processedChan chan<- ProcessedRecord, createdAtIdx int, expectedFields int) {
	defer wg.Done()

	// Each worker owns its generator: a *rand.Rand built with rand.New is not
	// safe for concurrent use. The seed mixes the clock with a small random
	// offset so workers started at the same instant still differ.
	seed := time.Now().UnixNano() + int64(rand.Intn(1000))
	rng := rand.New(rand.NewSource(seed))

	// Window for the generated timestamps: [now - 2 years, now], in Unix seconds.
	upper := time.Now().UTC()
	lower := upper.AddDate(-2, 0, 0)
	minSec, maxSec := lower.Unix(), upper.Unix()

	handled := 0
	for {
		rec, open := <-recordChan
		if !open {
			return
		}

		if got := len(rec.Fields); got != expectedFields {
			log.Printf("Warning: Record at line %d has %d fields; expected %d. Skipping.", rec.Line, got, expectedFields)
			continue
		}

		// The old value is not parsed; it is simply replaced in place.
		rec.Fields[createdAtIdx] = randomTimestamp(rng, minSec, maxSec)
		processedChan <- ProcessedRecord{Fields: rec.Fields, Line: rec.Line}

		handled++
		if handled%PROGRESS_INTERVAL == 0 {
			log.Printf("[Worker %d] Processed %d records.", getGID(), handled)
		}
	}
}
// writerGoroutine drains processedChan and writes each record to writer,
// returning (and calling wg.Done) once the channel is closed and empty.
//
// The writer is flushed every flushInterval records and once more at the
// end. Errors are logged, not returned; the caller can still observe them
// afterwards through writer.Error().
//
// A failed Write is treated as permanent: the buffered writer underneath
// csv.Writer keeps returning its first error, so retrying would only produce
// one log line per remaining record. After the first failure the function
// stops writing but keeps receiving, so senders never block on a full channel.
func writerGoroutine(wg *sync.WaitGroup, writer *csv.Writer, processedChan <-chan ProcessedRecord) {
	defer wg.Done()

	const flushInterval = 1000000

	totalWritten := 0
	failed := false
	for procRec := range processedChan {
		if failed {
			// Discard: the output is already known to be broken.
			continue
		}
		if err := writer.Write(procRec.Fields); err != nil {
			log.Printf("Error writing record at line %d: %v", procRec.Line, err)
			failed = true
			continue
		}
		totalWritten++

		// Flush periodically to ensure data is written to disk
		if totalWritten%flushInterval == 0 {
			writer.Flush()
			if err := writer.Error(); err != nil {
				log.Printf("Error flushing writer at record %d: %v", totalWritten, err)
				failed = true
			}
			log.Printf("[Writer] Written %d records.", totalWritten)
		}
	}

	// Final flush
	writer.Flush()
	if err := writer.Error(); err != nil {
		log.Printf("Error during final flush: %v", err)
	}
	log.Printf("[Writer] Completed writing %d records.", totalWritten)
}
// randomTimestamp returns a pseudo-random instant between the Unix
// timestamps start and end (in seconds, both inclusive), formatted in UTC as
// an RFC 3339 string such as "2024-01-31T13:47:33Z".
//
// r supplies the randomness. The bounds may be given in either order. The
// span end-start+1 must fit in an int64.
func randomTimestamp(r *rand.Rand, start, end int64) string {
	// Int63n panics on a non-positive argument, so normalise a reversed range
	// instead of crashing.
	if end < start {
		start, end = end, start
	}
	randSec := start + r.Int63n(end-start+1)
	return time.Unix(randSec, 0).UTC().Format(time.RFC3339)
}
// getGID reports the ID of the calling goroutine, or 0 if it cannot be
// parsed.
//
// WARNING: Go has no supported API for this. The ID is scraped from the
// first line of the goroutine's stack trace ("goroutine <id> [...]"), which
// is a hack; use it for log decoration only.
func getGID() int {
	// 64 bytes is enough for the trace header; the rest is truncated.
	var scratch [64]byte
	written := runtime.Stack(scratch[:], false)

	header := string(scratch[:written])
	tokens := strings.Fields(strings.TrimPrefix(header, "goroutine "))

	if gid, err := strconv.Atoi(tokens[0]); err == nil {
		return gid
	}
	return 0
}
Now you have the right data, but when you try to COPY it back, it goes very slowly and you realize that this is the worst possible thing you can do to a time-series database: filling it up with completely random data, time-wise. A good way to make this a lot less bad is to sort the whole thing ascending by date, making it optimal for importing.
For this, we'll ask o1-mini to write another go program that will take several hours to complete haha who would do that the first time not me use GNU sort:
# Drop the header row, then sort by the 5th comma-separated field (created_at).
# LC_ALL=C has to prefix gsort itself: an assignment in front of a command only
# applies to that one command, so putting it before tail never reaches gsort.
tail -n +2 emojis_modified.csv | LC_ALL=C gsort -t',' -k5,5 --parallel=$(nproc) --buffer-size=16G --temporary-directory=/path/to/tmp > sorted_emojis_modified.csv
This took a little over 49 minutes on my M1 Pro MBP, which is pretty damn good.
After which importing was a breeze, somewhere in the ballpark of 20-25 minutes:
-- Load the sorted CSV into emojis_new; the file has no header row.
COPY emojis_new FROM '/path/to/sorted_emojis_modified.csv' WITH (FORMAT CSV, DELIMITER ',', HEADER FALSE);
And there! Two years of test data, ready for benchmarking.
Discussion in the ATmosphere