Streaming Millions of Rows from Postgres to AWS S3

Swati Yadav
OYOTech
Published in
6 min readMay 14, 2021

--

In today’s time, requiring reports with millions of rows is a common use case and the same was to be fulfilled by our in-house ERP System.

Getting the rows from the Postgres query in one go, iterating over them, creating the CSV file and uploading it to AWS S3.

These are the steps that are generally done for a small dataset but they fail miserably when dealing with huge datasets.

How do we deal with this problem?

Let’s start with a simple analogy to explain this scenario.

Bob wants to fill a tank located in his backyard with water stored in a water tanker. He came up with the below approaches:

  • In the first approach, he gets a storage container having the same size as required to be filled and fills it with water from the water tanker. Then, he carries it to fill the tank in the backyard with water.
  • In the second approach, he fills the buckets one by one from the water tanker and then carries them to his backyard to fill the tank.

Too much effort, isn’t it?

  • His friend, Alice solves the same problem by using a pipe to directly transfer water from a tap connected with the water tanker till the tank in the backyard. This saved her the effort of filling and carrying the tank or the buckets and didn’t tire her for the rest of the day.

The above scenario can be compared with how the application pulls data from PostgresDB and uploads to AWS S3. The PostgresDB is equivalent to the Water tanker, AWS S3 is equivalent to the Backyard Tank and the dataset is equivalent to water.

The above 3 approaches will now be as follows:

1. Loading everything at once

A traditional approach where we bring the result in-memory all at once, prepare a CSV and then upload to AWS using multipart.

This approach is effective for results containing 100 or 1000 rows but results in an OutOfMemory Exception when the result contains millions of rows.

2. Using Paging/Slicing

To deal with the OutOfMemory Exception thrown in the above approach, people generally use the database limit and offset combination to break the query into smaller queries and read multiple rows in the form of a page.

In case of Paging, logic for iteration needs to be written which means more code to maintain and test. Paging requires an additional query to calculate the total number of pages in the result. This query can be avoided by using Slicing.

3. Using Streams

The last approach is to pull data from PostgresDB using Spring Data JPA Streams and then upload to AWS S3 using S3 Streams. This process is called Stream Processing. It means that computation of data is done directly as it is produced or received. Streams overcome the disadvantages of both the above approaches and perform at a fast speed and use less memory footprint.

Let’s look at the steps taken in Stream Processing for creating reports and uploading in AWS S3:

  1. Streaming data from PostgresDB to Application:

In this step, we are Streaming data while receiving it. For Streaming data from PostgresDB till the Application, we can use Spring Data JPA Stream in Java.

In order to make Spring Data JPA stream to work, the return type of the query in the repository file should be Stream<Object>.

@QueryHints(value = {
@QueryHint(name = HINT_FETCH_SIZE, value = “600”),
@QueryHint(name = HINT_CACHEABLE, value = “false”),
@QueryHint(name = READ_ONLY, value = “true”)
})
@Query(value = “SELECT * FROM journal_entries where accounting_entity_id = :accountingEntityId”, nativeQuery = true)
Stream<JournalEntry> getJournalEntriesForAccountingEntity(@Param(“accountingEntityId”) Integer accountingEntityId)

Here, the Hibernate has been hinted about the fetch size, the second layer caching has been disabled (no caching for the next execution of the same query) and all the loaded objects are read-only.

Things to remember while using Streams:

  • Methods using streams must be annotated with @Transactional as the connection needs to be kept opened so that streams can be consumed.
  • Clear the cached data of your object in the EntityManager’s cache to avoid an increase in cache memory.
  • Tweak HINT_FETCH_SIZE and set HINT_CACHEABLE, READ_ONLY according to your use case.

2. Stream processing for CSV Generation:

In this step, we are connecting multiple streams wherein the output of the Postgres Stream becomes input for the Stream uploading the result set in AWS.

@Transactional(readOnly = true)
public String generateReportFileUrl(ReportParam reportParam) throws IOException {
Stream<JournalEntry> journalEntryStream = journalEntryManager.getJournalEntryStreamForAccountingEntity(reportParam.getAccountingEntityId());
return reportExporter.exportToCsv(getReportClass(), journalEntryStream, reportParam.getReportName());
}

3. Streaming data from Application to Amazon S3:

In this step, we are streaming data while producing it. For Streaming data from the Application to AWS S3, we can use the s3-stream-upload library. This library allows efficient streaming of large amounts of data to AWS S3 in Java without the need to store the whole object in memory or use files. This library provides an MultipartOutputStream that packages data written in chunks which are sent in a multipart upload. After all the chunks are uploaded, S3 assembles these chunks and creates an object.

POM Dependency for s3-stream-upload library -

<dependency>
<groupId>com.github.alexmojaki</groupId>
<artifactId>s3-stream-upload</artifactId>
<version>2.1.0</version>
</dependency>

StreamTransferManager needs to be initialized with the upload details and outputStream is obtained using getMultiPartOutputStreams(). Along with writing data, the parts are uploaded to S3. Once the process of writing data is finished, call MultiPartOutputStream.close(). When all the streams have been closed, call complete().

public <T, U> String exportToCsv(Class<T> reportClass, Stream<U> dataStream, String filePath) throws IOException {    // Initialize StreamTransferManager and output stream
StreamTransferManager streamTransferManager = new StreamTransferManager(awsS3Bucket, filePath, awsS3Client);
OutputStream outputStream = streamTransferManager.getMultiPartOutputStreams().get(0);

// CSV Generator
CsvSchema csvSchema = CsvUtil.buildCsvSchema(reportClass);
CsvMapper csvMapper = new CsvMapper();
CsvGenerator csvGenerator = csvMapper.getFactory().createGenerator(outputStream);
csvGenerator.setSchema(csvSchema);

// Processing Stream
dataStream.forEach(data -> {
try {
// writing object to the s3 output stream
csvGenerator.writeObject(data);
} catch (IOException e) {
throw new EmsBaseException("Something went wrong :: " + e.getMessage());
}
});

// Closing output stream
outputStream.close();
streamTransferManager.complete();

awsS3Client.setObjectAcl(awsS3Bucket, filePath, CannedAccessControlList.PublicRead);
URL url = awsS3Client.getUrl(awsS3Bucket, filePath);

return url.toString();
}

Comparison of the 3 approaches on basis of Memory and Time:

For comparison, we have used a dataset of 1.5 million rows and set the maximum memory limit as 4GB for each test. We have used JProfiler(Java Profiling Tool) to analyse the performance of our approaches.

Everything at once
Using Slicing
Using Streams

Conclusion

Pagination/Slicing is an effective approach if memory is considered as a performance parameter but if we take time as a performance parameter, the time taken in pagination is approximately double that of Streams.

Streams on the other hand, is found to be optimal both in terms of memory and time and therefore was our go to approach for creating reports with millions of rows for our ERP System.

References

--

--