How to Build a Speedy Custom Compression Codec for Hadoop

What’s Data Compression?

Data compression provides a means by which a system can improve I/O performance by shrinking the data being transmitted between stores and processes. Anyone who uses the Internet knows that downloading an immense file takes noticeably longer than downloading a much smaller file; compression can be used to ameliorate this issue.

Data Compression Formats that Devs Love

Data Compression with Gzip

One very popular compression format is Gzip. Gzip is known for its high compression ratio – a sizable difference (pardon the pun) between the uncompressed and compressed sizes of a file.

Note, however, that Gzip is CPU-intensive to apply; an application using Gzip saves I/O time while transferring Gzipped data but loses CPU time compressing or decompressing the data on either end. It follows, then, that optimizing the performance of an I/O intensive application requires striking a balance between compression ratio (I/O load) and compression speed (CPU load) depending on the available resources.

Because of Gzip’s popularity, and because the hardware we use for our data analysis and ETL (Extract, Transform, Load) processes has CPU horsepower in abundance, Gzip was our first compression format of choice.

On the data consumption side, Conductor Searchlight’s reporting architecture allows customers to manipulate report data interactively, but depends on our UI servers having ready access to the compressed data produced by our ETL pipeline.

After producing reports in Gzip for a while, we realized that the time our UI servers spent decompressing our reports as part of rendering them to a user was taking up about a third of the average application transaction time, and generally leading to long page load times. We knew we could scale these machines vertically, by moving our application to hardware with faster CPUs or more CPU cores, but in general we aim to be able to provision UI servers less beefy than our backend servers. So we looked for other ways to tune the process of delivering report data to our application.

Data Compression with Snappy

Another popular compression format is Snappy. Developed by Google, Snappy compression/decompression is about three times as fast as Gzip, but for this improvement in CPU time it sacrifices compression efficiency (or ratio, defined above).

We began using Snappy as the compression format for the report data produced by our ETL systems and consumed by our UI servers, and we saw immediate improvement in the response time of our application. As we continued to profile our application, though, we noticed that memory utilization on our servers “spiked” during the delivery of a report, which surprised us as our application is designed to “stream” report data to a user’s browser without keeping the entire report in memory on the server. After some investigation we realized that Snappy was causing a lot of data to be buffered during decompression.

Data Compression with the Snappy Frame Format

Fortunately, Google has also published a specification for a variation on the Snappy format called Snappy framing format. This format modifies the Snappy compression algorithm to compress a file incrementally, such that the compressed result is composed of independent, compressed chunks or “frames.” Both data compression and decompression, then, occur frame by frame, and there’s no need to hold an entire file in memory at any time.

Because the files that power our reports can be large relative to the memory available across all request handler threads (dozens of megabytes uncompressed) and because each user transaction might depend on multiple files being streamed down to a user, we opted to switch to the Snappy framing format to better control memory utilization on our UI servers.

Data Compression in the Hadoop Ecosystem

At Conductor, we use a variety of tools in the Hadoop family of technologies as building blocks in our ETL pipelines. It is convenient, then, that Hadoop MapReduce jobs can be easily configured to compress output produced by mappers and reducers. The Hadoop Java API already includes compression codecs for both Gzip and Snappy. All you need to do is configure your Hadoop-based application to use them:

<property>
 <name>io.compression.codecs</name>
 <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.SnappyCodec,conductor.hadoop.core.io.compression.SnappyFramedCodec</value>
 <final>false</final>
</property> 
<property>
  <name>mapred.map.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

Unfortunately, since implementing the Snappy framing format is optional (indeed, Google doesn’t even provide a reference implementation) your Hadoop installation probably does not support it out of the box. Your options, then, are

A. Settle for one of the compression codecs Hadoop created for you. B. Put your developer hat on, and create your own Snappy framing codec. It’s not hard!

Using a Custom Data Compression Codec in Hadoop (Java)

 

Open Source Java Libraries for Snappy Frame Compression

Fortunately, the Snappy framing format has already been implemented in Java as part of several open source projects. Unfortunately, we found when we started this project that not all of them were ready to use. For example, some projects were written in the “gap year” between the respective releases of Snappy and Snappy framing specifications. Consequently, the framing formats in those projects don’t follow the official specification. However, snappy-java suited our needs just fine, and it should work for you too; in particular, the included classes SnappyFramedOutputStream and SnappyFramedInputStream will provide the respective compression/decompression functionality you will need.

Building a new Compression Codec

CompressionCodec is the only interface absolutely necessary to implement to add a compression format to your Hadoop installation. The primary responsibilities of a CompressionCodec implementation are to produce CompressionOutputStream and CompressionInputStream objects by which data can be compressed or decompressed, respectively. All you need to do is provide an adapter between the aforementioned SnappyFramed streams and their corresponding Compression streams (see our examples OStreamDelegatingCompressorStream and IStreamDelegatingDecompressorStream). There’s no need to support Compressor or Decompressor, since MapReduce doesn’t use those by themselves — see the example for one way to deal with this.

Installing the new compression codec

So you’ve built yourself a nifty new data compression codec ready to use with your MapReduce jobs. Depending on the other Hadoop technologies in your stack, there may be a tad bit more to do before your application can fully support Snappy framing. For example, many of Conductor’s ETL systems use Oozie for workflow management and Hive for scalable data transformation. I’ll show you how to make sure that the installations for both of these technologies are able to utilize your new custom compression codec.

Installing the Compression Codec in Hive

Start by making sure that your Hive installation has access to your new code. Copy the JAR file containing your codec to the machine hosting Hive, and add its location to the HIVE_AUX_JARS_PATH variable. Now all that’s left is to tell Hive when to use the new format. Remember those configuration properties we touched on earlier? You’ll want to update the list of codecs in io.compression.codecs to include the fully qualified path of the new codec:

<property>
   <name>io.compression.codecs</name>
   <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.SnappyCodec,conductor.hadoop.core.io.compression.SnappyFramedCodec</value>
   <final>false</final>
</property>

You’ll also need to specify that the MapReduce output compression codec (intermediate or final output) is set to use the new codec. You can do this in the individual query scripts or in the global configuration files, depending on the desired scope of your (mighty fine) compression choice:

SET mapred.child.java.opts=-Xmx2048m -XX:MaxPermSize=256M -noverify;
SET mapred.output.compression.codec=conductor.hadoop.core.io.compression.SnappyFramedCodec;
SET hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

Installing the Compression Codec in Oozie

It’s not enough for Hive to know about your custom codec – Oozie has to be in the loop as well. Making this happen is a matter of adding the aforementioned JAR to the “libext” directory of your oozie installation. First, create the libext directory if it doesn’t exist. Next, run:

[oozie installation directory]/bin/oozie-setup.sh prepare-war

to create the WAR file containing all the code Oozie needs to do its thing. This will now include your custom codec! Restart Oozie, and everything should be ready to go.

At this point, Google’s Snappy Frame format should be available in both Hive and Oozie. You really ought to take a moment to celebrate how easy that was.

Additional Snappy Frame Goodies

  • Should you ever desire to peek at your Snappy-compressed files manually for sanity checks and such, there is a handy command-line tool you can use to compress and decompress files in the Snappy-frame format.
  • Our implementation of the Snappy Frame Compression Codec for Hadoop can be found in our growing open source library, Kangaroo.

About Tyrone

Tyrone Hinderson is a backend software engineer from New York, NY. He graduated from Columbia University in Spring ’14, and now he designs and builds scalable data ETL systems for Conductor, Inc. In his spare time, Tyrone loves to play video games at home, go indoor rock climbing in Brooklyn, read up on projects in the Hadoop ecosystem, and visit his family in his hometown of Los Angeles—among myriad other interests and activities.

Related Posts