Working with HyperLogLog in Zeppelin
This guide will set you up to work with HyperLogLog in Zeppelin.
Zeppelin Configuration
- Launch a Zeppelin notebook.
- Open the panel for Interpreter configuration.
  - This can be found at localhost:8890/#/interpreter
- Add the Sonatype Snapshot repository.
  - Expand the Repository Information cog, next to the Create button.
  - Settings are as follows:
    ID: Sonatype OSS Snapshots
    URL: https://oss.sonatype.org/content/repositories/snapshots
    Snapshot: true
- Add the dependency to the Spark interpreter.
  - spark > Edit > Dependencies
  - Add the following entry to artifacts:
    com.mozilla.telemetry:spark-hyperloglog:2.0.0-SNAPSHOT
These steps should enable the use of the library within the notebook. Using the
%dep interpreter to dynamically add the library is currently not supported.
You may want to add a short snippet near the top of the notebook to make the
functions more accessible.
import org.apache.spark.sql.functions.udf
import com.mozilla.spark.sql.hyperloglog.aggregates._
import com.mozilla.spark.sql.hyperloglog.functions._
val HllMerge = new HyperLogLogMerge
val HllCreate = udf(hllCreate _)
val HllCardinality = udf(hllCardinality _)
spark.udf.register("hll_merge", HllMerge)
spark.udf.register("hll_create", HllCreate)
spark.udf.register("hll_cardinality", HllCardinality)
Example Usage
This is a short example which can also be used to verify expected behavior.
case class Example(uid: String, color: String)
val examples = Seq(
Example("uid_1", "red"),
Example("uid_2", "blue"),
Example("uid_3", "blue"),
Example("uid_3", "red"))
val frame = examples.toDF()
In a single expression, we can create the HLL objects and count the unique IDs that appear in the DataFrame.
>>> frame
    .select(expr("hll_create(uid, 12) as hll"))
    .groupBy()
    .agg(expr("hll_cardinality(hll_merge(hll)) as count"))
    .show()
+-----+
|count|
+-----+
|    3|
+-----+
The snippet in the configuration section also defines UDFs that can be used directly as Spark column expressions. Let's explore the data structure in slightly more detail.
val example = frame
  .select(HllCreate($"uid", lit(12)).alias("hll"), $"color")
  .groupBy("color")
  .agg(HllMerge($"hll").alias("hll"))
example.createOrReplaceTempView("example")
This groups uids by the color attribute and registers the table with the SQL
context. Each row contains an HLL binary object representing the set of uids.
>>> example.show()
+-----+--------------------+
|color|                 hll|
+-----+--------------------+
|  red|[02 0C 00 00 00 0...|
| blue|[02 0C 00 00 00 0...|
+-----+--------------------+
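Each HLL object is created with a size parameter of 12, the second argument to
hll_create, which corresponds to 2^12 registers. This configurable parameter
affects both the size and the standard error of the cardinality estimates:
larger values use more space and give tighter estimates. The cardinality
operator can count the number of uids associated with each color.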
>>> example.select($"color", HllCardinality($"hll").alias("count")).show()
+-----+-----+
|color|count|
+-----+-----+
|  red|    2|
| blue|    2|
+-----+-----+
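To get a feel for how the size parameter trades space for accuracy, the sketch below evaluates the textbook HyperLogLog error estimate, roughly 1.04 / sqrt(m) for m registers. It assumes that the second argument to hll_create (12 in this guide) is the base-2 logarithm of the register count. The helper name relativeStandardError is hypothetical and is not part of the library.

```scala
// Hypothetical helper, not part of spark-hyperloglog.
// Assumes the size parameter is log2 of the number of registers.
def relativeStandardError(log2m: Int): Double = {
  val registers = math.pow(2, log2m)
  1.04 / math.sqrt(registers)
}

// Compare a few size parameters; larger values give a smaller expected error.
Seq(10, 12, 14).foreach { bits =>
  println(s"log2m=$bits -> relative standard error ~ ${relativeStandardError(bits)}")
}
```

Under that assumption, a size parameter of 12 gives an expected relative error of a little under 2%, which is why small examples like the one in this guide usually come out exact.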
We can also write this query in the %sql interpreter.
%sql
SELECT color, hll_cardinality(hll_merge(hll)) as count
FROM example
GROUP BY color
Finally, note that the color HLL sets have an overlapping uid. We obtain the
total count of distinct uids and avoid double counting by merging the sets
before taking the cardinality.
>>> example.groupBy().agg(HllCardinality(HllMerge($"hll")).alias("count")).show()
+-----+
|count|
+-----+
|    3|
+-----+
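The effect of merging can be cross-checked with exact sets, which behave like HLL objects without the approximation. This is a minimal plain-Scala sketch over the example data from this guide. It makes no library or Spark calls, and the value names (rows, uidsByColor, summed, merged) are illustrative only.

```scala
// Exact-set analogue of the HLL example; no Spark or library calls involved.
val rows = Seq(
  ("uid_1", "red"),
  ("uid_2", "blue"),
  ("uid_3", "blue"),
  ("uid_3", "red"))

// One set of uids per color, analogous to one merged HLL per color.
val uidsByColor: Map[String, Set[String]] =
  rows.groupBy(_._2).map { case (color, group) => color -> group.map(_._1).toSet }

// Adding the per-color counts counts uid_3 twice.
val summed = uidsByColor.values.toSeq.map(_.size).sum

// Taking the union first, analogous to hll_merge, counts each uid once.
val merged = uidsByColor.values.reduce(_ union _).size

println(s"summed=$summed merged=$merged") // prints "summed=4 merged=3"
```

Summing the per-color cardinalities overstates the total because uid_3 appears under both colors, while merging first gives the same answer as the HLL query above.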