Data Stream Processing: A Scalable Bridge from Kafka to Hadoop

You may use stream-oriented systems to parallel process real-time, time-sensitive data.  You might also use data stream processing for bulk operations with time-insensitive operations, like data analysis and persistence operations.  At Conductor, we use Kangaroo for bulk data stream processing, and we’re open sourcing it for you to use.

Why Data Stream Processing with Kafka Wasn’t Working

Last year, our team built a stream processing framework for analyzing the data we collect, using Apache Kafka to connect our network of producers and consumers. It aggregates messages related to our customers’ visibility in online search, as well as the data we mine out of the content they publish on their sites. Kafka has some interesting semantics that make it a great fit for these kinds of analytical applications – it’s distributed, replicated, and scalable; and it has the ability to support many consumers with very different performance characteristics in a highly parallel way. (For a great crash course on Kafka’s design and architecture, check out the introduction on their wiki page.)

To give you a sense of the data processing scale enabled by this technology, we are comfortably able to consume and process upwards of 5 TBs of data a week using Kafka.

Kafka is great for data stream processing, but sometimes that computing paradigm doesn’t fit the bill. For example, the topic storage provided by Kafka is ephemeral by design, and our messages age out of them after two weeks. Pieces of data we want to keep around longer get archived in our HBase data warehouse. That warehouse is a fundamental piece of our infrastructure – for some types of data, it’s the store of record for everything we’ve ever collected, and it drives many important downstream processes. Exporting Kafka messages to this store is a critical step in our pipeline.

Large volumes of individual “put” operations perform poorly in HBase, so persisting messages to our warehouse one by one from the stream wasn’t an option for us. The documentation for HBase strongly encourages loading data in bulk whenever possible, so we decided to leverage Map/Reduce and the HFileOutputFormat to archive our data as fast as possible.

Conductor’s data warehouse is a fundamental piece of our infrastructure.  It’s the record of everything we’ve ever collected, and it drives many critical downstream processes. Exporting Kafka messages to this store is a critical step in our pipeline.

Kafka and Map/Reduce – Iteration 1

The HFileOutputFormat is native to the Hadoop distribution. It comes with a Partitioner implementation that intelligently buckets data by row key into Reducer data sets that correspond to the regions in your HBase table. The output of the job is a set of HFiles, the file format that HBase uses behind the scenes for storing all row data. When the job is finished, running the completeBulkLoad method loads these HFiles into HBase and makes them part of the table.

// Create new job
final Job job = Job.getInstance(getConf(), "my_job");
job.setOutputFormatClass(HFileOutputFormat.class);

// Configure input...

// Configure HFile output
final Path hdfsTempPath = new Path("/tmp/some/path");
HFileOutputFormat.setOutputPath(job, hdfsTempPath);

final HTable targetTable = new HTable(getConf(), "my_hbase_table");
HFileOutputFormat.configureIncrementalLoad(job, targetTable);

if (job.waitForCompletion(true)) {
  // load HFiles if the job was successful
  final LoadIncrementalHFiles loadHFiles = new LoadIncrementalHFiles(getConf());
  lihf.doBulkLoad(path, targetTable);
}

targetTable.close();

We needed a way to use the contents of a Kafka topic as the input to a Map/Reduce job, and we sought an existing, open source solution to create that bridge. We found the kafka-hadoop-consumer project, which provides a Hadoop InputFormat that maps Kafka topic partitions to InputSplits; every mapper gets a topic partition from a particular Kafka broker like so:

kafka1

When a RecordReader finishes an InputSplit, it commits partition offsets to a temporary path in Zookeeper. It’s up to the client to explicitly commit all of the temporary partition offsets if and when the Map/Reduce job finishes successfully. This ensures that failed jobs don’t commit partial offsets. The diagram below shows how this process works.

kafka 2

But Iteration 1 Was Too Slow & Wasted Computing Resources

Using the kafka-hadoop-consumer InputFormat and the HFileOutputFormat, we wrote a job to periodically export recent data from our Kafka topics to our HBase warehouse. We tuned the frequency of this job such that it would export several gigabytes of data per execution, which worked well initially.  However, we soon noticed several deficiencies:

  1. If our export process ever fell behind and had to consume a larger-than-expected number of messages, it would take a much longer time to run.
  2. When inflows of data to Kafka slowed down, the job would spawn superfluous mappers that would consume nothing, unnecessarily tying up compute resources in our Hadoop cluster.

In addition, we’d begun to find more use cases for performing batch processing over data from Kafka topics, oftentimes requiring us to process the entirety of one or more topic in a single job.

Our conclusion was that our jobs couldn’t scale up or down: the InputFormat produced a constant number of InputSplits, no matter how much or little fresh data was available to consume. We needed a way to further partition our topics and produce a variable number of InputSplits based on the true number of eligible messages, which would require a new and different InputFormat.  So I decided to write one.

We needed a way to further partition our topics and produce a variable number of InputSplits based on the true number of eligible messages, which would require a new and different InputFormat.  So I decided to write one.

Kafka and Map/Reduce – Iteration 2

The fundamental insight I had was that we could partition the input data even more by exploiting the Kafka offset. We’d use offsets to split a topic partition into multiple InputSplits, allowing us to scale the number of mappers based on the size of the topic, not just the number of partitions. For every topic partition, we could ask the Kafka broker that owned it for all of the partition’s offsets. We could then add each of these queue chunks as an input to our M/R job.

The fundamental insight I had was that we could partition the input data even more by exploiting the Kafka offset.  We’d use offsets to split a topic partition into multiple InputSplits, allowing us to scale the number of mappers based on the size of the topic, not just the number of partitions.

Here’s what the new relationship between topics and splits looked like:

kafka 3

 

Kafka topic partitions are made up of a set of log files stored on the broker’s local filesystem. In the following example, my_topic has five partitions stored on kafka-broker-1.  Futhermore, partition 0 of my_topic is made up of 7 log files:

cgreen@kafka-broker-1:/var/local/kafka-logs$ ls -1p \
> # these are all of the “my_topic” partitions on this broker
my_topic-0/
my_topic-1/
my_topic-2/
my_topic-3/
my_topic-4/
cgreen@kafka-broker-1:/var/local/kafka-logs$ ls -1 my_topic-0/ \
> # these are the Kafka partition files for partition 0 of “my_topic”
00000000077354539368.kafka
00000000077891471230.kafka
00000000078428904525.kafka
00000000078965803980.kafka
00000000079503043795.kafka
00000000080040106061.kafka
00000000080577359208.kafka

Note the file names – these numbers correspond exactly to the offsets returned by kafka.consumer.SimpleConsumer. Each of these files maps to a Kafka InputSplit in our new KafkaInputFormat. This is a subtle but important enhancement – it allows the RecordReader backed by Kafka’s SimpleConsumer to jump quickly to the data at this offset without having to scan through partition files looking for an otherwise “random” offset.

The size of these partition files, and thus the size of individual input splits, is determined by the Kafka server property “log.file.size”.  In this example, “log.file.size” is set to ~512MB:

cgreen@kafka-broker-1:/var/local/kafka-logs$ ls -lh my_topic-0/ \
> # Note that each file is approximately “513M” except the most recent partition file
-rw-r--r-- 1 kafka kafka 513M Apr 21 00:19 00000000077354539368.kafka
-rw-r--r-- 1 kafka kafka 513M Apr 21 05:50 00000000077891471230.kafka
-rw-r--r-- 1 kafka kafka 513M Apr 21 11:24 00000000078428904525.kafka
-rw-r--r-- 1 kafka kafka 513M Apr 21 17:12 00000000078965803980.kafka
-rw-r--r-- 1 kafka kafka 513M Apr 22 06:44 00000000079503043795.kafka
-rw-r--r-- 1 kafka kafka 513M Apr 24 06:49 00000000080040106061.kafka
-rw-r--r-- 1 kafka kafka  16M Apr 24 10:06 00000000080577359208.kafka

This approach pins the resources allocated by the Map/Reduce job much more accurately to the volume of data the job attempts to process, increasing throughput for both the full-topic and incremental use cases. It also preserves the offset management semantics offered by kafka-hadoop-consumer. Only messages that fall within our desired offset range will be seen by a mapper, and the current offset for the job’s consumer group will be updated if-and-only-if the job succeeds.

Enhanced Data Stream Processing with Kangaroo

We’ve been running batch processing jobs over Kafka using this system for some time now, and we think other projects could realize some benefit from it as well. I’ve separated our KafkaInputFormat and its associated classes from our internal codebase and packaged it as Kangaroo. Here’s how you can use it in your own code:

You can download the source code from Github:

git clone git@github.com:Conductor/kangaroo.git

To add Kangaroo as a depencey using Maven, add the following to your pom.xml file:

<dependency>
  <groupId>com.conductor</groupId>
  <artifactId>kangaroo</artifactId>
  <version>1.1.0</version>
</dependency>

Using the KafkaInputFormat is very simple.  First, write a mapper:

public static class MyMapper extends Mapper<LongWritable, BytesWritable, KEY_OUT, VALUE_OUT> {

  @Override
  protected void map(final LongWritable key, final BytesWritable value, final Context context) throws IOException, InterruptedException {
    // do some stuff
  }
}
  • KEY_IN of the Mapper must be LongWritable – this is the Kafka topic offset of the message.
  • VALUE_IN of the Mapper must be BytesWritable – this is the raw byte content of a single Kafka message.

Then, set up your Map/Reduce job:

// Create a new job
final Job job = Job.getInstance(getConf(), "my_job");

// Set the InputFormat
job.setInputFormatClass(KafkaInputFormat.class);

// Set your Zookeeper connection string
KafkaInputFormat.setZkConnect(job, "zookeeper-1.xyz.com:2181");

// Set the topic you want to consume
KafkaInputFormat.setTopic(job, "my_topic");

// Set the consumer group associated with this job
KafkaInputFormat.setConsumerGroup(job, "my_consumer_group");

// Set the mapper that will consume the data
job.setMapperClass(MyMapper.class);

// (Optional) Only commit offsets if the job is successful
if (job.waitForCompletion(true)) {
  final ZkUtils zk = new ZkUtils(job.getConfiguration());
  zk.commit("my_consumer_group", "my_topic");
  zk.close();
}

Setting up a Map/Reduce job requires:

  1. A Zookeeper quorum
  2. A Topic name
  3. A Consumer group

We also developed MultipleKafkaInputFormat, an InputFormat that reads multiple queues in a single job.  At Conductor, we’ve utilized this to join and data mine the contents of multiple Kafka queues at once.  Using this InputFormat is also quite simple:

// Create a new job
final Job job = Job.getInstance(getConf(), "my_job");

// Set the InputFormat
job.setInputFormatClass(MultipleKafkaInputFormat.class);

// Set your Zookeeper connection string
KafkaInputFormat.setZkConnect(job, "zookeeper-1.xyz.com:2181");

// Add as many queue inputs as you'd like
MultipleKafkaInputFormat.addTopic(job, "my_first_topic", "my_consumer_group", MyMapper.class);
MultipleKafkaInputFormat.addTopic(job, "my_second_topic", "my_consumer_group", MyMapper.class);
// ...

// (Optional) Only commit offsets if the job is successful
if (job.waitForCompletion(true)) {
  final ZkUtils zk = new ZkUtils(job.getConfiguration());
  // commit the offsets for each topic
  zk.commit("my_consumer_group", "my_first_topic");
  zk.commit("my_consumer_group", "my_second_topic");
  // ...
  zk.close();
}

We also developed MultipleKafkaInputFormat, an InputFormat that reads multiple queues in a single job.  At Conductor, we’ve utilized this to join and data mine the contents of multiple Kafka queues at once.

Use Kangaroo for Efficient, Scalable Data Stream Processing

In summary, our KafkaInputFormat improves the scalability of processing Kafka queues in Hadoop.

  1. KafkaInputSplits consist of Kafka partition files, rather than an entire partition.
  2. Partition files that have been consumed are filtered out during job setup.  This ensures that no superfluous map tasks are spawned during job execution.
  3. The number of input splits scales evenly.
    1. Kafka brokers distribute topic data evenly across all partitions.
    2. Partition files have a maximum size, ensuring that a single mapper gets, at most, “log.file.size” amount of input.

Kangaroo helps Conductor maximize throughput from our Kafka installation across a number of mission-critical processes. We hope that you can get value out of it as well. To learn more about Conductor – and all of the cool distributed computing technologies we use – visit our jobs page!

About Casey Green

Casey Green is a technical lead on the Infrastructure Engineering team at Conductor.

Related Posts