Trigger Snowpark Functions When Files Are Uploaded to Azure Storage

In previous articles in this series, we have demonstrated how to set up Azure Functions to leverage Snowpark for Python. This allows us to interact with Snowflake from an Azure Function, unlocking a wide range of opportunities for scripting activity.

In this article, we will focus on how we can configure an Azure Functions App to react automatically when a file is uploaded to a container in an Azure storage account and leverage the contents of the file to perform an activity.

Requirements

This article assumes that the following objects are already configured:

  • Azure Storage Container – stores files to be ingested into Snowflake
  • Azure Storage Queue – tracks each file that lands in the storage container
  • Azure Storage Event – updates the storage queue each time a new file is uploaded to the storage container
  • Azure Functions App – should contain a function that can successfully connect to Snowflake

Supporting Articles

To understand how to configure the Azure Storage Queue and Event, take a look at “Step 1: Create the Azure Storage Queue and Event” in the following article: Automated Ingestion from Azure Storage into Snowflake via Snowpipe

To understand how to deploy Azure Functions Apps that connect to Snowflake, and how to leverage application settings, take a look at the following articles:

You may also wish to leverage components from this article to more securely connect to Snowflake:

Step 1: Authorise the Azure Functions App to Access the Storage Account Objects

The first step on our journey is to allow our Azure Functions App to access the required objects in Azure Storage. Specifically, we want our app to be able to react when a new message is sent to the queue, and access the corresponding file in the container.

Step 1a: Configure the Azure Functions App with a Managed Identity That Can Be Used for Permission Grants

From within your Azure Functions App, navigate to the identity pane and change the status for the System assigned identity to On. After hitting save, you will see a warning message regarding adding the managed identity to Azure Active Directory, which you can accept.

Once this is complete, the Object (principal) ID for the managed identity assigned to your Azure Functions App will be displayed. You can note this down if you wish, however we will not need it as we can reference our new managed identity by name, which will match the name of the Azure Functions App.

Step 1b: Grant the Managed Identity the Desired Access to the Storage Container(s) and Queue(s)

Now that the Azure Functions App has a managed identity, we can grant it access to other objects within Azure.

To grant the required access, add a new role assignment under the Access Control (IAM) pane from within each desired storage queue and container.

You may wish to grant the managed identity the desired access at the storage account level instead of at the individual container/queue level. Only do this if you are happy for the Azure Functions App to access every container and queue within that storage account, as they will all be accessible through the same managed identity.

When adding the new role assignment, choose which level of access you wish to grant from the list of possible roles.

  • Storage container roles
    • If you wish for read access only, the Storage Blob Data Reader role is required.
    • If you wish for deeper access, such as writing files to the container or moving files within the container, then the Storage Blob Data Contributor role is required.
  • Storage queue roles
    • If you wish for read access only, the Storage Queue Data Message Reader role is required.
    • If you wish for deeper access, such as for sending messages to the queue, then the Storage Queue Data Contributor role is required.

The following pair of screenshots demonstrate these role assignments:

Finally, locate the managed identity which you wish to grant access to. This will have the same name as the Azure Functions App.

The Azure Functions App should now have the desired access on both the storage container(s) and the storage queue(s).

Step 2: Configure the Azure Function with a Queue Trigger

The next step on our journey is to configure a function inside our Functions App that will react to a message landing in the queue. To achieve this, we must first configure a collection of application settings in our Azure Functions App that facilitate this access, then configure a matching binding for the function.

Step 2a: Configure Appropriate App Settings in the Azure Functions App

Each of these application settings must follow a specific naming pattern, consisting of a shared prefix and a targeted suffix.

For our example, we create three app settings that each have the shared prefix “AZURE_STORAGE_IDENTITY”.

Azure App Setting: AZURE_STORAGE_IDENTITY__blobServiceUri

This app setting is part of a series of settings that instruct the Azure function to leverage its own managed identity when connecting to the storage container. Specifically, this is the URI of the storage account that contains the container.

Example value: https://my-storage-account.blob.core.windows.net

Azure App Setting: AZURE_STORAGE_IDENTITY__credential

This app setting is part of a series of settings that instruct the Azure function to leverage its own managed identity when connecting to the storage container and queue. Specifically, this app setting instructs the use of managed identity.

Example value: managedidentity

Azure App Setting: AZURE_STORAGE_IDENTITY__queueServiceUri

This app setting is part of a series of settings that instruct the Azure function to leverage its own managed identity when connecting to the storage queue. Specifically, this is the URI of the storage account that contains the queue.

Example value: https://my-storage-account.queue.core.windows.net

Azure App Settings Example Screenshot

The following screenshot demonstrates these three app settings in use:
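Once deployed, app settings are exposed to the function's Python code as environment variables. If you wish to confirm that the three settings above are visible at runtime, a minimal sketch along the following lines (using a hypothetical helper name) could log their values:

## Import packages required for this check
import os
import logging

## Hypothetical helper to confirm the AZURE_STORAGE_IDENTITY
## app settings are visible to the function at runtime
def log_storage_identity_settings():
  for suffix in ["blobServiceUri", "credential", "queueServiceUri"]:
    setting_name = f"AZURE_STORAGE_IDENTITY__{suffix}"
    logging.info(f"{setting_name}: {os.getenv(setting_name)}")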

Step 2b: Configure a Queue Binding for the Function

Now that the app settings have been configured, the shared prefix can be used as a connection inside a binding. Recall that any Azure function is configured by default with two files:

  • __init__.py – contains the Python code for the function
  • function.json – contains the metadata for the function, including the list of bindings that determine how the function is triggered

The following structure demonstrates the contents of a function.json file that has been configured to trigger based on a queue event.

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "msg",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "<name of queue>",
      "connection": "<connection string for queue>"
    }
  ]
}

For our example, we use the following to populate the function.json file:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "msg",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "automated-function-trigger-demo",
      "connection": "AZURE_STORAGE_IDENTITY"
    }
  ]
}

The important detail here is that the shared prefix for the app settings from the previous step has been used as the “connection” within the queue trigger binding.

Step 2c: Ensure the Function Is Prepared to React to a Queue Trigger

The final change to make when configuring a queue trigger for a function is to ensure the function expects this trigger. To do this, simply modify the __init__.py file so that the main function expects a func.QueueMessage input called “msg.”

The following code block demonstrates this in a very simple fashion.

## Import Azure packages
import logging
import azure.functions as func

## Define main function for Azure
def main(msg: func.QueueMessage):
  logging.info('Received new message from queue')
  logging.info(msg)

It is useful to extend this by parsing the queued message for important information. The core information we need from the message is the URL of the corresponding file in the storage container. This can be parsed to retrieve the storage blob service URI, the container and the relative path of the file within the container:

## Import other packages
import os

## Define function to retrieve the desired
## information from the input message
def parse_input_message(msg: func.QueueMessage):

  ### Retrieve message as JSON
  msg_json = msg.get_json()
  logging.info('Message JSON:')
  logging.info(msg_json)
  
  ### Retrieve message ID
  msg_id = msg_json["id"]
  logging.info(f'Message ID: {msg_id}')

  ### Retrieve full file URL from input blob.
  ### The specific key varies depending on the type
  ### of storage container
  if "url" in msg_json["data"] :
    file_path_url = msg_json["data"]["url"]
  elif "blobUrl" in msg_json["data"] :
    file_path_url = msg_json["data"]["blobUrl"]
  else :
    logging.error("Function abort - Message data does not contain a file URL")
    raise ValueError("Function abort - Message data does not contain a file URL")
  
  logging.info(f'File path URL: {file_path_url}')
  
  '''
  Expected file URL format:
  https://<storage account>.blob.core.windows.net/<container>/path/to/file.json
    
  Example expected file URL:
  https://my-storage-account.blob.core.windows.net/automated-function-trigger-demo/example_file.json
  '''

  ### Retrieve storage blob service uri
  storage_blob_service_uri = os.getenv("AZURE_STORAGE_IDENTITY__blobServiceUri")

  ### Parse the relative file path from the file path URL
  ### by stripping the storage blob service URI prefix
  if file_path_url.startswith(storage_blob_service_uri) :
    file_path = file_path_url[1 + len(storage_blob_service_uri):]
  else :
    logging.error('Function abort - Path URL does not match expected storage blob service URI')
    raise ValueError('Function abort - Path URL does not match expected storage blob service URI')
  
  ### Split file path into container and relative file path
  container, relative_file_path = file_path.split('/', 1)

  return storage_blob_service_uri, container, relative_file_path
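
To tie this into the queue trigger, the main function can simply pass the incoming message to parse_input_message. The following is a minimal sketch of how that wiring might look before the Snowflake logic is added in the next step:

## Define main function for Azure, extended
## to parse the incoming queue message
def main(msg: func.QueueMessage):
  logging.info('Received new message from queue')

  ### Parse the queue message into its component parts
  storage_blob_service_uri, container, relative_file_path = parse_input_message(msg)
  logging.info(f'Container: {container}')
  logging.info(f'Relative file path: {relative_file_path}')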

Step 3: Configure the Azure Function to Read the Given File from the Storage Container

The next step on our journey is to configure the function to interact with the file that was uploaded to the storage container to perform an activity in Snowflake. For this article, we will demonstrate a very simple use-case where the content of the file will simply contain a SQL statement that we will execute in Snowflake.

This article does not provide any guidance on avoiding or preventing destructive code, nor does it discuss preventing SQL injection or similar attacks. It simply demonstrates the possible functionality, and it is strongly recommended that you configure additional validation and security measures if you build a similar Azure function in your own environment.

Step 3a: Python Code to Download a JSON File from a Given Storage Account, Container and Relative File Path

First, we focus on the specific code to download the file based on the given container and relative file path. The following code block leverages the DefaultAzureCredential class, which is designed to attempt a series of different authentication methods in sequence until it finds one that works. Among others, this class will attempt to authenticate using managed identity credentials, which we already configured for the Azure Functions App in step 1b.

## Import Azure packages
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

## Import other packages
import os
import json
from io import BytesIO

## Define function to download full JSON file from blob
def azure_download_json_file(storage_blob_service_uri=None, container=None, relative_file_path=None):
  default_azure_credential = DefaultAzureCredential()
  blob_service_client = BlobServiceClient(storage_blob_service_uri, credential=default_azure_credential)
  blob_client = blob_service_client.get_blob_client(container=container, blob=relative_file_path)
  with BytesIO() as input_blob:
    blob_client.download_blob().download_to_stream(input_blob)
    input_blob.seek(0)
    json_input = json.load(input_blob)
  
  return json_input
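
As a quick illustration, the following call uses the example values from earlier in this article; treat these specific names as placeholders for your own storage account, container and file:

## Example usage with the illustrative values from earlier in this article
json_input = azure_download_json_file(
  storage_blob_service_uri="https://my-storage-account.blob.core.windows.net",
  container="automated-function-trigger-demo",
  relative_file_path="example_file.json"
)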

Step 3b: Python Code to Retrieve the SQL Statement from the JSON Input

Once we have the JSON input, we can extract the SQL statement that we intend to execute:

## Define function that retrieves the SQL statement
## to execute from the JSON input
def retrieve_sql_statement_to_execute(json_input: dict):

  '''
  Expected format of JSON file:
  json_input = {
    "sql_statement_to_execute" : "<sql_statement_to_execute>"
  }
  '''

  ### Error if JSON file is not in expected format
  if "sql_statement_to_execute" not in json_input.keys() :
    logging.error(f"Manual log - Downloaded file did not include the key 'sql_statement_to_execute'")
    raise ValueError("Manual log - Downloaded file did not include the key 'sql_statement_to_execute'")

  ### Retrieve the value as its own variable
  sql_statement_to_execute = json_input["sql_statement_to_execute"]

  return sql_statement_to_execute
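
As a simple illustration, here is the function called with a sample payload that matches the expected format; the SQL statement itself is purely illustrative:

## Example usage with a sample payload in the expected format
sample_json_input = {
  "sql_statement_to_execute" : "SELECT CURRENT_TIMESTAMP()"
}
sql_statement_to_execute = retrieve_sql_statement_to_execute(sample_json_input)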

Step 3c: Python Code to Execute the SQL Statement in Snowflake

Finally, we can leverage a Snowpark session to execute the statement in Snowflake. Note that the build_snowpark_session function used here comes from the InterWorks Snowpark submodule covered at the end of this article.

## Define function that executes given SQL in Snowflake
def execute_sql_in_snowflake(sql_statement_to_execute: str):

  ### Create Snowflake Snowpark session 
  snowpark_session = build_snowpark_session()

  ### Execute the SQL command in Snowflake
  ### and log the result
  sf_df_statement_result = snowpark_session.sql(sql_statement_to_execute).collect()
  
  logging.info("SQL statement result:")
  logging.info(sf_df_statement_result)

  ### Close the Snowflake Snowpark Session
  snowpark_session.close()
  
  return
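
Putting the pieces together, the main function can now chain these helpers end to end. The following is a minimal sketch of one possible arrangement, assuming all of the functions above (and the build_snowpark_session import shown in the next section) sit in the same __init__.py:

## Define main function for Azure that chains
## together the helper functions defined above
def main(msg: func.QueueMessage):
  logging.info('Received new message from queue')

  ### Parse the queue message to determine which file was uploaded
  storage_blob_service_uri, container, relative_file_path = parse_input_message(msg)

  ### Download the JSON file from the storage container
  json_input = azure_download_json_file(
    storage_blob_service_uri=storage_blob_service_uri,
    container=container,
    relative_file_path=relative_file_path
  )

  ### Retrieve the SQL statement and execute it in Snowflake
  sql_statement_to_execute = retrieve_sql_statement_to_execute(json_input)
  execute_sql_in_snowflake(sql_statement_to_execute)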

Making This Easier with Functioning Examples and the Dedicated InterWorks Submodule

The final part of this post was a very simple example to show how you could leverage an input from blob storage to trigger and execute SQL statements in Snowflake, but the capability here is far greater. Depending on your use case, you could use this to mimic your own Snowpipes with custom pre-ingestion transformations using Pandas, introduce your own additional logging and API calls whilst orchestrating activity, or any number of other ideas. My goal with this post is to give a taste of what is possible and lay the groundwork for using queue-based triggers based on files being uploaded to blob storage. I'd love to hear what use cases you come up with!

To wrap things up, I'd like to mention a few resources to make this process easier. Firstly, all of the code and examples from today can be found in this dedicated GitHub repository: InterWorks Example Snowpark Azure Functions. In addition to bringing all of our code together into working examples that you can deploy yourself, this repository also demonstrates how to leverage our custom submodule to simplify these processes. Most notably, the submodule allows you to jump straight into creating Snowpark sessions without developing your own supporting functions, as demonstrated by the following code snippet:

## Import Azure packages
import logging
import azure.functions as func

## Import other packages
import pandas
import json

## Import shared packages
from ..submodules.interworks_snowpark.interworks_snowpark_python.snowpark_session_builder import build_snowpark_session_using_stored_private_key_in_azure_secrets_vault as build_snowpark_session

## Define main function for Azure
def main(req: func.HttpRequest) -> func.HttpResponse:
  logging.info("HTTP trigger received a new request")
  
  ### Create Snowflake Snowpark session
  snowpark_session = build_snowpark_session()

  ### The rest of the function logic would then use the session,
  ### for example executing SQL, before returning a response
  return func.HttpResponse("Snowpark session created successfully")