In this lesson we will:
- Work through a scenario for forecasting using data stored in Apache Druid.
Introduction
Apache Druid is an open source database that is well suited to real-time analytical workloads. Druid is the database underlying Timeflow, our real-time event processing platform, which we make fully available to our customers for stream processing or subsequent offline analysis.
In previous articles we discussed how to create simple visualizations in Plotly using real-time data stored within Druid. We also looked at simple anomaly detection using the Facebook Prophet library.
To continue our example, we now want to forecast from our order time series data in order to predict the number of expected orders over the next period. This time we will use the XGBoost algorithm to demonstrate an alternative approach to Prophet.
Extract Data From Druid via the JSON HTTP API
As per last time, the first step is to extract our data of interest from Druid via our preferred HTTP API:
import requests
# define the Druid URL
url = 'http://druid_ip:druid_port/druid/v2/?pretty'
# define the Druid query
query = {'queryType': 'scan',
'dataSource': '1_1_Orders',
'intervals': ['2020-06-01T00:00:00.000Z/2020-06-03T00:00:00.000Z'],
'granularity': 'all'}
# run the Druid query
results = requests.post(url, headers={'Content-Type': 'application/json'}, json=query).json()
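The query above hard-codes the data source and the interval. As a minimal sketch, assuming a hypothetical helper named build_scan_query (not part of Druid or of the original code), the same query body could be built from parameters. The optional 'columns' field of a scan query is used here to request only the fields we need; the field names '__time' and 'Value' are taken from the data frame code later in this lesson.

```python
from datetime import datetime, timezone


def build_scan_query(datasource, start, end, columns=None):
    """Build a Druid native scan query body (hypothetical helper).

    start and end must be timezone-aware datetimes.
    """
    def iso(ts):
        # ISO 8601 in UTC with millisecond precision, e.g. 2020-06-01T00:00:00.000Z
        ts = ts.astimezone(timezone.utc)
        return ts.strftime('%Y-%m-%dT%H:%M:%S.') + f'{ts.microsecond // 1000:03d}Z'

    query = {'queryType': 'scan',
             'dataSource': datasource,
             'intervals': [f'{iso(start)}/{iso(end)}']}
    if columns:
        # restrict the returned fields to the listed columns
        query['columns'] = list(columns)
    return query


query = build_scan_query('1_1_Orders',
                         datetime(2020, 6, 1, tzinfo=timezone.utc),
                         datetime(2020, 6, 3, tzinfo=timezone.utc),
                         columns=['__time', 'Value'])
print(query)
```

The resulting dictionary can be passed to requests.post as the json argument in exactly the same way as the hand-written query.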
Convert Data Into Pandas and Perform Exploratory Analysis
Again, we will convert our dataset to a Pandas dataframe and carry out some exploratory analysis of the dataset to get a feel for the data returned.
There is a little complexity here as our dataset gets larger, in that we have to merge the data from multiple Druid segments as we build up our dataframe.
import pandas as pd
# extract all segments into a single data frame
# (DataFrame.append was removed in pandas 2.0, so we use pd.concat)
df = pd.concat([pd.DataFrame(segment['events']) for segment in results])
# organize the data frame
df.drop_duplicates(inplace=True)
df = df[['__time', 'Value']]
df.rename(columns={'__time': 'time', 'Value': 'value'}, inplace=True)
# convert epoch milliseconds to UTC datetimes
df['time'] = pd.to_datetime(df['time'], unit='ms')
df['value'] = df['value'].astype(float)
df.sort_values(by='time', inplace=True)
df.reset_index(inplace=True, drop=True)
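To make the segment merging step easier to follow without a running Druid cluster, here is a small self-contained sketch. The results list is synthetic: it only imitates the shape of a scan response (a list of entries, each with an 'events' list), and the segment ids and values are made up for illustration.

```python
import pandas as pd

# Synthetic stand-in for a Druid scan response (assumption: two segments,
# with one event appearing in both of them).
results = [
    {'segmentId': 'segment_a',
     'events': [{'__time': 1590969600000, 'Value': 10.0},
                {'__time': 1590969660000, 'Value': 12.0}]},
    {'segmentId': 'segment_b',
     'events': [{'__time': 1590969660000, 'Value': 12.0},
                {'__time': 1590969720000, 'Value': 11.0}]},
]

# One data frame per segment, concatenated and de-duplicated.
frames = [pd.DataFrame(segment['events']) for segment in results]
df = pd.concat(frames, ignore_index=True).drop_duplicates()

# Same clean-up as in the main example.
df = df.rename(columns={'__time': 'time', 'Value': 'value'})
df['time'] = pd.to_datetime(df['time'], unit='ms')
df = df.sort_values(by='time').reset_index(drop=True)

print(df)
```

The duplicated event is dropped, leaving three rows ordered by time.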
Again we can do some standard Pandas based exploratory analysis to ensure we have a good dataset:
df.head()
df.tail()
df.describe()
(df['value'] > 100).sum()
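Because the forecast later steps forward one minute at a time, it can also be worth checking that the observations arrive at a regular cadence. The sketch below is one possible check, not part of the original analysis; it assumes a one-minute interval between records and uses a small synthetic data frame in place of the Druid data.

```python
import pandas as pd

# Synthetic series with a hole between 00:02 and 00:05 (assumption).
df = pd.DataFrame({
    'time': pd.to_datetime(['2020-06-01 00:00', '2020-06-01 00:01',
                            '2020-06-01 00:02', '2020-06-01 00:05',
                            '2020-06-01 00:06']),
    'value': [10.0, 12.0, 11.0, 13.0, 12.0],
})

# Assumed sampling interval of the series.
expected = pd.Timedelta(minutes=1)

# Time elapsed since the previous record.
step = df['time'].diff()

# Records that follow a gap, and the number of records missing overall.
gaps = df.loc[step > expected, 'time']
missing = int((step[step > expected] / expected - 1).sum())

print(gaps)
print(missing)
```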
Forecasting
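We build calendar features (day, hour, minute and second) from each timestamp, fit an XGBoost regressor on the historical values, and then predict the next n one-minute steps.
import numpy as np
import xgboost as xgb
from sklearn.preprocessing import MinMaxScaler
# Define the length of the forecasting window (1440 one-minute steps = 1 day).
n = 1440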
# Construct the features.
t_past = df['time']
X_past = np.transpose(np.vstack([t_past.dt.day.values, t_past.dt.hour.values, t_past.dt.minute.values, t_past.dt.second.values]))
# Scale The Features
scaler = MinMaxScaler().fit(X_past)
X_past = scaler.transform(X_past)
# Fit The Model
xgboost = xgb.XGBRegressor(n_estimators=600, max_depth=2).fit(X_past, df['value'].values)
# Examine Feature Importances
[[x, format(y, '.4f')] for x, y in zip(['day', 'hour', 'minute', 'second'], xgboost.feature_importances_.tolist())]
# Extract The In Sample Predictions
predictions = pd.DataFrame({'time': t_past, 'value': xgboost.predict(X_past)})
predictions.head()
# Generate The Out Of Sample Predictions
# ('min' replaces the 'T' minute alias, which is deprecated in recent pandas)
t_future = pd.date_range(start=df['time'].values[-1], periods=n, freq='min')
X_future = np.transpose(np.vstack([t_future.day.values, t_future.hour.values, t_future.minute.values, t_future.second.values]))
X_future = scaler.transform(X_future)
forecasts = pd.DataFrame({'time': t_future, 'value': xgboost.predict(X_future)})
forecasts.head()
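One detail of the features above is worth illustrating: the scaler is fitted on the past timestamps only, so the day-of-month values in the forecast window fall outside the range it has seen. The sketch below shows this with synthetic minute-level timestamps (an assumption standing in for the Druid data) and a hand-written version of the default min-max formula rather than MinMaxScaler itself, so that it runs with only NumPy and pandas.

```python
import numpy as np
import pandas as pd


def calendar_features(times):
    """Return a (len(times), 4) array of day, hour, minute and second."""
    idx = pd.DatetimeIndex(times)
    return np.column_stack([idx.day, idx.hour, idx.minute, idx.second]).astype(float)


# Two days of history and one day of forecast horizon (synthetic timestamps).
t_past = pd.date_range('2020-06-01', '2020-06-02 23:59', freq='min')
t_future = pd.date_range('2020-06-03', periods=1440, freq='min')

X_past = calendar_features(t_past)
X_future = calendar_features(t_future)

# Min-max scaling fitted on the past only: (x - min) / (max - min).
# Constant columns (here: second) get a range of 1 to avoid dividing by zero.
lo, hi = X_past.min(axis=0), X_past.max(axis=0)
span = np.where(hi > lo, hi - lo, 1.0)
X_past_scaled = (X_past - lo) / span
X_future_scaled = (X_future - lo) / span

# Largest scaled value per feature: day, hour, minute, second.
print(X_past_scaled.max(axis=0))
print(X_future_scaled.max(axis=0))
```

The scaled day feature for the forecast window is larger than anything seen during training. Tree-based models split on thresholds, so such a value is routed down the same branches as the largest day seen in training, and the shape of the forecast is driven by the hour and minute features.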
Visualising Results With Plotly
The next step is to plot the observed data, the in-sample predictions and the forecasts together in an interactive Plotly figure.
import plotly.graph_objects as go
# create the layout
layout = {'plot_bgcolor': 'white',
'paper_bgcolor': 'white',
'margin': {'t':10, 'b':10, 'l':10, 'r':10, 'pad':0},
'legend': {'x': 0, 'y': 1.1, 'orientation': 'h'},
'font': {'size': 8},
'yaxis': {'showgrid': True,
'zeroline': False,
'mirror': True,
'color': '#737373',
'linecolor': '#d9d9d9',
'gridcolor': '#d9d9d9',
'tickformat': '$,.0f'},
'xaxis': {'showgrid': True,
'zeroline': False,
'mirror': True,
'color': '#737373',
'linecolor': '#d9d9d9',
'gridcolor': '#d9d9d9',
'type': 'date',
'tickformat': '%d %b %y %H:%M',
'tickangle': 0,
'nticks': 5}}
# create the traces
data = []
data.append(go.Scatter(x=df['time'],
y=df['value'],
mode='markers',
marker=dict(color='#343a40', size=4),
name='Orders',
hovertemplate='<b>Actual</b><br>'
'<b>Time:</b> %{x|%d %b %Y %H:%M}<br>'
'<b>Value:</b> %{y: $,.2f}<extra></extra>'))
data.append(go.Scatter(x=predictions['time'],
y=predictions['value'],
mode='lines',
line=dict(color='#e83e8c', width=2, dash='dot', shape='spline'),
name='In-Sample Predictions',
hovertemplate='<b>In-Sample Predictions</b><br>'
'<b>Time:</b> %{x|%d %b %Y %H:%M}<br>'
'<b>Value:</b> %{y: $,.2f}<extra></extra>'))
data.append(go.Scatter(x=forecasts['time'],
y=forecasts['value'],
mode='lines',
line=dict(color='#8A348E', width=2, dash='dot', shape='spline'),
name='Out-Of-Sample Forecasts',
hovertemplate='<b>Out-Of-Sample Forecasts</b><br>'
'<b>Time:</b> %{x|%d %b %Y %H:%M}<br>'
'<b>Value:</b> %{y: $,.2f}<extra></extra>'))
# create the figure
fig = go.Figure(data=data, layout=layout)
# display the figure
fig.show()
# save the plot
fig.write_html('forecasts_plot.html')
The finished result looks like this, with the out-of-sample forecasts shown on the right. As you can see, they are very much in line with our observed seasonal patterns.
Hopefully this offers an example of how to forecast with the XGBoost algorithm using data stored in Apache Druid. All code examples can be downloaded from this GitHub repository.