A Detailed Look at the Data Platform

For a more gentle introduction to the data platform, please read the Pipeline Overview article.

This article goes into more depth about the architecture and flow of data in the platform.

The Entire Platform

The full detail of the platform can get quite complex, but at a high level the structure is fairly simple.

graph LR
Producers[Data Producers] --> Ingestion
Ingestion --> Storage[Long-term Storage]
Ingestion --> Stream[Stream Processing]
Stream --> Storage
Batch[Batch Processing] --> Storage
Storage --> Batch
Self[Self Serve] -.- Stream
Self -.- Batch
Stream -.-> Visualization
Batch -.-> Visualization
Stream --> Export
Batch --> Export

Each of these high-level parts of the platform is described in more detail below.

Data Producers

By far most data handled by the Data Platform is produced by Firefox. There are other producers, though, and the eventual aim is to generalize data production using a client SDK or set of standard tools.

Most data is submitted via HTTP POST, but data is also produced in the form of service logs and statsd messages.

If you would like to locally test a new data producer, the gzipServer project provides a simplified server that makes it easy to inspect submitted messages.

Ingestion

graph LR
subgraph HTTP
  tee
  lb[Load Balancer]
  mozingest
end
subgraph Kafka
  kafka_unvalidated[Kafka unvalidated]
  kafka_validated[Kafka validated]
  zookeeper[ZooKeeper] -.- kafka_unvalidated
  zookeeper -.- kafka_validated
end
subgraph Storage
  s3_heka[S3 Heka Protobuf Storage]
  s3_parquet[S3 Parquet Storage]
end
subgraph Data Producers
  Firefox --> lb
  more_producers[Other Producers] --> lb
end

lb --> tee
tee --> mozingest
mozingest --> kafka_unvalidated
mozingest --> Landfill
kafka_unvalidated --> dwl[Data Store Loader]
kafka_validated --> cep[Hindsight CEP]
kafka_validated --> sparkstreaming[Spark Streaming]
Schemas -.->|validation| dwl
dwl --> kafka_validated
dwl --> s3_heka
dwl --> s3_parquet
sparkstreaming --> s3_parquet

Data arrives as an HTTP POST of an optionally gzipped payload of JSON. See the common Edge Server specification for details.
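
As a rough illustration of this submission format, the following Python sketch builds the body and headers for such a POST. The helper name `encode_submission`, the example document, and the choice of headers are assumptions made for illustration; the Edge Server specification remains the authority on the URL layout and on what the server actually expects.

```python
import gzip
import json


def encode_submission(payload, compress=True):
    """Serialize a payload to JSON and optionally gzip it.

    Returns a (body, headers) pair. The header choices here are
    assumptions, not taken from the Edge Server specification.
    """
    body = json.dumps(payload).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if compress:
        body = gzip.compress(body)
        headers["Content-Encoding"] = "gzip"
    return body, headers


# Hypothetical example document.
body, headers = encode_submission({"type": "example", "value": 1})
```

The resulting body and headers could then be handed to any HTTP client, for example when testing a new producer against a local gzipServer instance.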

Submissions hit a load balancer which handles the SSL connection, then forwards to a "tee" server, which may direct some or all submissions to alternate backends. In the past, the tee was used to manage the cutover between different versions of the backend infrastructure. It is implemented as an OpenResty plugin.

From there, the mozingest HTTP server receives submissions from the tee, batches them, and stores them durably on Amazon S3 as a fail-safe (we call this "Landfill"). Data is then passed along via Kafka for validation and further processing. If there is a problem with decoding, validation, or any of the code described in the rest of this section, data can be re-processed from this fail-safe store. The mozingest server is implemented as an nginx module.

Validation, at a minimum, ensures that a payload is valid JSON (possibly compressed). Many document types also have a JSONSchema specification, and are further validated against that.

Invalid messages are redirected to a separate "errors" stream for debugging and inspection.
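
The validation and routing described above can be sketched roughly as follows. This is not the platform's actual validation code: the function name `route_submission` is hypothetical, and the `required_keys` check is only a stand-in for a real JSONSchema validation.

```python
import gzip
import json
import zlib


def route_submission(raw, required_keys=()):
    """Return ("validated", doc) or ("errors", reason) for raw bytes.

    `required_keys` is a simplified stand-in for a JSONSchema check.
    """
    try:
        # Gzip streams start with the magic bytes 0x1f 0x8b.
        if raw[:2] == b"\x1f\x8b":
            raw = gzip.decompress(raw)
        doc = json.loads(raw)
    except (OSError, EOFError, zlib.error, ValueError) as exc:
        # Decompression or JSON parsing failed: send to the errors stream.
        return "errors", "{}: {}".format(type(exc).__name__, exc)
    missing = [
        key for key in required_keys
        if not (isinstance(doc, dict) and key in doc)
    ]
    if missing:
        return "errors", "missing keys: " + ", ".join(missing)
    return "validated", doc
```

Returning a stream name together with the result keeps the routing decision separate from whatever transport (Kafka, in this platform) carries the message onward.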

Valid messages proceed for further decoding and processing. This involves things like doing GeoIP lookup and discarding the IP address, and attaching some HTTP header info as annotated metadata.
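
A minimal sketch of this annotation step, under stated assumptions: `geo_lookup` is a hypothetical callable standing in for a real GeoIP database, and the metadata field names and the header allowlist are invented for illustration.

```python
def annotate(doc, remote_addr, headers, geo_lookup):
    """Attach metadata to a validated document without keeping the IP.

    `geo_lookup` is a hypothetical callable that maps an IP address to
    a dict such as {"country": "CA"}.
    """
    meta = {"geo": geo_lookup(remote_addr)}
    # Keep only a small allowlist of HTTP headers (an assumed selection).
    for name in ("Date", "User-Agent"):
        if name in headers:
            meta[name.lower().replace("-", "_")] = headers[name]
    # The IP address itself is deliberately not copied into the output.
    return {"meta": meta, "payload": doc}
```

The important property is that the address is used only as an input to the lookup and never appears in the annotated message.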

Validated and annotated messages become available for stream processing.

They are also batched and stored durably for later batch processing and ad-hoc querying.

See also the "generic ingestion" proposal which aims to make ingestion, validation, storage, and querying available as self-serve for platform users.

Data flow for valid submissions

sequenceDiagram
participant Fx as Firefox
participant lb as Load Balancer
participant mi as mozingest
participant lf as Landfill
participant k as Kafka
participant dwl as Data Store Loader
participant dl as Data Lake
Fx->>lb: HTTPS POST
lb->>mi: forward
mi-->>lf: failsafe store
mi->>k: enqueue
k->>dwl: validate, decode
dwl->>k: enqueue validated
dwl->>dl: store durably

Other ingestion methods

Hindsight is used for ingestion of logs from applications and services. It supports parsing log lines and appending metadata similar to that added by the HTTP ingestion above (timestamp, source, and so on).

Statsd messages are ingested in the usual way.

Storage

graph TD
subgraph RDBMS
  PostgreSQL
  Redshift
  MySQL
  BigQuery
end
subgraph NoSQL
  DynamoDB
end
subgraph S3
  landfill[Landfill]
  s3_heka[Heka Data Lake]
  s3_parquet[Parquet Data Lake]
  s3_analysis[Analysis Outputs]
  s3_public[Public Outputs]
end

Ingestion --> s3_heka
Ingestion --> s3_parquet
Ingestion --> landfill
Ingestion -.-> stream[Stream Processing]
stream --> s3_parquet
batch[Batch Processing] --> s3_parquet
batch --> PostgreSQL
batch --> DynamoDB
batch --> s3_public
selfserve[Self Serve] --> s3_analysis
s3_analysis --> selfserve
Hive -->|Presto| redash[Re:dash]
PostgreSQL --> redash
Redshift --> redash
MySQL --> redash
BigQuery --> redash

s3_parquet -.- Hive

Amazon S3 forms the backbone of the platform storage layer. The primary format used in the Data Lake is parquet, which is a strongly typed columnar storage format that can easily be read and written by Spark, as well as being compatible with SQL interfaces such as Hive and Presto. Some data is also stored in Heka-framed protobuf format. This custom format is usually reserved for data where we do not have a complete JSONSchema specification.

Using S3 for storage avoids the need for an always-on cluster, which means that data at rest is inexpensive. S3 also makes it very easy to automatically expire (delete) objects after a certain period of time, which is helpful for implementing data retention policies.
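
To make the expiry mechanism concrete, here is a sketch of an S3 lifecycle configuration expressed as a Python dictionary. The prefix, the retention period, and the bucket name in the comment are hypothetical and do not reflect the platform's actual retention policies.

```python
def expiration_rule(prefix, days):
    """Build an S3 lifecycle configuration that expires objects under
    `prefix` once they are `days` days old."""
    return {
        "Rules": [
            {
                "ID": "expire-" + prefix.strip("/").replace("/", "-"),
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": days},
            }
        ]
    }


# Hypothetical prefix and retention period.
config = expiration_rule("example-dataset/", 180)

# With boto3 installed and credentials configured, a configuration like
# this could be applied with:
#   boto3.client("s3").put_bucket_lifecycle_configuration(
#       Bucket="example-bucket", LifecycleConfiguration=config)
```

Once such a rule is in place, S3 removes matching objects on its own, so no scheduled cleanup job is needed.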

Once written to S3, the data is typically treated as immutable - data is not appended to existing files, nor is data normally updated in place. The exception here is when data is back-filled, in which case previous data may be overwritten.

There are a number of other types of storage used for more specialized applications, including relational databases (such as PostgreSQL for the Telemetry Aggregates) and NoSQL databases (DynamoDB is used as a backing store for the TAAR project). Reading data from a variety of RDBMS sources is also supported via Re:dash.

The data stored in Heka format is readable from Spark using libraries in Scala or Python.

Parquet data can be read and written natively from Spark, and many datasets are indexed in a Hive Metastore, making them available through a SQL interface on Re:dash and in notebooks via Spark SQL. Many other SQL data sources are also made available via Re:dash; see this article for more information on accessing data using SQL.

There is a separate data store for self-serve Analysis Outputs, intended to keep ad-hoc, temporary data out of the Data Lake. This is implemented as a separate S3 location, with personal output locations prefixed with each person's user id, similar to the layout of the /home directory on a Unix system. See the Working with Parquet data cookbook for more details.

Analysis outputs can also be made public using the Public Outputs bucket. This is a web-accessible S3 location for powering public dashboards. This public data is available at https://analysis-output.telemetry.mozilla.org/<job name>/data/<files>.
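
As a small illustration of the URL layout above, this sketch assembles a public URL from a job name and a file name. The helper `public_url` and the example names are hypothetical; only the base URL and the `/data/` path segment come from the pattern given above.

```python
from urllib.parse import quote

PUBLIC_BASE = "https://analysis-output.telemetry.mozilla.org"


def public_url(job_name, filename):
    """Return the public URL for a file written by a given job."""
    # quote() percent-encodes characters that are unsafe in a URL path
    # while leaving "/" intact, so nested file paths keep working.
    return "{}/{}/data/{}".format(PUBLIC_BASE, quote(job_name), quote(filename))


# Hypothetical job and file names.
print(public_url("example-job", "summary.json"))
```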

Stream Processing

Stream processing is done using Hindsight and Spark Streaming.

Hindsight allows you to run plugins written in Lua inside a sandbox. This gives a safe, performant way to do self-serve streaming analysis. See this article for an introduction. Hindsight plugins do the initial data validation and decoding, as well as writing out to long-term storage in both Heka-framed protobuf and parquet forms.

Spark Streaming is used to read from Kafka and perform low-latency ETL and aggregation tasks. These aggregates are currently used by Mission Control and are also available for querying via Re:dash.

Batch Processing

Batch processing is done using Spark. Production ETL code is written in both Python and Scala.

There are Python and Scala libraries for reading data from the Data Lake in Heka-framed protobuf form, though it is much easier and more performant to make use of a derived dataset whenever possible.

Datasets in parquet format can be read natively by Spark, either using Spark SQL or by reading data directly from S3.

Data produced by production jobs goes into the Data Lake, while output from ad-hoc jobs goes into Analysis Outputs.

Job scheduling and dependency management are done using Airflow. Most jobs run once a day, processing data from "yesterday" on each run. A typical job launches a cluster, which fetches the specified ETL code as part of its bootstrap on startup, runs the ETL code, then shuts down upon completion. If something goes wrong, a job may time out or fail, and in this case it is retried automatically.
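
The "process yesterday, retry on failure" pattern can be sketched in plain Python as follows. This is not Airflow's API: Airflow provides scheduling and retries through its own configuration, and the function names and the default of three attempts are assumptions made for illustration.

```python
from datetime import date, timedelta


def processing_date(run_date):
    """Return the day a daily job should process: the day before the run."""
    return run_date - timedelta(days=1)


def run_with_retries(job, run_date, max_attempts=3):
    """Call job(target_date), retrying on failure up to max_attempts times."""
    target = processing_date(run_date)
    for attempt in range(1, max_attempts + 1):
        try:
            return job(target)
        except Exception:
            # Re-raise only once the final attempt has failed.
            if attempt == max_attempts:
                raise
```

Because every attempt receives the same target date, a retried job reprocesses exactly the same day of data rather than drifting to a later one.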

Self Serve Data Analysis

graph TD
subgraph Storage
  lake[Data Lake]
  s3_output_public[Public Outputs]
  s3_output_private[Analysis Outputs]
end
subgraph ATMO
  Jupyter -->|runs on| emr_cluster[Ad hoc EMR Cluster]
  Zeppelin -->|runs on| emr_cluster
  atmo_service[ATMO Service] -->|launch| emr_cluster
  atmo_service -->|schedule| emr_job[fa:fa-clock-o Scheduled EMR Job]
  emr_cluster -->|mount| EFS
  emr_cluster -->|read + write| lake
  emr_job -->|read + write| s3_output_public
  emr_job -->|read + write| s3_output_private
end
subgraph STMO
  redash[Re:dash] -->|read| lake
end
subgraph TMO
  evo[Evolution Dashboard]
  histo[Histogram Dashboard]
  agg[Telemetry Aggregates]
  evo -.- agg
  histo -.- agg
end
subgraph Databricks
  db_notebook[Notebook]
  db_notebook -->|read + write| lake
end

Most of the data analysis tooling has been developed with the goal of being "self-serve". This means that people should be able to access and analyze data on their own, without involving data engineers or operations. This allows data access to scale beyond a small set of people with specialized knowledge of the entire pipeline.

The use of these self-serve tools is described in the Getting Started article. This section focuses on how these tools integrate with the platform infrastructure.

ATMO: Spark Analysis

ATMO is a service for managing Spark clusters for data analysis. Clusters can be launched on demand, or can be scheduled to run a job on an ongoing basis. These clusters can read from the Data Lake described in the Storage section above, and can write results to either public (web-accessible) or private output locations.

Jupyter or Zeppelin notebooks are the usual interface for getting work done on a cluster, though you also get full SSH access to the cluster.

Clusters launched via ATMO are automatically killed after a user-defined period of time (by default, 8 hours), though their lifetime can be extended as needed. Each cluster has a user-specific EFS volume mounted on the /home/hadoop directory, which means that data stored locally on the cluster persists from one cluster to the next. This volume is shared by all clusters launched by a given ATMO user.

STMO: SQL Analysis

STMO is a customized Re:dash installation that provides self-serve access to a variety of different datasets. From here, you can query data in the Parquet Data Lake as well as various RDBMS data sources.

STMO interfaces with the data lake using both Presto and Amazon Athena. Each has its own data source in Re:dash. Since Athena does not support user-defined functions, datasets with HyperLogLog columns, such as client_count_daily, are only available via Presto.

Different Data Sources in STMO connect to different backends, and each backend might use a slightly different flavor of SQL. You should find a link to the documentation for the expected SQL variant next to the Data Sources list.

Queries can be run just once, or scheduled to run periodically to keep data up-to-date.

There is a command-line interface to STMO called St. Mocli, if you prefer writing SQL using your own editor and tools.

Databricks: Managed Spark Analysis

Our Databricks instance (see Databricks docs) offers another notebook interface for doing analysis in Scala, SQL, Python and R.

Databricks provides an always-on shared server which is nice for quick data investigations. ATMO clusters take some time to start up, usually on the order of tens of minutes. The shared server allows you to avoid this start-up cost. Prefer ATMO for heavy analyses since you will have dedicated resources.

TMO: Aggregate Graphs

TMO provides easy visualizations of histogram and scalar measures over time. Time can be in terms of either builds or submission dates. This is the most convenient interface to the Telemetry data, as it does not require any custom code.

Visualization

There are a number of visualization libraries and tools being used to display data.

TMO Dashboards

The landing page at telemetry.mozilla.org is a good place to look for existing graphs, notably the measurement dashboard which gives a lot of information about histogram and scalar measures collected on pre-release channels.

Notebooks

Use of interactive notebooks has become a standard in the industry, and Mozilla makes heavy use of this approach. ATMO makes it easy to run, share, and schedule Jupyter and Zeppelin notebooks.

Databricks notebooks are also an option.

Others

Re:dash lets you query the data using SQL, but it also supports a number of useful visualizations.

Hindsight's web interface has the ability to visualize time-series data.

Mission Control gives a low-latency view into release health.

Many bespoke visualizations are built using the Metrics Graphics library as a display layer.

Monitoring and Alerting

There are multiple layers of monitoring and alerting.

At a low level, the system is monitored to ensure that it is functioning as expected. This includes things like machine-level resources (network capacity, disk space, available RAM, CPU load) which are typically monitored using DataDog.

Next, we monitor the "transport" functionality of the system. This includes monitoring incoming submission rates, payload sizes, traffic patterns, schema validation failure rates, and alerting if anomalies are detected. This type of anomaly detection and alerting is handled by Hindsight.

Once data has been safely ingested and stored, we run some automatic regression detection on all Telemetry histogram measures using Cerberus. This code looks for changes in the distribution of a measure, and emails probe owners if a significant change is observed.
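
One simple way to quantify a change between two histograms of the same measure is the Hellinger distance, sketched below. This illustrates the general idea of comparing distributions only; it is not a description of the algorithm or thresholds that Cerberus actually uses.

```python
import math


def hellinger_distance(hist_a, hist_b):
    """Distance in [0, 1] between two histograms with the same buckets.

    0 means identical distributions, 1 means no overlap at all.
    """
    if len(hist_a) != len(hist_b):
        raise ValueError("histograms must share the same buckets")
    total_a, total_b = sum(hist_a), sum(hist_b)
    if total_a == 0 or total_b == 0:
        raise ValueError("histograms must not be empty")
    # Bhattacharyya coefficient: 1.0 for identical distributions.
    bc = sum(
        math.sqrt((a / total_a) * (b / total_b))
        for a, b in zip(hist_a, hist_b)
    )
    # Clamp to guard against floating-point rounding slightly above 1.
    return math.sqrt(max(0.0, 1.0 - bc))
```

Since both histograms are normalized before comparison, the result reflects a change in the shape of the distribution rather than a change in the number of submissions.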

Production ETL jobs are run via Airflow, which monitors batch job progress and alerts if there are failures in any job. Self-serve batch jobs run via ATMO also generate alerts upon failure.

Scheduled Re:dash queries may also be configured to generate alerts, which is useful for monitoring the last-mile, user-facing status of derived datasets. Re:dash may also be used to monitor and alert on high-level characteristics of the data, or really anything you can think of.

Data Exports

Data is exported from the pipeline to a few other tools and systems. Examples include integration with Amplitude for mobile and product analytics, publishing reports and visualizations to the Mozilla Data Collective, and shipping data to other parts of the Mozilla organization.

There are also a few data sets which are made publicly available, such as the Firefox Hardware Report.

Bringing it all together

Finally, here is a more detailed view of the entire platform. Some connections are omitted for clarity.

graph LR
subgraph Data Producers
  Firefox
  more_producers[...]
end
subgraph Storage
  Landfill
  warehouse_heka[Heka Data Lake]
  warehouse_parquet[Parquet Data Lake]
  warehouse_analysis[Analysis Outputs]
  PostgreSQL
  Redshift
  MySQL
  hive[Hive] -.- warehouse_parquet
end
subgraph Stream Processing
  cep[Hindsight Streaming]
  dwl[Data Store Loader] --> warehouse_heka
  dwl --> warehouse_parquet
  sparkstreaming[Spark Streaming] --> warehouse_parquet
end
subgraph Ingestion
  Firefox --> lb[Load Balancer]
  more_producers --> lb
  lb --> tee
  tee --> mozingest
  mozingest --> kafka
  mozingest --> Landfill
  ZooKeeper -.- kafka[Kafka]
  kafka --> dwl
  kafka --> cep
  kafka --> sparkstreaming
end
subgraph Batch Processing
  Airflow -.->|spark|tbv[telemetry-batch-view]
  Airflow -.->|spark|python_mozetl
  warehouse_heka --> tbv
  warehouse_parquet --> tbv
  warehouse_heka --> python_mozetl
  warehouse_parquet --> python_mozetl
  tmo_agg[Telemetry Aggregates]
end
subgraph Visualization
  Hindsight
  Jupyter
  Zeppelin
  TMO
  redash_graphs[Re:dash]
  MissionControl
  bespoke_viz[Bespoke Viz]
end
subgraph Export
  tbv --> Amplitude
  sparkstreaming --> Amplitude
end
subgraph Self Serve
  redash[Re:dash] -.-> Presto
  Presto --> hive
  redash -.-> Athena
  Athena --> hive
  ATMO -.-> spcluster[Spark Cluster]
  warehouse_heka --> spcluster
  warehouse_parquet --> spcluster
  spcluster --> warehouse_analysis
end
Schemas -.->|validation| dwl