Iceberg and Deephaven
Apache Iceberg is a high-performance format for tabular data. Deephaven's Iceberg integration enables users to interact with Iceberg catalogs, namespaces, tables, and snapshots by ingesting them as tables. It supports both static and refreshing Iceberg tables. This guide walks through creating an Iceberg catalog with a single table and snapshot. It then shows how to interact with the catalog in the Deephaven IDE through:
- A REST API and MinIO instance
- S3 storage providers
The API enables you to interact with many types of catalogs. They include:
- REST
- AWS Glue
- JDBC
- Hive
- Hadoop
- Nessie
Deephaven's Iceberg module
Deephaven's Iceberg integration is provided by the deephaven.experimental.iceberg module, which contains the classes and methods used throughout this guide.
When querying Iceberg tables located in any S3-compatible storage service, the deephaven.experimental.s3 module must be used to read the data.
A Deephaven deployment for Iceberg
The examples presented in this guide pull Iceberg data from a REST catalog. This section closely follows Iceberg's Spark quickstart, extending the docker-compose.yml file in that guide to include Deephaven as part of the Iceberg Docker network. The Deephaven server starts alongside a Spark server, an Iceberg REST API, and a MinIO object store.
docker-compose.yml

```yaml
version: '3'
services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    build: spark/
    networks:
      iceberg_net:
    depends_on:
      - rest
      - minio
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - 8888:8888
      - 8081:8080
      - 11000:10000
      - 11001:10001
  rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest
    networks:
      iceberg_net:
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
  minio:
    image: minio/minio
    container_name: minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ['server', '/data', '--console-address', ':9001']
  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc
    networks:
      iceberg_net:
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "
  deephaven:
    image: ghcr.io/deephaven/server:latest
    networks:
      iceberg_net:
    ports:
      - '${DEEPHAVEN_PORT:-10000}:10000'
    environment:
      - START_OPTS=-Dauthentication.psk=YOUR_PASSWORD_HERE
      - USER
    volumes:
      - ./data:/data
      - /home/${USER}/.aws:/home/${USER}/.aws
networks:
  iceberg_net:
```
A full explanation of the docker-compose.yml file is outside the scope of this guide.
The docker-compose.yml file above sets the pre-shared key to YOUR_PASSWORD_HERE. This doesn't meet security best practices and should be changed in a production environment. For more information, see pre-shared key authentication.
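One way to keep the key out of the file itself is Compose variable substitution — a minimal sketch, assuming you export a DEEPHAVEN_PSK variable (a name chosen here for illustration) in your shell before running docker compose:

```yaml
# In docker-compose.yml, under the deephaven service's environment section:
environment:
  - START_OPTS=-Dauthentication.psk=${DEEPHAVEN_PSK}
```

Compose substitutes ${DEEPHAVEN_PSK} from your environment at startup, so the secret never needs to be committed to version control.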
Run docker compose up from the directory with the docker-compose.yml file. This starts the Deephaven server, Spark server, Iceberg REST API, and MinIO object store. When you're done, ctrl+C or docker compose down stops the containers.
Create an Iceberg catalog
This section follows the Iceberg Spark quickstart by creating an Iceberg catalog with a single table and snapshot using the Iceberg REST API in Jupyter. The docker-compose.yml extends the one in the Spark quickstart guide to include Deephaven as a service in the Iceberg Docker network. As such, the file starts up the following services:
- MinIO object store
- MinIO client
- Iceberg Spark server, reachable by Jupyter
- Deephaven server
Once the Docker containers are up and running, head to http://localhost:8888 to access the Iceberg Spark server in Jupyter. Open either the Iceberg - Getting Started or PyIceberg - Getting Started notebook; both create a catalog using the Iceberg REST API. The first four code blocks create an Iceberg table called nyc.taxis. Run that code to follow along with this guide, which uses the table in the sections below. All code blocks afterward are optional for our purposes.
Interact with the Iceberg catalog
After creating the Iceberg catalog and table, head to the Deephaven IDE at http://localhost:10000/ide.
To interact with an Iceberg catalog, you must first create an instance of the IcebergCatalogAdapter class. An instance of the class is created with any of the following methods:
- adapter: an IcebergCatalogAdapter created from configuration properties. This is a general-purpose method that can be used to connect to a variety of catalog types with various configuration properties.
- adapter_s3_rest: an IcebergCatalogAdapter created from an S3-compatible provider and a REST catalog.
- adapter_aws_glue: an IcebergCatalogAdapter created from AWS Glue.
With a REST catalog
There are two ways to interact with an Iceberg REST catalog in Deephaven: the general-purpose adapter method, or the purpose-built adapter_s3_rest method.
When using adapter, the catalog type, URI, access keys, and endpoint should be specified in a dict of configuration properties.
```python
from deephaven.experimental import iceberg

local_adapter = iceberg.adapter(
    name="generic-adapter",
    properties={
        "type": "rest",
        "uri": "http://rest:8181",
        "client.region": "us-east-1",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "s3.endpoint": "http://minio:9000",
    },
)
```
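Since these properties are plain key-value pairs, you might assemble them from environment variables instead of hard-coding credentials. A minimal sketch in plain Python — the helper name and fallback values are illustrative, not part of the Deephaven API:

```python
import os


def rest_catalog_properties(uri: str, endpoint: str, region: str = "us-east-1") -> dict:
    """Build the configuration-properties dict passed to iceberg.adapter().

    Credentials come from the environment, falling back to the demo values
    used by this guide's Docker Compose deployment.
    """
    return {
        "type": "rest",
        "uri": uri,
        "client.region": region,
        "s3.access-key-id": os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
        "s3.secret-access-key": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
        "s3.endpoint": endpoint,
    }


props = rest_catalog_properties("http://rest:8181", "http://minio:9000")
```

The resulting dict can be passed directly as the properties argument of iceberg.adapter.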
When using adapter_s3_rest, the constructor arguments should specify the catalog URI, warehouse location, region name, access key ID, secret access key, and endpoint override.
```python
from deephaven.experimental import iceberg

local_adapter = iceberg.adapter_s3_rest(
    name="minio-iceberg",
    catalog_uri="http://rest:8181",
    warehouse_location="s3a://warehouse/wh",
    region_name="us-east-1",
    access_key_id="admin",
    secret_access_key="password",
    end_point_override="http://minio:9000",
)
```
Once an IcebergCatalogAdapter has been created, it can query the namespaces and tables in a catalog. The following code block gets the available top-level namespaces and the tables in the nyc namespace.

```python
namespaces = local_adapter.namespaces()
tables = local_adapter.tables(namespace="nyc")
```
At this point, you can also load a table from the catalog with load_table. What you get is not a Deephaven table, but an IcebergTableAdapter.

```python
iceberg_taxis = local_adapter.load_table(table_identifier="nyc.taxis")
```
To load the nyc.taxis Iceberg table into a Deephaven table, you should create an instance of the IcebergReadInstructions class. This class tells Deephaven more about:
- The resultant table definition.
- Custom data instructions, including S3 configuration parameters.
- Column renames.
- The update mode for the table: static, refreshed manually, or refreshed automatically.
- A particular snapshot ID to read.
In this example, we will be loading a static table, so the following instructions suffice:

```python
static_instructions = iceberg.IcebergReadInstructions(
    update_mode=iceberg.IcebergUpdateMode.static()
)
```

Now that we have the table adapter and the instructions, we can read the table into a Deephaven table:

```python
taxis = iceberg_taxis.table(static_instructions)
```
With AWS Glue
There are two ways to connect to and read data from an AWS Glue catalog in Deephaven: the general-purpose adapter method, or the purpose-built adapter_aws_glue method.
When using adapter, the catalog type and URI should be specified in the configuration properties.
```python
from deephaven.experimental import s3, iceberg

cloud_adapter = iceberg.adapter(
    name="generic-adapter",
    properties={
        "type": "glue",
        "uri": "s3://lab-warehouse/sales",
    },
)
```
When using adapter_aws_glue, the constructor arguments should specify the catalog URI, warehouse location, and name.
```python
from deephaven.experimental import s3, iceberg

cloud_adapter = iceberg.adapter_aws_glue(
    name="aws-iceberg",
    catalog_uri="s3://lab-warehouse/nyc",
    warehouse_location="s3://lab-warehouse/nyc",
)
```
AWS region and credential information must be made visible to Deephaven if running from Docker. The Docker Compose deployment used in this guide makes the default location visible to the Deephaven container via a volume mount. The default location could also be changed. See here for more information.
Once an IcebergCatalogAdapter has been created, it can query the namespaces, tables, and snapshots in a catalog. The following code block gets the available top-level namespaces and the tables in the nyc namespace.

```python
namespaces = cloud_adapter.namespaces()
tables = cloud_adapter.tables(namespace="nyc")
```
At this point, you can also load a table from the catalog with load_table. What you get is not a Deephaven table, but an IcebergTableAdapter.

```python
iceberg_taxis = cloud_adapter.load_table(table_identifier="nyc.taxis")
```
To load the nyc.taxis Iceberg table into a Deephaven table, you must create an S3Instructions object with information about the region, keys, and endpoint of the Iceberg table. This object is then used to create an instance of the IcebergReadInstructions class, which is passed as an input to the IcebergTableAdapter.table method.
```python
from deephaven.experimental import s3

s3_instructions = s3.S3Instructions(
    region_name="us-east-1", access_key_id="admin", secret_access_key="password"
)

iceberg_instructions = iceberg.IcebergReadInstructions(
    data_instructions=s3_instructions
)

taxis = iceberg_taxis.table(instructions=iceberg_instructions)
```
Custom Iceberg instructions
You can specify custom instructions when creating an IcebergReadInstructions instance. Each subsection below covers a different custom instruction that can be passed in when reading Iceberg tables.
Refreshing Iceberg tables
Deephaven also supports refreshing Iceberg tables. The IcebergUpdateMode class specifies three supported update modes:
- Static
- Refreshed manually
- Refreshed automatically
This guide already looked at static Iceberg tables. For Iceberg tables that can be refreshed manually or automatically, the following code block creates an instance of each mode:

```python
manual_refresh_mode = iceberg.IcebergUpdateMode.manual_refresh()
auto_refresh_mode_60s = iceberg.IcebergUpdateMode.auto_refresh()
auto_refresh_mode_30s = iceberg.IcebergUpdateMode.auto_refresh(auto_refresh_ms=30000)

# Instructions for an Iceberg table that will be refreshed manually
manual_refresh_instructions = iceberg.IcebergReadInstructions(
    update_mode=manual_refresh_mode
)

# Instructions for an Iceberg table that will be refreshed automatically every 60 seconds
auto_refresh_instructions_60s = iceberg.IcebergReadInstructions(
    update_mode=auto_refresh_mode_60s
)

# Instructions for an Iceberg table that will be refreshed automatically every 30 seconds
auto_refresh_instructions_30s = iceberg.IcebergReadInstructions(
    update_mode=auto_refresh_mode_30s
)
```
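Because auto_refresh takes its interval in milliseconds, a tiny helper can make the conversion explicit in query code — the helper is a sketch of my own, not part of the Deephaven API:

```python
def refresh_interval_ms(seconds: float) -> int:
    """Convert a refresh interval in seconds to the milliseconds auto_refresh expects."""
    return int(seconds * 1000)


# e.g. iceberg.IcebergUpdateMode.auto_refresh(auto_refresh_ms=refresh_interval_ms(30))
thirty_seconds = refresh_interval_ms(30)
```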
S3 instructions
Reading data from S3 storage often requires additional information. In Deephaven, this is done with S3Instructions. The following code block creates an S3Instructions object with the region, access key, secret key, and endpoint for reading Iceberg tables from S3 cloud storage:

```python
from deephaven.experimental import s3, iceberg

s3_instructions = s3.S3Instructions(
    region_name="us-east-1",
    access_key_id="admin",
    secret_access_key="password",
    endpoint_override="http://minio:9000",
)

iceberg_instructions = iceberg.IcebergReadInstructions(data_instructions=s3_instructions)
```
Table definition
You can specify the resultant table definition of an Iceberg read operation. The following code block defines a custom table definition to use when reading from Iceberg:

```python
from deephaven.experimental import iceberg
from deephaven import dtypes as dht

def_instructions = iceberg.IcebergReadInstructions(
    table_definition={
        "ID": dht.long,
        "Timestamp": dht.Instant,
        "Operation": dht.string,
        "Summary": dht.string,
    }
)
```
Column renames
A common example is renaming columns. Deephaven recommends that columns follow PascalCase naming conventions. The following code block renames certain columns in the nyc.taxis table to follow this convention:

```python
from deephaven.experimental import iceberg

iceberg_instructions_renames = iceberg.IcebergReadInstructions(
    column_renames={
        "tpep_pickup_datetime": "PickupTime",
        "tpep_dropoff_datetime": "DropoffTime",
        "passenger_count": "NumPassengers",
        "trip_distance": "Distance",
    },
)
```
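For tables with many snake_case columns, the rename map can be generated rather than written by hand — an illustrative helper in plain Python, not a Deephaven API:

```python
def to_pascal_case(name: str) -> str:
    """Convert a snake_case column name to PascalCase, e.g. trip_distance -> TripDistance."""
    return "".join(part.capitalize() for part in name.split("_"))


# Hypothetical list of source column names to rename
snake_columns = ["passenger_count", "trip_distance", "payment_type"]
column_renames = {col: to_pascal_case(col) for col in snake_columns}
```

The resulting dict can be passed as the column_renames argument of IcebergReadInstructions.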
Snapshot ID
Deephaven can also be told to read a particular snapshot ID:

```python
from deephaven.experimental import iceberg

snapshot_instructions = iceberg.IcebergReadInstructions(snapshot_id=6738371110677246500)
```
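Snapshot IDs are usually chosen by inspecting the catalog's snapshot history. If you track that history as plain (snapshot_id, committed_at) pairs — the data below is hypothetical — picking the latest snapshot at or before a cutoff is ordinary Python:

```python
from datetime import datetime

# Hypothetical snapshot history: (snapshot_id, committed_at)
history = [
    (6738371110677246500, datetime(2024, 1, 15)),
    (7190112233445566778, datetime(2024, 3, 2)),
]


def snapshot_as_of(history, cutoff):
    """Return the ID of the latest snapshot committed at or before cutoff, or None."""
    eligible = [(ts, sid) for sid, ts in history if ts <= cutoff]
    return max(eligible)[1] if eligible else None


sid = snapshot_as_of(history, datetime(2024, 2, 1))
```

The chosen ID can then be passed as the snapshot_id argument of IcebergReadInstructions.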
Note that you can also omit the table definition entirely: Deephaven automatically infers the correct data types for the nyc.taxis table, so a custom definition is only needed when you want to override the inferred one. See IcebergReadInstructions for more information.
Next steps
This guide presented a basic example of interacting with an Iceberg catalog in Deephaven. These examples can be extended to include more complex queries, catalogs with multiple namespaces, snapshots, custom instructions, and more.