Symbol Caching for Importers
Symbol Managers
Most string columns have a limited universe of distinct values, and hence benefit from using a symbol table as part of their storage format. When writing a column with a local symbol table, a symbol manager is used to cache the symbol indexes assigned to distinct string values.
There are two types of symbol managers used for this purpose:
- Strict symbol managers guarantee a one-to-one mapping from distinct value to symbol index. They must hold the full reverse mapping in memory when actively in use, but produce the most compressed output. They are suitable for columns with a small (less than 1 million), well-understood number of distinct values, such as equity tickers or option symbols in a table of quotes or trades.
- Bounded symbol managers cache a limited number of one-to-many mappings from distinct value to symbol index in memory when actively in use. In order to guarantee deterministic output at replica nodes, they use a FIFO (first-in-first-out) cache eviction policy. They are suitable for columns with a large number of distinct values, where there is expected to be some locality of reference, such as order identifiers in a table for an order management system. The following property defines the FIFO window size for all bounded (non-strict) symbol managers:
LocalAppendableTableComponentFactory.boundedSymbolManagerSize
The window size defaults to 10,000. Increasing this value is recommended if column values are known to repeat at a longer interval.
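For example, if a column's values are known to repeat within roughly 100,000 rows rather than 10,000, the window could be enlarged accordingly (the value shown is purely illustrative):
LocalAppendableTableComponentFactory.boundedSymbolManagerSize=100000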
Note that symbol managers may only store approximately 2 billion mappings, because symbol indexes must be less than the maximum array size in Java. Attempting to store a mapping to an index higher than 2,147,483,646 (2³¹ − 2) will result in an UnsupportedOperationException.
String Caches
Reading and writing from/to columns of string values usually involves a cache, in an attempt to reduce the total number of (possibly long-lived) objects created in the system. For writing processes (like the Data Import Server, offline or "batch mode" imports, or the merge process), cache selection has an impact on symbol manager selection, heap usage, and output size.
When an unbounded string cache is selected for a given appendable column of strings configured to use a local symbol table, the writing sub-system will use a strict symbol manager. In all other cases bounded symbol managers are used, with a FIFO window as mentioned above.
Data Import Server Cache Hints
Users should carefully consider which columns have a well-known universe of possible distinct values, and configure cache hints accordingly. If the cycle over which data may repeat is long, configuring a larger bounded symbol manager size is advisable. If your column's data is unlikely to repeat, or unlikely to repeat more than a small handful of times over the course of a partition, you should include symbolTable="None" in your schema and disregard cache hints for the column in question.
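For example, a schema column definition for such a column might look roughly like the following sketch (the column name and the other attributes shown are illustrative, not taken from a real schema):
<Column name="UniqueRequestId" dataType="String" symbolTable="None" />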
The Data Import Server uses a series of cache hints (specified via properties) to determine string cache selection. It is possible to specify a global default as well as column- and table-name-specific caches, which in some cases allows a cache to be shared across tables to minimize overall heap usage at the expense of concurrency. This hint system allows Unbounded, Bounded, and "Always Create" caches to be specified. An "Always Create" (or "No-Op") cache creates a new String instance for each value rather than caching the data.
Note that MergeData and other write jobs are also able to use hint-driven StringCacheProviders. The "default" StringCacheProvider is "unbounded," thereby triggering strict symbol manager usage.
Note
See: JavaDocs
To configure cache hints, the following syntax is used:
<DataImportServer>.StringCacheHint.<match-specifier>=<cache-type>,<value-type>,<arguments>
match-specifier
- The table name and column name, which may be exact or partial matches. These values take the forms shown in the examples below.
cache-type
- Specifies whether the cache is unbounded or bounded, or whether a new string is always created rather than using a cache; the specific values are shown in the examples below.
value-type
- A String or CompressedString value, as shown in the examples below.
arguments
- For unbounded caches: a single integer specifying the cache's initial capacity. The cache will grow as necessary to accommodate additional entries; however, the Data Import Server must briefly pause data ingestion for a table location when increasing the cache's capacity. For this reason, when configuring an unbounded cache for a specific column, it is preferable to set the cache's initial capacity to meet or exceed the number of unique values expected for that column. Unbounded caches should not be used for columns with an effectively unlimited number of possible values, as this would cause the Data Import Server to attempt to cache an unbounded number of strings, potentially leading to a crash or severely reduced performance.
- For bounded caches: the first argument is an integer specifying the cache size. The second is an integer "collision factor" used for performance tuning; Deephaven recommends setting this value to 2. The collision factor determines how far the cache will search for a slot before evicting an existing item from the cache. It may be appropriate to increase the collision factor when there is evidence of a nonuniform hashcode distribution for the items being cached.
The following examples illustrate all possible hint types.
Table name and column name exact match
DataImportServer.StringCacheHint.tableNameAndColumnNameEquals_OptionQuotes/Exchange=ConcurrentUnboundedStringCache,String,1
Column name exact match
DataImportServer.StringCacheHint.columnNameEquals_UnderlyingSymbol=ConcurrentUnboundedStringCache,String,20000
DataImportServer.StringCacheHint.columnNameEquals_Side=ConcurrentUnboundedStringCache,CompressedString,6
Column name contains
DataImportServer.StringCacheHint.columnNameContains_ClOrderId=ConcurrentBoundedStringCache,CompressedString,2000000,2
DataImportServer.StringCacheHint.columnNameContains_Flags=ConcurrentUnboundedStringCache,CompressedString,1000
Table name starts with
DataImportServer.StringCacheHint.tableNameStartsWith_Stock=ConcurrentBoundedStringCache,CompressedString,1000000,2
Global default
DataImportServer.StringCacheHint.default=ConcurrentBoundedStringCache,CompressedString,1000000,2
If the configured default uses compressed strings but a column has a variable-width or multi-byte encoding, the internal default may be used for that column instead:
DataImportServer.StringCacheHint.default=AlwaysCreateStringCache,String
Offline Importers and Merge
In the current version of Deephaven, all Deephaven Data Labs-provided importers, as well as the merge process, use a global default string cache for all output columns.
The size of this cache is specified by the following property, which bounds the cache of strings that will be stored:
StringCacheProvider.defaultStringCacheSize
Note
This deprecates BinaryStore.stringCacheSize as of Deephaven v1.20180430.
This property must be passed to the worker/process running a given import or merge. This can be accomplished by adding -DStringCacheProvider.defaultStringCacheSize=-1 to the process arguments when executing the import or merge class, or to the Extra JVM Arguments (Advanced Settings) in an import or merge Persistent Query configuration. The value above, -1, means that all strings should be cached indefinitely, and all symbol managers should be strict.
Remote Table Appender
The Remote Table Appender (RTA) uses the same rules as offline imports and merges. However, in most cases a large but bounded cache is more suitable.
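For example, the worker running the Remote Table Appender might be given a large bounded string cache with a JVM argument along these lines (the size shown is illustrative):
-DStringCacheProvider.defaultStringCacheSize=1000000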
Bounded string managers work as follows. For string columns, a symbol table maps each symbol to an ID, an integer offset. The field value for the symbol ID (a 4-byte signed integer) is fixed width; this not only saves space when the same string is written multiple times, but also allows for efficient random access. A fixed-width symbol ID allows Deephaven to compute the byte index of the data to be read directly from the row index, and to read just those bytes without performing an O(n) sequential read or an O(log n) lookup in a sorted data structure. Note that columns with variable-width or multi-byte encodings are unable to use compressed strings. If such a column matches a cache of compressed strings, a default will be substituted.
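To make the random-access point concrete, here is a minimal, hypothetical sketch (not Deephaven's actual storage code or file format) of how a fixed-width symbol ID for a given row can be located with a single multiplication:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch only: with a fixed-width (4-byte) symbol ID per row,
// the ID for any row can be located with one multiplication and one read,
// i.e. O(1) random access into the column data.
public class FixedWidthSymbolRead {
    private static final int ID_WIDTH = Integer.BYTES; // 4 bytes per symbol ID

    static int symbolIdForRow(final ByteBuffer symbolIdColumn, final long rowIndex) {
        final long byteOffset = rowIndex * ID_WIDTH; // row index -> byte index
        return symbolIdColumn.getInt((int) byteOffset);
    }

    public static void main(String[] args) {
        // Rows 0..7 hold the IDs from the worked example below: 0 0 1 0 1 2 3 4
        final int[] ids = {0, 0, 1, 0, 1, 2, 3, 4};
        final ByteBuffer column = ByteBuffer.allocate(ids.length * ID_WIDTH);
        for (int id : ids) {
            column.putInt(id);
        }
        System.out.println(symbolIdForRow(column, 5)); // prints 2, the ID for "C"
    }
}
```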
For example, let's say you have 6 strings, A B C D E F, but you have a Symbol Manager whose cache can only hold 5 items.
Your symbol table will include:
ID | String |
---|---|
0 | A |
1 | B |
2 | C |
3 | D |
4 | E |
If new strings come in as "A A B A B C D E", then "0 0 1 0 1 2 3 4" is logged on disk for the column. If a row for F comes in, it cannot fit in the symbol table because it is a sixth distinct value; there is only room for five. The Symbol Table must drop an item, and it does so by following the "First In, First Out" rule: whatever was added to the cache first is removed, not whatever was least recently used. In this case, the Symbol Table removes "A".
Now our table looks like:
ID | String |
---|---|
5 | F |
1 | B |
2 | C |
3 | D |
4 | E |
As the table continues to update with new data, let's say we get another row with 'A'. It needs to be put back in our table, which means we have to remove another item, in this case 'B'. Because 'A' looks like a new string to the cache, it is assigned a new ID (now 6, not 0).
ID | String |
---|---|
5 | F |
6 | A |
2 | C |
3 | D |
4 | E |
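The eviction sequence above can be reproduced with a minimal sketch of a FIFO-bounded mapping (purely illustrative; this is not Deephaven's symbol manager implementation):

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the FIFO eviction behavior described above:
// capacity 5, monotonically increasing IDs, oldest insertion evicted first.
public class FifoSymbolManagerSketch {
    private final int capacity;
    private final Map<String, Integer> valueToId = new HashMap<>();
    private final ArrayDeque<String> insertionOrder = new ArrayDeque<>();
    private int nextId = 0;

    FifoSymbolManagerSketch(final int capacity) {
        this.capacity = capacity;
    }

    int idFor(final String value) {
        Integer id = valueToId.get(value);
        if (id != null) {
            return id; // cache hit: reuse the previously assigned ID
        }
        if (valueToId.size() >= capacity) {
            // FIFO eviction: drop the oldest insertion, not the least recently used
            valueToId.remove(insertionOrder.removeFirst());
        }
        id = nextId++;
        valueToId.put(value, id);
        insertionOrder.addLast(value);
        return id;
    }

    public static void main(String[] args) {
        final FifoSymbolManagerSketch manager = new FifoSymbolManagerSketch(5);
        for (String s : "A A B A B C D E F A".split(" ")) {
            System.out.print(manager.idFor(s) + " ");
        }
        // Prints: 0 0 1 0 1 2 3 4 5 6
        // F evicts A (the oldest entry); when A returns it evicts B and gets the new ID 6.
    }
}
```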
It's important to set a reasonable value for how many items your cache can hold. Depending on the order of the source data, the same string may be read into the cache with a different ID multiple times; in that case, a symbol table is probably ineffective. For example, with a dataset like A B C D E F A B C D E F A B C D E F, the same strings will repeatedly be assigned new IDs if the cache only holds five items. IDs can count up toward 2 billion very quickly, which exceeds the maximum number of mappings that can be stored. To avoid this, include symbolTable="None" in your schema.
Deephaven does not have an unbounded cache by default because tables with non-repeating string data (e.g., the ProcessEventLog) would very quickly exhaust available system resources, resulting in process termination. This is of particular concern for long-running processes such as the DataImportServer.