{"id":103280,"date":"2024-08-15T18:50:00","date_gmt":"2024-08-15T18:50:00","guid":{"rendered":"https:\/\/www.red-gate.com\/simple-talk\/?p=103280"},"modified":"2024-08-06T00:40:16","modified_gmt":"2024-08-06T00:40:16","slug":"change-data-capture-pipeline-automation-in-snowflake","status":"publish","type":"post","link":"https:\/\/www.red-gate.com\/simple-talk\/databases\/snowflake\/change-data-capture-pipeline-automation-in-snowflake\/","title":{"rendered":"Change Data Capture Pipeline Automation in Snowflake"},"content":{"rendered":"<p>This article offers technical guidance on automating updates and tracking data changes with Snowflake&#8217;s CDC capabilities. The examples use healthcare-style data, but the basic tenets apply to most use cases.<\/p>\n<h2>Introduction<\/h2>\n<p>This article discusses how to use <a href=\"https:\/\/docs.snowflake.com\/en\/user-guide\/streams\">Snowflake&#8217;s stream<\/a> features to implement <a href=\"https:\/\/en.wikipedia.org\/wiki\/Change_data_capture\">Change Data Capture (CDC)<\/a>. Using the example of managing healthcare patient records, we will look at how to keep a table of current records up to date, capture changes to a source table, and automate the process with Snowflake&#8217;s task functionality.<\/p>\n<p>In healthcare, it&#8217;s important to have the most recent data available. With Snowflake&#8217;s CDC features, we can automatically record every change made to the data in ordinary tables. This involves setting up stream objects, a Snowflake feature that tracks the inserts, updates, and deletes made to a table.<\/p>\n<p>We will explain how to set up these streams, how they capture data changes, and how they use an offset to keep track of which changes have already been processed. The article will also show how these streams help process and apply data changes on a regular schedule through a task we will create called <strong>apply_healthcare_changes<\/strong>. 
This task takes the captured changes and applies them to a target table named <strong>healthcare_logs_current<\/strong>, ensuring that its records are current and reflect the latest information.<\/p>\n<p>By the end of this article, you&#8217;ll know how to use CDC in Snowflake to manage healthcare data effectively, keeping the data up to date and correct.<\/p>\n<h2>What Is Change Data Capture?<\/h2>\n<p>In the change data capture process, we create <a href=\"https:\/\/docs.snowflake.com\/en\/user-guide\/streams-intro\">stream objects<\/a> to record the changes made to source tables: inserts, updates, and deletes. A stream does not store a copy of the table&#8217;s data. Instead, it records a position in the table&#8217;s version history. When queried, it returns the rows that changed since that position, together with metadata columns that describe each change.<\/p>\n<p>The following example code shows the basic syntax for creating a stream object.<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">create or replace stream my_stream on table my_table;<\/pre>\n<p>From this point on, <code>my_stream<\/code> tracks every change made to the <code>my_table<\/code> table. The position the stream keeps is its offset: a checkpoint that marks all the changes that have already been processed and acknowledged. This is referred to as <a href=\"https:\/\/docs.snowflake.com\/en\/user-guide\/streams-intro\">offset storage<\/a>, which is shown in the following diagram.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"wp-image-103292 aligncenter\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2024\/07\/Screenshot-2024-07-23-144917.png\" alt=\"\" width=\"441\" height=\"350\" \/><\/p>\n<p>Each of the steps in the diagram is outlined in the following subsections, along with the objects that we will use later in the process.<\/p>\n<h3><a id=\"post-103280-_Toc166360758\"><\/a>Source Table Insert\/Update\/Delete (healthcare_logs)<\/h3>\n<p>The starting point for data updates is the source table, <code>healthcare_logs<\/code>, which we will create later in the demo. It holds the logs for a healthcare system. 
Think of it as a central log in a hospital, where patient information is constantly being refreshed. Whenever new test results arrive, medications are adjusted, or any details are added, removed, or modified, the change must be captured so that everything downstream is documented and up to date.<\/p>\n<h3><a id=\"post-103280-_Toc166360759\"><\/a>Creating Snowflake Stream Objects and Capturing Data Changes (healthcare_logs_stream)<\/h3>\n<p>Snowflake&#8217;s stream objects act like digital note takers for the source table. They track every insert, update, and delete, so a consumer can see exactly which rows have changed since it last processed them. In a hospital setting, this might involve capturing new diagnoses, adjustments to medication plans, or changes to patient details.<\/p>\n<h3><a id=\"post-103280-_Toc166360760\"><\/a>Offset Storage in Stream Creation<\/h3>\n<p>The <code>healthcare_logs_stream<\/code> keeps a single offset. This offset marks the point in the table&#8217;s version history up to which changes have already been consumed, and querying the stream returns every change made after it. The offset advances only when the stream&#8217;s contents are consumed by a DML statement (such as the <code>MERGE<\/code> in our task) and that transaction commits. A plain <code>SELECT<\/code> leaves it unchanged. This ensures that no change is processed twice and lets the stream remember exactly where it left off, which is crucial for keeping the data accurate and in the correct sequence throughout the entire process.<\/p>\n<h3><a id=\"post-103280-_Toc166360761\"><\/a>Checkpoints in Stream Processing<\/h3>\n<p>The offset of <code>healthcare_logs_stream<\/code> acts as a checkpoint, so data processing can resume from the last successfully processed point. This eliminates the need to reprocess everything from the very beginning. For instance, suppose a processing task fails partway through applying a batch of changes. Its transaction is rolled back and the offset does not advance, so the next run picks up the same changes again and restarts processing from there. 
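<\/p>\n<p>You can observe the offset behavior described above directly. The following is a minimal sketch that assumes the <code>healthcare_logs<\/code> table, created later in this article, already exists. The names <code>healthcare_logs_demo_stream<\/code> and <code>healthcare_logs_audit<\/code> are hypothetical and used only for this experiment. The sketch uses a separate stream because each stream keeps its own offset, so consuming it does not take changes away from <code>healthcare_logs_stream<\/code> or the task that reads from it. Run the statements one at a time and compare the stream queries before and after the <code>COMMIT<\/code>.<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">-- Hypothetical demo stream: a second stream on the same table keeps\n-- its own offset, independent of healthcare_logs_stream\nCREATE OR REPLACE STREAM healthcare_logs_demo_stream\n  ON TABLE healthcare_logs;\n\n-- Hypothetical table that receives the consumed change records\nCREATE OR REPLACE TABLE healthcare_logs_audit (\n    log_id        INT,\n    change_action VARCHAR,\n    is_update     BOOLEAN,\n    captured_at   TIMESTAMP_LTZ\n);\n\n-- (Make a change to healthcare_logs at this point, e.g. an UPDATE.)\n\n-- A plain SELECT does not move the offset: running it repeatedly\n-- returns the same change records\nSELECT METADATA$ACTION, METADATA$ISUPDATE, log_id, patient_event\nFROM healthcare_logs_demo_stream;\n\n-- TRUE when the stream has change records to consume\nSELECT SYSTEM$STREAM_HAS_DATA('HEALTHCARE_LOGS_DEMO_STREAM');\n\n-- Consuming the stream in a DML statement advances the offset when\n-- the transaction commits. With ROLLBACK instead of COMMIT, the offset\n-- does not move and the same changes are returned again next time.\nBEGIN TRANSACTION;\nINSERT INTO healthcare_logs_audit\nSELECT log_id, METADATA$ACTION, METADATA$ISUPDATE, CURRENT_TIMESTAMP()\nFROM healthcare_logs_demo_stream;\nCOMMIT;\n\n-- After the COMMIT, the stream is empty until healthcare_logs changes again\nSELECT COUNT(*) AS pending_changes\nFROM healthcare_logs_demo_stream;<\/pre>\n<p>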
This ensures that the data remains consistent and reliable throughout the entire process.<\/p>\n<h2><a id=\"post-103280-_Toc166360762\"><\/a>CDC Data Pipeline Flow<\/h2>\n<p>The following diagram depicts the CDC data pipeline flow in an example healthcare setting. Data operations such as inserts, updates, and deletes are applied to the <code>healthcare_logs<\/code> table. A stream object, <code>healthcare_logs_stream<\/code>, is created on that table to capture these changes along with their associated metadata. The <code>apply_healthcare_changes<\/code> task then processes the captured changes and applies them to the <code>healthcare_logs_current<\/code> table.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone  wp-image-103294\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2024\/09\/Screenshot-2024-07-23-145616.png\" alt=\"\" width=\"243\" height=\"517\" \/><\/p>\n<p>The following sections examine each of these steps in more detail.<\/p>\n<h3><a id=\"post-103280-_Toc166360763\"><\/a>Inserts\/Updates\/Deletes<\/h3>\n<p>The process begins with data modifications to the <strong>source<\/strong> table: inserts, updates, and deletes of data entries. This is typically where changes to patient records or healthcare logs originate in the system.<\/p>\n<h3><a id=\"post-103280-_Toc166360764\"><\/a>Create Stream (healthcare_logs_stream)<\/h3>\n<p>Once data changes in the source table, those changes need to be captured as they happen. 
This is achieved by the &#8220;<strong>Create Stream<\/strong>&#8221; step, which sets up the <code>healthcare_logs_stream<\/code>. This stream tracks all DML changes made to the source table and exposes each changed row together with metadata that describes the change.<\/p>\n<h3><a id=\"post-103280-_Toc166360765\"><\/a>Healthcare_logs_stream (Capture Data Changes)<\/h3>\n<p>Every change made to the <strong>source table<\/strong>, whether an insert, update, or delete, is captured in the <code>healthcare_logs_stream<\/code>. Each row returned by the stream carries the table&#8217;s columns plus three metadata columns:<\/p>\n<ul>\n<li><code>METADATA$ACTION<\/code>: <code>INSERT<\/code> or <code>DELETE<\/code>.<\/li>\n<li><code>METADATA$ISUPDATE<\/code>: <code>TRUE<\/code> when the row is part of an update. The stream represents an update as a <code>DELETE<\/code> and <code>INSERT<\/code> pair.<\/li>\n<li><code>METADATA$ROW_ID<\/code>: a unique, immutable row identifier.<\/li>\n<\/ul>\n<p>This is the critical phase where data capture happens, ensuring that no modification is overlooked.<\/p>\n<h3><a id=\"post-103280-_Toc166360766\"><\/a>Apply_healthcare_changes Task<\/h3>\n<p>Once the stream has recorded changes, the next step is to process them. The <code>apply_healthcare_changes<\/code> task automates applying these changes to another table, <code>healthcare_logs_current<\/code>. It merges the captured changes into this current-state table, ensuring the data is up to date and accurately reflects all modifications.<\/p>\n<h3><a id=\"post-103280-_Toc166360767\"><\/a>Transform, Analysis, Monitor and Analytics<\/h3>\n<p>After changes have been applied and the data is stored in the <code>healthcare_logs_current<\/code> table, it can undergo further transformations, if necessary, to prepare it for analysis. This step might involve SQL queries or stored procedures to format or clean the data for effective analysis. 
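<\/p>\n<p>As a minimal illustration of this step, the sketch below defines a reporting view over <code>healthcare_logs_current<\/code>. The view name <code>patient_event_summary<\/code> is hypothetical. Because it is a view rather than a copy of the data, each query against it sees whatever the task has most recently merged.<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">-- Hypothetical reporting view over the current-state table\nCREATE OR REPLACE VIEW patient_event_summary AS\nSELECT\n    patient_event,\n    COUNT(*)                   AS event_count,\n    COUNT(DISTINCT patient_id) AS patient_count,\n    MAX(event_time)            AS latest_event_time\nFROM healthcare_logs_current\nWHERE deleted = FALSE\nGROUP BY patient_event;\n\n-- Each query against the view sees the latest merged data\nSELECT *\nFROM patient_event_summary\nORDER BY event_count DESC;<\/pre>\n<p>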
Following transformation, the data is used for monitoring and analytics, providing insights and decision support based on the latest data.<\/p>\n<h2><a id=\"post-103280-_Toc166360768\"><\/a>CDC Practical Implementation<\/h2>\n<p>In this section, we walk through a practical, step-by-step implementation of CDC in Snowflake. We start by creating the database and table and inserting initial data. We then set up a stream and a change table to capture data modifications, followed by automating the ingestion of updates into the <code>healthcare_logs_current<\/code> table with a Snowflake task. Finally, we update and verify the data to confirm that it is accurate and reliable for downstream analysis.<\/p>\n<h3><a id=\"post-103280-_Toc166360769\"><\/a>Creating the Database and Table and Inserting Data<\/h3>\n<p>At this stage, we will build a healthcare <a href=\"https:\/\/docs.snowflake.com\/en\/sql-reference\/sql\/create-database#general-usage-notes\">database in Snowflake<\/a>. Start by creating a new database:<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" width=\"934\" height=\"317\" class=\"wp-image-103282\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2024\/07\/word-image-103280-2.png\" \/><\/p>\n<p>After the database is created, you may notice that two schemas have already been created in it for you: <code>PUBLIC<\/code> and <code>INFORMATION_SCHEMA<\/code>. 
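<\/p>\n<p>The database-creation statement above appears only as a screenshot. A text equivalent is sketched below, followed by a command that lists the schemas in the new database, which should include <code>PUBLIC<\/code> and <code>INFORMATION_SCHEMA<\/code>. The name <code>healthcare_db<\/code> is hypothetical, so substitute your own.<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">-- healthcare_db is a hypothetical name; OR REPLACE drops any\n-- existing database with the same name\nCREATE OR REPLACE DATABASE healthcare_db;\n\n-- Work in the default PUBLIC schema\nUSE SCHEMA healthcare_db.public;\n\n-- List the schemas created along with the database\nSHOW SCHEMAS IN DATABASE healthcare_db;<\/pre>\n<p>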
<code>INFORMATION_SCHEMA<\/code> contains views that you can use to get information about the objects in the database.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" width=\"1174\" height=\"291\" class=\"wp-image-103283\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2024\/07\/word-image-103280-3.png\" \/><\/p>\n<p>For simplicity, this example creates all of its objects (the tables, stream, and task) in the <code>PUBLIC<\/code> schema.<\/p>\n<p>Next, create the healthcare log <a href=\"https:\/\/docs.snowflake.com\/en\/sql-reference\/sql\/create-table\">table<\/a>, <code>healthcare_logs<\/code>, with the columns shown below. This table stores patient event data and also serves as the source table for CDC. Its <code>log_id<\/code> column gets its default value from the <code>seq_log_id<\/code> sequence, so the sequence must exist before the table is created.<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">-- Sequence that supplies default values for log_id\nCREATE OR REPLACE SEQUENCE seq_log_id;\n\nCREATE OR REPLACE TABLE healthcare_logs (\n    log_id INT DEFAULT seq_log_id.nextval PRIMARY KEY,\n    patient_event VARCHAR,\n    patient_id VARCHAR,\n    event_time TIMESTAMP,\n    provider_id VARCHAR,\n    -- Soft-delete flag\n    deleted BOOLEAN DEFAULT FALSE\n);<\/pre>\n<p>After creating the table, use the following SQL code to insert the data. Note that this is randomly generated sample data, not real patient data. 
The statement adds 1,000 records to the <code>healthcare_logs<\/code> table.<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">-- Insert 1,000 rows of randomly generated sample data\nINSERT INTO HEALTHCARE_LOGS \n           (patient_event, patient_id, \n             event_time, provider_id)\nSELECT\n  -- Arrays are 0-based and RANDOM() returns a 64-bit integer,\n  -- so UNIFORM(0, 4, RANDOM()) picks one of the five events\n  ARRAY_CONSTRUCT('Checkup', 'Surgery', 'Consultation', \n                  'Emergency', 'Routine Visit')\n           [UNIFORM(0, 4, RANDOM())]::VARCHAR AS patient_event,\n  UUID_STRING() AS patient_id,\n  -- A random point within the past year\n  DATEADD('day', -UNIFORM(1, 365, RANDOM()), \n  CURRENT_TIMESTAMP()) AS event_time,\n  UUID_STRING() AS provider_id\nFROM TABLE(GENERATOR(ROWCOUNT =&gt; 1000));<\/pre>\n<p>Executing this should show the following result:<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" width=\"1616\" height=\"448\" class=\"wp-image-103284\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2024\/07\/a-screenshot-of-a-computer-description-automatica.png\" alt=\"Result of executing the INSERT statement\" \/><\/p>\n<h3><a id=\"post-103280-_Toc166360770\"><\/a>Creating the Stream and Change Table<\/h3>\n<p>During this phase, we will create a stream on the <code>healthcare_logs<\/code> table so that any modifications to the patient journey are reflected in the <code>healthcare_logs_stream<\/code>. We will also create a table named <code>healthcare_logs_current<\/code> to hold the current state of the data after these changes are applied.<\/p>\n<p>Create the stream using the following code:<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">CREATE OR REPLACE STREAM HEALTHCARE_LOGS_STREAM \non table HEALTHCARE_LOGS;<\/pre>\n<p>Next, create the <code>healthcare_logs_current<\/code> table, which will reflect the modifications made to the source table. 
Snowflake&#8217;s task functionality will then keep this table up to date.<\/p>\n<p>The <code>CREATE TABLE ... AS SELECT<\/code> statement below copies the non-deleted rows that already exist in <code>healthcare_logs<\/code> as a starting snapshot. After that, we won&#8217;t enter any data manually, because an automated task will apply subsequent changes. We discuss this task in more detail later in this post.<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">-- Create a table that will reflect the latest state \n-- after applying changes, starting from a snapshot of\n-- the non-deleted rows in the source table\nCREATE OR REPLACE TABLE healthcare_logs_current \n   AS SELECT * FROM healthcare_logs WHERE deleted = FALSE;<\/pre>\n<h3><a id=\"post-103280-_Toc166360771\"><\/a>Automation of Healthcare_Logs_Current Table Ingestion via Task<\/h3>\n<p>In a real-world electronic health record scenario, we will typically see constant changes in patient journeys, such as medical and laboratory tests and prescriptions. Maintaining all of those changes by hand quickly becomes cumbersome for data engineers and data administrators. Snowflake&#8217;s <a href=\"https:\/\/docs.snowflake.com\/en\/user-guide\/tasks-intro\">tasks<\/a> address this by running SQL statements automatically. Tasks can be triggered in two main ways:<\/p>\n<ol>\n<li>Event Driven<\/li>\n<li>Time Driven<\/li>\n<\/ol>\n<p>In our healthcare scenario, we will use a time-driven task to manage ongoing changes in the patient journey, so that regular updates reflect the latest patient interactions and treatment records. After the stream captures DML changes from the source table, the task loads those changes into the <code>healthcare_logs_current<\/code> table for a consolidated view. 
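<\/p>\n<p>The task in this article uses a simple fixed interval. The sketch below shows another time-driven option: it schedules a task with a CRON expression and adds a <code>WHEN<\/code> condition. Scheduled runs are then skipped while the stream has no changes to process, which brings a scheduled task closer to event-driven behavior. The sketch uses hypothetical objects (<code>demo_events<\/code>, <code>demo_events_copy<\/code>, <code>demo_events_stream<\/code>, and <code>copy_demo_events<\/code>) so that it does not consume changes intended for <code>healthcare_logs_stream<\/code>.<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">-- Hypothetical demo objects, separate from healthcare_logs\nCREATE OR REPLACE TABLE demo_events (event_id INT, note VARCHAR);\nCREATE OR REPLACE TABLE demo_events_copy (event_id INT, note VARCHAR);\nCREATE OR REPLACE STREAM demo_events_stream ON TABLE demo_events;\n\nCREATE OR REPLACE TASK copy_demo_events\n  WAREHOUSE = COMPUTE_WH\n  -- Time driven: a CRON expression (every 5 minutes, UTC)\n  -- instead of a fixed interval such as '1 MINUTE'\n  SCHEDULE = 'USING CRON *\/5 * * * * UTC'\n  -- Skip the run, and the warehouse usage, while the stream is empty\n  WHEN SYSTEM$STREAM_HAS_DATA('DEMO_EVENTS_STREAM')\nAS\n  INSERT INTO demo_events_copy (event_id, note)\n  SELECT event_id, note\n  FROM demo_events_stream\n  WHERE METADATA$ACTION = 'INSERT';\n\n-- Tasks are created suspended; resume to start the schedule\nALTER TASK copy_demo_events RESUME;<\/pre>\n<p>When you are finished, drop the demo task (<code>DROP TASK copy_demo_events;<\/code>) so it does not keep running on its schedule.<\/p>\n<p>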
The code for the <code>apply_healthcare_changes<\/code> task is shown below.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" width=\"1134\" height=\"364\" class=\"wp-image-103285\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2024\/07\/word-image-103280-5.png\" \/><\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">CREATE OR REPLACE TASK APPLY_HEALTHCARE_CHANGES\n  WAREHOUSE = COMPUTE_WH\n  SCHEDULE = '1 MINUTE'\nAS\nMERGE INTO healthcare_logs_current AS target\nUSING (\n    -- Keep only the new version of each inserted or updated row;\n    -- the stream represents an update as a DELETE\/INSERT pair\n    SELECT * FROM healthcare_logs_stream\n    WHERE METADATA$ACTION != 'DELETE' AND DELETED = FALSE\n) AS changes\n-- log_id is the primary key of healthcare_logs\nON target.LOG_ID = changes.LOG_ID\nWHEN MATCHED THEN UPDATE SET\n    target.PATIENT_ID = changes.PATIENT_ID,\n    target.PATIENT_EVENT = changes.PATIENT_EVENT,\n    target.EVENT_TIME = changes.EVENT_TIME,\n    target.PROVIDER_ID = changes.PROVIDER_ID\nWHEN NOT MATCHED THEN INSERT (\n    LOG_ID, PATIENT_EVENT, PATIENT_ID, EVENT_TIME, PROVIDER_ID, DELETED\n)\nVALUES (\n    changes.LOG_ID, changes.PATIENT_EVENT, changes.PATIENT_ID,\n    changes.EVENT_TIME, changes.PROVIDER_ID, changes.DELETED\n);<\/pre>\n<p>Note that this <code>MERGE<\/code> applies only inserts and updates. It does not remove rows from <code>healthcare_logs_current<\/code> when a source row is deleted or flagged as deleted, because those stream rows are filtered out.<\/p>\n<p>After creating the task, the next step is to resume it. Snowflake tasks are created in a suspended state and must be explicitly resumed before they run. Do this by executing the following script:<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">-- Start the task\nALTER TASK apply_healthcare_changes RESUME;<\/pre>\n<h3><a id=\"post-103280-_Toc166360772\"><\/a>Data Update and Verification<\/h3>\n<p>First, let&#8217;s examine our <code>healthcare_logs_current<\/code> table before making any updates. 
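<\/p>\n<p>The query used for the screenshot below is not reproduced in the text. A comparable check is sketched here as an assumption. It counts the rows in <code>healthcare_logs_current<\/code> and lists any change records still waiting in <code>healthcare_logs_stream<\/code>. Selecting from the stream does not advance its offset, so running these queries does not interfere with the task.<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">-- Rows in the current-state table before the update\nSELECT COUNT(*) AS current_rows\nFROM healthcare_logs_current;\n\n-- Change records waiting in the stream; a plain SELECT does not\n-- advance the offset, so it does not interfere with the task\nSELECT METADATA$ACTION, METADATA$ISUPDATE, log_id, patient_event\nFROM healthcare_logs_stream;<\/pre>\n<p>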
In the run shown below, this table contains no records. Because the <code>CREATE TABLE ... AS SELECT<\/code> statement copies the existing non-deleted rows of <code>healthcare_logs<\/code>, your table will instead start with those rows if you created it after inserting the sample data.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" width=\"801\" height=\"196\" class=\"wp-image-103286\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2024\/07\/word-image-103280-6.png\" \/><\/p>\n<p>Now, let&#8217;s update the patient records using the SQL code provided below. This update changes the <code>patient_event<\/code> to &#8216;Checkup&#8217; for the first record (the row with the lowest <code>log_id<\/code>) in the <code>healthcare_logs<\/code> table. Only one row is updated, because the subquery returns a single <code>patient_id<\/code> (<code>LIMIT 1<\/code>) and each <code>patient_id<\/code> in the sample data is unique.<\/p>\n<pre class=\"lang:tsql theme:ssms2012-simple-talk\">-- Change the event for the row with the lowest log_id\nUPDATE healthcare_logs \nSET patient_event = 'Checkup'\nWHERE patient_id = (SELECT patient_id \n                    FROM healthcare_logs \n                    ORDER BY log_id\n                    LIMIT 1);<\/pre>\n<p>As previously mentioned, the task applies updates made to the source table to the <code>healthcare_logs_current<\/code> table. With a one-minute schedule, the change should appear within about a minute. To verify that the automation succeeded, click on &#8216;task run history&#8217; and then on &#8216;show query profile&#8217; to monitor the task.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" width=\"839\" height=\"532\" class=\"wp-image-103287\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2024\/07\/word-image-103280-7.png\" \/><\/p>\n<p>When you click on &#8216;show query profile,&#8217; you will see that one update has been made to the <code>healthcare_logs_current<\/code> table.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" width=\"782\" height=\"487\" class=\"wp-image-103288\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2024\/07\/word-image-103280-8.png\" \/><\/p>\n<p>Afterwards, we&#8217;ll check the <code>healthcare_logs_current<\/code> table using the query shown below and examine the output.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" width=\"794\" height=\"211\" class=\"wp-image-103289\" 
src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2024\/07\/word-image-103280-9.png\" \/><\/p>\n<p>As you can see, the update we made to the patient event in the <code>healthcare_logs<\/code> table is now stored in the <code>healthcare_logs_current<\/code> table.<\/p>\n<h2><a id=\"post-103280-_Toc166360773\"><\/a>Conclusion<\/h2>\n<p>In this article, we walked through a complete approach to deploying Change Data Capture (CDC) in Snowflake for healthcare data management. The pipeline captures changes to patient records as they are made and applies them automatically, which helps maintain data integrity and keeps downstream tables current.<\/p>\n<p>We also discussed how to set up and use Snowflake&#8217;s features to manage healthcare data. We started by creating a database and the <code>healthcare_logs<\/code> table to keep track of patient information. We then showed how to capture changes to this data automatically using Snowflake&#8217;s stream feature with the <code>healthcare_logs_stream<\/code>.<\/p>\n<p>We explained the importance of using tasks in Snowflake, such as the <code>apply_healthcare_changes<\/code> task, to automatically update the <code>healthcare_logs_current<\/code> table with changes captured from the <code>healthcare_logs<\/code> table. This keeps patient records up to date without manual updates.<\/p>\n<p>Finally, we checked our work by updating a record and reviewing the change in Snowflake to make sure everything was working correctly. This process shows that Snowflake can effectively manage patient data, making it easier for healthcare providers to access the latest and most accurate information.<\/p>\n<p>By following these steps, healthcare organizations can improve how they manage patient data, leading to better care and more efficient operations.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>This article offers technical guidance on automating updates and tracking changes of data examples using Snowflake&#8217;s CDC capabilities. 
The examples will be using healthcare type data, but the basic tenets apply in most use cases. Introduction This article discusses how to use Snowflake&#8217;s streaming features to implement Change Data Capture (CDC). Using the framework of&#8230;&hellip;<\/p>\n","protected":false},"author":343433,"featured_media":103291,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[53,159001],"tags":[159124,159123,159122],"coauthors":[159125],"class_list":["post-103280","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-featured","category-snowflake","tag-change-data-capture","tag-snowflake","tag-snowflake-change-data-caputre"],"acf":[],"_links":{"self":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts\/103280","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/users\/343433"}],"replies":[{"embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/comments?post=103280"}],"version-history":[{"count":7,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts\/103280\/revisions"}],"predecessor-version":[{"id":103300,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts\/103280\/revisions\/103300"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/media\/103291"}],"wp:attachment":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/media?parent=103280"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/categories?post=103280"},{"taxonomy":"post_tag","embeddable":t
rue,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/tags?post=103280"},{"taxonomy":"author","embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/coauthors?post=103280"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}