Multithreaded Execution Model for Queries With ORDER BY and LIMIT Clauses
This short post continues the previous one, dedicated to multithreaded query execution in databases with a column-oriented storage format. This time we'll consider queries like the following:
SELECT SearchPhrase
FROM hits
WHERE SearchPhrase IS NOT NULL
ORDER BY SearchPhrase
LIMIT 10;
It returns the top 10 rows with non-NULL SearchPhrase column values, sorted in ascending order. The query is taken from the ClickBench benchmark.
For large datasets, such queries can be executed efficiently on multiple threads. This requires several stages:

1. The "query owner" thread divides the dataset into small enough frames. The maximum frame size varies in the range of 10K-1M rows in popular analytical databases: say, DuckDB uses 10K-row frames, while in QuestDB they're up to 1M rows in size. The thread then publishes tasks containing the frames to a single-producer multiple-consumer (SPMC) queue. The tasks should also contain the data required to execute the filter (WHERE SearchPhrase IS NOT NULL in our case).

2. Worker threads pick up the tasks from the queue and process them. First, they apply the filter. This step is optional, as there may be no filter in the query. Then they try adding the filtered rows to a sorted data structure, such as a min-/max-heap or a red-black tree. The data structure may hold row IDs, if the storage format supports random access, or materialized column values returned by the query, if random access isn't possible due to data compression. After a task is executed, the data structure contains up to the top 10 rows seen so far. Each worker thread should have its own instance of the data structure and use it for all tasks belonging to the given query. (Sketches of such a bounded top-K structure and of the worker loop that feeds it follow this list.)

3. Once all tasks are processed, the "query owner" thread needs to merge the workers' "top 10 rows" data structures into a single one, so that it can return the result set to the client. This can be done eagerly, e.g. the "query owner" thread may iterate through all worker data structures and push their rows into its own, or lazily via the k-way merge algorithm. (See the merge sketch below.)
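To make the worker-local data structure from the 2nd stage concrete, here's a minimal sketch in Java of a bounded top-K accumulator built on java.util.PriorityQueue. The TopK class name and its API are illustrative assumptions, not any particular engine's code. For ORDER BY ... ASC LIMIT K, a max-heap of size K keeps the K smallest values seen so far, so the worst retained value sits at the root and is cheap to evict:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    // Per-worker bounded top-K accumulator for ORDER BY ... ASC LIMIT K.
    // Keeps the K smallest values seen so far in a max-heap.
    final class TopK<T extends Comparable<T>> {
        private final int k;
        private final PriorityQueue<T> heap; // max-heap: largest retained value at the root

        TopK(int k) {
            this.k = k;
            this.heap = new PriorityQueue<>(k, Comparator.reverseOrder());
        }

        void offer(T value) {
            if (heap.size() < k) {
                heap.add(value);
            } else if (value.compareTo(heap.peek()) < 0) {
                heap.poll();     // evict the current largest value
                heap.add(value); // keep the smaller one instead
            }
        }

        List<T> toSortedList() {
            List<T> result = new ArrayList<>(heap);
            Collections.sort(result); // ascending order, as in the query
            return result;
        }
    }

Once the heap is full, most incoming values on a large dataset compare greater than the root and are rejected with a single O(1) comparison; only the occasional eviction costs O(log K).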
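And here's a simplified sketch of the worker loop itself, again with hypothetical FrameTask and Worker types. A real engine would use a lock-free SPMC queue and carry column addresses plus a compiled filter in the task; a plain java.util.concurrent queue over string slices serves as a stand-in here:

    import java.util.concurrent.BlockingQueue;

    // A frame task: a slice of the SearchPhrase column. Real engines would
    // carry column addresses and filter data instead of materialized values.
    record FrameTask(String[] searchPhrase) {}

    // Simplified worker loop for the 2nd stage.
    final class Worker implements Runnable {
        private final BlockingQueue<FrameTask> queue;
        private final TopK<String> localTop = new TopK<>(10); // per-worker instance

        Worker(BlockingQueue<FrameTask> queue) {
            this.queue = queue;
        }

        TopK<String> localTop() {
            return localTop;
        }

        @Override
        public void run() {
            FrameTask task;
            // Drain tasks until the queue is empty; a real engine would use
            // a completion counter or a poison pill to detect end of work.
            while ((task = queue.poll()) != null) {
                for (String value : task.searchPhrase()) {
                    if (value != null) {       // WHERE SearchPhrase IS NOT NULL
                        localTop.offer(value); // keep at most the top 10 values
                    }
                }
            }
        }
    }

Each worker owns its localTop instance, so there's no synchronization on the hot path; the only shared point is the task queue.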
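Finally, the eager variant of the 3rd stage merge could look like the following sketch, reusing the hypothetical types from above. Since each worker retained at most 10 rows, the owner touches at most numWorkers * 10 values, which is negligible compared to scanning the dataset:

    import java.util.List;

    final class Merger {
        // Eager merge performed by the "query owner" thread: fold every
        // worker-local top-K structure into a single one.
        static List<String> mergeEagerly(List<Worker> workers, int k) {
            TopK<String> merged = new TopK<>(k);
            for (Worker worker : workers) {
                for (String value : worker.localTop().toSortedList()) {
                    merged.offer(value);
                }
            }
            return merged.toSortedList();
        }
    }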
The 1st and 3rd stages above are serial, but they involve very little work, while the bulk of the work is done in the 2nd stage. Thanks to this, the execution model scales nicely with the number of worker threads and CPU cores.
I'm interested in learning more about analytical query execution models (and beyond), so if you have anything to share, don't hesitate to leave a comment. Have fun coding and see you next time.