Iceberg

Iceberg tables can be referenced via Deephaven Schemas using the Extended Storage feature. This provides immediate access for Deephaven to read existing Iceberg tables as Deephaven tables, including support for Iceberg schema evolution and nested Iceberg structures.

Configuration

The easiest way to configure Iceberg tables for Deephaven is to use the built-in inference provided by LoadTableOptions. Advanced users can customize the mapping with a Resolver, or with custom inference options.

A type or catalog-impl key is required for an Iceberg catalog. Additional parameters may be necessary depending on the type of catalog. Commonly configured properties may include warehouse and uri. See Iceberg Catalog properties and your specific catalog implementation for more details on configuration. An Iceberg table identifier is also required.

import io.deephaven.enterprise.iceberg.IcebergTableOptions
import io.deephaven.iceberg.util.BuildCatalogOptions
import io.deephaven.iceberg.util.LoadTableOptions

// The options to read an Iceberg Catalog.
catalogOptions = BuildCatalogOptions.builder()
    .name("MyCatalog")
    .putAllProperties([
        "type": "<type>",
        // ... additional properties here ...
    ])
    .build()

// The options to load an Iceberg Table from the catalog, using the default
// inference options.
tableOptions = LoadTableOptions.builder()
    .id("IcebergNamespace.IcebergTableName")
    .build()

// Load the Iceberg Catalog and Table and materialize the results into
// an explicit Resolver.
options = IcebergTableOptions.builder()
    .tableKey("DHNamespace", "DHTableName")
    .catalogOptions(catalogOptions)
    .tableOptions(tableOptions)
    .build()
    .materialize()

// Verify the resulting Table data looks correct
myTable = options.table()

If you have a working Spark configuration, it can typically be translated into the necessary Catalog properties by removing the Spark prefix (spark.sql.catalog.<catalog name>.) from each key.

For example, the following Spark properties:

spark.sql.catalog.rest_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.rest_prod.type = rest
spark.sql.catalog.rest_prod.uri = http://localhost:8080

translate into the following BuildCatalogOptions:

catalogOptions = BuildCatalogOptions.builder()
    .name("rest_prod")
    .putAllProperties([
        "type": "rest",
        "uri": "http://localhost:8080"
    ])
    .build()

See BuildCatalogOptions and LoadTableOptions for more details on these structures.
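
As a rough illustration of the prefix-stripping rule, the following plain-Python sketch converts a map of Spark settings into the property map that would be passed to putAllProperties. The helper name spark_to_catalog_properties is hypothetical and is not part of any Deephaven or Iceberg API. It assumes every catalog setting is keyed as spark.sql.catalog.<catalog name>.<property>.

```python
def spark_to_catalog_properties(spark_conf, catalog_name):
    """Strip the 'spark.sql.catalog.<catalog_name>.' prefix from matching keys.

    Hypothetical helper for illustration only. The bare
    'spark.sql.catalog.<catalog_name>' entry names the Spark catalog class
    and does not match the prefix, so it is dropped.
    """
    prefix = f"spark.sql.catalog.{catalog_name}."
    return {
        key[len(prefix):]: value
        for key, value in spark_conf.items()
        if key.startswith(prefix)
    }


spark_conf = {
    "spark.sql.catalog.rest_prod": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.rest_prod.type": "rest",
    "spark.sql.catalog.rest_prod.uri": "http://localhost:8080",
}

catalog_properties = spark_to_catalog_properties(spark_conf, "rest_prod")
print(catalog_properties)
```

The resulting map contains only the type and uri keys, matching the properties in the rest_prod example.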

Deployment

An iris-schemamanagers user is required to deploy the Schema.

import com.illumon.iris.db.schema.SchemaServiceFactory

schemaService = SchemaServiceFactory.getDefault()

// Admins are encouraged to explicitly manage the deployment logic of their Schema with
// the SchemaService.
mySchema = options.schema().get()
schemaService.addSchema(mySchema)

// Alternatively, admins may deploy the schema directly using the IcebergTableOptions.
// The following will create the schema namespace if it doesn't exist, create the schema if it doesn't exist, or update it if it does.
// options.deploy(schemaService)

// Verify that the table can be fetched
myTableViaDb = db.historicalTable("DHNamespace", "DHTableName")

Serialization format

Caution

The creation and deployment of a Deephaven Iceberg schema is typically performed programmatically, as shown in the previous sections. Exercise caution when manually creating or editing a schema.

An Iceberg table is referenced in a Deephaven table's schema using an ExtendedStorage element with the attribute type set to iceberg.

<Table namespace="DHNamespace" name="DHTableName" namespaceSet="System" storageType="Extended">
  <!-- Column elements omitted for brevity -->
  <ExtendedStorage type="iceberg">
    <Catalog><!-- see `Catalog` section below --></Catalog>
    <Table><!-- see `Table` section below --></Table>
  </ExtendedStorage>
</Table>

Catalog element

The Catalog element is a serialization of the core BuildCatalogOptions. It is composed of a Name element, a Properties element, and an optional HadoopConfig element. The Properties element is a map of string keys to string values. The HadoopConfig element is an additional map for Hadoop catalogs. For example:

<Catalog>
  <Name>MyCatalog</Name>
  <Properties injection="enabled">
    <Entry key="type" value="<type>" />
    <!--
    <Entry key="warehouse" value="..." />
    -->
  </Properties>
  <!--
  <HadoopConfig>
    <Entry key="..." value="..." />
  </HadoopConfig>
  -->
</Catalog>

The injection attribute on the Properties element controls whether Deephaven may automatically add properties that work around known upstream issues and/or supply defaults needed for Deephaven's Iceberg usage. The valid values are enabled and disabled. It is recommended to set this to enabled.

Table element

The Table element is a serialization of the core LoadTableOptions. It is composed of a TableIdentifier, Resolver, and NameMapping element.

<Table>
  <TableIdentifier>IcebergNamespace.IcebergTableName</TableIdentifier>
  <Resolver><!-- see `Resolver` section below --></Resolver>
  <NameMapping><!-- see `NameMapping` section below --></NameMapping>
</Table>

The Resolver element contains a ColumnInstructions, Schema, and optional PartitionSpec element. The ColumnInstructions element contains the mapping from Deephaven column names to Iceberg fieldId, Iceberg partitionFieldId, or type unmapped. The Schema element contains the Iceberg Schema JSON. The optional PartitionSpec element contains the Iceberg Partition Spec JSON.

<Resolver type="direct">
  <ColumnInstructions>
    <Column name="Foo" partitionFieldId="1000" />
    <Column name="Bar" fieldId="2" />
    <Column name="Baz" type="unmapped" />
  </ColumnInstructions>
  <Schema type="json"><![CDATA[{
  "type" : "struct",
  "schema-id" : 0,
  "fields" : [ {
    "id" : 1,
    "name" : "foo",
    "required" : false,
    "type" : "int"
  }, {
    "id" : 2,
    "name" : "bar",
    "required" : false,
    "type" : "long"
  } ]
}]]></Schema>
  <PartitionSpec type="json"><![CDATA[{
  "spec-id" : 0,
  "fields" : [ {
    "name" : "foo",
    "transform" : "identity",
    "source-id" : 1,
    "field-id" : 1000
  } ]
}]]></PartitionSpec>
</Resolver>
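
To make the relationship between the three Resolver children concrete, here is a minimal sketch that parses a Resolver element with the Python standard library and reports what each Deephaven column maps to. It is only an illustration of the structure described above and is not how Deephaven reads the schema. The function name describe_column_instructions is hypothetical, and the JSON is the same as in the example but laid out more compactly.

```python
import json
import xml.etree.ElementTree as ET

RESOLVER_XML = """<Resolver type="direct">
  <ColumnInstructions>
    <Column name="Foo" partitionFieldId="1000" />
    <Column name="Bar" fieldId="2" />
    <Column name="Baz" type="unmapped" />
  </ColumnInstructions>
  <Schema type="json"><![CDATA[{
  "type" : "struct",
  "schema-id" : 0,
  "fields" : [
    { "id" : 1, "name" : "foo", "required" : false, "type" : "int" },
    { "id" : 2, "name" : "bar", "required" : false, "type" : "long" }
  ]
}]]></Schema>
  <PartitionSpec type="json"><![CDATA[{
  "spec-id" : 0,
  "fields" : [
    { "name" : "foo", "transform" : "identity", "source-id" : 1, "field-id" : 1000 }
  ]
}]]></PartitionSpec>
</Resolver>"""


def describe_column_instructions(resolver_xml):
    """Return {Deephaven column name: description of its Iceberg mapping}.

    Hypothetical helper: raises KeyError if an instruction references an id
    that is missing from the Schema or PartitionSpec JSON.
    """
    resolver = ET.fromstring(resolver_xml)
    schema = json.loads(resolver.findtext("Schema"))
    fields_by_id = {f["id"]: f["name"] for f in schema["fields"]}

    # PartitionSpec is optional, so fall back to an empty spec.
    spec_text = resolver.findtext("PartitionSpec")
    spec = json.loads(spec_text) if spec_text else {"fields": []}
    partitions_by_id = {f["field-id"]: f["name"] for f in spec["fields"]}

    described = {}
    for column in resolver.find("ColumnInstructions"):
        name = column.get("name")
        if column.get("fieldId") is not None:
            described[name] = "field " + fields_by_id[int(column.get("fieldId"))]
        elif column.get("partitionFieldId") is not None:
            partition_id = int(column.get("partitionFieldId"))
            described[name] = "partition field " + partitions_by_id[partition_id]
        elif column.get("type") == "unmapped":
            described[name] = "unmapped"
        else:
            raise ValueError(f"Column {name} has no recognised instruction")
    return described


print(describe_column_instructions(RESOLVER_XML))
```

A check like this can help catch a hand-edited schema whose ColumnInstructions reference ids that are absent from the embedded Schema or PartitionSpec.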

The NameMapping element provides fallback field ids to be used when a data file does not contain field id information. It has three different types, specified via the type attribute.

The table type reads the Name Mapping from the Iceberg Table property schema.name-mapping.default (see https://iceberg.apache.org/spec/#column-projection).

<NameMapping type="table" />

The empty type disables name mapping.

<NameMapping type="empty" />

The json type uses Iceberg Name Mapping JSON.

<NameMapping type="json"><![CDATA[[ {
  "field-id" : 1,
  "names" : [ "Foo" ]
}, {
  "field-id" : 2,
  "names" : [ "Bar" ]
} ]]]></NameMapping>
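
The idea of fallback field ids can be sketched in a few lines of plain Python. The example below is an assumption-laden illustration and not Deephaven's or Iceberg's implementation: it handles only top-level entries (nested "fields" entries are ignored), and the function name fallback_field_ids and the column name Qux are hypothetical.

```python
import json

NAME_MAPPING_JSON = """[ {
  "field-id" : 1,
  "names" : [ "Foo" ]
}, {
  "field-id" : 2,
  "names" : [ "Bar" ]
} ]"""


def fallback_field_ids(name_mapping_json, data_file_columns):
    """Map each data file column name to a field id, or None if no name matches."""
    ids_by_name = {}
    for entry in json.loads(name_mapping_json):
        # Each entry may list several alternative names for the same field id.
        for name in entry.get("names", []):
            ids_by_name[name] = entry.get("field-id")
    return {column: ids_by_name.get(column) for column in data_file_columns}


# "Qux" is a hypothetical column with no entry in the name mapping.
print(fallback_field_ids(NAME_MAPPING_JSON, ["Foo", "Bar", "Qux"]))
```

Columns whose names appear in the mapping receive the corresponding field id, while a column with no matching name is left without one.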

Full example

Let's assume that we have an existing Iceberg Glue Catalog that contains a table mycatalog.cities with the Iceberg Schema:

{
  "type": "struct",
  "schema-id": 1,
  "fields": [
    {
      "id": 1,
      "name": "city",
      "required": false,
      "type": "string"
    },
    {
      "id": 2,
      "name": "latitude",
      "required": false,
      "type": "double"
    },
    {
      "id": 3,
      "name": "longitude",
      "required": false,
      "type": "double"
    }
  ]
}

To create a new Deephaven Schema with namespace DhExample and table name Cities that references this Iceberg Table, we would execute the following once:

import io.deephaven.enterprise.iceberg.IcebergTableOptions
import io.deephaven.iceberg.util.BuildCatalogOptions
import io.deephaven.iceberg.util.LoadTableOptions
import com.illumon.iris.db.schema.SchemaServiceFactory

catalogOptions = BuildCatalogOptions.builder()
    .name("GlueCatalog")
    .putAllProperties(["type": "glue"])
    .build()

tableOptions = LoadTableOptions.builder()
    .id("mycatalog.cities")
    .build()

options = IcebergTableOptions.builder()
    .tableKey("DhExample", "Cities")
    .catalogOptions(catalogOptions)
    .tableOptions(tableOptions)
    .build()
    .materialize()

SchemaServiceFactory.getDefault().addSchema(options.schema().get())

This would result in the following Deephaven Schema:

<Table namespace="DhExample" name="Cities" namespaceSet="System" storageType="Extended">
  <Column name="city" dataType="String" columnType="Normal" />
  <Column name="latitude" dataType="double" columnType="Normal" />
  <Column name="longitude" dataType="double" columnType="Normal" />
  <ExtendedStorage type="iceberg">
    <Catalog>
      <Name>GlueCatalog</Name>
      <Properties injection="enabled">
        <Entry key="type" value="glue" />
      </Properties>
    </Catalog>
    <Table>
      <TableIdentifier>mycatalog.cities</TableIdentifier>
      <Resolver type="direct">
        <ColumnInstructions>
          <Column name="city" fieldId="1" />
          <Column name="latitude" fieldId="2" />
          <Column name="longitude" fieldId="3" />
        </ColumnInstructions>
        <Schema type="json"><![CDATA[{
  "type" : "struct",
  "schema-id" : 1,
  "fields" : [ {
    "id" : 1,
    "name" : "city",
    "required" : false,
    "type" : "string"
  }, {
    "id" : 2,
    "name" : "latitude",
    "required" : false,
    "type" : "double"
  }, {
    "id" : 3,
    "name" : "longitude",
    "required" : false,
    "type" : "double"
  } ]
}]]></Schema>
      </Resolver>
      <NameMapping type="empty" />
    </Table>
  </ExtendedStorage>
</Table>

Deprecated

Warning

This way of configuring Iceberg is deprecated and will be removed in a future release.

Iceberg tables can be read as Deephaven historical tables using the Extended Storage feature. To configure Iceberg, you first use an Iceberg Endpoint, then discover and deploy a schema.

Configuration

The first step to linking Iceberg tables into Deephaven is configuring an IcebergEndpoint, which we refer to as simply an endpoint. The endpoint contains the parameters required to locate and connect to the Iceberg catalog and the data warehouse, as well as the storage-specific parameters required to read the data. An endpoint is configured as shown in the example below.

import io.deephaven.enterprise.iceberg.IcebergTools
import io.deephaven.extensions.s3.S3Instructions

// Create a new endpoint
endpoint = IcebergTools.newEndpoint()
        .catalogType("rest")                                 // The catalog is a REST catalog
        .catalogUri("http://mydata.com:8181")                // located at this URI.
        .warehouseUri("s3://warehouse/")                     // The data warehouse is an S3 warehouse at this URI
        .putProperties("s3.access-key-id", "my_access_key",  // These are the properties required by the Iceberg API.
                       "client.region"   , "us-east-1")      // See https://iceberg.apache.org/docs/nightly/configuration/#configuration
        .putSecrets("s3.secret-access-key", "s3.key")        // Include any named secrets
        .dataInstructions(S3Instructions.builder()           // Configure the S3 data parameters
                .regionName("us-east-1")
                .build())
        .build("my_company_iceberg");                        // Explicitly name the endpoint.

from deephaven_enterprise import iceberg
from deephaven.experimental import s3

# Create a new endpoint
endpoint = iceberg.make_endpoint(
    "rest",                                      # The catalog is a REST catalog
    "http://mydata.com:8181",                    # located at this URI.
    "s3://warehouse/",                           # The data warehouse is an S3 warehouse at this URI
    s3.S3Instructions(region_name="us-east-1"),  # Create the data instructions for reading data.
    endpoint_name="my_company_iceberg",          # Explicitly name this endpoint
    # Set Iceberg configuration properties.
    # See https://iceberg.apache.org/docs/nightly/configuration/#configuration
    properties={"s3.access-key-id": "my_access_key", "client.region": "us-east-1"},
    secrets={"s3.secret-access-key": "s3.key"},  # Include any named secrets
)

Properties

Endpoint properties are a key-value map of Iceberg configuration parameters to their values. Valid property keys can be found in the Iceberg documentation.

Secrets

Endpoint secrets are a key-value map where keys represent Iceberg configuration properties and values represent named references to secrets stored within Deephaven. When needed, the secrets are retrieved from Deephaven and merged into the properties before being used to access Iceberg.

Secrets may be stored either as a property in the Deephaven configuration file or as a JSON map in a protected file on disk. Secrets providers are visited in ascending priority order until one supplies a value or none can be found. More sophisticated secret stores are possible. Contact support for more information.
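
The lookup and merge behavior described above can be modeled with a short sketch. This is an illustrative model only, not Deephaven's implementation: the function names are hypothetical, the providers are stand-in dictionaries, and the priority of 50 is made up. It takes "ascending priority order" literally, so providers with lower priority numbers are consulted first.

```python
def resolve_secret(secret_name, providers):
    """Return the first value found, visiting providers in ascending priority order."""
    for _priority, lookup in sorted(providers, key=lambda provider: provider[0]):
        value = lookup.get(secret_name)
        if value is not None:
            return value
    raise KeyError(f"No secrets provider supplied a value for {secret_name!r}")


def merge_secrets(properties, secrets, providers):
    """Return a copy of properties with each named secret resolved and merged in."""
    merged = dict(properties)
    for property_key, secret_name in secrets.items():
        merged[property_key] = resolve_secret(secret_name, providers)
    return merged


# Stand-in providers as (priority, {secret name: value}); the values are invented.
providers = [
    (100, {"s3.key": "from_properties"}),
    (50, {"s3.key": "from_file", "secret_url": "https://verysecret.com:9001"}),
]
properties = {"s3.access-key-id": "my_access_key", "client.region": "us-east-1"}
secrets = {"s3.secret-access-key": "s3.key"}

print(merge_secrets(properties, secrets, providers))
```

In this model the provider with priority 50 is consulted before the one with priority 100, so s3.secret-access-key resolves to the value held by the first provider that knows the name s3.key.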

From Properties

Secrets may be stored in Deephaven configuration files as simple properties, such as s3.access_key=1234secret4321. The default priority of the Properties secrets provider is 100, which can be adjusted using the property PropertiesSecretsProvider.priority.

Warning

Property-based secrets are visible to anyone with access to the server, including access to persistent queries. It is not recommended to store sensitive information such as passwords this way.

From Files

Secrets may be stored in files on disk containing a simple JSON map. This format is more secure and better supports more complex secret values. You may configure multiple secrets files and their priorities using these properties:

| Property | Description |
| --- | --- |
| FileSecretsProvider.name.path | The path to the secrets file for the provider name. |
| FileSecretsProvider.name.priority | The priority of the secrets provider name. |

You may provide as many of these as you need, ensuring that each name is unique.

An example file:

{
  "s3.key": "some_secret_key",
  "secret_url": "https://verysecret.com:9001",
  "complicated": "<Secret type=\"important\"><Internal>Secrecy</Internal></Secret>"
}

Warning

If you supply secrets in files, they are visible to any process with access to that file. For example, if they must be visible to query workers, any user running query workers under the same operating system user can access them. This should be considered before storing secrets such as passwords in files.

See per-user workers for details on configuring different operating system users for different Deephaven users.
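
Given the file-visibility concern above, one way to limit exposure on a POSIX system is to create the secrets file so that only its owning operating system user can read it. The sketch below uses only the Python standard library. The helper name write_secrets_file and the file name secrets.json are hypothetical, and nothing here is a Deephaven API.

```python
import json
import os
import stat
import tempfile


def write_secrets_file(path, secrets):
    """Write a JSON map of secrets readable and writable only by the owning user."""
    # Create (or truncate) the file with mode 0600 so it is never world-readable.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(secrets, handle, indent=2)
    # Tighten permissions in case the file already existed with a wider mode.
    os.chmod(path, 0o600)


with tempfile.TemporaryDirectory() as directory:
    secrets_path = os.path.join(directory, "secrets.json")  # hypothetical location
    write_secrets_file(secrets_path, {"s3.key": "some_secret_key"})

    # Read the file back and inspect its permission bits.
    mode = stat.S_IMODE(os.stat(secrets_path).st_mode)
    with open(secrets_path, encoding="utf-8") as handle:
        loaded = json.load(handle)
    print(oct(mode), loaded)
```

Restricting the mode does not help when query workers for different users share one operating system user, which is the situation the warning above describes.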

Deployment

The endpoint can be deployed to the Deephaven configuration as long as a name has been provided. Once deployed, you may reference the endpoint by name in the schema for Iceberg tables to avoid duplication.

// Deploy the endpoint to Deephaven configuration, failing if it already exists.
endpoint.deploy(false)

# Deploy the endpoint to Deephaven configuration, overwriting it if it already exists.
# The overwrite_existing parameter defaults to False, in which case the deployment
# fails if the endpoint already exists.
endpoint.deploy(overwrite_existing=True)

Discovery

Once an endpoint has been configured, you can discover an Iceberg table to create and deploy a Deephaven schema. If you have previously deployed an endpoint, you can retrieve it by name as well.

import io.deephaven.enterprise.iceberg.IcebergTools
import io.deephaven.enterprise.iceberg.discovery.DiscoveryConfig

// Load an endpoint that was already configured
endpoint = IcebergTools.getEndpointByName("my_company_iceberg")

// Discover a table, derive the schema, and deploy it, deriving the namespace and table name
// from the table identifier, referencing the endpoint by name.
discovery = IcebergTools.discover(DiscoveryConfig.builder()
                .tableIdentifier("market.trades")
                .endpoint(endpoint)
                .build())

discovery.deployWithEndpointReference()

from deephaven_enterprise import iceberg

# Load an endpoint that was already configured
endpoint = iceberg.get_named_endpoint("my_company_iceberg")

# Discover a table, derive the schema, and deploy it, deriving the namespace and table name
# from the table identifier, referencing the endpoint by name.
result = iceberg.discover("market.trades", endpoint)
result.deploy_named()

In the examples above, the Deephaven namespace and table name are derived directly from the Iceberg table identifier. You may specify your own by setting the namespace and tableName properties during discovery.

discovery = IcebergTools.discover(DiscoveryConfig.builder()
                .tableIdentifier("market.trades")
                .namespace("MarketUS")
                .tableName("EqTrades")
                .endpoint(endpoint)
                .build())

result = iceberg.discover(
    "market.trades", endpoint, namespace="MarketUS", table_name="EqTrades"
)

Example

Below is a complete example that creates an endpoint, discovers a table, deploys a schema, and then fetches the table.

import io.deephaven.enterprise.iceberg.IcebergTools
import io.deephaven.enterprise.iceberg.discovery.DiscoveryConfig
import io.deephaven.extensions.s3.S3Instructions

// Create a new endpoint
endpoint = IcebergTools.newEndpoint()
        .catalogType("rest")
        .catalogUri("http://mydata.com:8181")
        .warehouseUri("s3://warehouse/")
        .putProperties("s3.access-key-id", "access_key",
                       "client.region" , "us-east-1")
        .putSecrets("s3.secret-access-key", "s3.key")
        .dataInstructions(S3Instructions.builder()
                .regionName("us-east-1")
                .build())
        .build("my_company_iceberg");

endpoint.deploy(true)

discovery = IcebergTools.discover(DiscoveryConfig.builder()
                .tableIdentifier("market.trades")
                .endpoint(endpoint)
                .build())

discovery.deployWithEndpointReference()

data = db.historicalTable("market", "trades")

from deephaven_enterprise import iceberg
from deephaven.experimental import s3

endpoint = iceberg.make_endpoint(
    "rest",
    "http://mydata.com:8181",
    "s3://warehouse/",
    s3.S3Instructions(region_name="us-east-1"),
    endpoint_name="my_company_iceberg",
    properties={"s3.access-key-id": "access_key", "client.region": "us-east-1"},
    secrets={"s3.secret-access-key": "s3.key"},
)
endpoint.deploy(True)

result = iceberg.discover("market.trades", endpoint)
result.deploy_named()

data = db.historical_table("market", "trades")