WhiteWind
Visit Post

Sort a 42 GB CSV quickly

alice May 23, 2026
Source

Sort a 42 GB CSV quickly

Say you're, I don't know, building a website that shows the most popular emojis on Bluesky in real time. Further assume that you want to move the MVP architecture, which is mostly just Redis, to Postgres + Timescale, which unlocks a lot of potential for resilience and features.

Of course, you want to validate and test your architecture, and for that, you create a bunch of mock data: a hypertable of about 550M emojis, which is about 5x more than what's currently on Bluesky. You gather two days' worth of data from the firehose, then run

BEGIN;
INSERT INTO emojis
SELECT * FROM emojis;
COMMIT;

on repeat until you have enough test data.

And for something quick and dirty, this does the job. But oh! You're working with time series data. Bluesky has been around for two years, so it'd be worth stretching all this data out, time-wise, from 2 days to 2 years.

So you dump your table to a CSV file:

COPY (SELECT * from emojis) TO '/path/to/emojis.csv' WITH (FORMAT CSV, DELIMITER ',', HEADER TRUE);

And now you have a file that looks something like this:

$ head emojis.csv
did,rkey,emoji,lang,created_at
did:plc:pfgs64jdrbznui4qktarwvvz,3l7aowcsbgj23,🫶,en,2024-01-31T13:47:33Z
did:plc:4ojtgthev5iexawd7k63ri43,3l7aowe6e3x2z,💥,en,2024-06-06T21:18:01Z
did:plc:4ojtgthev5iexawd7k63ri43,3l7aowe6e3x2z,💥,en,2024-05-10T02:16:11Z
did:plc:4ojtgthev5iexawd7k63ri43,3l7aowe6e3x2z,💥,en,2023-06-19T16:46:21Z
did:plc:oemwnvl3bdhbglqsu4vtutg4,3l7aowe6tvz2y,⁉️,ja,2024-03-17T23:13:22Z
did:plc:enbbvjkhuzg3bzmlxjjdko2a,3l7aowejtcc24,🥶,en,2024-05-16T03:36:28Z
did:plc:sdx2znbevuvup2osq5ng24yf,3l7aowd44ts2a,✌️,ja,2022-12-22T10:52:09Z
did:plc:yfozqgqtw7shp23rgj2lb57o,3l7aowf2mqv27,🥰,es,2024-10-14T02:14:23Z
did:plc:yfcrcqqhqzt7h6aikn4ehypj,3l7aowfa3tk2j,😵‍💫,zh,2023-04-04T10:53:03Z

Now, how do you stretch this out to two years? You ask o1-mini to write a Go program that does it as fast as possible for you.

package main

import (
    "bufio"
    "encoding/csv"
    "errors"
    "fmt"
    "io"
    "log"
    "math/rand"
    "os"
    "path/filepath"
    "runtime"
    "strconv"
    "strings"
    "sync"
    "time"
)

// Configuration constants; adjust the paths before running.
const (
	// INPUT_CSV is the path to the exported CSV.
	INPUT_CSV = "/path/to/emojis_dump.csv"

	// OUTPUT_CSV is the path the modified CSV is written to.
	OUTPUT_CSV = "/path/to/emojis_modified.csv"

	// NUM_WORKERS is the number of concurrent worker goroutines.
	NUM_WORKERS = 8

	// CHANNEL_BUFFER is the capacity of each pipeline channel.
	CHANNEL_BUFFER = 10000

	// PROGRESS_INTERVAL is how many records a worker handles between progress log lines.
	PROGRESS_INTERVAL = 1000000
)

// Record is one row read from the input CSV and queued for a worker.
type Record struct {
	Fields []string // column values, in the order they appear in the file
	Line   int      // position of the row in the input; used only in log messages
}

// ProcessedRecord is a row whose created_at column has been rewritten by a
// worker and which is waiting to be serialized by the writer goroutine.
type ProcessedRecord struct {
	Fields []string // final column values to write
	Line   int      // originating input position, kept for error messages
}

func main() {
    startTime := time.Now()
    fmt.Println("Starting processing...")

    // Ensure output directory exists
    outputDir := "/path/to/tmp/" // Adjust based on OUTPUT_CSV path
    if err := os.MkdirAll(outputDir, os.ModePerm); err != nil {
        log.Fatalf("Error creating output directory: %v", err)
    }

    // Open input CSV file
    inputFile, err := os.Open(INPUT_CSV)
    if err != nil {
        log.Fatalf("Error opening input CSV file: %v", err)
    }
    defer inputFile.Close()

    // Create output CSV file
    outputFile, err := os.Create(OUTPUT_CSV)
    if err != nil {
        log.Fatalf("Error creating output CSV file: %v", err)
    }
    defer outputFile.Close()

    // Initialize CSV reader and writer with buffered I/O
    reader := csv.NewReader(bufio.NewReaderSize(inputFile, 16*1024*1024))  // 10MB buffer
    writer := csv.NewWriter(bufio.NewWriterSize(outputFile, 16*1024*1024)) // 10MB buffer

    // Read header
    header, err := reader.Read()
    if err != nil {
        log.Fatalf("Error reading CSV header: %v", err)
    }

    // Identify the index of 'created_at' column
    createdAtIdx := -1
    for idx, col := range header {
        if strings.TrimSpace(col) == "created_at" {
            createdAtIdx = idx
            break
        }
    }
    if createdAtIdx == -1 {
        log.Fatalf("'created_at' column not found in CSV header.")
    }

    // Write header to output CSV
    if err := writer.Write(header); err != nil {
        log.Fatalf("Error writing header to output CSV: %v", err)
    }
    writer.Flush()

    // Channels for pipeline
    recordChan := make(chan Record, CHANNEL_BUFFER)
    processedChan := make(chan ProcessedRecord, CHANNEL_BUFFER)

    // WaitGroups to synchronize goroutines
    var wgWorkers sync.WaitGroup
    var wgWriter sync.WaitGroup

    // Start worker goroutines
    for i := 0; i < NUM_WORKERS; i++ {
        wgWorkers.Add(1)
        go worker(&wgWorkers, recordChan, processedChan, createdAtIdx, len(header))
    }

    // Start writer goroutine
    wgWriter.Add(1)
    go writerGoroutine(&wgWriter, writer, processedChan)

    // Read and dispatch records
    lineNumber := 1 // Starting after header
    for {
        recordFields, err := reader.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("Error reading record at line %d: %v", lineNumber+1, err)
            continue
        }

        rec := Record{
            Fields: recordFields,
            Line:   lineNumber + 1,
        }

        recordChan <- rec
        lineNumber++

        // Optional: Implement early termination or error handling if needed
    }

    // Close the record channel to signal workers no more records are coming
    close(recordChan)

    // Wait for all workers to finish processing
    wgWorkers.Wait()

    // Close the processed channel to signal writer no more records are coming
    close(processedChan)

    // Wait for writer to finish
    wgWriter.Wait()

    // Flush any remaining data in the writer buffer
    writer.Flush()
    if err := writer.Error(); err != nil {
        log.Fatalf("Error flushing writer: %v", err)
    }

    endTime := time.Now()
    elapsed := endTime.Sub(startTime)
    fmt.Printf("Processing completed in %s.\n", elapsed)
}

// worker consumes records from recordChan until it is closed. For each one it
// replaces the column at createdAtIdx with a random RFC 3339 timestamp from
// the two years before the worker started, then sends the result to
// processedChan. Records whose field count differs from expectedFields are
// logged and dropped. wg.Done is called when the worker returns.
func worker(wg *sync.WaitGroup, recordChan <-chan Record, processedChan chan<- ProcessedRecord, createdAtIdx int, expectedFields int) {
	defer wg.Done()

	// A *rand.Rand must not be shared between goroutines, so each worker
	// builds its own. Workers are started within microseconds of each other,
	// so the clock alone is a weak seed. XOR-ing in a full 63-bit value from
	// the package-level source (safe for concurrent use) makes it practically
	// impossible for two workers to share a seed and emit identical timestamp
	// sequences.
	r := rand.New(rand.NewSource(time.Now().UnixNano() ^ rand.Int63()))

	// Timestamps are drawn from [now-2y, now], in whole Unix seconds.
	now := time.Now().UTC()
	startTime := now.AddDate(-2, 0, 0).Unix()
	endTime := now.Unix()

	localCount := 0 // records this worker has forwarded

	for rec := range recordChan {
		if len(rec.Fields) != expectedFields {
			log.Printf("Warning: Record at line %d has %d fields; expected %d. Skipping.", rec.Line, len(rec.Fields), expectedFields)
			continue
		}

		// Overwriting in place is safe as long as the producer hands each record
		// its own slice (main does not enable csv.Reader.ReuseRecord).
		rec.Fields[createdAtIdx] = randomTimestamp(r, startTime, endTime)

		processedChan <- ProcessedRecord{
			Fields: rec.Fields,
			Line:   rec.Line,
		}

		localCount++
		if localCount%PROGRESS_INTERVAL == 0 {
			log.Printf("[Worker %d] Processed %d records.", getGID(), localCount)
		}
	}
}

// writerGoroutine is the sole consumer of processedChan. It writes each record
// to writer in arrival order until the channel is closed, then does a final
// flush and calls wg.Done. Every 1,000,000 records it flushes writer and logs
// progress.
//
// csv.Writer is buffered, and once the underlying write fails the error is
// sticky: later Writes and Flushes fail the same way, and writer.Error() keeps
// reporting it. So after the first failure this goroutine logs once and only
// drains processedChan. That keeps upstream workers from blocking, without
// emitting one error line per remaining record. The caller can still observe
// the failure through writer.Error().
func writerGoroutine(wg *sync.WaitGroup, writer *csv.Writer, processedChan <-chan ProcessedRecord) {
	defer wg.Done()

	// Same value as PROGRESS_INTERVAL, kept local so this function stands alone.
	const flushEvery = 1000000

	totalWritten := 0
	failed := false

	for procRec := range processedChan {
		if failed {
			continue // keep draining so senders never block
		}
		if err := writer.Write(procRec.Fields); err != nil {
			log.Printf("Error writing record at line %d: %v", procRec.Line, err)
			log.Printf("[Writer] Output is no longer writable; discarding the remaining records.")
			failed = true
			continue
		}
		totalWritten++

		if totalWritten%flushEvery == 0 {
			// Push buffered data to the underlying writer so the output grows
			// steadily. This is not an fsync; it does not guarantee durability.
			writer.Flush()
			if err := writer.Error(); err != nil {
				log.Printf("Error flushing writer at record %d: %v", totalWritten, err)
				failed = true
				continue
			}
			log.Printf("[Writer] Written %d records.", totalWritten)
		}
	}

	writer.Flush()
	if err := writer.Error(); err != nil {
		log.Printf("Error during final flush: %v", err)
	}
	log.Printf("[Writer] Completed writing %d records.", totalWritten)
}

// randomTimestamp returns an RFC 3339 UTC timestamp for a second chosen
// uniformly at random from the closed interval [start, end] of Unix seconds,
// drawing randomness from r. It panics (via Int63n) if end < start.
func randomTimestamp(r *rand.Rand, start, end int64) string {
	span := end - start + 1 // +1 so that end itself can be chosen
	sec := start + r.Int63n(span)
	return time.Unix(sec, 0).UTC().Format(time.RFC3339)
}

// getGID extracts the calling goroutine's numeric ID from the first line of
// its runtime.Stack trace ("goroutine N [...]"), returning 0 if that number
// cannot be parsed. Go intentionally does not expose goroutine IDs, so this
// parsing hack is only suitable for labelling log lines.
func getGID() int {
	var scratch [64]byte
	n := runtime.Stack(scratch[:], false)
	header := strings.TrimPrefix(string(scratch[:n]), "goroutine ")
	if id, err := strconv.Atoi(strings.Fields(header)[0]); err == nil {
		return id
	}
	return 0
}

Now you have the right data, but when you try to COPY it back, it goes very slowly, and you realize that this is the worst possible thing you can do to a time-series database: filling it up with completely random data, time-wise. Timescale stores a hypertable as time-based chunks, so rows arriving in random time order keep jumping between chunks instead of filling one chunk at a time. A good way to make this a lot less bad is to sort the whole file by date in ascending order, which makes it optimal for importing.

For this, we could ask o1-mini to write another Go program that would take several hours to complete (haha, who would do that the first time? Not me), or we could just use GNU sort:

# LC_ALL=C must prefix gsort, not tail: an environment assignment applies only to the command it precedes, and byte-wise (C locale) comparison is much faster than locale-aware collation.
tail -n +2 emojis_modified.csv | LC_ALL=C gsort -t',' -k5,5 --parallel=$(nproc) --buffer-size=16G --temporary-directory=/path/to/tmp > sorted_emojis_modified.csv

This took a little over 49 minutes on my M1 Pro MBP, which is pretty damn good.

After that, importing was a breeze, taking somewhere in the ballpark of 20–25 minutes:

COPY emojis_new FROM '/path/to/sorted_emojis_modified.csv' WITH (FORMAT CSV, DELIMITER ',', HEADER FALSE);

And there! Two years of test data, ready for benchmarking.

Discussion in the ATmosphere

Loading comments...