Regarding data operation chaining, you may have read Yassine's article on the break task. It describes how you can adapt the task chaining of a table-to-table data operation depending on conditions described in a SQL assertion file. Here, you'll see how you can also check conditions to decide whether a whole data operation should be triggered or not.
Since Tailer SDK version 1.3.8, you can set the criticality of a task to "stop". This new flag lets you build complex data pipelines that are triggered only when certain conditions are met.
In this tips and tricks post, we'll show you how to write such tasks, along with some frequent use cases that can improve your configurations.
Conditional Task
Let's say you have several tables that are the sources of another table. All of the source tables can be updated at any time, and you may want to launch the destination table's update only when all of the data sources have been updated.
In previous versions of Tailer, you had to either launch your update job every time one of the source tables was updated, or schedule it at some arbitrary (and precise) time and hope that timing stayed synchronized with the source updates.
A "stop" SQL task can be written in a table-to-table data operation to check if all the sources are up to date. Whether the task fails or not, the run status will be success. But if the task fails, the run won't trigger any downstream operation.
This means you can build workflow configurations based on this "stop" data operation, and your data pipeline will be triggered only when your conditions are met.
Usage
Let's say you have 2 source tables, ecom_sales and stores_sales, and you want to merge them into an agg_sales destination table. The data operation may be time-consuming (or you may have more than 2 source tables), and you may want to execute the aggregation once a day, when the sources are up to date.
With previous versions of Tailer, you had to write several workflow configurations and launch the update every time a source was updated. You ended up with a pipeline like this:
update_ecom_sales >> update_agg_sales
update_stores_sales >> update_agg_sales
So update_agg_sales was launched twice a day, and of course even more often if you have more source tables.
Now, you can just launch a cheap condition check every time a source table is updated, and launch the intensive update only when needed:
update_ecom_sales >> assert_sales_completeness
update_stores_sales >> assert_sales_completeness
assert_sales_completeness >> update_agg_sales
How?
First, you need to write a conditional data operation:
{
  "configuration_type": "table-to-table",
  "configuration_id": "assert_sales_completeness",
  [...]
  "workflow": [
    {
      "id": "assert",
      "short_description": "check conditions",
      "sql_file": "assert_sales_completeness.sql",
      "criticality": "stop"
    }
  ]
}
With a SQL file that contains an assertion checking that you have sales for yesterday's date in both source tables:
ASSERT
  (select max(date) from ecom_sales) >= (current_date - 1)
  AND (select max(date) from stores_sales) >= (current_date - 1)
AS 'KO: at least one source table is not up to date'
Then, you create a workflow configuration that launches the agg_sales update, triggered by the assert_sales_completeness operation. You also need to update your previous workflows so that assert_sales_completeness is triggered after each source table update (a sketch of such a workflow follows the example below). The new workflow looks like this:
{
  "configuration_type": "workflow",
  "configuration_id": "trigger_update_agg_sales",
  "authorized_job_ids": [
    "gbq-to-gbq|assert_sales_completeness_PROD"
  ],
  "target_dag": {
    "configuration_type": "table-to-table",
    "configuration_id": "update_agg_sales"
  }
}
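As for the workflows triggered by the source updates, here is a minimal sketch, assuming a workflow can list several jobs in authorized_job_ids (otherwise, create one such workflow per source) and that your source update jobs follow the same "gbq-to-gbq|<configuration_id>_PROD" naming pattern as above; adjust the job ids to your actual configurations:
{
  "configuration_type": "workflow",
  "configuration_id": "trigger_assert_sales_completeness",
  "authorized_job_ids": [
    "gbq-to-gbq|update_ecom_sales_PROD",
    "gbq-to-gbq|update_stores_sales_PROD"
  ],
  "target_dag": {
    "configuration_type": "table-to-table",
    "configuration_id": "assert_sales_completeness"
  }
}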
And that's it. You have an assert_sales_completeness operation that checks your conditions. Its status is always success, and the agg_sales table is updated only when needed.
SQL of "stop" tasks
We've seen a basic example, but you can write much more interesting SQL assertions. Here are some patterns you may find useful.
-- Launch if sources are up to date OR if current_time > threshold
You can then schedule your assert_sales_completeness operation at 11:30 with a "schedule_interval": "30 11 * * *", and check whether the time threshold has passed (and whether the job has not already been launched; one way to add that guard is sketched after the assertion below):
ASSERT
  ( -- check if source tables are up to date
    (select max(date) from ecom_sales) >= (current_date - 1)
    AND (select max(date) from stores_sales) >= (current_date - 1)
  )
  OR
  ( -- current_time >= threshold and source tables are still not up to date
    current_time >= "11:00:00"
    AND not(
      (select max(date) from ecom_sales) >= (current_date - 1)
      AND (select max(date) from stores_sales) >= (current_date - 1)
    )
  )
AS "KO: at least one source table is not up to date and it's not 11 yet"
-- Launch based on source last update (instead of a date field)
You can check a table's last update time instead of a date field, using the my_project.my_dataset.__TABLES__ metadata table:
ASSERT
  (
    (
      SELECT MIN(last_execution) FROM
      (
        SELECT DATETIME(TIMESTAMP_MILLIS(last_modified_time)) AS last_execution
        FROM `my_project.my_dataset.__TABLES__`
        WHERE table_id = "my_source_table"
        UNION ALL
        [...]
      )
    )
    > (
      SELECT DATETIME(TIMESTAMP_MILLIS(last_modified_time)) AS last_execution
      FROM `my_project.my_dataset.__TABLES__`
      WHERE table_id = "my_destination_table"
    )
  )
  OR
  (
    ( CURRENT_TIME("Europe/Paris") > TIME(09, 59, 59) )
    AND
    (
      SELECT DATETIME(TIMESTAMP_MILLIS(last_modified_time)) AS last_execution
      FROM `my_project.my_dataset.__TABLES__`
      WHERE table_id = "my_destination_table"
    ) < CURRENT_DATE()
  )
There are many more use cases that can be covered with "stop" tasks (the first two are sketched right after this list):
- Skip weekends (by using the day of the week)
- Trigger an execution based on the number of elements to process (row count of a table or number of table suffixes)
- Composite conditions
- A/B Task type
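For example, here are minimal sketches of the first two ideas. The my_dataset.staging_events table and its date column are placeholders for illustration, and each assertion would go in its own "stop" task's SQL file.
-- Skip weekends: in BigQuery, DAYOFWEEK goes from 1 (Sunday) to 7 (Saturday)
ASSERT
  EXTRACT(DAYOFWEEK FROM CURRENT_DATE("Europe/Paris")) BETWEEN 2 AND 6
AS 'KO: no run on weekends'
-- Trigger only when there is something to process (placeholder table and column)
ASSERT
  (select count(*) from my_dataset.staging_events where date = current_date) > 0
AS 'KO: nothing to process yet'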
The sky (or rather SQL :D) is the limit. Feel free to share your use cases in the comment section below.