Stream process a CSV file in Python
A common bottleneck when processing large data files is memory. Downloading the file and loading its entire content is the easiest way to go, but you're likely to hit OOM errors quickly. Whenever I have to deal with large data files that need to be downloaded and processed, I prefer to stream the content line by line and use multiple processes to consume it concurrently.
For example, say, you have a CSV file containing millions of rows with the following structure:
Here, let's say you need to download the file from some source and run some other heavy tasks that depend on the data from the file. To avoid downloading the file to the disk, you can stream and read the content line by line directly from the network. While doing so, you may want to trigger multiple other tasks that can run independently of the primary process.
At my workplace, I often have to create objects in a relational database using the information in a CSV file. The idea here is to consume the information in the CSV file directly from the network and create the objects in the database. This database object creation task can be offloaded to a separate process outside of the main process that's streaming the file contents.
Since we're streaming the content from the network line by line, there should be zero disk usage and a minimal memory footprint. Also, to speed up the consumption, we'll fork multiple OS processes. To put it concisely, we'll need to perform the following steps:
- Stream a single row from the target CSV file.
- Write the content of the row to an in-memory string buffer.
- Parse the buffer with csv.DictReader.
- Collect the dict that contains the information of the parsed row.
- Yield the dict.
- Flush the buffer.
- Another process will collect the yielded dict and consume that outside of the main process.
- And continue the loop for the next row.
The following snippet implements the workflow mentioned above:
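As a minimal sketch of the buffer, parse, yield, and flush steps, assume the rows arrive as an iterable of text lines with the header first. The helper name `parse_rows` is hypothetical, and the approach assumes that no quoted field spans multiple lines.

```python
import csv
import io
from typing import Iterable, Iterator


def parse_rows(lines: Iterable[str]) -> Iterator[dict[str, str]]:
    """Yield one dict per CSV row, buffering only a single row at a time."""
    line_iter = iter(lines)
    header = next(line_iter, None)
    if header is None:
        return  # Empty input: nothing to parse.
    header = header.rstrip("\r\n")

    buffer = io.StringIO()
    for line in line_iter:
        line = line.rstrip("\r\n")
        if not line:
            continue  # Skip blank lines.
        # Write the header plus one data row so DictReader can name the fields.
        buffer.write(f"{header}\n{line}\n")
        buffer.seek(0)
        yield next(csv.DictReader(buffer))
        # Flush the buffer before handling the next row.
        buffer.seek(0)
        buffer.truncate(0)
```

Every value comes back as a string, so the consumer has to convert numeric columns itself.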
The first function stream_csv accepts a URL that points to a CSV file. In this case, the URL used here points to a real CSV file hosted on GitHub. HTTPx allows you to make a streaming GET request and iterate through the contents of the file without fully downloading it to the disk.
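The streaming request on its own might look like the sketch below. The function name `iter_remote_lines` and its optional `client` parameter are illustrative assumptions. HTTPx is imported lazily, so the function can also be exercised with a stand-in client.

```python
from contextlib import ExitStack
from typing import Any, Iterator, Optional


def iter_remote_lines(url: str, client: Optional[Any] = None) -> Iterator[str]:
    """Yield the lines of a remote text file without saving it to disk."""
    with ExitStack() as stack:
        if client is None:
            import httpx  # Third-party dependency: pip install httpx

            client = stack.enter_context(httpx.Client(follow_redirects=True))
        # client.stream() defers reading the body until we iterate over it.
        response = stack.enter_context(client.stream("GET", url))
        response.raise_for_status()
        yield from response.iter_lines()
```

The response and any client created here are closed once the generator is exhausted or closed.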
Inside the client.stream block, we've created an in-memory file instance with io.StringIO. This allows us to write the streamed content of the source CSV file to the in-memory file. Then we pull one row from the source file, write it to the in-memory buffer, and pass the in-memory file buffer over to the csv.DictReader class.
The DictReader class will parse the content of the row and emit a reader object. Running next on the reader iterator returns a dictionary with the parsed content of the row. The parsed content for the first row of the example CSV looks like this:
Next, the process_row function takes in the data of a single row as a dict like the one above and does some processing on it. For demonstration, it currently just prints the values of the row and then sleeps for two seconds.
Finally, in the main block, we fire up four processes to apply the process_row function to the output of the stream_csv function. Running the script will print the following output:
Since we're forking 4 processes, the script will print four items, and then it'll pause roughly for 2 seconds before moving on. If we were using a single process, the script would wait for 2 seconds after printing every row. By increasing the number of processes, you can speed up the consumption rate. Also, if the consumer tasks are lightweight, you can open multiple threads to consume them.
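The consumer side could be sketched as follows, assuming process_row only prints and sleeps as described. The `consume` function and its `use_threads` switch are hypothetical names added for illustration. `multiprocessing.pool.ThreadPool` stands in for the thread-based variant.

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool
from typing import Callable, Iterable


def process_row(row: dict, delay: float = 2.0) -> dict:
    """Stand-in for a heavy task: print the row, then sleep."""
    print(row)
    time.sleep(delay)
    return row


def consume(
    rows: Iterable[dict],
    worker: Callable[[dict], dict] = process_row,
    workers: int = 4,
    use_threads: bool = False,
) -> list:
    """Apply worker to every row using a pool of processes or threads."""
    pool_cls = ThreadPool if use_threads else multiprocessing.Pool
    with pool_cls(workers) as pool:
        # imap_unordered hands back each result as soon as a worker finishes.
        return list(pool.imap_unordered(worker, rows))
```

On platforms that start workers by spawning a fresh interpreter, call `consume` under an `if __name__ == "__main__":` guard and pass a picklable, module-level worker. In the ideal case, 12 rows with a 2-second task take roughly 6 seconds with four workers, versus 24 seconds with one.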