Spark Feature Engineering Tutorial – 4 – transforming RCV1 dataset
What is the data?
The dataset was provided by the Journal of Machine Learning Research in 2004 as a new benchmark for text categorization research. You can read more about the journal that released the dataset on its website.
Where to get the data?
The data is provided by www.csie.ntu.edu.tw:

```shell
# train data
wget https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/rcv1_train.binary.bz2
# test data
wget https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/rcv1_test.binary.bz2
```
What is the data about?
It contains newswire stories and their categorization.
Let's load the data
We can load the data into Spark with this command:

```java
Dataset<Row> originalData = spark.read().format("libsvm")
        .load("/home/loan/Documents/Dai-Work/tmpGit/Ressources/rcv1/rcv1_train.binary");
```
Let's check out the data
We should check out the data schema and a few rows. This is how you can do it:

```java
originalData.printSchema();
originalData.show(5);
```

Your console output should look like this:
Very nice, the data is already in a nice format.
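As a quick aside, each line of a LIBSVM-format file holds the label followed by sparse `index:value` feature pairs; this is what Spark's libsvm reader parses for us. Here is a minimal plain-Java sketch of that format (the sample line is made up for illustration, not taken from RCV1):

```java
import java.util.Arrays;

public class LibsvmLineSketch {
    public static void main(String[] args) {
        // Hypothetical LIBSVM-format line: label, then sparse index:value pairs
        String line = "1 3:0.5 7:0.25 42:0.1";
        String[] parts = line.split(" ");
        double label = Double.parseDouble(parts[0]);
        System.out.println("label = " + label);
        // Remaining tokens are feature index/value pairs
        for (String pair : Arrays.copyOfRange(parts, 1, parts.length)) {
            String[] kv = pair.split(":");
            System.out.println("feature " + kv[0] + " = " + kv[1]);
        }
    }
}
```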
Using the StringIndexer
We will use the StringIndexer to index the classes we have.

```java
StringIndexerModel labelIndexer = new StringIndexer()
        .setInputCol("label")
        .setOutputCol("indexed_labels")
        .fit(originalData);
```

This will fit an indexer on the label column, mapping each raw label to a numeric index.
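For intuition, StringIndexer maps each distinct label to an index, by default giving index 0 to the most frequent label. A minimal plain-Java sketch of that mapping (the label values below are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexerSketch {
    public static void main(String[] args) {
        List<String> labels = Arrays.asList("1.0", "-1.0", "1.0", "1.0", "-1.0");
        // Count how often each label occurs
        Map<String, Integer> counts = new HashMap<>();
        for (String l : labels) counts.merge(l, 1, Integer::sum);
        // Sort labels by descending frequency (StringIndexer's default order)
        List<String> order = new ArrayList<>(counts.keySet());
        order.sort((a, b) -> counts.get(b) - counts.get(a));
        // Assign indices in that order
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < order.size(); i++) index.put(order.get(i), i);
        System.out.println(index.get("1.0"));  // most frequent label -> 0
        System.out.println(index.get("-1.0")); // next -> 1
    }
}
```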
Using the VectorAssembler

```java
VectorAssembler assembler = new VectorAssembler()
        .setInputCols(Arrays.copyOfRange(originalData.schema().fieldNames(), 1, 2))
        .setOutputCol("indexed_features");
```

This will define the second column as the feature column.
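The `Arrays.copyOfRange(fieldNames, 1, 2)` call above takes a half-open range, so it keeps only the field at index 1. Since the libsvm loader produces exactly two columns, "label" and "features", that leaves just the features column. A standalone sketch of the slicing:

```java
import java.util.Arrays;

public class CopyRangeSketch {
    public static void main(String[] args) {
        // Field names as the libsvm loader produces them
        String[] fieldNames = {"label", "features"};
        // Half-open range [1, 2) keeps only index 1
        String[] inputCols = Arrays.copyOfRange(fieldNames, 1, 2);
        System.out.println(Arrays.toString(inputCols)); // prints [features]
    }
}
```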
Build the pipeline

```java
// Chain the indexer and the assembler in a Pipeline.
Pipeline pipeline = new Pipeline()
        .setStages(new PipelineStage[]{labelIndexer, assembler});
```

This tells Spark in which order to apply the transformers.
Fit the pipeline

```java
// Fit the pipeline. This also runs the indexer.
PipelineModel model = pipeline.fit(originalData);
```

This fits the pipeline on the original data and returns a PipelineModel.
Get the transformed dataset

```java
Dataset<Row> transformedDf = model.transform(originalData);
```

This applies the transformation to the dataset and returns the transformed DataFrame.
Let's check out our transformed data
This looks pretty good, but we do not need the label and features columns anymore.
Drop useless columns
```java
transformedDf = transformedDf.drop("features").drop("label");
```
The cleaned dataset
This is now our schema, perfect!
We are ready to do some machine learning on this.
Let's test whether we transformed our data properly by fitting a linear model to it!
Define the linear model

```java
private static void applyLinearModel(Dataset<Row> dataset) {
    // LinearRegression serves here as a quick sanity check that the
    // indexed_labels and indexed_features columns are usable by Spark ML.
    LinearRegression lr = new LinearRegression()
            .setMaxIter(10)
            .setRegParam(0.3)
            .setElasticNetParam(0.8)
            .setLabelCol("indexed_labels")
            .setFeaturesCol("indexed_features");
    LinearRegressionModel lrModel = lr.fit(dataset);
}
```

Call this function on your dataset and you should get no errors if you did everything like in this tutorial!
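For context, `setRegParam` sets the overall regularization strength λ and `setElasticNetParam` sets the L1/L2 mixing ratio α; Spark's elastic-net penalty is λ · (α·‖w‖₁ + (1−α)/2 · ‖w‖₂²). A small sketch evaluating that penalty for a made-up weight vector:

```java
public class ElasticNetPenaltySketch {
    public static void main(String[] args) {
        double lambda = 0.3; // setRegParam: overall regularization strength
        double alpha = 0.8;  // setElasticNetParam: L1/L2 mixing ratio
        double[] w = {0.5, -1.0, 2.0}; // hypothetical weight vector
        double l1 = 0.0, l2 = 0.0;
        for (double wi : w) {
            l1 += Math.abs(wi);  // L1 norm term
            l2 += wi * wi;       // squared L2 norm term
        }
        double penalty = lambda * (alpha * l1 + (1 - alpha) / 2.0 * l2);
        System.out.println(penalty);
    }
}
```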
Here is the full code:

```java
Dataset<Row> originalData = spark.read().format("libsvm")
        .load("/home/loan/Documents/Dai-Work/tmpGit/Ressources/rcv1/rcv1_train.binary")
        .limit(10);

// "label" is the label column produced by the libsvm loader
StringIndexerModel labelIndexer = new StringIndexer()
        .setInputCol("label")
        .setOutputCol("indexed_labels")
        .fit(originalData);

// The features vector is the only column besides the label
VectorAssembler assembler = new VectorAssembler()
        .setInputCols(Arrays.copyOfRange(originalData.schema().fieldNames(), 1, 2))
        .setOutputCol("indexed_features");

// Chain the indexer and the assembler in a Pipeline.
Pipeline pipeline = new Pipeline()
        .setStages(new PipelineStage[]{labelIndexer, assembler});

// Fit the pipeline. This also runs the indexer.
PipelineModel model = pipeline.fit(originalData);

Dataset<Row> transformedDf = model.transform(originalData);
transformedDf.printSchema();
transformedDf.show(10);

transformedDf = transformedDf.drop("features").drop("label");
transformedDf.printSchema();
return transformedDf;
```