This is part 2 of a two-part article on the evolution of data orchestration. In part 1, we discussed the past and present of data orchestration tools and summarised the limitations of current data orchestration patterns. I recommend reading “The evolution of data orchestration: Part 1 - the past and present” before reading this article.
Gen 3 data orchestration (the future): the distribution centre
What is it?
Each node has a defined upstream condition that must evaluate to True before the node can run.
Rather than relying on a time-based trigger (e.g. 00:00 daily) or sensor-based trigger (e.g. new file landed in S3), the trigger mechanism for Gen 3 uses a condition-based trigger.
A simple condition:
```yaml
# task_2_condition.yml
count: 60
period: minute
check_frequency: "*/5 * * * *" # cron: every 5 minutes
sources:
  - name: task_1
    loaded_at_field: table_1.loaded_at
```
In the example above, let’s assume that Table 1 has fresh data flowing into it every 15 minutes.
Let’s define some backend fields:

- current_cursor_value: calculated by taking max(loaded_at_field) of the upstream table (Table 1).
- last_loaded_at: refers to the last successful trigger of Task 2. loaded_at timestamps are persisted as a column in Table 2, and last_loaded_at is calculated by taking max(loaded_at).
The window of data to process is calculated by taking: current_cursor_value - last_loaded_at. Task 2 only runs if Table 1 has a 60-minute window of data to process.
If the value is greater than or equal to 60 minutes, then Task 2’s condition evaluates to True and Task 2 is triggered. The condition for Task 2 is evaluated every 5 minutes.
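The condition check above can be sketched in a few lines of Python. This is a minimal illustration, assuming the unprocessed window is the difference between the upstream cursor and the last processed timestamp; the function name and signature are my own, not part of any existing tool:

```python
from datetime import datetime, timedelta

def evaluate_condition(current_cursor_value: datetime,
                       last_loaded_at: datetime,
                       count: int = 60) -> bool:
    """Return True when the upstream table holds at least `count`
    minutes of unprocessed data."""
    window = current_cursor_value - last_loaded_at
    return window >= timedelta(minutes=count)

# Table 1's newest loaded_at is 10:00; Task 2 last processed data up to 09:00.
# 60 minutes of unprocessed data, so the condition evaluates to True.
ready = evaluate_condition(datetime(2023, 1, 1, 10, 0),
                           datetime(2023, 1, 1, 9, 0))
```

A scheduler would run this check every 5 minutes (per check_frequency) and trigger Task 2 whenever it returns True.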
Multi-conditions
Let’s picture a scenario where we are Task 3, and we have two upstream dependencies.
The condition:
```yaml
# task_3_condition.yml
count: 60
period: minute
check_frequency: "*/5 * * * *" # cron: every 5 minutes
sources:
  - name: task_1
    loaded_at_field: table_1.loaded_at
  - name: task_2
    loaded_at_field: table_2.loaded_at
```
Data is loaded into Table 1 at 15-minute intervals, and into Table 2 at 60-minute intervals.
Task 3 will run only if both Table 1 and Table 2 have a 60-minute window of unprocessed data.
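Extending the single-source check to multiple sources means the slowest upstream gates the run: every source must have a full window of unprocessed data. A minimal sketch, with hypothetical names of my own choosing:

```python
from datetime import datetime, timedelta

def all_sources_ready(cursor_values: dict,
                      last_loaded_at: datetime,
                      count: int = 60) -> bool:
    """The condition holds only if EVERY upstream source has at least
    `count` minutes of unprocessed data beyond last_loaded_at."""
    window = timedelta(minutes=count)
    return all(cursor - last_loaded_at >= window
               for cursor in cursor_values.values())

# Table 1 has 75 minutes of new data, but Table 2 only 45, so Task 3 waits.
last = datetime(2023, 1, 1, 9, 0)
cursors = {
    "table_1": datetime(2023, 1, 1, 10, 15),
    "table_2": datetime(2023, 1, 1, 9, 45),
}
ready = all_sources_ready(cursors, last)  # False: table_2 lags behind
```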
A good way of visualising this is through the classic game Tetris.
A distribution centre
This is similar to how a distribution centre operates. Packages arrive from multiple sources, but each package is not shipped to a customer immediately. Instead, the distribution centre waits for more packages to arrive before sending a batch at once to maximise delivery efficiency. In our context, each node is its own distribution centre, checking its upstream dependencies before triggering itself.
Why is this useful?
Prevents joins on stale data
Upstream tables can be updated at different intervals. If one of the upstream tables lacks fresh data when the current node runs, the join would produce semi-stale data.
Loose coupling and independent sync frequency
This design pattern results in loose coupling between the nodes. Each node will execute as long as its upstream conditions evaluate to True.
This means that each node has its own execution frequency, which allows each node to be executed independently from other nodes at varying intervals.
This is in stark contrast to the mono-dag where the entire DAG is triggered on one schedule (e.g. 00:00 daily).
Faster build and test times
This solves the problem of having a physical mono-dag which can have very long build and test times, as now each node can be built and tested based on the node’s immediate upstream dependencies. A logical mono-dag is still useful for checking that there are no cyclical dependencies between all nodes, and for visualising the lineage graph.
Uses critical path
This approach removes the need for a DAG of DAGs, an arrangement that prevents nodes from being executed along the critical path. With condition-based triggers, each node runs as soon as its upstream conditions are met, so execution naturally follows the critical path.
Limitations and mitigations
Random completion times
Unlike the mono-dag approach, each node does not have a set start time or estimated completion time; each node executes as and when its conditions are met. To help users know when data was last refreshed, a loaded_at timestamp would need to be included in each node’s table. Consumers can then surface the max(loaded_at) timestamp in their dashboards so business users know when the data was last loaded.
Infinite loops
How do we prevent infinite loops if an upstream node has stopped refreshing? To prevent infinite waiting loops, we need to check the data freshness of the current node and return an error if the staleness exceeds a certain threshold.
```yaml
# task_3_condition.yml
count: 60
period: minute
check_frequency: "*/5 * * * *" # cron: every 5 minutes
freshness:
  warn_after:
    count: 120
    period: minute
  error_after:
    count: 180
    period: minute
sources:
  - name: task_1
    loaded_at_field: table_1.loaded_at
  - name: task_2
    loaded_at_field: table_2.loaded_at
```
In this example, if the current node has not been refreshed for 120 minutes, a warning is thrown and the node owner is notified. After 180 minutes, an error is thrown, the node owner is notified, and the condition checks are paused until the error is fixed, to save on compute.
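The warn_after / error_after escalation can be sketched as a simple staleness check. This is an illustrative function of my own design, mirroring the thresholds in the config above:

```python
from datetime import datetime, timedelta

def check_freshness(last_refreshed_at: datetime,
                    now: datetime,
                    warn_after_minutes: int = 120,
                    error_after_minutes: int = 180) -> str:
    """Map the node's staleness onto the warn_after / error_after thresholds."""
    staleness = now - last_refreshed_at
    if staleness > timedelta(minutes=error_after_minutes):
        return "error"  # notify the owner and pause condition checks
    if staleness > timedelta(minutes=warn_after_minutes):
        return "warn"   # notify the owner, keep checking
    return "ok"
```

On "error", the scheduler would stop evaluating the node's condition until the owner intervenes, breaking the infinite waiting loop.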
Conclusion
This is a purely theoretical blog post based on patterns inspired from the software engineering world.
Variations of this pattern have been discussed elsewhere in one way or another too.
I’d be keen to hear your thoughts on this proposed approach.