## Pearson’s Correlation Coefficient using Apache Spark and Map Reduce

In my last two tutorials, I covered some simple mappers and reducers using Spark. Mainly, I covered filters, counters, adders and search.

In this tutorial, let’s combine the concepts from the previous posts to write a Spark implementation of Pearson’s Correlation Coefficient.

## Sample Input

Two variables, X and Y, with values in comma-separated form, like:
1,2
3,4
5,6

The code is general enough to handle any given column. But we will keep it simple and assume that the file contains only 2 columns like above.

## Algorithm

For each line in the file, parse the two numbers as Java Double type
and emit the following (Key,Value) pairs :

1. (0,1) – To count the number of records
2. (1,X) – Value of X itself
3. (2,Y) – Value of Y itself
4. (3,X^2) – Square of X
5. (4,Y^2) – Square of Y
6. (5,X*Y) – Product of X and Y. This will help us compute the Dot Product of X and Y.

A single pass of the Spark mapper will finish all the heavy lifting in one awesome O(n) operation!

Next, our reducers will add up the values for each key and we will be almost done.
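To make the emit-then-add idea concrete before bringing Spark into it, here is a minimal plain-Python sketch of the same steps. The function names `emit_pairs` and `sum_by_key` are made up for this illustration and are not part of Spark.

```python
from collections import defaultdict

def emit_pairs(line, delimiter=","):
    """Turn one 'x,y' line into the six (key, value) pairs listed above."""
    fields = line.split(delimiter)
    x, y = float(fields[0]), float(fields[1])
    return [(0, 1.0), (1, x), (2, y), (3, x * x), (4, y * y), (5, x * y)]

def sum_by_key(pairs):
    """Add up the values for each key, like a reduce-by-key step."""
    totals = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

# The sample input from above
lines = ["1,2", "3,4", "5,6"]
pairs = [pair for line in lines for pair in emit_pairs(line)]
sums = sum_by_key(pairs)
print(sums)
# {0: 3.0, 1: 9.0, 2: 12.0, 3: 35.0, 4: 56.0, 5: 44.0}
```

Each input line is touched exactly once, and the six sums are everything the final formula needs.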

## Mapper and Reducer Functions

### The Mapper

In Spark style, the Mapper is a static nested class. I added a constructor so that one can supply custom values for delimiters and column numbers if needed.

``````
// Needs: java.util.ArrayList, java.util.List, scala.Tuple2,
// org.apache.spark.api.java.function.PairFlatMapFunction
private static class ValuesForCorrelation
        implements PairFlatMapFunction<String, Integer, Double> {
    private int xPos = 0;
    private int yPos = 1;
    private String delimiter = "\t";

    public ValuesForCorrelation(int x, int y, String d)
    {
        this.xPos = x;
        this.yPos = y;
        this.delimiter = d;
    }

    // Spark 1.x signature: call() returns an Iterable of pairs
    public Iterable<Tuple2<Integer, Double>> call(String s)
    {
        List<Tuple2<Integer, Double>> returnList =
                new ArrayList<Tuple2<Integer, Double>>(6);
        String[] fields = s.split(delimiter);
        double x = Double.parseDouble(fields[xPos]);
        double y = Double.parseDouble(fields[yPos]);

        returnList.add(new Tuple2<Integer, Double>(0, 1.0));            // record count
        returnList.add(new Tuple2<Integer, Double>(1, x));              // X
        returnList.add(new Tuple2<Integer, Double>(2, y));              // Y
        returnList.add(new Tuple2<Integer, Double>(3, Math.pow(x, 2))); // X^2
        returnList.add(new Tuple2<Integer, Double>(4, Math.pow(y, 2))); // Y^2
        returnList.add(new Tuple2<Integer, Double>(5, x * y));          // X*Y
        return returnList;
    }
}
``````

### The Reducer

The reducer is pretty straightforward and just adds up streams of numbers.

``````
private static class AddDoublePair implements Function2<Double, Double, Double> {
    /*
    Given any two Doubles, this method returns the sum.
    Useful for a reduce operation that adds a stream of numbers.
    */
    public Double call(Double a, Double b) {
        return a + b;
    }
}
``````

## Putting it all together

The following code simply calls the mappers and reducers.
Finally, it calculates the Pearson’s Correlation Coefficient as per the formula in the figure above, that is, r = (n·Σxy − Σx·Σy) / (√(n·Σx² − (Σx)²) · √(n·Σy² − (Σy)²)).

``````
public void pearsonCorrelation(JavaSparkContext sc)
{
    double n, sumX, sumY, xDotY, sumXSq, sumYSq;
    n = sumX = sumY = xDotY = sumXSq = sumYSq = 0.0d;

    // Calling the Mapper and Reducer
    JavaRDD<String> lines = sc.textFile(this.filename);
    JavaPairRDD<Integer, Double> mappedValues = lines
        .flatMapToPair(new ValuesForCorrelation(0, 1, ","))
        .reduceByKey(new AddDoublePair());

    // After the reduce there is one (key, sum) pair per key
    List<Tuple2<Integer, Double>> output = mappedValues.collect();
    for (Tuple2<Integer, Double> a : output)
    {
        switch (a._1())
        {
            case 0 : n = a._2();
                break;
            case 1 : sumX = a._2();
                break;
            case 2 : sumY = a._2();
                break;
            case 3 : sumXSq = a._2();
                break;
            case 4 : sumYSq = a._2();
                break;
            case 5 : xDotY = a._2();
                break;
            default:
                System.out.println("Unexpected Value");
        }
    }

    // The numerator
    double num = n * xDotY - sumX * sumY;

    // The denominator
    double den = Math.sqrt(n * sumXSq - Math.pow(sumX, 2)) * Math.sqrt(n * sumYSq - Math.pow(sumY, 2));

    double pearsonr = num / den;
    System.out.println("Correlation:" + pearsonr);
}
``````
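As a quick sanity check of the final arithmetic, the same numerator and denominator can be sketched in plain Python. The function name `pearson_from_sums` is hypothetical, and returning NaN for a zero denominator is my own assumption; the Java code above does not guard against that case.

```python
import math

def pearson_from_sums(n, sum_x, sum_y, sum_x_sq, sum_y_sq, x_dot_y):
    """Pearson's r computed from the six reduced sums."""
    num = n * x_dot_y - sum_x * sum_y
    den = (math.sqrt(n * sum_x_sq - sum_x ** 2)
           * math.sqrt(n * sum_y_sq - sum_y ** 2))
    if den == 0:
        # Assumption: report NaN when X or Y is constant
        return float("nan")
    return num / den

# Sums for the sample input 1,2 / 3,4 / 5,6
r = pearson_from_sums(3, 9.0, 12.0, 35.0, 56.0, 44.0)
print(round(r, 6))  # 1.0
```

The sample input lies on the straight line Y = X + 1, so a correlation of 1 is what we should expect.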

## Key concepts

This program really highlighted the power of Spark’s “Flat” mappers and the PairRDD based on Scala’s Tuple2 type.

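As somebody new to Spark, I finally realized the difference between the normal mapper functions like “Function” and the “FlatMap” ones. A normal mapper returns exactly one record per input record, while a FlatMap function can return several records of the same type for each input, which Spark then flattens into a single RDD.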

In case the record is of type Pair, Spark provides the neat PairFlatMapFunction to take advantage of both Flat Map and Pair RDDs.
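The difference is easy to see without Spark. Below is a small plain-Python sketch; `map_records`, `flat_map_records` and `to_pairs` are illustrative names of my own, not Spark API calls.

```python
from itertools import chain

def map_records(func, records):
    """One output element per input record, like a plain map."""
    return [func(record) for record in records]

def flat_map_records(func, records):
    """func returns a list per record; the lists are flattened into one."""
    return list(chain.from_iterable(func(record) for record in records))

def to_pairs(line):
    """Emit two (key, value) pairs for one 'x,y' line."""
    x, y = (float(v) for v in line.split(","))
    return [(1, x), (2, y)]

lines = ["1,2", "3,4"]
mapped = map_records(to_pairs, lines)          # 2 elements, each a list of pairs
flattened = flat_map_records(to_pairs, lines)  # 4 (key, value) pairs in one list
print(len(mapped), len(flattened))  # 2 4
```

Only the flattened form can be reduced by key directly, which is why the correlation mapper needs the flat variant.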

## A Simple Tutorial for Apache Spark : Counting CSV columns

This is a simple tutorial for Apache Spark. The application is similar to a word counting program.

## Problem

Say, you have a huge CSV file and each row is supposed to have a fixed number of columns.

However, due to missing data, you cannot be sure of this. You want to compute the distribution of the number of fields in a given row across the entire file.

## Solution

Basic Idea :
1. Read the CSV file as a Spark textFile type.
2. Map each line/row of CSV to a tuple `(Number of fields in a row, 1)`
3. Reduce the resulting set of all tuples by key
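Before the Spark version, here is the same idea as a plain-Python sketch on a handful of in-memory rows. The helper names are hypothetical, and the tab delimiter is an assumption chosen to match the map function shown later.

```python
from collections import Counter

def count_populated_fields(line, delimiter="\t"):
    """Number of non-empty fields in one row."""
    return sum(1 for field in line.split(delimiter) if field)

def field_count_distribution(lines, delimiter="\t"):
    """Map each row to its field count and tally the counts."""
    return Counter(count_populated_fields(line, delimiter) for line in lines)

rows = ["a\tb\tc", "a\t\tc", "a\tb\tc", "a"]
distribution = field_count_distribution(rows)
for field_count, row_count in sorted(distribution.items()):
    print(field_count, ":", row_count)
```

The `Counter` plays the role of the reduce-by-key step: every row contributes a 1 to the bucket for its field count.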

``````
SparkConf conf = new SparkConf().setAppName("My Spark App");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile(filename);
``````

### Functions for Map & Reduce

Before writing the actual Map and Reduce calls, we need to define the functions that perform the operations.

In Spark, functions that are passed to Mappers/Reducers implement interfaces from the package org.apache.spark.api.java.function.

A clean way to define these is in the form of Static Nested Classes.

Here is the map function that counts the number of columns in each row:

``````
private static class CountCsvColumns implements PairFunction<String, Integer, Integer>
{
    /*
    Given a tab separated string, this returns the number of populated fields.
    */
    public Tuple2<Integer, Integer> call(String s)
    {
        String parts[] = s.split("\t");
        int columnCount = 0;
        for (int i = 0; i < parts.length; i++) {
            if (parts[i] != null && !parts[i].isEmpty())
                columnCount += 1;
        }
        return new Tuple2<Integer, Integer>(columnCount, 1);
    }
}
``````

Here is the reduce function that adds up the counts:

``````
private static class AddIntegerPair implements Function2<Integer, Integer, Integer> {
    /*
    Given any two integers, this returns the sum.
    Useful for a reduce operation that adds a stream of numbers.
    */
    public Integer call(Integer a, Integer b) {
        return a + b;
    }
}
``````

### Putting it all together

``````
List<Tuple2<Integer, Integer>> output = lines
    .mapToPair(new CountCsvColumns())
    .reduceByKey(new AddIntegerPair())
    .collect();
``````

Printing out the results:

``````
for (Tuple2<Integer, Integer> tuple : output) {
    System.out.println(tuple._1() + ":" + tuple._2());
}
``````

### Performance

On an AWS instance with 8 GB RAM and 2 cores, this chomped through a CSV file with 140 million rows in less than 45 minutes. Not bad, eh?

## Getting Scala Spark working with IntelliJ IDE

UPDATE : Updated the instructions for build.sbt on 1/29/2015 for Spark 1.2 and Scala 2.11.4

I have recently started using Apache Spark and it is awesome.

This is a post about some troubles that I had getting Spark (Scala) working with IntelliJ. All the links that I found on Google were out of date, and the process was non-trivial for somebody new to SBT/Scala like me.

So, off we go!

Note : Tested on Mac OS X 10.9.2. Also works on Yosemite.

### Install sbt

sbt can be installed with Homebrew:

brew install sbt

I was new to sbt myself and read up a bit about it here : http://www.scala-sbt.org/documentation.html

### Choosing a version of Scala

It is not enough to install the latest version of Scala. Make sure that the Scala version matches that of the Spark builds in the Maven repository. These repositories are where sbt will actually download Spark from.

To do this, go to http://search.maven.org and search for spark-core and spark-streaming. Find the latest entry by date.

Notice 3 things here for both the files:
1. GroupId
2. ArtifactId
3. Latest Version

In my case, these were org.apache.spark, spark-core_2.11 and 1.2.0, respectively.
The suffix on the ArtifactId tells you which Scala version you should install.

So, spark-core_2.11 means that you need Scala 2.11.X.
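If you want to script this check, the suffix rule can be sketched in a few lines of Python. The function name is hypothetical, and the sketch assumes the artifact name always ends in `_<Scala binary version>`, as in the example above.

```python
def scala_version_from_artifact(artifact_id):
    """Return the Scala binary version encoded in an ArtifactId suffix."""
    name, separator, suffix = artifact_id.rpartition("_")
    if not separator or not name:
        raise ValueError("no Scala suffix in %r" % artifact_id)
    return suffix

print(scala_version_from_artifact("spark-core_2.11"))       # 2.11
print(scala_version_from_artifact("spark-streaming_2.11"))  # 2.11
```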

### Install Scala

Now, we can be sure that the Scala version being installed is the correct one. Do the following :

brew install scala

brew switch scala 2.11.4

Don’t worry too much about the exact version. If you type in just 2.10, brew will automatically list the closest available formula.

### Install IntelliJ and Scala plugin

I guess you already have the IDE. Otherwise, download IntelliJ Community Edition. I am using v13.X.
Next, install the Scala plugin from the Plugins menu (Preferences->Plugins).
Once done, make sure that Scala is being correctly detected in IntelliJ preferences.

### Create a Scala Project

Create a new Scala Project in IntelliJ.

### Setting up sbt

Make sure that the following two files exist and have the following contents:

~/.sbt/{sbt version}/plugins/build.sbt

resolvers += "Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"

NOTE: If you face any errors, confirm these two lines are updated to the latest version numbers. For that, check https://github.com/mpeltonen/sbt-idea

$PROJECT_DIR/build.sbt

scalaVersion := "2.11.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.2.0"

resolvers ++= Seq(
"Akka Repository" at "http://repo.akka.io/releases/",
"Spray Repository" at "http://repo.spray.cc/")

NOTE: The Scala version must match the version supplied to Brew earlier.

NOTE: The version numbers of spark-core and spark-streaming should exactly match the “Latest Version” field on the Maven Repository.

### Almost done

In $PROJECT_DIR,

sbt update

sbt

Inside the sbt shell, run gen-idea

You are all set for Scala Spark development using IntelliJ IDE. Have fun!

## Data Munging : Portable Sparse Arrays

I came across this problem recently while working on a machine learning project. It turns out that while at first glance it might appear simple, it takes some work to get it working properly.

Hopefully somebody that needs to do something similar will find this guide useful.

### Some Background

I had a large text file with several thousand feature vectors. Each row/vector was of the format :

``````
Label Feature1:Value1 Feature2:Value2 ... FeatureN:ValueN
``````

I also had some Python and MATLAB programs which needed the above representation to be converted to 2 matrices :

1. X – this matrix has the feature vectors
2. Y – a column vector containing the labels
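To make the row format concrete, here is a small sketch that splits one such row into its label and a feature dictionary. It assumes that feature names are integer column indices and that values are numeric; `parse_feature_line` is a name made up for this illustration.

```python
def parse_feature_line(line):
    """Split 'Label F1:V1 F2:V2 ...' into (label, {feature_index: value})."""
    parts = line.split()
    label = int(parts[0])
    features = {}
    for entry in parts[1:]:
        index, value = entry.split(":")
        features[int(index)] = float(value)
    return label, features

label, features = parse_feature_line("1 3:1 10:2 42:7\n")
print(label, features)  # 1 {3: 1.0, 10: 2.0, 42: 7.0}
```

The label ends up in Y and each `index: value` pair becomes one non-zero cell of X, which is why a sparse matrix is a natural fit.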

### Constraints

The X matrix could have up to a million features. Therefore, X needed to be sparse.

### Problem

Convert Text File -> A portable format readable by Python or MATLAB.

This can be accomplished using the Matrix Market I/O Format (http://math.nist.gov/MatrixMarket/).

SciPy does have savemat and loadmat functions for MATLAB’s own .mat files. But the .mtx file is meant to be readable across different languages and does this job quite easily.

### The code

The necessary imports

``````
import numpy as np
from scipy import sparse
import scipy.io as sio
``````

Read from the file with the features

``````
f = open("filename.txt")
lines = f.readlines()  # one feature vector per line
f.close()
``````

Initialize our X and Y arrays

``````
# Replace the placeholders with the dimensions of your own data
X = sparse.lil_matrix((<Number of rows>, <Maximum number of features>))
Y = np.zeros((<Number of rows>, 1))
``````

The meat of the script

``````
for i in range(len(lines)):
    line = lines[i].strip()
    label = int(line.split(' ')[0])   # first token is the label
    entries = line.split(' ')[1:]     # the rest are Feature:Value pairs
    Y[i, 0] = label
    for entry in entries:
        feature = int(entry.split(':')[0])
        value = int(entry.split(':')[1])
        X[i, feature] = value
``````

Saving the matrices to a file

``````
sio.mmwrite('X.mtx', X)
sio.mmwrite('Y.mtx', Y)
``````

Notice that the mmwrite() command has no problem with the X matrix being a sparse one (lil_matrix, or row-based linked-list sparse matrix, to be more precise).
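For a sense of what ends up on disk, a sparse matrix in Matrix Market form is a plain-text file: a header line, a size line, and then one `row column value` line per non-zero entry with 1-based indices. The sketch below builds such text by hand for a tiny matrix. It is only an illustration of the coordinate layout, not what SciPy runs internally, and `to_matrix_market` is a made-up name.

```python
def to_matrix_market(n_rows, n_cols, entries):
    """entries maps (row, col), both 0-based, to a non-zero value."""
    lines = [
        "%%MatrixMarket matrix coordinate real general",
        "%d %d %d" % (n_rows, n_cols, len(entries)),
    ]
    for (row, col), value in sorted(entries.items()):
        # Matrix Market indices start at 1, Python indices at 0
        lines.append("%d %d %r" % (row + 1, col + 1, float(value)))
    return "\n".join(lines) + "\n"

text = to_matrix_market(2, 4, {(0, 1): 5, (1, 3): 2.5})
print(text)
# %%MatrixMarket matrix coordinate real general
# 2 4 2
# 1 2 5.0
# 2 4 2.5
```

Because every non-zero value costs a full line of text, files in this format grow quickly for large matrices.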

#### In Python

Reading the saved file back can be done easily using the mmread command:

``````
sio.mmread('X.mtx') #Or any other mtx file
``````

#### In MATLAB

This part is not quite clear from a cursory Google search. You need to download the *.m MATLAB files from here : http://math.nist.gov/MatrixMarket/mmio/matlab/mmiomatlab.html

Remember to place them in your current working directory in MATLAB. After that,

``````
[X, rows, cols, entries] = mmread('X.mtx');
``````

### Few notes on performance

1. The file that I used had a matrix of around 4000 rows and 700,000 columns and approximately 25% sparsity. The Scipy mmwrite function generated a 603 MB file.

2. After reading into MATLAB, I used the built-in save function. The resulting .mat file had a size of only 40 MB. I guess MATLAB’s file format is highly optimized for sparse arrays.