Using ClickHouse at Flare to aggregate data
The word is out: we're working on performance monitoring, Flare's most significant feature yet. One of the challenges we've tackled in recent months was storing all the performance data generated by our customers' applications.
An application has multiple entry points: think of requests, jobs, or commands. Many things happen within those entry points, like executing queries, rendering views, and making external HTTP calls. We call each unit where something happens (including the entry points themselves) a span.
Spans have a start and end time (and thus a duration). They can be children of other spans, like a query happening during a request, and they can be running on different servers at different times, for example, when dispatching and running a job on separate servers.
Ultimately, we want to aggregate these spans into digestible pieces to show, for example, which routes are slow and which queries might be the reason.
To do that, we need to process massive amounts of data quickly. While more traditional databases like MySQL and PostgreSQL are great general-purpose stores, they aren't that useful for storing and querying time series data at this scale. That's where ClickHouse comes in: a column-store database that lets us ingest and query lots of data blazingly fast.
One of ClickHouse's significant advantages is its unique table engines, like the AggregatingMergeTree, which allows us to efficiently store all sorts of statistics about the duration of spans, aggregated per minute. Let's take a look at an example.
Say we have the following spans:
| Span | Duration | Time |
|---|---|---|
| A | 500ms | 12:55:13 |
| B | 200ms | 12:55:22 |
| C | 150ms | 12:56:10 |
| D | 200ms | 13:10:05 |
Our aggregated spans table per minute now would look like this:
| Time | Spans |
|---|---|
| 12:55 | A, B |
| 12:56 | C |
| 13:10 | D |
Querying all the spans between 12:00 and 13:00 now only requires reading 2 rows instead of 3. That might not seem like a massive improvement, but if you're ingesting 100+ spans per minute, that's much less data to read!
While it's technically possible to implement this functionality with traditional databases, it requires a lot of code, caching, and time. The cool thing is that ClickHouse provides all of this out of the box!
Let's calculate the average duration of the spans in the table. In a more traditional database, we would add a column to the table containing the average duration:
| Time | Spans | Average |
|---|---|---|
| 12:55 | A, B | 350ms |
| 12:56 | C | 150ms |
| 13:10 | D | 200ms |
If we want to query the average duration at 12:55, we get 350ms, which is correct. But what if we want to query the average duration for all spans between 12:00 and 13:00?
A naive approach would be summing the averages and dividing that number by two. But that's not correct:

- Naive average approach: (350 + 150) / 2 = 250
- Correct average approach: (500 + 200 + 150) / 3 ≈ 283

The naive approach ignores that the 12:55 bucket contains two spans while 12:56 contains only one. To combine stored averages correctly, we would also need to store a count per bucket and compute a weighted average: (350 × 2 + 150 × 1) / 3 ≈ 283.
So, to correctly calculate the average, we need the input of the average function instead of the outcome we stored in the database. ClickHouse solves this problem with AggregateFunctions: they let you call mathematical functions on data, but instead of storing the final result, they store an intermediate state of the calculation as a binary blob. When you need the result, you call the function again, this time on that binary state, and get a number as output.
If we were to represent this in our demo table, it would look like this:
| Time | Spans | AggregateFunction(avg) |
|---|---|---|
| 12:55 | A, B | 500ms, 200ms |
| 12:56 | C | 150ms |
| 13:10 | D | 200ms |
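This store-the-state, finalize-later round trip can be sketched in a single self-contained query, using ClickHouse's built-in `numbers()` table function as throwaway input data (this query is just an illustration and isn't part of our actual schema):

```sql
-- The inner query stores the intermediate state of avg() instead of its
-- result; the outer query finalizes that state into a number.
SELECT avgMerge(state) AS result  -- averages 0..9
FROM
(
    SELECT avgState(number) AS state
    FROM numbers(10)
);
```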
SQL side of things
To implement such an operation, we first need a table with the spans we've received:
```sql
CREATE TABLE spans
(
    span_id String,
    duration UInt64,
    time DateTime
)
ENGINE = MergeTree()
ORDER BY time;
```
This schema differs from the schemas we're used to from MySQL and Postgres. Let's quickly go through it!
- We create a table `spans` with three columns: `span_id`, `duration`, and `time`
- Each ClickHouse table requires an engine type. We're using a MergeTree engine, which behaves kinda like a more traditional SQL table
- The `ORDER BY time` clause defines how ClickHouse will store and index data
Let's create our `aggregated_spans` schema where we'll store the aggregate grouped by time:
```sql
CREATE TABLE aggregated_spans
(
    time DateTime,
    spans AggregateFunction(groupArray, String),
    average AggregateFunction(avg, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY toStartOfMinute(time);
```
In this schema:

- We create a table `aggregated_spans` with three columns:
  - `time` is a DateTime column representing the date
  - `spans` is an array of strings representing the span IDs, but instead of storing the strings themselves, we store the intermediate state of a group function so that, in the end, we can get an array of all the span IDs
  - `average` stores the intermediate state of the average function, as we discussed earlier
- Since we're aggregating data, we're using the AggregatingMergeTree table engine
- The `ORDER BY toStartOfMinute(time)` clause again tells ClickHouse how to store and index data; it is also the expression by which the data is aggregated. Since we want to do this on a minute basis, we strip the seconds from every datetime.
We're using a materialized view to get data from the `spans` table into the `aggregated_spans` table. It can be compared to a trigger that listens to inserts on the `spans` table: when an insert happens, the materialized view's query is executed and its result rows are inserted into the `aggregated_spans` table. We create it as such:
```sql
CREATE MATERIALIZED VIEW aggregated_spans_mv TO aggregated_spans AS
SELECT
    toStartOfMinute(time) AS time,
    groupArrayState(span_id) AS spans,
    avgState(duration) AS average
FROM spans
GROUP BY time;
```

Note that the selected columns are aliased to match the column names of the target `aggregated_spans` table; since the view writes `TO` that table, the names must line up.
Did you notice the `groupArrayState` and `avgState` functions? Since we don't want to store the output of the `groupArray` or `avg` functions but their input, we suffix these functions with `State`.
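One caveat worth knowing: a materialized view only processes rows inserted after it was created. If the `spans` table already contained data, a one-time backfill would be needed; a sketch, assuming the schema above, is simply running the view's query manually:

```sql
-- One-time backfill: aggregate the rows that existed before the
-- materialized view was created and insert them into the target table.
INSERT INTO aggregated_spans
SELECT
    toStartOfMinute(time) AS time,
    groupArrayState(span_id) AS spans,
    avgState(duration) AS average
FROM spans
GROUP BY time;
```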
Inserting spans can be done just like you would expect:
```sql
INSERT INTO spans VALUES
('A', 500, '2024-03-14 12:55:13'),
('B', 200, '2024-03-14 12:55:22'),
('C', 150, '2024-03-14 12:56:10'),
('D', 200, '2024-03-14 13:10:05');
```
The materialized view will automatically pick up this insert and write the aggregated rows into `aggregated_spans`. If we now query `aggregated_spans` like this:
```sql
SELECT * FROM aggregated_spans;
```
Then we'll get the following rows:
| Time | Spans | Average |
|---|---|---|
| 12:55 | ������� | ������� |
| 12:56 | ������� | ������� |
| 13:10 | ������� | ������� |
As you can see, the first row aggregates two spans, so we end up with three rows instead of the four spans in our initial data. The span IDs and the average are not yet readable, though, since they're stored as binary blobs. Let's fix that by adjusting our query:
```sql
SELECT
    time,
    groupArrayMerge(spans) AS spans,
    avgMerge(average) AS avg_duration
FROM aggregated_spans
GROUP BY time
ORDER BY time;
```
Let's quickly go through this query:

- We select the time, and group the rows by it to get the data on a minute basis
- We run the `groupArray` and `avg` functions again, but this time suffixed with `Merge`, meaning they take the stored binary states as input instead of raw values
- In the end, everything is sorted by time
The output now looks like this:
| Time | Spans | Average |
|---|---|---|
| 12:55 | A, B | 350 |
| 12:56 | C | 150 |
| 13:10 | D | 200 |
Looking great! As a last test: do we still get the correct average if we combine rows further, for example, into an average per hour? A query for such data would look like this:
```sql
SELECT
    toStartOfHour(time) AS time,
    groupArrayMerge(spans) AS spans,
    avgMerge(average) AS avg_duration
FROM aggregated_spans
GROUP BY time
ORDER BY time;
```
Notice that we now select `toStartOfHour(time)` instead of `time`. The results now look like this:
| Time | Spans | Average |
|---|---|---|
| 12:00 | A, B, C | 283 |
| 13:00 | D | 200 |
Since we're calculating the functions on the spot based on the query, our average is now correct.
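The same `State`/`Merge` pattern works for any ClickHouse aggregate function, not just `avg` and `groupArray`. As a hypothetical extension of the schema above (not part of our actual setup), we could track a 95th-percentile duration per minute the same way:

```sql
-- Hypothetical extension: store the intermediate state of quantile(0.95).
-- Parameterized functions keep their parameter in the column type.
ALTER TABLE aggregated_spans
    ADD COLUMN p95 AggregateFunction(quantile(0.95), UInt64);

-- Finalize the state at query time, per minute:
SELECT
    time,
    quantileMerge(0.95)(p95) AS p95_duration
FROM aggregated_spans
GROUP BY time
ORDER BY time;
```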
Conclusion
ClickHouse lets us crunch massive amounts of data without breaking a sweat. Next time, we'll examine how we're running queries against ClickHouse in Laravel and how to run these queries concurrently in batches.
See ya!