Course Overview
Druid For Data Engineers

Anomaly Detection Using Data Stored In Apache Druid

Lesson #4

In this lesson we will:

  • Work through a scenario for anomaly detection using data stored in Apache Druid#.

Anomaly Detection

Imagine that using the interactive application outlined above, our data scientists or business users have observed the obvious seasonal behavior as well as a number of potential anomalies in the data that occur throughout the business day. Our aim is to further investigate these seasonal patterns and the anomaly data in a more rigorous way.

IMAGE HERE

The data scientist would like to fit a model to the time series which allows to capture the seasonal behavior and also to identify the anomalies automatically. To do this, he decides to make use of Facebooks Prophet library.

Extract Druid via JSON HTTP API

The first step is to extract our data of interest from Druid via our preferred HTTP API:

    import json
    import requests

    # define the Druid URL
    url = 'http://druid_ip:druid_port/druid/v2/?pretty'

    # define the Druid query
    query = {'queryType': 'scan',
                  'dataSource': 'table_name',
                  'intervals': ['2020-06-03T07:00:00.000Z/2020-06-03T12:00:00.000Z'],
                  'granularity': 'all'}

    # run the Druid query
    results = json.dumps(requests.post(url, headers={'Content-Type': 'application/json'}, json=query).json()[1]['events'])

Convert Data Into Pandas and Perform Exploratory Analysis

Again, we will convert our dataset to a Pandas dataframe and carry out some exploratory analysis of the dataset to get a feel for the data returned.

# organize the results of the Druid query in a pandas data frame
import pandas as pd

df = pd.read_json(results, orient='records')
df = df[['__time', 'Value']]
df.rename(columns={'__time': 'time', 'Value': 'value'}, inplace=True)
df.sort_values(by='time', inplace=True)

df.head()
df.tail()
df.describe()
(df['value'] > 150).sum()

Anomaly Detection

In order to identify the anomalies, we fit a model to our time series and extract the corresponding 99.99% prediction interval. We then classify as anomalies all the data points outside the 99.99% prediction interval. Given that our previous analysis has shown that our time series displays intraday seasonal behavior, the Prophet model is an appropriate choice in our case.

The following code fits the Prophet model to our time series.

from fbprophet import Prophet
X = pd.DataFrame({'ds': df['time'], 'y': df['value']})
m = Prophet(interval_width=0.9999).fit(X)

The following code extracts the data frame with model predictions. The interpretation of the different data frame columns is the following: ‘ds' is the time index, ‘yhat_lower' is the lower bound of the 99.99% prediction interval, ‘yhat_upper' is the upper bound of the 99.99% prediction interval, and ‘yhat' is the model prediction or fitted value. For convenience we also include the actual values in the same data frame, which we call ‘ytrue'.

predictions = m.predict(X)
predictions = predictions[['ds', 'yhat_lower', 'yhat_upper', 'yhat']]
predictions['ytrue'] = df['value'].values
predictions.head()

The following code extracts the anomalies, which as outlined above are the data points falling outside the 99.99% prediction interval, i.e. either below the lower bound of the interval, or above the upper bound of the interval.

predictions['anomaly'] = np.where((predictions['ytrue'] < predictions['yhat_lower']) | (predictions['ytrue'] > predictions['yhat_upper']), True, False)
predictions.head()

Visualizing Anomalies with Plotly

We now wish to expose our anomalies to the business users so they can analyse the identified orders to understand what is special about them. The sample code below demonstrates how to do this:

import plotly.graph_objects as go

# create the layout
layout = {'plot_bgcolor': 'white',
            'paper_bgcolor': 'white',
            'margin': {'t':10, 'b':10, 'l':10, 'r':10, 'pad':0},
            'yaxis': {'showgrid': True,
                    'zeroline': False,
                    'mirror': True,
                    'color': '#737373',
                    'linecolor': '#d9d9d9',
                    'gridcolor': '#d9d9d9',
                    'tickformat': '$,.0f'},
            'xaxis': {'range':[predictions['ds'].min(), predictions['ds'].max()],
                    'autorange': False,
                    'showgrid': True,
                    'zeroline': False,
                    'mirror': True,
                    'color': '#737373',
                    'linecolor': '#d9d9d9',
                    'gridcolor': '#d9d9d9',
                    'type': 'date',
                    'tickformat': '%d %b %y %H:%M',
                    'tickangle': 0,
                    'nticks': 5}}

# create the traces
data = []

data.append(go.Scatter(x=predictions.query('anomaly == False')['ds'],
                        y=predictions.query('anomaly == False')['ytrue'],
                        mode='markers',
                        marker=dict(color='#000000', size=5),
                        name='Orders',
                        hovertemplate='<b>Orders</b><br>'
                        '<b>Time:</b> %{x|%d %b %Y %H:%M:%S}<br>'
                        '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

data.append(go.Scatter(x=predictions.query('anomaly == True')['ds'],
                        y=predictions.query('anomaly == True')['ytrue'],
                        mode='markers',
                        marker=dict(color='#e83e8c', size=5),
                        name='Anomalies',
                        hovertemplate='<b>Anomalies</b><br>'
                        '<b>Time:</b> %{x|%d %b %Y %H:%M:%S}<br>'
                        '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

data.append(go.Scatter(x=predictions['ds'],
                        y=predictions['yhat'],
                        mode='lines',
                        line=dict(color='#8A348E', width=3, dash='dot'),
                        name='Model Predictions',
                        hovertemplate='<b>Model Predictions</b></br>'
                        '<b>Time:</b> %{x|%d %b %Y %H:%M:%S}<br>'
                        '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

# create the figure
fig = go.Figure(data=data, layout=layout)

# display the figure
fig.show()

# export the figure
fig.write_html('model_plot.html')

And here is the finished interactive result:

IMAGES HERE

As in our previous blog post, the user can hover on the data points with the mouse in order to display a tooltip with their corresponding time and value.

Furthermore, the user can double click on the legend items to switch on and off the visibility of the different series. For instance, double clicking on the ‘Orders' and ‘Model Predictions' legend items will remove them from the plot, allowing the user to focus only on the anomalies.

As with the previous post, this could of course be added into a richer Dash application where we need to create a more interactive experience.

Though it wasn't a particularly sophisticated example of anomaly detection, hopefully this example builds on the previous one to demonstrate how we can integrate much more sophisticated analysis between Druid and Plotly using standard Python libraries and frameworks.

All code examples can be downloaded from this Github repository.

Next Lesson:
04

Forecasting Data Stored In Apache Druid

How to forecast using Data in Apache Druid.

0h 15m



Continuous Delivery For Data Engineers

This site has been developed by the team behind Timeflow, an Open Source CI/CD platform designed for Data Engineers who use dbt as part of the Modern Data Stack. Our platform helps Data Engineers improve the quality, reliability and speed of their data transformation pipelines.

Join our mailing list for our latest insights on Data Engineering:

Timeflow Academy is the leading online, hands-on platform for learning about Data Engineering using the Modern Data Stack. Bought to you by Timeflow CI

© 2023 Timeflow Academy. All rights reserved