Framing and Context
Deephaven recently published a GitHub repository showing an implementation of Materialize’s e-commerce functional demo using Deephaven as the query engine. Today we want to explore the difference in performance between our respective solutions and the factors that drive those differences.
Before we start, caveat emptor: Materialize and Deephaven engines come from different corners of the data processing world; from their genealogy, you would expect them to have distinct proficiencies matching the use cases they grew up to serve. As we discuss elsewhere, we believe you wouldn’t be wrong in that expectation. Likely there will be particular use cases for which one of them is a better fit.
However, many use cases live at the intersection of the two engines’ capabilities, so you could rightfully consider either. We believe this particular e-commerce analysis is such an example: a use case that is fair game for both.
Goals
At a high level, we want to investigate the relative performance of our respective engines. To that effect, our specific goals are:
- Test both Deephaven and Materialize under high message rate load.
- Find the limits for the basic (default) setup for each: specifically, the highest sustainable message rate.
- Explore configuration changes to support higher message rates on each of Materialize and Deephaven.
- Accumulate experience towards establishing a model for performance testing of streaming platforms generally.
Results
As detailed in the Performance Comparison section below, the differences in design philosophy and targets lead to stark differences in throughput and resource consumption.
For workloads of all sizes, Deephaven uses a fraction of the CPU required by Materialize. While Deephaven has a small amount of memory overhead for small workloads (a touch over a gigabyte), its memory consumption scales very gently in comparison with Materialize. At higher data rates, around 100k pageviews/second, Materialize uses drastically more CPU and memory than Deephaven while falling behind on the data. The extreme scalability of Deephaven is a product of its pedigree in big-data capital markets applications.
| pageviews/second | Materialize CPU | Deephaven CPU | Materialize Mem | Deephaven Mem |
|---|---|---|---|---|
| 50 | 30.6% | 8.3% | 0.16 GiB | 1.30 GiB |
| 5,000 | 179.6% | 33.1% | 0.57 GiB | 1.70 GiB |
| 35,000 | (1) 848.5% | 82.2% | 2.80 GiB | 2.60 GiB |
| 100,000 | (2) 848.4% | 96.2% | 7.00 GiB | 4.60 GiB |
| 145,000 | | 149.5% | | 5.20 GiB |
| 160,000 | | 174.6% | | 5.90 GiB |
| 200,000 | | (3) 240.0% | | 6.70 GiB |
Next, let's dig into the construction of the benchmark that produced these results.
Strategy
We closely follow the Materialize demo available on GitHub. We did our best to capture the semantics of the original SQL queries while converting the code to Deephaven table operations for execution inside our Python web UI. While we provide a Python script with the full resulting code, we encourage you to cut and paste Python expressions one at a time into the interactive console and incrementally “build” the solution while exploring the table results, just as you would if you were working on creating a data model. (Alternatively, you could paste the entire script into a Deephaven notebook and step through it incrementally.)
The e-commerce demo has two sources of streaming events:
- Customer purchases, reflected as database transactions in MySQL, which are converted to Kafka Avro-encoded events via Debezium using its MySQL CDC connector.
- Customer pageviews, reflected as Kafka JSON-encoded events.
Multiple derived “streaming tables” (in Deephaven’s terms) or “materialized views” (in Materialize’s terms) are created from those events, including joins and grouping involving both the purchase and pageview streams.
Our test strategy kept purchases constant and steady, while we increased pageview rates until performance-related delays were detected. We expected this point would indicate an engine no longer capable of “keeping up”, which would be confirmed by seeing further increases in refresh delays as customer pageview rates increased. We selected pageviews for this rate increase instead of purchases for two reasons:
- We believe it is more realistic. It is plausible that pageview spikes would arrive on top of a horizontally scaled web frontend. That is less true for purchases, which are intermediated by an OLTP-oriented database engine like MySQL feeding CDC events to Kafka (which acts as a "smoothing" function).
- Because pageviews are simulated using only Kafka publishers, we do not need to worry about MySQL or Debezium and their configurations. We wanted to ensure bottlenecks showed up in Materialize or Deephaven, not at earlier points in the pipeline.
Implementation
Tracking Update Delays
We defined one additional streaming table in both Deephaven and Materialize to help detect update delays.
Every pageview event includes a `received_at` field that is timestamped at the time the simulated pageview action is created, right before publication. This is not the Kafka timestamp field present on every message, but a specific data field in the JSON-encoded pageview.
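For reference, a simulated pageview payload looks roughly like the sketch below. Only the `received_at` field is taken from the demo; the other field names are illustrative assumptions. Judging from the Materialize query further down (which multiplies `max_received_at` by 1,000 to get milliseconds), `received_at` is carried as an epoch timestamp in seconds.

```python
import json
import time

# Illustrative only: field names other than received_at are assumptions,
# not taken verbatim from the demo's pageview schema.
pageview = {
    "user_id": 12345,
    "url": "/products/67890",
    "channel": "social",
    # Data-level timestamp set right before publishing; this is the field
    # the dt_ms delay calculations key off of (epoch seconds).
    "received_at": time.time(),
}
payload = json.dumps(pageview).encode("utf-8")  # JSON-encoded Kafka message value
```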
Tracking Delays in Deephaven
For Deephaven, we define:
```python
pageviews_summary = pageviews_stg \
    .aggBy(
        as_list([
            agg.AggCount('total'),
            agg.AggMax('max_received_at = received_at')])) \
    .update('dt_ms = (DateTime.now() - max_received_at)/1_000_000.0')
```
We ran into two pitfalls trying different ways to define this table:
- We initially used `update_view` instead of `update` to create the `dt_ms` column. In Deephaven, `update_view` creates columns that are computed when data is pulled (read) from the result table, as opposed to computed, and the result stored in memory, on every update cycle. Said differently, `update_view` calculates results on demand. For some use cases, this is far more efficient in CPU and memory resources. For the case at hand, however, it meant that the timestamping call to `DateTime.now()` was triggered when the Deephaven web UI refreshed the rendered result, as opposed to when the Update Graph Processor actually processed new events. Using `update` ensures that `dt_ms` is calculated during the Update Graph Processor's cycle, after computing the `total` and `max_received_at` statistics.
- This one is rather technical, but we include it here for transparency: separating the new timestamp into its own column in the `update` expression is tempting, as a way to make the resulting table more explicit. Like so:

  ```python
  .update('now = DateTime.now()',
          'dt_ms = (now - max_received_at)/1_000_000.0')
  ```

  However, since the `now` column in this definition does not have any inputs depending on ticking data, Deephaven would compute it just once, during the initial refresh associated with table creation; as an optimization, subsequent update cycles would not rerun `DateTime.now()` to recompute the value. This would cause `dt_ms` to use the initialization time for `now`, rather than the current time during the Update Graph Processor cycle. It is possible to override this optimization and define the column such that it refreshes on every change to the table, but for our purposes it is not necessary to add that complexity to the script.
Tracking Delays in Materialize
For Materialize, we poll the results for this query:
```sql
SELECT total,
       to_timestamp(max_received_at) max_received_ts,
       mz_logical_timestamp() - 1000*max_received_at AS dt_ms
  FROM pageviews_summary;
```
Originally we tried to define this query as a materialized view. We had to settle for a `SELECT` statement, which we execute using `psql` once per second with help from the `watch` shell command, similar to what the original Materialize demo README suggests for another query. The reason is that it is not possible to call `mz_logical_timestamp` in a materialized view definition.
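If you prefer to poll programmatically rather than with `watch` and `psql`, something along the following lines works against Materialize's PostgreSQL-compatible endpoint. This is a minimal sketch, not part of the published scripts; it assumes the `psycopg2` driver and the connection parameters used elsewhere in the demo (port 6875, user `materialize`).

```python
import time
import psycopg2

# Sketch: poll the delay query once per second (connection details assumed from the demo).
conn = psycopg2.connect(host="localhost", port=6875, user="materialize", dbname="materialize")
conn.autocommit = True

QUERY = """
SELECT total,
       to_timestamp(max_received_at) AS max_received_ts,
       mz_logical_timestamp() - 1000*max_received_at AS dt_ms
  FROM pageviews_summary;
"""

with conn.cursor() as cur:
    while True:
        cur.execute(QUERY)
        total, max_received_ts, dt_ms = cur.fetchone()
        print(f"total={total} max_received_ts={max_received_ts} dt_ms={dt_ms}")
        time.sleep(1.0)
```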
Generating High Pageview Action Rates in Python
The original Materialize demo repository includes a Python script that simulates both purchase actions (via updates to MySQL tables) and pageview actions (via publishing to Kafka-compatible Redpanda). We started with this code and modified it to support controlling the rate of both purchases and pageviews at runtime. You can see the resulting script in the Deephaven repository. The main changes are described below.
- We added a simple command API via socket connection for message rate control.
- Since the Python libraries for MySQL and for connecting to Kafka both offer synchronous APIs, we changed the script to use the `asyncio` Python library to wrap the synchronous calls: above a certain action rate, the time spent blocking in a call to generate an action exceeds the implied period between actions. We started with actions running on top of a `ThreadPoolExecutor`, but later had to switch to a `ProcessPoolExecutor` because CPU utilization in a single Python process maxed out (due to the GIL, a single Python process cannot exceed one full CPU of utilization). A sketch of this pattern follows after this list.
- We wrote a simple traffic-shaping function that tries to keep a steady rate of actions executed and can adapt the target rate dynamically, by ramping an estimate of the short-term rate up or down. To estimate the short-term rate, we wrote a simple rate-estimation algorithm based on a moving average over rolling fixed-size periods; see the sketch after this list.
- Instead of using the default Kafka (Redpanda) topic configuration for pageviews, which in the original demo is auto-created with a single partition on first message published, we create the topic via the administrative Kafka API and configure it with multiple partitions. Using independent Kafka producer objects, each targeting a separate partition, per Python process executor gives us multiple independent partition streams at both the publisher and the broker level.
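The sketch below illustrates the executor pattern described in the first bullet. The function and variable names are ours, not the actual loadgen code; the real script lives in the Deephaven repository.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def publish_pageview(partition: int) -> None:
    # Placeholder for the blocking work: build a pageview payload and call the
    # synchronous Kafka producer's send() for the given partition.
    time.sleep(0.001)

async def generate_pageviews(rate_per_second: float, num_workers: int = 4) -> None:
    loop = asyncio.get_running_loop()
    period = 1.0 / rate_per_second
    with ProcessPoolExecutor(max_workers=num_workers) as pool:
        i = 0
        while True:
            # Hand the blocking call to a worker process (fire-and-forget for brevity)
            # so the event loop can keep scheduling actions at the target rate.
            loop.run_in_executor(pool, publish_pageview, i % num_workers)
            i += 1
            await asyncio.sleep(period)

# asyncio.run(generate_pageviews(5000))
```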
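And here is a sketch of the moving-average rate estimation and pacing described in the second bullet, under the assumption of fixed-size periods and a simple mean; the repository's implementation may differ in its details.

```python
import time
from collections import deque

class RateEstimator:
    """Estimate a short-term rate as a moving average over rolling fixed-size periods."""
    def __init__(self, period_seconds: float = 1.0, num_periods: int = 5):
        self.period_seconds = period_seconds
        self.counts = deque(maxlen=num_periods)  # counts for completed periods
        self.current_count = 0
        self.period_start = time.monotonic()

    def record(self, n: int = 1) -> None:
        now = time.monotonic()
        # Roll over to a new period when the current one has elapsed.
        while now - self.period_start >= self.period_seconds:
            self.counts.append(self.current_count)
            self.current_count = 0
            self.period_start += self.period_seconds
        self.current_count += n

    def rate(self) -> float:
        if not self.counts:
            return 0.0
        return sum(self.counts) / (len(self.counts) * self.period_seconds)

def pacing_delay(estimator: RateEstimator, target_rate: float) -> float:
    """Delay to apply before the next action: back off slightly when the estimated
    short-term rate is above target, speed up slightly when below."""
    base = 1.0 / target_rate
    return base * 1.1 if estimator.rate() > target_rate else base * 0.9
```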
We configure the Kafka producers to disable producer acks and to use an increased batch size. Disabling producer acks trades correctness for performance, and you should seldom do this in a real production environment. In our context, however, with all processes running on a single machine under docker compose, we do it to maximize broker throughput and minimize the time spent blocking in producer calls.
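For concreteness, the topic and producer configuration described above looks roughly like the following with the `kafka-python` client. The broker address, topic name, partition count, and batch size here are illustrative assumptions, and the demo's loadgen may use a different client or values.

```python
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

BROKER = "localhost:9092"   # assumption: Redpanda's Kafka API address in the compose
NUM_PARTITIONS = 4          # illustrative partition count

# Create the pageviews topic explicitly with multiple partitions instead of
# relying on auto-creation with a single partition.
admin = KafkaAdminClient(bootstrap_servers=BROKER)
admin.create_topics([NewTopic(name="pageviews", num_partitions=NUM_PARTITIONS, replication_factor=1)])

# One producer per worker process; acks disabled and batch size increased to
# maximize throughput and minimize time blocked in send calls.
producer = KafkaProducer(
    bootstrap_servers=BROKER,
    acks=0,                 # do not wait for broker acknowledgment (not for production)
    batch_size=256 * 1024,  # larger batches than the 16 KiB default
)

producer.send("pageviews", value=b'{"received_at": 0}', partition=0)
```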
Configuring Docker Compose for High Message Rate
We want to ensure a high message rate reaches the query engines. As much as possible, we want to prevent intermediate systems from introducing queuing, delays, or any smoothing-out effect in front of the engines. We change the `docker compose` configuration accordingly:
- To avoid actual disk I/O, MySQL and Redpanda are configured to use a tmpfs (RAM) volume for their backing stores.
- We use a custom `redpanda.yaml` configuration for Redpanda to turn off `developer_mode`.
- In our search to reduce CPU utilization in Python while generating high message rates, we switched from the default CPython interpreter to PyPy. This considerably reduced CPU utilization for the Python processes.
Test Platform and Test Plan
Test Platform
The following hardware and software setup was used:
- Intel Core i9-10900K (10 cores, 20 threads, 20 MiB cache, 3.7 GHz base / 5.3 GHz max freq)
- 128 GiB RAM
- Ubuntu Linux 20.04
- `mbw -t0 8192` reports memory bandwidth at an average of 9611.7 MiB/s
Test Plan
The test plan below repeats, in greater detail, the instructions summarized in the README.md file for the `debezium/perf` directory.
1. Ensure everything starts afresh and there is no “memory” of previous runs stored in container filesystems or volumes. In the `debezium/perf` directory, run:

   ```
   docker compose stop
   docker compose down -v
   ```

2. Start `docker compose` and save log output to a file. In the `debezium/perf` directory, run:

   ```
   docker compose up -d
   nohup docker compose logs -f >& /tmp/perf.log < /dev/null &
   ```

3. Wait until Debezium has finished initialization (log line stating “Keepalive thread is running” shown) and loadgen has initialized the database (log line stating “Initializing shop database DONE”).
4. Open a web browser tab and connect to the Deephaven web console at `localhost:10000/ide`.

5. In the console, run the Deephaven Python script that creates the demo tables via:

   ```python
   exec(open('/scripts/demo.py').read())
   ```

   This will create several Python top-level variables, one for each table. Each table becomes a tab in the UI showing the current table data. Right after the statement that created them, you should see one icon (displayed prominently in blue) for each variable. Clicking an icon jumps you to the respective table display.
6. Switch to the tab for the `pageviews_summary` table to display it. You should see a single row with columns `total` (accumulated number of pageview events processed), `max_received_at` (most recent timestamp across all pageview events), and `dt_ms` (the difference in milliseconds between `max_received_at` and the current time). As the default Deephaven configuration updates tables once per second, an OK value of `dt_ms` can be anything between zero and 1,000 milliseconds. This configuration is often meaningfully lowered for other use cases, but we are sticking with defaults here.

7. Start Materialize’s command line interface. In the `debezium/perf` directory, run:

   ```
   docker compose run mzcli
   ```
8. Inside the CLI, load the SQL code for the Materialize demo:

   ```
   \i /scripts/demo.sql
   ```

   You should see several lines of output stating `CREATE SOURCE` and then `CREATE VIEW`.
9. Exit the CLI with `Ctrl-D` and run the following `watch` command in the shell:

   ```
   watch -n1 "psql -c '
   SELECT
     total,
     to_timestamp(max_received_at) max_received_ts,
     mz_logical_timestamp() - 1000 * max_received_at AS dt_ms
   FROM pageviews_summary;' -U materialize -h localhost -p 6875"
   ```

   You should see the results of the `SELECT` query above executed once per second. Note this requires the host to have the `psql` command installed, which is part of the PostgreSQL client tools package in most Linux distributions (e.g., in Ubuntu 20.04 the package is `postgresql-client-12`).
10. To allow us to track CPU and RAM utilization, run the `top` command in a separate shell (or terminal window). Near the top of the processes sorted by CPU utilization, you should see `redpanda`, `materialize`, `java` (this is the Deephaven engine), and `pypy`. Note `%CPU` and `RES` (RAM resident size, in KiB if no unit is indicated) for each as we start. The starting rates are 3 purchases per second and 50 pageviews per second, which should create some noticeable, albeit low, CPU load, enough for our processes of interest to already show up in top’s list.

11. Connect to the socket command line client for the load generation script:

    ```
    nc localhost 8090
    ```
    We used netcat above, part of the `netcat-openbsd` Ubuntu package, but you can use other tools like telnet if you prefer.

12. Set the desired rate of pageviews while you watch `top` and the table for `dt_ms` in both Materialize and Deephaven (10,000 is an example below; modify as required):

    ```
    set pageviews_per_second 10000
    ```

    The docker compose log should show loadgen output acknowledging the change, with a message reading “Changing pageview rate from 50 to 10000 per second”. After a few seconds (rates are logged every 10 seconds), the docker compose log should show `loadgen` generating an effective rate close to the one requested, with a message similar to “Simulated 99007 pageview actions in the last 10.0 seconds, effective rate 9896.08/s”. Note it may take two cycles of 10-second rate logging to show the new rate as steady, since the change likely happens in the middle of a particular cycle and affects its count of messages only from that point forward.

13. Wait a few seconds for the new rate to settle and use `top` to sample CPU (`%CPU` column) and memory (`RES` column) utilization.
Test automation
Manually executing the test plan is prone to several sources of variation.
- With the default period of one sample per second, top readings vary considerably from one sample to the next: instantaneous CPU utilization is not a good approximation of CPU utilization over a period of time. Ideally, one would take several instantaneous samples over a period and use them to approximate a value for that period.
- As time passes, the engines require more memory to store derived data for growing tables: to make an experiment run repeatable, ideally samples should be taken a given constant time after a new rate is set, which in turn should happen quickly after compose startup.
To avoid these pitfalls of manual execution, we automated the experiment with a script.
The `run_experiment.sh` script in the `debezium/perf` directory automates:
- startup of the containers required for a particular run (and only those)
- saving container logs
- loading the demo code in the respective engine and sampling of engine delays to a log file
- setup of a new given pageviews per second rate, and a fixed wait time thereafter for processing to settle
- sampling of CPU and memory utilization via top to a log file
- stop and reset of the containers
Example
```
cd debezium/perf
./run_experiment.sh dh 5000 20 10 1.0
```
This will run an experiment for Deephaven (tag `dh`; use tag `mz` for Materialize) with a target rate of 5,000 pageviews per second. It will wait 20 seconds after setting the target rate, then begin sampling CPU and memory utilization using `top` in batch mode. 10 samples will be taken, with a delay of 1.0 seconds between samples.
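For reference, the kind of sampling the script drives can be reproduced with `top` in batch mode plus a little post-processing. The sketch below is an illustration of that idea, not the script itself; the process-name matching and field positions assume the default `top` output format.

```python
import subprocess

def parse_kib(value: str) -> float:
    """RES is reported in KiB unless suffixed (m = MiB, g = GiB, t = TiB)."""
    suffixes = {"m": 1024.0, "g": 1024.0**2, "t": 1024.0**3}
    if value[-1].lower() in suffixes:
        return float(value[:-1].replace(",", ".")) * suffixes[value[-1].lower()]
    return float(value)

def sample_top(process_name: str, samples: int = 10, delay: float = 1.0):
    """Run top in batch mode and return (mean %CPU, max RES in KiB) for a process."""
    out = subprocess.run(
        ["top", "-b", "-n", str(samples), "-d", str(delay)],
        capture_output=True, text=True, check=True,
    ).stdout
    cpu_values, res_values = [], []
    for line in out.splitlines():
        fields = line.split()
        # Default top process lines: PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
        if len(fields) >= 12 and fields[-1] == process_name:
            res_values.append(parse_kib(fields[5]))
            cpu_values.append(float(fields[8].replace(",", ".")))
    mean_cpu = sum(cpu_values) / len(cpu_values) if cpu_values else 0.0
    max_res = max(res_values) if res_values else 0.0
    return mean_cpu, max_res

# Example: mean CPU and max RES for the Deephaven JVM over 10 one-second samples.
# print(sample_top("java"))
```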
Performance Comparison
Summary of automated results
The table below shows the mean CPU utilization and the max memory utilization over a period of 10 seconds, starting 20 seconds after a new pageviews-per-second rate was set. Every run is started afresh, and only one of the two engines is running.
| pageviews/second | Materialize CPU | Deephaven CPU | Materialize Mem | Deephaven Mem |
|---|---|---|---|---|
| 50 | 30.6% | 8.3% | 0.16 GiB | 1.30 GiB |
| 5,000 | 179.6% | 33.1% | 0.57 GiB | 1.70 GiB |
| 35,000 | (1) 848.5% | 82.2% | 2.80 GiB | 2.60 GiB |
| 100,000 | (2) 848.4% | 96.2% | 7.00 GiB | 4.60 GiB |
| 145,000 | | 149.5% | | 5.20 GiB |
| 160,000 | | 174.6% | | 5.90 GiB |
| 200,000 | | (3) 240.0% | | 6.70 GiB |
1. We reconfigured Materialize to run with 8 workers.
2. Materialize is falling behind at this point, as evidenced by `logs/2022.03.22.05.04.28_UTC_mz_100000/mz_sample_dt.log`.
3. The Deephaven update cycle is taking longer than 1 second, as logged in `logs/2022.03.22.05.09.52_UTC_dh_200000/docker-compose.log`.
You can download the logs produced for these runs here.
Details for some manual runs
Note the results below were obtained manually, which is prone to the issues mentioned in the previous section. Still, we believe it is useful to show some `top` screen captures and videos recorded for a couple of cases.
On startup with the initial 3 purchases per second and 50 pageviews per second:
- Materialize consumes 229 MiB RAM and 46% CPU.
- Deephaven consumes 1.8 GiB RAM and 3% CPU.
Increasing pageviews per second to 5,000:
- Materialize consumes 899 MiB RAM and 168% CPU.
- Deephaven consumes 2.6 GiB RAM and 47% CPU.
The output from top oscillates quite a bit at this point. A reading taken a minute later is a bit more stable (but there are still oscillations):
- Materialize consumes 1.2 GiB RAM and 157% CPU.
- Deephaven consumes 3.0 GiB RAM and 18% CPU.
Note that as time passes, both engines need more memory, given the accumulation of data on tables that grow as new events arrive.
Increasing pageviews per second to 35,000:
- Materialize consumes 4.8 GiB RAM and 244% CPU.
- Deephaven consumes 5.1 GiB RAM and 92% CPU.
We note the `watch` command for the `psql SELECT` query with `dt_ms` for Materialize is refreshing in jumps of several seconds. We suspect Materialize is running short of CPU given the default configuration of `-w2`, i.e., 2 workers. Deephaven, meanwhile, is still refreshing consistently every second. You can get more details about the Deephaven Update Graph Processor refresh cycle in the docker compose log for the Deephaven server, in log lines reading `Update Graph Processor cycleTime=...`.

At this point, we stop the test and reconfigure Materialize with 8 workers by modifying the `.env` file in the `debezium/perf` directory and setting `MATERIALIZE_WORKERS=8`. We then restart from the beginning of the test plan and set pageviews per second to 35,000. With 35,000 pageviews per second and 8 workers, Materialize consumes 4.1 GiB RAM and 829% CPU. Updates to the `watch psql` command are refreshed once per second, so with 8 workers Materialize is keeping up.
Restarting with 100,000 pageviews per second:
- Materialize consumes 9.9 GiB RAM and 801% CPU.
- Deephaven consumes 5.8 GiB RAM and 200% CPU.
We are still running Materialize with 8 workers here, which probably explains why CPU utilization stays around 800%.
Talk to us
We encourage you to explore our GitHub code and to draw your own conclusions. We look forward to feedback, insight, and curiosity from the community.
Notes
With respect to memory utilization: at high message rates (say, 200k msg/sec), it does not take more than a few minutes under the default Deephaven configuration for the Deephaven engine to run out of memory. As our engine is Java based, we need to define a maximum heap size on startup; our defaults set that to 4 GiB in our core repo, and to 12 GiB in this demo. To facilitate longer testing this is easily modified: in the `.env` file in the `debezium/perf` directory, set `DEEPHAVEN_HEAP` to the desired value; this translates to the value used for the `java` command's `-Xmx` option. This does not impact initial memory allocation, but provides a hard limit on total memory allocation. Although a limitation here for our current purposes, in production deployments with shared resources this is helpful. You can monitor actual memory utilization through `top` or inside the Deephaven web console using the `processMemory` table from the `PerformanceQueries` package, as follows:

```python
import deephaven.PerformanceQueries as pq
procmem = pq.processMemory()
```

This will create a table that updates every 15 seconds with memory statistics for the Deephaven engine process, including the percentage of time spent in garbage collection.