Incremental Load in Snowflake Using S3, Snowpipe, Streams, and Tasks


Incremental Load

Incremental load is the process of loading only new or changed data instead of reloading the entire dataset every time.
This approach improves performance, reduces cost, and ensures data freshness in downstream tables.

The incremental load pipeline implemented here follows this flow:

  • Amazon S3
  • Snowpipe (auto ingestion)
  • Streams (change data capture)
  • Tasks (automation)
  • Silver Table (clean layer)

End-to-end incremental load architecture showing S3, Snowpipe, Streams, Tasks, and Silver Table


Step 1: Create Database & Schema

CREATE DATABASE ecomproject;
CREATE SCHEMA ecomproject.sales;

This database and schema will be used to create all required objects for the incremental load pipeline.
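
Several later statements create objects (the file format, stage, and pipe) without a database or schema prefix, so they land in whichever schema is current for the session. A minimal sketch, assuming everything should live in ecomproject.sales, sets and checks that context explicitly:

```sql
-- Make ecomproject.sales the default for unqualified object names.
USE DATABASE ecomproject;
USE SCHEMA ecomproject.sales;

-- Confirm the session context before creating further objects.
SELECT CURRENT_DATABASE(), CURRENT_SCHEMA();
```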


Step 2: Define File Format for CSV

CREATE OR REPLACE FILE FORMAT csv_format_employees
TYPE = 'CSV'
FIELD_DELIMITER = ','
SKIP_HEADER = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
EMPTY_FIELD_AS_NULL = TRUE
NULL_IF = ('\\N', 'NULL', '')
ENCODING = 'UTF8';

This file format definition ensures that CSV data from S3 is parsed correctly during ingestion.


Step 3: Create Storage Integration with AWS S3

CREATE OR REPLACE STORAGE INTEGRATION s3_integration
TYPE = EXTERNAL_STAGE
ENABLED = TRUE
STORAGE_PROVIDER = 'S3'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::xxxxxxxxxxxx:role/aws_snowflake_role'
STORAGE_ALLOWED_LOCATIONS = (
  's3://e-commerce-project-id-01/bronze/',
  's3://e-commerce-project-id-01/silver/'
);

The storage integration lets Snowflake access the S3 bucket by assuming an IAM role, so no AWS access keys need to be stored in Snowflake.
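
The integration only works once the IAM role named in STORAGE_AWS_ROLE_ARN trusts Snowflake. The AWS-side setup is outside the scope of this walkthrough, but as a sketch, the values needed for that trust policy can be looked up like this:

```sql
-- Lists the integration's properties. The STORAGE_AWS_IAM_USER_ARN and
-- STORAGE_AWS_EXTERNAL_ID rows hold the values that go into the trust
-- policy of the IAM role referenced by STORAGE_AWS_ROLE_ARN.
DESC INTEGRATION s3_integration;
```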


Step 4: Create External Stage Object Using Storage Integration

CREATE OR REPLACE STAGE external_stages
STORAGE_INTEGRATION = s3_integration
URL = 's3://e-commerce-project-id-01/bronze/'
FILE_FORMAT = csv_format_employees;

The external stage points to the S3 location where raw CSV files are uploaded.
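
Before building the pipe, it can help to confirm that the stage can see and parse the files. The following is a sketch that assumes at least one CSV file with six columns (matching the raw table defined in the next step) has already been uploaded to the bronze location:

```sql
-- List the files Snowflake can see under the stage location.
LIST @external_stages;

-- Preview the staged data using the named file format.
-- $1..$6 are positional column references; six columns is an assumption.
SELECT t.$1, t.$2, t.$3, t.$4, t.$5, t.$6
FROM @external_stages (FILE_FORMAT => 'csv_format_employees') t
LIMIT 10;
```

If LIST returns an access error, the usual suspects are the IAM trust policy or a stage URL outside STORAGE_ALLOWED_LOCATIONS.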


Step 5: Create Raw Table (Bronze Layer)

CREATE OR REPLACE TABLE ecomproject.sales.employees_raw (
  EMPID INT,
  EMPNAME VARCHAR(30),
  SALARY FLOAT,
  AGE INT,
  DEPT VARCHAR(15),
  LOCATION VARCHAR(20)
);

The raw table stores data exactly as received from the source without any transformation.


Step 6: Create Snowpipe for Auto Ingestion

CREATE OR REPLACE PIPE raw_employees_data_pipe
AUTO_INGEST = TRUE
AS
COPY INTO ecomproject.sales.employees_raw
FROM @external_stages
ON_ERROR = CONTINUE
FILE_FORMAT = (FORMAT_NAME = 'csv_format_employees');

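Snowpipe loads new files from S3 into the raw table automatically, shortly after they arrive. AUTO_INGEST = TRUE depends on S3 event notifications reaching the pipe's SQS queue, so the bucket must be configured to send them.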
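
The queue that the S3 event notification should target is exposed by the pipe itself. A short sketch of how to find it, and how to pick up files that were staged before notifications were in place:

```sql
-- The notification_channel column shows the ARN of the SQS queue that the
-- bucket's event notification should point to.
SHOW PIPES LIKE 'raw_employees_data_pipe';

-- Optional: queue files that were already in the stage before notifications
-- were set up (only files staged within the last 7 days are considered).
ALTER PIPE raw_employees_data_pipe REFRESH;
```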

To monitor the Snowpipe status:

SELECT SYSTEM$PIPE_STATUS('raw_employees_data_pipe');

Pipe status output showing execution state as RUNNING
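
SYSTEM$PIPE_STATUS reports whether the pipe is running, not what it loaded. Because the pipe uses ON_ERROR = CONTINUE, rows that fail to parse are skipped without stopping the load, so it is worth checking the load history as well. A sketch, where the one-hour window is an arbitrary choice:

```sql
-- Per-file load results for the raw table over the last hour.
SELECT FILE_NAME, STATUS, ROW_COUNT, ROW_PARSED, ERROR_COUNT, FIRST_ERROR_MESSAGE
FROM TABLE(ecomproject.INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'ecomproject.sales.employees_raw',
  START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
))
ORDER BY LAST_LOAD_TIME DESC;
```

A ROW_COUNT lower than ROW_PARSED indicates that some rows in that file were rejected.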


Step 7: Create Stream on Bronze Table

CREATE OR REPLACE STREAM ecomproject.sales.raw_employees_data_streams
ON TABLE ecomproject.sales.employees_raw
SHOW_INITIAL_ROWS = TRUE;

The stream captures row-level changes from the raw table and enables incremental processing.
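
To see what the stream currently holds, it can be queried like a table. The sketch below relies on the metadata columns that Snowflake adds to every stream:

```sql
-- TRUE when the stream holds changes that have not been consumed yet.
SELECT SYSTEM$STREAM_HAS_DATA('ecomproject.sales.raw_employees_data_streams');

-- A plain SELECT does not advance the stream's offset; only DML that reads
-- the stream in a committed transaction (such as a MERGE) consumes it.
SELECT EMPID, EMPNAME, METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
FROM ecomproject.sales.raw_employees_data_streams;
```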


Upload employees1.csv File into AWS S3

After uploading employees1.csv to the S3 bucket, Snowpipe automatically ingests the data.

SELECT * 
FROM ecomproject.sales.employees_raw;

As shown, Snowpipe loaded 6 records from employees1.csv into the raw table.

Raw table output showing 6 records loaded from employees1.csv



Step 8: Create Silver Table (Clean Layer)

CREATE OR REPLACE TABLE ecomproject.sales.employees_silver (
  EMPID INT,
  EMPNAME VARCHAR(30),
  SALARY FLOAT,
  AGE INT,
  DEPT VARCHAR(15),
  LOCATION VARCHAR(20)
);

The silver table stores cleaned and deduplicated data.


Step 9: Create Task and Merge Statement for Incremental Load into Silver Table (SCD Type 1)

CREATE OR REPLACE TASK ecomproject.sales.task_incremental_load
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
AS
MERGE INTO ecomproject.sales.employees_silver T
USING ecomproject.sales.raw_employees_data_streams S
ON T.EMPID = S.EMPID
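-- Row deleted from the raw table: remove it from silver.
WHEN MATCHED
  AND S.METADATA$ACTION = 'DELETE'
  AND S.METADATA$ISUPDATE = FALSE
THEN DELETE
-- Row updated in the raw table: overwrite the silver row (SCD Type 1).
WHEN MATCHED
  AND S.METADATA$ACTION = 'INSERT'
  AND S.METADATA$ISUPDATE = TRUE
THEN UPDATE SET
  T.EMPNAME = S.EMPNAME,
  T.SALARY = S.SALARY,
  T.AGE = S.AGE,
  T.DEPT = S.DEPT,
  T.LOCATION = S.LOCATION
-- Newly inserted row whose EMPID is not in silver yet: insert it.
WHEN NOT MATCHED
  AND S.METADATA$ACTION = 'INSERT'
  AND S.METADATA$ISUPDATE = FALSE
THEN INSERT (EMPID, EMPNAME, SALARY, AGE, DEPT, LOCATION)
VALUES (S.EMPID, S.EMPNAME, S.SALARY, S.AGE, S.DEPT, S.LOCATION);

This merge uses the stream's METADATA$ACTION and METADATA$ISUPDATE columns to decide what to do with each change. Rows deleted from the raw table are deleted from silver, rows updated in the raw table overwrite the matching silver row, and inserted rows are added only when their EMPID is not yet in silver. An inserted row whose EMPID already exists matches none of the branches, which is how duplicates arriving in later files are skipped.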
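
With a one-minute schedule, the task runs the MERGE on the warehouse every minute even when no new data has arrived. A common refinement is a WHEN condition that skips the run when the stream is empty. It is shown here as an optional sketch and is not part of the original pipeline:

```sql
-- A task must be suspended to be altered; a newly created task already is.
ALTER TASK ecomproject.sales.task_incremental_load
  MODIFY WHEN SYSTEM$STREAM_HAS_DATA('ecomproject.sales.raw_employees_data_streams');
```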


Check the Status of Task

DESC TASK ecomproject.sales.task_incremental_load;

As shown, the task is initially in a SUSPENDED state.

To enable it:

ALTER TASK ecomproject.sales.task_incremental_load RESUME;

After resuming, the task status changes to STARTED.


SELECT * 
FROM ecomproject.sales.employees_silver;

As shown, the task's first run merged the 6 records captured by the stream into the silver table.
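
If the silver table is still empty, the task may not have run yet or may have failed. A sketch for triggering a run by hand and inspecting recent runs (the limit of 10 rows is an arbitrary choice):

```sql
-- Optional: trigger one run immediately instead of waiting for the schedule.
EXECUTE TASK ecomproject.sales.task_incremental_load;

-- Recent runs of the task with their outcome and any error message.
SELECT NAME, STATE, SCHEDULED_TIME, COMPLETED_TIME, ERROR_MESSAGE
FROM TABLE(ecomproject.INFORMATION_SCHEMA.TASK_HISTORY(
  TASK_NAME => 'TASK_INCREMENTAL_LOAD',
  RESULT_LIMIT => 10
))
ORDER BY SCHEDULED_TIME DESC;
```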


Step 10: Test the Incremental Load Logic from Step 9

Now upload employees2.csv into the AWS S3 bucket.

This file contains 11 records, including duplicates already present from employees1.csv.

employees2.csv file preview showing 11 records


Check Raw Table After Upload

SELECT * 
FROM ecomproject.sales.employees_raw;

As shown, the raw table now contains 17 records, including duplicate records.

Raw table showing total of 17 records


Check Silver Table After Incremental Load

SELECT * 
FROM ecomproject.sales.employees_silver
ORDER BY EMPID;

As shown, the silver table contains only 11 records.

Explanation:

  • Out of 11 records in employees2.csv, 5 records are new
  • 6 records are duplicates already loaded from employees1.csv
  • Duplicate records were skipped during the incremental merge because their EMPID values were already present in the silver table

Result:

Only the 5 new records were inserted, resulting in 11 distinct records in the silver table.

Silver table output showing 11 distinct records
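
The record counts above can be cross-checked with two small queries. This sketch assumes, as the merge condition does, that EMPID identifies a record:

```sql
-- EMPIDs that appear more than once in the raw (bronze) table.
SELECT EMPID, COUNT(*) AS copies
FROM ecomproject.sales.employees_raw
GROUP BY EMPID
HAVING COUNT(*) > 1
ORDER BY EMPID;

-- The silver table should hold exactly one row per EMPID.
SELECT COUNT(*) AS total_rows, COUNT(DISTINCT EMPID) AS distinct_empids
FROM ecomproject.sales.employees_silver;
```

If the numbers in this walkthrough hold, the first query lists the duplicated EMPIDs and the second returns the same value in both columns.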



Testing UPDATE Statement

Now update the salary for EMPID = 1 from 80,000 to 90,000. Note that this statement changes the silver table directly, so it verifies the stored value but does not pass through the stream or the task's MERGE.

UPDATE ecomproject.sales.employees_silver
SET SALARY = 90000
WHERE EMPID = 1;

SELECT *
FROM ecomproject.sales.employees_silver
ORDER BY EMPID;

As shown, the salary for EMPID = 1 is now 90,000 in the silver table.

Silver table showing updated salary value
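
The UPDATE above runs against employees_silver, so the MERGE's update branch is never involved. To exercise that branch, the change has to happen in the raw table, where the stream can record it. A hedged sketch of such a test, not taken from the original walkthrough:

```sql
-- Apply the change to the bronze table so that the stream records it.
UPDATE ecomproject.sales.employees_raw
SET SALARY = 90000
WHERE EMPID = 1;

-- Before the task's next run, the stream shows each changed row as a
-- DELETE/INSERT pair with METADATA$ISUPDATE = TRUE.
SELECT EMPID, SALARY, METADATA$ACTION, METADATA$ISUPDATE
FROM ecomproject.sales.raw_employees_data_streams
WHERE EMPID = 1;
```

One caveat: after employees2.csv is loaded, the raw table may hold two copies of EMPID = 1. The stream would then contain two update pairs for the same silver row, and Snowflake may reject the MERGE as nondeterministic under its default settings. Testing with an EMPID that appears only once in the raw table avoids this.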



Completion

Incremental load logic has been successfully implemented end-to-end using:

  • Merge statement
  • Snowpipe
  • Streams
  • Tasks

The pipeline has been exercised with an initial load and an incremental load that contained duplicates, and the silver table has been checked with a direct update.

If you are practicing on a Snowflake free trial account, suspend the task when you are done so that it stops consuming warehouse credits every minute:

ALTER TASK ecomproject.sales.task_incremental_load SUSPEND;
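
To confirm the cleanup, the task state can be checked, and the pipe can be paused as well. This is a sketch; the unqualified pipe name assumes the session is still using ecomproject.sales:

```sql
-- The state column should now show "suspended".
SHOW TASKS LIKE 'task_incremental_load' IN SCHEMA ecomproject.sales;

-- Optionally pause the pipe so that newly uploaded files are not loaded.
ALTER PIPE raw_employees_data_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
```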

Incremental load architecture showing S3 → Snowpipe → Stream → Task → Silver Table