# Pearson’s Correlation Coefficient using Apache Spark and Map Reduce

In my last two tutorials, I covered some simple mappers and reducers using Spark. Mainly, I covered filters, counters, adders and search.

In this tutorial, lets combine the same concepts from the previous posts to write a Spark implementation of the Pearson’s Correlation Coefficient. ## Sample Input

2 variables X and Y with values in comma-seperated form like :
1,2
3,4
5,6

The code is general enough to handle any given column. But we will keep it simple and assume that the file contains only 2 columns like above.

## Algorithm

For each line in the file, parse the two numbers as Java Double type
and emit the following (Key,Value) pairs :

1. (0,1) – To count the number of records
2. (1,X) – Value of X itself
3. (2,Y) – Value of Y itself
4. (3,X^2) – Square of X
5. (4,Y^2) – Square of Y
6. (5,X*Y) – Product of X and Y. This will help us compute the Dot Product of X and Y.

A single pass of the Spark mapper will finish all the heavy lifting in one awesome O(n) operation!

Next, our reducers will add up the values for each key and we will be almost done.

## Mapper and Reducer Functions

### The Mapper

In Spark style, the Mapper is a static nested class. I added a constructor so that one can supply custom values for delimiters and column numbers if needed.

``````private static class MappedValues
implements PairFlatMapFunction<String,Integer,Double> {
private int xPos = 0;
private int yPos = 1;
private String delimiter="\t";

public ValuesForCorrelation(int x, int y, String d)
{
this.xPos = x;
this.yPos = y;
this.delimiter = d;
}

public Iterable<Tuple2<Integer,Double>> call(String s)
{
Tuple2<Integer,Double>[] returnList = new Tuple2;
Double x = Double.parseDouble(s.split(delimiter)[xPos]);
Double y = Double.parseDouble(s.split(delimiter)[yPos]);
if(x != null && y != null)
{
//val data = Arrays.asList(Tuple2<int,int>(0,1));
returnList= new Tuple2(0,Double.valueOf(1));
returnList= new Tuple2(1,x);
returnList= new Tuple2(2,y);
returnList= new Tuple2(3,Math.pow(x,2));
returnList= new Tuple2(4,Math.pow(y,2));
returnList= new Tuple2(5,x*y);
}
return Arrays.asList(returnList);
}
}
``````

### The Reducer

The reducer is pretty straight forward and just adds up streams of numbers.

``````private static class AddDoublePair implements Function2<Double, Double, Double> {
/*
Given any two Floats, this method returns the sum.
Useful for a reduce operation that adds a stream of numbers.
*/
public Double call(Double a, Double b) {
return a + b;
}
}
``````

## Putting it all together

The following code simply calls the mappers and reducers.
Finally, it calculates the Pearson’s Correlation Coefficient as per the formula in the figure above.

``````public void pearsonCorrelation(JavaSparkContext sc)
{
double n,sumX,sumY,xDotY,sumXSq,sumYSq;
n = sumX = sumY = xDotY = sumXSq = sumYSq = 0.0d;

//Calling the Mapper and Reducer
JavaRDD<String> lines = sc.textFile(this.filename);
JavaPairRDD<Integer,Double> mappedValues = lines
.flatMapToPair(new ValuesForCorrelation(0, 1, ","))

List<Tuple2<Integer,Double>> output = mappedValues.collect();
for(Tuple2<Integer,Double> a:output)
{
switch (a._1())
{
case 0 : n = a._2();
break;
case 1 : sumX = a._2();
break;
case 2 : sumY = a._2();
break;
case 3 : sumXSq = a._2();
break;
case 4 : sumYSq = a._2();
break;
case 5 : xDotY = a._2();
break;
default:
System.out.println("Unexpected Value");
}
}

//The numerator
double num = n*xDotY - sumX*sumY;

//The denominator
double den = Math.sqrt(n*sumXSq-Math.pow(sumX,2)) * Math.sqrt(n*sumYSq-Math.pow(sumY,2));

double pearsonr = num/den;
System.out.println("Correlation:" + pearsonr);
}
``````

## Key concepts

This program really highlighted the power of Spark’s “Flat” mappers and the PairRDD based on Scala’s Tuple2 type.

As somebody new to Spark, I finally realized the different between the normal mapper functions like “Function” and “FlatMap”. FlatMap is best for situations where you need to return several records of the same type.

In case the record is of type Pair, Spark provides the neat PairFlatMapFunction to take advantage of both Flat Map and Pair RDDs.