Skip to main content
  1. Posts/

Hidden Complexities of Distributed SQL

·1593 words·8 mins
Asaf
backend distributed search core
Author
Asaf
Team Lead
Table of Contents

Introduction
#

Query planners are a cool piece of software that exists in every database or SQL engine out there. You give it a query like:

SELECT user_id, COUNT(*)
FROM events
GROUP BY user_id;

And the planner’s job is to figure out the most efficient way to run this query. That involves:

  • Understanding the structure and statistics of the underlying data
  • Applying transformation rules to optimize the query plan
  • Choosing between different join algorithms, aggregation strategies, and execution orders
  • Splitting work across nodes while minimizing data shuffling

It all starts with a logical plan (what the query means), which is then turned into a physical plan (how to actually execute it).

What is a distributed SQL engine, and why do I need it?
#

Querying data across multiple storage systems can be useful. Today, organizations often have data scattered across various sources. Multiple databases, data lakes, and other platforms that analysts want to query for insights.

To solve this problem, we can use a distributed SQL engine. It connects to the underlying storage systems - whether databases like ClickHouse or PostgreSQL, data lakes such as Parquet files on S3 or Snowflake, or even analytics engines like Splunk or Elasticsearch.

Let’s start by taking the simplest example, we have a single PostgreSQL with one table that holds our security logs:

 | timestamp  | username | email        |
 | ---------- | -------- | ------------ |
 | 1743522546 | John Doe | [email protected] |
 | 1743522650 | John Doe | [email protected] |
 | 1743522667 | John Doe | [email protected] |
 | 1743522689 | John Doe | [email protected] |
 | 1743522700 | John Doe | [email protected] |

And let’s say we are using some JDBC client to query our data. We want to retrieve all the logs from PostgreSQL where the username = 'John Doe':

SELECT *
FROM security_logs
WHERE username = 'John Doe'
ORDER BY timestamp
LIMIT 100;

This JDBC client can be “dumb” here - simply taking our query, sending it to PostgreSQL, parsing the results, and displaying them to the user.

Now suppose we have another database - ClickHouse, for example, and our JDBC client supports connecting to both PostgreSQL and ClickHouse servers, allowing us to query both in the same query. Let’s add ClickHouse to the query. It contains the same security logs, so we can use UNION ALL to query both sources. This is where things get trickier.

SELECT *
FROM
(
  SELECT * FROM pg.security_logs
  UNION ALL
  SELECT * FROM ch.security_logs
)
WHERE username = 'John Doe'
ORDER BY timestamp
LIMIT 100;

If we take the naive approach of simply querying the two databases with this query and combining the results in the client, we’ll end up with 200 results that are not ordered.

So, if we think about it, we need the client to “understand” the query, not just act as a “dumb” client. The client needs to parse the SQL query, interpret it, and generate a plan. These are the challenges that SQL query engines and query planners are designed to solve.

So let’s take the example above:

SELECT *
FROM
(
  SELECT * FROM pg.security_logs
  UNION ALL
  SELECT * FROM ch.security_logs
)
WHERE username = 'John Doe'
ORDER BY timestamp
LIMIT 100;

A query planner will parse this SQL to AST, apply optimizations to this AST, and convert this AST to a plan:

- Output
  - Limit (100)
    - Sort (timestamp DESC)
      - Filter (username = 'John Doe')
        - Load Data
          - ch.security_logs
          - pg.security_logs

What are we seeing here? This is a classic plan for the query runner. Lets look at it from the bottom. First, we need to load data from both tables - pg.security_logs and ch.security_logs. Then, we apply the filter username = 'John Doe', sort the results by timestamp, limit them to 100, and finally, output the results.

This is the most naive plan we can think of, and it has many problems. The biggest one? - its slow. Its slow because we have to download all the data from pg.security_logs and ch.security_logs into the memory of our query engine. If those tables are large, this will take a lot of time, and it might not even be feasible due to memory and disk constrains.

Optimizations
#

But let’s assume, somehow, we could magically download all the data to our machine quickly. Now we need to filter on username = 'John Doe'. But our query runner doesn’t have an efficient way to do this, so we’re stuck doing it in O(n). Looping over every row and checking if it matches our filter. But wait, why do we even need to download all the data? PostgreSQL and ClickHose already support efficient filtering on columns. We can ask PostgreSQL and ClickHouse to give us the data already filtered. So, we’ll change the plan to this:

- Output
  - Limit (100)
    - Sort (timestamp DESC)
      - Load Data
        - pg.security_logs (WHERE username = 'John Doe')
        - ch.security_logs (WHERE username = 'John Doe')

Nice! We’re now pulling only the rows we need, we’ve minimized the I/O time, and we also we don’t need to filter in memory, since PostgreSQL and ClickHose handles it efficiently for us.

What we did here is called predicate pushdown. We “pushed down” the Filter to the data source. But we still have some issues. If we only want the top 100 logs based on the timestamp, we should pull only those from each table. That means pushing down the Sort + Limit to the data sources.

Sort + Limit is commonly referred to as TopN, because we’re only interested in the top n rows from the table based on a specific ordering. However, this time we do need to keep the Sort + Limit step in memory as well, because if we push it down to the individual data sources, we’ll get 100 results from each one, but they won’t be globally ordered. So after each data source gives us 100 rows, we need to sort them again and apply the final limit to 100. Now our plan looks like this:

- Output
  - Limit (100)
    - Sort (timestamp DESC)
      - Load Data
        - pg.security_logs (WHERE username = 'John Doe' ORDER BY timestamp DESC LIMIT 100)
        - ch.security_logs (WHERE username = 'John Doe' ORDER BY timestamp DESC LIMIT 100)

And now, we have the most efficient plan to run this query, we’re pulling only 100 rows from each data source, and the ordering and limiting step is fast.

predicate pushdown plan nodes from the original plan can significantly improve query performance. In the query planner, we have an optimizer that applies these kinds of optimizations during the planning phase of the query engine.

Usually, this is done in an iterative fashion. the planner tries to push down filters or modify the query plan repeatedly until no further improvements can be made.

Now, let’s look at another similar example, this time with a GROUP BY (aggregation) query. In a traditional single - source query engine, this is straightforward: read the rows, group them in memory or with temporary disk spill, and aggregate. But in a multi-sources system, we need a different approach.

Let’s say we now want to query from the two tables how many events happened for each user. Imagine this query:

SELECT username, count(*)
FROM (
  SELECT * FROM pg.security_logs
  UNION ALL
  SELECT * FROM ch.security_logs
)
GROUP BY username;

We will get following the following plan:

- Output
  - Project (username, count(*))
    - Aggregation (group by = username, function = count())
        - Load Data
          - pg.security_logs
          - ch.security_logs

Once again, we face the same problem - loading all the data from the tables and aggregating it in memory. So, let’s push down the aggregation instead:

- Output
  - Project (username, count(*))
    - Load Data
      - pg.security_logs (group by = username, function = count())
      - ch.security_logs (group by = username, function = count())

But we still need to merge the results from the two aggregations. However, this merge step shouldn’t use the count() aggregation function again - it needs to use sum() because we want to combine the counts from each group.

So, the final plan will be:

- Output
  - Project (username, count(*))
    - Aggregation (group by = username, function = sum())
      - Load Data
        - pg.security_logs (group by = username, function = count())
        - ch.security_logs (group by = username, function = count())

The dcount dilemma
#

Counting distinct elements is one of the trickiest operations in distributed SQL query engines.

Say you want to run:

SELECT COUNT(DISTINCT username) FROM (
  SELECT * FROM pg.security_logs
  UNION ALL
  SELECT * FROM ch.security_logs
);

Unlike a simple COUNT, you can’t just sum up the distinct counts from each plan node:

pg.security logs has {user1, user2, user3} → count = 3

ch.security logs has {user2, user3, user4} → count = 3

Naively summing gives 6, but the correct answer is 4.

Try to think of a solution to this problem :)
How would you solve it?

Wrapping Up
#

So now we understand how we can efficiently query multiple data sources from a single place.

There are several cool opensource projects that implement such query planners:

  • Trino (or Presto)
  • Apache DataFusion
  • Apache Calcite (Used by Apache Flink)

Of course, many challenges still remain. How we can perform JOIN efficiently across multiple data sources? How can we work with data that is unstructured? Like logs that are in format of JSON format - or sometimes just random strings like raw syslogs:

Tue Mar 04 15:57:06 2020: <14>Mar  4 15:53:04 BAR-NG-VF500 BAR-NG-VF500/box_Firewall_Activity:  Info     BAR-NG-VF500 Remove: type=FWD|proto=UDP|srcIF=eth1|srcIP=192.168.70.7|srcPort=38686|srcMAC=08:00:27:da:d7:9c|dstIP=8.8.8.8|dstPort=53