{
  "$type": "site.standard.document",
  "canonicalUrl": "https://rednafi.com/python/stream-process-a-csv-file/",
  "description": "Process large CSV files without OOM errors by streaming content line-by-line with HTTPX and concurrent.futures for parallel processing.",
  "path": "/python/stream-process-a-csv-file/",
  "publishedAt": "2022-07-01T00:00:00.000Z",
  "site": "at://did:plc:fgtm2c26vfcj74rfmeggbyqj/site.standard.publication/3mnl6f7ob462z",
  "tags": [
    "Python",
    "Networking",
    "Concurrency"
  ],
  "textContent": "A common bottleneck when processing large data files is memory. Downloading the file and\nloading the entire content is surely the easiest way to go. However, it's likely that you'll\nquickly hit OOM errors. Often, when I have to deal with large data files that need\nto be downloaded and processed, I prefer to stream the content line by line and use multiple\nprocesses to consume them concurrently.\n\nFor example, say you have a CSV file containing millions of rows with the following\nstructure:\n\nHere, let's say you need to download the file from some source and run some other heavy\ntasks that depend on the data from the file. To avoid downloading the file to the disk, you\ncan stream and read the content line by line directly from the network. While doing so, you\nmay want to trigger multiple other tasks that can run independently of the primary process.\n\nAt my workplace, I often have to create objects in a relational database using the\ninformation in a CSV file. The idea here is to consume the information in the CSV file\ndirectly from the network and create the objects in the database. This database object\ncreation task can be offloaded to a separate process outside of the main process that's\nstreaming the file contents.\n\nSince we're streaming the content from the network line by line, there should be zero disk\nusage and minimal memory footprint. Also, to speed up the consumption, we'll fork multiple\nOS processes. 
To put it concisely, we'll need to perform the following steps:\n\n- Stream a single row from the target CSV file.\n- Write the content of the row to an in-memory string buffer.\n- Parse the file buffer with csv.DictReader.\n- Collect the dict that contains the information of the parsed row.\n- Yield the dict.\n- Flush the buffer.\n- Another process will collect the yielded dict and consume it outside of the main\n  process.\n- Continue the loop for the next row.\n\nThe following snippet implements the workflow mentioned above:\n\nThe first function stream_csv accepts a URL that points to a CSV file. In this case, the\nURL used here points to a real CSV file hosted on GitHub. [HTTPX] allows you to make a\n[streaming GET request] and iterate through the contents of the file without fully\ndownloading it to the disk.\n\nInside the client.stream block, we've created an in-memory file instance with\nio.StringIO. This allows us to write the streamed content of the source CSV file to the\nin-memory file. Then we pull one row from the source file, write it to the in-memory buffer,\nand pass the in-memory file buffer over to the csv.DictReader class.\n\nThe DictReader class will parse the content of the row and emit a reader object. Running\nnext on the reader iterator returns a dictionary with the parsed content of the row. The\nparsed content for the first row of the example CSV looks like this:\n\nNext, the process_row function takes in the data of a single row as a dict like the one\nabove and does some processing on it. For demonstration, it currently just prints the\nvalues of the row and then sleeps for two seconds.\n\nFinally, in the __main__ block, we fire up four processes to apply the process_row\nfunction to the output of the stream_csv function. Running the script will print the\nfollowing output:\n\nSince we're forking 4 processes, the script will print four items, and then it'll pause\nfor roughly 2 seconds before moving on. 
If we were using a single process, the script would\nwait for 2 seconds after printing every row. By increasing the number of processes, you can\nspeed up the consumption rate. Also, if the consumer tasks are lightweight or I/O-bound, you can use\nmultiple threads instead of processes to consume the rows.\n\n[httpx]:\n    https://www.python-httpx.org/\n\n[streaming get request]:\n    https://www.python-httpx.org/quickstart/#streaming-responses",
  "title": "Stream process a CSV file in Python"
}