API Access Using Snowflake External Functions and Azure

Welcome back to our mini-series on Snowflake External Functions with Azure. Before we proceed, please read the previous post on how to set up these functions and how Snowflake interacts with Azure: A Definitive Guide to Setting up Snowflake External Functions with Azure

Why Choose an External Function over a Snowpark-Based Stored Procedure or Function?

As stated in the previous post, Snowflake recently released Snowpark, and this has unlocked a whole new realm of possibility. I have covered that extensively in my Snowflake with Python series; however, there is a critical component lacking where Snowpark-based functionality is concerned: external connectivity.

In its current form, Snowpark-based stored procedures and functions cannot access external resources during execution. You can prepare them in advance by providing all the additional files that you want at time of creation, but you cannot access the “outside world” using the functionality itself. For example, you cannot send a request to an API or connect to outside data sources. This is where external functions can step in and provide a lot of value.

In this blog, we will walk through how to set up a simple Python function within an Azure Function App and configure it as an external function in Snowflake. This post assumes you already have an Azure environment and a basic understanding of how to navigate it. As stated above, this article also assumes you have completed the steps in my previous article: A Definitive Guide to Setting up Snowflake External Functions with Azure.

API Knowledge Disclaimer

If you are unfamiliar with APIs and how they work, don’t worry. This post will not explain APIs in depth, but it also does not expect any prior knowledge of them, and you should still be able to follow along and set up your own integrations. To make our lives easier, we have deliberately selected a public-facing API that does not require any authentication; if you understand how authenticated APIs are accessed, you could replicate this process for an API of your choice.

UK Flood Warning Measurements

Now that we have set up our two basic sample functions in the previous article, this article will go through how to add new external functions to that framework. We will use a more complex example that leverages an API: specifically, the UK government’s public-facing API containing flood monitoring measurements from stations across the UK, documented here. We will set up an API call which retrieves the latest measurements for a given station and call this as a function within Snowflake to pull readings for a set of flood monitoring stations along the River Irwell.

If you are not interested in the API itself and just want to see an example of adding a new external function, feel free to skip down to the section called “Adding a New External Function to Snowflake.”

Creating a Table of Flood Monitoring Stations in Snowflake

Whilst we could set up external functions to do everything we need, it’s simpler to first interact with the API directly and set up a table of flood monitoring stations in Snowflake. As we have deliberately picked an open API, we can retrieve the full list of stations by accessing the following URL:

http://environment.data.gov.uk/flood-monitoring/id/stations

If you open this URL, you will see a JSON dataset containing information for every flood monitoring station in the UK:

As this article is not focused on loading and transforming JSON data, I will not go into detail here. You can see that in our previous articles! By following these steps, we can get this data into Snowflake:

  1. Copy the contents of the stations URL above into a local file called flood_monitoring_stations.json (see the snippet after this list)
  2. Create a database called EXTERNAL_FUNCTIONS_AZURE and a schema called UK_FLOOD_MONITORING
  3. Create a stage within this schema called STG_STATIONS
  4. Use a PUT command in SnowSQL to load the data into the stage
  5. Load the data from the stage into a table called RAW_STATIONS
  6. Create a view called PARSED_STATIONS that parses the raw station data
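
For step 1, a minimal Python sketch like the following can download the station list. The file name and output path are assumptions, chosen to match the PUT command in the script below:

## Hypothetical snippet to download the station list for step 1
import urllib.request

stations_url = "http://environment.data.gov.uk/flood-monitoring/id/stations"

## Save the JSON response to a local file; use this path in the PUT command below
urllib.request.urlretrieve(stations_url, "flood_monitoring_stations.json")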

If following along at home, the below script should achieve steps 2 through 6 if executed statement by statement in SnowSQL. This is not a best practice approach; however, it gets the job done quickly for the purposes of setting up our example.

--------------------------------------------
-- Environment

create database if not exists EXTERNAL_FUNCTIONS_AZURE;

use database EXTERNAL_FUNCTIONS_AZURE;

create schema if not exists UK_FLOOD_MONITORING;

use schema UK_FLOOD_MONITORING;

use warehouse wh_demo;

--------------------------------------------
-- Ingestion

CREATE OR REPLACE stage STG_STATIONS;

-- PUT command must be executed in SnowSQL or similar local execution platform
PUT 'FILE://path/to/file/flood_monitoring_stations.json' @STG_STATIONS;

CREATE OR REPLACE TABLE RAW_STATIONS (
  body variant
);

COPY INTO RAW_STATIONS
FROM @STG_STATIONS
  file_format = (
  type = 'JSON'
  );

--------------------------------------------
-- Parsing

CREATE OR REPLACE VIEW PARSED_STATIONS
AS
SELECT
    items.value:"@id"::string                   as id
  , items.value:"RLOIid"::number                as rloi_id
  , items.value:"catchmentName"::string         as catchment_name
  , items.value:"label"::string                 as label
  , items.value:"dateOpened"::date              as date_opened
  , items.value:"easting"::number               as easting
  , items.value:"northing"::number              as northing
  , items.value:"lat"::float                    as lat
  , items.value:"long"::float                   as long
  , items.value:"notation"::string              as notation
  , items.value:"riverName"::string             as river_name
  , items.value:"stationReference"::string      as station_reference
  , items.value:"status"::string                as status
  , items.value:"town"::string                  as town
  , items.value:"wiskiID"::string               as wiski_id
FROM RAW_STATIONS
  , lateral flatten (body:"items") as items
-- WHERE clause to remove some problematic entries
-- which are not relevant to this example
WHERE
    NOT is_array(items.value:"northing")
;

Once we have set this up, we can run a quick query on our view to confirm that there are 9 flood monitoring stations along the River Irwell:

select * 
from PARSED_STATIONS
where river_name = 'River Irwell'
;

The following screenshot demonstrates the output of this query and highlights several key columns which we are interested in.

Retrieving the Most Recent Reading per Flood Monitoring Station by URL

One of the fields highlighted above is the NOTATION field. This field is the unique identifier for a flood monitoring station and can be used to interact with the API and retrieve further data. For example, we can retrieve the most recent measurements for a station by accessing the following URL:

http://environment.data.gov.uk/flood-monitoring/id/stations/{{stationId}}/measures

The first row of our data is for the flood monitoring station at Pioneer Mills in the town of Radcliffe. We can see that the NOTATION field for this record is 690250. Using this identifier, we can determine the following URL:

http://environment.data.gov.uk/flood-monitoring/id/stations/690250/measures

By opening this URL, we can access JSON-structured data similar to the following:

{
  "@context" : "http://environment.data.gov.uk/flood-monitoring/meta/context.jsonld" ,
  "meta" : {
    "publisher" : "Environment Agency" ,
    "licence" : "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/" ,
    "documentation" : "http://environment.data.gov.uk/flood-monitoring/doc/reference" ,
    "version" : "0.9" ,
    "comment" : "Status: Beta service" ,
    "hasFormat" : [ "http://environment.data.gov.uk/flood-monitoring/id/stations/690250/measures.csv", "http://environment.data.gov.uk/flood-monitoring/id/stations/690250/measures.rdf", "http://environment.data.gov.uk/flood-monitoring/id/stations/690250/measures.ttl", "http://environment.data.gov.uk/flood-monitoring/id/stations/690250/measures.html" ]
  }
   ,
  "items" : [
    {
      "@id" : "http://environment.data.gov.uk/flood-monitoring/id/measures/690250-level-stage-i-15_min-m" ,
      "label" : "Pioneer Mills - level-stage-i-15_min-m" ,
      "latestReading" : {
        "@id" : "http://environment.data.gov.uk/flood-monitoring/data/readings/690250-level-stage-i-15_min-m/2020-08-11T04-30-00Z" ,
        "date" : "2020-08-11" ,
        "dateTime" : "2020-08-11T04:30:00Z" ,
        "measure" : "http://environment.data.gov.uk/flood-monitoring/id/measures/690250-level-stage-i-15_min-m" ,
        "value" : 0.584
      },
      "notation" : "690250-level-stage-i-15_min-m" ,
      "parameter" : "level" ,
      "parameterName" : "Water Level" ,
      "period" : 900 ,
      "qualifier" : "Stage" ,
      "station" : "http://environment.data.gov.uk/flood-monitoring/id/stations/690250" ,
      "stationReference" : "690250" ,
      "unit" : "http://qudt.org/1.1/vocab/unit#Meter" ,
      "unitName" : "m" ,
      "valueType" : "instantaneous"
    }
  ]
}

In short, what we have here is a set of descriptive header data, and a flood monitoring measurement contained within the "items" key. For our example, we are specifically interested in the datetime and value of the latest reading.

Python Function to Retrieve the Most Recent Reading for a Station

We could manually visit the URL for each of our desired stations every 15 minutes and update our data that way, but this would be far too much work; an automated solution is far superior. We will start by building a Python function to access the API and retrieve the latest reading for a single given station:

## Import necessary modules for the function
import http.client
import json

## Define the function which retrieves the latest readings for a single station ID
def retrieve_latest_reading(station_id: str) :
  ### Provide root URL for the connection to access
  url = "environment.data.gov.uk"

  ### Build an HTTP connection to the above URL
  conn = http.client.HTTPConnection(url)

  ### Since our API does not require one, we use an empty payload
  payload = ""

  ### To make our lives easier, we instruct the API to return a JSON object
  headers = {"Accept": "application/json"}

  ### Define the actual endpoint on the API that we wish to access
  ### Note that we specify the stationId in this endpoint
  endpoint = f"/flood-monitoring/id/stations/{station_id}/measures"

  ### Access the endpoint with a "GET" request, converting the response to a JSON object
  conn.request("GET", endpoint, payload, headers)
  res = conn.getresponse()
  data = res.read()
  json_response =  json.loads(data.decode("utf-8"))

  ### Return the JSON Response object as the output of our function
  return json_response
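
We can test this function with the same station as before:

retrieve_latest_reading("690250")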

This returns the following set of results:

Python Function to Retrieve and Parse the Most Recent Readings for a Station

Taking this a step further, we can see that the output above contains a lot of information that is not relevant for our purposes. To resolve this, we create another function that executes the previous one to retrieve the readings for a given station, then parses out the information that we need.

## Define the function which retrieves and parses the latest
## reading for a single station ID
def retrieve_and_parse_latest_readings(station_id: str) :

  ### Execute the function to send a request to the
  ### API to retrieve the latest readings for a given station
  latest_reading_json = retrieve_latest_reading(station_id=station_id)

  ### Retrieve the datetime and value for the latest reading from latestReadingJSON
  latest_reading_datetime = latest_reading_json["items"][0]["latestReading"]["dateTime"]
  latest_reading_value = latest_reading_json["items"][0]["latestReading"]["value"]

  ### Return the combined output
  parsed_reading_json = {"datetime": latest_reading_datetime, "value" : latest_reading_value}
  return parsed_reading_json

We can test our function quite simply using our same example:

retrieve_and_parse_latest_readings("690250")

This returns the following set of results:

Python Function to Retrieve the Most Recent Readings for a Given List of Stations

Now that we have a function to retrieve the most recent reading for a single station, we can adapt this to return data for a list of stations. In preparation for this becoming an external function for Snowflake, we structure our incoming data to match Snowflake’s method of transferring data. We thus expect incoming data in the following structure, where the first value in each row is the row number and the second is the station ID. We call this input station_array_list.

station_array_list = [
    [0, 690250]
  , [1, 690155]
  , [2, 690120]
  , [3, 690510]
  , [4, 690511]
  , [5, 690140]
  , [6, 690150]
  , [7, 690503]
  , [8, 690160]
]

We can now put together a simple Python function to loop through the members of this incoming list and retrieve our desired values:

## Create the function to retrieve all latest readings from an incoming
## Snowflake-structured array of data
def retrieve_all_latest_readings(station_array_list: list) :
  ### Iterate over input rows to
  ### perform the row-level function and
  ### store the results in a single list
  ### that is a compatible response for
  ### a Snowflake External Function.
  response_list = []
  for station_array in station_array_list :

    ### Retrieve the row number
    row_number = station_array[0]
    
    ### Retrieve the station ID
    station_id = station_array[1]

    ### try/except is used for error handling
    try:
      #### Execute the function to retrieve and parse the reading
      parsed_reading_json = retrieve_and_parse_latest_readings(station_id=station_id)
    except Exception:
      parsed_reading_json = "Error"
    
    ### Append the result to the list
    ### of rows to return
    response_list.append([row_number, parsed_reading_json])

  return response_list

We can test this simply enough in Python:

station_array_list = [
    [0, 690250]
  , [1, 690155]
  , [2, 690120]
  , [3, 690510]
  , [4, 690511]
  , [5, 690140]
  , [6, 690150]
  , [7, 690503]
  , [8, 690160]
]

retrieve_all_latest_readings(station_array_list)

This returns the following set of results:

This gives us exactly what we need: a list of [row number, parsed reading] pairs. We will come back to this shortly when creating our Azure function, but first let’s review what we need to achieve to add this as a Snowflake external function with Azure.

Adding a New External Function to Snowflake

External Function Architecture Reminder

The following diagram was used in the previous post to demonstrate the flow between Snowflake and Azure for each of our external functions:

The full explanation of each of these elements is in the previous blog post. To add a new external function to our existing list, the following steps must be followed:

  1. Create a new Azure function inside the same Azure Function App (or follow the steps in the previous article if you wish to deploy a new one)
  2. Configure the Azure API management gateway to support the new Azure function as an endpoint
  3. Create a new External Function object in Snowflake

Fortunately, this is a simple process and should not take too long to achieve, as we configured most of the important elements in the previous article and can leverage them again by using the same Azure API management gateway.

1. Create a New Azure Function Inside the Same Function App

The Python code for the function itself is as follows:

# Snowflake External Function - Flood Monitoring Station Readings

## Sends a request to the API for UK flood monitoring
## stations to retrieve the latest reading

## Import Azure modules
import azure.functions as func
import logging

## Import necessary modules for the function
import json
import http.client

## Define the function which retrieves the latest readings for a single station ID
def retrieve_latest_reading(station_id: str) :
  ### Provide root URL for the connection to access
  url = "environment.data.gov.uk"

  ### Build an HTTP connection to the above URL
  conn = http.client.HTTPConnection(url)

  ### Since our API does not require one, we use an empty payload
  payload = ""

  ### To make our lives easier, we instruct the API to return a JSON object
  headers = {"Accept": "application/json"}

  ### Define the actual endpoint on the API that we wish to access
  ### Note that we specify the stationId in this endpoint
  endpoint = f"/flood-monitoring/id/stations/{station_id}/measures"

  ### Access the endpoint with a "GET" request, converting the response to a JSON object
  conn.request("GET", endpoint, payload, headers)
  res = conn.getresponse()
  data = res.read()
  json_response =  json.loads(data.decode("utf-8"))

  ### Return the JSON Response object as the output of our function
  return json_response

## Define the function which retrieves and parses the latest
## reading for a single station ID
def retrieve_and_parse_latest_readings(station_id: str) :

  ### Execute the function to send a request to the
  ### API to retrieve the latest readings for a given station
  latest_reading_json = retrieve_latest_reading(station_id=station_id)

  ### Retrieve the datetime and value for the latest reading from latestReadingJSON
  latest_reading_datetime = latest_reading_json["items"][0]["latestReading"]["dateTime"]
  latest_reading_value = latest_reading_json["items"][0]["latestReading"]["value"]

  ### Return the combined output
  parsed_reading_json = {"datetime": latest_reading_datetime, "value" : latest_reading_value}
  return parsed_reading_json

## Create the function to retrieve all latest readings from an incoming
## Snowflake-structured array of data
def retrieve_all_latest_readings(station_array_list: list) :
  ### Iterate over input rows to
  ### perform the row-level function and
  ### store the results in a single list
  ### that is a compatible response for
  ### a Snowflake External Function.
  response_list = []
  for station_array in station_array_list :

    ### Retrieve the row number
    row_number = station_array[0]
    
    ### Retrieve the station ID
    station_id = station_array[1]

    ### try/except is used for error handling
    try:
      #### Execute the function to retrieve and parse the reading
      parsed_reading_json = retrieve_and_parse_latest_readings(station_id=station_id)
    except Exception:
      parsed_reading_json = "Error"
    
    ### Append the result to the list
    ### of rows to return
    response_list.append([row_number, parsed_reading_json])

  return response_list

## Define the main function
def main(req: func.HttpRequest) -> func.HttpResponse:

  ### Attempt to parse the body of the request as JSON
  try: 
    req_body = req.get_json()
  except ValueError as e: 
    logging.info("Failed to parse request body as JSON")
    logging.info(e)
    return func.HttpResponse(body="Failed to parse request body as JSON", status_code=400)
  else: 

    try :

      #### Additional logging that is not necessary
      #### once in production
      logging.info("req_body:")
      logging.info(req_body)

      #### Retrieve the "data" key of the request body.
      #### When the request comes from Snowflake, we expect data to be a
      #### an array of each row in the Snowflake query resultset.
      #### Each row is its own array, which begins with the row number,
      #### for example [0, 690250] would be the 0th row, in which the
      #### variable value passed to the function is 690250.
      input_rows = req_body.get("data") 

      #### Additional logging that is not necessary
      #### once in production
      logging.info("input_rows:")
      logging.info(input_rows)

      #### Iterate over input rows to
      #### perform the row-level function and
      #### store the results in a single list
      #### that is a compatible response for
      #### a Snowflake External Function.
      response_list = retrieve_all_latest_readings(input_rows)

      #### Put response into a JSON dictionary,
      #### then convert it to a string for transmission
      response_json = {"data": response_list}
      response_as_string = json.dumps(response_json)

      #### Send the response
      response_headers = {"Content-Type" : "application/json"}
      return  func.HttpResponse(body=response_as_string, status_code=200, headers=response_headers)

    ### Error handling
    except Exception as e:
      
      logging.info(f"Manual log - Error encountered")
      logging.info(e)
      return  func.HttpResponse(body=f"Error encountered", status_code=400)
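
Before deploying, it can be useful to test this function locally. The following is a minimal sketch, assuming you are running the Function App locally via Azure Functions Core Tools (func start) on its default port of 7071, and that the function folder is named flood_monitoring_station_readings; both of these details are assumptions for illustration:

## Hypothetical local test of the Azure function
## Assumes the Function App is running locally via Azure Functions Core Tools
## on the default port 7071, and that the function folder is named
## flood_monitoring_station_readings
import requests

## Mimic the request body that Snowflake sends to an external function
payload = {"data": [[0, "690250"], [1, "690155"]]}

response = requests.post(
    "http://localhost:7071/api/flood_monitoring_station_readings",
    json=payload
)
print(response.status_code)
print(response.json())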

We will follow similar steps to section “2c. Deploying Azure Functions using VSCode” in the previous article, A Definitive Guide to Setting up Snowflake External Functions with Azure, leveraging the demos in InterWorks’ Example Snowflake Azure External Functions Git repository through VSCode as a shortcut to deploying the function in Azure.

For this approach, simply add a new folder for our function with an __init__.py file containing the code above, along with a function.json file:
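
Assuming the new folder is named flood_monitoring_station_readings to match the endpoint URL we will configure later, the structure looks like this:

flood_monitoring_station_readings/
├── __init__.py
└── function.json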

For this example, I use the following content for the function.json file:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "anonymous",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

You can then use VSCode to deploy this in the same way as before:

2. Configure the Azure API Management Gateway to Support the New Azure Function as an Endpoint

Return to your API management gateway in Azure and view your API in the APIs pane. If you followed the previous article, it should look similar to the following screenshot:

We have a couple of options at this point:

  1. Add the new function as an entirely new API in the management gateway by following the steps in section “4. Configure the Azure API Management Gateway to Support the Azure Functions as Endpoints” of the previous article
  2. Add the new function as a new operation in the existing API

This article will cover the process involved for the latter option, as the former simply follows the steps in the aforementioned section.

To add the new function as a new operation, the advised method would be to select + Add operation and follow the various steps.

Personally, I don’t like filling everything in again when we can save ourselves some effort, so instead I would recommend cloning an existing endpoint and tweaking it. For this example, we clone the POST endpoint for one of our existing functions:

After selecting clone, Azure will spend a few moments deploying the clone. When it finishes, you should see it appear:

From here, we just need to clean up the frontend. Fortunately, the other areas (inbound processing, backend and outbound processing) can remain as they are, which has the added benefit of leveraging any policies that we configured previously for the endpoint that we cloned.

To open the frontend configuration, select the pencil icon in the relevant section.

A menu appears in which you can configure new values for the frontend. Here you can configure the display name and URL, along with any other relevant details if you wish. Importantly, be sure to set the URL’s verb to POST, as this is what Snowflake will use to send data to the endpoint.

It is important that the URL matches the name of your Azure Function. See the below screenshot for a reference of the functions available in our Azure Functions App:

Once you have configured the values, select Save. The name change will be reflected on the left:

This should be the only change required, as this API is already configured to point to our main Azure Functions App, and the URL is how we map to the specific function within it.

3. Create a New External Function Object in Snowflake

The final steps are to build and test the external function in Snowflake.

Be sure that the role creating the external functions has been granted USAGE on the API integration.

As before, the template to create an external function is as follows:

CREATE [OR REPLACE] external function <database>.<schema>.<function name>(variables)
  returns variant
  api_integration = <api integration object>
as
  '<endpoint url>'
;

Following this template, we can create our function. Note that it takes a single string argument, the station ID, which matches the NOTATION values we pass to it later:

create or replace external function external_functions_azure.uk_flood_monitoring.retrieve_flood_monitoring_station_readings(station_id string)
  returns variant
  api_integration = API_AZURE_EXTERNAL_FUNCTIONS
as 
  'https://api-snowflake-demo.azure-api.net/Example-External-Functions/flood_monitoring_station_readings'
;

Now all we need to do is test our function to make sure it works. The simplest way to test it is to execute it directly with manual inputs:
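
For example, a quick sanity check on the Pioneer Mills station from earlier might look like this:

select external_functions_azure.uk_flood_monitoring.retrieve_flood_monitoring_station_readings('690250');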

This gives us everything we need. We can now replace our original view with a similar view that can also retrieve the latest metrics, directly parsing the datetime value while we are at it:

CREATE OR REPLACE VIEW stations_from_api
AS
SELECT
    items.value:"RLOIid"::number                as rloi_id
  , items.value:"label"::string                 as label
  , items.value:"town"::string                  as town
  , items.value:"riverName"::string             as river_name
  , items.value:"catchmentName"::string         as catchment_name
  , items.value:"notation"::string              as notation
  , to_timestamp(parse_json(retrieve_flood_monitoring_station_readings(notation)):"datetime"::string, 'YYYY-MM-DD"T"HH24:MI:SS"Z"') as datetime
  , parse_json(retrieve_flood_monitoring_station_readings(notation)):"value"::float as value
  , items.value:"@id"::string                   as id
  , items.value:"dateOpened"::date              as date_opened
  , items.value:"easting"::number               as easting
  , items.value:"northing"::number              as northing
  , items.value:"lat"::float                    as lat
  , items.value:"long"::float                   as long
  , items.value:"stationReference"::string      as station_reference
  , items.value:"status"::string                as status
  , items.value:"wiskiID"::string               as wiski_id
FROM raw_stations
  , lateral flatten (body:"items") as items
-- WHERE clause to remove some problematic entries
-- which are not relevant to this example
WHERE
    NOT is_array(items.value:"northing")
;

Once we have set this up, we can run a quick query on our view to retrieve the latest metrics for our 9 flood monitoring stations along the River Irwell:

SELECT *
FROM stations_from_api
WHERE river_name = 'River Irwell'
;

You should see a result similar to the following:

And with that, we have a view that can successfully query an API!

Wrap Up

And there we have it! Whenever we need the latest data, we can query this view and the API will be accessed to retrieve the latest readings.

To recap: To add a new external function to our existing list, the following steps must be followed:

  1. Create a new Azure function inside the same Azure Function App (or follow the steps in the previous article if you wish to deploy a new one)
  2. Configure the Azure API management gateway to support the new Azure function as an endpoint
  3. Create a new External Function object in Snowflake

After creating your Azure function in step 1, the remaining steps are both quick to complete.

Hopefully this post and its predecessor have been useful for you and have provided a strong foundation for you to make your own Snowflake external functions with Azure.
