A Simple Tutorial for Apache Spark: Counting CSV Columns

This is a simple tutorial for Apache Spark. The application is structurally similar to the classic word-count program.

Problem

Say you have a huge CSV file, and each row is supposed to have a fixed number of columns.

However, due to missing data, you cannot be sure of this. You want to compute the distribution of the number of populated fields per row across the entire file. For example, if three rows have 5 fields each and one row has only 4, the distribution is 5 → 3 rows and 4 → 1 row.

Solution

Basic Idea:
1. Read the CSV file into an RDD of lines using Spark's textFile.
2. Map each line to a tuple of (number of populated fields in that line, 1).
3. Reduce the resulting set of tuples by key, summing the counts.

Read the file

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("My Spark App");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile(filename);
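
If you want to test this on your own machine before submitting it to a cluster, you can point the context at a local master. This is just a sketch; the thread count is arbitrary:

// Run Spark in-process with 2 worker threads; handy for local testing.
SparkConf conf = new SparkConf()
        .setAppName("My Spark App")
        .setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);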

Functions for Map & Reduce

Before writing the actual map and reduce calls, we need to define the functions that perform the operations.

In Spark, functions passed to map and reduce operations implement interfaces from the package org.apache.spark.api.java.function.

A clean way to define these is as static nested classes.

Here is the map function that counts the number of populated columns in each row. (Note that the rows in this example are tab-delimited; if your file is comma-separated, change the delimiter in the split call.)

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

private static class CountCsvColumns implements PairFunction<String, Integer, Integer>
{
    /*
    Given a tab-separated line, this returns a tuple of
    (number of populated fields, 1).
     */
    @Override
    public Tuple2<Integer, Integer> call(String s)
    {
        String[] parts = s.split("\t");
        int columnCount = 0;
        for (int i = 0; i < parts.length; i++) {
            if (parts[i] != null && !parts[i].isEmpty())
                columnCount += 1;
        }
        return new Tuple2<Integer, Integer>(columnCount, 1);
    }
}
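
Since call does not depend on any Spark machinery, you can sanity-check it with plain Java before running the full job. A quick sketch with made-up input, called from the enclosing class:

// "a", "b", an empty field, "c": 3 populated fields out of 4.
Tuple2<Integer, Integer> t = new CountCsvColumns().call("a\tb\t\tc");
System.out.println(t._1() + " populated fields"); // prints: 3 populated fields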

Here is the reduce function that adds up the counts:

import org.apache.spark.api.java.function.Function2;

private static class AddIntegerPair implements Function2<Integer, Integer, Integer> {
    /*
    Given any two integers, this returns the sum.
    Useful for a reduce operation that adds a stream of numbers.
     */
    @Override
    public Integer call(Integer a, Integer b) {
        return a + b;
    }
}
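
As an aside, if your build targets Java 8 or later (and a Spark version that supports it), the same two functions can be written inline as lambdas instead of named nested classes. A sketch, assuming an import of org.apache.spark.api.java.JavaPairRDD:

// Equivalent pipeline with lambdas in place of the nested classes.
JavaPairRDD<Integer, Integer> counts = lines
        .mapToPair(s -> {
            int columnCount = 0;
            for (String part : s.split("\t")) {
                if (!part.isEmpty()) {
                    columnCount++;
                }
            }
            return new Tuple2<>(columnCount, 1);
        })
        .reduceByKey((a, b) -> a + b);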

Putting it all together

List<Tuple2<Integer, Integer>> output = lines
            .mapToPair(new CountCsvColumns())
            .reduceByKey(new AddIntegerPair())
            .collect();
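
Optionally, if you want the distribution ordered by column count, you can sort by key before collecting. Integer keys are Comparable, so sortByKey works out of the box:

List<Tuple2<Integer, Integer>> output = lines
            .mapToPair(new CountCsvColumns())
            .reduceByKey(new AddIntegerPair())
            .sortByKey()   // ascending by number of columns
            .collect();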

Printing out the results:

for (Tuple2<Integer, Integer> tuple : output) {
    System.out.println(tuple._1() + ":" + tuple._2());
}
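
Note that collect() pulls everything back to the driver, which is fine here because the distribution has at most a handful of distinct keys. If you ever adapt this pattern to a job with many keys, write the result out from the cluster instead. A sketch; the output path is hypothetical:

// Writes one part-file per partition under the given directory.
lines.mapToPair(new CountCsvColumns())
     .reduceByKey(new AddIntegerPair())
     .saveAsTextFile("hdfs:///output/column-counts");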

Performance

On an AWS instance with 8 GB of RAM and 2 cores, this chomped through a CSV file with 140 million rows in under 45 minutes. Not bad, eh?