
Export Deephaven Tables to Parquet Files

The Deephaven Parquet Python module provides tools to integrate Deephaven with the Parquet file format. This module makes it easy to write Deephaven tables to Parquet files and directories. This document covers writing Deephaven tables to single Parquet files, flat partitioned Parquet directories, and key-value partitioned Parquet directories.

By default, Deephaven writes tables to Parquet files using SNAPPY compression. This default can be changed with the compression_codec_name argument in any of the writing functions discussed here, or with the codec_name argument of the ColumnInstruction class. See the column instructions section for more information; an example of overriding the codec follows the first write example below.

First, create some tables that will be used for the examples in this guide.

from deephaven import new_table, merge
from deephaven.column import int_col, double_col, string_col

math_grades = new_table(
    [
        string_col("Name", ["Ashley", "Jeff", "Rita", "Zach"]),
        string_col("Class", ["Math", "Math", "Math", "Math"]),
        int_col("Test1", [92, 78, 87, 74]),
        int_col("Test2", [94, 88, 81, 70]),
    ]
)

science_grades = new_table(
    [
        string_col("Name", ["Ashley", "Jeff", "Rita", "Zach"]),
        string_col("Class", ["Science", "Science", "Science", "Science"]),
        int_col("Test1", [87, 90, 99, 80]),
        int_col("Test2", [91, 83, 95, 78]),
    ]
)

history_grades = new_table(
    [
        string_col("Name", ["Ashley", "Jeff", "Rita", "Zach"]),
        string_col("Class", ["History", "History", "History", "History"]),
        int_col("Test1", [82, 87, 84, 76]),
        int_col("Test2", [88, 92, 85, 78]),
    ]
)

grades = merge([math_grades, science_grades, history_grades])

grades_partitioned = grades.partition_by("Class")

Write to a single Parquet file

Write a Deephaven table to a single Parquet file with parquet.write. Supply the table argument with the Deephaven table to be written, and the path argument with the destination file path for the resulting Parquet file. This file path should end with the .parquet file extension.

from deephaven import parquet

parquet.write(table=grades, path="/data/grades/grades.parquet")
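
The default SNAPPY codec mentioned at the top of this guide can be overridden per call with the compression_codec_name argument. The following is a minimal sketch; the /data/grades_gzip/ output path is illustrative and not part of the dataset layout used elsewhere in this guide.

# a hedged sketch: write the same table with GZIP compression instead of the
# default SNAPPY (the output path here is illustrative)
parquet.write(
    table=grades,
    path="/data/grades_gzip/grades.parquet",
    compression_codec_name="GZIP",
)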

Write _metadata and _common_metadata files by setting the generate_metadata_files argument to True. Parquet metadata files can significantly improve read performance for very large datasets. If the data might be read again later, consider writing metadata files.

parquet.write(
    table=grades, path="/data/grades_meta/grades.parquet", generate_metadata_files=True
)

Partitioned Parquet directories

Deephaven supports writing tables to partitioned Parquet directories. A partitioned Parquet directory organizes data into subdirectories based on one or more partitioning columns. This structure allows for more efficient data querying by pruning irrelevant partitions, leading to faster read times than a single Parquet file. Deephaven tables can be written to flat partitioned directories or key-value partitioned directories.

Data can be written to partitioned directories from Deephaven tables or from Deephaven's partitioned tables. Partitioned tables have partitioning columns built into the API, so Deephaven can use those partitioning columns to create partitioned directories. Regular Deephaven tables do not have partitioning columns, so the user must provide that information using the table_definition argument to any of the writing functions.

Table definitions represent a table's schema. They are constructed from lists of column definitions, created with the col_def function from deephaven.column, that specify each column's name and type using types from the deephaven.dtypes Python module. A column definition can also mark a column as a partitioning column by setting the column_type argument to ColumnType.PARTITIONING.

Create a table definition for the grades table defined above.

from deephaven import dtypes
from deephaven.column import col_def, ColumnType

grades_def = [
    col_def("Name", dtypes.string),
    # Class is declared to be a partitioning column
    col_def("Class", dtypes.string, column_type=ColumnType.PARTITIONING),
    col_def("Test1", dtypes.int32),
    col_def("Test2", dtypes.int32),
]

Write to a key-value partitioned Parquet directory

Key-value partitioned Parquet directories extend partitioning by organizing data based on key-value pairs in the directory structure. This allows for highly granular and flexible data access patterns, providing efficient querying for complex datasets. The downside is the added complexity of managing and maintaining the key-value directory structure compared to other partitioning schemes.

Use parquet.write_partitioned to write Deephaven tables to key-value partitioned Parquet directories. Supply a Deephaven table or a partitioned table to the table argument, and set the destination_dir argument to the destination root directory where the partitioned Parquet data will be stored. Non-existing directories in the provided path will be created.

from deephaven import parquet

# write a standard Deephaven table, must specify table_definition
parquet.write_partitioned(
    table=grades, destination_dir="/data/grades_kv_1/", table_definition=grades_def
)

# or write a partitioned table
parquet.write_partitioned(
    table=grades_partitioned, destination_dir="/data/grades_kv_2/"
)

Set the generate_metadata_files argument to True to write metadata files.

parquet.write_partitioned(
    table=grades_partitioned,
    destination_dir="/data/grades_kv_2_md/",
    generate_metadata_files=True,
)
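
To confirm what was written, the key-value partitioned directory can be read back with parquet.read, which infers the partitioning from the directory structure. This is a quick sanity check rather than part of the export workflow; the path below matches the second example above.

# read the key-value partitioned directory back into a table; the partitioning
# column (Class) is reconstructed from the directory names
grades_from_disk = parquet.read("/data/grades_kv_2/")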

Write to a flat partitioned Parquet directory

A flat partitioned Parquet directory stores data without nested subdirectories. Each file contains partition information within its filename or as metadata. This approach simplifies directory management compared to hierarchical partitioning but can lead to larger directory listings, which might affect performance with many partitions.

Use parquet.write or parquet.batch_write to write Deephaven tables to Parquet files in flat partitioned directories. parquet.write writes a single table per call, so writing multiple tables requires multiple calls, while parquet.batch_write can write multiple tables to multiple paths in a single call.

Supply parquet.write with the Deephaven table to be written and the destination file path via the table and path arguments. The path must end with the .parquet file extension.

from deephaven import parquet

parquet.write(math_grades, "/data/grades_flat_1/math.parquet")
parquet.write(science_grades, "/data/grades_flat_1/science.parquet")
parquet.write(history_grades, "/data/grades_flat_1/history.parquet")

Use parquet.batch_write to accomplish the same thing by passing multiple tables to the tables argument and multiple destination paths to the paths argument. This requires the table_definition argument to be specified.

parquet.batch_write(
    tables=[math_grades, science_grades, history_grades],
    paths=[
        "/data/grades_flat_2/math.parquet",
        "/data/grades_flat_2/science.parquet",
        "/data/grades_flat_2/history.parquet",
    ],
    table_definition=grades_def,
)

To write a Deephaven partitioned table to a flat partitioned Parquet directory, the table must first be broken into a list of constituent tables, then parquet.batch_write can be used to write all of the resulting constituent tables to Parquet. Again, the table_definition argument must be specified.

from deephaven.pandas import to_pandas

# get keys from table
grades_partitioned_keys = grades_partitioned.keys()

# make keys iterable through pandas
keys = to_pandas(grades_partitioned_keys)["Class"]

# create list of constituent tables from keys
grades_partitioned_list = []
for key in keys:
    grades_partitioned_list.append(grades_partitioned.get_constituent(key))

# write each constituent table to Parquet using batch_write
parquet.batch_write(
    tables=grades_partitioned_list,
    paths=[
        "/data/grades_flat_3/math.parquet",
        "/data/grades_flat_3/science.parquet",
        "/data/grades_flat_3/history.parquet",
    ],
    table_definition=grades_def,
)

Optional arguments

The write, write_partitioned, and batch_write functions from the Deephaven Parquet Python module all accept additional optional arguments that control how data is written from Deephaven to Parquet. The following arguments are accepted by all three functions; an example combining several of them appears after the list:

  • table_definition: The table definition or schema, provided as a dictionary of string-DType pairs, or as a list of ColumnDefinition instances. When not provided, the column definitions implied by the table(s) are used.
  • col_instructions: Instructions for customizations while writing particular columns, provided as a ColumnInstruction or a list of ColumnInstructions. The default is None, which means no specialization for any column.
  • compression_codec_name: The name of the compression codec to use. Defaults to SNAPPY.
  • max_dictionary_keys: The maximum number of unique keys the writer should add to a dictionary page before switching to non-dictionary encoding. This is never evaluated for non-string columns. Defaults to 2^20 (1,048,576).
  • max_dictionary_size: The maximum number of bytes the writer should add to the dictionary before switching to non-dictionary encoding. This is never evaluated for non-string columns. Defaults to 2^20 (1,048,576).
  • target_page_size: The target page size in bytes. Defaults to 2^20 bytes (1 MiB).
  • generate_metadata_files: Whether to generate Parquet _metadata and _common_metadata files. Defaults to False.
  • index_columns: Sequence of sequences containing the column names for indexes to persist. The write operation will store the index info for the provided columns as sidecar tables. For example, if the input is [["Col1"], ["Col1", "Col2"]], the write operation will store the index info for ["Col1"] and for ["Col1", "Col2"]. By default, data indexes to write are determined by those of the source table. This argument can be used to narrow the set of indexes to write or to be explicit about the expected set of indexes present on all sources. Indexes that are specified but missing will be computed on demand.
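
The following sketch combines several of these arguments in a single write call. The output path and the specific numeric values are illustrative choices for this example, not recommendations.

# illustrative values only: override the codec, tighten the dictionary limits,
# shrink the target page size, emit metadata files, and persist an index on Name
parquet.write(
    table=grades,
    path="/data/grades_tuned/grades.parquet",
    compression_codec_name="ZSTD",
    max_dictionary_keys=1 << 18,
    max_dictionary_size=1 << 18,
    target_page_size=1 << 19,
    generate_metadata_files=True,
    index_columns=[["Name"]],
)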

Column instructions

The col_instructions argument to write, write_partitioned, and batch_write takes one or more instances of the ColumnInstruction class. A ColumnInstruction maps a specific column in the Deephaven table to a specific column in the resulting Parquet files and can specify the compression codec used for that column. A usage sketch appears after the codec list below.

ColumnInstruction has the following arguments:

  • column_name: The name of the column in the Deephaven table to which these instructions apply.
  • parquet_column_name: The name of the corresponding column in the Parquet dataset.
  • codec_name: The name of the compression codec to use.
  • codec_args: An implementation-specific string used to map types to/from bytes. It is typically used in cases where there is no obvious language-agnostic representation in Parquet.
  • use_dictionary: True or False indicating whether or not to use dictionary-based encoding for string columns.

Of particular interest is the codec_name argument. This defines the particular type of compression used for the given column and can have significant implications for the speed of the export. The options are:

  • SNAPPY: (default) Aims for high speed and a reasonable amount of compression. Based on Google's Snappy compression format.
  • UNCOMPRESSED: The output will not be compressed.
  • LZ4_RAW: A codec based on the LZ4 block format. Should always be used instead of LZ4.
  • LZO: Compression codec based on or interoperable with the LZO compression library.
  • GZIP: Compression codec based on the GZIP format (not the closely-related "zlib" or "deflate" formats) defined by RFC 1952.
  • ZSTD: Compression codec with the highest compression ratio based on the Zstandard format defined by RFC 8478.
  • LZ4: Deprecated. Compression codec loosely based on the LZ4 compression algorithm, but with an additional undocumented framing scheme. The framing is part of the original Hadoop compression library and was historically copied first in parquet-mr, then emulated with mixed results by parquet-cpp. Use LZ4_RAW instead.
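
As referenced above, here is a minimal sketch of column instructions applied to a write call, assuming ColumnInstruction is imported from the same deephaven.parquet module used throughout this guide. The renamed Parquet column, the codec choice, and the output path are illustrative assumptions for this example.

from deephaven.parquet import ColumnInstruction

# illustrative column instructions: rename the Name column in the Parquet output
# and compress the Test1 column with ZSTD instead of the default SNAPPY
name_instruction = ColumnInstruction(column_name="Name", parquet_column_name="StudentName")
test1_instruction = ColumnInstruction(column_name="Test1", codec_name="ZSTD")

parquet.write(
    table=grades,
    path="/data/grades_instructions/grades.parquet",
    col_instructions=[name_instruction, test1_instruction],
)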