Pairwise Comparisons of Large Datasets

It’s been a while since I last posted. Work’s been busy, interesting, challenging :) But now it’s the holidays and I have some time to write.

At work we’ve been building a small team around Big Data technologies; specifically Hadoop and Elasticsearch right now though those choices may change. Unlike many Big Data projects we’re not de-normalising our data for a specific application. We have several different applications and thoughts in mind so we want to keep options open. We’re working with graph-based data structures; Linked Data, essentially.

The first product we’re building is niche and the community of users are quite private about how they do business so as I’ve said before I won’t be talking much about that. That sounded kinda creepy 8-| they’re not the mafia, they’re really nice people!

What I can share with you is a little technique we’ve developed for doing pairwise comparisons in map/reduce.

We all know map/reduce is a great way to solve some kinds of problems, and Hadoop is a great implementation that allows us to scale map/reduce solutions across many machines. One class of problem that is hard to do this way is pairwise comparison. Let me first describe what I mean by a pairwise comparison…

Imagine you have a collection of documents. You want to know which ones are similar to which others. One way to do this is to compare every document with every other document and give the connection between them a similarity score. That is hard to do with a large collection of documents because of the number of comparisons – the problem is O(n²). Specifically, if we assume we don’t compare documents with themselves and that ɑ compared with β is the same as β compared with ɑ, then the number of comparisons is (n²-n)/2. For a collection of just 1,000 documents that is already 499,500 comparisons.

If you want to scale this out across a cluster, the specific difficulty is knowing what to compare next and what’s already been done. Most approaches I’ve seen use some central coordinator and require that every box in the cluster can access some central document store. Both of those cause problems for very large sets.

Other approaches rely on re-defining the problem. One approach is to create some kind of initial grouping, based on an attribute such as a subject classification, and then only compare within those groupings. That’s a great approach and is often very suitable. Another approach is to generate some kind of compound key describing the document and then connect all documents that share the same key. That’s also a great approach and means each document’s key can be generated independently of the others. That scales really well but is not always possible.
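As an aside, a minimal sketch of that compound-key idea might look like the mapper below. The key derivation here (the three most frequent tokens, sorted) is purely illustrative, as is the class name; the point is only that the key comes from the document alone, so documents sharing a key meet at the same reducer.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical sketch: derive a key from the document itself so that only
// documents sharing the key are ever compared against each other.
public class CompoundKeyMap extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  protected void map(LongWritable offset, Text document, Context context)
      throws IOException, InterruptedException {
    // Count token frequencies in this document.
    Map<String, Integer> counts = new HashMap<>();
    for (String token : document.toString().toLowerCase().split("\\W+")) {
      if (!token.isEmpty()) {
        counts.merge(token, 1, Integer::sum);
      }
    }
    // Use the three most frequent tokens, sorted, as a crude descriptor.
    List<String> top = counts.entrySet().stream()
        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
        .limit(3)
        .map(Map.Entry::getKey)
        .sorted()
        .collect(Collectors.toList());
    // Documents with the same derived key land at the same reducer,
    // which then only has a small group to compare.
    context.write(new Text(String.join("|", top)), document);
  }
}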

What if we really do want to compare everything with everything else? That’s the situation I’ve been looking at.

Let’s simplify the example a little. We’ll use the words of the phonetic alphabet, alpha to zulu, to represent our set of documents:

Alpha Bravo Charlie Delta Echo Foxtrot Golf Hotel India Juliet Kilo Lima Mike November Oscar Papa Quebec Romeo Sierra Tango Uniform Victor Whiskey X-ray Yankee Zulu

A pairwise comparison can be viewed as a table with the same terms heading both rows and columns. This gives us a way of thinking about the workload. The smallest unit we can package as a piece of work is a cell in the table; the similarity score for which would be the comparison of the row and column headings.

          Alpha  Bravo  Charlie  Yankee  Zulu
Alpha       -      ✓       ✓       ✓      ✓
Bravo       -      -       ✓       ✓      ✓
Charlie     -      -       -       ✓      ✓
Yankee      -      -       -       -      ✓
Zulu        -      -       -       -      -

The cells we need to calculate are the ones marked with a ✓. Using the cell as the unit of work is nice and simple – compare the similarity of two things – so being able to work at this level would be great. Thinking about map/reduce, the pair and their similarity score is the final result we’re looking for, so it could be the output of the reducer code. That leaves the mapper to create the pairs.
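The comparison itself can be whatever suits your documents. As a stand-in (our real scoring is different), a minimal Jaccard similarity over token sets shows the shape of the per-cell work: two values in, one score out.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative stand-in for the per-cell comparison: Jaccard similarity over
// the two documents' token sets. Any pairwise scoring function fits here.
public final class Similarity {

  public static double jaccard(String left, String right) {
    Set<String> a = tokens(left);
    Set<String> b = tokens(right);
    if (a.isEmpty() && b.isEmpty()) {
      return 1.0; // two empty documents are trivially identical
    }
    Set<String> intersection = new HashSet<>(a);
    intersection.retainAll(b);
    Set<String> union = new HashSet<>(a);
    union.addAll(b);
    return (double) intersection.size() / union.size();
  }

  private static Set<String> tokens(String text) {
    Set<String> tokens = new HashSet<>(Arrays.asList(text.toLowerCase().split("\\W+")));
    tokens.remove(""); // split can leave an empty token at the start
    return tokens;
  }
}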

A simplistic approach to the mapper creating pairs would be to iterate all of the values:

Receiving ‘Alpha’ as input:
1) read ‘Alpha’ and ignore it
2) read ‘Bravo’ and output ‘Alpha, Bravo’
3) read ‘Charlie’ and output ‘Alpha, Charlie’
…
25) read ‘Yankee’ and output ‘Alpha, Yankee’
26) read ‘Zulu’ and output ‘Alpha, Zulu’

This is not a good approach: it means the mapper will need to read all of the values for each input value. Remember that we can’t assume the set will fit in memory, so we can’t keep a full set inside each mapper to iterate over quickly. The reading of values is then O(n²), and the mapper has to do it just to generate the pairs that the reducer will then compare. With this approach the mapper requires access to the full set of input values each time it processes a value. So, we’ve managed to remove the need for a central coordinator, but not for a centrally accessible store.

What we need to find is a way of generating pairs without having to iterate the full input set multiple times. Our mental model of a table gives us a possible solution for that — coordinates. If we could generate pairs of values using coordinates as keys then the sort that occurs between the map and reduce will bring together pairs of values at the same coordinate — a coordinate identifying a cell:

             1      2      3       25      26
           Alpha  Bravo  Charlie  Yankee  Zulu
 1 Alpha
 2 Bravo
 3 Charlie
25 Yankee
26 Zulu

This changes what our mapper needs to know. Rather than having to know every other value, it only needs to know its own position and every other coordinate. If we use sequential, incrementing values for the coordinates then we don’t need to query for them; we can simply calculate them. To do that, the mapper needs to know the row/column number of the current value it’s been given and the total number of rows/columns in the table. The total can be passed in as part of the job configuration.
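For example, the driver can stash the total in the Configuration before submitting the job. This is only a minimal sketch: the property name TotalInputValues matches the mapper shown further down, but the argument positions and the PairwiseReduce class name are mine (a sketch of that reducer appears at the end of the post).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Driver sketch: the only unusual part is passing the total number of
// input values to every mapper via the job configuration.
public class PairwiseJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("TotalInputValues", args[2]); // e.g. "26" for alpha..zulu

    Job job = Job.getInstance(conf, "pairwise comparison");
    job.setJarByClass(PairwiseJob.class);
    job.setMapperClass(PairwiseMap.class);
    job.setReducerClass(PairwiseReduce.class);
    // The position-indexed file ("1 <tab> Alpha") arrives as key/value Text pairs.
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}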

Getting the position of our value within the input sequence is a little tricky. The TextInputFormat reads input files line by line and passes each line to the mapper. If the key it passed to the mapper were the line number, that would make this problem very easy to solve. Unfortunately it passes the byte offset within the file. One way to know the position, then, would be to use fixed-length values; the byte offset divided by the fixed length gives the position. Alternatively we could pre-process the file and create a file of the form ‘1 [tab] Alpha’ to provide the position explicitly. This requires that we perform a single-threaded pass over the entire input set to generate an incrementing position number — not ideal.

It also means that if your comparisons take less time than creating the position-indexed file, this approach won’t be useful to you. In our case it is useful.
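For completeness, the pre-processing pass itself is trivial, just unavoidably single-threaded. Something along these lines (class and file names are illustrative) produces the position-indexed file:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

// Single-threaded pre-processing pass: prefix each value with its position
// so the mapper can recover its row/column number directly from the input key.
public class PositionIndexer {
  public static void main(String[] args) throws IOException {
    try (BufferedReader in = new BufferedReader(new FileReader(args[0]));
         PrintWriter out = new PrintWriter(new FileWriter(args[1]))) {
      String line;
      long position = 1;
      while ((line = in.readLine()) != null) {
        out.println(position + "\t" + line); // e.g. "1<tab>Alpha"
        position++;
      }
      // Feed this count into the TotalInputValues job property.
      System.out.println("Total values: " + (position - 1));
    }
  }
}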

The mapper logic for a coordinate approach becomes:

1) read ‘Alpha’
2) output ‘Alpha’ to the coordinates of cells where it should be compared.

A naive implementation of this would output ‘Alpha’ to cells 1,1 to 1,26 for the top row and 1,1 to 26,1 for the left-most column. That would create a full n² grid, but we know we can optimise that to (n²-n)/2, in which case Alpha would be output to cells 1,2 to 1,26 only; the ticked cells in our example. A middle-position value, Lima, would be output to cells 1,12 to 11,12 and 12,13 to 12,26. This means the mappers only have to pass over the input values a single time – O(n).

In code:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PairwiseMap extends Mapper<Text, Text, Text, Text> {

  // Emit this value to every cell before it in its own row (columns 1 .. row-1).
  private static void output_rows(int row, Text name, Context context)
      throws IOException, InterruptedException {
    for (int col = 1; col < row; col++) {
      String key = String.format("%d,%d", row, col);
      context.write(new Text(key), name);
    }
  }

  // Emit this value to every cell after it in its own column (rows position+1 .. total).
  // Together with output_rows this means every pair of values meets at exactly one cell.
  private static void output_cols(int wordPosition, Text name, int total, Context context)
      throws IOException, InterruptedException {
    int column = wordPosition;
    for (int row = wordPosition + 1; row <= total; row++) {
      String key = String.format("%d,%d", row, column);
      context.write(new Text(key), name);
    }
  }

  @Override
  protected void map(Text key, Text value, Context context)
      throws IOException, InterruptedException {
    // Total number of input values, passed in via the job configuration.
    int total = Integer.parseInt(context.getConfiguration().get("TotalInputValues"));
    // The key is the position prefix from the pre-processed "1 <tab> Alpha" file.
    int line = Integer.parseInt(key.toString().trim());
    output_rows(line, value, context);
    output_cols(line, value, total, context);
  }
}
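For the other half of the job, here is a sketch of a matching reducer. Each coordinate key receives exactly two values, one contributed from the row and one from the column, so the reducer just pairs them up, scores them (using the Jaccard stand-in from earlier; your real comparison goes there), and writes out the pair with its score. The class name is the one assumed in the driver sketch above.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer sketch: each coordinate cell receives two values; compare them
// and emit the pair together with its similarity score.
public class PairwiseReduce extends Reducer<Text, Text, Text, Text> {

  @Override
  protected void reduce(Text coordinate, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    List<String> pair = new ArrayList<>(2);
    for (Text value : values) {
      pair.add(value.toString());
    }
    if (pair.size() != 2) {
      return; // nothing to compare for this cell
    }
    double score = Similarity.jaccard(pair.get(0), pair.get(1)); // stand-in comparison
    context.write(new Text(pair.get(0) + "," + pair.get(1)),
        new Text(String.valueOf(score)));
  }
}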

This solution is effective, but the pre-processing and the need to know the total are both frustrating limitations.

I can’t think of a better way to get the position, either with input files in HDFS or with rows in an HBase table. If you have a super-clever way to know the position of a value in a sequence, that would help a lot. Maybe a custom HBase input format would be a possibility.

Any suggestions for improvements would be gratefully received :)

 
