chris bons

Python frameworks with Hadoop

Discussion created by chris bons on Dec 8, 2017
Latest reply on Feb 27, 2018 by gracy layla

In this post, we will survey the Python frameworks that exist for working with Hadoop, including:

  • Hadoop Streaming
  • mrjob
  • dumbo
  • hadoopy
  • pydoop
  • and others

Ultimately, in my analysis, Hadoop Streaming is the fastest and most transparent option, and the best one for text processing. mrjob is best for rapidly working on Amazon EMR, but incurs a significant performance penalty. dumbo is convenient for more complex jobs (objects as keys; multistep MapReduce) without incurring as much overhead as mrjob, but it’s still slower than Streaming.

Read on for implementation details, performance comparisons, and feature comparisons.

Toy Problem Definition

To test out the different frameworks, we will not be doing “word count”. Instead, we will be transforming the Google Books Ngram data. An n-gram is a synonym for a tuple of n words. The n-gram data set provides counts for every single 1-, 2-, 3-, 4-, and 5-gram observed in the Google Books corpus grouped by year. Each row in the n-gram data set is composed of 3 fields: the n-gram, the year, and the number of observations. (You can explore the data interactively here.)

We would like to aggregate the data to count the number of times any pair of words is observed near each other, grouped by year. This would allow us to determine whether any pair of words appears near each other more often than we would expect by chance. Two words are “near” if they are observed within 4 words of each other. Equivalently, two words are near each other if they appear together in any 2-, 3-, 4-, or 5-gram. So a row in the resulting data set would consist of a 2-gram, a year, and a count.

There is one subtlety that must be addressed. The n-gram data set for each value of n is computed across the whole Google Books corpus. In principle, given the 5-gram data set, I could compute the 4-, 3-, and 2-gram data sets simply by aggregating over the correct n-grams. For example, if the 5-gram data set contains

  (the, cat, in, the, hat)         1999    20
  (the, cat, is, on, youtube)      1999    13
  (how, are, you, doing, today)    1986    5000

then we could aggregate this into 2-grams which would result in records like

  (the, cat)    1999    33    // i.e., 20 + 13
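As a minimal sketch of that aggregation, the snippet below sums the sample 5-gram records by their leading 2-gram and year. It assumes each 2-gram occurrence is attributed to the 5-gram that starts with it (ignoring edge effects at the end of a text); the function name `leading_bigram_counts` is made up for illustration.

```python
from collections import defaultdict

# Sample 5-gram records from the text: (ngram, year, count).
FIVE_GRAMS = [
    (("the", "cat", "in", "the", "hat"), 1999, 20),
    (("the", "cat", "is", "on", "youtube"), 1999, 13),
    (("how", "are", "you", "doing", "today"), 1986, 5000),
]


def leading_bigram_counts(records):
    """Sum counts by (first two words, year).

    Assumption: each 2-gram occurrence is attributed to the 5-gram that
    starts with it, so edge effects at the end of a text are ignored.
    """
    totals = defaultdict(int)
    for ngram, year, count in records:
        totals[(ngram[:2], year)] += count
    return dict(totals)


bigram_counts = leading_bigram_counts(FIVE_GRAMS)
print(bigram_counts[(("the", "cat"), 1999)])
```

The two 1999 records that start with "the cat" collapse into a single 2-gram row, which is the 20 + 13 sum shown above.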

However, in practice, Google only includes an n-gram if it is observed more than 40 times across the entire corpus. So while a particular 5-gram may be too rare to meet the 40-occurrence threshold, the 2-grams it is composed of may be common enough to break the threshold in the Google-supplied 2-gram data. For this reason, we use the 2-gram data for words that are next to each other, the 3-gram data for pairs of words that are separated by one word, the 4-gram data for pairs of words that are separated by two words, and so on. In other words, given the 2-gram data, the only additional information the 3-gram data provides is the pairing of the outermost words of each 3-gram. In addition to being more sensitive to potentially rare n-grams, using only the outermost words of the n-grams helps ensure we avoid double counting. In total, we will be running our computation on the combination of the 2-, 3-, 4-, and 5-gram data sets.
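To make the "outermost words only" rule concrete, here is a small sketch. The helper `outer_pair` is a hypothetical name, and representing an n-gram as a tuple of words is an assumption for illustration.

```python
def outer_pair(ngram):
    """Return the outermost words of an n-gram and the number of words between them."""
    if not 2 <= len(ngram) <= 5:
        raise ValueError("expected a 2-, 3-, 4-, or 5-gram")
    # Only the first and last words are used; inner pairs are already
    # covered by the lower-order n-gram data sets.
    return (ngram[0], ngram[-1]), len(ngram) - 2


pair, gap = outer_pair(("the", "cat", "in", "the", "hat"))
```

For a 3-gram such as (a, b, c), this yields only (a, c); the pairs (a, b) and (b, c) come from the 2-gram data, which is how double counting is avoided.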

The MapReduce pseudocode to implement this solution would look like so:

  def map(record):
      (ngram, year, count) = unpack(record)
      # ensure word1 is the lexicographically first word:
      (word1, word2) = sorted([ngram[first], ngram[last]])
      key = (word1, word2, year)
      emit(key, count)

  def reduce(key, values):
      emit(key, sum(values))
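The pseudocode above can be tried locally with a small in-memory simulation. This is only a sketch, not any framework's API: it assumes each record is a tab-separated line of the form "n-gram, year, count" with the n-gram's words separated by spaces, and the names `map_record`, `reduce_values`, and `run_local` are made up for illustration.

```python
from collections import defaultdict


def map_record(record):
    """Yield ((word1, word2, year), count) for one tab-separated record."""
    ngram, year, count = record.rstrip("\n").split("\t")
    words = ngram.split(" ")
    # Ensure word1 is the lexicographically first word.
    word1, word2 = sorted([words[0], words[-1]])
    yield (word1, word2, int(year)), int(count)


def reduce_values(key, values):
    """Yield the key with the sum of its counts."""
    yield key, sum(values)


def run_local(records):
    """Run map, an in-memory shuffle, and reduce over an iterable of records."""
    groups = defaultdict(list)
    for record in records:
        for key, value in map_record(record):
            groups[key].append(value)
    results = {}
    for key in sorted(groups):
        for out_key, total in reduce_values(key, groups[key]):
            results[out_key] = total
    return results


# Assumed record layout: "<n-gram>\t<year>\t<count>".
SAMPLE = [
    "the cat in the hat\t1999\t20",
    "hat trick by the\t1999\t5",
    "how are you doing today\t1986\t5000",
]
totals = run_local(SAMPLE)
```

Because the pair is sorted before it becomes part of the key, "the ... hat" and "hat ... the" land in the same group and their counts are summed together.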


These MapReduce jobs are executed on a ~20 GB random subset of the data. The full data set is split across 1500 files; we select a random subset of the files using this script. The filenames remain intact, which is important because the filename identifies the value of n in the n-grams for that chunk of data.
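For illustration only (this is not the script referenced above), a random subset of roughly a target size could be chosen along these lines. The function name `sample_files`, the seed, and the size-budget approach are all assumptions.

```python
import random


def sample_files(sizes_by_name, target_bytes, seed=0):
    """Pick files in random order until their total size reaches target_bytes.

    File names are returned unchanged, since the name encodes the value of n.
    """
    names = sorted(sizes_by_name)
    random.Random(seed).shuffle(names)
    chosen, total = [], 0
    for name in names:
        if total >= target_bytes:
            break
        chosen.append(name)
        total += sizes_by_name[name]
    return chosen, total


# Hypothetical file names and sizes, for demonstration only.
demo_sizes = dict(("chunk-%03d" % i, 10) for i in range(100))
demo_chosen, demo_total = sample_files(demo_sizes, 55)
```

Sampling whole files rather than individual rows keeps each chunk tied to its original file name.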

The Hadoop cluster comprises five virtual nodes running CentOS 6.2 x64, each with 4 CPUs, 10 GB RAM, 100 GB disk, running CDH4. The cluster can execute 20 maps at a time, and each job is set to run with 10 reducers.

The software versions I worked with on the cluster were as follows:

  • Hadoop: 2.0.0-cdh4.1.2
  • Python: 2.6.6
  • mrjob: 0.4-dev
  • dumbo: 0.21.36
  • hadoopy: 0.6.0
  • pydoop: 0.7 (PyPI) and the latest version from the git repository
  • Java: 1.6


Most of the Python frameworks wrap Hadoop Streaming, while others wrap Hadoop Pipes or implement their own alternatives. Below, I will discuss my experience with a number of tools for using Python to write Hadoop jobs, along with a final comparison of performance and features. One of the features I am interested in is the ease of getting up and running, so I did not attempt to optimize the performance of the individual packages.

As with every large data set, there are bad records. We check for a few kinds of errors in each record including missing fields and wrong n-gram size. For the latter case, we must know the name of the file that is being processed in order to determine the expected n-gram size.
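The checks described above might look roughly like the following sketch. It assumes tab-separated records with three fields and a file name that contains the n-gram size in the form "5gram"; both the naming pattern and the helper names `expected_ngram_size` and `parse_record` are assumptions, not taken from any of the frameworks.

```python
import os
import re


def expected_ngram_size(path):
    """Extract n from a file name containing e.g. '5gram' (assumed naming pattern)."""
    match = re.search(r"(\d)gram", os.path.basename(path))
    if match is None:
        raise ValueError("cannot determine n-gram size from %r" % path)
    return int(match.group(1))


def parse_record(line, expected_size):
    """Return (words, year, count), or None if the record is bad."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 3:  # missing (or extra) fields
        return None
    words = fields[0].split(" ")
    if len(words) != expected_size or "" in words:  # wrong n-gram size
        return None
    try:
        return words, int(fields[1]), int(fields[2])
    except ValueError:  # non-numeric year or count
        return None
```

Under Hadoop Streaming, the path of the file being processed is typically available to the mapper through an environment variable (`map_input_file` in older releases, `mapreduce_map_input_file` in newer ones), so the expected size can be computed once at mapper start-up; the other frameworks may expose the file name differently.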