Iceberg
Iceberg tables can be referenced via Deephaven Schemas using the Extended Storage feature. This gives Deephaven immediate read access to existing Iceberg tables as Deephaven tables, including support for Iceberg schema evolution and nested Iceberg structures.
Configuration
The easiest way to configure Iceberg tables for Deephaven is to use the built-in inference provided by LoadTableOptions. Advanced users can customize the mapping with a Resolver, or with custom inference options.
A type or catalog-impl key is required for an Iceberg catalog. Additional parameters may be necessary depending on the type of catalog. Commonly configured properties include warehouse and uri. See Iceberg Catalog properties and your specific catalog implementation for more details on configuration. An Iceberg table identifier is also required.
import io.deephaven.enterprise.iceberg.IcebergTableOptions
import io.deephaven.iceberg.util.BuildCatalogOptions
import io.deephaven.iceberg.util.LoadTableOptions

// The options to read an Iceberg Catalog.
catalogOptions = BuildCatalogOptions.builder()
        .name("MyCatalog")
        .putAllProperties([
                "type": "<type>",
                // ... additional properties here ...
        ])
        .build()

// The options to load an Iceberg Table from the catalog, using default
// inference options.
tableOptions = LoadTableOptions.builder()
        .id("IcebergNamespace.IcebergTableName")
        .build()

// Load the Iceberg Catalog and Table and materialize the results into
// an explicit Resolver.
options = IcebergTableOptions.builder()
        .tableKey("DHNamespace", "DHTableName")
        .catalogOptions(catalogOptions)
        .tableOptions(tableOptions)
        .build()
        .materialize()

// Verify that the resulting Table data looks correct.
myTable = options.table()
If you have a working Spark configuration, it can typically be translated into the necessary Catalog properties by removing the Spark prefix.
For example, the following Spark properties:
spark.sql.catalog.rest_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.rest_prod.type = rest
spark.sql.catalog.rest_prod.uri = http://localhost:8080
translate into the following BuildCatalogOptions:
catalogOptions = BuildCatalogOptions.builder()
        .name("rest_prod")
        .putAllProperties([
                "type": "rest",
                "uri": "http://localhost:8080"
        ])
        .build()
See BuildCatalogOptions and LoadTableOptions for more details on these structures.
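As noted above, advanced users can bypass inference and supply an explicit Resolver. The following is a minimal sketch of what that might look like; the builder method names (Resolver.builder(), definition, putColumnInstructions, ColumnInstructions.schemaField, and resolver on LoadTableOptions) are assumptions based on the serialization format described below, so consult the Resolver and ColumnInstructions Javadocs for the authoritative API.
import io.deephaven.iceberg.util.ColumnInstructions
import io.deephaven.iceberg.util.LoadTableOptions
import io.deephaven.iceberg.util.Resolver
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.engine.table.TableDefinition
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types

// The Iceberg schema being mapped; field IDs 1 and 2 correspond to the
// Deephaven columns below. (Method names here are assumptions; see the Javadocs.)
icebergSchema = new Schema(
        Types.NestedField.optional(1, "foo", Types.IntegerType.get()),
        Types.NestedField.optional(2, "bar", Types.LongType.get()))

// Explicitly map Deephaven column names to Iceberg field IDs instead of
// relying on inference.
resolver = Resolver.builder()
        .schema(icebergSchema)
        .definition(TableDefinition.of(
                ColumnDefinition.ofInt("Foo"),
                ColumnDefinition.ofLong("Bar")))
        .putColumnInstructions("Foo", ColumnInstructions.schemaField(1))
        .putColumnInstructions("Bar", ColumnInstructions.schemaField(2))
        .build()

tableOptions = LoadTableOptions.builder()
        .id("IcebergNamespace.IcebergTableName")
        .resolver(resolver)
        .build()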
Deployment
An iris-schemamanagers user is required to deploy the Schema.
import com.illumon.iris.db.schema.SchemaServiceFactory
schemaService = SchemaServiceFactory.getDefault()
// Admins are encouraged to explicitly manage the deployment logic of their Schema with
// the SchemaService.
mySchema = options.schema().get()
schemaService.addSchema(mySchema)
// Alternatively, admins may deploy the schema directly using the IcebergTableOptions.
// The following will create the schema namespace if it doesn't exist, create the schema if it doesn't exist, or update it if it does.
// options.deploy(schemaService)
// Verify that the table can be fetched
myTableViaDb = db.historicalTable("DHNamespace", "DHTableName")
Serialization format
Caution
The creation and deployment of a Deephaven Iceberg schema is typically performed programmatically, as shown in the previous sections. Exercise caution when manually creating or editing a schema.
An Iceberg table is referenced in a Deephaven table's schema using an ExtendedStorage element with the attribute type set to iceberg.
<Table namespace="DHNamespace" name="DHTableName" namespaceSet="System" storageType="Extended">
<!-- Column elements omitted for brevity -->
<ExtendedStorage type="iceberg">
<Catalog><!-- see `Catalog` section below --></Catalog>
<Table><!-- see `Table` section below --></Table>
</ExtendedStorage>
</Table>
Catalog element
The Catalog element is a serialization of the core BuildCatalogOptions. It is composed of a Name, a Properties, and an optional HadoopConfig element. The Properties element is a map of string keys to string values. The HadoopConfig element is an additional map for Hadoop catalogs. For example:
<Catalog>
    <Name>MyCatalog</Name>
    <Properties injection="enabled">
        <Entry key="type" value="<type>" />
        <!--
        <Entry key="warehouse" value="..." />
        -->
    </Properties>
    <!--
    <HadoopConfig>
        <Entry key="..." value="..." />
    </HadoopConfig>
    -->
</Catalog>
The injection attribute on the Properties element controls whether Deephaven may automatically add properties that work around known upstream issues and/or supply defaults needed for Deephaven's Iceberg usage. The valid values are enabled and disabled. Setting this to enabled is recommended.
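For example, to prevent Deephaven from adding any properties automatically, set injection to disabled (the entries shown are illustrative):
<Properties injection="disabled">
    <!-- Illustrative entries; supply the exact properties your catalog needs. -->
    <Entry key="type" value="rest" />
    <Entry key="uri" value="http://localhost:8080" />
</Properties>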
Table element
The Table element is a serialization of the core LoadTableOptions. It is composed of a TableIdentifier, a Resolver, and a NameMapping element.
<Table>
    <TableIdentifier>IcebergNamespace.IcebergTableName</TableIdentifier>
    <Resolver><!-- see `Resolver` section below --></Resolver>
    <NameMapping><!-- see `NameMapping` section below --></NameMapping>
</Table>
The Resolver element contains a ColumnInstructions, a Schema, and an optional PartitionSpec element. The ColumnInstructions element contains the mapping from each Deephaven column name to an Iceberg fieldId, an Iceberg partitionFieldId, or type unmapped. The Schema element contains the Iceberg Schema JSON. The optional PartitionSpec element contains the Iceberg Partition Spec JSON.
<Resolver type="direct">
<ColumnInstructions>
<Column name="Foo" partitionFieldId="1000" />
<Column name="Bar" fieldId="2" />
<Column name="Baz" type="unmapped" />
</ColumnInstructions>
<Schema type="json"><![CDATA[{
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "foo",
"required" : false,
"type" : "int"
}, {
"id" : 2,
"name" : "bar",
"required" : false,
"type" : "long"
} ]
}]]></Schema>
<PartitionSpec type="json"><![CDATA[{
"spec-id" : 0,
"fields" : [ {
"name" : "foo",
"transform" : "identity",
"source-id" : 1,
"field-id" : 1000
} ]
}]]></PartitionSpec>
</Resolver>
The NameMapping element provides fallback field IDs to be used when a data file does not contain field ID information. It has three different types, specified via the type attribute. The table type reads the Name Mapping from the Iceberg Table property schema.name-mapping.default (see https://iceberg.apache.org/spec/#column-projection).
<NameMapping type="table" />
The empty type means that no name mapping is used.
<NameMapping type="empty" />
The json type uses Iceberg Name Mapping JSON.
<NameMapping type="json"><![CDATA[[ {
"field-id" : 1,
"names" : [ "Foo" ]
}, {
"field-id" : 2,
"names" : [ "Bar" ]
} ]]]></NameMapping>
Full example
Let's assume that we have an existing Iceberg Glue Catalog that contains a table mycatalog.cities with the Iceberg Schema:
{
  "type": "struct",
  "schema-id": 1,
  "fields": [
    {
      "id": 1,
      "name": "city",
      "required": false,
      "type": "string"
    },
    {
      "id": 2,
      "name": "latitude",
      "required": false,
      "type": "double"
    },
    {
      "id": 3,
      "name": "longitude",
      "required": false,
      "type": "double"
    }
  ]
}
To create a new Deephaven Schema with namespace DhExample and table name Cities that references this Iceberg Table, we would execute the following once:
import io.deephaven.enterprise.iceberg.IcebergTableOptions
import io.deephaven.iceberg.util.BuildCatalogOptions
import io.deephaven.iceberg.util.LoadTableOptions
import com.illumon.iris.db.schema.SchemaServiceFactory

catalogOptions = BuildCatalogOptions.builder()
        .name("GlueCatalog")
        .putAllProperties(["type": "glue"])
        .build()

tableOptions = LoadTableOptions.builder()
        .id("mycatalog.cities")
        .build()

options = IcebergTableOptions.builder()
        .tableKey("DhExample", "Cities")
        .catalogOptions(catalogOptions)
        .tableOptions(tableOptions)
        .build()
        .materialize()

SchemaServiceFactory.getDefault().addSchema(options.schema().get())
This would result in the following Deephaven Schema:
<Table namespace="DhExample" name="Cities" namespaceSet="System" storageType="Extended">
<Column name="city" dataType="String" columnType="Normal" />
<Column name="latitude" dataType="double" columnType="Normal" />
<Column name="longitude" dataType="double" columnType="Normal" />
<ExtendedStorage type="iceberg">
<Catalog>
<Name>GlueCatalog</Name>
<Properties injection="enabled">
<Entry key="glue" />
</Properties>
</Catalog>
<Table>
<TableIdentifier>mycatalog.cities</TableIdentifier>
<Resolver type="direct">
<ColumnInstructions>
<Column name="city" fieldId="1" />
<Column name="latitude" fieldId="2" />
<Column name="longitude" fieldId="3" />
</ColumnInstructions>
<Schema type="json"><![CDATA[{
"type" : "struct",
"schema-id" : 1,
"fields" : [ {
"id" : 1,
"name" : "city",
"required" : false,
"type" : "string"
}, {
"id" : 2,
"name" : "latitude",
"required" : false,
"type" : "double"
}, {
"id" : 3,
"name" : "longitude",
"required" : false,
"type" : "double"
} ]
}]]></Schema>
</Resolver>
<NameMapping type="empty" />
</Table>
</ExtendedStorage>
</Table>
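Once the schema is deployed, the table can be fetched like any other Deephaven historical table:
// Fetch the newly linked Iceberg table through the Deephaven database.
myCities = db.historicalTable("DhExample", "Cities")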
Deprecated
Warning
This way of configuring Iceberg is deprecated and will be removed in a future release.
Iceberg tables can be read as Deephaven historical tables using the Extended Storage feature. To configure Iceberg, you first define an Iceberg Endpoint, then discover and deploy a schema.
Configuration
The first step in linking Iceberg tables into Deephaven is configuring an IcebergEndpoint, which we refer to as simply an endpoint. The endpoint contains the parameters required to locate and connect to the Iceberg catalog and the data warehouse, as well as the storage-specific parameters required to read the data.
An endpoint is configured using the example below.
import io.deephaven.enterprise.iceberg.IcebergTools
import io.deephaven.extensions.s3.S3Instructions

// Create a new endpoint
endpoint = IcebergTools.newEndpoint()
        .catalogType("rest")                                 // The catalog is a REST catalog
        .catalogUri("http://mydata.com:8181")                // located at this URI.
        .warehouseUri("s3://warehouse/")                     // The data warehouse is an S3 warehouse at this URI.
        .putProperties("s3.access-key-id", "my_access_key",  // These are the properties required by the Iceberg API.
                "client.region", "us-east-1")                // See https://iceberg.apache.org/docs/nightly/configuration/#configuration
        .putSecrets("s3.secret-access-key", "s3.key")        // Include any named secrets.
        .dataInstructions(S3Instructions.builder()           // Configure the S3 data parameters.
                .regionName("us-east-1")
                .build())
        .build("my_company_iceberg")                         // Explicitly name the endpoint.
from deephaven_enterprise import iceberg
from deephaven.experimental import s3

# Create a new endpoint
endpoint = iceberg.make_endpoint(
    "rest",  # The catalog is a REST catalog
    "http://mydata.com:8181",  # located at this URI.
    "s3://warehouse/",  # The data warehouse is an S3 warehouse at this URI.
    s3.S3Instructions(region_name="us-east-1"),  # Create the data instructions for reading data.
    endpoint_name="my_company_iceberg",  # Explicitly name this endpoint.
    # Set Iceberg configuration properties.
    # See https://iceberg.apache.org/docs/nightly/configuration/#configuration
    properties={"s3.access-key-id": "my_access_key", "client.region": "us-east-1"},
    secrets={"s3.secret-access-key": "s3.key"},  # Include any named secrets.
)
Properties
Endpoint properties are a key-value map of Iceberg configuration parameters to their values. Valid property keys can be found in the Iceberg documentation.
Secrets
Endpoint secrets are a key-value map where keys represent Iceberg configuration properties and values represent named references to secrets stored within Deephaven. When needed, the secrets are retrieved from Deephaven and merged into the properties before being used to access Iceberg.
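For example, in the endpoint above, putSecrets("s3.secret-access-key", "s3.key") directs Deephaven to look up the secret stored under the name s3.key and merge its value into the catalog properties under the key s3.secret-access-key.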
Secrets may be stored either as a property in the Deephaven configuration file or as a JSON map in a protected file on disk. Secrets providers are visited in ascending priority order until one supplies a value or none can be found. More sophisticated secret stores are possible. Contact support for more information.
From Properties
Secrets may be stored in Deephaven configuration files as simple properties, such as s3.access_key=1234secret4321. The default priority of the Properties secrets provider is 100, which can be adjusted using the property PropertiesSecretsProvider.priority.
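For example, the secret referenced as s3.key by the endpoints above could be supplied as configuration properties (the secret value shown is hypothetical):
# Hypothetical secret value; the key matches the name used in putSecrets/secrets.
s3.key=1234secret4321
# Optionally adjust the Properties provider's priority (default 100).
PropertiesSecretsProvider.priority=100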
Warning
Property-based secrets are visible to anyone with access to the server, including access to persistent queries. It is not recommended to store sensitive information such as passwords this way.
From Files
Secrets may be stored in files on disk containing a simple JSON map. This format is more secure and better supports complex secret values. You may configure multiple secrets files and their priorities using these properties:
| Property | Description |
| --- | --- |
| FileSecretsProvider.name.path | The path to the secrets file for the provider name. |
| FileSecretsProvider.name.priority | The priority of the secrets provider name. |
You may provide as many of these as you need, ensuring that each name is unique.
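For example, a provider registered under the name local might be configured as follows (the name and path here are illustrative):
# Illustrative provider name and path.
FileSecretsProvider.local.path=/etc/deephaven/secrets.json
FileSecretsProvider.local.priority=10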
An example file:
{
  "s3.key": "some_secret_key",
  "secret_url": "https://verysecret.com:9001",
  "complicated": "<Secret type=\"important\"><Internal>Secrecy</Internal></Secret>"
}
Warning
If you supply secrets in files, they are visible to any process with access to that file. For example, if they must be visible to query workers, any user running query workers under the same operating system user can access them. This should be considered before storing secrets such as passwords in files.
See per-user workers for details on configuring different operating system users for different Deephaven users.
Deployment
The endpoint can be deployed to the Deephaven configuration as long as a name has been provided. Once deployed, you may reference the endpoint by name in the schema for Iceberg tables to avoid duplication.
// Deploy the endpoint to Deephaven configuration, failing if it already exists.
endpoint.deploy(false)
# Deploy the endpoint to Deephaven configuration, overwriting if it already exists.
# The overwrite_existing parameter defaults to False, in which case deployment fails if the endpoint already exists.
endpoint.deploy(overwrite_existing=True)
Discovery
Once an endpoint has been configured, you can discover an Iceberg table to create and deploy a Deephaven schema. If you have previously deployed an endpoint, you can retrieve it by name as well.
import io.deephaven.enterprise.iceberg.IcebergTools
import io.deephaven.enterprise.iceberg.discovery.DiscoveryConfig

// Load an endpoint that was already configured
endpoint = IcebergTools.getEndpointByName("my_company_iceberg")

// Discover a table, derive the schema, and deploy it, deriving the namespace and table
// name from the table identifier and referencing the endpoint by name.
discovery = IcebergTools.discover(DiscoveryConfig.builder()
        .tableIdentifier("market.trades")
        .endpoint(endpoint)
        .build())
discovery.deployWithEndpointReference()
from deephaven_enterprise import iceberg

# Load an endpoint that was already configured
endpoint = iceberg.get_named_endpoint("my_company_iceberg")

# Discover a table, derive the schema, and deploy it, deriving the namespace and table
# name from the table identifier and referencing the endpoint by name.
result = iceberg.discover("market.trades", endpoint)
result.deploy_named()
In the examples above, the Deephaven namespace and table name are derived directly from the Iceberg table identifier. You may specify your own by setting the namespace and tableName properties during discovery.
discovery = IcebergTools.discover(DiscoveryConfig.builder()
        .tableIdentifier("market.trades")
        .namespace("MarketUS")
        .tableName("EqTrades")
        .endpoint(endpoint)
        .build())
result = iceberg.discover(
    "market.trades", endpoint, namespace="MarketUS", table_name="EqTrades"
)
Example
Below is a complete example that creates an endpoint, discovers a table, deploys a schema, and then fetches the table.
import io.deephaven.enterprise.iceberg.IcebergTools
import io.deephaven.enterprise.iceberg.discovery.DiscoveryConfig
import io.deephaven.extensions.s3.S3Instructions

// Create a new endpoint
endpoint = IcebergTools.newEndpoint()
        .catalogType("rest")
        .catalogUri("http://mydata.com:8181")
        .warehouseUri("s3://warehouse/")
        .putProperties("s3.access-key-id", "access_key",
                "client.region", "us-east-1")
        .putSecrets("s3.secret-access-key", "s3.key")
        .dataInstructions(S3Instructions.builder()
                .regionName("us-east-1")
                .build())
        .build("my_company_iceberg")

// Deploy the endpoint, overwriting if it already exists.
endpoint.deploy(true)

// Discover the table and deploy its schema, referencing the endpoint by name.
discovery = IcebergTools.discover(DiscoveryConfig.builder()
        .tableIdentifier("market.trades")
        .endpoint(endpoint)
        .build())
discovery.deployWithEndpointReference()

data = db.historicalTable("market", "trades")
from deephaven_enterprise import iceberg
from deephaven.experimental import s3
endpoint = iceberg.make_endpoint(
"rest",
"http://mydata.com:8181",
"s3://warehouse/",
s3.S3Instructions(region_name="us-east-1"),
endpoint_name="my_company_iceberg",
properties={"s3.access-key-id": "access_key", "client.region": "us-east-1"},
secrets={"s3.secret-access-key": "s3.key"},
)
endpoint.deploy(True)
result = iceberg.discover("market.trades", endpoint)
result.deploy_named()
data = db.historical_table("market", "trades")