#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
""" This module provides Java compatibility support including convenience functions to create some widely used Java
data structures from corresponding Python ones in order to be able to call Java methods. """
from typing import Any, Callable, Dict, Iterable, List, Sequence, Set, TypeVar, Union, Optional, Mapping
from warnings import warn
import jpy
import numpy as np
import pandas as pd
from deephaven import dtypes, DHError
from deephaven._wrapper import unwrap, wrap_j_object, JObjectWrapper
from deephaven.dtypes import DType, _PRIMITIVE_DTYPE_NULL_MAP
from deephaven.column import ColumnDefinition
_NULL_BOOLEAN_AS_BYTE = jpy.get_type("io.deephaven.util.BooleanUtils").NULL_BOOLEAN_AS_BYTE
_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")
_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition")
_DH_PANDAS_NULLABLE_TYPE_MAP: Dict[DType, pd.api.extensions.ExtensionDtype] = {
dtypes.bool_: pd.BooleanDtype,
dtypes.byte: pd.Int8Dtype,
dtypes.short: pd.Int16Dtype,
dtypes.char: pd.UInt16Dtype,
dtypes.int32: pd.Int32Dtype,
dtypes.int64: pd.Int64Dtype,
dtypes.float32: pd.Float32Dtype,
dtypes.float64: pd.Float64Dtype,
}
[docs]def is_java_type(obj: Any) -> bool:
"""Returns True if the object is originated in Java."""
return isinstance(obj, jpy.JType)
[docs]def j_array_list(values: Iterable = None) -> jpy.JType:
"""Creates a Java ArrayList instance from an iterable."""
if values is None:
return None
r = jpy.get_type("java.util.ArrayList")(len(list(values)))
for v in values:
r.add(unwrap(v))
return r
[docs]def j_hashmap(d: Dict = None) -> jpy.JType:
"""Creates a Java HashMap from a dict."""
if d is None:
return None
r = jpy.get_type("java.util.HashMap")(len(d))
for k, v in d.items():
k = unwrap(k)
v = unwrap(v)
r.put(k, v)
return r
[docs]def j_hashset(s: Set = None) -> jpy.JType:
"""Creates a Java HashSet from a set."""
if s is None:
return None
r = jpy.get_type("java.util.HashSet")(len(s))
for v in s:
r.add(unwrap(v))
return r
[docs]def j_properties(d: Dict = None) -> jpy.JType:
"""Creates a Java Properties from a dict."""
if d is None:
return None
r = jpy.get_type("java.util.Properties")()
for k, v in d.items():
k = unwrap(k)
v = unwrap(v)
r.setProperty(k, v)
return r
[docs]def j_map_to_dict(m) -> Dict[Any, Any]:
"""Converts a java map to a python dictionary."""
if not m:
return {}
return {e.getKey(): wrap_j_object(e.getValue()) for e in m.entrySet().toArray()}
[docs]def j_list_to_list(jlist) -> List[Any]:
"""Converts a java list to a python list."""
if not jlist:
return []
return [wrap_j_object(jlist.get(i)) for i in range(jlist.size())]
T = TypeVar("T")
R = TypeVar("R")
[docs]def j_runnable(callable: Callable[[], None]) -> jpy.JType:
"""Constructs a Java 'Runnable' implementation from a Python callable that doesn't take any arguments and returns
None.
Args:
callable (Callable[[], None]): a Python callable that doesn't take any arguments and returns None
Returns:
io.deephaven.integrations.python.PythonRunnable instance
"""
return jpy.get_type("io.deephaven.integrations.python.PythonRunnable")(callable)
[docs]def j_function(func: Callable[[T], R], dtype: DType) -> jpy.JType:
"""Constructs a Java 'Function<PyObject, Object>' implementation from a Python callable or an object with an
'apply' method that accepts a single argument.
Args:
func (Callable): a Python callable or an object with an 'apply' method that accepts a single argument
dtype (DType): the return type of 'func'
Returns:
io.deephaven.integrations.python.PythonFunction instance
"""
return jpy.get_type("io.deephaven.integrations.python.PythonFunction")(
func, dtype.qst_type.clazz()
)
[docs]def j_unary_operator(func: Callable[[T], T], dtype: DType) -> jpy.JType:
"""Constructs a Java 'Function<PyObject, Object>' implementation from a Python callable or an object with an
'apply' method that accepts a single argument.
Args:
func (Callable): a Python callable or an object with an 'apply' method that accepts a single argument
dtype (DType): the return type of 'func'
Returns:
io.deephaven.integrations.python.PythonFunction instance
"""
return jpy.get_type("io.deephaven.integrations.python.PythonFunction$PythonUnaryOperator")(
func, dtype.qst_type.clazz()
)
[docs]def j_binary_operator(func: Callable[[T, T], T], dtype: DType) -> jpy.JType:
"""Constructs a Java 'Function<PyObject, PyObject, Object>' implementation from a Python callable or an object with an
'apply' method that accepts a single argument.
Args:
func (Callable): a Python callable or an object with an 'apply' method that accepts two arguments
dtype (DType): the return type of 'func'
Returns:
io.deephaven.integrations.python.PythonFunction instance
"""
return jpy.get_type("io.deephaven.integrations.python.PythonBiFunction$PythonBinaryOperator")(
func, dtype.qst_type.clazz()
)
[docs]def j_lambda(func: Callable, lambda_jtype: jpy.JType, return_dtype: DType = None):
"""Wraps a Python Callable as a Java "lambda" type.
Java lambda types must contain a single abstract method.
Args:
func (Callable): Any Python Callable or object with an 'apply' method that accepts the same arguments
(number and type) the target Java lambda type
lambda_jtype (jpy.JType): The Java lambda interface to wrap the provided callable in
return_dtype (DType): The expected return type if conversion should be applied. None (the default) does not
attempt to convert the return value and returns a Java Object.
"""
coerce_to_type = return_dtype.qst_type.clazz() if return_dtype is not None else None
return jpy.get_type('io.deephaven.integrations.python.JavaLambdaFactory').create(lambda_jtype.jclass, func,
coerce_to_type)
[docs]def to_sequence(v: Union[T, Sequence[T]] = None, wrapped: bool = False) -> Sequence[Union[T, jpy.JType]]:
"""A convenience function to create a sequence of wrapped or unwrapped object from either one or a sequence of
input values to help JPY find the matching Java overloaded method to call.
This also enables a function to provide parameters that can accept both singular and plural values of the same type
for the convenience of the users, e.g. both x= "abc" and x = ["abc"] are valid arguments.
Args:
v (Union[T, Sequence[T]], optional): the input value(s) to be converted to a sequence
wrapped (bool, optional): if True, the input value(s) will remain wrapped in a JPy object; otherwise, the input
value(s) will be unwrapped. Defaults to False.
Returns:
Sequence[Union[T, jpy.JType]]: a sequence of wrapped or unwrapped objects
"""
if v is None or isinstance(v, Sequence) and not v:
return ()
if wrapped:
if not isinstance(v, Sequence) or isinstance(v, str):
return (v,)
else:
return tuple(v)
if not isinstance(v, Sequence) or isinstance(v, str):
return (unwrap(v),)
else:
return tuple((unwrap(o) for o in v))
def _j_array_to_numpy_array(dtype: DType, j_array: jpy.JType, conv_null: bool, type_promotion: bool = False) -> \
Optional[np.ndarray]:
""" Produces a numpy array from the DType and given Java array.
Args:
dtype (DType): The dtype of the Java array
j_array (jpy.JType): The Java array to convert
conv_null (bool): If True, convert nulls to the null value for the dtype
type_promotion (bool): Ignored when conv_null is False. When conv_null is True, see the description for the same
named parameter in dh_nulls_to_nan().
Returns:
np.ndarray: The numpy array or None if the Java array is None
Raises:
DHError
"""
if j_array is None:
return None
if dtype.is_primitive:
np_array = np.frombuffer(j_array, dtype.np_type)
elif dtype == dtypes.Instant:
longs = _JPrimitiveArrayConversionUtility.translateArrayInstantToLong(j_array)
np_long_array = np.frombuffer(longs, np.int64)
np_array = np_long_array.view(dtype.np_type)
elif dtype == dtypes.bool_:
# dh nulls will be preserved and show up as True b/c the underlying byte array isn't modified
bytes_ = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(j_array)
np_array = np.frombuffer(bytes_, dtype.np_type)
elif dtype == dtypes.string:
np_array = np.array(j_array, dtypes.string.np_type)
elif dtype.np_type is not np.object_:
try:
np_array = np.frombuffer(j_array, dtype.np_type)
except:
np_array = np.array(j_array, np.object_)
else:
np_array = np.array(j_array, np.object_)
if conv_null:
return dh_null_to_nan(np_array, type_promotion)
return np_array
[docs]def dh_null_to_nan(np_array: np.ndarray, type_promotion: bool = False) -> np.ndarray:
"""Converts Deephaven primitive null values in the given numpy array to np.nan. No conversion is performed on
non-primitive types.
Note, the input numpy array is modified in place if it is of a float or double type. If that's not a desired behavior,
pass a copy of the array instead. For input arrays of other types, a new array is always returned.
Args:
np_array (np.ndarray): The numpy array to convert
type_promotion (bool): When False, integer, boolean, or character arrays will cause an exception to be raised.
When True, integer, boolean, or character arrays are converted to new np.float64 arrays and Deephaven null
values in them are converted to np.nan. Numpy arrays of float or double types are not affected by this flag
and Deephaven nulls will always be converted to np.nan in place. Defaults to False.
Returns:
np.ndarray: The numpy array with Deephaven nulls converted to np.nan.
Raises:
DHError
"""
if not isinstance(np_array, np.ndarray):
raise DHError(message="The given np_array argument is not a numpy array.")
dtype = dtypes.from_np_dtype(np_array.dtype)
if dh_null := _PRIMITIVE_DTYPE_NULL_MAP.get(dtype):
if dtype in (dtypes.float32, dtypes.float64):
np_array = np.copy(np_array)
np_array[np_array == dh_null] = np.nan
else:
if not type_promotion:
raise DHError(message=f"failed to convert DH nulls to np.nan in the numpy array. The array is "
f"of {np_array.dtype.type} type but type_promotion is False")
if dtype is dtypes.bool_: # needs to change its type to byte for dh null detection
np_array = np.frombuffer(np_array, np.byte)
np_array = np_array.astype(np.float64)
np_array[np_array == dh_null] = np.nan
return np_array
def _j_array_to_series(dtype: DType, j_array: jpy.JType, conv_null: bool) -> pd.Series:
"""Produce a copy of the specified Java array as a pandas.Series object.
Args:
dtype (DType): the dtype of the Java array
j_array (jpy.JType): the Java array
conv_null (bool): whether to check for Deephaven nulls in the data and automatically replace them with
pd.NA.
Returns:
a pandas Series
Raises:
DHError
"""
if conv_null and dtype == dtypes.bool_:
j_array = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(j_array)
np_array = np.frombuffer(j_array, dtype=np.byte)
s = pd.Series(data=np_array, dtype=pd.Int8Dtype(), copy=False)
s.mask(s == _NULL_BOOLEAN_AS_BYTE, inplace=True)
return s.astype(pd.BooleanDtype(), copy=False)
np_array = _j_array_to_numpy_array(dtype, j_array, conv_null=False)
if conv_null and (nv := _PRIMITIVE_DTYPE_NULL_MAP.get(dtype)) is not None:
pd_ex_dtype = _DH_PANDAS_NULLABLE_TYPE_MAP.get(dtype)
s = pd.Series(data=np_array, dtype=pd_ex_dtype(), copy=False)
s.mask(s == nv, inplace=True)
else:
s = pd.Series(data=np_array, copy=False)
return s
# Note: unable to import TableDefinitionLike due to circular ref (table.py -> agg.py -> jcompat.py)
[docs]def j_table_definition(
table_definition: Union[
"TableDefinition",
Mapping[str, dtypes.DType],
Iterable[ColumnDefinition],
jpy.JType,
None,
],
) -> Optional[jpy.JType]:
"""Deprecated for removal next release, prefer TableDefinition. Produce a Deephaven TableDefinition from user input.
Args:
table_definition (Optional[TableDefinitionLike]): the table definition as a dictionary of column
names and their corresponding data types or a list of Column objects
Returns:
a Deephaven TableDefinition object or None if the input is None
Raises:
DHError
"""
warn(
"j_table_definition is deprecated for removal next release, prefer TableDefinition",
DeprecationWarning,
stacklevel=2,
)
from deephaven.table import TableDefinition
return (
TableDefinition(table_definition).j_table_definition
if table_definition
else None
)
[docs]class AutoCloseable(JObjectWrapper):
"""A context manager wrapper to allow Java AutoCloseable to be used in with statements.
When constructing a new instance, the Java AutoCloseable must not be closed."""
j_object_type = jpy.get_type("java.lang.AutoCloseable")
def __init__(self, j_auto_closeable):
self._j_auto_closeable = j_auto_closeable
self.closed = False
def __enter__(self):
return self
def close(self):
if not self.closed:
self.closed = True
self._j_auto_closeable.close()
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def __del__(self):
self.close()
@property
def j_object(self) -> jpy.JType:
return self._j_auto_closeable