Write Java with Deephaven

At this point you should be familiar with DQL and Groovy. The next step is to write more advanced queries using Java directly. If you are using Python, the same approach applies with native Python code and libraries. At its core, Groovy is an optionally typed, dynamically compiled language built on top of Java, which means you can write and use Java code directly in-line with your DQL.

Example: Creating a Report

A simple use of this is to perform some final operations at the end of a Run and Done query. In this example you will generate a simple XML report at the end of a Run and Done query.

Before you start writing and testing code, you must ensure that any external libraries you want to use are on the classpath of the query. The “classpath” of a query is simply the set of paths in which the Java runtime searches for classes.
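A quick way to confirm that a library is visible before running a long query is to ask the class loader for one of its classes. The following is a minimal sketch in plain Java that assumes nothing about Deephaven itself. The class name `ClasspathCheck` and its helper method are hypothetical, and `org.jdom2.Element` is used only because this example imports it later. Inside a query worker the relevant class loader may differ from the one used here.

```java
// Hypothetical helper: reports whether a class name can be resolved on the current classpath.
class ClasspathCheck {
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // java.lang.String always resolves; the JDOM2 class resolves only if its jar was added.
        System.out.println("java.lang.String  -> " + isOnClasspath("java.lang.String"));
        System.out.println("org.jdom2.Element -> " + isOnClasspath("org.jdom2.Element"));
    }
}
```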

Next, ensure that you import all of the classes that you are going to use into the script itself. Note that many classes are already included in the default set of imported classes.

// Import the java classes from the JDOM2 package that we will use to write XML
import org.jdom2.Document
import org.jdom2.Element
import org.jdom2.output.XMLOutputter
import org.jdom2.output.Format

// Import some additional Deephaven classes that we will need to iterate the tables.
import com.illumon.iris.db.v2.utils.Index
import com.illumon.iris.db.v2.utils.ReadOnlyIndex

Write DQL as you normally would to produce a set of output tables.

// Grab the table of trades, and compute a total value, as well as binning each row by hour
Trades = db.t("LearnDeephaven", "StockTrades")
            .where("Date=`2017-08-25`")
            .updateView("Value=Last*Size", "TimeBin=upperBin(Timestamp, HOUR)")

// Take the trades table, and compute the sum of Size and Value, as well as min and max trade prices
// for each time bin for each symbol.
TradesBySymBin = Trades.by(AggCombo(AggSum("TradeVolume=Size", "TradeValue=Value"),
                                    AggMin("MinTrade=Last"),
                                    AggMax("MaxTrade=Last")), "USym", "TimeBin")

// Sort the time binned aggregation above and then split it into separate tables by USym.  This produces a TableMap
// which behaves exactly like a Java HashMap, where the keys are the USym names and the values are the Tables for that USym
TradesBySymBinGrouped = TradesBySymBin.sort("TimeBin")
                                       .byExternal("USym")

// Compute the total size and trade value for each USym by exchange traded in
TradesBySymExchange = Trades.view("USym", "Exchange", "Size", "Value")
                            .sumBy("USym", "Exchange")
                            .sort("USym", "Exchange")

// Split that table into smaller ones by USym
TradesBySymExchangeGrouped = TradesBySymExchange.byExternal("USym")

// Group the trades table by USym and compute a handful of aggregations including
//          * The total trade volume
//          * The total Dollar value traded
//          * The min and max traded price
//          * The first and last trade times
//
// Next, it sorts the aggregation by volume and joins on the by-exchange and by-bin tables
// to augment the data with the most-traded exchange and most-traded time bin for each symbol.
TradesBySym = Trades.by(AggCombo(AggSum("TradeVolume=Size", "Value"),
                                 AggMin("MinTrade=Last"),
                                 AggMax("MaxTrade=Last"),
                                 AggFirst("tFirstTrade=Timestamp"),
                                 AggLast("tLastTrade=Timestamp")), "USym")
                    .sortDescending("TradeVolume")
                    .naturalJoin(TradesBySymExchange.sortDescending("Size")
                                                    .sort("USym")
                                                    .firstBy("USym")
                                , "USym", "MaxExchange=Exchange,MaxQty=Size,MaxValue=Value")
                    .naturalJoin(TradesBySymBin.sortDescending("TradeVolume")
                                               .sort("USym")
                                               .firstBy("USym"), "USym", "MaxTime=TimeBin,TimeVolume=TradeVolume")

// Compute a total for all trades and exchanges for the day.
DaySummary = TradesBySym.view("TradeVolume", "Value").sumBy()

At this point you are ready to process the result tables to generate your report. There are a few important rules for iterating data within a table.

  • Tables may not be “flat”, meaning that their row indices are not guaranteed to be the contiguous range [0, N-1].
  • The actual set of included rows is contained in the Table’s Index, which can be retrieved with .getIndex().
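To make these two rules concrete, here is a small sketch in plain Java that does not use any Deephaven classes. A `long[]` of non-contiguous keys stands in for a table's Index, and the hypothetical helper `forEachKey` mimics the shape of an abortable `forEachLong`: the visitor returns `true` to continue and `false` to stop. All names in the sketch are assumptions made for illustration.

```java
import java.util.PrimitiveIterator;
import java.util.function.LongPredicate;
import java.util.stream.LongStream;

// Plain-Java stand-in for a table index: row keys are sorted but not contiguous.
class SparseKeysDemo {
    // Visits keys in order and stops early when the visitor returns false.
    // Returns true only if every key was visited.
    static boolean forEachKey(long[] keys, LongPredicate visitor) {
        PrimitiveIterator.OfLong it = LongStream.of(keys).iterator();
        while (it.hasNext()) {
            // nextLong() yields a primitive long, so no Long objects are allocated.
            if (!visitor.test(it.nextLong())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical keys: the "table" has 5 rows, yet the largest key is 42,
        // so a loop from 0 to size-1 would read the wrong rows.
        long[] rowKeys = {0L, 1L, 5L, 9L, 42L};

        long[] sum = {0L};
        boolean completed = forEachKey(rowKeys, key -> { sum[0] += key; return true; });
        System.out.println("completed=" + completed + " sum=" + sum[0]);

        // Stop as soon as a key above 4 is seen.
        long[] visited = {0L};
        boolean finished = forEachKey(rowKeys, key -> { visited[0]++; return key <= 4; });
        System.out.println("finished=" + finished + " visited=" + visited[0]);
    }
}
```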

In the query above, we took the trades table, computed some statistics, and grouped the trades by symbol. The trades were also binned into one-hour increments, which allows us to compute the following:

  • The most active trading hour
  • The most active trading exchange
  • The largest trade for the day
  • The High and Low prices
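The hourly binning done by `upperBin(Timestamp, HOUR)` in the query can be pictured as rounding each timestamp up to the next hour boundary. The sketch below shows that arithmetic in plain Java on epoch nanoseconds. It is not Deephaven's implementation: the class `HourBinSketch` and its methods are hypothetical, and leaving a timestamp that already sits exactly on a boundary unchanged is an assumption. The sample timestamp is the first AAPL trade time that appears in the report output later in this example.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class HourBinSketch {
    static final long HOUR_NANOS = 3_600_000_000_000L;

    // Rounds epoch nanoseconds up to the next multiple of intervalNanos.
    // Assumption: a value already on a boundary is returned unchanged.
    static long upperBin(long epochNanos, long intervalNanos) {
        long remainder = Math.floorMod(epochNanos, intervalNanos);
        return remainder == 0 ? epochNanos : epochNanos - remainder + intervalNanos;
    }

    // Converts a date-time to nanoseconds since the epoch.
    static long toEpochNanos(ZonedDateTime t) {
        Instant i = t.toInstant();
        return i.getEpochSecond() * 1_000_000_000L + i.getNano();
    }

    public static void main(String[] args) {
        ZoneId ny = ZoneId.of("America/New_York");
        ZonedDateTime trade = ZonedDateTime.of(2017, 8, 25, 4, 0, 15, 733_154_147, ny);

        long bin = upperBin(toEpochNanos(trade), HOUR_NANOS);
        ZonedDateTime binTime =
                Instant.ofEpochSecond(bin / 1_000_000_000L, bin % 1_000_000_000L).atZone(ny);

        // The trade at 04:00:15 falls into the bin labelled 05:00.
        System.out.println(trade + " -> " + binTime);
    }
}
```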

Next, we will use the JDOM2 XML package to generate our XML report file.

// Now let's write out a summary!

// First we create a helper closure so that we don't have to type the same boilerplate code every single time.
getValue = { table, column, key -> table.getColumnSource(column).get(key).toString() }

log.info().append("Beginning report generation").endl()

// Now write some java code that creates an XML tree and writes it out to disk.
Element reportRoot = new Element("TradeReport")

// The DaySummary table is a single row table, so we know to just grab the 0th (first) row.
reportRoot.addContent(new Element("Summary")
                            .setAttribute("totalVolume", getValue(DaySummary, "TradeVolume", 0L))
                            .setAttribute("totalValue" , getValue(DaySummary, "Value", 0L))
                            .setAttribute("busiestSym" , getValue(TradesBySym, "USym", 0L)));

// We want to sort the TradesBySym table by USym alphabetically because it looks nicer.
tempTable = TradesBySym.sort("USym")

// You can't iterate a table with a for loop from 0 to table.size()-1 because the table may not be flat!
// Instead, we must use an Iterator and .forEachLong() to avoid boxing.  forEachLong() will invoke the
// closure you pass to it for each row (called a key) in the table.
ReadOnlyIndex.Iterator it = tempTable.getIndex().iterator()
it.forEachLong({ key ->
    String currentUSym = getValue(tempTable, "USym", key)
    log.info().append("Processing USym=").append(currentUSym).endl()

    // Grab the USym specific tables from the TableMaps we created above
    Table byBinTable = TradesBySymBinGrouped.get(currentUSym)
    Table byExchangeTable = TradesBySymExchangeGrouped.get(currentUSym)

    Element symElem = new Element("USym")
                            .setAttribute("name", currentUSym)
                            .setAttribute("totalVolume", getValue(tempTable, "TradeVolume", key))
                            .setAttribute("totalValue", getValue(tempTable, "Value", key))
                            .setAttribute("mostActiveOn", getValue(tempTable, "MaxExchange", key))
                            .setAttribute("mostActiveTime", getValue(tempTable, "MaxTime", key))
    reportRoot.addContent(symElem)

    symElem.addContent(new Element("Price")
                            .setAttribute("min", getValue(tempTable, "MinTrade", key))
                            .setAttribute("max", getValue(tempTable, "MaxTrade", key))
                            .setAttribute("tFirst", getValue(tempTable, "tFirstTrade", key))
                            .setAttribute("tLast", getValue(tempTable, "tLastTrade", key)))

    // Iterate the exchange table and append a summary of trades for this USym for each exchange
    byExchangeTable.getIndex().iterator().forEachLong({ k ->
        symElem.addContent(new Element("Exchange")
                                .setAttribute("name", getValue(byExchangeTable, "Exchange", k))
                                .setAttribute("volume", getValue(byExchangeTable, "Size", k))
                                .setAttribute("value", getValue(byExchangeTable, "Value", k)))

        // This return value allows you to stop the iteration if you need to.
        return true;
    })

    // Iterate the bins table and append a summary of trades for each hour of the day.
    byBinTable.getIndex().iterator().forEachLong({ k ->
        symElem.addContent(new Element("TimeBin")
                                .setAttribute("bin", getValue(byBinTable, "TimeBin", k))
                                .setAttribute("volume", getValue(byBinTable, "TradeVolume", k))
                                .setAttribute("value", getValue(byBinTable, "TradeValue", k))
                                .setAttribute("minPrice", getValue(byBinTable, "MinTrade", k))
                                .setAttribute("maxPrize", getValue(byBinTable, "MaxTrade", k)))
        return true;
    })

    // Return true explicitly so that iteration continues with the next USym.
    return true;
})

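// Write the report to disk.  withStream() closes the file even if output() throws.
new FileOutputStream("/tmp/Report.xml").withStream { out ->
    new XMLOutputter(Format.getPrettyFormat()).output(reportRoot, out)
}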
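If you want to experiment with the shape of the report before JDOM2 is on the classpath, the JDK's built-in StAX writer can produce the same kind of element-with-attributes output. The sketch below is a swapped-in alternative, not the JDOM2 approach used in this example. The class `StaxReportSketch` and its method are hypothetical, and the attribute values are copied from the Summary line of the sample report shown later.

```java
import java.io.StringWriter;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

class StaxReportSketch {
    // Builds a <TradeReport> containing a single empty <Summary> element.
    static String buildSummary(String totalVolume, String totalValue, String busiestSym) {
        StringWriter out = new StringWriter();
        try {
            XMLStreamWriter xml = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
            xml.writeStartElement("TradeReport");
            xml.writeEmptyElement("Summary");
            // Attributes attach to the most recently started element, here Summary.
            xml.writeAttribute("totalVolume", totalVolume);
            xml.writeAttribute("totalValue", totalValue);
            xml.writeAttribute("busiestSym", busiestSym);
            xml.writeEndElement(); // closes TradeReport
            xml.writeEndDocument();
            xml.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Could not write report", e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildSummary("158035716", "1.06243090811231E10", "BAC"));
    }
}
```

Unlike `Format.getPrettyFormat()` in JDOM2, this writer does not indent its output, so the result is a single line.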

This query will only run successfully as a Run and Done query. Note also that we are using non-ticking historical tables here. The same approach works for ticking tables, but there are some restrictions and ‘gotchas’ that you must be aware of when using them. These are addressed in the next example.

Finally, this example produces an XML report on the query server in “/tmp” that looks like this:

<TradeReport>
  <Summary totalVolume="158035716" totalValue="1.06243090811231E10" busiestSym="BAC" />
  <USym name="AAPL" totalVolume="26873443" totalValue="4.296428709770799E9" mostActiveOn="Internal" mostActiveTime="2017-08-25T12:00:00.000000000 NY">
    <Price min="159.27" max="160.56" tFirst="2017-08-25T04:00:15.733154147 NY" tLast="2017-08-25T19:59:39.066821882 NY" />
    <Exchange name="Arca" volume="1512175" value="2.41913894219E8" />
    <Exchange name="Bats" volume="1395230" value="2.2316922639499998E8" />
    <Exchange name="BatsY" volume="538880" value="8.619752308999999E7" />
    <Exchange name="Chicago" volume="5315850" value="8.48500118E8" />
    <Exchange name="EdgA" volume="175490" value="2.807935465E7" />
    <Exchange name="EdgX" volume="2052086" value="3.2831414377000004E8" />
    <Exchange name="Internal" volume="8459378" value="1.3530810456537998E9" />
    <Exchange name="Nasdaq" volume="7356981" value="1.176403947953E9" />
    <Exchange name="Nyse" volume="67373" value="1.0769456040000005E7" />
    <TimeBin bin="2017-08-25T05:00:00.000000000 NY" volume="702" value="112036.06999999998" minPrice="159.35" maxPrize="159.74" />
    <TimeBin bin="2017-08-25T06:00:00.000000000 NY" volume="1113" value="177536.63" minPrice="159.5" maxPrize="159.69" />
    <TimeBin bin="2017-08-25T07:00:00.000000000 NY" volume="3403" value="544185.65" minPrice="159.69" maxPrize="159.97" />
    <TimeBin bin="2017-08-25T08:00:00.000000000 NY" volume="22926" value="3671915.369999999" minPrice="159.86" maxPrize="160.4" />
    <TimeBin bin="2017-08-25T09:00:00.000000000 NY" volume="72162" value="1.1548462759999996E7" minPrice="159.85" maxPrize="160.38" />
    <TimeBin bin="2017-08-25T10:00:00.000000000 NY" volume="3279417" value="5.247501695831999E8" minPrice="159.52" maxPrize="160.39" />
    <TimeBin bin="2017-08-25T11:00:00.000000000 NY" volume="3525925" value="5.650535325966996E8" minPrice="159.61" maxPrize="160.56" />
    <TimeBin bin="2017-08-25T12:00:00.000000000 NY" volume="8182599" value="1.3059863126440992E9" minPrice="159.29" maxPrize="159.9241" />
    <TimeBin bin="2017-08-25T13:00:00.000000000 NY" volume="1864129" value="2.974801074154E8" minPrice="159.27" maxPrize="159.89" />
    <TimeBin bin="2017-08-25T14:00:00.000000000 NY" volume="1846351" value="2.953613321681001E8" minPrice="159.8" maxPrize="160.12" />
    <TimeBin bin="2017-08-25T15:00:00.000000000 NY" volume="1629412" value="2.609255489385999E8" minPrice="159.6" maxPrize="160.34" />
    <TimeBin bin="2017-08-25T16:00:00.000000000 NY" volume="3590447" value="5.744473176564999E8" minPrice="159.79" maxPrize="160.35" />
    <TimeBin bin="2017-08-25T17:00:00.000000000 NY" volume="2543524" value="4.0660305406820065E8" minPrice="159.5409" maxPrize="160.0462" />
    <TimeBin bin="2017-08-25T18:00:00.000000000 NY" volume="299471" value="4.7873287010000005E7" minPrice="159.7" maxPrize="159.95" />
    <TimeBin bin="2017-08-25T19:00:00.000000000 NY" volume="4973" value="794233.6900000002" minPrice="159.58" maxPrize="159.85" />
    <TimeBin bin="2017-08-25T20:00:00.000000000 NY" volume="6889" value="1099677.52" minPrice="159.58" maxPrize="159.73" />
  </USym>
  <USym name="AXP" totalVolume="2613940" totalValue="2.2368046554330003E8" mostActiveOn="Nyse" mostActiveTime="2017-08-25T17:00:00.000000000 NY">
    <Price min="85.42" max="85.96" tFirst="2017-08-25T06:55:58.685661000 NY" tLast="2017-08-25T17:53:36.250336999 NY" />
    <Exchange name="Arca" volume="112443" value="9624752.57" />
    <Exchange name="Bats" volume="160329" value="1.3724484630000003E7" />
    <Exchange name="BatsY" volume="48308" value="4132769.855" />
    <Exchange name="Chicago" volume="8800" value="752400.0" />
    <Exchange name="EdgA" volume="27206" value="2328215.42" />
    <Exchange name="EdgX" volume="121120" value="1.037852818E7" />
    <Exchange name="Internal" volume="371025" value="3.174870619929999E7" />
    <Exchange name="Nasdaq" volume="375731" value="3.2166569415999994E7" />
    <Exchange name="Nyse" volume="1388978" value="1.18824039273E8" />
    <TimeBin bin="2017-08-25T07:00:00.000000000 NY" volume="2434" value="209104.93999999997" minPrice="85.91" maxPrize="85.91" />
    <TimeBin bin="2017-08-25T10:00:00.000000000 NY" volume="303212" value="2.5987087033399988E7" minPrice="85.45" maxPrize="85.91" />
    <TimeBin bin="2017-08-25T11:00:00.000000000 NY" volume="314115" value="2.69429455151E7" minPrice="85.51" maxPrize="85.96" />
    <TimeBin bin="2017-08-25T12:00:00.000000000 NY" volume="239865" value="2.0515369061999995E7" minPrice="85.42" maxPrize="85.665" />
    <TimeBin bin="2017-08-25T13:00:00.000000000 NY" volume="160923" value="1.37567353354E7" minPrice="85.44" maxPrize="85.5399" />
    <TimeBin bin="2017-08-25T14:00:00.000000000 NY" volume="163836" value="1.4003390628900003E7" minPrice="85.43" maxPrize="85.51" />
    <TimeBin bin="2017-08-25T15:00:00.000000000 NY" volume="256213" value="2.1928693208200004E7" minPrice="85.49" maxPrize="85.61" />
    <TimeBin bin="2017-08-25T16:00:00.000000000 NY" volume="508079" value="4.347679069059998E7" minPrice="85.45" maxPrize="85.69" />
    <TimeBin bin="2017-08-25T17:00:00.000000000 NY" volume="660232" value="5.64303495597E7" minPrice="85.47" maxPrize="85.588" />
    <TimeBin bin="2017-08-25T18:00:00.000000000 NY" volume="5031" value="429999.57" minPrice="85.47" maxPrize="85.47" />
  </USym>
  <USym name="BAC" totalVolume="46616375" totalValue="1.1124571364779005E9" mostActiveOn="Internal" mostActiveTime="2017-08-25T16:00:00.000000000 NY">
    <Price min="23.74" max="24.07" tFirst="2017-08-25T07:01:48.770295000 NY" tLast="2017-08-25T19:59:52.800630000 NY" />
    <Exchange name="Arca" volume="3180167" value="7.592341410000001E7" />
    <Exchange name="Bats" volume="2072107" value="4.9477151940000005E7" />
    <Exchange name="BatsY" volume="3401772" value="8.119536383399998E7" />
    <Exchange name="Chicago" volume="361178" value="8603730.28" />
    <Exchange name="EdgA" volume="1152039" value="2.7491564490000002E7" />
    <Exchange name="EdgX" volume="2851112" value="6.810291924499999E7" />
    <Exchange name="Internal" volume="17156415" value="4.094435481038999E8" />
    <Exchange name="Nasdaq" volume="5632658" value="1.34492399771E8" />
    <Exchange name="Nyse" volume="10808927" value="2.57727044714E8" />
    <TimeBin bin="2017-08-25T08:00:00.000000000 NY" volume="24990" value="598786.8999999999" minPrice="23.93" maxPrize="23.98" />
    <TimeBin bin="2017-08-25T09:00:00.000000000 NY" volume="138021" value="3303910.140000001" minPrice="23.9" maxPrize="23.96" />
    <TimeBin bin="2017-08-25T10:00:00.000000000 NY" volume="6175116" value="1.481120935393E8" minPrice="23.85" maxPrize="24.07" />
    <TimeBin bin="2017-08-25T11:00:00.000000000 NY" volume="7949066" value="1.9041405504529995E8" minPrice="23.88" maxPrize="24.04" />
    <TimeBin bin="2017-08-25T12:00:00.000000000 NY" volume="7532745" value="1.7954051452520022E8" minPrice="23.78" maxPrize="23.9259" />
    <TimeBin bin="2017-08-25T13:00:00.000000000 NY" volume="4081434" value="9.711156492140013E7" minPrice="23.75" maxPrize="23.85" />
    <TimeBin bin="2017-08-25T14:00:00.000000000 NY" volume="3404574" value="8.109744418900004E7" minPrice="23.7637" maxPrize="23.85" />
    <TimeBin bin="2017-08-25T15:00:00.000000000 NY" volume="2746223" value="6.553316788899999E7" minPrice="23.83" maxPrize="23.89" />
    <TimeBin bin="2017-08-25T16:00:00.000000000 NY" volume="9546229" value="2.274544566302001E8" minPrice="23.77" maxPrize="23.89" />
    <TimeBin bin="2017-08-25T17:00:00.000000000 NY" volume="4846012" value="1.1520417701850003E8" minPrice="23.77" maxPrize="23.9925" />
    <TimeBin bin="2017-08-25T18:00:00.000000000 NY" volume="133181" value="3165749.400000001" minPrice="23.76" maxPrize="23.79" />
    <TimeBin bin="2017-08-25T19:00:00.000000000 NY" volume="21789" value="517743.98000000004" minPrice="23.75" maxPrize="23.78" />
    <TimeBin bin="2017-08-25T20:00:00.000000000 NY" volume="16995" value="403472.3" minPrice="23.74" maxPrize="23.76" />
  </USym>
  <USym name="CSCO" totalVolume="21464606" totalValue="6.772007690790001E8" mostActiveOn="Nasdaq" mostActiveTime="2017-08-25T16:00:00.000000000 NY">
    <Price min="31.3" max="31.8" tFirst="2017-08-25T06:41:27.636832565 NY" tLast="2017-08-25T19:11:47.935216917 NY" />
    <Exchange name="Arca" volume="1007246" value="3.1814582917000003E7" />
    <Exchange name="Bats" volume="1114654" value="3.5211674145E7" />
    <Exchange name="BatsY" volume="2375792" value="7.5027074418E7" />
    <Exchange name="Chicago" volume="999" value="31586.15" />
    <Exchange name="EdgA" volume="490629" value="1.5497965370000001E7" />
    <Exchange name="EdgX" volume="934801" value="2.9520199275000002E7" />
    <Exchange name="Internal" volume="7150042" value="2.2568863476299998E8" />
    <Exchange name="Nasdaq" volume="8379175" value="2.64053085911E8" />
    <Exchange name="Nyse" volume="11268" value="355966.12999999995" />
    <TimeBin bin="2017-08-25T07:00:00.000000000 NY" volume="100" value="3133.0" minPrice="31.33" maxPrize="31.33" />
    <TimeBin bin="2017-08-25T09:00:00.000000000 NY" volume="300" value="9402.0" minPrice="31.3" maxPrize="31.36" />
    <TimeBin bin="2017-08-25T10:00:00.000000000 NY" volume="2619251" value="8.255176807320005E7" minPrice="31.31" maxPrize="31.715" />
    <TimeBin bin="2017-08-25T11:00:00.000000000 NY" volume="3450141" value="1.0943619524180003E8" minPrice="31.62" maxPrize="31.8" />
    <TimeBin bin="2017-08-25T12:00:00.000000000 NY" volume="2043873" value="6.453219786529996E7" minPrice="31.5" maxPrize="31.7549" />
    <TimeBin bin="2017-08-25T13:00:00.000000000 NY" volume="1527456" value="4.8184430411500044E7" minPrice="31.48" maxPrize="31.61" />
    <TimeBin bin="2017-08-25T14:00:00.000000000 NY" volume="1726618" value="5.449870878100002E7" minPrice="31.49" maxPrize="31.62" />
    <TimeBin bin="2017-08-25T15:00:00.000000000 NY" volume="1410271" value="4.455857876189999E7" minPrice="31.56" maxPrize="31.62" />
    <TimeBin bin="2017-08-25T16:00:00.000000000 NY" volume="4324024" value="1.3624550305430007E8" minPrice="31.41" maxPrize="31.61" />
    <TimeBin bin="2017-08-25T17:00:00.000000000 NY" volume="4301533" value="1.3526163872E8" minPrice="31.42" maxPrize="31.67" />
    <TimeBin bin="2017-08-25T18:00:00.000000000 NY" volume="58160" value="1828603.9000000001" minPrice="31.44" maxPrize="31.58" />
    <TimeBin bin="2017-08-25T19:00:00.000000000 NY" volume="2768" value="87123.32" minPrice="31.45" maxPrize="31.49" />
    <TimeBin bin="2017-08-25T20:00:00.000000000 NY" volume="111" value="3485.95" minPrice="31.4" maxPrize="31.45" />
  </USym>
  <USym name="GOOG" totalVolume="1167353" totalValue="1.0723084009771996E9" mostActiveOn="Nasdaq" mostActiveTime="2017-08-25T17:00:00.000000000 NY">
    <Price min="914.02" max="925.97" tFirst="2017-08-25T04:30:01.201529919 NY" tLast="2017-08-25T19:55:17.680719742 NY" />
    <Exchange name="Arca" volume="85431" value="7.8508188035E7" />
    <Exchange name="Bats" volume="54297" value="4.992440761500004E7" />
    <Exchange name="BatsY" volume="24502" value="2.2508682892E7" />
    <Exchange name="Chicago" volume="1320" value="1216018.7500000002" />
    <Exchange name="EdgA" volume="10264" value="9427334.66" />
    <Exchange name="EdgX" volume="52099" value="4.7882312415000014E7" />
    <Exchange name="Internal" volume="419086" value="3.850136324262002E8" />
    <Exchange name="Nasdaq" volume="519258" value="4.768208774740002E8" />
    <Exchange name="Nyse" volume="1096" value="1006946.71" />
    <TimeBin bin="2017-08-25T05:00:00.000000000 NY" volume="2" value="1847.3000000000002" minPrice="922.6" maxPrize="924.7" />
    <TimeBin bin="2017-08-25T06:00:00.000000000 NY" volume="26" value="23994.23" minPrice="921.91" maxPrize="924.7" />
    <TimeBin bin="2017-08-25T07:00:00.000000000 NY" volume="5" value="4610.8" minPrice="922.16" maxPrize="922.16" />
    <TimeBin bin="2017-08-25T08:00:00.000000000 NY" volume="88" value="81464.8" minPrice="922.56" maxPrize="925.97" />
    <TimeBin bin="2017-08-25T09:00:00.000000000 NY" volume="1863" value="1723021.4199999964" minPrice="923.32" maxPrize="925.02" />
    <TimeBin bin="2017-08-25T10:00:00.000000000 NY" volume="124543" value="1.150077067495E8" minPrice="921.64" maxPrize="925.07" />
    <TimeBin bin="2017-08-25T11:00:00.000000000 NY" volume="172347" value="1.5909355932019997E8" minPrice="918.7" maxPrize="925.6099" />
    <TimeBin bin="2017-08-25T12:00:00.000000000 NY" volume="121064" value="1.1111657145300004E8" minPrice="915.5" maxPrize="919.64" />
    <TimeBin bin="2017-08-25T13:00:00.000000000 NY" volume="65606" value="6.026372237430004E7" minPrice="917.11" maxPrize="919.9999" />
    <TimeBin bin="2017-08-25T14:00:00.000000000 NY" volume="78682" value="7.221271583900002E7" minPrice="916.44" maxPrize="919.32" />
    <TimeBin bin="2017-08-25T15:00:00.000000000 NY" volume="95550" value="8.765689134560002E7" minPrice="916.6973" maxPrize="918.645" />
    <TimeBin bin="2017-08-25T16:00:00.000000000 NY" volume="246534" value="2.2603665966559976E8" minPrice="915.451" maxPrize="919.2099" />
    <TimeBin bin="2017-08-25T17:00:00.000000000 NY" volume="256144" value="2.3459895093E8" minPrice="914.42" maxPrize="916.5" />
    <TimeBin bin="2017-08-25T18:00:00.000000000 NY" volume="4623" value="4234194.120000002" minPrice="915.78" maxPrize="916.5" />
    <TimeBin bin="2017-08-25T19:00:00.000000000 NY" volume="262" value="239673.15999999997" minPrice="914.02" maxPrize="916.0" />
    <TimeBin bin="2017-08-25T20:00:00.000000000 NY" volume="14" value="12817.470000000001" minPrice="915.25" maxPrize="915.61" />
  </USym>
  <USym name="IBM" totalVolume="3122955" totalValue="4.4933125264509994E8" mostActiveOn="Nyse" mostActiveTime="2017-08-25T16:00:00.000000000 NY">
    <Price min="142.8" max="144.19" tFirst="2017-08-25T08:25:38.252856000 NY" tLast="2017-08-25T18:46:15.030128000 NY" />
    <Exchange name="Arca" volume="258607" value="3.721921644500001E7" />
    <Exchange name="Bats" volume="169878" value="2.444978478E7" />
    <Exchange name="BatsY" volume="76582" value="1.1020903568999998E7" />
    <Exchange name="Chicago" volume="305" value="43890.42" />
    <Exchange name="EdgA" volume="14166" value="2038595.62" />
    <Exchange name="EdgX" volume="217212" value="3.126181841999999E7" />
    <Exchange name="Internal" volume="915236" value="1.3171236204110003E8" />
    <Exchange name="Nasdaq" volume="434230" value="6.250102642699998E7" />
    <Exchange name="Nyse" volume="1036739" value="1.4908365492299998E8" />
    <TimeBin bin="2017-08-25T09:00:00.000000000 NY" volume="2555" value="366637.9" minPrice="143.47" maxPrize="143.5" />
    <TimeBin bin="2017-08-25T10:00:00.000000000 NY" volume="219271" value="3.1479552458000004E7" minPrice="143.25" maxPrize="143.85" />
    <TimeBin bin="2017-08-25T11:00:00.000000000 NY" volume="484979" value="6.981178344840004E7" minPrice="143.7" maxPrize="144.19" />
    <TimeBin bin="2017-08-25T12:00:00.000000000 NY" volume="331518" value="4.7682307761000015E7" minPrice="143.66" maxPrize="143.99" />
    <TimeBin bin="2017-08-25T13:00:00.000000000 NY" volume="288055" value="4.1452048634599976E7" minPrice="143.73" maxPrize="144.0" />
    <TimeBin bin="2017-08-25T14:00:00.000000000 NY" volume="243945" value="3.511821508909997E7" minPrice="143.87" maxPrize="144.03" />
    <TimeBin bin="2017-08-25T15:00:00.000000000 NY" volume="314885" value="4.535292442250001E7" minPrice="142.8" maxPrize="144.16" />
    <TimeBin bin="2017-08-25T16:00:00.000000000 NY" volume="706901" value="1.0176292396149999E8" minPrice="143.78" maxPrize="144.17" />
    <TimeBin bin="2017-08-25T17:00:00.000000000 NY" volume="529268" value="7.60781702E7" minPrice="143.52" maxPrize="143.97" />
    <TimeBin bin="2017-08-25T18:00:00.000000000 NY" volume="13" value="1871.8700000000001" minPrice="143.99" maxPrize="143.99" />
    <TimeBin bin="2017-08-25T19:00:00.000000000 NY" volume="1565" value="224816.9" minPrice="143.65" maxPrize="143.96" />
  </USym>
  <USym name="INTC" totalVolume="18090722" totalValue="6.277131784626999E8" mostActiveOn="Nasdaq" mostActiveTime="2017-08-25T17:00:00.000000000 NY">
    <Price min="34.58" max="34.93" tFirst="2017-08-25T04:00:19.413389357 NY" tLast="2017-08-25T19:45:15.079972961 NY" />
    <Exchange name="Arca" volume="858413" value="2.9797492283E7" />
    <Exchange name="Bats" volume="1390106" value="4.8272472145E7" />
    <Exchange name="BatsY" volume="1017033" value="3.5285237245E7" />
    <Exchange name="Chicago" volume="300" value="10403.0" />
    <Exchange name="EdgA" volume="174757" value="6064732.595" />
    <Exchange name="EdgX" volume="913459" value="3.1708514924999993E7" />
    <Exchange name="Internal" volume="4025523" value="1.397004327137E8" />
    <Exchange name="Nasdaq" volume="9696679" value="3.36371557966E8" />
    <Exchange name="Nyse" volume="14452" value="502335.58999999997" />
    <TimeBin bin="2017-08-25T05:00:00.000000000 NY" volume="34" value="1184.2200000000003" minPrice="34.71" maxPrize="34.88" />
    <TimeBin bin="2017-08-25T07:00:00.000000000 NY" volume="97" value="3370.6200000000003" minPrice="34.74" maxPrize="34.88" />
    <TimeBin bin="2017-08-25T08:00:00.000000000 NY" volume="326" value="11353.380000000001" minPrice="34.81" maxPrize="34.88" />
    <TimeBin bin="2017-08-25T09:00:00.000000000 NY" volume="522" value="18188.850000000002" minPrice="34.81" maxPrize="34.88" />
    <TimeBin bin="2017-08-25T10:00:00.000000000 NY" volume="1339636" value="4.66393159405E7" minPrice="34.77" maxPrize="34.93" />
    <TimeBin bin="2017-08-25T11:00:00.000000000 NY" volume="2271417" value="7.900554077659996E7" minPrice="34.65" maxPrize="34.89" />
    <TimeBin bin="2017-08-25T12:00:00.000000000 NY" volume="1544871" value="5.3514818850199856E7" minPrice="34.58" maxPrize="34.685" />
    <TimeBin bin="2017-08-25T13:00:00.000000000 NY" volume="1127465" value="3.909105115979999E7" minPrice="34.62" maxPrize="34.71" />
    <TimeBin bin="2017-08-25T14:00:00.000000000 NY" volume="985405" value="3.4171197365299985E7" minPrice="34.65" maxPrize="34.695" />
    <TimeBin bin="2017-08-25T15:00:00.000000000 NY" volume="1074572" value="3.727849852969994E7" minPrice="34.67" maxPrize="34.71" />
    <TimeBin bin="2017-08-25T16:00:00.000000000 NY" volume="2558930" value="8.877787004989998E7" minPrice="34.64" maxPrize="34.75" />
    <TimeBin bin="2017-08-25T17:00:00.000000000 NY" volume="7175291" value="2.487794105107E8" minPrice="34.6318" maxPrize="34.82" />
    <TimeBin bin="2017-08-25T18:00:00.000000000 NY" volume="10441" value="361988.91000000003" minPrice="34.63" maxPrize="34.67" />
    <TimeBin bin="2017-08-25T19:00:00.000000000 NY" volume="1300" value="45019.0" minPrice="34.63" maxPrize="34.63" />
    <TimeBin bin="2017-08-25T20:00:00.000000000 NY" volume="415" value="14370.3" minPrice="34.62" maxPrize="34.63" />
  </USym>
  <USym name="MSFT" totalVolume="14343667" totalValue="1.0452581418469E9" mostActiveOn="Nasdaq" mostActiveTime="2017-08-25T16:00:00.000000000 NY">
    <Price min="72.21" max="73.35" tFirst="2017-08-25T04:15:40.589788866 NY" tLast="2017-08-25T19:46:05.422006195 NY" />
    <Exchange name="Arca" volume="984778" value="7.180027665E7" />
    <Exchange name="Bats" volume="1379889" value="1.0057342641000003E8" />
    <Exchange name="BatsY" volume="639071" value="4.6568222607999995E7" />
    <Exchange name="EdgA" volume="191113" value="1.3929940104999999E7" />
    <Exchange name="EdgX" volume="1008937" value="7.3543359745E7" />
    <Exchange name="Internal" volume="3524784" value="2.5689617949789995E8" />
    <Exchange name="Nasdaq" volume="6580534" value="4.7942837916099995E8" />
    <Exchange name="Nyse" volume="34561" value="2518357.6700000004" />
    <TimeBin bin="2017-08-25T05:00:00.000000000 NY" volume="36" value="2624.1500000000005" minPrice="72.85" maxPrize="72.9" />
    <TimeBin bin="2017-08-25T07:00:00.000000000 NY" volume="2239" value="163233.1" minPrice="72.9" maxPrize="72.91" />
    <TimeBin bin="2017-08-25T08:00:00.000000000 NY" volume="478" value="34861.82" minPrice="72.82" maxPrize="72.99" />
    <TimeBin bin="2017-08-25T09:00:00.000000000 NY" volume="3047" value="222171.45" minPrice="72.82" maxPrize="73.0" />
    <TimeBin bin="2017-08-25T10:00:00.000000000 NY" volume="1759893" value="1.2850892920749994E8" minPrice="72.75" maxPrize="73.28" />
    <TimeBin bin="2017-08-25T11:00:00.000000000 NY" volume="1744525" value="1.2745348148269983E8" minPrice="72.77" maxPrize="73.35" />
    <TimeBin bin="2017-08-25T12:00:00.000000000 NY" volume="1430335" value="1.0388184933999991E8" minPrice="72.21" maxPrize="72.7999" />
    <TimeBin bin="2017-08-25T13:00:00.000000000 NY" volume="919927" value="6.683087604710005E7" minPrice="72.555" maxPrize="72.7199" />
    <TimeBin bin="2017-08-25T14:00:00.000000000 NY" volume="880648" value="6.4099016703100115E7" minPrice="72.6401" maxPrize="72.86" />
    <TimeBin bin="2017-08-25T15:00:00.000000000 NY" volume="1129840" value="8.234650714100005E7" minPrice="72.79" maxPrize="72.94" />
    <TimeBin bin="2017-08-25T16:00:00.000000000 NY" volume="3594793" value="2.6215196638900006E8" minPrice="72.76" maxPrize="73.05" />
    <TimeBin bin="2017-08-25T17:00:00.000000000 NY" volume="2873789" value="2.0926328896649992E8" minPrice="72.6183" maxPrize="72.84" />
    <TimeBin bin="2017-08-25T18:00:00.000000000 NY" volume="1821" value="132365.53" minPrice="72.63" maxPrize="72.83" />
    <TimeBin bin="2017-08-25T19:00:00.000000000 NY" volume="485" value="35288.600000000006" minPrice="72.76" maxPrize="72.76" />
    <TimeBin bin="2017-08-25T20:00:00.000000000 NY" volume="1811" value="131681.92" minPrice="72.62" maxPrize="72.76" />
  </USym>
  <USym name="PFE" totalVolume="16217445" totalValue="5.420655376652001E8" mostActiveOn="Internal" mostActiveTime="2017-08-25T17:00:00.000000000 NY">
    <Price min="33.2" max="33.62" tFirst="2017-08-25T09:30:00.145164000 NY" tLast="2017-08-25T19:08:58.541532000 NY" />
    <Exchange name="Arca" volume="451164" value="1.5089535085E7" />
    <Exchange name="Bats" volume="576295" value="1.9276030745E7" />
    <Exchange name="BatsY" volume="869912" value="2.9090971621000003E7" />
    <Exchange name="Chicago" volume="1843" value="61625.020000000004" />
    <Exchange name="EdgA" volume="224614" value="7510654.685" />
    <Exchange name="EdgX" volume="372521" value="1.245842783E7" />
    <Exchange name="Internal" volume="6593509" value="2.204133885402001E8" />
    <Exchange name="Nasdaq" volume="1265869" value="4.234111349700001E7" />
    <Exchange name="Nyse" volume="5861718" value="1.9582379064200002E8" />
    <TimeBin bin="2017-08-25T10:00:00.000000000 NY" volume="831535" value="2.7719805372299958E7" minPrice="33.23" maxPrize="33.45" />
    <TimeBin bin="2017-08-25T11:00:00.000000000 NY" volume="1191569" value="3.978295868730001E7" minPrice="33.35" maxPrize="33.44" />
    <TimeBin bin="2017-08-25T12:00:00.000000000 NY" volume="1193782" value="3.987929117379998E7" minPrice="33.37" maxPrize="33.45" />
    <TimeBin bin="2017-08-25T13:00:00.000000000 NY" volume="2374766" value="7.948386026159997E7" minPrice="33.36" maxPrize="33.585" />
    <TimeBin bin="2017-08-25T14:00:00.000000000 NY" volume="1232104" value="4.135945023070001E7" minPrice="33.4572" maxPrize="33.62" />
    <TimeBin bin="2017-08-25T15:00:00.000000000 NY" volume="620956" value="2.0799195125600003E7" minPrice="33.47" maxPrize="33.55" />
    <TimeBin bin="2017-08-25T16:00:00.000000000 NY" volume="2738066" value="9.15431293539E7" minPrice="33.39" maxPrize="33.5" />
    <TimeBin bin="2017-08-25T17:00:00.000000000 NY" volume="6012823" value="2.0076853269000006E8" minPrice="33.35" maxPrize="33.4367" />
    <TimeBin bin="2017-08-25T18:00:00.000000000 NY" volume="20081" value="670504.62" minPrice="33.39" maxPrize="33.4" />
    <TimeBin bin="2017-08-25T19:00:00.000000000 NY" volume="1466" value="48949.75" minPrice="33.39" maxPrize="33.4" />
    <TimeBin bin="2017-08-25T20:00:00.000000000 NY" volume="297" value="9860.400000000001" minPrice="33.2" maxPrize="33.2" />
  </USym>
  <USym name="XOM" totalVolume="7525210" totalValue="5.778654886549999E8" mostActiveOn="Nyse" mostActiveTime="2017-08-25T11:00:00.000000000 NY">
    <Price min="76.43" max="77.13" tFirst="2017-08-25T07:22:55.959199000 NY" tLast="2017-08-25T19:32:00.887690000 NY" />
    <Exchange name="Arca" volume="309480" value="2.3767735080000006E7" />
    <Exchange name="Bats" volume="374988" value="2.880125855E7" />
    <Exchange name="BatsY" volume="227803" value="1.7492337819E7" />
    <Exchange name="Chicago" volume="17100" value="1314500.2" />
    <Exchange name="EdgA" volume="78138" value="6000549.43" />
    <Exchange name="EdgX" volume="416220" value="3.1972628735000003E7" />
    <Exchange name="Internal" volume="2230058" value="1.7130645658799994E8" />
    <Exchange name="Nasdaq" volume="692520" value="5.319056401399998E7" />
    <Exchange name="Nyse" volume="3178903" value="2.4401945823899996E8" />
    <TimeBin bin="2017-08-25T08:00:00.000000000 NY" volume="300" value="23002.0" minPrice="76.62" maxPrize="76.7" />
    <TimeBin bin="2017-08-25T09:00:00.000000000 NY" volume="4489" value="344003.1000000001" minPrice="76.59" maxPrize="76.7" />
    <TimeBin bin="2017-08-25T10:00:00.000000000 NY" volume="643251" value="4.929772136649991E7" minPrice="76.43" maxPrize="76.81" />
    <TimeBin bin="2017-08-25T11:00:00.000000000 NY" volume="1649288" value="1.2686918977719995E8" minPrice="76.68" maxPrize="77.13" />
    <TimeBin bin="2017-08-25T12:00:00.000000000 NY" volume="817770" value="6.281628682300007E7" minPrice="76.68" maxPrize="77.025" />
    <TimeBin bin="2017-08-25T13:00:00.000000000 NY" volume="557532" value="4.281301271469995E7" minPrice="76.71" maxPrize="76.8706" />
    <TimeBin bin="2017-08-25T14:00:00.000000000 NY" volume="404077" value="3.102772089939998E7" minPrice="76.73" maxPrize="76.85" />
    <TimeBin bin="2017-08-25T15:00:00.000000000 NY" volume="571726" value="4.388585053589997E7" minPrice="76.73" maxPrize="76.8" />
    <TimeBin bin="2017-08-25T16:00:00.000000000 NY" volume="1368261" value="1.0505021591530001E8" minPrice="76.7" maxPrize="76.86" />
    <TimeBin bin="2017-08-25T17:00:00.000000000 NY" volume="1504426" value="1.1542445571299995E8" minPrice="76.72" maxPrize="76.98" />
    <TimeBin bin="2017-08-25T18:00:00.000000000 NY" volume="765" value="58750.0" minPrice="76.79" maxPrize="76.8" />
    <TimeBin bin="2017-08-25T19:00:00.000000000 NY" volume="2340" value="179653.96000000002" minPrice="76.7" maxPrize="76.79" />
    <TimeBin bin="2017-08-25T20:00:00.000000000 NY" volume="985" value="75625.85" minPrice="76.73" maxPrize="76.88" />
  </USym>
</TradeReport>

Example: Alerting

While report generation may be interesting and useful, another common use case is integration with an external system. In the example below, we will set up monitoring for Bid and Ask limits that sends a message to a Slack channel when a limit is breached. As above, we will build this example in parts and discuss each part in isolation. This example will combine ticking tables (simulated with a replay query), an Input Table, a table Listener, and the Slack client library.

To start out, this example is going to use the LearnDeephaven namespace, so in order to simulate ticking data we will use the replay query discussed here. Next, we will write some simple DQL to compute the NBBO values for each USym in the dataset.

// Grab the Trades table, and coerce it to the current day for replay simulation.
Trades = convertDateColumns(db.t("LearnDeephaven", "StockTrades")
        .where("Date=`2017-08-25`")
        .updateView("Date=currentDateNy()"))
        .where(new SortedClockFilter("Timestamp", clock, true))
        .updateView("Value=Last*Size")

// Next compute the average trade size and price, as well as standard deviations for both
TradeStats = Trades.by(AggCombo(AggStd("TradePriceStdDev=Last", "TradeSizeStd=Size"), AggAvg("AvgTradePrice=Last", "AvgTradeSize=Size")), "USym")

// Grab the Quotes table, and coerce it to the current day for replay simulation.
Quotes = convertDateColumns(db.t("LearnDeephaven", "StockQuotes")
        .where("Date=`2017-08-25`")
        .updateView("Date=currentDateNy()"))
        .where(new SortedClockFilter("Timestamp", clock, true))

// Group the Quotes by USym and Exchange, and grab the last (latest) value
QuotesByExchangeSym = Quotes.lastBy("USym","Exchange")

// Compute the best Bid and exchange, ignoring exchanges that have not offered one,
// or have an invalid bid (BidSize is zero or less)
// We use the SortedLastBy feature here which will sort the table by the specified column,
// and then take the last row by the specified grouping column.  This is much more
// efficient than an explicit .sort() and .lastBy()
Bids = QuotesByExchangeSym.where("BidSize > 0")
        .view("USym", "BidExchange=Exchange", "Timestamp", "Bid", "BidSize")
        .by(new SortedLastBy("Bid"), "USym")

// We compute the best Ask and Exchange in the same way we computed Bid, except that for asks
// we need SortedFirstBy (the smallest value).
Asks = QuotesByExchangeSym.where("AskSize > 0")
        .view("USym", "AskExchange=Exchange", "Timestamp", "Ask", "AskSize")
        .by(new SortedFirstBy("Ask"), "USym")

// Create a unified table of Statistics for each usym by joining the Bids, Asks, and TradeStats
// tables together by USym.
SymStats = Bids.naturalJoin(Asks, "USym", "Ask,AskSize, AskExchange")
        .naturalJoin(TradeStats, "USym")

There is nothing particularly unusual about this code aside from the use of SortedFirstBy and SortedLastBy, which (as commented) efficiently select the row with the lowest or highest value of a column within each group.
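
To make the idea concrete, the following Java sketch shows the selection that SortedLastBy("Bid") and SortedFirstBy("Ask") perform conceptually: a single pass over the rows that keeps the maximum (or minimum) row per key, with no full sort. It is only an illustration. The Quote and BestQuotes classes are hypothetical stand-ins, not part of the Deephaven API, and ties are broken arbitrarily here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one row of QuotesByExchangeSym.
final class Quote {
    final String usym;
    final String exchange;
    final double bid;
    final double ask;

    Quote(String usym, String exchange, double bid, double ask) {
        this.usym = usym;
        this.exchange = exchange;
        this.bid = bid;
        this.ask = ask;
    }
}

final class BestQuotes {
    // Best bid: keep the row with the largest Bid for each USym.
    static Map<String, Quote> bestBids(List<Quote> quotes) {
        final Map<String, Quote> best = new HashMap<>();
        for (Quote q : quotes) {
            best.merge(q.usym, q, (old, cur) -> cur.bid >= old.bid ? cur : old);
        }
        return best;
    }

    // Best ask: keep the row with the smallest Ask for each USym.
    static Map<String, Quote> bestAsks(List<Quote> quotes) {
        final Map<String, Quote> best = new HashMap<>();
        for (Quote q : quotes) {
            best.merge(q.usym, q, (old, cur) -> cur.ask < old.ask ? cur : old);
        }
        return best;
    }
}
```

Each map ends up with one row per USym, which mirrors the shape of the Bids and Asks tables above.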

This is useful, but what we really want is to specify high and low alerting limits for the Bid and Ask values. To do this, we can use Deephaven’s Input Table feature. Input Tables allow users to create, read, update, and delete (CRUD) rows in a table directly through the Swing UI.

Note

See: Input Tables

if(!db.hasTable("Example", "Limits")) {
    InputTable.newInputTable(db, "Example", "Limits",
            cKey("USym", String),
            c("AskHighTrigger", double),
            c("AskHighReset", double),
            c("AskLowTrigger", double),
            c("AskLowReset", double),
            c("BidHighTrigger", double),
            c("BidHighReset", double),
            c("BidLowTrigger", double),
            c("BidLowReset", double))
}

This first bit of code ensures that the input table has been created and is ready for use. It is not strictly necessary if you create your input tables by hand or elsewhere, but this example depends on it. Note that cKey() creates a key column for the input table, while c() creates a regular data column. The combination of key column values must be unique for each row.

At this point, we could listen to both the Input table, and the Statistics table, but it will be far simpler to augment the Stats table with the limits using a naturalJoin().

// Grab the input table.
Limits = InputTable.it(db, "Example", "Limits")

// Create a convenient editor for the UI.
LimitEditor = new LiveInputTableEditor(Limits)

// Join the Limits onto the statistics table.
SymStatsAugment = SymStats.naturalJoin(Limits, "USym")

Now, if these are opened in a Swing UI, you end up with a workspace like this:

img

You can enter values in the input tables to set the various limits and they will appear in the augmented table.

img
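
It may help to picture what the augmented table contains for a symbol that has no row in Limits yet. The Java sketch below imitates the join with plain maps: every left-hand row is kept, and a missing limit is filled with a null sentinel. The NaturalJoinSketch class is hypothetical, and the sentinel value used here (-Double.MAX_VALUE) is an assumption about QueryConstants.NULL_DOUBLE that you should verify against your installation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NaturalJoinSketch {
    // Assumed stand-in for QueryConstants.NULL_DOUBLE (not imported from Deephaven).
    static final double NULL_DOUBLE = -Double.MAX_VALUE;

    // Returns, for each symbol with a bid, the pair {bid, bidHighTrigger}.
    // Symbols without a configured limit keep their row and receive NULL_DOUBLE.
    static Map<String, double[]> augment(Map<String, Double> bids,
                                         Map<String, Double> bidHighTriggers) {
        final Map<String, double[]> out = new LinkedHashMap<>();
        for (Map.Entry<String, Double> e : bids.entrySet()) {
            final double limit = bidHighTriggers.getOrDefault(e.getKey(), NULL_DOUBLE);
            out.put(e.getKey(), new double[] { e.getValue(), limit });
        }
        return out;
    }
}
```

This is why code that reads the limit columns later needs to check for the null sentinel before comparing against a limit.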

At this point, the query will not take any action based on the limits. We must create a Java object, called a Listener, that will respond to changes in the rows of the augmented table.

In Deephaven, table updates are handled by an internal process called the LiveTableMonitor, or LTM. The LTM executes periodically and iterates through the set of root nodes in the query graph (a directed acyclic graph, or DAG). When a root is refreshed, it may create an Update, which it propagates to its set of Listener objects. Each listener, in turn, processes the Update, incorporates it into its own table, then creates a new Update and propagates it to its own listeners.

To guarantee that the data in each table of the DAG is updated correctly and consistently, this processing occurs on a dedicated thread while holding a lock that prevents external changes. This has two major implications for user code:

  1. When accessing table data programmatically you must either:
    1. Execute on the LTM thread,
    2. Hold the LTM lock, And be aware of and detect changes during processing.
  2. Code inside the update() method of a listener must be as short as possible and must not take any actions that could block the LTM thread.

In this example we will be creating an InstrumentedShiftAwareListenerAdapter that will execute on the LTM thread while the LTM lock is held, so we do not need to be concerned about the first item. The second item is extremely important: the LTM must refresh every root node in the DAG before it can continue to its next periodic cycle, and each refresh takes some finite amount of time. If the sum of these refreshes exceeds the target period of the LTM, table updates will occur less and less frequently. Analyzing this behavior is covered in the Query Performance section. If your listener is inefficient or blocks, it will consume a very large portion of this periodic budget and will make the query perform very poorly, or crash entirely.
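
The budget argument can be illustrated with a small, self-contained Java sketch. It is not Deephaven's LTM: RefreshCycle, its Listener interface, and the millisecond figures are invented for illustration. It simply times one notification pass over a set of listeners and compares the result with a target period.

```java
import java.util.ArrayList;
import java.util.List;

final class RefreshCycle {
    // Hypothetical listener callback, standing in for a table listener.
    interface Listener {
        void onUpdate(String update);
    }

    private final List<Listener> listeners = new ArrayList<>();
    private final long targetPeriodNanos;

    RefreshCycle(long targetPeriodMillis) {
        this.targetPeriodNanos = targetPeriodMillis * 1_000_000L;
    }

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    // Notifies every listener on the calling thread and returns the elapsed nanoseconds.
    long runCycle(String update) {
        final long start = System.nanoTime();
        for (Listener listener : listeners) {
            listener.onUpdate(update);
        }
        return System.nanoTime() - start;
    }

    // True when a cycle took longer than the target period, so the next cycle starts late.
    boolean overBudget(long elapsedNanos) {
        return elapsedNanos > targetPeriodNanos;
    }
}
```

Because every listener runs on the same thread, a single listener that sleeps or waits on I/O delays all of the others, which is the situation the second item warns about.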

So what will this listener do?

  • Process Updates from the SymStatsAugment table.
  • Use Hysteresis to send Slack alerts for High and Low limits for a set of values.

Here are some design considerations that we will apply:

  • We should be able to re-use this listener elsewhere.
  • We should not duplicate code; hysteresis is a well understood concept.
  • We should be able to take a unique action for each limit.

First we will define some classes to achieve these goals. Note that we will use the Groovy annotation @CompileStatic in all of the following code so that the Groovy compiler performs static type checking and compilation up front rather than resolving calls dynamically at runtime, which helps find errors earlier.

@CompileStatic
enum TriggerState {
    HIGH,
    NORMAL,
    LOW
}

The TriggerState enumeration defines the possible states of a value:

  • HIGH - The value has breached its High Limit.
  • NORMAL - The value is within the limits.
  • LOW - The value has breached its Low limit.

@CompileStatic
@FunctionalInterface
interface ChangeListener {
    void onChange(Object key, String trackerName, TriggerState newState);
}

The ChangeListener interface provides us a way to inject different behaviors for different Trackers. The Tracker class will invoke the onChange() method when the state of its value changes.

@CompileStatic
class Tracker {
    final Map<Object, TriggerState> states = new HashMap<>();

    final String name;
    final ColumnSource valueColumn;
    final ColumnSource hiTriggerColumn;
    final ColumnSource hiTriggerResetColumn;
    final ColumnSource lowTriggerColumn;
    final ColumnSource lowTriggerResetColumn;
    private final ChangeListener onChangeHandler;

    Tracker(String name,
                    ColumnSource valueColumn,
                    ColumnSource hiTriggerColumn,
                    ColumnSource hiTriggerResetColumn,
                    ColumnSource lowTriggerColumn,
                    ColumnSource lowTriggerResetColumn,
                    ChangeListener onChangeHandler) {
        this.name = name;
        this.valueColumn = valueColumn;
        this.hiTriggerColumn = hiTriggerColumn;
        this.hiTriggerResetColumn = hiTriggerResetColumn;
        this.lowTriggerColumn = lowTriggerColumn;
        this.lowTriggerResetColumn = lowTriggerResetColumn;
        this.onChangeHandler = onChangeHandler;
    }

    void update(Object key, long row) {
        final double value = valueColumn.getDouble(row);
        final double triggerHigh = hiTriggerColumn.getDouble(row);
        final double triggerHighReset = hiTriggerResetColumn.getDouble(row);
        final double triggerLow = lowTriggerColumn.getDouble(row);
        final double triggerLowReset = lowTriggerResetColumn.getDouble(row);

        final TriggerState currentState = states.getOrDefault(key, TriggerState.NORMAL);

        if(triggerHigh != QueryConstants.NULL_DOUBLE && value > triggerHigh && currentState != TriggerState.HIGH) {
            states.put(key, TriggerState.HIGH);
            onChangeHandler.onChange(key, name, TriggerState.HIGH);
            return;
        }

        if(triggerLow != QueryConstants.NULL_DOUBLE && value < triggerLow && currentState != TriggerState.LOW) {
            states.put(key, TriggerState.LOW);
            onChangeHandler.onChange(key, name, TriggerState.LOW);
            return;
        }

        if(currentState == TriggerState.HIGH && value <= triggerHighReset) {
            states.put(key,  TriggerState.NORMAL);
            onChangeHandler.onChange(key, name, TriggerState.NORMAL);
            return;
        }

        if(currentState == TriggerState.LOW && value >= triggerLowReset) {
            states.put(key,  TriggerState.NORMAL);
            onChangeHandler.onChange(key, name, TriggerState.NORMAL);
        }
    }
}

The Tracker class is the core of the hysteresis code. It keeps an internal map of key to TriggerState values so that it can monitor the different USyms. It also holds references to the columns from which the trigger and reset values for each limit can be retrieved. Note that we check each trigger value against QueryConstants.NULL_DOUBLE before using it. This guards against the situation where no limits have been defined yet for a particular key.

In the update() method, if any state change is detected, the registered onChangeHandler is invoked to notify the user code of the change.
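
To see the hysteresis behaviour in isolation, here is a minimal plain-Java sketch of the same state transitions, run over a short series of values. HysteresisDemo is a hypothetical class written for this illustration; it omits the null checks and the per-key map of the Tracker above, and it assumes sensible limits (low trigger below low reset, high reset below high trigger).

```java
import java.util.ArrayList;
import java.util.List;

final class HysteresisDemo {
    enum State { HIGH, NORMAL, LOW }

    // Computes the next state for one value, given the current state and the four limits.
    static State next(State current, double value,
                      double highTrigger, double highReset,
                      double lowTrigger, double lowReset) {
        if (value > highTrigger) {
            return State.HIGH;
        }
        if (value < lowTrigger) {
            return State.LOW;
        }
        if (current == State.HIGH && value <= highReset) {
            return State.NORMAL;
        }
        if (current == State.LOW && value >= lowReset) {
            return State.NORMAL;
        }
        return current; // Between a trigger and its reset: hold the current state.
    }

    // Returns the sequence of new states, one entry per actual state change.
    static List<State> changes(double[] values,
                               double highTrigger, double highReset,
                               double lowTrigger, double lowReset) {
        final List<State> out = new ArrayList<>();
        State state = State.NORMAL;
        for (double value : values) {
            final State updated = next(state, value, highTrigger, highReset, lowTrigger, lowReset);
            if (updated != state) {
                out.add(updated);
                state = updated;
            }
        }
        return out;
    }
}
```

With a high trigger of 100, a high reset of 98, a low trigger of 90, and a low reset of 92, the values 95, 101, 99, 97, 89, 91, 93 produce four changes: HIGH, NORMAL, LOW, NORMAL. The values 99 and 91 sit between a trigger and its reset, so they do not produce an alert.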

Finally, we must create the class that actually listens to updates from the table. This class must extend InstrumentedShiftAwareListenerAdapter. The name is somewhat of a mouthful, but the class provides the base implementation for processing an update. It also integrates the listener with the QueryPerformanceLog to help you monitor its performance.

@CompileStatic
class HysteresisListener extends InstrumentedShiftAwareListenerAdapter {
    private final List<Tracker> trackers = new ArrayList<>();
    private final ColumnSource keyColumn;

    public HysteresisListener(String description, DynamicTable source, String keyColumnName) {
        super(description, source, false);
        this.keyColumn = source.getColumnSource(keyColumnName);
    }

    @Override
    public void onUpdate(Update upstream) {
        // We do not need to process removed rows. In fact, in the case of quotes, there should only be added and modified rows.

        // Groovy 2.3 has a bug that will fail to properly coerce closures to FunctionalInterfaces.
        // https://issues.apache.org/jira/browse/GROOVY-8445
        // upstream.added.iterator().forEachLong({row -> updateTrackers(row)});
        // upstream.forAllModified({preShiftKey, postShiftKey -> updateTrackers(postShiftKey)});

        ReadOnlyIndex.Iterator it = upstream.added.iterator()
        while(it.hasNext()) {
            final long row = it.nextLong();
            updateTrackers(row)
        }

        it = upstream.modified.iterator();
        while(it.hasNext()) {
            final long row = it.nextLong();
            updateTrackers(row)
        }
    }

    private boolean updateTrackers(long row) {
        final Object key = keyColumn.get(row);

        for(Tracker t : trackers) {
            t.update(key, row)
        }

        return true;
    }

    public HysteresisListener addTracker(String name,
                           String valueColumn,
                           String hiTriggerColumn,
                           String hiTriggerResetColumn,
                           String lowTriggerColumn,
                           String lowTriggerResetColumn,
                           ChangeListener changeHandler) {
        trackers.add(new Tracker(name,
                source.getColumnSource(valueColumn),
                source.getColumnSource(hiTriggerColumn),
                source.getColumnSource(hiTriggerResetColumn),
                source.getColumnSource(lowTriggerColumn),
                source.getColumnSource(lowTriggerResetColumn),
                changeHandler));

        return this;
    }
}

This class has two fields:

  • trackers - A list of Tracker objects. These will be responsible for tracking the state of alerts for a particular value of importance (Ask, for example) and issuing the high and low limit alerts.
  • keyColumn - This will be the column that the Listener looks at to determine which USym (the key) we are evaluating limits for.

The onUpdate() method will be invoked by the LTM when the table it is attached to changes. Updates consist of three sets of indices and one set of shifts:

  • Removed rows - This is an index of the rows that were removed from the table.
  • Added Rows - This is an index of the rows that were added to the table.
  • Modified Rows - These are the rows that existed, and were modified.
  • ShiftData - Contains information about rows that did not change value, but instead changed positions.

It is important to understand what “shifting” does. In many cases, the values within a row do not change; the row simply changes position (as in sort()). Reporting this as a shift allows the Query Engine to optimize updates downstream. It also means that the various indices in the Update can be based on different address spaces: Removed is in the pre-shift address space, while Added and Modified are in the post-shift address space.

For this example, we stay simple and only handle Added/Modified rows which allows us to operate strictly in post-shift address space and avoid the complexity. This means that we can simply iterate through the added and modified indices, capture the key value at each row and pass those values down to each tracker where the limits will be checked and alerts issued.
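
As a rough illustration of the two address spaces, the Java sketch below models a table as a map from row key to value and applies a single shift to it. ShiftDemo and its methods are hypothetical and far simpler than Deephaven's shift data, which can describe several shifted ranges in one update.

```java
import java.util.Map;
import java.util.TreeMap;

final class ShiftDemo {
    // Maps a pre-shift row key to its post-shift key for one shifted range [first, last].
    static long toPostShift(long preShiftKey, long first, long last, long delta) {
        return (preShiftKey >= first && preShiftKey <= last) ? preShiftKey + delta : preShiftKey;
    }

    // Rebuilds the table in post-shift address space; the values themselves are unchanged.
    // Assumes the shifted keys do not collide with keys outside the range.
    static Map<Long, String> applyShift(Map<Long, String> rows, long first, long last, long delta) {
        final Map<Long, String> shifted = new TreeMap<>();
        for (Map.Entry<Long, String> e : rows.entrySet()) {
            shifted.put(toPostShift(e.getKey(), first, last, delta), e.getValue());
        }
        return shifted;
    }
}
```

After the shift, a lookup with an old (pre-shift) key no longer finds the row, which is why mixing the two address spaces within one update handler is error-prone.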

For simplicity, we’ve added an addTracker() method that allows the user to specify the columns by name, along with their handler, without having to interact directly with the Tracker class.

At this point we have a set of classes that we can use to trigger alerts. In this example, we are going to send the alerts to a Slack channel, so we must configure the Slack integration.

// Initialize Slack
import com.slack.api.Slack;
import com.slack.api.methods.MethodsClient;
import com.slack.api.methods.request.chat.ChatPostMessageRequest;
import com.slack.api.methods.response.chat.ChatPostMessageResponse;

slack = Slack.getInstance()
slackApi = slack.methods(Configuration.getInstance().getProperty("slack.token"))

// Listen for table updates
sendMessage = { usym, tracker, ns ->
    final String message;
    if(ns == TriggerState.NORMAL) {
        message = "USym " + usym + " has returned to normal for the " + tracker + " tracker"
    } else {
        message = "Warning: USym " + usym + " has breached the " + tracker + " " + ns + " limit"
    }

    new Thread({
        slackApi.chatPostMessage(ChatPostMessageRequest.builder()
                .channel("#MarketAlerts")
                .text(message)
                .build())
    }).start()

}

We create a sendMessage closure here that we can pass to the addTracker() method of the listener class we created before. This closure does two important things:

  1. Creates an appropriate message for the channel based on the tracker and state.
  2. Sends the Slack message using a separate thread.

The second part is the most important. Recall that Listeners are invoked on the LTM thread, and that we must not block or perform long operations on that thread. Posting to Slack performs network I/O, so we must do it elsewhere. In this example, we simply execute chatPostMessage() on a separate thread.
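
Starting a new thread per message is the simplest option. As an alternative, the following Java sketch hands messages to a single-threaded executor, which keeps the number of threads bounded and preserves message order. This is a suggestion rather than part of the original query: AlertDispatcher is a hypothetical class, and its sender argument stands in for whatever code actually posts to Slack.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class AlertDispatcher implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Consumer<String> sender;

    // sender performs the slow work (for example, an HTTP call) on the worker thread.
    AlertDispatcher(Consumer<String> sender) {
        this.sender = sender;
    }

    // Queues the message and returns immediately, so the calling thread is never blocked by I/O.
    void dispatch(String message) {
        executor.execute(() -> sender.accept(message));
    }

    // Stops accepting messages and waits briefly for queued ones to be sent.
    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In a listener, only dispatch() would be called from the update path; the queue grows if Slack is slow, so a production version would likely want a bounded queue as well.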

Finally, we must tie it all together by configuring our trackers and attaching the listener to the stats table.

quotesListener = new HysteresisListener("Quote Monitor", SymStatsAugment, "USym")
    .addTracker("Bid", "Bid", "BidHighTrigger", "BidHighReset", "BidLowTrigger", "BidLowReset", sendMessage)
    .addTracker("Ask", "Ask", "AskHighTrigger", "AskHighReset", "AskLowTrigger", "AskLowReset", sendMessage)

SymStatsAugment.listenForUpdates(quotesListener)

Now, when a limit is breached you will receive a Slack message like this:

img

Below, find the complete query:

// Replay bits only
import com.illumon.iris.db.tables.utils.DBDateTime;
import com.illumon.iris.db.tables.utils.DBTimeUtils;
import java.time.*
import java.util.stream.Collectors;
import com.illumon.iris.console.utils.LiveInputTableEditor
import com.illumon.iris.db.v2.by.*

REPLAY_DATE=currentDateNy()
REPLAY_DATE_PARTS = REPLAY_DATE.split("-")

// Compute the nanos of Epoch at midnight for the current day
dtMidnightToday = LocalDate.now().atStartOfDay(ZoneId.of("America/New_York")).toInstant().toEpochMilli()*1000000L

// Compute the nanos of Epoch at midnight of the date you are replaying
dtMidnightSource = LocalDate.of(Integer.parseInt(REPLAY_DATE_PARTS[0]),
        Integer.parseInt(REPLAY_DATE_PARTS[1]),
        Integer.parseInt(REPLAY_DATE_PARTS[2]))
        .atStartOfDay(ZoneId.of("America/New_York")).toInstant().toEpochMilli()*1000000L

// Create a closure that will convert a DBDateTime value from the source date, to the current date, maintaining the timestamp
convertDateToday = { dt -> dt == null ? null : new DBDateTime(dt.getNanos() - dtMidnightSource + dtMidnightToday) }

// Create a closure that will iterate over an input table and automatically apply an updateView clause for each DbDateTime column it finds.
convertDateColumns = { t ->
    final List<String> cols = t.getDefinition().getColumnList().stream()
            .filter({c -> c.getDataType() == DBDateTime.class })
            .map({ c -> c.getName() + " = (DBDateTime)convertDateToday.call("+c.getName()+")" }).collect(Collectors.toList())
    return t.updateView(cols)
}

// Grab the Trades table, and coerce it to the current day for replay simulation.
Trades = convertDateColumns(db.t("LearnDeephaven", "StockTrades")
        .where("Date=\$REPLAY_DATE")
        .updateView("Date=currentDateNy()"))
        .updateView("Value=Last*Size")

// Next compute the average trade size and price, as well as standard deviations for both
TradeStats = Trades.by(AggCombo(AggStd("TradePriceStdDev=Last", "TradeSizeStd=Size"), AggAvg("AvgTradePrice=Last", "AvgTradeSize=Size")), "USym")

// Grab the Quotes table, and coerce it to the current day for replay simulation.
Quotes = convertDateColumns(db.t("LearnDeephaven", "StockQuotes")
        .where("Date=\$REPLAY_DATE")
        .updateView("Date=currentDateNy()"))

// Group the Quotes by USym and Exchange, and grab the last (latest) value
QuotesByExchangeSym = Quotes.lastBy("USym","Exchange")

// Compute the best Bid and exchange, ignoring exchanges that have not offered one,
// or have an invalid bid (BidSize is zero or less)
// We use the SortedLastBy feature here which will sort the table by the specified column,
// and then take the last row by the specified grouping column. This is much more
// efficient than an explicit .sort() and .lastBy()
Bids = QuotesByExchangeSym.where("BidSize > 0")
        .view("USym", "BidExchange=Exchange", "Timestamp", "Bid", "BidSize")
        .by(new SortedLastBy("Bid"), "USym")

// We compute the best Ask and Exchange in the same way we computed Bid, except that for asks
// we need SortedFirstBy (the smallest value).
Asks = QuotesByExchangeSym.where("AskSize > 0")
        .view("USym", "AskExchange=Exchange", "Timestamp", "Ask", "AskSize")
        .by(new SortedFirstBy("Ask"), "USym")

// Create a unified table of Statistics for each usym by joining the Bids, Asks, and TradeStats
// tables together by USym.
SymStats = Bids.naturalJoin(Asks, "USym", "Ask,AskSize, AskExchange")
        .naturalJoin(TradeStats, "USym")

// Ensure the input table actually exists
if(!db.hasTable("Example", "Limits")) {
    InputTable.newInputTable(db, "Example", "Limits",
            cKey("USym", String),
            c("AskHighTrigger", double),
            c("AskHighReset", double),
            c("AskLowTrigger", double),
            c("AskLowReset", double),
            c("BidHighTrigger", double),
            c("BidHighReset", double),
            c("BidLowTrigger", double),
            c("BidLowReset", double))
}

// Grab the input table
Limits = InputTable.it(db, "Example", "Limits")

// Create a convenient editor for the UI
LimitEditor = new LiveInputTableEditor(Limits)

// Join the Limits onto the statistics table
SymStatsAugment = SymStats.naturalJoin(Limits, "USym")

// JAVA CODE WOOHOO

import com.illumon.iris.db.v2.DynamicTable;
import com.illumon.iris.db.v2.InstrumentedShiftAwareListenerAdapter;
import com.illumon.iris.db.v2.sources.ColumnSource;
import com.illumon.iris.db.v2.ShiftAwareListener.Update
import com.illumon.iris.db.v2.utils.ReadOnlyIndex
import com.illumon.util.QueryConstants

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import groovy.transform.CompileStatic

@CompileStatic
enum TriggerState {
    HIGH,
    NORMAL,
    LOW
}

@CompileStatic
@FunctionalInterface
interface ChangeListener {
    void onChange(Object key, String trackerName, TriggerState newState);
}

@CompileStatic
class Tracker {
    final Map<Object, TriggerState> states = new HashMap<>();

    final String name;
    final ColumnSource valueColumn;
    final ColumnSource hiTriggerColumn;
    final ColumnSource hiTriggerResetColumn;
    final ColumnSource lowTriggerColumn;
    final ColumnSource lowTriggerResetColumn;
    private final ChangeListener onChangeHandler;

    Tracker(String name,
                    ColumnSource valueColumn,
                    ColumnSource hiTriggerColumn,
                    ColumnSource hiTriggerResetColumn,
                    ColumnSource lowTriggerColumn,
                    ColumnSource lowTriggerResetColumn,
                    ChangeListener onChangeHandler) {
        this.name = name;
        this.valueColumn = valueColumn;
        this.hiTriggerColumn = hiTriggerColumn;
        this.hiTriggerResetColumn = hiTriggerResetColumn;
        this.lowTriggerColumn = lowTriggerColumn;
        this.lowTriggerResetColumn = lowTriggerResetColumn;
        this.onChangeHandler = onChangeHandler;
    }

    void update(Object key, long row) {
        final double value = valueColumn.getDouble(row);
        final double triggerHigh = hiTriggerColumn.getDouble(row);
        final double triggerHighReset = hiTriggerResetColumn.getDouble(row);
        final double triggerLow = lowTriggerColumn.getDouble(row);
        final double triggerLowReset = lowTriggerResetColumn.getDouble(row);

        final TriggerState currentState = states.getOrDefault(key, TriggerState.NORMAL);

        if(triggerHigh != QueryConstants.NULL_DOUBLE && value > triggerHigh && currentState != TriggerState.HIGH) {
            states.put(key, TriggerState.HIGH);
            onChangeHandler.onChange(key, name, TriggerState.HIGH);
            return;
        }

        if(triggerLow != QueryConstants.NULL_DOUBLE && value < triggerLow && currentState != TriggerState.LOW) {
            states.put(key, TriggerState.LOW);
            onChangeHandler.onChange(key, name, TriggerState.LOW);
            return;
        }

        if(currentState == TriggerState.HIGH && value <= triggerHighReset) {
            states.put(key,  TriggerState.NORMAL);
            onChangeHandler.onChange(key, name, TriggerState.NORMAL);
            return;
        }

        if(currentState == TriggerState.LOW && value >= triggerLowReset) {
            states.put(key,  TriggerState.NORMAL);
            onChangeHandler.onChange(key, name, TriggerState.NORMAL);
        }
    }

}

@CompileStatic
class HysteresisListener extends InstrumentedShiftAwareListenerAdapter {
    private final List<Tracker> trackers = new ArrayList<>();
    private final ColumnSource keyColumn;

    public HysteresisListener(String description, DynamicTable source, String keyColumnName) {
        super(description, source, false);
        this.keyColumn = source.getColumnSource(keyColumnName);
    }

    @Override
    public void onUpdate(Update upstream) {
        // We do not need to process removed rows or shifts. In fact, in the case of quotes, there should only be added and modified rows.

        // Groovy 2.3 has a bug that will fail to properly coerce closures to FunctionalInterfaces.
        // https://issues.apache.org/jira/browse/GROOVY-8445
        //upstream.added.iterator().forEachLong({row -> updateTrackers(row)});
        //upstream.forAllModified({preShiftKey, postShiftKey -> updateTrackers(postShiftKey)});

        ReadOnlyIndex.Iterator it = upstream.added.iterator()
        while(it.hasNext()) {
            final long row = it.nextLong();
            updateTrackers(row)
        }

        it = upstream.modified.iterator();
        while(it.hasNext()) {
            final long row = it.nextLong();
            updateTrackers(row)
        }
    }

    private boolean updateTrackers(long row) {
        final Object key = keyColumn.get(row);

        for(Tracker t : trackers) {
            t.update(key, row)
        }

        return true;
    }

    public HysteresisListener addTracker(String name,
                           String valueColumn,
                           String hiTriggerColumn,
                           String hiTriggerResetColumn,
                           String lowTriggerColumn,
                           String lowTriggerResetColumn,
                           ChangeListener changeHandler) {
        trackers.add(new Tracker(name,
                source.getColumnSource(valueColumn),
                source.getColumnSource(hiTriggerColumn),
                source.getColumnSource(hiTriggerResetColumn),
                source.getColumnSource(lowTriggerColumn),
                source.getColumnSource(lowTriggerResetColumn),
                changeHandler));

        return this;
    }

}

// Initialize Slack
import com.slack.api.Slack;
import com.slack.api.methods.MethodsClient;
import com.slack.api.methods.request.chat.ChatPostMessageRequest;
import com.slack.api.methods.response.chat.ChatPostMessageResponse;

slack = Slack.getInstance()
slackApi = slack.methods(Configuration.getInstance().getProperty("slack.token"))

// Listen for table updates
sendMessage = { usym, tracker, ns ->
    final String message;
    if(ns == TriggerState.NORMAL) {
        message = "USym " + usym + " has returned to normal for the " + tracker + " tracker"
    } else {
        message = "Warning: USym " + usym + " has breached the " + tracker + " " + ns + " limit"
    }

    new Thread({
        slackApi.chatPostMessage(ChatPostMessageRequest.builder()
                .channel("#MarketAlerts")
                .text(message)
                .build())
    }).start()

}

quotesListener = new HysteresisListener("Quote Monitor", SymStatsAugment, "USym")
    .addTracker("Bid", "Bid", "BidHighTrigger", "BidHighReset", "BidLowTrigger", "BidLowReset", sendMessage)
    .addTracker("Ask", "Ask", "AskHighTrigger", "AskHighReset", "AskLowTrigger", "AskLowReset", sendMessage)

SymStatsAugment.listenForUpdates(quotesListener)