Transforming the Quantum Physics Dataset into Spark ML Format – Tackling the Particle Physics Task from Cornell CS
Spark ML algorithms only work on datasets in the correct format, so we first have to transform our data into it.
But what is the right data format?
To find out, today we are looking at quantum physics particle data! The challenge provided by Cornell is to learn a classification rule that differentiates between two types of particles generated in high-energy collider experiments!
The dataset has 78 attributes and 2 classes that we want to predict.
First, we head to http://osmot.cs.cornell.edu/kddcup/datasets.html and register. Then we download the datasets provided.
It comes with 2 datasets; we will use the particle physics one, which covers the classification of the 2 different types of particles generated in the collider experiments.
Downloading the data using Wget
Helper Script
wget ftp://largescale.ml.tu-berlin.de/largescale/scripts/convert.py
Download the train data
wget ftp://largescale.ml.tu-berlin.de/largescale/alpha/alpha_train.dat.bz2
Then convert the data with the provided Python helper script:
python convert.py alpha train -o alpha_train.libsvm
Use a text editor like nano to check out the data, for example phy_test.dat. As we can see, the values in each line are separated by tabs!
What is the data format?
Each line in the training and test files describes one example. The structure of each line is as follows:
- The first element of each line is an EXAMPLE ID which uniquely describes the example. You will need this EXAMPLE ID when submitting results.
- The second element is the example’s class. Positive examples are denoted by 1, negative examples by 0. Test examples have a “?” in this position. This is a balanced problem so the target values are roughly half 0s and 1s. All of the following elements are feature values. There are 78 feature values in each line.
- Missing values: columns 22, 23, 24, and 46, 47, 48 use a value of "999" to denote "not available," and columns 31 and 57 use "9999" to denote "not available." These are the column numbers in the data tables starting with 1 for the first column (the case ID numbers). If you remove the first two columns (the case ID numbers and the targets) and start numbering the columns at the first attribute, these are attributes 20, 21, 22, and 44, 45, 46, and 29 and 55, respectively. You may treat missing values any way you want: coding them as a unique value, imputing them, using learning methods that can handle missing values, ignoring these attributes, etc. (a sketch of one imputation option follows below).
The elements in each line are separated by whitespace.
So each example has an ID, a binary class label (2 classes), and 78 continuous attributes.
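As one example of handling the missing-value sentinels, the affected columns could be run through Spark's Imputer once the data has been loaded and cast to double (as we do later in this post). This is only a sketch: the mapping of raw columns 22–24 to the Spark column names _c21–_c23 assumes Spark's default 0-based naming, and imputing by the mean is just one of the options listed above.
import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;

// Sketch: treat the 999 sentinel in raw columns 22-24 (_c21.._c23 after loading) as missing
// and replace it with the column mean. Raw columns 46-48 and the 9999 columns (31 and 57)
// would get the same treatment with further Imputer stages.
Imputer imputer = new Imputer()
        .setInputCols(new String[] { "_c21", "_c22", "_c23" })
        .setOutputCols(new String[] { "_c21_imp", "_c22_imp", "_c23_imp" })
        .setMissingValue(999.0)
        .setStrategy("mean");
ImputerModel imputerModel = imputer.fit(data);
data = imputerModel.transform(data);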
What Spark objects will we need?
Get your documentation out (https://spark.apache.org/docs/latest/api/java/index.html), it's time to program!
We will need the docs for DataFrame, Pipelines, Vectors, StringIndexer, VectorIndexer, Estimators, Transformers, Parameter and VectorAssembler.
A Pipeline consists of a sequence of stages, each of which is either an Estimator or a Transformer. When Pipeline.fit() is called, the stages are run in order: each Estimator's fit() method produces a Transformer, and each Transformer then transforms the data and passes it on to the next stage.
What is the vector indexer for?
The vector indexer lets us detect whether the features of our data are categorical or continuous. We control this by passing a parameter N to setMaxCategories().
When the vector indexer is executed in the pipeline, it checks how many distinct values each feature has: a feature with more than N distinct values is declared continuous, and a feature with N or fewer distinct values is declared categorical.
– E.g.: feature 0 has the unique values {-1.0, 0.0} and feature 1 the values {1.0, 3.0, 5.0}. With maxCategories = 2, feature 0 is declared categorical and feature 1 continuous.
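For illustration, a VectorIndexer stage could be configured as in the sketch below. Note that this is not part of the pipeline we actually build later (which only uses a VectorAssembler), and the column names here are made up for the example.
import org.apache.spark.ml.feature.VectorIndexer;

// Sketch: VectorIndexer works on an already-assembled vector column, so it would
// typically be placed right after a VectorAssembler stage in the pipeline.
VectorIndexer featureIndexer = new VectorIndexer()
        .setInputCol("rawFeatures")          // assumed name of the assembled vector column
        .setOutputCol("categorizedFeatures") // assumed output column name
        .setMaxCategories(4);                // N = 4: <= 4 distinct values => categorical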
What is a Pipeline for?
A Pipeline is the container into which we put all our Transformers and Estimators so that they are executed in a well-defined order.
Loading the data into Spark
Dataset<Row> data = spark.read()
        .option("sep", "\t")
        .format("csv")
        .load("phy_train.dat");
We specify the tab delimiter with .option("sep", "\t").
Doing this, Spark labels the columns _c0 to _c80:
_c0 holds the data point's ID
_c1 holds the class
_c2 onwards hold the 78 feature values
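To double-check the column names after loading, a quick look at the schema and the first rows helps (just a sanity-check sketch; at this point every column is still read as string):
data.printSchema();   // lists _c0, _c1, ... - all of type string for now
data.show(5);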
Define the labels
We define the column that contains the data labels with a StringIndexer. We know _c1 holds the class values, so we set it as the input column; the output column will be "indexedLabel" in our example.
StringIndexer labelIndexer = new StringIndexer()
        .setInputCol("_c1")
        .setOutputCol("indexedLabel");
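If you want to peek at the label mapping before running the whole pipeline, you can fit the indexer on its own (a small sketch; by default StringIndexer orders labels by descending frequency):
StringIndexerModel labelModel = labelIndexer.fit(data);
System.out.println(java.util.Arrays.toString(labelModel.labels()));   // e.g. [0, 1]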
Running the following snippet shows that all our columns are currently of type string, even though the rows actually contain double values.
scala.Tuple2<String, String>[] colTypes = data.dtypes();
for (scala.Tuple2<String, String> tup : colTypes) {
    System.out.println("Name: " + tup._1 + " Type: " + tup._2);
}
Cast data types
This sexy loop updates all column data types to double
for (String c : data.columns()) {
    data = data.withColumn(c, data.col(c).cast("double"));
}
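As an alternative to casting by hand (just a sketch, not the route the rest of this post takes), you could let Spark infer the column types directly when reading the file:
Dataset<Row> inferred = spark.read()
        .option("sep", "\t")
        .option("inferSchema", "true")   // infer numeric column types at read time
        .format("csv")
        .load("phy_train.dat");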
Index the features
Now it is time to use our VectorAssembler. We give it the names of the columns that should be assembled into the feature vector. Since column 0 is an ID and column 1 is the label, we start at index 2; note that copyOfRange's upper bound is exclusive, so fieldNames.length - 1 also leaves out the very last column.
String[] fieldNames = data.schema().fieldNames();
VectorAssembler assembler = new VectorAssembler()
        .setInputCols(Arrays.copyOfRange(fieldNames, 2, fieldNames.length - 1))
        .setOutputCol("indexedFeatures");
Putting it all together
After defining all our estimators and transformers, it is time to pass them as an array of stages to the Pipeline. The pipeline calls every component sequentially to generate the desired data format. Calling transform on the dataset then serves us the processed data.
Pipeline pipeline = new Pipeline()
        .setStages(new PipelineStage[]{labelIndexer, assembler});
PipelineModel model = pipeline.fit(data);
Dataset<Row> dataTransformed = model.transform(data);
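To see what the pipeline produced, you can peek at the two new columns (a quick sketch):
dataTransformed.select("indexedLabel", "indexedFeatures").show(5, false);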
Verify that we transformed our data properly
Let’s create a simple classification tree to see if Spark ML can work with our data!
Here is the function, which you can just copy (make sure you have the same col names!):
private void applyClassifier(Dataset<Row> dataset) {
    // Split the data into training and test sets (30% held out for testing).
    Dataset<Row>[] splits = dataset.randomSplit(new double[] { 0.7, 0.3 });
    Dataset<Row> trainingData = splits[0];
    Dataset<Row> testData = splits[1];

    // Train a DecisionTree model on the indexed label and feature columns.
    DecisionTreeClassifier dt = new DecisionTreeClassifier()
            .setLabelCol("indexedLabel")
            .setFeaturesCol("indexedFeatures");

    // Put the tree into a (single-stage) Pipeline.
    Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { dt });

    System.out.println("Start Training");

    // Train the model.
    PipelineModel model = pipeline.fit(trainingData);

    // Make predictions on the held-out test data.
    Dataset<Row> predictions = model.transform(testData);

    // Select (prediction, true label) and compute the accuracy and test error.
    MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
            .setLabelCol("indexedLabel")
            .setPredictionCol("prediction")
            .setMetricName("accuracy");

    double accuracy = evaluator.evaluate(predictions);
    System.out.println("Test Error = " + (1.0 - accuracy));
    System.out.println("Accuracy: " + accuracy);
}
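To run the check, simply pass in the transformed dataset from the previous step:
applyClassifier(dataTransformed);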
If all went well, you should get an output similar to this:
Test Error = 0.3079487863430248
Accuracy: 0.6920512136569752
Congratulations! We have imported a dataset from the Internet and built a pipeline that transforms its columns into the format Spark ML expects!