Spark Java dataset transformation tutorial
In this tutorial we will learn how to transform the ALPHA train set of brainwave data provided by one of the Machine Learning labs of Technische Universität Berlin.
Downloading the data using Wget
I saved you guys some time: you do not need to open another browser tab! Wget the juicy data straight from the source.
Helper Script
wget ftp://largescale.ml.tu-berlin.de/largescale/scripts/convert.py
Download the feature data
wget ftp://largescale.ml.tu-berlin.de/largescale/alpha/alpha_train.dat.bz2
Download the label data
wget ftp://largescale.ml.tu-berlin.de/largescale/alpha/alpha_train.lab.bz2
Then convert the data with the helper script
Convert the data with the provided Python helper script
python convert.py alpha train -o alpha_train.libsvm
What does our data look like?
Label is a value of either -1 or 1; it tells us whether the particle with the corresponding features is an instance of the particle we want to classify or not.
1 would imply the features describe the particle, -1 would imply the features describe a different particle.
Features is a tuple where the first entry is the feature count (500) and the second is an array of all the feature values.
For our ML format, we want the label column to contain only doubles, no arrays.
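After conversion, each example is one line in the standard libsvm text format: the label first, followed by index:value pairs for the feature values. A made-up sample line, just to illustrate the layout (the real values will of course differ):

-1 1:0.0412 2:0.2368 5:0.8810 ... 500:0.1175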
What Spark objects will we be needing?
- Row to handle our data
- RowFactory to create new Rows
- GenericRowWithSchema, which has no doc page but is analogous to RowFactory
- DataFrame (Dataset<Row>) to hold our data
- Pipeline, used to combine all our transformers together
- Vectors, which will be used to extract data from the original dataset
- StringIndexer, which will be used to index labels
- VectorIndexer, which can be used to index features; we will not need it though
- Transformers, which will be used to transform our dataset
- VectorAssembler, which we will use to create the feature vector
- SQLContext to create a custom DataFrame
- StructField to define our structs and table schema
- StructType to define the data types of our schema
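For reference, here is one possible set of imports covering the classes above. This is a sketch assuming the Spark 2.x ML/SQL packages; the static imports at the end let the snippets below call createStructField, DoubleType and VectorType directly.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.SparseVector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// static imports used by the schema snippets further down
import static org.apache.spark.sql.types.DataTypes.DoubleType;
import static org.apache.spark.sql.types.DataTypes.createStructField;
import static org.apache.spark.ml.linalg.SQLDataTypes.VectorType;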
How do we get our data in the proper format?
First we load the raw data into our dataframe
Dataset<Row> originalData = spark.read().format("libsvm").load("/path/to/data/alpha_train.libsvm");
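The libsvm reader gives us two columns out of the box, label and features. If you want to verify that before going on, a quick optional check:

originalData.printSchema(); // should list a double column "label" and a vector column "features"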
Then we define our label column and the struct fields.
StructField[] fields = new StructField[2];
StructField for the label
fields[0] = createStructField("labelNotIndexed", DoubleType, false);
This will be our label column.
The struct fields will be used to tell Spark which schema our new Rows and DataFrames should have. fields will hold the definition for every column of our new DataFrame.
The first entry of the StructField array is for the label, the second one is for the features.
StructField for the features
fields[1] = createStructField("featuresNotIndexed", VectorType(), false);
We will have a vector type for the second column, so it can encode a feature vector of any size for us.
Build the struct from the fields
StructType schemata = DataTypes.createStructType(fields);
The schema is built by the createStructType(fields) method, which generates a nice StructType that represents all our defined fields and is very pleasant to work with.
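If you want to double-check the struct you just built, StructType can print itself as a tree. This is purely an optional sanity check (the output sketched in the comments assumes the two fields defined above):

System.out.println(schemata.treeString());
// root
//  |-- labelNotIndexed: double (nullable = false)
//  |-- featuresNotIndexed: vector (nullable = false)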
Initialize our new DF
Dataset<Row> transformedDf = originalData.select("label");
This writes the label column from the original data into our new dataset and also initializes it.
Convert the Dataset to a list for easier looping
List<Row> originalDataList = originalData.collectAsList();
It is time to get loopy … and to extract relevant data from the original dataset
But first, we will have to define some variables:

double[] valuesToAdd = new double[500];
Object[] tmpValues = new Object[500];
Row tmp;
SparseVector tmpVec;
// also needed by the loop below
double label;
Row tmpRow;
List<Row> transformedDataList = new ArrayList<>();
SQLContext sqlContext = spark.sqlContext();
The Actual Loop
// In this loop we create a new Row for every Row in the original Dataset
for (Row r : originalDataList) {
    tmpVec = (SparseVector) r.get(1); // get the feature data
    label = (double) r.get(0);        // get the label
    tmpRow = RowFactory.create(label, tmpVec);
    transformedDataList.add(tmpRow);
}
Using

RowFactory.create(valueTmpStore, schemata);

will not satisfy our needs, as the resulting row would only have two columns: the feature count and the feature array in one column.
Create the dataset after the loop with this line
transformedDf = sqlContext.createDataFrame(transformedDataList, schemata);
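At this point it can be reassuring to peek at the freshly built DataFrame; an optional check, not part of the original flow:

transformedDf.show(5, false); // print the first 5 rows without truncating the feature vectors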
Defining the Pipeline
If you head over to https://spark.apache.org/docs/latest/ml-pipeline.html you can see the toolkit Spark provides for you for creating a pipeline.
Most interesting for us today are the
- StringIndexer for indexing labels
- VectorAssembler for assembling the feature vector
How to use the String Indexer
StringIndexerModel labelIndexer = new StringIndexer()
        .setInputCol("labelNotIndexed")
        .setOutputCol("outputLabelColName")
        .fit(transformedDf);
System.out.println("Alpha Labels indexed");
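If you are curious which original label got which index, the fitted StringIndexerModel can tell you. By default StringIndexer orders labels by frequency, so the most frequent label becomes index 0.0 (an optional check; the printed order depends on your data):

System.out.println(Arrays.toString(labelIndexer.labels())); // e.g. [1.0, -1.0]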
How to use the VectorAssembler
// Get the feature columns; they are in every column except 0, which holds the label
VectorAssembler assembler = new VectorAssembler()
        .setInputCols(Arrays.copyOfRange(transformedDf.schema().fieldNames(), 1, 2))
        .setOutputCol("featureOutputColName");
System.out.println("Alpha Vector Assembled");
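In case the copyOfRange call looks cryptic: it just slices the array of field names so that only the feature column remains as assembler input. A small illustration, assuming the two-column schema defined earlier:

String[] inputCols = Arrays.copyOfRange(transformedDf.schema().fieldNames(), 1, 2);
System.out.println(Arrays.toString(inputCols)); // prints [featuresNotIndexed]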
Chain the indexers into a Pipeline
Now, after defining the indexers and assemblers, we can stuff them into the Pipeline like this
Pipeline pipeline = new Pipeline()
        .setStages(new PipelineStage[]{labelIndexer, assembler});
System.out.println("Alpha Pipeline built");
Fit the pipeline
Fit the pipeline to our DataFrame to get a PipelineModel
PipelineModel model = pipeline.fit(transformedDf);
Apply the pipeline to a dataset, transforming it into Spark format
transformedDf = model.transform(transformedDf);
Validate The Dataset Schema
To see if we have done everything correctly, print out the schema
transformedDf.printSchema();
It should look like this:
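Below is a sketch of the expected output, assuming the output column names chosen above for the indexer and assembler; the exact nullable flags may differ on your setup:

root
 |-- labelNotIndexed: double (nullable = false)
 |-- featuresNotIndexed: vector (nullable = false)
 |-- outputLabelColName: double (nullable = false)
 |-- featureOutputColName: vector (nullable = true)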