Pearson’s Correlation Coefficient using Apache Spark and MapReduce

In my last two tutorials, I covered some simple mappers and reducers using Spark. Mainly, I covered filters, counters, adders and search.

In this tutorial, let’s combine the concepts from those posts to write a Spark implementation of Pearson’s Correlation Coefficient.

Pearson’s correlation coefficient for two variables X and Y is given by:

r = (n·ΣXY − ΣX·ΣY) / ( √(n·ΣX² − (ΣX)²) · √(n·ΣY² − (ΣY)²) )

where n is the number of records.

Sample Input

Two variables, X and Y, with values in comma-separated form, like:
1,2
3,4
5,6

The code is general enough to handle any given pair of columns, but we will keep it simple and assume that the file contains only the two columns shown above.

Algorithm

For each line in the file, parse the two numbers as Java Double type
and emit the following (Key,Value) pairs :

  1. (0,1) – To count the number of records
  2. (1,X) – Value of X itself
  3. (2,Y) – Value of Y itself
  4. (3,X^2) – Square of X
  5. (4,Y^2) – Square of Y
  6. (5,X*Y) – Product of X and Y. This will help us compute the Dot Product of X and Y.

A single pass of the Spark mapper will finish all the heavy lifting in one awesome O(n) operation!

Next, our reducers will add up the values for each key and we will be almost done.
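For the three sample rows above, the reducers would produce n = 3, ΣX = 9, ΣY = 12, ΣX² = 35, ΣY² = 56 and ΣXY = 44, which gives r = (3·44 − 9·12) / (√(3·35 − 9²) · √(3·56 − 12²)) = 24 / 24 = 1, exactly what we expect for perfectly linear data.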

Mapper and Reducer Functions

The Mapper

As is common with Spark’s Java API, the mapper is a static nested class. I added a constructor so that custom values for the delimiter and column numbers can be supplied if needed.

private static class ValuesForCorrelation
        implements PairFlatMapFunction<String, Integer, Double> {
    private final int xPos;
    private final int yPos;
    private final String delimiter;

    public ValuesForCorrelation(int x, int y, String d)
    {
        this.xPos = x;
        this.yPos = y;
        this.delimiter = d;
    }

    public Iterable<Tuple2<Integer, Double>> call(String s)
    {
        String[] fields = s.split(delimiter);
        double x = Double.parseDouble(fields[xPos]);
        double y = Double.parseDouble(fields[yPos]);

        List<Tuple2<Integer, Double>> returnList = new ArrayList<>(6);
        returnList.add(new Tuple2<>(0, 1.0));   // record count
        returnList.add(new Tuple2<>(1, x));     // X itself
        returnList.add(new Tuple2<>(2, y));     // Y itself
        returnList.add(new Tuple2<>(3, x * x)); // X^2
        returnList.add(new Tuple2<>(4, y * y)); // Y^2
        returnList.add(new Tuple2<>(5, x * y)); // X*Y, for the dot product
        return returnList;
    }
}
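For example, if your input were tab-separated with X in column 2 and Y in column 5, you could construct the mapper as:

new ValuesForCorrelation(2, 5, "\t")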

The Reducer

The reducer is straightforward: it just adds up a stream of numbers.

private static class AddDoublePair implements Function2<Double, Double, Double> {
    /*
    Given any two Doubles, this method returns their sum.
    Useful for a reduce operation that adds a stream of numbers.
     */
    public Double call(Double a, Double b) {
        return a + b;
    }
}

Putting it all together

The following code wires together the mapper and reducer, and finally calculates Pearson’s Correlation Coefficient using the formula above.

public void pearsonCorrelation(JavaSparkContext sc)
{
    double n,sumX,sumY,xDotY,sumXSq,sumYSq;
    n = sumX = sumY = xDotY = sumXSq = sumYSq = 0.0d;

    //Calling the Mapper and Reducer
    JavaRDD<String> lines = sc.textFile(this.filename);
    JavaPairRDD<Integer,Double> mappedValues = lines
            .flatMapToPair(new ValuesForCorrelation(0, 1, ","))
            .reduceByKey(new AddDoublePair());

    List<Tuple2<Integer,Double>> output = mappedValues.collect();
    for(Tuple2<Integer,Double> a:output)
    {
        switch (a._1())
        {
            case 0 : n = a._2();
                break;
            case 1 : sumX = a._2();
                break;
            case 2 : sumY = a._2();
                break;
            case 3 : sumXSq = a._2();
                break;
            case 4 : sumYSq = a._2();
                break;
            case 5 : xDotY = a._2();
                break;
            default:
                System.out.println("Unexpected Value");
        }
    }

    //The numerator
    double num = n*xDotY - sumX*sumY;

    //The denominator
    double den = Math.sqrt(n*sumXSq-Math.pow(sumX,2)) * Math.sqrt(n*sumYSq-Math.pow(sumY,2));

    double pearsonr = num/den;
    System.out.println("Correlation:" + pearsonr);
}

Key concepts

This program really highlights the power of Spark’s “flat” mappers and of the PairRDD, which is built on Scala’s Tuple2 type.

As somebody new to Spark, I finally understood the difference between the plain mapper functions like “Function” and the “FlatMap” variants: a flat map is best for situations where a single input needs to produce zero or more output records of the same type.

When each output record is a key-value pair, Spark provides the neat PairFlatMapFunction, which combines the advantages of flat maps and pair RDDs.
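To make the distinction concrete, here is a minimal sketch against the Spark 1.x Java API used in this post (Function and FlatMapFunction live in org.apache.spark.api.java.function). It assumes lines is a JavaRDD<String>, for example the output of sc.textFile(); the variable names are just for illustration.

// map + Function: exactly one output element per input element
JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
    public Integer call(String s) {
        return s.length();
    }
});

// flatMap + FlatMapFunction: zero or more output elements per input element
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
        return Arrays.asList(s.split(" "));
    }
});

The Pearson mapper above needs both behaviors at once (multiple outputs per line, each of them a key-value pair), which is exactly what PairFlatMapFunction provides.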

Hope you found this post informative and helpful in your adventures with Spark.

Data Munging: Portable Sparse Arrays

I came across this problem recently while working on a machine learning project. Although it might appear simple at first glance, it takes some work to get it right.

Hopefully somebody that needs to do something similar will find this guide useful.

Some Background

I had a large text file with several thousand feature vectors. Each row/vector was of the format:

Label Feature1:Value1 Feature2:Value2 ... FeatureN:ValueN

I also had some Python and MATLAB programs that needed the above representation converted into two matrices:

  1. X – this matrix has the feature vectors
  2. Y – a column vector containing the labels

Constraints

The X matrix could have up to a million features, so X needed to be stored as a sparse matrix.

Problem

Convert the text file into a portable format readable by both Python and MATLAB.

This can be accomplished using the Matrix Market I/O format (http://math.nist.gov/MatrixMarket/).
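For reference, a coordinate-format .mtx file is plain text: a header line, a line with the dimensions and the number of non-zero entries, and then one 1-based row/column/value triple per line. A tiny hand-made example (a 3×3 matrix with two non-zeros, not from the dataset described here) would look roughly like this:

%%MatrixMarket matrix coordinate real general
3 3 2
1 1 4.0
3 2 1.5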

SciPy does have savemat and loadmat functions for MATLAB’s .mat format, but a .mtx file is readable across different languages and does the job quite easily.

The code

The necessary imports

import numpy as np
from scipy import sparse
import scipy.io as sio

Read from the file with the features

with open("filename.txt") as f:
    lines = f.readlines()

Initialize our X and Y arrays

X = sparse.lil_matrix((<Number of rows>, <Maximum number of features>))
Y = np.zeros((<Number of rows>,1))

The meat of the script

for i, line in enumerate(lines):
    parts = line.strip().split(' ')
    # The first token is the label; the rest are feature:value pairs
    Y[i, 0] = int(parts[0])
    for entry in parts[1:]:
        feature, value = entry.split(':')
        X[i, int(feature)] = int(value)  # use float() if your values are real-valued

Saving the matrices to a file

sio.mmwrite('X.mtx',X)
sio.mmwrite('Y.mtx',Y)

Notice that the mmwrite() command has no problem with X being a sparse matrix (a lil_matrix, or row-based linked-list sparse matrix, to be precise).

Reading the mtx file

In Python

This can be done easily using the mmread command:

X = sio.mmread('X.mtx')  # or any other .mtx file; sparse matrices come back in COO format

In MATLAB

This part is not obvious from a cursory Google search. You need to download the *.m MATLAB files from here: http://math.nist.gov/MatrixMarket/mmio/matlab/mmiomatlab.html

Remember to place them in your MATLAB working directory. After that:

[X, rows, cols, entries] = mmread('X.mtx');

A few notes on performance

  1. The file that I used had a matrix of around 4,000 rows and 700,000 columns with approximately 25% sparsity. The SciPy mmwrite function generated a 603 MB file.

  2. After reading the matrix into MATLAB, I used the built-in save function. The resulting .mat file was only 40 MB, so I guess MATLAB’s .mat format is highly optimized for sparse arrays.