Automated Ingestion from Azure Storage into Snowflake via Snowpipe

Data

Automated Ingestion from Azure Storage into Snowflake via Snowpipe

If you wish to automatically stream/ingest data from an Azure Storage container into Snowflake, using native functionality, you will most likely be considering what is commonly referred to as a Snowpipe. In real terms, a Snowpipe for Azure is actually a combination of multiple components:

  • Azure Storage Container – stores files to be ingested into Snowflake
  • Azure Storage Queue – tracks each file that lands in the storage container
  • Azure Storage Event – updates the storage queue each time a new file is uploaded to the storage container
  • Snowflake Notification Integration – facilitates access between Snowflake and the Azure storage queue
  • Snowflake Storage Integration – facilitates access between Snowflake and the Azure storage container
  • Snowflake Stage – leveraged in ingestion statements such as COPY INTO to directly link the ingestion statement in Snowflake with the files to ingest from Azure
  • Snowflake Pipe – triggered using the notification integration to execute a COPY INTO statement to ingest data via a stage
  • Snowflake Table – table to store the ingested data in Snowflake

Flow Diagram

The following diagram provides a very simple overview of the flow involved for a Snowpipe between Snowflake and Azure, leveraging both a storage integration and a notification integration.

Requirements

This article assumes that the following objects are already configured:

  • Azure Storage Container – stores files to be ingested into Snowflake
  • Snowflake Storage Integration – facilitates access between Snowflake and the Azure storage container
  • Snowflake Stage – leveraged in ingestion statements such as COPY INTO to directly link the ingestion statement in Snowflake with the files to ingest from Azure
  • Snowflake Table – table to store the ingested data in Snowflake
  • Snowflake File Format – leveraged to ensure files are parsed correctly during ingestion

The core components are covered in my previous blog post: Configuring Storage Integrations between Snowflake and Azure Storage.

To perform the steps described in this article, the following components are required:

  • Snowflake ACCOUNTADMIN role or other role(s) with the following privileges:
    • CREATE INTEGRATION on the account
    • INSERT (or higher) on the table to which data will be ingested
    • USAGE on both the database and schema containing the table
    • USAGE on the stage that facilitates access to the Azure storage container
    • USAGE on both the database and schema containing the stage
    • CREATE PIPE on the schema in which the pipe will be created
  • Sufficient access in Azure to:
    • Approve the creation of a new service principal and grant it the required access to the storage queue
    • Configure an Azure storage event that updates the storage queue when a file is uploaded to the storage container
    • Create an Azure Event Grid Topic or leverage an existing one

Components Covered in This Article

With the above requirements out of the way, this article covers the configuration of the following objects:

  • Azure Storage Queue – tracks each file that lands in the storage container
  • Azure Storage Event – updates the storage queue each time a new file is uploaded to the storage container
  • Snowflake Notification Integration – facilitates access between Snowflake and the Azure storage queue
  • Snowflake Pipe – triggered using the notification integration to execute a COPY INTO statement to ingest data via a stage

Step 1: Create the Azure Storage Queue and Event

Before we can create a notification integration or pipe, we need to create the queue and event in Azure that will work together to identify files to be ingested into Snowflake. To keep things simple, we will create these in the same Azure storage account as our Azure storage container.

Step 1a: Create the Storage Queue

Creating a storage queue in Azure is as simple as clicking a button and entering a name. If only everything was so simple! The steps are as follows.

To start, navigate to the Queues pane within a storage account and select the button to add a new queue.

Give your queue a name and hit OK.

The storage queue should then be visible in your storage account:

Step 1b: Create the Storage Event to populate the Storage Queue

Creating an event is a little bit more complex than creating a queue, though it is still relatively simple.

To start, navigate to the Events pane within a storage account and select the button to add a new event subscription.

Configure your new event subscription to send messages to your storage queue (i.e., the endpoint) whenever a blob is created. You may need to create a new topic if a suitable one does not exist already.

Do not click Create yet!

Before selecting Create, navigate to the Filters pane and add a subject filter so that the subject begins with the subject of your storage container. For example, if our storage container is called my-storage-container, then the path would be /blobServices/default/containers/my-storage-container/blobs/.

If you like, you can add further filtering too, before finally selecting Create.

The event subscription should then be visible in your storage account. For this demo, we have created an event called snowpipe-event which triggers whenever a file is uploaded to the snowpipe storage container.

You may wish to test the event by uploading a file to your storage container and waiting to see

Step 2: Create Notification Integration Between Snowflake and Azure

In order to leverage our newly created Azure storage queue, we must create a corresponding notification integration in Snowflake that can authenticate into the Azure tenant and access the storage queue.

To achieve this, we need to know the location of the resources in Azure. Specifically, we need to know the Azure Tenant ID and the URL of the storage queue.

Step 2a: Retrieve the Azure Tenant ID

There are many ways to retrieve the ID for a given Azure tenant. My personal favourite method is by attempting to switch directory within the Azure portal, as this provides the Directory ID for each Azure tenant. In this case, the Directory ID is the tenant ID that we are looking for.

The following pair of screenshots demonstrate this method.

You may find it easier to record your Azure Tenant ID for later reference in a text file or similar.

Our important demo variables so far:

Azure Tenant ID: 1234a567-bc89-1ab2-3cde-4f56a789bcd1

 

Step 2b: Retrieve the URL for the Storage Queue

Personally, I find it easiest to retrieve the URL for the Azure storage queue through the Azure portal by opening the specific queue and simply reading the URL value.

The following screenshot demonstrate this method.

As with the Azure Tenant ID, you may find it easier to record this URL for later reference in a text file or similar.

Our important demo variables so far:

Azure Tenant ID: 1234a567-bc89-1ab2-3cde-4f56a789bcd1
Storage Queue URL: https://mystorageaccount.blob.core.windows.net/my-storage-queue

 

It is important to note at this time that do not we place azure:// at the start of the URL as we would with a storage integration. In this case, we keep the  https:// as Snowflake expects this format for notification integrations.

Step 2c: Create the Notification Integration Object in Snowflake

As mentioned above, this process can only be achieved using the Snowflake ACCOUNTADMIN role, or another role with the CREATE INTEGRATION privilege.

We now have the required variables to create our notification integration. This is achieved by executing a SQL script within Snowflake with the following template:

CREATE NOTIFICATION INTEGRATION <INTEGRATION_NAME>
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
  ENABLED = TRUE
  AZURE_TENANT_ID = '<retrieved in step 2a>'
  AZURE_STORAGE_QUEUE_PRIMARY_URI = 'https://<storage account>.queue.core.windows.net/<storage queue>'
;

 

Entering our demo variables, we can create a new storage integration called NI_AZURE_SNOWPIPE with the following script:

CREATE NOTIFICATION INTEGRATION NI_AZURE_SNOWPIPE
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
  ENABLED = TRUE
  AZURE_TENANT_ID = '1234a567-bc89-1ab2-3cde-4f56a789bcd1'
  AZURE_STORAGE_QUEUE_PRIMARY_URI = 'https://mystorageaccount.queue.core.windows.net/snowpipe'
;

 

Executing this form of statement in Snowflake will yield the following result:

At this stage, the notification integration object exists in Snowflake but it cannot access Azure. It must be authorised via Azure itself.

Step 3: Authorise the Snowflake Notification Integration in Azure

Now that we have created our notification integration in Snowflake, we must authorise it in Azure.

Step 3a: Retrieve the Required Information About the Snowflake Notification Integration

Snowflake helpfully provides the information we need in the output of a DESCRIBE command of the following template:

DESCRIBE NOTIFICATION INTEGRATION <INTEGRATION_NAME>;

 

For example, we can execute the following statement to see the details for our new NI_AZURE_SNOWPIPE notification integration:

The important information that we require here are the AZURE_CONSENT_URL and the AZURE_MULTI_TENANT_APP_NAME fields.  As with the previous variables, you may find it easier to record these for later reference in a text file or similar.

Our important demo variables so far:

Azure Tenant ID: 1234a567-bc89-1ab2-3cde-4f56a789bcd1
Storage Queue URL: https://mystorageaccount.blob.core.windows.net/my-storage-queue
AZURE_CONSENT_URL: https://login.microsoftonline.com/1234a567-bc89-1ab2-3cde-4f56a789bcd1/oauth2/authorize?client_id=a123bcd4-1234-1a23-bcd4-1a23b45678cd&response_type=code
AZURE_MULTI_TENANT_APP_NAME: SnowflakePACInt5678_1234567890123

 

Step 3b: Approve the Authorisation in the Azure Consent URL

Access the AZURE_CONSENT_URL whilst logged into Azure as a user with admin privileges to authenticate the storage integration. After clicking on the consent URL and signing in with an administrator account, the following prompt will be displayed:

After consenting on behalf of your organisation and accepting, the app will appear under Azure AD - Enterprise applications in your tenant as shown below:

Finally, you may be prompted with the following option which you can ignore:

This has created a new service principal in your Azure tenancy with a name matching the first part of the AZURE_MULTI_TENANT_APP_NAME, before the underscore. Again, you may find it easier to record these for later reference in a text file or similar.

Our important demo variables so far:

Azure Tenant ID: 1234a567-bc89-1ab2-3cde-4f56a789bcd1
Storage Queue URL: https://mystorageaccount.blob.core.windows.net/my-storage-queue
AZURE_CONSENT_URL: https://login.microsoftonline.com/1234a567-bc89-1ab2-3cde-4f56a789bcd1/oauth2/authorize?client_id=a123bcd4-1234-1a23-bcd4-1a23b45678cd&response_type=code
AZURE_MULTI_TENANT_APP_NAME: SnowflakePACInt5678_1234567890123
Azure Service Principal for the Notification Integration: SnowflakePACInt5678

 

Azure may take a few minutes to create the new service principal. Often it is near-instant, however it may take up to an hour depending on the performance of the platform and any other activity.

Step 3c: Grant the Azure Service Principal the Desired Access to Each Storage Container

Now that Snowflake is able to use a notification integration to leverage an Azure service principal, the only remaining step is to ensure that the Azure service principal has the desired access to the Azure storage queue.

To grant the required access, add a new role assignment under the Access Control (IAM) pane from within the storage queue.

You may wish to grant the service principal the desired access at the storage account level instead of the queue level. This is only if you want to allow access to all queues within the storage account for multiple notification integrations, as they will all authenticate with the same service principal.

When adding the new role assignment, choose which level of access you wish to grant from the list of possible roles.

  • If you wish for read access only, the Storage Queue Data Message Reader role is required.
  • If you wish for deeper access, such as for sending messages to the queue from Snowflake, then the Storage Queue Data Contributor role is required.

Finally, locate the service principal which you wish to grant access to. This is the Azure Service Principal for the Storage Integration from our tracking sheet, which is SnowflakePACInt1234 in our example.

The service principal should now have the desired access on the storage queue.

Step 4: Build the Pipe in Snowflake

Now that we have created our notification integration, we are ready to begin automatically ingesting data into Snowflake!

As mentioned earlier, we have assumed several components are already configured to avoid bloating this article and retreading earlier ground. Specifically, we assume the existence of a functioning stage, file format and table. This also means we can assume the existence of a functioning COPY INTO statement to ingest data directly (without an automated pipe).

Assume the following script already works to ingest data into the table manually:

copy into STREAMING_DEMO_DATA
  from (
    select 
        metadata$filename 
      , current_timestamp()
      , $1
      , $2
      , $3
    from @STG_STREAMING_DEMO
  )
  file_format = (format_name = FF_STREAMING_DEMO)
;

 

There is nothing particularly special about the above COPY INTO statement, and I will not explain it further as this is not relevant for the article. What we wish to focus on is how this can easily be converted into a pipe to facilitate automated ingestion. To achieve this, we must fit this ingestion statement into the following template:

create pipe <pipe name>
  auto_ingest = true
  integration = <integration name>
  as
    <ingestion statement>
;

 

For our demonstration, we execute the following code to create our pipe:

create pipe PIPE_STREAMING_DEMO
  auto_ingest = true
  integration = NI_AZURE_SNOWPIPE
  as
    copy into STREAMING_DEMO_DATA
    from (
      select 
          metadata$filename 
        , current_timestamp()
        , $1
        , $2
        , $3
      from @STG_STREAMING_DEMO
    )
    file_format = (format_name = FF_STREAMING_DEMO)
;

 

Now that we have created our pipe, can monitor its status with the SYSTEM$PIPE_STATUS() function, demonstrated as follows:

To test this, simply upload files to your Azure storage container and watch as the events are added into your Azure storage queue. After a short time, you will then see the files ingest into your table in Snowflake.

If you are not seeing the ingestion work as expected, I would recommend combining the SYSTEM$PIPE_STATUS() function with the PIPE_USAGE_HISTORY and COPY_HISTORY table functions to identify if any errors are occurring.

Wrap Up

In this article, we have demonstrated how to create a Snowpipe that leverages a Notification Integration object in Snowflake to facilitate access to a storage queue in Azure and automatically ingest data from an Azure storage container. The best part of this is that engineers no longer need to figure out the authentication when creating further pipes in Snowflake that depend on this queue, as you can add filters to the COPY INTO statements within individual pipes to allow multiple pipes to share a queue whilst controlling which data is ingested.

More About the Author

Chris Hastie

Data Lead
Using Azure App Settings to Authenticate Snowflake with Python Earlier this month, my colleague Jason Hoehn published the following article: Intro to Azure Functions and Snowpark: Populating ...
Automated Ingestion from Azure Storage into Snowflake via Snowpipe If you wish to automatically stream/ingest data from an Azure Storage container into Snowflake, using native functionality, you will ...

See more from this author →

Subscribe to our newsletter

  • I understand that InterWorks will use the data provided for the purpose of communication and the administration my request. InterWorks will never disclose or sell any personal data except where required to do so by law. Finally, I understand that future communications related topics and events may be sent from InterWorks, but I can opt-out at any time.
  • This field is for validation purposes and should be left unchanged.

InterWorks uses cookies to allow us to better understand how the site is used. By continuing to use this site, you consent to this policy. Review Policy OK

×

Interworks GmbH
Ratinger Straße 9
40213 Düsseldorf
Germany
Geschäftsführer: Mel Stephenson

Kontaktaufnahme: markus@interworks.eu
Telefon: +49 (0)211 5408 5301

Amtsgericht Düsseldorf HRB 79752
UstldNr: DE 313 353 072