Lesson Overview

In this lesson we will:

  • Work through a forecasting scenario using data stored in Apache Druid.

Introduction

Apache Druid is an open-source database that is well suited to real-time analytical workloads. Druid is the database underlying Timeflow, our real-time event processing platform, and we make it fully available to our customers for their own stream processing or subsequent offline analysis.

In previous articles we discussed how to create simple visualizations in Plotly using real-time data stored in Druid. We also looked at simple anomaly detection using the Facebook Prophet library.

To continue our example, we now want to use our order time series to predict the number of orders expected over the next period (here, the next 24 hours at one-minute resolution). This time we will use the XGBoost algorithm to demonstrate an alternative approach to Prophet.

Extract Data From Druid Via The JSON HTTP API

As in the previous lesson, the first step is to extract the data of interest from Druid using its JSON-over-HTTP query API:

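    import requests

    # define the Druid URL (replace druid_ip and druid_port with your Druid broker or router address)
    url = 'http://druid_ip:druid_port/druid/v2/?pretty'

    # define the Druid query: a scan query returning the raw rows of the 1_1_Orders
    # datasource for 1-2 June 2020 (the interval end is exclusive)
    query = {'queryType': 'scan',
             'dataSource': '1_1_Orders',
             'intervals': ['2020-06-01T00:00:00.000Z/2020-06-03T00:00:00.000Z'],
             'granularity': 'all'}

    # run the Druid query, raising an error if the HTTP request fails
    response = requests.post(url, headers={'Content-Type': 'application/json'}, json=query)
    response.raise_for_status()
    results = response.json()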
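The `intervals` entry above is an ISO-8601 `start/end` string, with the start inclusive and the end exclusive. If you want to query other windows, a small helper like the sketch below can build the scan query from Python datetimes. The function name `build_scan_query` is hypothetical. `columns` and `limit` are standard scan query properties, and restricting `columns` to `__time` and `Value` (the only fields used later) reduces the amount of data returned.

```python
from datetime import datetime, timezone


def build_scan_query(datasource, start, end, columns=None, limit=None):
    """Build a Druid scan query dict for the interval [start, end).

    Hypothetical helper: start and end must be timezone-aware datetimes.
    """
    # Druid intervals are ISO-8601 "start/end" strings; normalise both ends to UTC
    # (sub-second precision is dropped by this format)
    fmt = '%Y-%m-%dT%H:%M:%S.000Z'
    interval = (start.astimezone(timezone.utc).strftime(fmt) + '/' +
                end.astimezone(timezone.utc).strftime(fmt))
    query = {'queryType': 'scan',
             'dataSource': datasource,
             'intervals': [interval]}
    if columns is not None:
        # only return the listed columns
        query['columns'] = list(columns)
    if limit is not None:
        # cap the number of rows returned
        query['limit'] = limit
    return query


query = build_scan_query('1_1_Orders',
                         datetime(2020, 6, 1, tzinfo=timezone.utc),
                         datetime(2020, 6, 3, tzinfo=timezone.utc),
                         columns=['__time', 'Value'])
print(query['intervals'])  # ['2020-06-01T00:00:00.000Z/2020-06-03T00:00:00.000Z']
```

The resulting dictionary can be passed as `json=query` to `requests.post` exactly as in the code above.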

Convert Data Into Pandas and Perform Exploratory Analysis

Again, we will convert our dataset to a Pandas dataframe and carry out some exploratory analysis of the dataset to get a feel for the data returned.

There is a little complexity here. A scan query returns one result object per Druid segment, each with its own list of events. As the dataset grows and spans more segments, we have to combine the events from all of them to build a single dataframe.

    import pandas as pd

    # combine the events from every segment into a single data frame
    # (pd.concat replaces DataFrame.append, which was removed in pandas 2.0)
    df = pd.concat([pd.DataFrame(segment['events']) for segment in results], ignore_index=True)

    # organize the data frame
    df = df.drop_duplicates()
    df = df[['__time', 'Value']].rename(columns={'__time': 'time', 'Value': 'value'})
    df['time'] = pd.to_datetime(df['time'], unit='ms')  # __time is in epoch milliseconds (UTC)
    df['value'] = df['value'].astype(float)
    df = df.sort_values(by='time').reset_index(drop=True)
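To check the segment-merging logic without a running Druid cluster, the sketch below feeds the same steps a small hand-written response. The two segments and their values are hypothetical. Only the shape (a list of objects, each with an `events` list and epoch-millisecond `__time` values) mirrors what the scan query returns. The repeated row in the second segment shows `drop_duplicates` at work.

```python
import pandas as pd

# hypothetical scan-query response: one object per segment, each with a list of events
# (__time is epoch milliseconds; the repeated row in seg-2 mimics an overlap)
results = [
    {'segmentId': 'seg-1', 'events': [
        {'__time': 1590969600000, 'Value': 120.5},
        {'__time': 1590969660000, 'Value': 98.0}]},
    {'segmentId': 'seg-2', 'events': [
        {'__time': 1590969660000, 'Value': 98.0},
        {'__time': 1590969720000, 'Value': 101.25}]},
]

# combine the events of every segment into one data frame
df = pd.concat([pd.DataFrame(segment['events']) for segment in results],
               ignore_index=True)

# same clean-up steps as above: de-duplicate, rename, convert, sort
df = df.drop_duplicates()
df = df[['__time', 'Value']].rename(columns={'__time': 'time', 'Value': 'value'})
df['time'] = pd.to_datetime(df['time'], unit='ms')
df['value'] = df['value'].astype(float)
df = df.sort_values(by='time').reset_index(drop=True)
print(df)
```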

Again, we can run some standard Pandas exploratory checks to make sure the dataset looks sensible:

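    print(df.head())                  # first five rows
    print(df.tail())                  # last five rows
    print(df.describe())              # summary statistics of the values
    print((df['value'] > 100).sum())  # number of observations with a value above 100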
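The forecasting step generates future timestamps at one-minute intervals, so it is worth confirming that the observations themselves are roughly one minute apart. The sketch below uses a small synthetic series (hypothetical data, with 00:03 deliberately missing). It counts how often each spacing between consecutive timestamps occurs and lists the minutes that have no observation. On the real data you would run only the last few lines against the `df` built above.

```python
import pandas as pd

# hypothetical one-minute series with the 00:03 observation missing
times = pd.to_datetime(['2020-06-01 00:00', '2020-06-01 00:01',
                        '2020-06-01 00:02', '2020-06-01 00:04'])
df = pd.DataFrame({'time': times, 'value': [120.0, 98.0, 101.0, 110.0]})

# how often does each gap between consecutive observations occur?
spacing = df['time'].diff().dropna().value_counts()
print(spacing)

# minutes within the observed range that have no observation
expected = pd.date_range(df['time'].min(), df['time'].max(), freq='min')
missing = expected.difference(pd.DatetimeIndex(df['time']))
print(missing)
```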

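Forecasting

We frame forecasting as a regression problem. Each timestamp is broken down into calendar features (day of month, hour, minute and second), and the features are scaled to the range [0, 1]. An XGBoost regressor is then trained to predict the observed value from them. Finally, the trained model is applied to the same features for the next 1,440 minutes (one day) to produce the out-of-sample forecasts.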

    import numpy as np
    import pandas as pd
    import xgboost as xgb
    from sklearn.preprocessing import MinMaxScaler

    # Define the length of the forecasting window: 1440 one-minute steps, i.e. one day.
    n = 1440

    # Construct the features: day of month, hour, minute and second of each timestamp.
    t_past = df['time']
    X_past = np.column_stack([t_past.dt.day.values, t_past.dt.hour.values,
                              t_past.dt.minute.values, t_past.dt.second.values])

    # Scale the features to [0, 1] using the range seen in the training data.
    scaler = MinMaxScaler().fit(X_past)
    X_past = scaler.transform(X_past)

    # Fit the model: 600 boosted trees, each of depth 2.
    xgboost = xgb.XGBRegressor(n_estimators=600, max_depth=2).fit(X_past, df['value'].values)

    # Examine the feature importances.
    print([[x, format(y, '.4f')] for x, y in zip(['day', 'hour', 'minute', 'second'],
                                                  xgboost.feature_importances_.tolist())])

    # Extract the in-sample predictions (the model's fit to the data it was trained on).
    predictions = pd.DataFrame({'time': t_past, 'value': xgboost.predict(X_past)})
    print(predictions.head())

    # Generate the out-of-sample predictions, starting one minute after the last observation.
    # Day-of-month values beyond those seen in training are treated like the largest day seen.
    t_future = pd.date_range(start=df['time'].iloc[-1] + pd.Timedelta(minutes=1), periods=n, freq='min')
    X_future = np.column_stack([t_future.day.values, t_future.hour.values,
                                t_future.minute.values, t_future.second.values])
    X_future = scaler.transform(X_future)
    forecasts = pd.DataFrame({'time': t_future, 'value': xgboost.predict(X_future)})
    print(forecasts.head())
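The in-sample predictions say little about forecast accuracy, because the model has already seen those points. One common check, which is not part of the original walkthrough, is to hold out the final day and compare any model against a seasonal naive baseline. That baseline simply repeats the value observed at the same minute one day earlier. The sketch below assumes a regular one-minute series. The data is synthetic and the `mae` helper is hypothetical. To evaluate the XGBoost model the same way, you would fit it on `train` only and score its predictions for `test` with `mae`.

```python
import numpy as np
import pandas as pd

# hypothetical regular one-minute series: three days with a daily cycle plus noise
rng = np.random.default_rng(0)
times = pd.date_range('2020-06-01', periods=3 * 1440, freq='min')
minute_of_day = (times.hour * 60 + times.minute).to_numpy()
value = 100 + 20 * np.sin(2 * np.pi * minute_of_day / 1440) + rng.normal(0, 2, len(times))
df = pd.DataFrame({'time': times, 'value': value})

n = 1440  # forecast horizon: one day of minutes, as in the lesson

# hold out the final n minutes; a time series split must not be shuffled
train, test = df.iloc[:-n], df.iloc[-n:]


def mae(actual, predicted):
    """Mean absolute error between two equal-length sequences."""
    return float(np.mean(np.abs(np.asarray(actual) - np.asarray(predicted))))


# seasonal naive forecast: the last n training values, i.e. the same minutes one day earlier
seasonal_naive = train['value'].values[-n:]
print('seasonal naive MAE:', round(mae(test['value'], seasonal_naive), 2))
```

A model that cannot beat this baseline on the held-out day is not adding much beyond the daily pattern itself.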

Visualising Results With Plotly

The next step is to plot the observed data, the in-sample predictions and the out-of-sample forecasts together in an interactive Plotly figure.

    import plotly.graph_objects as go

    # create the layout
    layout = {'plot_bgcolor': 'white',
              'paper_bgcolor': 'white',
              'margin': {'t':10, 'b':10, 'l':10, 'r':10, 'pad':0},
              'legend': {'x': 0, 'y': 1.1, 'orientation': 'h'},
              'font': {'size': 8},
              'yaxis': {'showgrid': True,
                        'zeroline': False,
                        'mirror': True,
                        'color': '#737373',
                        'linecolor': '#d9d9d9',
                        'gridcolor': '#d9d9d9',
                        'tickformat': '$,.0f'},
              'xaxis': {'showgrid': True,
                        'zeroline': False,
                        'mirror': True,
                        'color': '#737373',
                        'linecolor': '#d9d9d9',
                        'gridcolor': '#d9d9d9',
                        'type': 'date',
                        'tickformat': '%d %b %y %H:%M',
                        'tickangle': 0,
                        'nticks': 5}}

    # create the traces
    data = []

    data.append(go.Scatter(x=df['time'],
                           y=df['value'],
                           mode='markers',
                           marker=dict(color='#343a40', size=4),
                           name='Orders',
                           hovertemplate='<b>Actual</b><br>'
                           '<b>Time:</b> %{x|%d %b %Y %H:%M}<br>'
                           '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

    data.append(go.Scatter(x=predictions['time'],
                           y=predictions['value'],
                           mode='lines',
                           line=dict(color='#e83e8c', width=2, dash='dot', shape='spline'),
                           name='In-Sample Predictions',
                           hovertemplate='<b>In-Sample Predictions</b><br>'
                           '<b>Time:</b> %{x|%d %b %Y %H:%M}<br>'
                           '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

    data.append(go.Scatter(x=forecasts['time'],
                           y=forecasts['value'],
                           mode='lines',
                           line=dict(color='#8A348E', width=2, dash='dot', shape='spline'),
                           name='Out-Of-Sample Forecasts',
                           hovertemplate='<b>Out-Of-Sample Forecasts</b><br>'
                           '<b>Time:</b> %{x|%d %b %Y %H:%M}<br>'
                           '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

    # create the figure
    fig = go.Figure(data=data, layout=layout)

    # display the figure
    fig.show()

    # save the plot
    fig.write_html('forecasts_plot.html')
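The three traces repeat almost the same hover template, differing only in the label. A small helper keeps them consistent; the name `hover_template` is hypothetical. It uses plain string concatenation rather than an f-string, so the `%{x|...}` and `%{y:...}` placeholders that Plotly substitutes at hover time do not need their braces escaped.

```python
def hover_template(label):
    """Return a Plotly hovertemplate showing the trace label, time and value."""
    # <extra></extra> hides Plotly's secondary hover box containing the trace name
    return ('<b>' + label + '</b><br>'
            '<b>Time:</b> %{x|%d %b %Y %H:%M}<br>'
            '<b>Value:</b> %{y: $,.2f}<extra></extra>')


print(hover_template('In-Sample Predictions'))
```

Each trace can then pass, for example, `hovertemplate=hover_template('Actual')` to `go.Scatter`.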

In the finished plot, the out-of-sample forecasts appear on the right, after the observed data. They are very much in line with the seasonal patterns we observed.

We hope this offers a useful example of how to forecast with the XGBoost algorithm using data stored in Apache Druid. All code examples can be downloaded from this GitHub repository.


© 2022 Timeflow Academy.