Incremental or High Water Mark data Loading


Overview

A common use case is to retrieve data from a source in regular batches when the source table has a last-updated timestamp or another incrementing key. Such a column makes it easy to retrieve only the records that have changed since the previous load.

In this article, we'll examine one way to perform incremental loads easily using data staging connectors inside Matillion ETL. The basic concept of this method is shown in the diagram below. It consists of four jobs: two that are run once to initialise the incremental load, and two that are run regularly to perform the incremental load itself.

  • Load: An initial load of the entire data set into a table.
  • View: A view that always contains the datetime of the most recent record update.
  • Incremental: The incremental load job that will pull in newly updated data only.
  • Update: A Transformation Job, called by Incremental, that updates the target table.

Each of these jobs is described in detail in the following sections. Note that we use the Salesforce Query component for our incremental load as an example, but the advice in this article applies equally to any of Matillion ETL's 'query' components. Similarly, we use a DateTime value to judge how recent a record is and refer to DateTime throughout the article, but plain numeric incrementing keys work just as well.


Initial Load

To set up the incremental load, we first need to do a full load of the data into a table. In this example, we use the Salesforce Query component to load all columns of the Knowledge__kav data source into a table called sf_articles.

Run this job to create your table and load the initial data. It may be wise to rerun this job with a second target table, which we will use to stage incremental loads, because doing so ensures that the staging table's metadata matches your target table's. In this example, we use sf_articles as our main table and sf_articles_stg as the staging table.
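The following Python sketch uses the standard library's sqlite3 module as a stand-in for the warehouse to illustrate why rerunning the same load job for the staging table is useful: because both tables come from the same job definition, their columns are guaranteed to match. The sample rows, the `load` and `column_names` functions, and the drop-and-recreate behaviour are illustrative assumptions, not how the Salesforce Query component is actually implemented.

```python
import sqlite3

# Hypothetical rows standing in for the Knowledge__kav source (Id, Title, SystemModstamp).
SOURCE_ROWS = [
    ("ka01", "Reset a password", "2020-03-01 09:00:00"),
    ("ka02", "Configure SSO", "2020-03-01 10:48:59"),
]


def load(conn: sqlite3.Connection, target: str) -> None:
    """Full load of the source into `target`; the same definition is reused for every table."""
    conn.execute(f"DROP TABLE IF EXISTS {target}")  # assumed: the load recreates its target
    conn.execute(
        f"CREATE TABLE {target} (Id TEXT PRIMARY KEY, Title TEXT, SystemModstamp TEXT)"
    )
    conn.executemany(f"INSERT INTO {target} VALUES (?, ?, ?)", SOURCE_ROWS)
    conn.commit()


def column_names(conn: sqlite3.Connection, table: str) -> list:
    # PRAGMA table_info returns one row per column; index 1 holds the column name.
    return [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]


conn = sqlite3.connect(":memory:")
load(conn, "sf_articles")      # main table
load(conn, "sf_articles_stg")  # staging table, created by rerunning the same job

print(column_names(conn, "sf_articles") == column_names(conn, "sf_articles_stg"))  # True
```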



Recent Update View

Next, we need to find the most recent update time in that table and create a view that keeps track of that value even as the table data changes. In a Transformation Job, use a Table Input component to bring in your table (sf_articles in this example), but select only a datetime column that indicates when each record was last updated. In this case, we've chosen the SystemModstamp column.

Now link it to an Aggregate component. Add a single Aggregations entry for the chosen column with the Aggregation Type set to "max". This leaves a single value: the most recent update time. You can confirm this with the Sample tab on the Aggregate component.

Finally, we want to connect this to a Create View component. Name the view something memorable and then run the job to create this view.

We now have a table full of data and a view that contains a single record with a single value: the datetime of that table's most recent update. We have named our view sf_article_recent for this example, and we'll use it in the Incremental Load Job section.
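In SQL terms, the Table Input → Aggregate → Create View chain amounts to a single CREATE VIEW statement over a MAX aggregate. The sketch below shows this against an in-memory SQLite table with hypothetical sample rows; the `SystemModstamp` alias on the aggregate and the `high_water_mark` helper are our own choices, and the column name produced by the Aggregate component may differ. It also shows why a view rather than a table is used: the maximum is recalculated every time the view is read.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical stand-in for sf_articles. Timestamps are ISO-style text, which sorts chronologically.
conn.execute("CREATE TABLE sf_articles (Id TEXT PRIMARY KEY, Title TEXT, SystemModstamp TEXT)")
conn.executemany(
    "INSERT INTO sf_articles VALUES (?, ?, ?)",
    [
        ("ka01", "Reset a password", "2020-03-01 09:00:00"),
        ("ka02", "Configure SSO", "2020-03-01 10:48:59"),
    ],
)

# Table Input (SystemModstamp only) -> Aggregate (max) -> Create View, as one statement.
conn.execute(
    "CREATE VIEW sf_article_recent AS "
    "SELECT MAX(SystemModstamp) AS SystemModstamp FROM sf_articles"
)


def high_water_mark(conn: sqlite3.Connection) -> str:
    # The view always holds exactly one row with one value.
    (value,) = conn.execute("SELECT SystemModstamp FROM sf_article_recent").fetchone()
    return value


print(high_water_mark(conn))  # 2020-03-01 10:48:59

# A view is re-evaluated on every read, so new data moves the mark without recreating it.
conn.execute(
    "INSERT INTO sf_articles VALUES ('ka03', 'Export reports', '2020-03-02 08:15:00')"
)
print(high_water_mark(conn))  # 2020-03-02 08:15:00
```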



Update Job

We'll now create a small Transformation Job to update your table. It is simply a Table Input component connected to a Table Update component.

The Table Input component should be pointed to the staging table previously mentioned in the Initial Load section. Remember that this should match your target table's metadata. In this example, it is pointing at the sf_articles_stg table.

The Table Update component should be pointed to your main table. In this example, it is updating sf_articles.
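The article doesn't specify how Table Update should match staging rows to target rows. Assuming it is configured to join on the record's Id, update matched rows, and insert unmatched ones (an incremental batch can contain brand-new records as well as edits), its effect is roughly the two-step merge sketched below in Python with sqlite3. The `Id` join key, the `Title` column, the `update_from_staging` function, and the sample data are assumptions for illustration.

```python
import sqlite3

COLUMNS = "(Id TEXT PRIMARY KEY, Title TEXT, SystemModstamp TEXT)"


def update_from_staging(conn: sqlite3.Connection) -> None:
    """Merge sf_articles_stg into sf_articles, matching rows on Id (assumed join key)."""
    # Step 1: overwrite target rows that have a matching Id in staging.
    conn.execute(
        """
        UPDATE sf_articles
        SET Title = (SELECT s.Title FROM sf_articles_stg AS s
                     WHERE s.Id = sf_articles.Id),
            SystemModstamp = (SELECT s.SystemModstamp FROM sf_articles_stg AS s
                              WHERE s.Id = sf_articles.Id)
        WHERE Id IN (SELECT Id FROM sf_articles_stg)
        """
    )
    # Step 2: insert staging rows whose Id is not in the target yet (new records).
    conn.execute(
        """
        INSERT INTO sf_articles (Id, Title, SystemModstamp)
        SELECT Id, Title, SystemModstamp FROM sf_articles_stg
        WHERE Id NOT IN (SELECT Id FROM sf_articles)
        """
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
for table in ("sf_articles", "sf_articles_stg"):
    conn.execute(f"CREATE TABLE {table} {COLUMNS}")
conn.executemany(
    "INSERT INTO sf_articles VALUES (?, ?, ?)",
    [("ka01", "Reset a password", "2020-03-01 09:00:00"),
     ("ka02", "Configure SSO", "2020-03-01 10:48:59")],
)
# Hypothetical incremental batch: one edited record (ka02) and one new record (ka03).
conn.executemany(
    "INSERT INTO sf_articles_stg VALUES (?, ?, ?)",
    [("ka02", "Configure SSO (v2)", "2020-03-02 08:00:00"),
     ("ka03", "Export reports", "2020-03-02 09:30:00")],
)

update_from_staging(conn)
for row in conn.execute("SELECT * FROM sf_articles ORDER BY Id"):
    print(row)
```

Splitting the merge into an UPDATE and an INSERT keeps the sketch portable across SQLite versions; a warehouse would typically express the same thing as a single MERGE.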

We now just need to organise a job that will populate our staging table with recently-updated rows.



Incremental Load Job

Finally, we can design our incremental load job. It uses the same Query component as the initial load, with some differences:

  • The Target Table property should point to our staging table. In this example it is pointed to sf_articles_stg.
  • We have connected a Table Iterator component to the Query component.
  • The Query component should be set to 'Advanced Mode' so we can use some custom SQL.
  • The job now ends by being linked to a Run Transformation component that points to the Update job previously described.

Job Variables

We need to declare a Job Variable to hold the datetime of our most recently updated record. To do this, right-click your job in the Project Explorer, click Variables → Manage Job Variables, and create a new variable of DateTime type with a memorable name. It definitely helps to provide a default value here, though it needn't be a meaningful date. An example that can be copied and pasted is given below.

2020-03-01 10:48:59.0

Table Iterator Component

The Table Iterator component is the key to this setup. Its Table Name property is set to point at the view we created earlier. Our view has only one record, so the iterator performs a single iteration. By using the Column Mapping property to map the view's single column to the Job Variable declared above, we can pass its value to the Query component. In this example, we've named the variable recent.
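To make the iterator's role concrete, here is a rough Python analogue: it reads every row of a table, maps chosen columns to named variables, and runs a job body once per row. `iterate_table` and its arguments are hypothetical names for illustration, not a Matillion API; the point is that a one-row view yields exactly one call with `recent` set to the high water mark.

```python
import sqlite3
from typing import Callable, Dict


def iterate_table(
    conn: sqlite3.Connection,
    table: str,
    column_mapping: Dict[str, str],
    body: Callable[[Dict[str, object]], None],
) -> None:
    """Run `body` once per row of `table`, exposing mapped columns as named variables."""
    cursor = conn.execute(f"SELECT * FROM {table}")
    columns = [description[0] for description in cursor.description]
    for row in cursor.fetchall():
        values = dict(zip(columns, row))
        # Column Mapping analogue: {column name: variable name}.
        variables = {var: values[col] for col, var in column_mapping.items()}
        body(variables)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sf_articles (Id TEXT, SystemModstamp TEXT)")
conn.executemany(
    "INSERT INTO sf_articles VALUES (?, ?)",
    [("ka01", "2020-03-01 09:00:00"), ("ka02", "2020-03-01 10:48:59")],
)
conn.execute(
    "CREATE VIEW sf_article_recent AS "
    "SELECT MAX(SystemModstamp) AS SystemModstamp FROM sf_articles"
)

calls = []  # records each iteration's variables in place of running the Query component
iterate_table(conn, "sf_article_recent", {"SystemModstamp": "recent"}, calls.append)
print(calls)  # [{'recent': '2020-03-01 10:48:59'}]
```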

Query Component

Ensure that the Query component is set to 'Advanced Mode' and has your staging table as its Target Table. For the custom SQL, we want to select all columns from the data source, but only records whose DateTime column is 'greater than' the value in our variable. This works for finding more recent DateTimes as well as for more generic incrementing keys. In the example below:

  • Our example data source is Knowledge__kav
  • Our example DateTime column is SystemModstamp
  • Our example Job Variable is recent

SELECT * FROM "Knowledge__kav" WHERE SystemModstamp > '${recent}'

The generic form being:

SELECT * FROM "<Data Source>" WHERE <DateTime Column> > '${<variable>}'
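Before the query is sent, Matillion replaces `${recent}` with the variable's current value. Python's `string.Template` happens to use the same `${name}` syntax, so the sketch below uses it to show, assuming plain text substitution, what the rendered query looks like for the default value given earlier. The `QUERY` and `render` names are hypothetical. Note that the single quotes written in the SQL, not the variable itself, are what turn the value into a string literal.

```python
from string import Template

# The Advanced Mode SQL from the example, with the ${recent} job variable still in place.
QUERY = Template("""SELECT * FROM "Knowledge__kav" WHERE SystemModstamp > '${recent}'""")


def render(recent: str) -> str:
    # substitute() raises KeyError if no value is supplied, instead of leaving ${recent} in the SQL.
    return QUERY.substitute(recent=recent)


print(render("2020-03-01 10:48:59.0"))
# SELECT * FROM "Knowledge__kav" WHERE SystemModstamp > '2020-03-01 10:48:59.0'
```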



Scheduling

Once the above is complete, we have a full table of initial data and a view that always contains the DateTime of that data's most recent record update. We've used a Table Iterator component to read that DateTime and pass it to a Query component, which uses it to filter future data loads so that we only get records newer than that. These records are placed into a staging table, which is then used to update our complete data table.

You can, of course, run this job manually, but usually we'd want to schedule it to run regularly and keep our table up to date automatically. This is as simple as choosing the Incremental Load Job when setting up a Schedule in Project → Manage Schedules. See the article on Managing Schedules for more information.



Incremental Loading Video Guide