Pearson’s Correlation Coefficient using Apache Spark and Map Reduce

In my last two tutorials, I covered some simple mappers and reducers in Spark, mainly filters, counters, adders, and search.

In this tutorial, let's combine the same concepts from the previous posts to write a Spark implementation of Pearson's Correlation Coefficient.

For reference, Pearson's correlation coefficient for n pairs (X, Y) is:

r = ( n*ΣXY − ΣX*ΣY ) / ( sqrt(n*ΣX^2 − (ΣX)^2) * sqrt(n*ΣY^2 − (ΣY)^2) )

Sample Input

Two variables, X and Y, with values in comma-separated form:
1,2
3,4
5,6

The code is general enough to handle any pair of columns, but we will keep it simple and assume that the file contains only the two columns shown above.

Algorithm

For each line in the file, parse the two numbers as Java Double values
and emit the following (key, value) pairs:

  1. (0,1) – To count the number of records
  2. (1,X) – Value of X itself
  3. (2,Y) – Value of Y itself
  4. (3,X^2) – Square of X
  5. (4,Y^2) – Square of Y
  6. (5,X*Y) – Product of X and Y. This will help us compute the Dot Product of X and Y.

A single pass of the Spark mapper will finish all the heavy lifting in one awesome O(n) operation!

Next, our reducers will add up the values for each key and we will be almost done.
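
To make this concrete, here is what the sample input above works out to after the map and reduce steps (computed by hand, just to illustrate the flow):

Input rows: (1,2), (3,4), (5,6)

Reduced value per key:
  key 0 -> n     = 3
  key 1 -> ΣX    = 1 + 3 + 5   = 9
  key 2 -> ΣY    = 2 + 4 + 6   = 12
  key 3 -> ΣX^2  = 1 + 9 + 25  = 35
  key 4 -> ΣY^2  = 4 + 16 + 36 = 56
  key 5 -> ΣXY   = 2 + 12 + 30 = 44

r = (3*44 − 9*12) / ( sqrt(3*35 − 9^2) * sqrt(3*56 − 12^2) )
  = 24 / ( sqrt(24) * sqrt(24) )
  = 1.0

which is what we expect, since Y = X + 1 is a perfect linear relationship.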

Mapper and Reducer Functions

The Mapper

Following the usual Spark Java style, the mapper is a static nested class. I added a constructor so that custom delimiters and column numbers can be supplied if needed.

// Requires: java.util.List, java.util.ArrayList, scala.Tuple2,
// and org.apache.spark.api.java.function.PairFlatMapFunction
private static class ValuesForCorrelation
        implements PairFlatMapFunction<String, Integer, Double> {
    private final int xPos;
    private final int yPos;
    private final String delimiter;

    public ValuesForCorrelation(int x, int y, String d)
    {
        this.xPos = x;
        this.yPos = y;
        this.delimiter = d;
    }

    public Iterable<Tuple2<Integer, Double>> call(String s)
    {
        List<Tuple2<Integer, Double>> returnList = new ArrayList<Tuple2<Integer, Double>>(6);
        String[] parts = s.split(delimiter);          // split the line only once
        if (parts.length > xPos && parts.length > yPos) {
            try {
                double x = Double.parseDouble(parts[xPos]);
                double y = Double.parseDouble(parts[yPos]);
                returnList.add(new Tuple2<Integer, Double>(0, 1.0));    // record count
                returnList.add(new Tuple2<Integer, Double>(1, x));      // X
                returnList.add(new Tuple2<Integer, Double>(2, y));      // Y
                returnList.add(new Tuple2<Integer, Double>(3, x * x));  // X^2
                returnList.add(new Tuple2<Integer, Double>(4, y * y));  // Y^2
                returnList.add(new Tuple2<Integer, Double>(5, x * y));  // X*Y
            } catch (NumberFormatException e) {
                // Skip lines that do not parse as numbers
            }
        }
        return returnList;
    }
}
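
As a usage note, the constructor arguments let the same class handle other layouts. For a hypothetical tab-delimited file with X in column 0 and Y in column 3, the mapper would be constructed as:

new ValuesForCorrelation(0, 3, "\t")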

The Reducer

The reducer is straightforward and simply adds up a stream of numbers.

private static class AddDoublePair implements Function2<Double, Double, Double> {
    /*
    Given any two Doubles, this method returns their sum.
    Useful for a reduce operation that adds a stream of numbers.
     */
    public Double call(Double a, Double b) {
        return a + b;
    }
}

Putting it all together

The following code calls the mapper and reducer and then calculates Pearson's Correlation Coefficient using the formula above.

public void pearsonCorrelation(JavaSparkContext sc)
{
    double n,sumX,sumY,xDotY,sumXSq,sumYSq;
    n = sumX = sumY = xDotY = sumXSq = sumYSq = 0.0d;

    //Calling the Mapper and Reducer
    JavaRDD<String> lines = sc.textFile(this.filename);
    JavaPairRDD<Integer,Double> mappedValues = lines
            .flatMapToPair(new ValuesForCorrelation(0, 1, ","))
            .reduceByKey(new AddDoublePair());

    List<Tuple2<Integer,Double>> output = mappedValues.collect();
    for(Tuple2<Integer,Double> a:output)
    {
        switch (a._1())
        {
            case 0 : n = a._2();
                break;
            case 1 : sumX = a._2();
                break;
            case 2 : sumY = a._2();
                break;
            case 3 : sumXSq = a._2();
                break;
            case 4 : sumYSq = a._2();
                break;
            case 5 : xDotY = a._2();
                break;
            default:
                System.out.println("Unexpected Value");
        }
    }

    //The numerator
    double num = n*xDotY - sumX*sumY;

    //The denominator
    double den = Math.sqrt(n*sumXSq-Math.pow(sumX,2)) * Math.sqrt(n*sumYSq-Math.pow(sumY,2));

    double pearsonr = num/den;
    System.out.println("Correlation:" + pearsonr);
}
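
For completeness, here is a minimal driver sketch showing how this method could be wired up. The surrounding class name and the filename field are my assumptions, since the post does not show them:

public class PearsonCorrelationJob {
    private final String filename;   // hypothetical field read by pearsonCorrelation()

    public PearsonCorrelationJob(String filename) {
        this.filename = filename;
    }

    // ... pearsonCorrelation(JavaSparkContext sc) and the nested classes above go here ...

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Pearson Correlation");
        JavaSparkContext sc = new JavaSparkContext(conf);
        new PearsonCorrelationJob(args[0]).pearsonCorrelation(sc);
        sc.stop();
    }
}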

Key concepts

This program really highlights the power of Spark's "flat" mappers and the PairRDD, which is based on Scala's Tuple2 type.

As somebody new to Spark, I finally understood the difference between the plain mapper functions like "Function" and the "FlatMap" variants. FlatMap is best for situations where you need to return several records of the same type for a single input record.

When each output record is a (key, value) pair, Spark provides the neat PairFlatMapFunction, which combines the benefits of flat mapping and pair RDDs, as sketched below.
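
A rough sketch of the distinction, using the Spark 1.x Java API that this post targets (the word-count style example is purely illustrative):

// Function<T, R>:                R call(T t)                       -> map(), one output per input
// PairFunction<T, K, V>:         Tuple2<K, V> call(T t)            -> mapToPair(), one pair per input
// PairFlatMapFunction<T, K, V>:  Iterable<Tuple2<K, V>> call(T t)  -> flatMapToPair(), any number of pairs

// Emitting a (word, 1) pair for every word in a line:
JavaPairRDD<String, Integer> wordPairs = lines.flatMapToPair(
    new PairFlatMapFunction<String, String, Integer>() {
        public Iterable<Tuple2<String, Integer>> call(String line) {
            List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>();
            for (String word : line.split(" ")) {
                out.add(new Tuple2<String, Integer>(word, 1));
            }
            return out;
        }
    });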

Hope you found this post informative and helpful in your adventures with Spark.

A Simple Tutorial for Apache Spark : Counting CSV columns

This is a simple tutorial for Apache Spark. The application is similar to a word counting program.

Problem

Say you have a huge CSV file in which each row is supposed to have a fixed number of columns.

However, due to missing data, you cannot be sure of this. You want to compute, across the entire file, the distribution of the number of populated fields per row.

Solution

Basic idea:
1. Read the CSV file as a Spark textFile.
2. Map each line/row of the CSV to a tuple (number of populated fields in the row, 1).
3. Reduce the resulting set of tuples by key.

Read the file

SparkConf conf = new SparkConf().setAppName("My Spark App");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile(filename);

Functions for Map & Reduce

Before writing the actual Map and Reduce calls, we need to define the functions that perform the operations.

In Spark, functions that are passed to mappers/reducers implement interfaces from the package org.apache.spark.api.java.function.

A clean way to define these is as static nested classes.

Here is the map function that counts the number of columns in each row:

private static class CountCsvColumns implements PairFunction<String, Integer, Integer>
{
    /*
    Given a comma-separated row, this returns the number of populated fields,
    paired with a count of 1.
     */
    public Tuple2<Integer, Integer> call(String s)
    {
        String[] parts = s.split(",");   // split on commas, since the input is a CSV file
        int columnCount = 0;
        for (int i = 0; i < parts.length; i++) {
            if (!parts[i].isEmpty())
                columnCount += 1;
        }
        return new Tuple2<Integer, Integer>(columnCount, 1);
    }
}
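
For example (hypothetical rows), the mapper emits:

a,b,c  ->  (3, 1)
a,,c   ->  (2, 1)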

Here is the reduce function that adds up the counts:

private static class AddIntegerPair implements Function2<Integer, Integer, Integer> {
    /*
    Given any two integers, this returns the sum.
    Useful for a reduce operation that adds a stream of numbers.
     */
    public Integer call(Integer a, Integer b) {
        return a + b;
    }
}

Putting it all together

List<Tuple2<Integer, Integer>> output = lines
            .mapToPair(new CountCsvColumns())
            .reduceByKey(new AddIntegerPair())
            .collect();

Printing out the results:

for (Tuple2<Integer, Integer> tuple : output) {
    System.out.println(tuple._1() + ":" + tuple._2());
}
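
Each output line reads columnCount:rowCount. With purely made-up numbers, a file whose rows should all have 12 columns might produce something like:

12:139750000
11:200000
5:50000

Any key other than the expected column count points directly at rows with missing fields.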

Performance

On an AWS instance with 8 GB RAM and 2 cores, this chomped through a CSV file with 140 million rows in less than 45 minutes. Not bad, eh?