Pearson’s Correlation Coefficient using Apache Spark and Map Reduce

In my last two tutorials, I covered some simple mappers and reducers using Spark. Mainly, I covered filters, counters, adders and search.

In this tutorial, let's combine the same concepts from the previous posts to write a Spark implementation of Pearson's Correlation Coefficient.

For reference, the formula being computed is:

r = (n·ΣXY − ΣX·ΣY) / (√(n·ΣX² − (ΣX)²) · √(n·ΣY² − (ΣY)²))

Sample Input

Two variables X and Y with values in comma-separated form like:
1,2
3,4
5,6

The code is general enough to handle any given pair of columns, but we will keep it simple and assume that the file contains only the two columns shown above.

Algorithm

For each line in the file, parse the two numbers as Java Double values
and emit the following (key, value) pairs:

  1. (0,1) – To count the number of records
  2. (1,X) – Value of X itself
  3. (2,Y) – Value of Y itself
  4. (3,X^2) – Square of X
  5. (4,Y^2) – Square of Y
  6. (5,X*Y) – Product of X and Y. This will help us compute the Dot Product of X and Y.

A single pass of the Spark mapper will finish all the heavy lifting in one awesome O(n) operation!

Next, our reducers will add up the values for each key and we will be almost done.
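To make this concrete, run the pipeline mentally on the three sample rows above. For the first row 1,2 the mapper emits (0,1.0), (1,1.0), (2,2.0), (3,1.0), (4,4.0) and (5,2.0). After the reducers sum each key across all three rows, we get n = 3, ΣX = 9, ΣY = 12, ΣX² = 35, ΣY² = 56 and ΣXY = 44. Plugging these into the formula gives r = (3·44 − 9·12) / (√(3·35 − 81) · √(3·56 − 144)) = 24 / 24 = 1, exactly what we expect since Y is a perfect linear function of X in the sample.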

Mapper and Reducer Functions

The Mapper

As is typical in Spark's Java API, the mapper is a static nested class. I added a constructor so that one can supply custom values for the delimiter and column numbers if needed.

private static class ValuesForCorrelation
        implements PairFlatMapFunction<String, Integer, Double> {
    // Requires java.util.Arrays, scala.Tuple2 and
    // org.apache.spark.api.java.function.PairFlatMapFunction (Spark 1.x Java API).
    private int xPos = 0;
    private int yPos = 1;
    private String delimiter = "\t";

    public ValuesForCorrelation(int x, int y, String d)
    {
        this.xPos = x;
        this.yPos = y;
        this.delimiter = d;
    }

    public Iterable<Tuple2<Integer, Double>> call(String s)
    {
        // Split the line once and parse the two columns of interest.
        String[] parts = s.split(delimiter);
        double x = Double.parseDouble(parts[xPos]);
        double y = Double.parseDouble(parts[yPos]);

        // Emit the six (key, value) pairs described in the algorithm above.
        Tuple2<Integer, Double>[] returnList = new Tuple2[6];
        returnList[0] = new Tuple2<Integer, Double>(0, 1.0);            // record count
        returnList[1] = new Tuple2<Integer, Double>(1, x);              // X
        returnList[2] = new Tuple2<Integer, Double>(2, y);              // Y
        returnList[3] = new Tuple2<Integer, Double>(3, Math.pow(x, 2)); // X^2
        returnList[4] = new Tuple2<Integer, Double>(4, Math.pow(y, 2)); // Y^2
        returnList[5] = new Tuple2<Integer, Double>(5, x * y);          // X*Y
        return Arrays.asList(returnList);
    }
}

The Reducer

The reducer is pretty straightforward and just adds up streams of numbers.

private static class AddDoublePair implements Function2<Double, Double, Double> {
    /*
    Given any two Doubles, this method returns their sum.
    Useful for a reduce operation that adds a stream of numbers.
     */
    public Double call(Double a, Double b) {
        return a + b;
    }
}
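For intuition, here is how reduceByKey might fold the X values from the sample file for key 1 using this function (the actual call order is up to Spark):

// key 1 has values 1.0, 3.0 and 5.0 in the sample
// call(1.0, 3.0) -> 4.0
// call(4.0, 5.0) -> 9.0
// reduced pair: (1, 9.0)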

Putting it all together

The following code wires up the mapper and reducer, collects the six sums,
and finally calculates Pearson's Correlation Coefficient as per the formula above.

public void pearsonCorrelation(JavaSparkContext sc)
{
    double n,sumX,sumY,xDotY,sumXSq,sumYSq;
    n = sumX = sumY = xDotY = sumXSq = sumYSq = 0.0d;

    //Calling the Mapper and Reducer
    JavaRDD<String> lines = sc.textFile(this.filename);
    JavaPairRDD<Integer,Double> mappedValues = lines
            .flatMapToPair(new ValuesForCorrelation(0, 1, ","))
            .reduceByKey(new AddDoublePair());

    List<Tuple2<Integer,Double>> output = mappedValues.collect();
    for(Tuple2<Integer,Double> a:output)
    {
        switch (a._1())
        {
            case 0 : n = a._2();
                break;
            case 1 : sumX = a._2();
                break;
            case 2 : sumY = a._2();
                break;
            case 3 : sumXSq = a._2();
                break;
            case 4 : sumYSq = a._2();
                break;
            case 5 : xDotY = a._2();
                break;
            default:
                System.out.println("Unexpected Value");
        }
    }

    //The numerator
    double num = n*xDotY - sumX*sumY;

    //The denominator
    double den = Math.sqrt(n*sumXSq-Math.pow(sumX,2)) * Math.sqrt(n*sumYSq-Math.pow(sumY,2));

    double pearsonr = num/den;
    System.out.println("Correlation:" + pearsonr);
}
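For completeness, here is a minimal sketch of how this method might be driven, assuming it lives in a class that stores the input path in its filename field; the class name PearsonCorrelationJob and the file path are placeholders, not part of the original program:

SparkConf conf = new SparkConf().setAppName("Pearson Correlation");
JavaSparkContext sc = new JavaSparkContext(conf);

// Hypothetical wrapper class whose constructor sets the filename field used above.
PearsonCorrelationJob job = new PearsonCorrelationJob("data/xy.csv");
job.pearsonCorrelation(sc);

sc.stop();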

Key concepts

This program really highlights the power of Spark's "flat" mappers and the PairRDD, which is based on Scala's Tuple2 type.

As somebody new to Spark, I finally realized the difference between the normal mapper function, Function, and FlatMapFunction. FlatMap is best for situations where you need to return several records of the same type for each input record.

When each emitted record is a key-value pair, Spark provides the neat PairFlatMapFunction to take advantage of both flat maps and pair RDDs.
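To make the distinction concrete, here is a minimal sketch (not part of the program above) using the Spark 1.x Java API, assuming a JavaRDD<String> named lines and imports for Function, FlatMapFunction and java.util.Arrays:

// Function: exactly one output element per input line.
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
    public Integer call(String s) {
        return s.length();
    }
});

// FlatMapFunction: zero or more output elements per input line.
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
        return Arrays.asList(s.split(" "));
    }
});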

Hope you found this post informative and helpful in your adventures with Spark.

A Simple Tutorial for Apache Spark: Counting CSV columns

This is a simple tutorial for Apache Spark. The application is similar to a word counting program.

Problem

Say you have a huge CSV file in which each row is supposed to have a fixed number of columns.

However, due to missing data, you cannot be sure of this. You want to compute the distribution of the number of fields in a given row across the entire file.

Solution

Basic Idea:
1. Read the CSV file as a Spark textFile type.
2. Map each line/row of CSV to a tuple (Number of fields in a row, 1)
3. Reduce the resulting set of all tuples by key

Read the file

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("My Spark App");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile(filename);  // filename: path to the input file

Functions for Map & Reduce

Before writing the actual Map and Reduce calls, we need to define the functions that perform the operations.

In spark, functions that are passed to Mappers/Reducers implement Interfaces from the package org.apache.spark.api.java.function.

A clean way to define these is in the form of Static Nested Classes.

Here is the map function that counts the number of columns in each row:

private static class CountCsvColumns implements PairFunction<String, Integer, Integer>
{
    /*
    Given a delimited row, this returns the number of populated fields.
    The sample data here is tab-separated; swap "\t" for "," if your file
    really uses commas.
     */
    public Tuple2<Integer, Integer> call(String s)
    {
        String[] parts = s.split("\t");
        int columnCount = 0;
        for (int i = 0; i < parts.length; i++) {
            if (parts[i] != null && !parts[i].isEmpty())
                columnCount += 1;
        }
        return new Tuple2<Integer, Integer>(columnCount, 1);
    }
}

Here is the reduce function that adds up the counts:

private static class AddIntegerPair implements Function2<Integer, Integer, Integer> {
    /*
    Given any two integers, this returns the sum.
    Useful for a reduce operation that adds a stream of numbers.
     */
    public Integer call(Integer a, Integer b) {
        return a + b;
    }
}

Putting it all together

List<Tuple2<Integer, Integer>> output = lines
            .mapToPair(new CountCsvColumns())
            .reduceByKey(new AddIntegerPair())
            .collect();

Printing out the results:

for (Tuple2<Integer, Integer> tuple : output) {
    System.out.println(tuple._1() + ":" + tuple._2());
}

Performance

On an AWS instance with 8 GB RAM and 2 cores, this chomped through a CSV file with 140 million rows in less than 45 minutes. Not bad, eh?

Getting Scala Spark working with IntelliJ IDE

UPDATE: Updated the instructions for build.sbt on 1/29/2015 for Spark 1.2 and Scala 2.11.4

I have recently started using Apache Spark and it is awesome.

This is a post about some troubles I had getting Spark (Scala) working with IntelliJ. All the links I found on Google were out of date, and the process was non-trivial for somebody new to sbt/Scala like me.

So, off we go!

Note : Tested on Mac OS X 10.9.2. Also works on Yosemite.

Install sbt

This can be installed like this:

brew install sbt

I was new to sbt myself and read up a bit about it here: http://www.scala-sbt.org/documentation.html

Choosing a version of Scala

It is not enough to install the latest version of Scala. Make sure that the Scala version matches that of the Spark builds in the Maven repository, which is where sbt will actually download Spark from.

To do this, go to http://search.maven.org and search for spark-core and spark-streaming. Find the latest entry by date.

Notice 3 things here for both the files:
1. GroupId
2. ArtifactId
3. Latest Version

In my case, these were org.apache.spark , spark-core_2.11 and 1.2.0, respectively.
The suffix on the ArtifactId tells you which Scala version you should install.

So, spark-core_2.11 means that you need Scala 2.11.X.

The “Latest Version” is Spark’s version number.

Install Scala

Now we can be sure that the Scala version being installed is the correct one. Do the following:

brew install scala

brew switch scala 2.11.4

Don't worry too much about the exact version. If you type in just 2.11, brew will automatically list the closest available formula.

Install IntelliJ and Scala plugin

I guess you already have the IDE. Otherwise, download IntelliJ Community Edition. I am using v13.X.
Once installed, add the Scala plugin from the Plugins menu (Preferences -> Plugins). Then make sure that Scala is being correctly detected in IntelliJ's preferences.

Create a Scala Project

Create a new Scala Project in IntelliJ.

Setting up sbt

Make sure that the following two files exist and have the following contents:

~/.sbt/{sbt version}/plugins/build.sbt

resolvers += "Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"

addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")

NOTE: If you face any errors, confirm these two lines are updated to the latest version numbers. For that, check https://github.com/mpeltonen/sbt-idea

$PROJECT_DIR/build.sbt

scalaVersion := "2.11.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.2.0"

resolvers ++= Seq(
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Spray Repository" at "http://repo.spray.cc/")

NOTE: The Scala version must match the version supplied to Brew earlier.

NOTE: The version numbers of spark-core and spark-streaming should exactly match the “Latest Version” field on the Maven Repository.

Almost done

In $PROJECT_DIR,

sbt update

sbt

Inside the sbt shell, run gen-idea

You are all set for Scala Spark development using IntelliJ IDE. Have fun!