Create your own plugin
This guide walks you through creating a plugin in Deephaven. Specifically, it covers creating a bidirectional plugin that extends the capabilities of Deephaven's Python client API. Bidirectional plugins allow users to create custom RPC methods that enable clients to interact with and return objects on a running server.
Plugins in Deephaven are just Python packages that extend the server's functionality, the UI, a client API, or a combination of these. They can be as simple as a single Python file or as complex as a multi-file package with dependencies. Plugins are packaged like any other Python package.
All plugins have both a server-side and a client-side component. Server-side plugin code is implemented with the deephaven.plugin
module, and client-side plugin code is implemented with the pydeephaven.experimental.plugin_client
module.
The plugin example used in this guide can also be found here.
Plugin structure
This example creates a plugin called ExampleService
. You can call your plugin whatever you'd like, but be aware that the name must be consistent across both the server and client. If you change the name as you follow along, update it in both the server and client code.
The directory name does not need to match the plugin name. This guide uses ExampleServicePlugin
for the directory name, but the plugin itself is called ExampleService
.
The plugin is broken into server- and client-side components. Create the following directory and file structure in your project folder:
ExampleServicePlugin/
└── server/
└── pyproject.toml
└── example_plugin_server/
└── __init__.py
└── server/
└── pyproject.toml
└── example_plugin_client/
└── __init__.py
This guide assumes the use of the pyproject.toml
configuration for packaging. You can also use setup.cfg
or setup.py
if you prefer.
Server-side plugin
All server-side plugin code will live in the server
folder. This folder contains a TOML file for packaging and a subfolder containing the actual plugin implementation.
Plugin implementation
This example plugin can be broken into three parts, all of which go in server/example_plugin_server/__init__.py
:
- The plugin implementation
- The object(s) it manages
- The message stream handler
Let's start with the plugin implementation itself, which contains all of the required methods for any bidirectional plugins. This example manages only a single object, however, more complex plugins can manage an arbitrary number of objects. In such a case, those objects would be added to the relevant methods.
from deephaven.plugin.object_type import MessageStream, BidirectionalObjectType
from typing import override
class ExampleServicePlugin(BidirectionalObjectType):
"""Plugin for ExampleService."""
@property
@override
def name(self) -> str:
"""Get the name of the service."""
return "ExampleService"
@override
def is_type(self, object) -> bool:
"""Check if an object is an ExampleService."""
return isinstance(object, ExampleService)
@override
def create_client_connection(
self, obj: ExampleService, connection: MessageStream
) -> MessageStream:
"""Create a connection to an ExampleService instance."""
return ExampleServiceMessageStream(obj, connection)
Now, create the object the plugin manages. In this case, the service will have two methods.
from deephaven.table import Table
class ExampleService:
"""Example service that echoes strings and tables."""
def hello_string(self, data: str) -> str:
"""Returns a string containing the input data."""
return f"Hello client. You said: {data}"
def hello_table(self, table: Table, data: str) -> Table:
"""Returns a table generated from the input table and the input data."""
return table.update(["Client = data", "Server = `Hello client!`"])
Lastly, define the message stream handler to pass messages between the client and the server.
from deephaven.plugin.object_type import MessageStream, BidirectionalObjectType
from typing import List, Any, override
import traceback
import json
class ExampleServiceMessageStream(MessageStream):
"""
MessageStream implementation for ExampleService.
This will be called when the client sends a message to the server.
"""
def __init__(self, service: ExampleService, client_connection: MessageStream):
self.service = service
self.client_connection = client_connection
# Send an empty payload to the client to acknowledge successful connection
self.client_connection.on_data(b"", [])
@override
def on_data(self, payload: bytes, references: List[Any]):
"""Called when the client sends a message to the server."""
# Deserialize the input JSON bytes
input_string = bytes(payload).decode("utf-8")
print(f"Received data from client: {input_string}")
inputs = json.loads(input_string)
# Initialize the result payload and references to Deephaven objects
result_payload = {}
result_references = []
try:
if inputs["method"] == "hello_string":
print(f"Calling hello_string(\"{inputs['data']}\")")
result_payload["result"] = self.service.hello_string(inputs["data"])
elif inputs["method"] == "hello_table":
print(f"Calling hello_table({references[0]}, \"{inputs['data']}\")")
result_payload["result"] = ""
result_references = [
self.service.hello_table(references[0], inputs["data"])
]
else:
print(f"Unknown message type: {inputs['method']}")
raise NotImplementedError(f"Unknown message type: {inputs['method']}")
except Exception as e:
result_payload["error"] = traceback.format_exc()
print(f"Error processing message: {result_payload['error']}")
print(f"Sending result to client: {result_payload}")
# Serialize the result payload to JSON bytes
json_string = json.dumps(result_payload).encode("utf-8")
self.client_connection.on_data(
payload=json_string, references=result_references
)
@override
def on_close(self):
"""Called when the client closes the connection."""
print("Client connection closed.")
Plugin registration
The registration code is simpler than the implementation.
from deephaven.plugin import Registration, Callback
from typing import override
class ExampleServicePluginRegistration(Registration):
"""Registration for ExampleServicePlugin."""
@classmethod
@override
def register_into(cls, callback: Callback) -> None:
"""Register the ExampleServicePlugin."""
callback.register(ExampleServicePlugin)
Altogether, the __init__.py
should look like this:
__init__.py
"""
This module provides a server-side plugin for accessing an ExampleService object.
"""
from deephaven.plugin.object_type import BidirectionalObjectType, MessageStream
from deephaven.plugin import Registration, Callback
from typing import List, Any, override
from deephaven.table import Table
import traceback
import json
class ExampleService:
"""Example service that echoes strings and tables."""
def hello_string(self, data: str) -> str:
"""Returns a string containing the input data."""
return f"Hello client. You said: {data}"
def hello_table(self, table: Table, data: str) -> Table:
"""Returns a table generated from the input table and the input data."""
return table.update(["Client = data", "Server = `Hello client!`"])
class ExampleServiceMessageStream(MessageStream):
"""
MessageStream implementation for ExampleService.
This will be called when the client sends a message to the server.
"""
def __init__(self, service: ExampleService, client_connection: MessageStream):
self.service = service
self.client_connection = client_connection
# Send an empty payload to the client to acknowledge successful connection
self.client_connection.on_data(b"", [])
@override
def on_data(self, payload: bytes, references: List[Any]):
"""Called when the client sends a message to the server."""
# Deserialize the input JSON bytes
input_string = bytes(payload).decode("utf-8")
print(f"Received data from client: {input_string}")
inputs = json.loads(input_string)
# Initialize the result payload and references to Deephaven objects
result_payload = {}
result_references = []
try:
if inputs["method"] == "hello_string":
print(f"Calling hello_string(\"{inputs['data']}\")")
result_payload["result"] = self.service.hello_string(inputs["data"])
elif inputs["method"] == "hello_table":
print(f"Calling hello_table({references[0]}, \"{inputs['data']}\")")
result_payload["result"] = ""
result_references = [
self.service.hello_table(references[0], inputs["data"])
]
else:
print(f"Unknown message type: {inputs['method']}")
raise NotImplementedError(f"Unknown message type: {inputs['method']}")
except Exception as e:
result_payload["error"] = traceback.format_exc()
print(f"Error processing message: {result_payload['error']}")
print(f"Sending result to client: {result_payload}")
# Serialize the result payload to JSON bytes
json_string = json.dumps(result_payload).encode("utf-8")
self.client_connection.on_data(
payload=json_string, references=result_references
)
@override
def on_close(self):
"""Called when the client closes the connection."""
print("Client connection closed.")
class ExampleServicePlugin(BidirectionalObjectType):
"""Plugin for ExampleService."""
@property
@override
def name(self) -> str:
"""Get the name of the service."""
return "ExampleService"
@override
def is_type(self, object) -> bool:
"""Check if an object is an ExampleService."""
return isinstance(object, ExampleService)
@override
def create_client_connection(
self, obj: ExampleService, connection: MessageStream
) -> MessageStream:
"""Create a connection to an ExampleService instance."""
return ExampleServiceMessageStream(obj, connection)
class ExampleServicePluginRegistration(Registration):
"""Registration for ExampleServicePlugin."""
@classmethod
@override
def register_into(cls, callback: Callback) -> None:
"""Register the ExampleServicePlugin."""
callback.register(ExampleServicePlugin)
Packaging
As mentioned previously, this project uses pyproject.toml
to package the server-side code, which goes in server/pyproject.toml
:
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "example_plugin_server"
version = "0.0.1"
dependencies = ["deephaven-plugin>=0.6.0", "deephaven-core>=0.36.1"]
[project.entry-points."deephaven.plugin"]
registration_cls = "example_plugin_server:ExampleServicePluginRegistration"
The server-side pyproject.toml
file must register an entry point for the plugin.
Testing
You can test the plugin with only the server-side wiring implemented to make sure it registers with the server on startup. To see how, refer to the Server-side testing section.
Client-side plugin
The client-side code and packaging live in the client
folder of the project.
Plugin implementation
Like with the server-side code, the client-side code will be housed in a single file, client/example_plugin_client/__init__.py
. The client-side implementation will mirror the server-side implementation to call the methods on the server-side object.
import io
from typing import Any, List
from pydeephaven import Table
from pydeephaven.experimental import plugin_client, server_object
import json
class ExampleServiceProxy(server_object.ServerObject):
"""
This class provides a client-side interface to the ExampleService server-side object.
When you call a method on this class, it sends a message to the server-side object
and returns the result. The inputs are serialized to JSON bytes and sent to the
server, and the result is deserialized from JSON and returned.
"""
def __init__(self, plugin_client: plugin_client.PluginClient):
self.type_ = plugin_client.type_
self.ticket = plugin_client.ticket
self.plugin_client = plugin_client
# Consume the first (empty) payload from the server to acknowledge successful connection
next(self.plugin_client.resp_stream)
def hello_string(self, data: str) -> str:
"""Call ExampleService.hello_string() on the server.
Returns a string containing the input data."""
inputs = {"method": "hello_string", "data": data}
# serialize the inputs to JSON bytes
input_bytes = json.dumps(inputs).encode("utf-8")
# no input references
input_references = []
self.plugin_client.req_stream.write(input_bytes, input_references)
result_bytes, result_references = next(self.plugin_client.resp_stream)
# Deserialize the results from JSON bytes
results = json.loads(result_bytes.decode("utf-8"))
if "error" in results:
raise Exception(results["error"])
# return the result string
return results["result"]
def hello_table(self, table: Table, data: str) -> Table:
"""Call ExampleService.hello_table() on the server.
Returns a table generated from the input table and the input data."""
inputs = {"method": "hello_table", "data": data}
# serialize the inputs to JSON bytes
input_bytes = json.dumps(inputs).encode("utf-8")
# input references
input_references = [table]
self.plugin_client.req_stream.write(input_bytes, input_references)
result_bytes, result_references = next(self.plugin_client.resp_stream)
# Deserialize the results from JSON bytes
results = json.loads(result_bytes.decode("utf-8"))
if "error" in results:
raise Exception(results["error"])
# fetch and return the result table
return result_references[0].fetch()
Packaging
Like with the server-side plugin, the client-side plugin is packaged with a TOML file, client/pyproject.toml
:
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "example_plugin_client"
version = "0.0.1"
dependencies = ["pydeephaven>=0.36.1", "pandas"]
The client-side plugin for this example requires no registration.
Use the plugin
Once you have completed all of the client—and server-side wiring, you can test the plugin. The following subsections cover testing the server using different launch methods.
Installation
To test the plugin, you must first install it, along with Deephaven and any other required packages.
pip-installed Deephaven
If starting the server with pip-installed Deephaven, it is highly recommended to do so in a virtual environment. The following commands create a virtual environment and install the necessary packages, including the plugin.
# Run these from the root folder of the project
rm -rf ./venv-server
python3.12 -m venv ./venv-server
source ./venv-server/bin/activate
python -m pip install -U pip
python -m pip install -U deephaven-server
python -m pip install ./server
Docker
If starting the server with Docker, it's recommended to use Docker Compose to manage the server and any other services. The following Dockerfile builds a Docker image based on Deephaven's base server image, and installs the plugin:
FROM ghcr.io/deephaven/server:latest
COPY ./server /server
RUN pip install -U pip
RUN pip install /server
The following Docker Compose file runs Deephaven using the image built by the above Dockerfile:
The Dockerfile below sets the pre-shared key to YOUR_PASSWORD_HERE
for demonstration purposes. The client needs this key when connecting to the server. It is recommended that this key be changed to something more secure.
services:
deephaven:
build: .
ports:
- '${DEEPHAVEN_PORT:-10000}:10000'
volumes:
- ./data:/data
environment:
- START_OPTS=-Xmx4g -Dauthentication.psk=YOUR_PASSWORD_HERE
With those two files in hand, docker compose up
installs the plugin and starts the server.
Server-side testing
pip-installed Deephaven
If running Deephaven from Python, the following Python script will start a Deephaven server, create an instance of ExampleService
, and keep the server running until ctrl+C is pressed:
import sys
from example_plugin_server import ExampleService
example_service = ExampleService()
# Keep the server running until the user presses Control-C
print("Press Control-C to exit")
try:
while True:
input()
except KeyboardInterrupt:
print("Exiting Deephaven...")
sys.exit(0)
Docker
When running from Docker, you need only create an instance of ExampleService
so that a client may interact with it:
from example_plugin_server import ExampleService
example_service = ExampleService()
Client-side testing
Then, to test the client-side plugin, run the following commands from the root of the project to create another virtual environment and install the necessary packages:
rm -rf ./venv-client
python3.12 -m venv ./venv-client
source ./venv-client/bin/activate
python -m pip install -U pip
python -m pip install ./client
With that said and done, the following Python script will use the client-side plugin:
Replace YOUR_PASSWORD_HERE
with the pre-shared key set when starting the server if you changed it.
"""
Example client script that connects to a Deephaven server
and calls methods on a server-side ExampleService object.
"""
from pydeephaven import Session
from example_plugin_client import ExampleServiceProxy
session = Session(
auth_type="io.deephaven.authentication.psk.PskAuthenticationHandler",
auth_token="YOUR_PASSWORD_HERE", # Replace this with your PSK
)
# Print the objects the server can export
print("")
print(f"Exportable Objects: {list(session.exportable_objects.keys())}")
# Get a ticket for an ExampleService object from the server named "example_service"
example_service_ticket = session.exportable_objects["example_service"]
# Wrap the ticket as a PluginClient
example_service_plugin_client = session.plugin_client(example_service_ticket)
# Create a proxy object for the ExampleService
example_service = ExampleServiceProxy(example_service_plugin_client)
print("")
print("Calling ExampleService.hello_string()")
result = example_service.hello_string("Hello server!")
print(f"Result: {result}")
print("")
print("Calling ExampleService.hello_table()")
table_in = session.empty_table(10).update("X = i")
table_result = example_service.hello_table(table_in, "Hello server!")
print("Result:")
print(table_result.to_arrow().to_pandas())
# Close the session so that Python can exit
session.close()