
Racing with Materialize

Cristian Ferretti
A performance comparison between two update models

Framing and Context

Deephaven recently published a GitHub repository showing an implementation of Materialize’s e-commerce functional demo using Deephaven as the query engine. Today we want to explore the difference in performance between our respective solutions and the factors that drive those differences.

Before we start, caveat emptor: Materialize and Deephaven engines come from different corners of the data processing world; from their genealogy, you would expect them to have distinct proficiencies matching the use cases they grew up to serve. As we discuss elsewhere, we believe you wouldn’t be wrong in that expectation. Likely there will be particular use cases for which one of them is a better fit.

However, many use cases live at the intersection of the two engines’ capabilities, so you could rightfully consider either. We believe this particular e-commerce analysis is such an example: a use case that is fair game for both.

Goals

At a high level, we want to investigate the relative performance of our respective engines. To that effect, our specific goals are:

  • Test both Deephaven and Materialize under high message rate load.
  • Find the limits for the basic (default) setup for each: specifically, the highest sustainable message rate.
  • Explore configuration changes to support higher message rates on each of Materialize and Deephaven.
  • Accumulate experience towards establishing a model for performance testing of streaming platforms generally.

Results

As detailed in the Performance Comparison section below, the differences in design philosophy and targets lead to stark differences in throughput and resource consumption.

For workloads of all sizes, Deephaven uses a fraction of the CPU required by Materialize. While Deephaven has a small amount of memory overhead for small workloads (a touch over a gigabyte), its memory consumption scales very gently in comparison with Materialize. At higher data rates, around 100k pageviews/second, Materialize uses drastically more CPU and memory than Deephaven while falling behind on the data. The extreme scalability of Deephaven is a product of its pedigree in big-data capital markets applications.

| pageviews/second | Materialize CPU | Deephaven CPU | Materialize Mem | Deephaven Mem |
| --- | --- | --- | --- | --- |
| 50 | 30.6% | 8.3% | 0.16 GiB | 1.30 GiB |
| 5,000 | 179.6% | 33.1% | 0.57 GiB | 1.70 GiB |
| 35,000 | (1) 848.5% | 82.2% | 2.80 GiB | 2.60 GiB |
| 100,000 | (2) 848.4% | 96.2% | 7.00 GiB | 4.60 GiB |
| 145,000 | | 149.5% | | 5.20 GiB |
| 160,000 | | 174.6% | | 5.90 GiB |
| 200,000 | | (3) 240.0% | | 6.70 GiB |

Next, let's dig into the construction of the benchmark that produced these results.

Strategy

We closely follow the Materialize demo available on GitHub. We did our best to capture the semantics of the original SQL queries while converting the code to Deephaven table operations for execution inside our Python web UI. While we provide a Python script with the full resulting code, we encourage you to cut & paste Python expressions one at a time into the interactive console and incrementally “build” the solution while exploring the table results — just as you would if you were creating a data model. (Alternatively, you could paste the entire script into a Deephaven notebook and step through it incrementally.)

The e-commerce demo has two sources of streaming events:

  1. Customer purchases, reflected as database transactions in MySQL, which are converted to Kafka Avro-encoded events via Debezium using its MySQL CDC connector.
  2. Customer pageviews, reflected as Kafka JSON-encoded events.

Multiple derived “streaming tables” (in Deephaven’s terms) or “materialized views” (in Materialize’s terms) are created from those events, including joins and grouping involving both the purchase and pageview streams.

Our test strategy kept purchases constant and steady, while we increased pageview rates until performance-related delays were detected. We expected this point would indicate an engine no longer capable of “keeping up”, which would be confirmed by seeing further increases in refresh delays as customer pageview rates increased. We selected pageviews for this rate increase instead of purchases for two reasons:

  1. We believe it is more realistic. It’s plausible that pageview spikes would arrive from a horizontally scaled web frontend. That is less true for purchases, which flow through an OLTP-oriented database engine like MySQL intermediating CDC events to Kafka (a path that acts as a “smoothing” function).
  2. Because pageviews are simulated using only Kafka publishers, we do not need to worry about MySQL or Debezium and their configurations. We wanted to ensure bottlenecks showed up in Materialize or Deephaven, not at earlier points in the pipeline.

Implementation

Tracking Update Delays

We defined one additional streaming table in both Deephaven and Materialize to help detect update delays.

Every pageview event includes a received_at field that is timestamped when the simulated pageview action is created, right before publication. This is not the Kafka timestamp carried by every message, but a specific data field in the JSON-encoded pageview.
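
For reference, a pageview message has roughly the following shape. This is a hypothetical sketch: only received_at is relied upon by the delay tracking, and the remaining field names are illustrative rather than the demo's exact schema.

import json
import time

# Hypothetical pageview payload; only received_at matters for delay tracking.
pageview = {
    "user_id": 1234,
    "url": "/products/42",
    "channel": "organic",
    "received_at": int(time.time()),  # epoch seconds, stamped right before publication
}
value = json.dumps(pageview).encode("utf-8")  # JSON-encoded Kafka message value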

Tracking Delays in Deephaven

For Deephaven, we define:

pageviews_summary = pageviews_stg \
    .aggBy(
        as_list([
            agg.AggCount('total'),
            agg.AggMax('max_received_at = received_at')])) \
    .update('dt_ms = (DateTime.now() - max_received_at)/1_000_000.0')

We ran into two pitfalls trying different ways to define this table:

  • We initially used update_view instead of update to create the dt_ms column. In Deephaven, update_view creates columns that are computed when data is pulled (read) from the result table, as opposed to being computed, and their results stored in memory, on every update cycle. Said differently, update_view calculates results on demand. For some use cases this is far more efficient in CPU and memory resources. For the case at hand, however, it meant that the timestamp from the call to DateTime.now() was taken when the Deephaven web UI refreshed the rendered result, as opposed to when the Update Graph Processor actually processed new events. Using update ensures that dt_ms is calculated during the Update Graph Processor's cycle, after computing the total and max_received_at statistics (a minimal contrast is sketched after this list).

  • This is rather technical, but we include it here for transparency:

    Separating the new timestamp into its own column in the update expression is tempting, as a way to make the resulting table more explicit. Like so:

        .update('now = DateTime.now()',
                'dt_ms = (now - max_received_at)/1_000_000.0')

    However, since the now column in this definition does not have any inputs depending on ticking data, Deephaven would compute it just once, during the initial refresh associated with table creation; as an optimization, subsequent update cycles would not rerun DateTime.now() to recompute the value. This would cause dt_ms to use the initialization time for 'now', rather than the current time during the Update Graph Processor cycle. It is possible to override this optimization and define the column such that it refreshes on every change to the table, but for our purposes it is not necessary to add that complexity to the script.
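
As a minimal contrast illustrating the first pitfall, assuming the same API style as the snippet above and using summary_stats as a stand-in for the aggregated table produced by the aggBy call:

# Lazy: the formula runs when a client reads the result, so DateTime.now()
# reflects the moment the web UI pulled and rendered the row.
lazy = summary_stats.update_view(
    'dt_ms = (DateTime.now() - max_received_at)/1_000_000.0')

# Eager: the formula runs inside the Update Graph Processor cycle, so
# DateTime.now() reflects when the new events were actually processed.
eager = summary_stats.update(
    'dt_ms = (DateTime.now() - max_received_at)/1_000_000.0')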

Tracking Delays in Materialize

For Materialize, we poll the results for this query:

SELECT total,
       to_timestamp(max_received_at) max_received_ts,
       mz_logical_timestamp() - 1000*max_received_at AS dt_ms
FROM pageviews_summary;

Originally we tried to define this query as a materialized view, but it is not possible to call mz_logical_timestamp in a materialized view definition. We had to settle for a SELECT statement, which we execute using psql once per second with the help of the watch shell command, similar to what the original Materialize demo README suggests for another query. Note that mz_logical_timestamp() is expressed in milliseconds while received_at is in epoch seconds, hence the factor of 1000 in the dt_ms expression.

Generating High Pageview Action Rates in Python

The original Materialize demo repository includes the code for a Python script that simulates both purchase actions (via updating MySQL tables) and pageview actions (via publishing to Kafka-compatible Redpanda). We started with this code and modified it to support controlling at runtime the rate of both purchases and pageviews. You can see the resulting script in the Deephaven repository. The main changes are described below.

  • We added a simple command API via socket connection for message rate control.
  • Since the Python libraries for MySQL and for connecting to Kafka both offer synchronous APIs, we changed the script to use the asyncio Python library to wrap the synchronous calls: above a certain action rate, the time spent blocking in a call to generate an action exceeds the implied period between actions. We started with actions running on top of a ThreadPoolExecutor, but later had to switch to a ProcessPoolExecutor because CPU utilization in a single Python process maxed out (due to the GIL, a single Python process cannot use more than one full CPU). A simplified sketch follows this list.
  • We wrote a simple traffic shaping function that tries to keep a steady rate of executed actions and can adapt to a new target rate dynamically by ramping the rate up or down. To estimate the short-term rate, we wrote a simple estimation algorithm based on a moving average over rolling fixed-size periods.
  • Instead of using the default Kafka (Redpanda) topic configuration for pageviews, which in the original demo is auto-created with a single partition on the first message published, we create the topic via the administrative Kafka API and configure it with multiple partitions. Using an independent Kafka publisher object per Python process-pool worker, each targeting separate partitions, enables multiple independent partition streams at both the publisher and broker level.
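
The actual loadgen changes live in the repository linked above. The following is a simplified, self-contained sketch of two of the ideas just mentioned: wrapping blocking calls in a process pool so the event loop stays responsive, and estimating the short-term rate with a moving average over rolling fixed-size periods. The publish_one_pageview function, the RateEstimator class, and all constants are hypothetical stand-ins, not the demo's code.

import asyncio
import time
from collections import deque
from concurrent.futures import ProcessPoolExecutor


def publish_one_pageview() -> None:
    """Stand-in for the blocking work done per action (e.g. a synchronous
    Kafka produce call); sleeps to simulate the blocking time."""
    time.sleep(0.002)


class RateEstimator:
    """Moving average over rolling fixed-size periods (e.g. 1-second buckets)."""

    def __init__(self, periods: int = 5, period_s: float = 1.0):
        self.period_s = period_s
        self.buckets = deque(maxlen=periods)  # counts for completed periods
        self.current = 0
        self.period_start = time.monotonic()

    def record(self, n: int = 1) -> None:
        now = time.monotonic()
        if now - self.period_start >= self.period_s:
            self.buckets.append(self.current)
            self.current = 0
            self.period_start = now
        self.current += n

    def rate(self) -> float:
        if not self.buckets:
            return 0.0
        return sum(self.buckets) / (len(self.buckets) * self.period_s)


async def generate(target_rate: float, duration_s: float) -> None:
    loop = asyncio.get_running_loop()
    estimator = RateEstimator()
    deadline = time.monotonic() + duration_s
    with ProcessPoolExecutor() as pool:
        while time.monotonic() < deadline:
            # Run the blocking call on a worker process so neither the event
            # loop nor this process's GIL becomes the bottleneck.
            loop.run_in_executor(pool, publish_one_pageview)
            estimator.record()
            # Naive shaping: sleep the implied inter-action period, slightly
            # longer when the estimated short-term rate overshoots the target.
            delay = 1.0 / target_rate
            if estimator.rate() > target_rate:
                delay *= 1.1
            await asyncio.sleep(delay)


if __name__ == "__main__":
    asyncio.run(generate(target_rate=1000, duration_s=10))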

We configure the Kafka producers to disable producer acks and to use an increased batch size. Disabling producer acks trades correctness for performance, and you should seldom do this in a real production environment. In our context, however, with all processes running in a single-machine dockerized compose, we do it with the goal of maximizing broker throughput and minimizing the time producer calls spend blocking.
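
To illustrate both the up-front topic creation with multiple partitions and the producer settings just described, here is a sketch assuming the kafka-python client. The demo may use a different client library; the broker address, topic name, partition count, and batch settings below are illustrative.

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

BROKER = "localhost:9092"  # illustrative; use the Redpanda address from the compose file

# Create the pageviews topic up front with several partitions, instead of
# letting the broker auto-create it with a single partition on first publish.
admin = KafkaAdminClient(bootstrap_servers=BROKER)
admin.create_topics([NewTopic(name="pageviews", num_partitions=8, replication_factor=1)])

# One producer per worker process, tuned for throughput over delivery guarantees:
# acks=0 skips broker acknowledgements; a larger batch size and a small linger
# amortize per-message overhead.
producer = KafkaProducer(
    bootstrap_servers=BROKER,
    acks=0,
    batch_size=64 * 1024,
    linger_ms=5,
)
producer.send("pageviews", value=b'{"received_at": 0}', partition=0)
producer.flush()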

Configuring Docker Compose for High Message Rate

We want to ensure a high message rate reaches the query engines. As much as possible, we want to prevent intermediate systems from introducing queuing, delays, or any smoothing effect in front of the engines. We changed the docker compose configuration accordingly.

Test Platform and Test Plan

Test Platform

The following hardware and software setup was used:

  • Intel Core i9-10900K (10 cores, 20 threads, 20 MiB cache, 3.7 GHz base / 5.3 GHz max freq)
  • 128 GiB RAM
  • Ubuntu Linux 20.04.
  • mbw -t0 8192 reports memory bandwidth at an average of 9611.7 MiB/s.

Test Plan

The test plan below repeats in greater detail the instructions summarized in the README.md file for the debezium/perf directory.

  1. Ensure everything starts afresh and there is no “memory” of previous runs stored in container filesystems or volumes. In the debezium/perf directory, run:

    docker compose stop
    docker compose down -v
  2. Start docker compose and save log output to a file. In the debezium/perf directory, run:

    docker compose up -d
    nohup docker compose logs -f >& /tmp/perf.log < /dev/null &
  3. Wait until Debezium has finished initialization (a log line stating “Keepalive thread is running”) and loadgen has initialized the database (a log line stating “Initializing shop database DONE”).

    docker-compose log

  4. Open a web browser tab and connect to the Deephaven web console at:

    localhost:10000/ide

    In the console, run the Deephaven Python script that creates the demo tables via:

    exec(open('/scripts/demo.py').read())

    This will create several Python top level variables, one for each table.

    Each table becomes a tab in the UI showing the current table data. Right after the statement that created them, you should see one icon (displayed prominently in blue) for each variable. Clicking an icon jumps you to the respective table display.

  5. Switch to the tab for the pageviews_summary table to display it. You should see a single row with columns total (accumulated number of pageview events processed), max_received_at (most recent timestamp across all pageview events), and dt_ms (the difference in milliseconds between max_received_at and the current time). As the default Deephaven configuration updates tables once per second, an OK value of dt_ms can be anything between zero and 1,000 milliseconds. This configuration is often meaningfully lowered for other use cases, but we are sticking with defaults here.

    Deephaven web UI displaying pageviews_summary

  6. Start Materialize’s command line interface. In the debezium/perf directory, run:

    docker compose run mzcli

    Inside the CLI, load the SQL code for the Materialize demo:

    \i /scripts/demo.sql

    You should see several lines of output stating CREATE SOURCE and then CREATE VIEW.

    materialize command line interface

    Exit the CLI with Ctrl-D and run the following watch command in the shell:

    watch -n1 "psql -c '
    SELECT
    total,
    to_timestamp(max_received_at) max_received_ts,
    mz_logical_timestamp() - 1000 * max_received_at AS dt_ms
    FROM pageviews_summary;' -U materialize -h localhost -p 6875"

    watch psql command

    You should see the results of the SELECT query above executed once per second. Note this requires the host to have the psql command installed, which is part of the PostgreSQL client tools package in most Linux distributions (e.g., in Ubuntu 20.04 the package is postgresql-client-12).

    watch psql output

  7. To allow us to track CPU and RAM utilization, on a separate shell (or terminal window) run the top command.

    Near the top of the processes ranked by CPU utilization you should see redpanda, materialize, java (this is the Deephaven engine), and pypy. Note %CPU and RES (RAM resident size, in KiB if no unit is indicated) for each as we start. The starting rates are 3 purchases per second and 50 pageviews per second, which should create some noticeable, albeit low, CPU load, enough for our processes of interest to already show up in top’s list.

    Initial top output

  8. Connect to the socket command line client for the generate load script:

    nc localhost 8090

    We used netcat above, part of the netcat-openbsd Ubuntu package, but you can use other tools like telnet if you prefer. Set the desired rate of pageviews while you watch top and the dt_ms values in both Materialize and Deephaven (10,000 is an example below; modify as required):

    set pageviews_per_second 10000

    loadgen script cli

    The docker compose log should show loadgen output acknowledging the change, with a message reading “Changing pageview rate from 50 to 10000 per second”.

    loadgen logging rate change

    After a few seconds (it is logged every 10), the docker compose log should show loadgen generating an effective rate close to the one requested, with a message reading similar to “Simulated 99007 pageview actions in the last 10.0 seconds, effective rate 9896.08/s”. Note it may take two cycles of 10 second rate logging to show the new rate as steady, since the change likely happens in the middle of a particular cycle and affects its count of messages only from that point forward.

    loadgen logging effective rate

  9. Wait a few seconds for the new rate to settle and use top to sample CPU (%CPU column) and Memory (RES column) utilization.

Test automation

Manually executing the test plan is prone to several sources of variation.

  • With top's default period of one sample per second, samples vary considerably from one to the next: instantaneous CPU utilization is not a good approximation of CPU utilization over a period of time. Ideally, one would take several instantaneous samples over a period and use them to approximate a value for that period.
  • As time passes, the engines require more memory to store derived data for growing tables: to make an experiment run repeatable, samples should ideally be taken a fixed amount of time after a new rate is set, which in turn should happen soon after compose startup.

To avoid these pitfalls of manual execution, we created an automated experiment driven by a script. The run_experiment.sh script in the debezium/perf directory automates:

  • startup of the containers required for a particular run (and only those)
  • saving container logs
  • loading the demo code in the respective engine and sampling of engine delays to a log file
  • setup of a new given pageviews per second rate, and a fixed wait time thereafter for processing to settle
  • sampling of CPU and memory utilization via top to a log file
  • stop and reset of the containers

Example

cd debezium/perf
./run_experiment.sh dh 5000 20 10 1.0

This will run an experiment for Deephaven (tag dh; use tag mz for Materialize) with a target rate of 5,000 pageviews per second. It will wait 20 seconds after setting the target rate before it begins sampling CPU and memory utilization using top in batch mode. Ten samples will be obtained, with a delay of 1.0 seconds between samples.
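
The sampling itself boils down to running top in batch mode and aggregating per-process numbers. The following Python sketch captures the idea; it is not the run_experiment.sh logic, and the process name and parameters are placeholders.

import subprocess

def mean_cpu(process_name: str, samples: int = 10, delay_s: float = 1.0) -> float:
    """Mean %CPU for processes whose command matches process_name, measured
    with `top` in batch mode over `samples` iterations, `delay_s` apart."""
    out = subprocess.run(
        ["top", "-b", "-n", str(samples), "-d", str(delay_s)],
        capture_output=True, text=True, check=True,
    ).stdout
    cpu = []
    for line in out.splitlines():
        fields = line.split()
        # Default top columns: PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
        if len(fields) >= 12 and fields[-1] == process_name:
            try:
                cpu.append(float(fields[8].replace(",", ".")))
            except ValueError:
                pass
    return sum(cpu) / len(cpu) if cpu else 0.0

# e.g. average CPU of the Deephaven engine over ten one-second samples
print(mean_cpu("java"))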

Performance Comparison

Summary of automated results

The table below shows the mean CPU utilization and the maximum memory utilization over a period of 10 seconds, starting 20 seconds after a new pageviews-per-second rate was set. Every run starts afresh, and only one of the two engines is running.

| pageviews/second | Materialize CPU | Deephaven CPU | Materialize Mem | Deephaven Mem |
| --- | --- | --- | --- | --- |
| 50 | 30.6% | 8.3% | 0.16 GiB | 1.30 GiB |
| 5,000 | 179.6% | 33.1% | 0.57 GiB | 1.70 GiB |
| 35,000 | (1) 848.5% | 82.2% | 2.80 GiB | 2.60 GiB |
| 100,000 | (2) 848.4% | 96.2% | 7.00 GiB | 4.60 GiB |
| 145,000 | | 149.5% | | 5.20 GiB |
| 160,000 | | 174.6% | | 5.90 GiB |
| 200,000 | | (3) 240.0% | | 6.70 GiB |
  1. We reconfigured Materialize to run with 8 workers.
  2. Materialize is falling behind at this point, as evidenced by logs/2022.03.22.05.04.28_UTC_mz_100000/mz_sample_dt.log.
  3. The Deephaven update cycle is taking longer than 1 second, as logged in logs/2022.03.22.05.09.52_UTC_dh_200000/docker-compose.log.

You can download the logs produced for these runs here.

Details for some manual runs

Note the results below were obtained manually, which is prone to issues mentioned in the previous section. Still, we believe it is useful to display some top screen captures and videos recorded for a couple of cases.

  • On startup with the initial 3 purchases per second and 50 pageviews per second:

    • Materialize consumes 229 MiB RAM and 46% CPU.
    • Deephaven consumes 1.8 GiB RAM and 3% CPU.

    top output for 50 pageviews/s

  • Increasing pageviews per second to 5,000:

    • Materialize consumes 899 MiB RAM and 168% CPU.
    • Deephaven consumes 2.6 GiB RAM and 47% CPU.

    top output for 5,000 pageviews/s

    The output from top oscillates quite a bit at this point. A reading taken a minute later is a bit more stable (but there are still oscillations):

    • Materialize consumes 1.2 GiB RAM and 157% CPU.
    • Deephaven consumes 3.0 GiB RAM and 18% CPU.

    top output for 5,000 pageviews/s, later

    Note as time passes, both engines need to increase their memory consumption given the accumulation of data in tables that grow as new events arrive.

  • Increasing pageviews per second to 35,000:

    • Materialize consumes 4.8 GiB RAM and 244% CPU.
    • Deephaven consumes 5.1 GiB RAM and 92% CPU.

    top output for 35,000 pageviews/s

    We note the watch command for the psql SELECT query with dt_ms for Materialize is refreshing in jumps of several seconds. We suspect Materialize is running short of CPU given the default configuration of -w2 with 2 workers.

    Deephaven meanwhile is still refreshing consistently every second.

    You can get more details about the Deephaven Update Graph Processor refresh cycle in the docker-compose log for Deephaven server, in log lines reading Update Graph Processor cycleTime=....

    Deephaven server log for update graph processor cycle time

    At this point, we stop the test and reconfigure Materialize with 8 workers by modifying the .env file in the debezium/perf directory and setting MATERIALIZE_WORKERS=8. We then restart from the beginning of the test plan and set pageviews per second to 35,000. With 35,000 pageviews per second and 8 workers, Materialize consumes 4.1 GiB RAM and 829% CPU.

    top output for 35,000 pageviews/s and 8 Materialize workers

    Updates to the watch psql command are refreshed once per second, so with 8 workers Materialize is keeping up.

  • Restarting with 100,000 pageviews per second:

    • Materialize consumes 9.9 GiB RAM and 801% CPU.
    • Deephaven consumes 5.8 GiB RAM and 200% CPU.

    top output for 100,000 pageviews/s

    We are still running Materialize with 8 workers here, which probably explains why CPU utilization stays around 800%.

Talk to us

We encourage you to explore our GitHub code and to draw your own conclusions. We look forward to feedback, insight, and curiosity from the community.

Notes

  1. With respect to memory utilization, at high message rates (say, 200k msg/sec) it does not take more than a few minutes under the default Deephaven configuration for the Deephaven engine to run out of memory. As our engine is Java based, we need to define a maximum heap size on startup; our defaults set that to 4 GiB in our core repo and 12 GiB in this demo. To facilitate longer testing this is easily modified: in the .env file in the debezium/perf directory, set DEEPHAVEN_HEAP to the desired value; this translates to the value used for the java command's -Xmx option. This does not impact initial memory allocation, but provides a hard limit on total memory allocation. Although a limitation here for our current purposes, in production deployments with shared resources this is helpful. You can monitor actual memory utilization through top or inside the DH web console using the processMemory table from the PerformanceQueries package, as follows:

    import deephaven.PerformanceQueries as pq
    procmem = pq.processMemory()

    This will create a table that updates every 15 seconds with memory statistics for the Deephaven engine process, including the percentage of time spent in garbage collection.

    Process Memory Table