Today, we’re open sourcing an in-house Java 8 stream utility library, which can aggregate, merge, or join streams in a memory-efficient way. To skip right to the code, check out the GitHub repo.
Here at Conductor, we build Searchlight, a content intelligence platform that helps users gain insights from large amounts of raw data. We’ve written a bunch of posts in the past about the techniques we use to make our data processing more efficient, from bulk operations against Kafka streams, to incorporating Snappy framed compression into Hadoop, and even aggressively caching data requests on the frontend using Backbone.Hoard. I’m here today to talk to you about some techniques we use to do efficient data streaming, and to share some code we’ve built that might make your life better!
Our customers rely on us to collect massive amounts of data (close to 20TB a week and growing) and process it into actionable insights that they can view in our application. One step of that processing is distilling the data into just what is necessary to render a specific insight. However, even after paring down to the smallest possible set of relevant information, we are still left with reports that are hundreds of MBs on disk.
We need to aggregate, join, and summarize these potentially large reports in a small, fixed amount of memory.
Enter Java 8 streams.
Java 8 streams describe a pipeline of operations that bring elements from a source to a destination. More concretely, streams allow you to define a set of manipulations on a set of data, agnostic of where that data comes from, in a functional paradigm.
While most literature online utilizes Java 8 streams primarily for list comprehensions, streams can also be used to describe operations against a set of data with an unknown size (potentially infinite) that isn’t necessarily all held in memory.
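To make the "unknown size" point concrete, here is a minimal sketch using only the standard library. The class and method names are made up for illustration. The source is conceptually infinite, yet only one element at a time is evaluated, and the pipeline stops as soon as enough results have been produced.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class InfiniteStreamSketch {

    // Returns the first `count` even squares drawn from an unbounded source.
    static List<Long> firstEvenSquares(int count) {
        return Stream.iterate(1L, n -> n + 1)   // conceptually infinite: 1, 2, 3, ...
                .map(n -> n * n)                // evaluated lazily, one element at a time
                .filter(square -> square % 2 == 0)
                .limit(count)                   // short-circuits once enough items are found
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstEvenSquares(3)); // prints [4, 16, 36]
    }
}
```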
In our case, we connect our data source with some nifty plumbing that converts an InputStream into a stream of materialized objects. This lets us take advantage of one of the key differences between a Stream and a Collection in Java – while Collections represent a set of data stored entirely in memory, Streams are simply storage-agnostic series of data.
Streaming through our large datasets means that only the item(s) currently being manipulated are held in memory, as opposed to the complete collection. This lets us achieve our goal above of a “small, fixed amount of memory”.
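As a rough illustration of that kind of plumbing (this is not our production code), the sketch below turns an InputStream into a lazily evaluated stream of objects. It assumes a hypothetical line-delimited "url,views" format and a hypothetical PageViews type. Only the line currently being parsed needs to be in memory.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;

class InputStreamToObjects {

    /** Hypothetical record type, used only for this illustration. */
    static final class PageViews {
        final String url;
        final int views;

        PageViews(String url, int views) {
            this.url = url;
            this.views = views;
        }
    }

    // Assumes one "url,views" record per line. Lines are read and parsed on demand.
    static Stream<PageViews> parse(InputStream in) {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        return reader.lines()
                .map(line -> line.split(",", 2))
                .map(parts -> new PageViews(parts[0], Integer.parseInt(parts[1].trim())))
                .onClose(() -> {
                    // Closing the stream also closes the underlying reader.
                    try {
                        reader.close();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
    }

    static int totalViews(InputStream in) {
        try (Stream<PageViews> rows = parse(in)) {
            return rows.mapToInt(row -> row.views).sum();
        }
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(
                "/a,3\n/b,5\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(totalViews(in)); // prints 8
    }
}
```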
Complex use cases where Java 8 streams fall short
Our UI enables some highly dynamic views over our dataset, and as such, there are limits to the amount of pre-computation we can reasonably do.
For example, we have a view that joins together two different weeks’ worth of data, along a particular axis. The two weeks can be arbitrarily selected, which explodes the pre-computation space into a massive set of permutations. This means we must do our joining of these two datasets on the fly.
Since Java 8 streams are built to describe general operations against arbitrary sources of data, there are some operations that simply are not supported out of the box, and others that are implemented inefficiently. However, since we own our dataset, and have the ability to pre-process our data, we can enforce that our data is sorted, and use that knowledge to build specialized stream utilities that can operate lazily.
Our stream utilities repo contains some tooling to solve complex problems where pre-processing our dataset to be sorted can dramatically improve the memory footprint of the operations we are trying to execute.
Let’s say I have a stream of all of the content on a given client’s site. If my dataset is not sorted, in order to group the set of content by category (How-To, Product Review, etc.) and return a Stream<List<Content>>, I would need to materialize the complete stream: in an unsorted stream, items that belong to the same group might appear at both the very start and the very end. This means you don’t know a group is complete until the data set is completely processed. But let’s say I sort my stream by content category. In that case, I can group the stream by category in a lazy fashion.
The Java 8 grouping operator actually does materialize the full stream, because the operator is part of the collect method, which is a terminal operation. Terminal operations require the consumption of complete streams, and thus are almost always eager:
    final Map<String, List<Content>> contentGroupedByCategory = contentStream
            .collect(Collectors.groupingBy(c -> c.getContentCategory()));
However, since we know that the stream is sorted by ContentCategory already, we can take advantage of that knowledge to build a grouping operator that simply iterates until it finds the boundary between content categories, and emits grouped items.
In order to use this utility, simply make sure your stream of data is sorted by whatever your keying function returns (in our case, content category).
    import java.util.List;
    import java.util.stream.Stream;

    import com.conductor.stream.utils.OrderedStreamUtils;

    // We know that contentStream is sorted by category. Therefore,
    // we can do a streaming group
    final Stream<List<Content>> contentGroupedByCategory =
            OrderedStreamUtils.groupBy(contentStream, c -> c.getContentCategory());
As opposed to the Java stream framework version above, which returns a Map, this stream utility will return a stream of the grouped data. That means that the only List<Content> in memory is the one currently being processed.
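To show the idea of grouping at key boundaries, here is a minimal sketch built on plain iterators. It is not the library's implementation, and the names (SortedGroupBySketch, groupSorted) are made up for illustration. It assumes the input is already sorted by the key, and it holds only the group currently being assembled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SortedGroupBySketch {

    // Assumes `sorted` is already ordered so that equal keys are adjacent.
    static <T, K> Stream<List<T>> groupSorted(
            Stream<T> sorted, Function<? super T, ? extends K> keyOf) {
        Iterator<T> source = sorted.iterator();
        Iterator<List<T>> groups = new Iterator<List<T>>() {
            private T pending;          // first element of the next group, already pulled
            private boolean hasPending;

            @Override
            public boolean hasNext() {
                return hasPending || source.hasNext();
            }

            @Override
            public List<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T first = hasPending ? pending : source.next();
                hasPending = false;
                pending = null;
                K key = keyOf.apply(first);
                List<T> group = new ArrayList<>();
                group.add(first);
                // Pull items until the key changes; that item starts the next group.
                while (source.hasNext()) {
                    T candidate = source.next();
                    if (Objects.equals(key, keyOf.apply(candidate))) {
                        group.add(candidate);
                    } else {
                        pending = candidate;
                        hasPending = true;
                        break;
                    }
                }
                return group;
            }
        };
        return StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(groups, Spliterator.ORDERED), false)
                .onClose(sorted::close);
    }

    static List<List<String>> demo() {
        // Sorted by first letter, which is the grouping key here.
        Stream<String> words = Stream.of("apple", "avocado", "banana", "cherry", "cranberry");
        return groupSorted(words, word -> word.charAt(0)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[apple, avocado], [banana], [cherry, cranberry]]
    }
}
```

If the input is not actually sorted, a sketch like this silently emits the same key in more than one group, which is why the sort-order precondition matters.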
Now, let’s say we have two streams of data, each representing the search engine performance of our client, perhaps from different years. What we want as output is a stream of search results, with a before and after for each result that existed in both years, as well as the non-overlapping results from individual years.
To clarify, when I say “join” here, I mean taking each pair of matching items from the two streams and combining them to produce a new object.
There is no way to do this in a lazy fashion using the Java 8 stream framework’s native operators. In this case, there isn’t even a non-lazy convenience operator.
Again, the fact that our streams are sorted allows us to efficiently join these two datasets. Upon request, we can just pull items from each side until we find a matching set, and use the sort order to determine which side is behind the other. The provided JoinType determines what we do with unmatched items.
In this case, we will be joining on each search result’s ID. That means the two streams must each be sorted by result ID for our join operation to work.
    import java.util.Comparator;
    import java.util.function.Function;
    import java.util.stream.Stream;

    import com.conductor.stream.utils.OrderedStreamUtils;
    import com.conductor.stream.utils.join.JoinType;

    final Function<SearchResult, Integer> searchResultIdKeyingFunction = sr -> sr.getResultId();

    // We know that searchResultStream2016 and searchResultStream2017
    // are sorted by result ID. Therefore, we can do a streaming join.
    final Stream<MultiYearResult> joinedStream = OrderedStreamUtils.join(
            searchResultStream2016,
            searchResultStream2017,
            Comparator.naturalOrder(),      // the streams are sorted in natural order...
            searchResultIdKeyingFunction,   // ...of search result ID...
            searchResultIdKeyingFunction,   // ...on both sides
            (left, right) -> new MultiYearResult(left, right),
            // do an outer join so we get all results, even those present in only a single year
            JoinType.OUTER
    );
Note how this also returns a stream of data that is lazily evaluated.
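The "pull from whichever side is behind" idea can be sketched with two iterators. This is an illustration under stated assumptions, not the library's implementation: the names (SortedJoinSketch, outerJoin, Row) are hypothetical, both inputs are assumed sorted by key, keys are assumed unique within each input, and elements are assumed non-null because null marks an exhausted side. Note that this sketch pulls the first item from each side as soon as it is called.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SortedJoinSketch {

    /** Hypothetical record type, used only for the demo. */
    static final class Row {
        final int id;
        final String label;

        Row(int id, String label) {
            this.id = id;
            this.label = label;
        }
    }

    // Outer join: `combine` receives null for the side that has no match.
    static <T, K extends Comparable<K>, R> Stream<R> outerJoin(
            Stream<T> left, Stream<T> right,
            Function<? super T, K> keyOf,
            BiFunction<T, T, R> combine) {
        Iterator<T> l = left.iterator();
        Iterator<T> r = right.iterator();
        Iterator<R> joined = new Iterator<R>() {
            private T nextLeft = l.hasNext() ? l.next() : null;
            private T nextRight = r.hasNext() ? r.next() : null;

            @Override
            public boolean hasNext() {
                return nextLeft != null || nextRight != null;
            }

            @Override
            public R next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                // cmp < 0: left is behind; cmp > 0: right is behind; 0: keys match.
                int cmp;
                if (nextLeft == null) {
                    cmp = 1;
                } else if (nextRight == null) {
                    cmp = -1;
                } else {
                    cmp = keyOf.apply(nextLeft).compareTo(keyOf.apply(nextRight));
                }
                T outLeft = cmp <= 0 ? nextLeft : null;
                T outRight = cmp >= 0 ? nextRight : null;
                if (cmp <= 0) {
                    nextLeft = l.hasNext() ? l.next() : null;
                }
                if (cmp >= 0) {
                    nextRight = r.hasNext() ? r.next() : null;
                }
                return combine.apply(outLeft, outRight);
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(joined, Spliterator.ORDERED), false);
    }

    static List<String> demo() {
        Stream<Row> before = Stream.of(new Row(1, "a"), new Row(2, "b"), new Row(4, "d"));
        Stream<Row> after = Stream.of(new Row(2, "B"), new Row(3, "C"), new Row(4, "D"));
        return outerJoin(before, after, row -> row.id,
                (x, y) -> (x == null ? "-" : x.label) + "/" + (y == null ? "-" : y.label))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a/-, b/B, -/C, d/D]
    }
}
```

At any moment only one pending item per side is held, regardless of how long the inputs are.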
Multiple Stream Merging
Let’s say you have a bunch of streams of client HTML recommendations, one for each page on their site. These streams are sorted internally by priority. You want to assemble a list of recommendations across the whole site, sorted by priority.
This isn’t one of the native operators in the Java 8 stream framework, because it requires that the underlying streams produce items in sorted order, which isn’t a guarantee all streams can make.
Once again, if we know our streams are sorted up-front, it’s relatively simple to merge them lazily.
The sortedMerge operator simply compares the next item in each stream, and emits the next item according to the given comparator. This means the streams must all be sorted by said comparator already.
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.stream.Stream;

    import com.conductor.stream.utils.OrderedStreamUtils;

    // The streams are all sorted by priority. Therefore, we need a comparator
    // on priority.
    final Comparator<HtmlRecommendation> comparator =
            Comparator.comparing(HtmlRecommendation::getPriority);

    final Stream<HtmlRecommendation> mergedStreams = OrderedStreamUtils.sortedMerge(
            Arrays.asList(page1Recommendations, page2Recommendations, page3Recommendations),
            comparator);
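One common way to implement a merge like this is a priority queue holding the current head of each input. The sketch below takes that approach. It is an assumption about technique rather than a description of how sortedMerge is written, and the names (SortedMergeSketch, mergeSorted, Head) are hypothetical. It keeps one pending item per input stream and reads the first item of each stream up front.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SortedMergeSketch {

    /** Pairs the current head of one input with the iterator it came from. */
    private static final class Head<T> {
        final T value;
        final Iterator<T> rest;

        Head(T value, Iterator<T> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    // Assumes every input stream is already sorted according to `order`.
    static <T> Stream<T> mergeSorted(List<Stream<T>> sortedStreams, Comparator<? super T> order) {
        Comparator<Head<T>> byValue = (a, b) -> order.compare(a.value, b.value);
        PriorityQueue<Head<T>> heap = new PriorityQueue<>(byValue);
        for (Stream<T> stream : sortedStreams) {
            Iterator<T> it = stream.iterator();
            if (it.hasNext()) {
                heap.add(new Head<>(it.next(), it));
            }
        }
        Iterator<T> merged = new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return !heap.isEmpty();
            }

            @Override
            public T next() {
                Head<T> smallest = heap.poll();
                if (smallest == null) {
                    throw new NoSuchElementException();
                }
                // Refill the heap from the input that just supplied an item.
                if (smallest.rest.hasNext()) {
                    heap.add(new Head<>(smallest.rest.next(), smallest.rest));
                }
                return smallest.value;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(merged, Spliterator.ORDERED), false);
    }

    static List<Integer> demo() {
        List<Stream<Integer>> inputs = Arrays.asList(
                Stream.of(1, 4, 7), Stream.of(2, 5), Stream.of(3, 6, 8, 9));
        return mergeSorted(inputs, Comparator.<Integer>naturalOrder())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Memory use grows with the number of input streams, not with the number of items they contain.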
In addition to the OrderedStreamUtils operators, we have also assembled a small list of utilities that can make it easier to build processing pipelines on top of any stream, ordered or not.
Find yourself writing a lot of Collectors without a combiner? Use
Used to RxJava’s switchIfEmpty operator? We brought it into Java 8 streams, with
Want to chunk up a stream into sets of a certain size? Maybe you want to process 1000 items at a time without materializing your full set of data? Try
Conclusion – Use Conductor’s stream-utils to enhance your Java 8 Streams
I hope you find this set of utilities helpful. We’ve been using them internally for the past couple of months to make some of our largest sets of data easier to process in Searchlight, and we’re thrilled to be giving them to the community!
Again, if you’re looking for the code, check out Conductor’s stream.utils on GitHub.
If you’ve read this far, and you’re looking for a new opportunity to write fast, fluent code to process large sets of data, Conductor is hiring.