
Creates an exponential moving sum (EMS) UpdateByOp for each column in cols, using time as the decay unit.

Arguments

ts_col

String denoting the column to use as the timestamp.

decay_time

ISO-8601-formatted duration string specifying the decay time, e.g. "PT10s" for 10 seconds.

cols

String or list of strings denoting the column(s) to operate on. Can be renaming expressions, e.g. “new_col = col”. Default is to compute the exponential moving sum for all non-grouping columns.

operation_control

OperationControl that defines how special cases will behave. See ?op_control for more information.

Value

UpdateByOp to be used in a call to update_by().

Details

The formula used is

$$a_i = e^{\frac{-dt_i}{\tau}}$$

$$\mathcal{S}_0 = x_0$$

$$\mathcal{S}_i = a_i \mathcal{S}_{i-1} + x_i$$

Where:

  • \(dt_i\) is the difference between time \(t_i\) and \(t_{i-1}\) in nanoseconds.

  • \(\tau\) is decay_time in nanoseconds, an input parameter to the method.

  • \(\mathcal{S}_i\) is the exponential moving sum of column \(X\) at time step \(i\).

  • \(x_i\) is the current value.

  • \(i\) denotes the time step, ranging from \(i=1\) to \(i = n-1\), where \(n\) is the number of elements in \(X\).
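The recurrence above can be sketched locally in base R, without a Deephaven server. This is only an illustration of the formula, not the server-side implementation: `ems_time_sketch` is a hypothetical helper, it works in seconds rather than nanoseconds (the ratio \(dt_i / \tau\) is unitless, so the result is the same), and it ignores the special cases governed by operation_control.

```r
# Hypothetical helper, not part of rdeephaven: exponential moving sum with
# time-based decay, following the recurrence in this section.
# ts: POSIXct timestamps in ascending order
# x: numeric values, same length as ts
# tau_seconds: decay time in seconds (assumed unit for this sketch)
ems_time_sketch <- function(ts, x, tau_seconds) {
  stopifnot(length(ts) == length(x), length(x) >= 1)
  s <- numeric(length(x))
  s[1] <- x[1]                       # S_0 = x_0
  for (i in seq_along(x)[-1]) {
    # dt_i: elapsed time between consecutive rows
    dt <- as.numeric(difftime(ts[i], ts[i - 1], units = "secs"))
    a <- exp(-dt / tau_seconds)      # a_i = e^(-dt_i / tau)
    s[i] <- a * s[i - 1] + x[i]      # S_i = a_i * S_{i-1} + x_i
  }
  s
}

# Irregularly spaced timestamps: larger gaps decay the running sum more
ts <- as.POSIXct("2024-01-01 00:00:00", tz = "UTC") + c(0, 1, 3, 6)
x <- c(10, 20, 30, 40)
ems <- ems_time_sketch(ts, x, tau_seconds = 10)
print(ems)
```

Because the decay factor depends on the elapsed time rather than the row count, two rows that are far apart in time contribute less of the earlier sum than two adjacent rows do.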

This function acts on aggregation groups specified with the by parameter of the update_by() caller function. The aggregation groups are defined by the unique combinations of values in the by columns. For example, if by = c("A", "B"), then the aggregation groups are defined by the unique combinations of values in the A and B columns.
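To make the grouping rule concrete, the following base R sketch lists the groups that by = c("A", "B") would produce for a small local data frame. The data and the use of `cumsum` as a stand-in for a per-group running operation are assumptions for illustration only; nothing here calls Deephaven.

```r
# Small local data frame with two grouping columns (illustrative data)
df <- data.frame(
  A = c("x", "x", "y", "y", "x"),
  B = c(1, 2, 1, 1, 1),
  value = c(10, 20, 30, 40, 50)
)

# One aggregation group per unique combination of values in A and B
groups <- unique(df[c("A", "B")])
print(groups)

# A running operation restarts within each group; cumsum stands in for it here
df$running <- ave(df$value, df$A, df$B, FUN = cumsum)
print(df)
```

Rows keep their original order; only the state of the running computation is kept separately for each group.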

This function, like other Deephaven uby functions, is a generator function. That is, its output is another function called an UpdateByOp, intended to be used in a call to update_by(). This detail is typically hidden from the user, but it matters when debugging: calling a uby function on its own returns an UpdateByOp rather than a table, which can otherwise seem unexpected.

For more information, see the vignette on uby functions by running vignette("update_by").

Examples

if (FALSE) { # \dontrun{
library(rdeephaven)

# connecting to Deephaven server
client <- Client$new("localhost:10000", auth_type = "psk", auth_token = "my_secret_token")

# create data frame, push to server, retrieve TableHandle
df <- data.frame(
  timeCol = seq.POSIXt(as.POSIXct(Sys.Date()), as.POSIXct(Sys.Date() + 0.01), by = "1 sec")[1:500],
  boolCol = sample(c(TRUE, FALSE), 500, TRUE),
  col1 = sample(10000, size = 500, replace = TRUE),
  col2 = sample(10000, size = 500, replace = TRUE),
  col3 = 1:500
)
th <- client$import_table(df)

# compute 10-second exponential moving sum of col1 and col2
th1 <- th$
  update_by(uby_ems_time(ts_col = "timeCol", decay_time = "PT10s", cols = c("col1Ems = col1", "col2Ems = col2")))

# compute 5-second exponential moving sum of col1 and col2, grouped by boolCol
th2 <- th$
  update_by(uby_ems_time(ts_col = "timeCol", decay_time = "PT5s", cols = c("col1Ems = col1", "col2Ems = col2")), by = "boolCol")

# compute 20-second exponential moving sum of col1 and col2, grouped by boolCol and parity of col3
th3 <- th$
  update("col3Parity = col3 % 2")$
  update_by(uby_ems_time(ts_col = "timeCol", decay_time = "PT20s", cols = c("col1Ems = col1", "col2Ems = col2")), by = c("boolCol", "col3Parity"))

client$close()
} # }