#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module implements the Query class that can be used to execute a chained set of Table operations in one
batch."""
from __future__ import annotations
from typing import Union
from pydeephaven.table import Table
from pydeephaven._table_interface import TableInterface
from pydeephaven._table_ops import *
from pydeephaven.dherror import DHError
class Query(TableInterface):
    """A Query object is used to define and execute a sequence of Deephaven table operations on the server.

    When the query is executed, the table operations specified for the Query object are batched together and sent to
    the server in a single request, thus avoiding multiple round trips between the client and the server. The result
    of executing the query is a new Deephaven table.

    Note, an application should always use the factory method on the Session object to create a Query instance as the
    constructor is subject to future changes to support more advanced features already planned.
    """
    # Pending table operations in the order they were added; __init__ seeds it with a NoneOp for the source table.
    _ops: List[TableOp]

    def table_op_handler(self, table_op: TableOp) -> Query:
        """Queues a table operation on this query rather than executing it.

        NOTE(review): presumably this is the hook the TableInterface operation methods call with the TableOp they
        build - confirm against pydeephaven._table_interface.

        Args:
            table_op (TableOp): the table operation to append to the pending batch

        Returns:
            self
        """
        self._ops.append(table_op)
        return self

    def __init__(self, session: Any, table: Table):
        """Initializes the query with the session to run on and the table the operations start from.

        Args:
            session (Any): the session whose table service will execute the batch
            table (Table): the starting table of the query

        Raises:
            DHError: if either session or table is falsy
        """
        self.session = session
        if not self.session or not table:
            raise DHError("invalid session or table value.")
        # The batch always begins with a no-op that references the starting table.
        self._ops = [NoneOp(table=table)]

    def exec(self) -> Table:
        """Executes the query on the server and returns the result table.

        Returns:
            a Table object

        Raises:
            DHError
        """
        return self.session.table_service.batch(self._ops)

    def drop_columns(self, cols: Union[str, List[str]]) -> Query:
        """Adds a drop-columns operation to the query.

        Args:
            cols (Union[str, List[str]]): the column name(s)

        Returns:
            self
        """
        return super().drop_columns(cols)

    def update(self, formulas: Union[str, List[str]]) -> Query:
        """Adds an update operation to the query.

        Args:
            formulas (Union[str, List[str]]): the column formula(s)

        Returns:
            self
        """
        return super().update(formulas)

    def lazy_update(self, formulas: Union[str, List[str]]) -> Query:
        """Adds a lazy-update operation to the query.

        Args:
            formulas (Union[str, List[str]]): the column formula(s)

        Returns:
            self
        """
        return super().lazy_update(formulas)

    def view(self, formulas: Union[str, List[str]]) -> Query:
        """Adds a view operation to the query.

        Args:
            formulas (Union[str, List[str]]): the column formula(s)

        Returns:
            self
        """
        return super().view(formulas)

    def update_view(self, formulas: Union[str, List[str]]) -> Query:
        """Adds an update-view operation to the query.

        Args:
            formulas (Union[str, List[str]]): the column formula(s)

        Returns:
            self
        """
        return super().update_view(formulas)

    def select(self, formulas: Union[str, List[str]] = None) -> Query:
        """Adds a select operation to the query.

        Args:
            formulas (Union[str, List[str]], optional): the column formula(s), default is None

        Returns:
            self
        """
        return super().select(formulas)

    def select_distinct(self, cols: Union[str, List[str]] = None) -> Query:
        """Adds a select-distinct operation to the query.

        Args:
            cols (Union[str, List[str]], optional): the column name(s), default is None

        Returns:
            self
        """
        return super().select_distinct(cols)

    def sort(self, order_by: Union[str, List[str]], order: Union[SortDirection, List[SortDirection]] = None) -> Query:
        """Adds a sort operation to the query.

        Args:
            order_by (Union[str, List[str]]): the names of the columns to be sorted on
            order (Union[SortDirection, List[SortDirection]], optional): the corresponding sort direction(s) for each
                sort column, default is None. In the absence of explicit sort directions, data will be sorted in
                the ascending order.

        Returns:
            self
        """
        return super().sort(order_by, order)

    def where(self, filters: Union[str, List[str]]) -> Query:
        """Adds a filter operation to the query.

        Args:
            filters (Union[str, List[str]]): the filter condition expression(s)

        Returns:
            self
        """
        return super().where(filters)

    def head(self, num_rows: int) -> Query:
        """Adds a head operation to the query.

        Args:
            num_rows (int): the number of rows at the head of table

        Returns:
            self
        """
        return super().head(num_rows)

    def tail(self, num_rows: int) -> Query:
        """Adds a tail operation to the query.

        Args:
            num_rows (int): the number of rows at the end of table

        Returns:
            self
        """
        return super().tail(num_rows)

    def natural_join(self, table: Any, on: Union[str, List[str]], joins: Union[str, List[str]] = None) -> Query:
        """Adds a natural-join operation to the query.

        Args:
            table (Table): the right-table of the join
            on (Union[str, List[str]]): the column(s) to match, can be a common name or an equal expression,
                i.e. "col_a = col_b" for different column names
            joins (Union[str, List[str]], optional): the column(s) to be added from the right table to the result
                table, can be renaming expressions, i.e. "new_col = col"; default is None

        Returns:
            self
        """
        return super().natural_join(table, on, joins)

    def exact_join(self, table: Any, on: Union[str, List[str]], joins: Union[str, List[str]] = None) -> Query:
        """Adds an exact-join operation to the query.

        Args:
            table (Table): the right-table of the join
            on (Union[str, List[str]]): the column(s) to match, can be a common name or an equal expression,
                i.e. "col_a = col_b" for different column names
            joins (Union[str, List[str]], optional): the column(s) to be added from the right table to the result
                table, can be renaming expressions, i.e. "new_col = col"; default is None

        Returns:
            self
        """
        return super().exact_join(table, on, joins)

    def join(self, table: Any, on: Union[str, List[str]] = None, joins: Union[str, List[str]] = None,
             reserve_bits: int = 10) -> Query:
        """Adds a cross-join operation to the query.

        Args:
            table (Table): the right-table of the join
            on (Union[str, List[str]], optional): the column(s) to match, can be a common name or an equal
                expression, i.e. "col_a = col_b" for different column names; default is None
            joins (Union[str, List[str]], optional): the column(s) to be added from the right table to the result
                table, can be renaming expressions, i.e. "new_col = col"; default is None
            reserve_bits (int, optional): the number of bits of key-space to initially reserve per group; default
                is 10

        Returns:
            self
        """
        # Forward reserve_bits explicitly; without it the caller's value is silently dropped.
        return super().join(table, on, joins, reserve_bits=reserve_bits)

    def aj(self, table: Any, on: Union[str, List[str]], joins: Union[str, List[str]] = None) -> Query:
        """Adds an as-of join operation to the query.

        Args:
            table (Table): the right-table of the join
            on (Union[str, List[str]]): the column(s) to match, can be a common name or a match condition of two
                columns, e.g. 'col_a = col_b'. The first 'N-1' matches are exact matches. The final match is an
                inexact match. The inexact match can use either '>' or '>='. If a common name is used for the
                inexact match, '>=' is used for the comparison.
            joins (Union[str, List[str]], optional): the column(s) to be added from the right table to the result
                table, can be renaming expressions, i.e. "new_col = col"; default is None

        Returns:
            self
        """
        return super().aj(table, on, joins)

    def raj(self, table: Any, on: Union[str, List[str]], joins: Union[str, List[str]] = None) -> Query:
        """Adds a reverse as-of join operation to the query.

        Args:
            table (Table): the right-table of the join
            on (Union[str, List[str]]): the column(s) to match, can be a common name or a match condition of two
                columns, e.g. 'col_a = col_b'. The first 'N-1' matches are exact matches. The final match is an
                inexact match. The inexact match can use either '<' or '<='. If a common name is used for the
                inexact match, '<=' is used for the comparison.
            joins (Union[str, List[str]], optional): the column(s) to be added from the right table to the result
                table, can be renaming expressions, i.e. "new_col = col"; default is None

        Returns:
            self
        """
        return super().raj(table, on, joins)

    def head_by(self, num_rows: int, by: Union[str, List[str]]) -> Query:
        """Adds a head-by operation to the query.

        Args:
            num_rows (int): the number of rows at the beginning of each group
            by (Union[str, List[str]]): the group-by column name(s)

        Returns:
            self
        """
        return super().head_by(num_rows, by)

    def tail_by(self, num_rows: int, by: Union[str, List[str]]) -> Query:
        """Adds a tail-by operation to the query.

        Args:
            num_rows (int): the number of rows at the end of each group
            by (Union[str, List[str]]): the group-by column name(s)

        Returns:
            self
        """
        return super().tail_by(num_rows, by)

    def group_by(self, by: Union[str, List[str]] = None) -> Query:
        """Adds a group-by aggregation to the query.

        Args:
            by (Union[str, List[str]], optional): the group-by column name(s); default is None

        Returns:
            self
        """
        return super().group_by(by)

    def ungroup(self, cols: Union[str, List[str]] = None, null_fill: bool = True) -> Query:
        """Adds an ungroup operation to the query.

        Args:
            cols (Union[str, List[str]], optional): the array column(s), default is None, meaning all array columns
                will be ungrouped
            null_fill (bool, optional): indicates whether null should be used to fill missing cells, default is True

        Returns:
            self
        """
        return super().ungroup(cols, null_fill)

    def first_by(self, by: Union[str, List[str]] = None) -> Query:
        """Adds a first-by aggregation to the query.

        Args:
            by (Union[str, List[str]], optional): the group-by column name(s), default is None

        Returns:
            self
        """
        return super().first_by(by)

    def last_by(self, by: Union[str, List[str]] = None) -> Query:
        """Adds a last-by aggregation to the query.

        Args:
            by (Union[str, List[str]], optional): the group-by column name(s), default is None

        Returns:
            self
        """
        return super().last_by(by)

    def sum_by(self, by: Union[str, List[str]] = None) -> Query:
        """Adds a sum-by aggregation to the query.

        Args:
            by (Union[str, List[str]], optional): the group-by column name(s), default is None

        Returns:
            self
        """
        return super().sum_by(by)

    def avg_by(self, by: Union[str, List[str]] = None) -> Query:
        """Adds an avg-by aggregation to the query.

        Args:
            by (Union[str, List[str]], optional): the group-by column name(s), default is None

        Returns:
            self
        """
        return super().avg_by(by)

    def std_by(self, by: Union[str, List[str]] = None) -> Query:
        """Adds a std-by aggregation to the query.

        Args:
            by (Union[str, List[str]], optional): the group-by column name(s), default is None

        Returns:
            self
        """
        return super().std_by(by)

    def var_by(self, by: Union[str, List[str]] = None) -> Query:
        """Adds a var-by aggregation to the query.

        Args:
            by (Union[str, List[str]], optional): the group-by column name(s), default is None

        Returns:
            self
        """
        return super().var_by(by)

    def min_by(self, by: Union[str, List[str]] = None) -> Query:
        """Adds a min-by aggregation to the query.

        Args:
            by (Union[str, List[str]], optional): the group-by column name(s), default is None

        Returns:
            self
        """
        return super().min_by(by)

    def max_by(self, by: Union[str, List[str]] = None) -> Query:
        """Adds a max-by aggregation to the query.

        Args:
            by (Union[str, List[str]], optional): the group-by column name(s), default is None

        Returns:
            self
        """
        return super().max_by(by)

    def count_by(self, col: str, by: Union[str, List[str]] = None) -> Query:
        """Adds a count-by aggregation to the query.

        Args:
            col (str): the name of the column to store the counts
            by (Union[str, List[str]], optional): the group-by column name(s), default is None

        Returns:
            self
        """
        return super().count_by(col, by)

    def agg_by(self, aggs: Union[Aggregation, List[Aggregation]], by: Union[str, List[str]]) -> Query:
        """Adds an Aggregate operation to the query.

        Args:
            aggs (Union[Aggregation, List[Aggregation]]): the aggregation(s) to be applied
            by (Union[str, List[str]]): the group-by column name(s)

        Returns:
            self
        """
        return super().agg_by(aggs=aggs, by=by)

    def agg_all_by(self, agg: Aggregation, by: Union[str, List[str]]) -> Query:
        """Adds an AggregateAll operation to the query.

        Args:
            agg (Aggregation): the aggregation to be applied
            by (Union[str, List[str]]): the group-by column name(s)

        Returns:
            self
        """
        return super().agg_all_by(agg=agg, by=by)

    def update_by(self, ops: Union[UpdateByOperation, List[UpdateByOperation]], by: Union[str, List[str]]) -> Query:
        """Adds an update-by operation to the query.

        Args:
            ops (Union[UpdateByOperation, List[UpdateByOperation]]): the UpdateByOperation(s) to be applied
            by (Union[str, List[str]]): the group-by column name(s)

        Returns:
            self
        """
        return super().update_by(ops, by)

    def snapshot(self) -> Query:
        """Adds a snapshot operation to the query.

        Returns:
            self
        """
        return super().snapshot()

    def snapshot_when(self, trigger_table: Any, stamp_cols: Union[str, List[str]] = None, initial: bool = False,
                      incremental: bool = False, history: bool = False) -> Query:
        """Adds a snapshot_when operation to the query.

        Args:
            trigger_table (Table): the trigger table
            stamp_cols (Union[str, List[str]], optional): The column(s) from trigger_table that form the "stamp key",
                may be renames, default is None, meaning that all columns from trigger_table form the "stamp key".
            initial (bool): Whether to take an initial snapshot upon construction, default is False. When False, the
                resulting table will remain empty until trigger_table first updates.
            incremental (bool): Whether the resulting table should be incremental, default is False. When False, all
                rows of this table will have the latest "stamp key". When True, only the rows of this table that have
                been added or updated will have the latest "stamp key".
            history (bool): Whether the resulting table should keep history, default is False. A history table appends
                a full snapshot of this table and the "stamp key" as opposed to updating existing rows. The history
                flag is currently incompatible with initial and incremental: when history is True, incremental and
                initial must be False.

        Returns:
            self
        """
        return super().snapshot_when(trigger_table, stamp_cols, initial, incremental, history)

    def where_in(self, filter_table: Any, cols: Union[str, List[str]]) -> Query:
        """Adds a where_in operation to the query.

        Args:
            filter_table (Table): the table containing the set of values to filter on
            cols (Union[str, List[str]]): the column name(s)

        Returns:
            self
        """
        return super().where_in(filter_table, cols)

    def where_not_in(self, filter_table: Any, cols: Union[str, List[str]]) -> Query:
        """Adds a where_not_in operation to the query.

        Args:
            filter_table (Table): the table containing the set of values to filter on
            cols (Union[str, List[str]]): the column name(s)

        Returns:
            self
        """
        return super().where_not_in(filter_table, cols)