Hadoop custom keys: the right way


I would like to start this post with a simple Hadoop exercise: a set of edges is given as input, possibly with repetitions, and you are asked to write a Hadoop MapReduce algorithm that makes the graph undirected, namely one that adds the inverse of each edge if it is not already present. This means that if the input is the following (where every line is an edge):

1 3
2 1
0 1
3 1
2 0
1 1
2 1

Then we expect the following output:

0 1
1 0
2 0
0 2
1 1
2 1
1 2
3 1
1 3

We can solve this problem with just one Mapper class and one Reducer class, in the following way. Map every line to a key-value pair in which the key is a custom EdgeWritable, implemented in such a way that every edge is equal to its inverse (e.g. the edge from node 1 to node 3 is equal to the edge from 3 to 1). In this way the reduce function of the Reducer receives every edge just once, even if it is repeated multiple times or its inverse is already in the input. The only thing left to do is to print every key once (since the key is an edge) together with its inverse, taking into account that edges like 1,1 have to be printed only once.
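The idea can be tried out without a cluster. The following is a minimal sketch in plain Java, not the Hadoop job itself: a set stands in for the shuffle phase that groups equal keys, and the class and method names (UndirectedEdgesSketch, canonicalKey, makeUndirected) are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class UndirectedEdgesSketch {

    // "Map" step: turn a line such as "3 1" into the canonical key [1, 3],
    // so that an edge and its inverse produce the same key.
    static List<Integer> canonicalKey(String line) {
        String[] parts = line.trim().split("\\s+");
        int from = Integer.parseInt(parts[0]);
        int to = Integer.parseInt(parts[1]);
        return Arrays.asList(Math.min(from, to), Math.max(from, to));
    }

    // "Shuffle" and "reduce" steps: the set keeps one copy of every key,
    // then each distinct key is emitted together with its inverse.
    static List<String> makeUndirected(List<String> lines) {
        Set<List<Integer>> distinctEdges = new LinkedHashSet<>();
        for (String line : lines) {
            distinctEdges.add(canonicalKey(line));
        }
        List<String> output = new ArrayList<>();
        for (List<Integer> edge : distinctEdges) {
            int a = edge.get(0);
            int b = edge.get(1);
            output.add(a + " " + b);
            if (a != b) { // self-loops such as "1 1" are emitted only once
                output.add(b + " " + a);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("1 3", "2 1", "0 1", "3 1", "2 0", "1 1", "2 1");
        for (String edge : makeUndirected(input)) {
            System.out.println(edge);
        }
    }
}
```

On the sample input this produces the same nine edges as the expected output, although not necessarily in the same order, since the order in a real job depends on how the keys are sorted.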

Hadoop implementation

The crucial point of the implementation is to create a custom key that follows the equality rule defined earlier. First of all, let's see the complete runner.

MapReduce

As you can see, the code is easy to understand; there's nothing complex. We just create a new EdgeWritable object every time, but this is not mandatory and can be optimized.

EdgeWritable

The true challenge is the custom type, defined as follows.

When we create a new custom type that will be used as a key in the MapReduce process, we need to implement the WritableComparable interface. We have to implement or override the following methods:

  • public void write(DataOutput d) - used for serializing the object;
  • public void readFields(DataInput di) - used for deserializing the object;
  • public int compareTo(EdgeWritable given) - used for sorting keys;
  • public boolean equals(Object o) - may be used internally;
  • public int hashCode() - used by the Partitioner to decide which Reducer the key is assigned to.
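As a rough illustration of how the five methods can fit together, here is a sketch of such a key. It is not the code from this post: it implements plain Comparable so that it compiles without Hadoop on the classpath (in a real job the class would implement org.apache.hadoop.io.WritableComparable<EdgeWritable>), it compares the canonical endpoints directly instead of the hash codes, and the helpers low(), high() and roundTrip() are names invented for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Assumption: Comparable replaces WritableComparable only to keep the sketch Hadoop-free.
class EdgeWritable implements Comparable<EdgeWritable> {
    private int from;
    private int to;

    // Hadoop creates key instances via reflection, so a no-arg constructor is needed.
    public EdgeWritable() { }

    public EdgeWritable(int from, int to) {
        this.from = from;
        this.to = to;
    }

    // Canonical endpoints: the same for an edge and its inverse.
    private int low()  { return Math.min(from, to); }
    private int high() { return Math.max(from, to); }

    // Serialization: write the two endpoints in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeInt(from);
        out.writeInt(to);
    }

    // Deserialization: read the endpoints back in the same order.
    public void readFields(DataInput in) throws IOException {
        from = in.readInt();
        to = in.readInt();
    }

    // Total order on the canonical endpoints; returns 0 exactly when equals() is true.
    @Override
    public int compareTo(EdgeWritable other) {
        int byLow = Integer.compare(low(), other.low());
        return byLow != 0 ? byLow : Integer.compare(high(), other.high());
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof EdgeWritable)) {
            return false;
        }
        EdgeWritable other = (EdgeWritable) o;
        return low() == other.low() && high() == other.high();
    }

    // Symmetric in the endpoints, so an edge and its inverse hash to the same value.
    @Override
    public int hashCode() {
        return 31 * low() + high();
    }

    @Override
    public String toString() {
        return from + " " + to;
    }

    // Hypothetical helper: serialize and deserialize in memory, as the framework would between phases.
    static EdgeWritable roundTrip(EdgeWritable edge) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            edge.write(new DataOutputStream(bytes));
            EdgeWritable copy = new EdgeWritable();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        EdgeWritable a = new EdgeWritable(1, 3);
        EdgeWritable b = new EdgeWritable(3, 1);
        System.out.println(a.equals(b) && a.compareTo(b) == 0 && a.hashCode() == b.hashCode());
        System.out.println(roundTrip(a));
    }
}
```

The design choice in this sketch is that equals, hashCode and compareTo are all derived from the same canonical pair (low, high), which makes it hard for them to disagree with one another.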

The first two are easy to understand. The others may also seem simple, but they are not: I spent hours trying to understand why my implementation was not working (see the notWorkingCompareTo function). What I realized is that if you want to use a custom data type, its functions have to produce consistent results, hashCode and compareTo in particular.

My first attempt was a compareTo function that only checked whether the from and to parameters were equal. That did not work. Then I found in the documentation[1] that hashCode was indispensable, so I implemented it, but it still did not work. In the end I realized that I could implement compareTo just by using the hashCode function, and that worked. This was really strange, because local tests passed with notWorkingCompareTo, which was semantically correct.
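One plausible explanation, offered here as an assumption because notWorkingCompareTo is not reproduced in this text, is the Comparable contract: sgn(x.compareTo(y)) must equal -sgn(y.compareTo(x)). A comparison that only detects equality and returns a fixed non-zero value otherwise cannot satisfy it, while a comparison of integer hash codes does. The sketch below checks this on plain int[] edges; both comparators and the symmetricHash function are guesses made for illustration, not the original code.

```java
import java.util.Comparator;

class CompareToContractCheck {

    static boolean sameUndirectedEdge(int[] a, int[] b) {
        return (a[0] == b[0] && a[1] == b[1]) || (a[0] == b[1] && a[1] == b[0]);
    }

    // Assumed shape of an "equality only" comparison: 0 for the same edge, 1 otherwise.
    static final Comparator<int[]> EQUALITY_ONLY =
            (a, b) -> sameUndirectedEdge(a, b) ? 0 : 1;

    // Hypothetical hash that is symmetric in the two endpoints.
    static int symmetricHash(int[] e) {
        return 31 * Math.min(e[0], e[1]) + Math.max(e[0], e[1]);
    }

    // Comparison built on the hash, in the spirit of the version that worked.
    static final Comparator<int[]> BY_HASH =
            (a, b) -> Integer.compare(symmetricHash(a), symmetricHash(b));

    // Contract check: sgn(compare(x, y)) == -sgn(compare(y, x)).
    static boolean isAntisymmetric(Comparator<int[]> cmp, int[] x, int[] y) {
        return Integer.signum(cmp.compare(x, y)) == -Integer.signum(cmp.compare(y, x));
    }

    public static void main(String[] args) {
        int[] a = {1, 3};
        int[] b = {2, 0};
        // Both directions return 1, so the contract is violated.
        System.out.println("equality-only antisymmetric: " + isAntisymmetric(EQUALITY_ONLY, a, b)); // false
        // The hashes are 34 and 2, so the two directions return 1 and -1.
        System.out.println("hash-based antisymmetric: " + isAntisymmetric(BY_HASH, a, b)); // true
    }
}
```

A sort that relies on an inconsistent comparison may still happen to group equal keys on a small local input, which would be compatible with local tests passing. Note also that a hash-based comparison treats two different edges as equal whenever their hashes collide, so comparing the endpoints themselves is the more defensive option.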


  1. https://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/WritableComparable.html