Tag Archives: streaming

Spark Kubernetes error "Can only call getServletHandlers on a running MetricsSystem" – How to fix it

Did you encounter an error like this one?

java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem

The fix is pretty straightforward: the error is caused by running a different Spark version on the cluster than the one used by spark-submit.

Double-check the cluster's Spark version via the cluster UI or by inspecting the Spark master pod.
Then check the version of your local spark-submit (run spark-submit --version; it is also printed while submitting a job). The two versions have to match.
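
Once the versions line up, you can also verify the runtime version from inside a job. A minimal sketch, assuming an illustrative class and app name:

import org.apache.spark.sql.SparkSession;

public class VersionCheck {
    public static void main(String[] args) {
        // This call fails with the MetricsSystem error above
        // if the client and cluster Spark versions are incompatible.
        SparkSession spark = SparkSession.builder()
                .appName("VersionCheck")
                .getOrCreate();

        // Prints the Spark version the job actually runs on.
        System.out.println("Spark version: " + spark.version());

        spark.stop();
    }
}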

This is the full stack trace:

java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:516)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
at phabmacsJobs.KafkaJsonWriter$.main(KafkaJsonWriter.scala:32)
at phabmacsJobs.KafkaJsonWriter.main(KafkaJsonWriter.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Java Spark Tips, Tricks and Basics 7 – How to accumulate a variable in a Spark cluster? Why do we need to accumulate variables?

Why do we need Spark accumulators?

An accumulator is a variable shared across all the nodes in the cluster; it is used to accumulate values of a given type (such as Long or Double).

We need an accumulator to implement a distributed counting variable that can be updated by multiple processes.

Worker nodes cannot read the value of an accumulator; they can only accumulate values into it. The driver, on the other hand, has full access to the value.

You will find this functionality in Spark's accumulator classes. Keep in mind that we are using AccumulatorV2; the old accumulator API is deprecated since Spark 2.0.
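
A minimal sketch in Java, assuming a local session and illustrative names; it counts even numbers with a LongAccumulator obtained from the SparkContext, which creates and registers it in one step:

import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("AccumulatorExample")
                .master("local[*]") // assumption: local mode for the demo
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // longAccumulator(name) creates AND registers the accumulator.
        LongAccumulator evenCount = jsc.sc().longAccumulator("evenCount");

        // Executors may only add to the accumulator, never read it.
        jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6))
           .foreach(x -> { if (x % 2 == 0) evenCount.add(1L); });

        // Only the driver may read the accumulated value.
        System.out.println("Even numbers seen: " + evenCount.value());

        spark.stop();
    }
}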

 

Don’t forget to register your accumulator with the SparkContext if you create it separately.
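
If you construct the accumulator yourself instead, a sketch of the manual registration (the name is illustrative):

import org.apache.spark.util.LongAccumulator;

LongAccumulator manualCounter = new LongAccumulator();
spark.sparkContext().register(manualCounter, "manualCounter");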

 

What did we learn?

In this short tutorial, you learned what Spark accumulators are for, what they do, and how to use them in Java.

Java Spark Tips, Tricks and Basics 6 – How to broadcast a variable to a Spark cluster? Why do we need to broadcast variables?

Why do we need Spark broadcast variables?

Spark is all about cluster computing. In a cluster of nodes, each node of course has its own private memory.

If we want all the nodes in the cluster to work towards a common goal, shared variables are simply necessary.

Let’s say we want to sum up all the rows of a CSV table with 1 million lines. With two nodes, it makes sense to let each node work on half a million rows; both calculate their partial results, and the driver program then combines them.

Broadcasting allows us to create a read-only, cached copy of a variable on every node in our cluster. Spark distributes these variables under the hood with efficient broadcast algorithms, and it also takes the burden of serialization and deserialization off your shoulders.

This broadcast functionality is provided by the SparkContext class (JavaSparkContext in Java). The call hands you back an instance of the Broadcast class, which you then work with directly.

How to broadcast a variable in Spark Java
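
A minimal sketch, assuming a local session and an illustrative lookup table:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

public class BroadcastExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("BroadcastExample")
                .master("local[*]") // assumption: local mode for the demo
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // A lookup table we want on every node, read-only.
        Map<Integer, String> lookup = new HashMap<>();
        lookup.put(1, "one");
        lookup.put(2, "two");

        // Ship one cached copy to each node instead of one per task.
        Broadcast<Map<Integer, String>> broadcastLookup = jsc.broadcast(lookup);

        jsc.parallelize(Arrays.asList(1, 2, 1))
           .map(id -> broadcastLookup.value().get(id))
           .collect()
           .forEach(System.out::println);

        spark.stop();
    }
}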

What did we learn?

In this short tutorial, you learned what Spark broadcast variables are for, what they do, and how to use them in Java.

Java Spark – Errors while using the map function in cluster mode

Have you ever tried a map, forEach, or similar lambda function that runs in local mode, but you cannot get it running in cluster mode?

Then you just found your solution!

 

First, go to your Java project's root directory and rebuild your application JAR so that all of your classes are shipped to the cluster (for a Maven project, that is typically mvn clean package).

 


Java Spark Tips, Tricks and Basics 2 – How to add columns to Datasets / Dataframes in Spark Java

This tutorial will show you how to add a new column to an already existing Dataset / DataFrame.

 

First we create a dataset.
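
A minimal sketch, assuming a local session and illustrative names:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AddColumnExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("AddColumnExample")
                .master("local[*]") // assumption: local mode for the demo
                .getOrCreate();

        // A tiny illustrative Dataset with a single "id" column.
        Dataset<Row> ds = spark.range(5).toDF("id");
        ds.show();
    }
}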

 

 

Then we add a column with lit
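
Continuing inside the same main method (the static import belongs at the top of the file):

import static org.apache.spark.sql.functions.lit;

// Adds a new column "status" holding the constant value "active".
Dataset<Row> withStatus = ds.withColumn("status", lit("active"));
withStatus.show();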

 

and we are done!

Java Spark Tips, Tricks and Basics 1 – How to read images as Datasets / Dataframes from Hadoop in Spark Java

This tutorial will show you how to read a folder of images from the Hadoop HDFS.

Just use the following command and update the path to your image folder in the Hadoop HDFS.

We will be using ImageSchema and its readImages function.
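
A minimal sketch, assuming Spark 2.3/2.4 (ImageSchema.readImages was removed in Spark 3 in favor of the image data source) and a hypothetical HDFS path:

import org.apache.spark.ml.image.ImageSchema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadImagesExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ReadImagesExample")
                .getOrCreate();

        // Hypothetical path; point this at your image folder in HDFS.
        Dataset<Row> images = ImageSchema.readImages("hdfs:///data/images");

        // The image column is a struct: origin, height, width, nChannels, mode, data.
        images.printSchema();
        images.show();

        spark.stop();
    }
}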

 

That’s it already!