Druid For Data Engineers

Forecasting Data Stored In Apache Druid

Lesson #5

In this lesson we will:

  • Work through a scenario for forecasting using data stored in Apache Druid.

Introduction

Apache Druid is an open source database which is well suited to real-time analytical workloads. Druid is the database which underlies Timeflow, our real-time event processing platform, which we make fully available to our customers for their stream processing or subsequent offline analysis.

In previous lessons we discussed how to create simple visualisations in Plotly using real-time data stored within Druid. We also looked at simple anomaly detection, making use of the Facebook Prophet library.

To continue the example, we now wish to begin forecasting from our order time series in order to predict the expected orders over the next period. This time we will make use of an XGBoost model to demonstrate an alternative approach to Prophet.
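
The snippets in this lesson rely on a handful of common Python libraries. As a rough guide, they assume a set of imports along the following lines; adjust package names and versions to your own environment:

    # libraries assumed by the code snippets in this lesson
    import requests                                  # calling the Druid HTTP API
    import numpy as np
    import pandas as pd
    from datetime import datetime
    import xgboost as xgb                            # gradient boosted trees for the forecast
    from sklearn.preprocessing import MinMaxScaler   # feature scaling
    import plotly.graph_objects as go                # interactive plotting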

Extract Data From Druid via the JSON HTTP API

As in the previous lesson, the first step is to extract the data of interest from Druid via its JSON HTTP API:

    # define the Druid URL
    url = 'http://druid_ip:druid_port/druid/v2/?pretty'

    # define the Druid query
    query = {'queryType': 'scan',
             'dataSource': '1_1_Orders',
             'intervals': ['2020-06-01T00:00:00.000Z/2020-06-03T00:00:00.000Z'],
             'granularity': 'all'}

    # run the Druid query
    results = requests.post(url, headers={'Content-Type': 'application/json'}, json=query).json()
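
Before going further, it is worth a quick sanity check on the response. A Druid scan query returns a list of result objects, one per segment, each carrying its rows under the 'events' key. A minimal check along these lines (not part of the pipeline itself) confirms what came back:

    # quick sanity check on the scan response: one result object per segment,
    # each with its rows under the 'events' key
    print(len(results))                  # number of segment result objects returned
    print(results[0].keys())             # e.g. 'segmentId', 'columns', 'events'
    print(results[0]['events'][:2])      # a couple of raw rows from the first segment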

Convert Data Into Pandas and Perform Exploratory Analysis

Again, we will convert the results into a Pandas dataframe and carry out some exploratory analysis to get a feel for the data returned.

There is a little complexity here as our dataset gets larger, in that we have to merge the data from multiple Druid segments as we build up our dataframe.

    # merge the rows from every Druid segment into a single data frame
    df = pd.concat([pd.DataFrame(segment['events']) for segment in results],
                   ignore_index=True)

    # organize the data frame
    df.drop_duplicates(inplace=True)
    df = df[['__time', 'Value']]
    df.rename(columns={'__time': 'time', 'Value': 'value'}, inplace=True)
    df['time'] = df['time'].apply(lambda x: datetime.utcfromtimestamp(x / 1000))
    df['value'] = df['value'].astype(float)
    df.sort_values(by='time', inplace=True)
    df.reset_index(inplace=True, drop=True)

Again, we can run some standard Pandas-based exploratory analysis to ensure we have a good dataset:

    df.head()                    # first few rows
    df.tail()                    # most recent rows
    df.describe()                # summary statistics for the value column
    (df['value'] > 100).sum()    # how many observations exceed 100
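
Since the model below relies purely on time-based features, one optional extra check is to look at the spacing between consecutive observations and confirm the series is reasonably regular:

    # optional: inspect the spacing between consecutive observations
    gaps = df['time'].diff().dropna()
    print(gaps.value_counts().head())    # most common intervals between rows
    print(gaps.max())                    # largest gap in the series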

Forecasting

Next, we construct simple calendar features from each timestamp (day, hour, minute and second), scale them, and fit an XGBoost regression model to the observed values:

    # Define the length of the forecasting window: 1,440 one-minute steps, i.e. the next 24 hours.
    n = 1440

    # Construct the features.
    t_past = df['time']
    X_past = np.transpose(np.vstack([t_past.dt.day.values, t_past.dt.hour.values, t_past.dt.minute.values, t_past.dt.second.values]))

    # Scale The Features
    scaler = MinMaxScaler().fit(X_past)
    X_past = scaler.transform(X_past)

    # Fit The Model
    xgboost = xgb.XGBRegressor(n_estimators=600, max_depth=2).fit(X_past, df['value'].values)

    # Examine Feature Importances
    [[x, format(y, '.4f')] for x, y in zip(['day', 'hour', 'minute', 'second'], xgboost.feature_importances_.tolist())]

    # Extract The In Sample Predictions
    predictions = pd.DataFrame({'time': t_past, 'value': xgboost.predict(X_past)})
    predictions.head()


    # Generate The Out Of Sample Predictions
    t_future = pd.date_range(start=df['time'].values[-1], periods=n, freq='T')
    X_future = np.transpose(np.vstack([t_future.day.values, t_future.hour.values, t_future.minute.values, t_future.second.values]))
    X_future = scaler.transform(X_future)
    forecasts = pd.DataFrame({'time': t_future, 'value': xgboost.predict(X_future)})
    forecasts.head()
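
Before trusting the out-of-sample forecasts, it can be useful to quantify how well the model fits the history. As a rough sketch, a couple of standard error metrics from sklearn.metrics (not used elsewhere in this lesson) give a sense of the in-sample fit:

    # rough measure of in-sample fit (note: this flatters the model, since it is
    # evaluated on the same data it was trained on)
    from sklearn.metrics import mean_absolute_error, mean_squared_error

    mae = mean_absolute_error(df['value'].values, predictions['value'].values)
    rmse = mean_squared_error(df['value'].values, predictions['value'].values) ** 0.5
    print(f'In-sample MAE: {mae:.2f}, RMSE: {rmse:.2f}')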

Visualising Results With Plotly

The next step is to take the forecast results and plot them in an interactive Plotly chart to visualise the output.

    # create the layout
    layout = {'plot_bgcolor': 'white',
              'paper_bgcolor': 'white',
              'margin': {'t':10, 'b':10, 'l':10, 'r':10, 'pad':0},
              'legend': {'x': 0, 'y': 1.1, 'orientation': 'h'},
              'font': {'size': 8},
              'yaxis': {'showgrid': True,
                        'zeroline': False,
                        'mirror': True,
                        'color': '#737373',
                        'linecolor': '#d9d9d9',
                        'gridcolor': '#d9d9d9',
                        'tickformat': '$,.0f'},
              'xaxis': {'showgrid': True,
                        'zeroline': False,
                        'mirror': True,
                        'color': '#737373',
                        'linecolor': '#d9d9d9',
                        'gridcolor': '#d9d9d9',
                        'type': 'date',
                        'tickformat': '%d %b %y %H:%M',
                        'tickangle': 0,
                        'nticks': 5}}

    # create the traces
    data = []

    data.append(go.Scatter(x=df['time'],
                           y=df['value'],
                           mode='markers',
                           marker=dict(color='#343a40', size=4),
                           name='Orders',
                           hovertemplate='<b>Actual</b><br>'
                           '<b>Time:</b> %{x|%d %b %Y %H:%M}<br>'
                           '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

    data.append(go.Scatter(x=predictions['time'],
                           y=predictions['value'],
                           mode='lines',
                           line=dict(color='#e83e8c', width=2, dash='dot', shape='spline'),
                           name='In-Sample Predictions',
                           hovertemplate='<b>In-Sample Predictions</b><br>'
                           '<b>Time:</b> %{x|%d %b %Y %H:%M}<br>'
                           '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

    data.append(go.Scatter(x=forecasts['time'],
                           y=forecasts['value'],
                           mode='lines',
                           line=dict(color='#8A348E', width=2, dash='dot', shape='spline'),
                           name='Out-Of-Sample Forecasts',
                           hovertemplate='<b>Out-Of-Sample Forecasts</b><br>'
                           '<b>Time:</b> %{x|%d %b %Y %H:%M}<br>'
                           '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

    # create the figure
    fig = go.Figure(data=data, layout=layout)

    # display the figure
    fig.show()

    # save the plot
    fig.write_html('forecasts_plot.html')
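
If you want to keep the numbers as well as the chart, the forecast values can optionally be written out for downstream use, for example:

    # optionally persist the forecasts alongside the plot
    forecasts.to_csv('forecasts.csv', index=False)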

The finished result looks like this, with the out-of-sample forecasts shown on the right. As you can see, they are very much in line with the observed seasonal patterns.

Hopefully this offers an example of how to forecast with XGBoost using data stored in Apache Druid. All code examples can be downloaded from this GitHub repository.

Next Lesson:

Integrating Plotly Dash With Apache Druid

Work through a scenario for integrating Plotly Dash with Apache Druid.


