Source code for pydeephaven.experimental.plugin_client
## Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending#"""Experimental module to communicate with server-side plugins from the client."""importthreadingfromqueueimportSimpleQueuefromtypingimportAny,List,Union,Tuplefromdeephaven_core.protoimportobject_pb2fromdeephaven_core.protoimportticket_pb2frompydeephaven.dherrorimportDHErrorfrompydeephaven.tableimportTablefrompydeephaven.ticketimportExportTicket,ServerObject
[docs]classPluginClient(ServerObject):""" Connected to an object on the server, this provides access to that object through messages sent from the server. Use resp_stream to read messages that the server has sent, and req_stream to send messages back to the server, if supported. """def__init__(self,session:'pydeephaven.session.Session',server_obj:ServerObject):self.export_ticket=None# make sure we have an ExportTicket on the server object so that it will remain alive for# the lifespan of this PluginClientifnotisinstance(server_obj.ticket,ExportTicket):self.export_ticket=session.fetch(server_obj.ticket)self.server_obj=ServerObject(type=server_obj.type,ticket=self.export_ticket)else:self.server_obj=server_objself.session=sessionself.req_stream=PluginRequestStream(SimpleQueue(),self.server_obj.pb_typed_ticket)self.resp_stream=PluginResponseStream(self._open(),self.session)super().__init__(type=self.server_obj.type,ticket=self.server_obj.ticket)def_open(self)->Any:returnself.session.plugin_object_service.message_stream(self.req_stream)defclose(self)->None:ifself.export_ticket:self.session.release(self.export_ticket)self.req_stream.close()self.resp_stream.close()
[docs]classFetchable(ServerObject):""" Represents an object on the server that could be fetched and used or communicated with from the client. """def__init__(self,session,typed_ticket:ticket_pb2.TypedTicket):export_ticket=ExportTicket(typed_ticket.ticket.ticket)super().__init__(type=typed_ticket.type,ticket=export_ticket)self.session=session
[docs]deffetch(self)->Union[Table,PluginClient]:""" Returns a client object that can be interacted with, representing an object that actually only exists on the server. In contrast to a Fetchable instance, which only serves as a reference to a server object, this method can return a Table or PluginClient. Note that closing this Fetchable or the result returned from this method will also close the other, take care when signaling that it is safe to release this object on the server. """ifself.pb_typed_ticket.typeisNone:raiseDHError("Cannot fetch an object with no type, the server has no ObjectType plugin registered to ""support it.")ifself.pb_typed_ticket.type=='Table':returnself.session.table_service.fetch_etcr(self.pb_typed_ticket.ticket)returnPluginClient(self.session,self)
[docs]classPluginRequestStream:""" A stream of requests to the server. If supported by the server-side plugin, these will be processed on the server in the order they are sent. """def__init__(self,req_queue:SimpleQueue,source_ticket:ticket_pb2.TypedTicket):self.req_queue=req_queueconnect_req=object_pb2.ConnectRequest(source_id=source_ticket)stream_req=object_pb2.StreamRequest(connect=connect_req)self.req_queue.put(stream_req)self._sentinel=object()
[docs]defwrite(self,payload:bytes,references:List[ServerObject])->None:""" Sends a message to the server, consisting of a payload of bytes and a list of objects that exist on the server. """data_message=object_pb2.ClientData(payload=payload,references=[obj.pb_typed_ticket()forobjinreferences])stream_req=object_pb2.StreamRequest(data=data_message)self.req_queue.put(stream_req)
[docs]classPluginResponseStream:""" A stream of responses from the server. Will contain at least one response from when the object was first connected to, depending on the server implementation. """def__init__(self,stream_resp,session:'pydeephaven.session.Session'):self.stream_resp=stream_respself.session=sessionself._rlock=threading.RLock()def__next__(self)->Tuple[bytes,List[Fetchable]]:withself._rlock:ifnotself.stream_resp:raiseRuntimeError("the response stream is closed.")try:resp=next(self.stream_resp)exceptStopIterationase:raiseelse:returnresp.data.payload,[Fetchable(self.session,ticket)forticketinresp.data.exported_references]def__iter__(self):returnselfdefclose(self)->None:withself._rlock:self.stream_resp=None