from deephaven import ui, agg, empty_table
from deephaven.stream.table_publisher import table_publisher
from deephaven.stream import blink_to_append_only
from deephaven.plot import express as dx
from deephaven import updateby as uby
from deephaven import dtypes as dht
# Deephaven Express example stocks table with its row order reversed; this is
# the table handed to my_layout as the raw `source` at the bottom of the file.
# NOTE(review): presumably reversed so the most recent rows come first — confirm.
stocks = dx.data.stocks().reverse()
def set_bol_properties(fig):
    """Style the band figure in place: hide its legend and shade its traces.

    Passed as ``unsafe_update_figure`` when the band plot is built; the figure
    is modified in place and nothing is returned.
    """
    band_fill = "rgba(255,165,0,0.08)"  # faint orange shading
    fig.update_layout(showlegend=False)
    fig.update_traces(fill="tonexty", fillcolor=band_fill)
@ui.component
def line_plot(filtered_source, exchange, window_size, bol_bands):
    """Render the "Prices" panel: raw price, rolling average and optional bands.

    Args:
        filtered_source: Table holding ``Timestamp``, ``Price``, ``Exchange``
            and the ``PriceAvg*`` / ``PriceStd*`` rolling-statistic columns.
        exchange: Selected exchange name, or ``"All"`` to draw one price line
            per exchange.
        window_size: One of ``"5 seconds"``, ``"30 seconds"``, ``"1 minute"``,
            ``"5 minutes"``; selects which rolling-statistic columns are used.
        bol_bands: One of ``"None"``, ``"80%"``, ``"90%"``, ``"95%"``,
            ``"99%"``; selects the band width (``"None"`` draws no bands).

    Returns:
        A ``ui.panel`` titled "Prices" containing the layered figure.

    Raises:
        KeyError: If ``window_size`` or ``bol_bands`` is not one of the
            labels listed above.
    """
    # Window label -> (rolling average column, rolling std-dev column).
    window_size_key = {
        "5 seconds": ("PriceAvg5s", "PriceStd5s"),
        "30 seconds": ("PriceAvg30s", "PriceStd30s"),
        "1 minute": ("PriceAvg1m", "PriceStd1m"),
        "5 minutes": ("PriceAvg5m", "PriceStd5m"),
    }
    # Band label -> std-dev multiplier: the 90th, 95th, 97.5th and 99.5th
    # percentiles of a standard normal distribution.
    bol_bands_key = {
        "None": None,
        "80%": 1.282,
        "90%": 1.645,
        "95%": 1.960,
        "99%": 2.576,
    }

    avg_col, std_col = window_size_key[window_size]
    band_multiplier = bol_bands_key[bol_bands]

    # Raw prices, drawn semi-transparent; split by exchange only when every
    # exchange is shown.
    base_plot = ui.use_memo(
        lambda: dx.line(
            filtered_source,
            x="Timestamp",
            y="Price",
            by="Exchange" if exchange == "All" else None,
            unsafe_update_figure=lambda fig: fig.update_traces(opacity=0.4),
        ),
        [filtered_source, exchange],
    )

    # Rolling average for the chosen window.
    avg_plot = ui.use_memo(
        lambda: dx.line(
            filtered_source,
            x="Timestamp",
            y=avg_col,
            color_discrete_sequence=["orange"],
            labels={avg_col: "Rolling Average"},
        ),
        [filtered_source, avg_col],
    )

    def build_band_plot():
        # No figure at all when bands are switched off.
        if band_multiplier is None:
            return None
        with_bands = filtered_source.update(
            [
                f"errorY={avg_col} + {band_multiplier}*{std_col}",
                f"errorYMinus={avg_col} - {band_multiplier}*{std_col}",
            ]
        )
        return dx.line(
            with_bands,
            x="Timestamp",
            y=["errorYMinus", "errorY"],
            color_discrete_sequence=["rgba(255,165,0,0.3)", "rgba(255,165,0,0.3)"],
            unsafe_update_figure=set_bol_properties,
        )

    bol_plot = ui.use_memo(
        build_band_plot,
        [filtered_source, avg_col, std_col, band_multiplier],
    )

    # Drop the absent band figure instead of handing None to dx.layer, so the
    # "None" setting does not depend on how dx.layer treats missing figures.
    plot = ui.use_memo(
        lambda: dx.layer(
            *(fig for fig in (base_plot, avg_plot, bol_plot) if fig is not None)
        ),
        [base_plot, avg_plot, bol_plot],
    )
    return ui.panel(plot, title="Prices")
@ui.component
def full_table(source):
    """Show ``source`` unmodified in a panel titled "Full Table"."""
    panel = ui.panel(source, title="Full Table")
    return panel
@ui.component
def filtered_table(source, exchange):
    """Show ``source`` without its rolling-statistic columns, in reverse order.

    Args:
        source: Table containing the ``PriceAvg*`` / ``PriceStd*`` columns
            listed below and, when a single exchange is selected, an
            ``Exchange`` column.
        exchange: Exchange name to keep, or ``"All"`` to keep every row.

    Returns:
        A ``ui.panel`` titled "Filtered Table".
    """
    stat_columns = [
        "PriceAvg5s",
        "PriceStd5s",
        "PriceAvg30s",
        "PriceStd30s",
        "PriceAvg1m",
        "PriceStd1m",
        "PriceAvg5m",
        "PriceStd5m",
    ]
    table = source.drop_columns(stat_columns)
    if exchange != "All":
        # The column is named "Exchange" everywhere else in this file; the
        # previous lowercase "exchange" did not match that column name.
        table = table.where(f"Exchange == `{exchange}`")
    return ui.panel(table.reverse(), title="Filtered Table")
@ui.component
def parameters_panel(
    symbols,
    exchanges,
    symbol,
    set_symbol,
    exchange,
    set_exchange,
    window_size,
    set_window_size,
    bol_bands,
    set_bol_bands,
):
    """Build the "Parameters" panel with all user-facing selectors.

    ``symbols`` and ``exchanges`` supply the picker items. Each remaining
    argument pair is a current value plus the setter called when the user
    changes it (symbol, exchange, window size, band width).
    """

    def choice_buttons(labels, active, choose):
        # One button per label; the active one gets the accent variant.
        def make_button(label):
            return ui.button(
                label,
                variant="accent" if active == label else None,
                # Zero-argument callback, bound to this button's own label.
                on_press=lambda: choose(label),
            )

        return ui.button_group(*map(make_button, labels), margin_x=10)

    def labelled(caption, control):
        # Caption stacked above a single row holding the control.
        return ui.flex(
            ui.text(caption),
            ui.flex(control, direction="row"),
            gap="size-100",
            direction="column",
        )

    symbol_picker = ui.picker(
        *symbols,
        label="Symbol",
        on_selection_change=set_symbol,
        selected_key=symbol,
    )
    exchange_picker = ui.picker(
        *exchanges,
        label="Exchange",
        on_selection_change=set_exchange,
        selected_key=exchange,
    )
    window_size_selector = choice_buttons(
        ("5 seconds", "30 seconds", "1 minute", "5 minutes"),
        window_size,
        set_window_size,
    )
    bolinger_band_selector = choice_buttons(
        ("None", "80%", "90%", "95%", "99%"),
        bol_bands,
        set_bol_bands,
    )

    body = ui.flex(
        ui.flex(symbol_picker, exchange_picker, gap="size-200"),
        labelled("Window size:", window_size_selector),
        labelled("Bolinger bands:", bolinger_band_selector),
        margin="size-200",
        direction="column",
        gap="size-200",
    )
    return ui.panel(body, title="Parameters")
@ui.component
def orderbook_panel(symbols):
    """Build the "Order Book" panel: an order form above a table of orders.

    ``symbols`` supplies the items of the symbol picker. Pressing Buy or Sell
    publishes one row (Sym, Size, Side) built from the current form state.
    """
    symbol, set_symbol = ui.use_state("")
    size, set_size = ui.use_state(0)

    def open_order_stream():
        schema = {"Sym": dht.string, "Size": dht.int32, "Side": dht.string}
        return table_publisher("Order table", schema)

    # Created once (empty dependency list) and reused across renders.
    blink_table, publisher = ui.use_memo(open_order_stream, [])
    orders = ui.use_memo(lambda: blink_to_append_only(blink_table), [blink_table])

    def order_handler(side):
        # Press handler that publishes the current symbol/size with `side`.
        def on_press(_):
            row = empty_table(1).update(
                [f"Sym=`{symbol}`", f"Size={size}", f"Side=`{side}`"]
            )
            publisher.add(row)

        return on_press

    symbol_picker = ui.picker(
        *symbols,
        label="Symbol",
        label_position="side",
        on_selection_change=set_symbol,
        selected_key=symbol,
    )
    size_selector = ui.number_field(
        label="Size", label_position="side", value=size, on_change=set_size
    )
    buy_button = ui.button(
        "Buy", on_press=order_handler("buy"), variant="accent", style="fill"
    )
    sell_button = ui.button(
        "Sell", on_press=order_handler("sell"), variant="negative", style="fill"
    )

    form = ui.flex(
        symbol_picker,
        size_selector,
        buy_button,
        sell_button,
        gap="size-200",
        margin="size-200",
        wrap=True,
    )
    return ui.panel(form, orders, title="Order Book")
@ui.component
def my_layout(source, source_with_stats):
    """Lay out the dashboard: plot and tables on the left, controls on the right.

    Args:
        source: Raw table with ``Sym`` and ``Exchange`` columns; shown in the
            "Full Table" panel and used to populate the pickers.
        source_with_stats: Table carrying two sets of rolling statistics:
            ``PriceAvg*`` / ``PriceStd*`` and the same names suffixed with
            ``Avg``.

    Returns:
        A ``ui.row`` with two columns (65 / 35 width).
    """
    # lists of symbols and exchanges will inform drop-down selectors
    symbols = ui.use_column_data(source.agg_by(agg.unique(cols="Sym"), by="Sym"))
    # Build a new list instead of appending to the hook's return value, so
    # "All" cannot pile up if the same list object is returned across renders.
    exchanges = [
        *ui.use_column_data(source.agg_by(agg.unique(cols="Exchange"), by="Exchange")),
        "All",
    ]

    # state variables
    symbol, set_symbol = ui.use_state(symbols[0])
    exchange, set_exchange = ui.use_state("All")
    window_size, set_window_size = ui.use_state("30 seconds")
    bol_bands, set_bol_bands = ui.use_state("90%")

    # Statistic columns without the "Avg" suffix, their suffixed twins, and
    # the renames that map the suffixed names onto the unsuffixed ones.
    plain_stat_cols = [
        f"Price{stat}{window}"
        for window in ("5s", "30s", "1m", "5m")
        for stat in ("Avg", "Std")
    ]
    suffixed_stat_cols = [f"{col}Avg" for col in plain_stat_cols]
    suffixed_to_plain = [f"{col}={col}Avg" for col in plain_stat_cols]

    def pare_down():
        if exchange == "All":
            # Keep the suffixed statistics and expose them under the plain
            # names that the downstream components expect.
            return (
                source_with_stats.where([f"Sym == `{symbol}`"])
                .drop_columns(plain_stat_cols)
                .rename_columns(suffixed_to_plain)
            )
        # Single exchange: keep the plain statistics, drop the suffixed ones.
        return source_with_stats.where(
            [f"Sym == `{symbol}`", f"Exchange == `{exchange}`"]
        ).drop_columns(suffixed_stat_cols)

    # use state variables to pare down source table before sending off to other
    # functions; `exchange` must be a dependency because pare_down reads it,
    # otherwise switching exchanges would keep serving the stale table.
    single_symbol = ui.use_memo(pare_down, [symbol, exchange, source_with_stats])

    return ui.row(
        ui.column(
            line_plot(single_symbol, exchange, window_size, bol_bands),
            ui.stack(
                full_table(source),
                filtered_table(single_symbol, exchange),
            ),
            width=65,
        ),
        ui.column(
            ui.row(
                parameters_panel(
                    symbols,
                    exchanges,
                    symbol,
                    set_symbol,
                    exchange,
                    set_exchange,
                    window_size,
                    set_window_size,
                    bol_bands,
                    set_bol_bands,
                ),
                height=40,
            ),
            ui.row(orderbook_panel(symbols), height=60),
            width=35,
        ),
    )
# Precompute every rolling statistic up front so the components only filter.
def _rolling_stat_ops(suffix):
    """Return the rolling avg ops followed by the rolling std ops.

    Each op is time-based on ``Timestamp`` over ``Price`` and writes to
    ``PriceAvg<window><suffix>`` / ``PriceStd<window><suffix>``; the same
    duration is passed for both time arguments of every op.
    """
    windows = (("5s", "PT2.5s"), ("30s", "PT15s"), ("1m", "PT30s"), ("5m", "PT150s"))
    avg_ops = [
        uby.rolling_avg_time("Timestamp", f"PriceAvg{label}{suffix}=Price", span, span)
        for label, span in windows
    ]
    std_ops = [
        uby.rolling_std_time("Timestamp", f"PriceStd{label}{suffix}=Price", span, span)
        for label, span in windows
    ]
    return avg_ops + std_ops


_sorted_stocks = dx.data.stocks()
# First pass: statistics grouped by symbol only (columns suffixed "Avg").
_per_symbol_stats = _sorted_stocks.update_by(_rolling_stat_ops("Avg"), by=["Sym"])
# Second pass: the same statistics grouped by symbol and exchange.
_stocks_with_stats = _per_symbol_stats.update_by(
    _rolling_stat_ops(""), by=["Sym", "Exchange"]
)

dashboard = ui.dashboard(my_layout(stocks, _stocks_with_stats))