Some Data Gymnastics with Spark 2.0's DataSets and Equivalent Code in RDDs

While working on a healthcare use case involving Patient 360 Degree analytics, here is a situation I wanted to solve. I am giving a very simplified example below.

A Patient has a Claims data set which contains –

PatientID, Diagnosis1, Diagnosis2, Other Fields…

and a given Patient can have multiple rows of Claims Data

A Patient has a Labs dataset which contains –

PatientID, Lab1, Lab2, Other Fields…

and a given Patient can have multiple rows of Lab Data

A Patient has an RxClaims dataset which contains –

PatientID, Drug1, Drug2, Other Fields…

and a given Patient can have multiple rows of RxClaims Data

My Claims Data would typically look like ( again simplified example )

“PID1”, “diag1”, “diag2”
“PID1”, “diag2”, “diag3”
“PID2”, “diag2”, “diag3”
“PID2”, “diag1”, “diag3”
“PID3”, “diag1”, “diag2”
“PID2”, “diag2”, “diag4”

My Lab Data would look like ( again simplified example )

“PID1”, “lab1”, “lab2”
“PID1”, “lab2”, “lab3”
“PID2”, “lab2”, “lab3”
“PID2”, “lab1”, “lab3”
“PID3”, “lab1”, “lab2”

My RxClaims Data Would look like ( again simplified example )

“PID1”, “drug1”, “drug2”
“PID1”, “drug2”, “drug3”
“PID2”, “drug2”, “drug3”
“PID2”, “drug1”, “drug3”
“PID3”, “drug1”, “drug2”

I have shown the example with only 3 datasets (Claims, Lab, RxClaims); in reality I will have more datasets.

This is what I wanted to get from the above data, which in the RDD world would have the following data structure:

org.apache.spark.rdd.RDD[(String, (Iterable[Claim], Iterable[Lab], Iterable[RxClaims]))]
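
For example, using the sample rows above (and the case classes defined in the code further below), the cogrouped entry for PID3 would look roughly like:

// Illustrative only: the cogrouped record for PID3 from the sample data above
("PID3", (Iterable(Claim("PID3", "diag1", "diag2")),
          Iterable(Lab("PID3", "lab1", "lab2")),
          Iterable(RxClaims("PID3", "drug1", "drug2"))))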

[Image: expected output structure]

Spark 2.0 with DataSets

(tested with the Spark 2.0 preview release)

case class Claim(pid:String, diag1:String, diag2:String)
case class Lab(pid:String, lab1:String, lab2:String)
case class RxClaim(pid:String, drug1:String, drug2:String)

case class PatientClaims(pid:String, claims:Array[Claim])
case class PatientLab(pid:String, labs:Array[Lab])
case class PatientRxClaim(pid:String, rxclaims:Array[RxClaim])

case class Patient(pid:String, claims:Array[Claim], labs:Array[Lab], rxclaims:Array[RxClaim])

val claimsData = Seq(Claim("PID1", "diag1", "diag2"), Claim("PID1", "diag2", "diag3"), Claim("PID1", "diag1", "diag5"),Claim("PID2", "diag3", "diag4"), Claim("PID2", "diag2", "diag1"))


val labsData = Seq(Lab("PID1", "lab1", "lab2"), Lab("PID1", "lab1", "lab2"), Lab("PID2", "lab3", "lab4"), Lab("PID2", "lab3", "lab6"))


val rxClaimsData = Seq(RxClaim("PID1", "drug4", "drug1"), RxClaim("PID1", "drug3", "drug1"), RxClaim("PID2", "drug3", "drug5"), RxClaim("PID2", "drug2", "drug1"), RxClaim("PID1", "drug5", "drug2"))

import spark.implicits._   // needed for toDF / .as[...] when not running in spark-shell

val claimRDD = spark.sparkContext.parallelize(claimsData)
val labRDD = spark.sparkContext.parallelize(labsData)
val rxclaimRDD = spark.sparkContext.parallelize(rxClaimsData)

val claimPairRDD = claimRDD.map(x => (x.pid, x))
val claimDS = claimRDD.toDF("pid", "diag1", "diag2").as[Claim]
val claimsDSGroupedByPID = claimDS.groupByKey(v => v.pid)
val gClaims = claimsDSGroupedByPID.mapGroups({case(k,iter) => PatientClaims(k,iter.map(x => Claim(x.pid, x.diag1, x.diag2)).toArray)})


val labPairRDD = labRDD.map(x => (x.pid, x))
val labDS = labRDD.toDF("pid","lab1","lab2").as[Lab]
val labDSGroupedByPID = labDS.groupByKey(v => v.pid)
val gLabs = labDSGroupedByPID.mapGroups({case(k,iter) => PatientLab(k,iter.map(x => Lab(x.pid, x.lab1, x.lab2)).toArray)})

val rxclaimPairRDD = rxclaimRDD.map(x => (x.pid, x))
val rxClaimDS = rxclaimRDD.toDF("pid","drug1","drug2").as[RxClaim]
val rxClaimsDSGroupedByPID = rxClaimDS.groupByKey(v => v.pid)
val gRxClaim = rxClaimsDSGroupedByPID.mapGroups({case(k,iter) => PatientRxClaim(k,iter.map(x => RxClaim(x.pid, x.drug1, x.drug2)).toArray)})

val allJoined = gClaims.join(gLabs, "pid").join(gRxClaim, "pid")

val allJoinedDS = allJoined.as[Patient]

allJoinedDS.show(false)
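
As an aside (not part of the original solution), Spark 2.0 also offers joinWith, which keeps the join fully typed instead of dropping to an untyped DataFrame and casting back with .as[Patient]; a minimal sketch:

// joinWith yields a Dataset of typed pairs, e.g. Dataset[(PatientClaims, PatientLab)],
// which could then be mapped into Patient-style records without an .as[] cast.
val claimsWithLabs = gClaims.joinWith(gLabs, gClaims("pid") === gLabs("pid"))
claimsWithLabs.show(false)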

Spark 2.0 code with RDD

(no DataSets / DataFrame)

case class Claim(pid:String, diag1:String, diag2:String)
case class Lab(pid:String, lab1:String, lab2:String)
case class RxClaims(pid:String, drug1:String, drug2:String)

val claim = Seq(Claim("PID1", "diag1", "diag2"), Claim("PID1", "diag2", "diag3"),Claim("PID2", "diag2", "diag3"),Claim("PID2", "diag1", "diag3"), Claim("PID3", "diag1", "diag2"), Claim("PID2", "diag2", "diag4"))
val claimRDD = spark.sparkContext.parallelize(claim)
val claimPairRDD = claimRDD.map(x => (x.pid, x))

val lab = Seq(Lab("PID1", "lab1", "lab2"), Lab("PID1", "lab2", "lab3"),Lab("PID2", "lab2", "lab3"),Lab("PID2", "lab1", "lab3"), Lab("PID3", "lab1", "lab2"))
val labRDD = spark.sparkContext.parallelize(lab)
val labPairRDD = labRDD.map(x => (x.pid, x))

val rxclaim = Seq(RxClaims("PID1", "drug1", "drug2"), RxClaims("PID1", "drug2", "drug3"),RxClaims("PID2", "drug2", "drug3"),RxClaims("PID2", "drug1", "drug3"), RxClaims("PID3", "drug1", "drug2"), RxClaims("PID2", "drug2", "drug4"))
val rxclaimRDD = spark.sparkContext.parallelize(rxclaim)
val rxclaimPairRDD = rxclaimRDD.map(x => (x.pid, x))

val result = claimPairRDD.cogroup(labPairRDD, rxclaimPairRDD)

result.foreach(println)

Badminton Bronze Medal at Connecticut Open Nov 2015 – Men’s Singles Event (40-49)

[Image: bronze medal]

Lost the 2nd-3rd place game by a whisker: 21-14, 20-22, 18-21 😦

Summary of Tachyon InMemory File System

Recently I had the opportunity to work with Tachyon, an in-memory file system. I used Tachyon as a caching layer for ETL output to be used downstream for two purposes:

  1. Low-latency ad hoc queries using Spark SQL
  2. Input to downstream analytics and algorithms

The writeup below is a consolidation of what I learnt about Tachyon.

Tachyon – In Memory Data Exchange Layer

Tachyon is an in-memory distributed file system backed by HDFS or any other under-file system. It has resilience built in through lineage and survives Spark JVM restarts. It allows for fine-tuning of performance and can act as a cache for warehouse tables, which is faster than an in-process cache because the data sits off-heap and is not slowed down by JVM garbage collection. It can provide efficient in-memory columnar storage with compression. It is written in Java and currently runs on Linux and Mac.

[Image]

In environments with large amounts of memory or multiple applications, the experimental OFF_HEAP mode has several advantages (a small usage sketch follows the list):

  • It allows multiple apps / executors to share the same pool of memory
  • It significantly reduces garbage collection costs
  • Cached data is not lost if individual executors crash
  • Cached data survives even if the Spark Context itself crashes
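
A minimal sketch of what using OFF_HEAP storage looks like from Spark in a Tachyon-backed setup (Spark 1.x era; the configuration key changed across versions, and the master host/port and input path are illustrative):

// Sketch: persist an RDD OFF_HEAP so its blocks live in Tachyon, outside the executor JVM heaps.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("TachyonOffHeap")
  // "spark.externalBlockStore.url" on Spark 1.6+, "spark.tachyonStore.url" on older releases
  .set("spark.externalBlockStore.url", "tachyon://tachyonmaster:19998")

val sc = new SparkContext(conf)

val claims = sc.textFile("hdfs:///data/claims")   // hypothetical input path
claims.persist(StorageLevel.OFF_HEAP)             // cached blocks are stored in Tachyon
println(claims.count())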

Tachyon is Hadoop compatible: it implements the Hadoop FileSystem interface, so existing Spark and MapReduce programs can run on top of it without any code change.
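
Because of this, a Spark job can read and write tachyon:// paths the same way it does hdfs:// paths. A minimal sketch (host, port, and paths are illustrative; sc is an existing SparkContext):

// Word count over ETL output cached in Tachyon, reading and writing through tachyon:// URIs.
// Requires the Tachyon client jar on the classpath (see the Running Spark with Tachyon doc below).
val input = sc.textFile("tachyon://tachyonmaster:19998/etl/output/part-*")
val counts = input.flatMap(_.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("tachyon://tachyonmaster:19998/etl/wordcounts")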

Pluggable under-layer file system: to provide fault tolerance, Tachyon checkpoints in-memory data to the under-layer file system. It has a generic interface that makes plugging in different under-layer file systems easy. It currently supports HDFS, S3, GlusterFS, and single-node local file systems.

Native support for raw tables: table data with hundreds of columns is common in data warehouses. Tachyon provides native support for multi-columned data, with the option to put only hot columns in memory to save space.

What happens if the data set does not fit in memory

Depending on the system setup, Tachyon may leverage local SSDs and HDDs. It keeps hot data in Tachyon and cold data in the under-file system.

Fault Tolerance in Tachyon is based upon a multi-master approach where multiple master processes are run. One of these processes is elected the leader and is used by all workers and clients as the primary point of contact. The other masters act as standbys using the shared journal to ensure that they maintain the same file system metadata as the leader and can rapidly take over in the event of the leader failing.

If the leader fails, a new leader is automatically selected from the available standby masters and Tachyon proceeds as usual.

[Image: master fault tolerance]

Tachyon as a Tiered Storage

Tachyon storage can be modelled as tiered layers, where each layer can be a different storage medium:

[Image: tiered storage]

  • Eviction policy: only LRU for now
  • Directories and their sizes are configured for each tier separately
  • When a storage tier becomes full, data is spilled to the next level according to the eviction policy

Where Tachyon makes MOST sense

In an enterprise setting, with multiple jobs and applications running together, there are some variables that you cannot always control for: a JVM may simply crash, Spark can run out of memory, or an app may impact memory in some unforeseen way. In any of these cases, jobs should be able to restart without losing their in-memory datasets (RDDs), and the overall system must respond gracefully. This is where Tachyon comes in; it survives JVM crashes, so the show does indeed go on.

Moreover, for long-running Spark jobs, Tachyon outperforms the Spark cache because garbage collection eventually kicks in on the Spark JVM's heap, whereas Tachyon and its off-heap memory storage are not affected.

Where Tachyon Does NOT make sense

Tachyon provides high I/O performance, but if a task is primarily CPU bound, it will not see significant performance gains.

Tachyon's Use Cases

Memory storage for serialized blocks

[Image: memory storage for serialized blocks]

Caching layer for predictable performance

[Image: caching layer for predictable performance]

Where is Tachyon currently being used

[Image: where Tachyon is currently being used]

Tachyon can be used as a fast analytic query engine by hooking Spark SQL up to a BI / visualization tool.

[Image: Spark SQL and BI tool over Tachyon]
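
As an illustration of this setup, a small sketch (Spark 1.x style API; the Tachyon host, port, and table path are hypothetical) of serving ad hoc Spark SQL over data cached in Tachyon:

// Sketch: point Spark SQL at Parquet data cached in Tachyon and serve ad hoc queries
// (paths, host, and port are illustrative; registerTempTable is the Spark 1.x API).
val patients = sqlContext.read.parquet("tachyon://tachyonmaster:19998/warehouse/patients")
patients.registerTempTable("patients")

sqlContext.sql("SELECT pid, COUNT(*) AS visits FROM patients GROUP BY pid").show()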

References

Start Here – http://tachyon-project.org/downloads/

Next – http://ampcamp.berkeley.edu/5/exercises/tachyon.html

Tachyon Locally – https://github.com/amplab/tachyon/wiki/Running-Tachyon-Locally

Tachyon HDFS – https://github.com/amplab/tachyon/wiki/Running-Hadoop-MapReduce-on-Tachyon

Running Spark with Tachyon – http://tachyon-project.org/documentation/Running-Spark-on-Tachyon.html

Tachyon Examples – https://github.com/amplab/tachyon/tree/master/examples/src/main/java/tachyon/examples

Tachyon on AWS – http://tachyon-project.org/documentation/Deploy-Module.html

Developer Docs – http://tachyon-project.org/documentation/#developer-documentation

Tachyon Performance Benchmarks compared to HDFS – http://www.datanami.com/2014/08/14/amplabs-tachyon-promises-solidify-memory-analytics/

Tachyon Git Repository – https://github.com/amplab/tachyon

Documents

http://files.meetup.com/14452042/Tachyon_Meetup_2014_8_25.pdf

http://files.meetup.com/14452042/Tachyon_Meetup_2015_5_28-1-Baidu.pdf

http://www.cs.berkeley.edu/~haoyuan/talks/Tachyon_2014-10-16-Strata.pdf

https://spark-summit.org/2015-east/wp-content/uploads/2015/03/SSE15-14-Zoomdata-Alarcon.pdf

Tachyon JIRA – https://tachyon.atlassian.net/projects/TACHYON/issues/TACHYON-780?filter=allopenissues

Companies Using Tachyon

http://www.tachyonnexus.com/

Atigeo – Company Slides – http://www.slideshare.net/ClaudiuBarbura/tachyon-meetup-san-francisco-oct-2014

H2O is actively using Tachyon

Baidu

Data characters that failed to be inserted into HBase using Pig's HBaseStorage Class

In this article I describe how to insert data into an HBase Column Family using Pig.

While doing this I also learnt which kinds of characters (some special ones) CANNOT be inserted using this process.

Figuring out the above was a pretty tedious task, especially since I ran into these problems while the loader was running on extremely large data sets, and I had to do a sort of binary search through these multi-million-row files to find where the code failed (by this I mean I divided the data set into 2-3 almost equal chunks, ran the loading process for each chunk, and repeated this recursively for the chunks that failed to load, to zero in on the characters / rows of data that were causing the problem).

Soon I discovered that the best way to debug was to generate a data set covering all the special characters, see which ones pass and which ones fail, and then adapt my data loading code to replace the characters that failed the load with a white space (a small sketch follows the list of characters below).

Characters in the data that did not work in Pig bulk loading into HBase

(
)
&
,
[
]
{
}
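
Below is a minimal sketch (in Scala, purely illustrative; the actual loading pipeline is not shown) of that sanitization step: replacing each of the characters listed above with a white space.

// Replace the characters that failed Pig / HBaseStorage bulk loading with a space.
val badChars = "()&,[]{}".toSet

def sanitize(field: String): String =
  field.map(c => if (badChars.contains(c)) ' ' else c)

println(sanitize("value(with)bad,chars"))   // prints "value with bad chars"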

General Format of the data file for insertion into an HBase Column Family using Pig

ROWID [ColumnQualifierName#ColumnValue,ColumnQualifierName#ColumnValue………]

The 1st field is the ROWID for the HBase table.
A TAB separates it from the next field.
The next field is enclosed in [].

Since there can be any number of ColumnQualifiers – which can be defined dynamically in an HBase Column Family – this 2nd field contains a variable number of key-value pairs.

This field contains the ColumnQualifierName & ColumnQualifierValue

Each such ColumnQualifier is separated by a ‘,’

ColumnQualifierName and ColumnQualifierValue is separated by the ‘#’ character.

Since HBase allows the flexibility to have any number of ColumnQualifiers, with any name, for a given row in a Column Family, all the ColumnQualifiers for a given row can be specified in one line of the data file:
ROWID1 [ColumnQualifierName1#ColumnValue1,ColumnQualifierName34#ColumnValue7]
ROWID2 [ColumnQualifierName1#ColumnValue2,ColumnQualifierName12#ColumnValue7,ColumnQualifierName10#ColumnValue17]
ROWID3 [ColumnQualifierName2#ColumnValue22,ColumnQualifierName12#ColumnValue5,ColumnQualifierName15#ColumnValue15,ColumnQualifierName16#ColumnValue16]
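
As a small illustration, a hypothetical Scala helper that emits one line in this format:

// Build one line of the loader input: ROWID <TAB> [qualifier#value,qualifier#value,...]
def toLine(rowId: String, cells: Seq[(String, String)]): String =
  rowId + "\t" + cells.map { case (q, v) => s"$q#$v" }.mkString("[", ",", "]")

println(toLine("ROWID1", Seq("ColumnQualifierName1" -> "ColumnValue1",
                             "ColumnQualifierName34" -> "ColumnValue7")))
// ROWID1<TAB>[ColumnQualifierName1#ColumnValue1,ColumnQualifierName34#ColumnValue7]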

Pig Script to load the data into HBase

Once the data set is generated and it is on HDFS, use the line below to tell Pig the data set's location and format:

dataSet = load 'pigdata/DataSet' as (rowID:chararray, dataMap:map[]);

Then make a call to PIG’s HBase Loader Class – HBaseStorage

store dataSet into 'hbase://Table_Name' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('A:*');

The parameter to the HBaseStorage method, 'A:*', names the Column Family in HBase ('A') into which the data is to be inserted; the '*' means every column qualifier present in the map is stored.

Once the second command is executed within Pig, the data file starts getting loaded into HBase.

Binarization in Spark and Scala

Here is my 100 lines (or fewer) of Spark with Scala code to perform binarization of a text file.

This is a very simplistic solution, written in 1-2 hours of heads-down coding time.

I had previously coded the same algorithm in MapReduce (Java code, with multiple MR stages) and it took more than 400 lines of code.
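
The original code is not reproduced here; as a rough, purely illustrative sketch (assuming binarization here means turning each line's tokens into 0/1 term-presence vectors; the input path and names are hypothetical, and this is not the original solution):

// A rough sketch only: binarize a text file into 0/1 term-presence vectors.
// Assumes a spark-shell style `sc`; the input path is hypothetical.
val lines = sc.textFile("hdfs:///tmp/input.txt")
val tokenized = lines.map(_.toLowerCase.split("\\W+").filter(_.nonEmpty).toSet)

// Pass 1: build the vocabulary (one column per distinct term)
val vocab = tokenized.flatMap(identity).distinct().collect().sorted

// Pass 2: emit a 0/1 vector per line indicating which vocabulary terms it contains
val binarized = tokenized.map(terms => vocab.map(t => if (terms.contains(t)) 1 else 0))

binarized.take(5).foreach(v => println(v.mkString(",")))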

Map of Big Data Tools and Solutions Landscape

[Image: Big Data tools and solutions mind map]

Sudoku Solver using Simulated Annealing

Here is my failed attempt to solve Sudoku using Simulated Annealing. ( I think it is good to fail – it makes me learn 🙂 )

Input to the algorithm: the initial Sudoku board configuration, a set of 81 numbers, where a missing number on the board is indicated by -1.

Algorithm:

We choose random numbers between 1 and 9 and allocate them to 2 random cells (not fixed, i.e. not already given in the input) in each sub-square*, making sure that each sub-square has no repeated integers.

* There are 9 sub-squares in the Sudoku board

The cost function for the algorithm is the count of integers between 1 and 9 that are not used in each row and in each column, summed across the grid.

The algorithm then tries to minimize the above cost function, following the general simulated annealing pattern of gradually reducing the temperature.

The alpha value (the temperature decay factor) chosen by the algorithm is 0.94.
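
A rough sketch of the cost function and acceptance step described above (written in Scala here for illustration; the original solver is in Python, and the names are illustrative):

// Cost: for each row and each column, count the digits 1-9 that do not appear, and sum.
def cost(board: Array[Array[Int]]): Int = {
  val rowCost = board.map(row => (1 to 9).count(n => !row.contains(n))).sum
  val colCost = (0 until 9).map { c =>
    val col = board.map(_(c))
    (1 to 9).count(n => !col.contains(n))
  }.sum
  rowCost + colCost
}

// Standard simulated-annealing acceptance: always accept improvements, accept a worse
// candidate with probability exp(-delta / T); temperature decays by alpha = 0.94.
def accept(oldCost: Int, newCost: Int, temperature: Double): Boolean =
  newCost <= oldCost || math.random < math.exp((oldCost - newCost) / temperature)

val alpha = 0.94
// temperature = alpha * temperature   // applied after each cooling step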

However, using the above value of alpha, the algorithm has not converged to the solution.

The output of the algorithm prints the resulting grid and the best result found, including the number of integers that are still mis-positioned.

Most runs of the algorithm end with 5-7 numbers still out of place.

Command to run the code

python SudokuSolver -1 -1 -1 -1 -1 7 5 -1 4 -1 3 6 4 -1 -1 -1 -1 2 -1 -1 -1 -1 6 -1 -1 7 8 -1 -1 -1 5 1 -1 2 -1 -1 -1 -1 -1 -1 -1 9 -1 -1 -1 -1 -1 -1 2 -1 4 -1 -1 1 -1 -1 -1 8 9 -1 -1 -1 6 1 5 -1 7 3 -1 -1 -1 -1 -1 2 9 1 -1 5 -1 8 3

Code