- An Encoder's main purpose is serialization and deserialization (SerDe), since Spark does not store data as JVM objects but in its own binary format.
- Spark comes with a lot of built-in encoders.
- An Encoder provides information about a table's schema without having to deserialize the whole object.
- Encoders are necessary when mapping Datasets; a small example follows below.
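To illustrate that last point, here is a minimal sketch of a Dataset map that needs an explicit Encoder. The file path and the SparkSession variable spark are assumptions for the example:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// read a text file as a Dataset of Strings (uses the built-in String encoder)
Dataset<String> names = spark.read().textFile("/path/to/names.txt");

// map() needs an Encoder so Spark knows how to store the resulting Integers in its binary format
Dataset<Integer> lengths = names.map((MapFunction<String, Integer>) String::length, Encoders.INT());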
Category Archives: Java Machine learning
Machine learning and general programming tutorials in Java
Spark Java Library overview
This is an overview of the full ecosystem, but we will only take a look at the most important classes for everyday usage.
Spark provides a huge library of useful classes and interfaces. It is important to know what functionality your tool offers; otherwise you cannot use it to its fullest potential.
That's why I am giving you this overview:
What is SparkSession for?
The SparkSession provides the main entry point for Dataset functionality in Spark.
It provides ways to
- create Datasets and DataFrames
- read in data with DataStreamReader
- get the SQLContext so you can execute SQL statements
- register user-defined functions
- and of course configure our Spark environment, like setting executor count, RAM limits, etc. (see the example below)
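A minimal sketch of creating a SparkSession; the app name, master URL and memory setting are just example values:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("MyApp")                       // example application name
        .master("local[*]")                     // example: run locally on all cores
        .config("spark.executor.memory", "2g")  // example resource setting
        .getOrCreate();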
What is SparkContext for?
The SparkContext provides the main entry point for core Spark functionality.
The context represents the connection to a Spark cluster, and you can do cluster-wide operations with it like
- broadcast variables to the nodes of your cluster
- create Resilient Distributed Datasets (RDDs)
- create accumulators
- distribute files to the nodes of your Spark cluster
- distribute jars to the nodes of your Spark cluster
- submit jobs to the cluster
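Here is a short sketch of those operations using the JavaSparkContext wrapper, assuming the SparkSession called spark from above; all paths and values are placeholders:

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.LongAccumulator;

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

Broadcast<int[]> lookup = sc.broadcast(new int[]{1, 2, 3});    // broadcast a variable to the cluster
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3)); // create an RDD
LongAccumulator counter = sc.sc().longAccumulator("counter");  // create an accumulator
sc.addFile("/path/to/some/file.txt");                          // distribute a file to the nodes
sc.addJar("/path/to/some/lib.jar");                            // distribute a jar to the nodes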
What is the SparkLauncher for?
This class allows you to launch any Spark application on your cluster programmatically from Java! It will launch the new Spark application as a child process.
What is the InProcessLauncher for?
This allows you to launch a new Spark application within the invoking process, instead of spawning a child process.
What is SparkAppHandle for?
A SparkAppHandle object is returned after starting a Spark app with the SparkLauncher.
The handle gives you options to monitor and control the running application.
What is SparkAppHandle.Listener for?
SparkAppHandle.Listener is a callback interface you can register when launching an application; its callbacks are invoked whenever the state or information of the handle changes, so you can react to the application starting, finishing or failing.
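Putting the last few classes together, here is a hedged sketch of launching an application and watching its state; the jar path, main class and master URL are placeholders:

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

// startApplication(...) throws IOException in real code
SparkAppHandle handle = new SparkLauncher()
        .setAppResource("/path/to/example.jar")   // placeholder jar
        .setMainClass("com.example.Main")         // placeholder main class
        .setMaster("spark://10.0.1.10:7077")      // placeholder master URL
        .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
        .startApplication(new SparkAppHandle.Listener() {
            @Override
            public void stateChanged(SparkAppHandle h) {
                System.out.println("State: " + h.getState()); // e.g. CONNECTED, RUNNING, FINISHED
            }
            @Override
            public void infoChanged(SparkAppHandle h) { }
        });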
What is SparkSQLContext for?
The SQLContext is the entry point for working with structured data. With it you can
- create DataFrames from existing data
- register temporary tables and views
- run SQL queries against them
In newer Spark versions most of this functionality is also reachable directly through the SparkSession.
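A small, hedged example of running a SQL query through the session's SQLContext; the file path, view name and columns are made up for the example:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> people = spark.read().json("/path/to/people.json"); // placeholder input
people.createOrReplaceTempView("people");

Dataset<Row> adults = spark.sqlContext().sql("SELECT name FROM people WHERE age >= 18");
adults.show();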
What are UserDefinedFunctions for?
User-defined functions (UDFs) let you register your own Java functions with Spark so that they can be called from SQL statements and DataFrame expressions, just like built-in functions.
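A minimal sketch of registering and using a UDF; the function name, the DataFrame df and its column "name" are assumptions for the example:

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

// register a UDF that upper-cases a string
spark.udf().register("toUpper", (UDF1<String, String>) String::toUpperCase, DataTypes.StringType);

// use it in a DataFrame expression
df.withColumn("nameUpper", callUDF("toUpper", col("name"))).show();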
Basic Kafka Producer, How to write records in Java
You will learn
- How to configure Kafka Producer properties
- How to create a Kafka Producer
- How to send records with a Kafka Producer
This example shows how to write records from Java to a Kafka topic.
We will have to set the properties for a Kafka Producer object and create it.
Then we send records to the topic of our choice.
Setting Properties
Properties props = new Properties();
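The snippet above only shows the empty Properties object; a typical minimal producer configuration could look like this, where the broker address is an example value for a local setup:

props.put("bootstrap.servers", "localhost:9092");  // address of your Kafka broker(s)
props.put("acks", "all");                          // wait for full acknowledgement
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");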
Create an instance of a KafkaProducer with the properties we just set
Producer<String, String> producer = new KafkaProducer<>(props);
Call the send() function on the producer object to send a record to the Kafka topic wetter (the keys and values here are just example data):
for (int i = 0; i < 100; i++) {
    producer.send(new ProducerRecord<>("wetter", Integer.toString(i), "value-" + i));
}
producer.close();
Basic Kafka Consumer Java example (How to print Kafka records to terminal)
In this tutorial you will learn how to make a simple Kafka consumer, which polls for records forever.
You will learn the following:
- How to create topics
- How to create a Kafka Consumer
- How to configure Kafka properties
- How to subscribe to a topic
- How to poll infinitely with a consumer
The first example shows how to print out records from Kafka to the console.
We will have to set the properties for a KafkaConsumer object and create it.
Then we subscribe to the topics of our choice.
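Before consuming, the topic has to exist. Here is a hedged sketch of creating one programmatically with Kafka's AdminClient; the broker address, topic name, partition count and replication factor are example values:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(adminProps)) {
    // create topic "wetter" with 1 partition and replication factor 1
    admin.createTopics(Collections.singleton(new NewTopic("wetter", 1, (short) 1)));
}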
Setting Properties
Properties props = new Properties();
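As in the producer post, the snippet only shows the empty Properties object; a typical minimal consumer configuration could be the following, where the broker address and group id are example values:

props.put("bootstrap.servers", "localhost:9092");  // address of your Kafka broker(s)
props.put("group.id", "console-printer");          // example consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");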
Create an instance of a KafkaConsumer object and call the subscribe() method to subscribe to some topics (here the example topic wetter from the producer post):
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("wetter"));
Ask the consumer for new records continuously and print them:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}
Deployment Cycle with Spark and Hadoop with Java
This article will show you one of many possible cycles to deploy your code as quickly and efficiently as possible. Also, we will talk a little about what Hadoop and Spark actually are and how we can use them to make awesome distributed computations!
What are Hadoop and Spark for?
You use your Spark cluster to do very computationally expensive tasks in a distributed fashion.
Hadoop provides the data in a distributed fashion, making it available from multiple nodes and thereby increasing the rate at which every node in the cluster network gets its data.
We will write our code in Java and define cluster computations using the open source Apache Spark library.
After defining the code, we will use Maven to create a fat jar from it, which will contain all the dependencies.
We will make the jar available from multiple sources, so that multiple computation nodes from our Spark cluster can download it at the same time. This is achieved by making the data available in a distributed way through Hadoop.
What does a deployment cycle with Spark and Hadoop look like in Java?
A typical cycle could look like this:
- Write code in Java
- Compile the code into a fat jar
- Make the jar available in the Hadoop cloud
- Launch the Spark driver, which can allocate a dynamic number of nodes to take care of the computations defined within the jar.
1. Write code in Java
You will have to define a main class with a main function. This is the code that the cluster runs first, so everything starts from this function. A minimal skeleton is sketched below.
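The package matches the --class parameter used later in this post; the class name, app name and the computation itself are placeholders:

package com.package.name;

import org.apache.spark.sql.SparkSession;

public class MainClass {
    public static void main(String[] args) {
        // the master URL is supplied by spark-submit, so only an app name is set here
        SparkSession spark = SparkSession.builder().appName("MyDistributedJob").getOrCreate();

        // ... define your distributed computation here ...

        spark.stop();
    }
}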
2. Compile code into fat Jar
mvn clean compile assembly:single
3. Make jar available from Hadoop cloud
Go into your Hadoop web interface and browse the file system
3.1 Create a folder in the cloud and upload the jar
After uploading your jar into your Hadoop cloud, it will be available to any computer that can talk to the Hadoop cluster. It is now available in a distributed way on all the Hadoop nodes and ready for highly efficient and fast data exchange with any cluster; in our example we use a Spark cluster.
If your Hadoop namenode is called hadoop_fs and its port is 9000, your jar is available to any node under the following URL:
hdfs://hadoop_fs:9000/jars/example.jar
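If you prefer doing the upload from code instead of the web interface, here is a hedged sketch using Hadoop's FileSystem API; the host, port and paths are the example values from above (IO exceptions omitted for brevity):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop_fs:9000"), conf);

// create the target folder and copy the local jar into HDFS
fs.mkdirs(new Path("/jars"));
fs.copyFromLocalFile(new Path("/local/path/to/example.jar"), new Path("/jars/example.jar"));
fs.close();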
4. Launch distributed Spark Computation
To launch the driver, you need the spark-submit script. The most straightforward way to get it is to just download the Spark distribution and unzip it.
wget http://apache.lauf-forum.at/spark/spark-2.3.2/spark-2.3.2-bin-hadoop2.7.tgz
4.1 Launch Spark driver from command line.
Go to the directory where you have unzipped your Spark library, for me it would be
loan@Y510P:~/Libraries/Apache/Spark/spark-2.3.0-bin-hadoop2.7$
The ./bin/spark-submit script has all the functionality we will require.
4.2 Gathering the Parameters
You need the following parameters to launch your jar in the cluster:
- Spark master URL
- Hadoop jar URL
- Name of your main class
- --deploy-mode set to cluster, to run the computation in cluster mode
4.3 Final step: Put the parameters together and launch the jar in the cluster
./bin/spark-submit --class com.package.name.mainClass --master spark://10.0.1.10:6066 --deploy-mode cluster hdfs://hadoop_fs:9000/jars/example.jar
This tells the Spark cluster where the jar we want to run is. It will launch a user-defined (or appropriate) number of executors and finish the computation in a distributed fashion.
Your task should now show up in your Spark web interface.
What have you learned:
- How to turn your Java code into a fat jar
- How to deploy your fat jar into the Hadoop cloud
- How to run your code distributed in Spark, using Hadoop as the data source
Spark Feature Engineering Tutorial 3 – Forest Alpha Brainwave Data transformation
Spark Java dataset transformation tutorial
In this tutorial we will learn how to transform the ALPHA training set of brainwave data provided by one of the machine learning labs of Technische Universität Berlin.
Downloading the data using Wget
I saved you guys some time, you do not need to open another browser tab! Wget the juicy data straight from the source.
Helper Script
wget ftp://largescale.ml.tu-berlin.de/largescale/scripts/convert.py
Download the feature data
wget ftp://largescale.ml.tu-berlin.de/largescale/alpha/alpha_train.dat.bz2
Download the label Data
wget ftp://largescale.ml.tu-berlin.de/largescale/alpha/alpha_train.lab.bz2
Then convert the data with the provided Python helper script:
python convert.py alpha train -o alpha_train.libsvm
What does our data look like?
The label is a value of either -1 or 1, so it tells us whether the particle with the corresponding features is an instance of the particle we want to classify or not.
1 would imply the features describe the particle, -1 would imply the features describe a different particle.
Features is a tuple, where the first entry is the feature count (500) and the second is an array of all the feature values.
For our ML format, we want the label columns to contain only doubles, no arrays.
What Spark objects will we be needing?
- Row, to handle our data
- RowFactory, to create new Rows
- GenericRowWithSchema, which does not have a doc but is analogous to RowFactory
- DataFrame, which holds our data
- Pipeline, used to chain all our transformers together
- Vectors, which will be used to extract data from the original dataset
- StringIndexer, which will be used to index labels
- VectorIndexer, which can be used to index features (we will not need this though)
- Transformers, which will be used to transform our dataset
- VectorAssembler, which we will use to create the feature vector
- SQLContext, to create a custom dataframe
- StructField, to define our structs and table schema
- StructType, to define the data types of our schema
How do we get our data in the proper format?
First we load the raw data into our dataframe
Dataset<Row> originalData = spark.read().format("libsvm").load("/path/to/data/alpha_train.libsvm");
Then we define our label column and the struct fields.
StructField[] fields = new StructField[2];
StructField for the label:
// createStructField and DoubleType are static imports from org.apache.spark.sql.types.DataTypes
fields[0] = createStructField("labelNotIndexed", DoubleType, false);
This will be our label column.
The struct fields will be used to tell Spark which schema our new Rows and dataframes should have. The fields array will hold the definition for every column of our new dataframe.
The first entry of the StructField array is for the label, the second one is for the features.
StructField for the features:
// VectorType() is a static import from org.apache.spark.ml.linalg.SQLDataTypes
fields[1] = createStructField("featuresNotIndexed", VectorType(), false);
We will have a vector type for the second column, so it can encode a feature vector of any size for us.
Build the struct from the fields
StructType schemata = DataTypes.createStructType(fields);
The schema is built by the createStructType(fields) method, which gives us a StructType that represents all of our defined fields and is very nice to work with.
Initialize our new DF
Dataset<Row> transformedDf = originalData.select("label");
This writes the label column from the original data into our new dataset and also initializes it.
Convert the Dataset to a list for easier looping
List<Row> originalDataList = originalData.collectAsList();
It is time to get loopy … and to extract relevant data from the original dataset
But first, we have to define some variables:
double label;
Row tmpRow;
SparseVector tmpVec;
List<Row> transformedDataList = new ArrayList<>();
The actual loop
// in this loop we create a new Row for every Row in the original dataset
for (Row r : originalDataList) {
    tmpVec = (SparseVector) r.get(1);        // get the feature data
    label = (double) r.get(0);               // get the label
    tmpRow = RowFactory.create(label, tmpVec);
    transformedDataList.add(tmpRow);
}
Using
RowFactory.create(valueTmpStore, schemata);
will not satisfy our needs, as the resulting row would only have two columns: the feature count and the feature array in one column.
Create the dataset after the loop with this line:
transformedDf = sqlContext.createDataFrame(transformedDataList, schemata);
Defining the Pipeline
If you head over to https://spark.apache.org/docs/latest/ml-pipeline.html you can see the toolkit Spark provides for creating a pipeline.
Most interesting for us today are:
- StringIndexer, for indexing labels
- VectorAssembler, for assembling the feature vector
How to use the StringIndexer
StringIndexerModel labelIndexer = new StringIndexer()
        .setInputCol("labelNotIndexed")
        .setOutputCol("outputLabelColName")   // placeholder, pick your own output column name
        .fit(transformedDf);
System.out.println("Alpha Labels indexed");
How to use the VectorAssembler
// the features sit in column 1 of the schema (column 0 is the label)
VectorAssembler assembler = new VectorAssembler()
        .setInputCols(Arrays.copyOfRange(transformedDf.schema().fieldNames(), 1, 2))
        .setOutputCol("featureOutputColName");   // placeholder, pick your own output column name
System.out.println("Alpha Vector Assembled");
Chain the indexers into a Pipeline
Now, after defining the indexers and assemblers, we can stuff them into the Pipeline like this:
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]{labelIndexer, assembler});
System.out.println("Alpha Pipeline built");
Fit the pipeline
Fit the pipeline to our dataset to obtain a PipelineModel instance:
PipelineModel model = pipeline.fit(transformedDf);
Apply the Pipeline to a dataset, transforming it into Spark ML format
transformedDf = model.transform(transformedDf);
Validate the dataset schema
To see if we have done everything correctly, print out the schema:
transformedDf.printSchema();
It should now show the original label and feature columns plus the new indexed label column and the assembled feature vector column.
Spark Error "Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem"
I recently did some feature engineering on a few datasets with Spark.
I wanted to make the datasets available in our Hadoop cluster, so I used our normal dataset upload pattern, but ran into these nasty little errors:
ERROR SparkContext: Error initializing SparkContext.
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
and also
18/07/26 14:36:35 ERROR Utils: Uncaught exception in thread driver-revive-thread
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
So what do these errors mean and why do they occur?
- It often has something to do with not initializing the Spark session properly
- It usually means there is a wrong value for the master location
- Double check the master address of the Spark session; by default it should use port 7077 and NOT 6066
- Check if the version of the Spark cluster is the same as the Spark version in the jar / of the job you want to submit
How do I fix "Could not find CoarseGrainedScheduler" or "Can only call getServletHandlers on a running MetricsSystem"?
- Update the master URL, as sketched below
- Update your dependencies (POM/Gradle/jar), so you use the same Spark version as the cluster
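For the first point, this is a hedged sketch of what a correctly configured session might look like; the host is a placeholder, use your own master address:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("UploadJob")
        .master("spark://10.0.1.10:7077")   // standalone master port 7077, not the REST port 6066
        .getOrCreate();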
Now your error should be fixed. Have fun with your Spark Cluster!
How to fix Java Error “The trustAnchors parameter must be non-empty” while building Maven Single Jar
Recently I tried to deploy my Java project as a single fat jar. To my unpleasant surprise, Maven did not feel like creating jars anymore and complained with errors like the ones below.
You might encounter an error like this if you recently reinstalled Java and want to build a jar with Maven:
javax.net.ssl.SSLException: java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
Or
java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
Or
java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
Or
Could not resolve dependencies for project com.gt-arc.coda:repo:jar:0.0.3-SNAPSHOT: Failed to collect dependencies at com.sun.jersey:jersey-core:jar:1.9.1: Failed to read artifact descriptor for com.sun.jersey:jersey-core:jar:1.9.1: Could not transfer artifact com.sun.jersey:jersey-core:pom:1.9.1 from/to central (https://repo.maven.apache.org/maven2): java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty -> [Help 1]
Or
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal on project repo: Could not resolve dependencies for project com.gt-arc.coda:repo:jar:0.0.3-SNAPSHOT: Failed to collect dependencies at com.sun.jersey:jersey-core:jar:1.9.1
Or
Caused by: org.eclipse.aether.collection.DependencyCollectionException: Failed to collect dependencies at com.sun.jersey:jersey-core:jar:1.9.1
Or
Caused by: org.eclipse.aether.resolution.ArtifactDescriptorException: Failed to read artifact descriptor for com.sun.jersey:jersey-core:jar:1.9.1
Or
Caused by: org.eclipse.aether.transfer.ArtifactTransferException: Could not transfer artifact com.sun.jersey:jersey-core:pom:1.9.1 from/to central (https://repo.maven.apache.org/maven2): java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
How to fix the error
To fix the error, we will download the Java JDK and copy its certificate store into our system's Java certificate directory.
Get Java
wget http://download.oracle.com/otn-pub/java/jdk/8u171-b11/512cd62ec5174c3487ac17c61aaa89e8/jdk-8u171-linux-x64.tar.gz
Extract the jdk
tar -xzf jdk-8u171-linux-x64.tar.gz |
Copy the certificate store from the JDK into your system's Java certificate directory
sudo cp -i /path/to/java/jdk/jdk1.8.0_171/jre/lib/security/cacerts /etc/ssl/certs/java
And now you can finally build your jar with
mvn clean compile assembly:single
How to setup Hadoop 2.9 Pseudo Cluster mode on a remote PC using SSH
In my <other tutorial> we learned what Hadoop is, why Hadoop is so awesome and what Hadoop is used for. Now I will show you how to set up Hadoop 2.9 in pseudo-cluster mode on a VM using SSH.
Download Hadoop 2.9
wget http://www-eu.apache.org/dist/hadoop/common/hadoop-2.9.0/hadoop-2.9.0-src.tar.gz
Then unzip it
tar -xvzf hadoop-2.9.0-src.tar.gz
Remember where you extracted this to, because we will need to add the path to the environment variables later!
To get the path use the handy command
pwd
Download SSH and Rsync
sudo apt-get install ssh
sudo apt-get install rsync
Setup SSH connection to localhost
ssh-keygen -t rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod og-wx ~/.ssh/authorized_keys
Setup Hadoop Environment Variables
sudo gedit ~/.bashrc
and enter the following text (thereby adding the following variables):
export HADOOP_HOME=/path/to/hadoop/folder
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
The next step is to edit the hadoop-env.sh file located inside your Hadoop folder at etc/hadoop/hadoop-env.sh.
We will add your Java home path to the Hadoop settings.
Change
export JAVA_HOME=${JAVA_HOME}
to
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
To make sure you use the right path, write
echo $JAVA_HOME
in your terminal, to see the Java home path
Enable Pseudo Cluster Mode
Now we can finally set up the configuration for Hadoop pseudo-distributed mode.
The necessary files to edit are located inside of the HadoopBase/etc/hadoop folder.
hdfs-site.xml
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.name.dir</name>
<value>file:///home/user/hadoop/data/hdfs/namenode</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>file:///home/user/hadoop/data/hdfs/datanode</value>
</property>
mapred-site.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
yarn-site.xml
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
core-site.xml
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
Then Format the File system
bin/hdfs namenode -format
and we are done!
To see how to run Hadoop check this article out!
What is Hadoop and why is it awesome?
Introduction
Hadoop provides big companies a means to distribute and store huge amounts of data, not on just one computer but on multiple! You can imagine it like your normal Windows or Unix file system, only distributed! At first you might think: wow, ok, so what, now I have my 4K video on 5 different computers, what do we get from that?
Usually only one computer supplies us with the data we want; this one computer has only one network connection and limited bandwidth. If you have multiple computers in different locations using different connections to the internet, their bandwidth adds up and a big performance boost will be noticeable.
You can imagine it as one person having to deliver a giant rocket consisting of multiple big parts. That one person can only deliver one rocket part at a time. If we use two people, we have already doubled our speed! The same principle applies to downloading and uploading.
So instead of just one computer supplying you with a limited data stream, you have multiple computers serving you data at the same time!
If you want to set up Hadoop on your local machine, check <THIS> out!
If you are interested in setting up a Hadoop pseudo cluster, check <THIS> out!
If you want to learn about basic Java interaction with Hadoop, downloading and uploading from a distributed file system, check <THIS> out!