This series takes you from zero to hero with the latest and greatest cloud data warehousing platform, Snowflake.
Welcome back to our mini-series on Snowflake External Functions with AWS Lambda. Before we proceed, please read the previous post on how to set up these functions and how Snowflake interacts with AWS Lambda.
API Knowledge Disclaimer
If you are unfamiliar with APIs and how they work, don’t worry. This post does not explain APIs or expect any prior knowledge of them, so you should still be able to follow along and set up your own integrations. To make our lives easier, we have deliberately selected a public-facing API that does not require authentication. Of course, if you understand how authenticated APIs are accessed, you could replicate this process for an API of your choice.
UK Flood Warning Measurements
Now that we have set up our two basic sample functions in the previous post, this post will go through how to add new external functions to the framework that we have set up. This post will use a more complex example that leverages an API, specifically the UK government’s public-facing API that contains data on flood-monitoring measurements across various stations in the UK (documented here). We will set up an API call which retrieves the latest measurements for a given station, and we will call this as a function within Snowflake to pull readings for a set of flood-monitoring stations along the River Irwell.
If you are not interested in the API itself and just want to see an example of adding a new external function, feel free to skip down to the section called Adding a New External Function to Snowflake.
Creating a Table of Flood Monitoring Stations in Snowflake
Whilst we could set up external functions to do everything we need, it’s definitely simpler to play with the API a little directly first and set up a table of flood-monitoring stations in Snowflake. As we have deliberately picked an open API, we can access the full list of stations directly at the following URL: http://environment.data.gov.uk/flood-monitoring/id/stations
If you click on the link, you will see a JSON dataset containing information for every flood monitoring station in the UK:
As this blog post is not focused on loading and transforming JSON data (see that in our previous posts!), I will not go into detail here. The following steps get this data into a view in Snowflake called STATIONS:
- Copy the contents of the API Stations URL into a local file called flood_monitoring_stations.json
- Create a database called UK_FLOOD_MONITORING and a schema called RAW
- Create a stage within this schema called STATIONS_STAGE
- Use a PUT command in SnowSQL to load the data into the stage
- Load the data from the stage into a table in the RAW schema called RAW_STATIONS
- Create another schema called ANALYSIS
- Create a view called STATIONS in the ANALYSIS schema that parses the raw station data
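If you would rather script the first step than copy the contents by hand, the following is a minimal Python sketch, assuming the stations URL still returns a JSON object with an "items" key. The function name download_stations and its opener parameter are illustrative names of our own, not part of any library:

```python
import json
import urllib.request

# The open stations endpoint referenced above
STATIONS_URL = "http://environment.data.gov.uk/flood-monitoring/id/stations"


def download_stations(path, url=STATIONS_URL, opener=urllib.request.urlopen):
    """Fetch the stations JSON from `url` and write it to `path`.

    `opener` is a hypothetical injection point so the function can be
    exercised without network access. Returns the number of entries found
    under the "items" key.
    """
    request = urllib.request.Request(url, headers={"Accept": "application/json"})
    with opener(request) as response:
        payload = json.loads(response.read().decode("utf-8"))

    # Store the payload as a single JSON document, ready for the PUT command
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(payload, handle)

    return len(payload.get("items", []))
```

Calling download_stations('flood_monitoring_stations.json') should leave a file in the working directory that can then be loaded with the PUT command in the script below.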
If following along at home, the below script should achieve steps 2 through 7 if executed line-by-line in SnowSQL. This is not a best practice approach; however, it gets the job done quickly for the purposes of setting up our example:
CREATE OR REPLACE database uk_flood_monitoring;

USE database uk_flood_monitoring;

CREATE OR REPLACE schema raw;

CREATE OR REPLACE stage raw.stations_stage;

-- PUT command must be executed in SnowSQL or similar local execution platform
PUT 'FILE://path/to/file/flood_monitoring_stations.json' @raw.stations_stage;

CREATE OR REPLACE TABLE raw.raw_stations (
  body variant
);

COPY INTO raw.raw_stations
FROM @raw.stations_stage
  file_format = (
    type = 'JSON'
  )
;

CREATE OR REPLACE schema analysis;

CREATE OR REPLACE VIEW analysis.stations
AS
SELECT
    items.value:"@id"::string as id
  , items.value:"RLOIid"::number as rloi_id
  , items.value:"catchmentName"::string as catchment_name
  , items.value:"label"::string as label
  , items.value:"dateOpened"::date as date_opened
  , items.value:"easting"::number as easting
  , items.value:"northing"::number as northing
  , items.value:"lat"::float as lat
  , items.value:"long"::float as long
  , items.value:"notation"::string as notation
  , items.value:"riverName"::string as river_name
  , items.value:"stationReference"::string as station_reference
  , items.value:"status"::string as status
  , items.value:"town"::string as town
  , items.value:"wiskiID"::string as wiski_id
FROM raw.raw_stations
  , lateral flatten (body:"items") as items
-- WHERE clause to remove some problematic entries
-- which are not relevant to this example
WHERE NOT is_array(items.value:"northing")
;
Once we have set this up, we can run a quick query on our view to confirm that there are nine flood-monitoring stations along the River Irwell:
SELECT * FROM analysis.stations WHERE river_name = 'River Irwell' ;
The following screenshot demonstrates the output of this query and highlights several key columns we are interested in:
Retrieving the Most Recent Reading per Flood Monitoring Station by URL
One of the fields highlighted above is the NOTATION field. This field is the unique identifier for a flood-monitoring station and can be used to interact with the API and retrieve further data. For example, we can retrieve the most recent measurements for a station by accessing the following URL:
http://environment.data.gov.uk/flood-monitoring/id/stations/{stationId}/measures
The first row of our data is for the flood-monitoring station at Pioneer Mills in the town of Radcliffe. We can see that the NOTATION field for this record is 690250. Using this identifier, we can determine the following URL:
http://environment.data.gov.uk/flood-monitoring/id/stations/690250/measures
By opening this URL, we can access JSON-structured data similar to the following:
{
  "@context" : "http://environment.data.gov.uk/flood-monitoring/meta/context.jsonld" ,
  "meta" : {
    "publisher" : "Environment Agency" ,
    "licence" : "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/" ,
    "documentation" : "http://environment.data.gov.uk/flood-monitoring/doc/reference" ,
    "version" : "0.9" ,
    "comment" : "Status: Beta service" ,
    "hasFormat" : [
      "http://environment.data.gov.uk/flood-monitoring/id/stations/690250/measures.csv",
      "http://environment.data.gov.uk/flood-monitoring/id/stations/690250/measures.rdf",
      "http://environment.data.gov.uk/flood-monitoring/id/stations/690250/measures.ttl",
      "http://environment.data.gov.uk/flood-monitoring/id/stations/690250/measures.html"
    ]
  } ,
  "items" : [
    {
      "@id" : "http://environment.data.gov.uk/flood-monitoring/id/measures/690250-level-stage-i-15_min-m" ,
      "label" : "Pioneer Mills - level-stage-i-15_min-m" ,
      "latestReading" : {
        "@id" : "http://environment.data.gov.uk/flood-monitoring/data/readings/690250-level-stage-i-15_min-m/2020-08-11T04-30-00Z" ,
        "date" : "2020-08-11" ,
        "dateTime" : "2020-08-11T04:30:00Z" ,
        "measure" : "http://environment.data.gov.uk/flood-monitoring/id/measures/690250-level-stage-i-15_min-m" ,
        "value" : 0.584
      },
      "notation" : "690250-level-stage-i-15_min-m" ,
      "parameter" : "level" ,
      "parameterName" : "Water Level" ,
      "period" : 900 ,
      "qualifier" : "Stage" ,
      "station" : "http://environment.data.gov.uk/flood-monitoring/id/stations/690250" ,
      "stationReference" : "690250" ,
      "unit" : "http://qudt.org/1.1/vocab/unit#Meter" ,
      "unitName" : "m" ,
      "valueType" : "instantaneous"
    }
  ]
}
In short, what we have here is a set of descriptive header data and a flood-monitoring measurement contained within the "items" key. For our example, we are specifically interested in the datetime and value of the latest reading.
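To make the two values of interest concrete, here is a small Python sketch that pulls them out of a payload shaped like the one above. The function name extract_latest_reading is our own, and returning (None, None) when the "items" list or the "latestReading" key is missing is an assumption about how you might want to handle gaps, not documented API behaviour:

```python
def extract_latest_reading(payload):
    """Return (dateTime, value) for the first measure in a /measures payload.

    Falls back to (None, None) when there are no items or no latestReading.
    This fallback is an illustrative assumption, not API behaviour.
    """
    items = payload.get("items") or []
    if not items:
        return None, None

    latest = items[0].get("latestReading")
    if not isinstance(latest, dict):
        return None, None

    return latest.get("dateTime"), latest.get("value")


# A trimmed-down copy of the Pioneer Mills response shown above
sample = {
    "items": [
        {
            "label": "Pioneer Mills - level-stage-i-15_min-m",
            "latestReading": {"dateTime": "2020-08-11T04:30:00Z", "value": 0.584},
        }
    ]
}

print(extract_latest_reading(sample))  # ('2020-08-11T04:30:00Z', 0.584)
```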
Python Function to Retrieve the Most Recent Reading for a Station
We could manually visit the URL for each of our desired stations every 15 minutes and update our data that way, but this would be far too much work, and an automated solution would be far superior. We will start by building a Python function to access the API and retrieve the latest reading for a single given station:
import http.client
import mimetypes
import json

# Define the function which retrieves the latest readings for a single station ID
def retrieveLatestReading(stationId):

    ## Provide root URL for the connection to access
    url = 'environment.data.gov.uk'

    ## Build an HTTP connection to the above URL
    conn = http.client.HTTPConnection(url)

    ## Since our API does not require one, we use an empty payload
    payload = ''

    ## To make our lives easier, we instruct the API to return a JSON object
    headers = {'Accept': 'application/json'}

    ## Define the actual endpoint on the API that we wish to access
    ## Note that we specify the stationId in this endpoint
    endpoint = '/flood-monitoring/id/stations/{0}/measures'.format(stationId)

    ## Access the endpoint with a "GET" request, converting the response to a JSON object
    conn.request("GET", endpoint, payload, headers)
    res = conn.getresponse()
    data = res.read()
    jsonResponse = json.loads(data.decode("utf-8"))

    ## Return the JSON Response object as the output of our function
    return jsonResponse
We can test our function quite simply using our same example:
retrieveLatestReading('690250')
This returns the following set of results:
Python Function to Retrieve the Most Recent Readings for a Given List
Now that we have a function to retrieve the most recent reading for a single station, we can adapt this to return data for a list of stations. In preparation for this becoming an external function for Snowflake, we structure our incoming data in a specific way that matches Snowflake’s method of transferring data. We thus expect incoming data to be in the following JSON structure where the first value is the row number and the second is the station ID. We call this input stationArrayList:
stationArrayList = [
    [0, 690250]
  , [1, 690155]
  , [2, 690120]
  , [3, 690510]
  , [4, 690511]
  , [5, 690140]
  , [6, 690150]
  , [7, 690503]
  , [8, 690160]
]
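If you start from a plain list of station IDs, the row-numbered structure above can be produced rather than typed out. This is a minimal sketch; the helper name to_snowflake_rows is hypothetical and simply numbers each ID from zero:

```python
def to_snowflake_rows(station_ids):
    """Pair each station ID with its zero-based row number.

    Mirrors the [row number, value] layout described above.
    """
    return [[row_number, station_id] for row_number, station_id in enumerate(station_ids)]


# The first three River Irwell stations from the list above
print(to_snowflake_rows([690250, 690155, 690120]))
# [[0, 690250], [1, 690155], [2, 690120]]
```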
We can now put together a simple Python function to loop through the members of this incoming list and retrieve our desired values:
# Create the function to retrieve all latest readings from an incoming
# Snowflake-structured array of data
def retrieveAllLatestReadings(stationArrayList):

    ## Define empty array to store results from the loop below
    dataArray = []

    ## Loop through each station in the array, retrieving the row number and
    ## station ID, then using the station ID to retrieve the latest readings.
    for stationArray in stationArrayList:
        rowNumber = stationArray[0]
        stationId = stationArray[1]
        latestReadingJSON = retrieveLatestReading(stationId)

        ## Retrieve the datetime and value for the latest reading from latestReadingJSON
        latestReadingDateTime = latestReadingJSON["items"][0]["latestReading"]["dateTime"]
        latestReadingValue = latestReadingJSON["items"][0]["latestReading"]["value"]

        ## Add these readings to the array to store the results for each loop
        newArrayEntry = [rowNumber, {"datetime": latestReadingDateTime, "value" : latestReadingValue}]
        dataArray.append(newArrayEntry)

    return dataArray
We can test this simply enough in Python:
retrieveAllLatestReadings(stationArrayList)
This returns the following set of results:
This gives us exactly what we need. We will come back to this shortly when creating our Lambda function, but first, let’s review what we need to achieve to add this as a Snowflake external function with AWS.
Adding a New External Function to Snowflake
External Function Architecture Reminder
The following diagram was used in the previous post to demonstrate the flow between Snowflake and AWS for each of our external functions:
The full explanation of each of these elements is in the previous blog post. To add a new external function to our existing list, the following steps must be followed:
- Create a new AWS Lambda function which leverages the same service role
- Add a new resource and method to the API gateway that leverages this new AWS Lambda function
- Update the resource policy for the API gateway to allow access to the new method
- Deploy the new resource, method and resource policy to the API gateway stage
- Create a new External Function object in Snowflake
This may feel like a long list after the previous blog post, but it actually takes very little time to go through these steps once you have created your new Lambda function.
List of Variables
You may recall from our previous post that we maintained a list of variables when creating the framework for our external functions. I have pulled through the relevant values again in the following list:
Lambda Service Role: snowflake-lambda-service-role
Account ID: 012345678910
IAM Account Role: snowflake-external-lambda-functions
API Integration Name: aws_lambda
As we proceed through the next steps, we will add to this list of variables again as we did before.
Creating the AWS Lambda Function to Retrieve Readings for Given Stations
As in the previous post, navigate to the Lambda area in AWS and create a new function. Be sure to use the same Lambda service role:
Before adding the Python code for the function, create an appropriate test event such as the following:
{
  "body": "{\"data\" : [[0, 690250], [1, 690155], [2, 690120], [3, 690510], [4, 690511], [5, 690140], [6, 690150], [7, 690503], [8, 690160]]}"
}
The following screenshot demonstrates this test event being created:
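Because the "body" value is itself a JSON document stored as a string, the quotes inside it must be escaped, which is easy to get wrong by hand. As a rough sketch, the test event can be generated with Python's json module instead; build_test_event is a hypothetical helper name:

```python
import json


def build_test_event(station_ids):
    """Build a Lambda test event whose "body" is a JSON string.

    The inner payload uses the {"data": [[row number, station ID], ...]}
    layout shown in the test event above.
    """
    rows = [[row_number, station_id] for row_number, station_id in enumerate(station_ids)]
    return {"body": json.dumps({"data": rows})}


event = build_test_event([690250, 690155, 690120])

# json.dumps escapes the inner quotes for us when the event is serialised
print(json.dumps(event, indent=2))
```

The printed output can be pasted straight into the test event editor.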
We can now add our Python code for the Lambda function. Since this blog isn’t intended to discuss Python, I’ve just provided the full Python code:
# Import necessary modules for the function
import json
import http.client
import mimetypes

# Define the function which retrieves the latest readings for a single station ID
def retrieveLatestReading(stationId):

    ## Provide root URL for the connection to access
    url = 'environment.data.gov.uk'

    ## Build an HTTP connection to the above URL
    conn = http.client.HTTPConnection(url)

    ## Since our API does not require one, we use an empty payload
    payload = ''

    ## To make our lives easier, we instruct the API to return a JSON object
    headers = {'Accept': 'application/json'}

    ## Define the actual endpoint on the API that we wish to access
    ## Note that we specify the stationId in this endpoint
    endpoint = '/flood-monitoring/id/stations/{0}/measures'.format(stationId)

    ## Access the endpoint with a "GET" request, converting the response to a JSON object
    conn.request("GET", endpoint, payload, headers)
    res = conn.getresponse()
    data = res.read()
    jsonResponse = json.loads(data.decode("utf-8"))

    ## Return the JSON Response object as the output of our function
    return jsonResponse

# Create the function to retrieve all latest readings from an incoming
# Snowflake-structured array of data
def retrieveAllLatestReadings(stationArrayList):

    ## Define empty array to store results from the loop below
    dataArray = []

    ## Loop through each station in the array, retrieving the row number and
    ## station ID, then using the station ID to retrieve the latest readings.
    for stationArray in stationArrayList:
        rowNumber = stationArray[0]
        stationId = stationArray[1]
        latestReadingJSON = retrieveLatestReading(stationId)

        ## Retrieve the datetime and value for the latest reading from latestReadingJSON
        latestReadingDateTime = latestReadingJSON["items"][0]["latestReading"]["dateTime"]
        latestReadingValue = latestReadingJSON["items"][0]["latestReading"]["value"]

        ## Add these readings to the array to store the results for each loop
        newArrayEntry = [rowNumber, {"datetime": latestReadingDateTime, "value" : latestReadingValue}]
        dataArray.append(newArrayEntry)

    return dataArray

# The lambda function itself, in which event is the full request sent to the API
def lambda_handler(event, context):

    # Declare return variables
    statusCode = 200

    # json_compatible_string_to_return is the json body returned by the
    # function, stored as a string. This empty value is replaced by the real
    # result when ready
    json_compatible_string_to_return = ''

    # try/except is used for error handling
    try:
        # Retrieve the body of the request as a JSON object
        body = json.loads(event['body'])

        # Retrieve the 'data' key of the request body
        # When the request comes from Snowflake, we expect data to be
        # an array of each row in the Snowflake query resultset.
        # Each row is its own array, which begins with the row number,
        # for example [0, 690250] would be the 0th row, in which the
        # variable passed to the function is 690250.
        rows = body['data']

        # Execute the function which retrieves the latest readings
        # for each station in the incoming data stored in rows
        dataArray = retrieveAllLatestReadings(rows)

        # Put dataArray into a dictionary, then convert it to a string
        dataArrayToReturn = {'data' : dataArray}
        json_compatible_string_to_return = json.dumps(dataArrayToReturn)

    except Exception as err:
        # Statuscode = 400 signifies an error
        statusCode = 400
        # Function will return the error
        json_compatible_string_to_return = json.dumps({"data":str(err)})

    return {
        'statusCode': statusCode
      , 'headers': { 'Content-Type': 'application/json' }
      , 'body' : json_compatible_string_to_return
    }
When we test our function, we should see the following response:
{
  "statusCode": 200,
  "headers": {
    "Content-Type": "application/json"
  },
  "body": "{\"data\": [[0, {\"datetime\": \"2020-08-11T04:30:00Z\", \"value\": 0.584}], [1, {\"datetime\": \"2020-08-11T04:30:00Z\", \"value\": 0.341}], [2, {\"datetime\": \"2020-08-11T04:30:00Z\", \"value\": 0.159}], [3, {\"datetime\": \"2020-08-11T14:30:00Z\", \"value\": 0.943}], [4, {\"datetime\": \"2020-08-11T04:30:00Z\", \"value\": 0.199}], [5, {\"datetime\": \"2020-08-11T04:30:00Z\", \"value\": 0.32}], [6, {\"datetime\": \"2020-08-11T04:30:00Z\", \"value\": 0.126}], [7, {\"datetime\": \"2020-08-11T04:30:00Z\", \"value\": 0.775}], [8, {\"datetime\": \"2020-08-11T14:30:00Z\", \"value\": 0.284}]]}"
}
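The "body" in this response is again a JSON document stored as a string, so it needs decoding before the readings can be inspected. The following sketch shows one way to do that; readings_by_row is an illustrative name of our own, and raising an error on any status code other than 200 is our assumption about how you would want failures surfaced:

```python
import json


def readings_by_row(lambda_response):
    """Decode a Lambda response and return {row number: reading}.

    Raises ValueError for non-200 responses (an illustrative choice).
    """
    if lambda_response.get("statusCode") != 200:
        raise ValueError("Lambda returned an error: {0}".format(lambda_response.get("body")))

    # The body is a JSON string, so it must be parsed a second time
    body = json.loads(lambda_response["body"])
    return {row_number: reading for row_number, reading in body["data"]}


# A shortened version of the response shown above
response = {
    "statusCode": 200,
    "headers": {"Content-Type": "application/json"},
    "body": json.dumps({
        "data": [
            [0, {"datetime": "2020-08-11T04:30:00Z", "value": 0.584}],
            [1, {"datetime": "2020-08-11T04:30:00Z", "value": 0.341}],
        ]
    }),
}

print(readings_by_row(response)[0]["value"])  # 0.584
```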
The final thing to do before leaving the Lambda area in AWS is to add the Lambda function’s ARN to the list of variables:
Lambda Service Role: snowflake-lambda-service-role
Account ID: 012345678910
IAM Account Role: snowflake-external-lambda-functions
API Integration Name: aws_lambda
Function ARN: arn:aws:lambda:eu-west-2:012345678910:function:snowflake-flood-monitoring-station-readings
Add a New Resource and Method to the API Gateway
In a similar fashion to the previous blog post, return to the API gateway area in AWS and modify our existing gateway. Add a new resource and method that invokes the new Lambda function, following exactly the same steps as in the previous post:
Add the ARN for the new resource to the list of variables:
Lambda Service Role: snowflake-lambda-service-role
Account ID: 012345678910
IAM Account Role: snowflake-external-lambda-functions
API Integration Name: aws_lambda
Function ARN: arn:aws:lambda:eu-west-2:012345678910:function:snowflake-flood-monitoring-station-readings
Method ARN: arn:aws:execute-api:eu-west-2:012345678910:xxxxxxxxxx/*/POST/snowflake-flood-monitoring-station-readings
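The Method ARN follows a regular pattern, so it can help to see how its parts fit together. The sketch below assembles it from the values in our list of variables; build_method_arn is a hypothetical helper, and the layout is inferred from the ARN recorded above rather than taken from an AWS library:

```python
def build_method_arn(region, account_id, api_id, resource, http_method="POST", stage="*"):
    """Assemble an execute-api method ARN from its component parts.

    Layout inferred from the Method ARN above:
    arn:aws:execute-api:<region>:<account>:<api id>/<stage>/<method>/<resource>
    """
    return "arn:aws:execute-api:{0}:{1}:{2}/{3}/{4}/{5}".format(
        region, account_id, api_id, stage, http_method, resource
    )


print(build_method_arn(
    region="eu-west-2",
    account_id="012345678910",
    api_id="xxxxxxxxxx",
    resource="snowflake-flood-monitoring-station-readings",
))
```

Note that the account ID is kept as a string so that its leading zero is preserved.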
Update Resource Policy so API Gateway Will Allow Access to the New Method
Recall that the resource policy for the API gateway controls which resources can be accessed through the API. Again, I have included the general template from the previous blog:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:sts::<account ID>:assumed-role/<IAM Account Role>/snowflake"
      },
      "Action": "execute-api:Invoke",
      "Resource": "<Method ARN>"
    }
  ]
}
So to update the existing policy, we add a new statement to the list and populate it with our account ID, IAM Account Role and Method ARN:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:sts::012345678910:assumed-role/snowflake-external-lambda-functions/snowflake"
      },
      "Action": "execute-api:Invoke",
      "Resource": "arn:aws:execute-api:eu-west-2:012345678910:xxxxxxxxxx/*/POST/snowflake-sum"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:sts::012345678910:assumed-role/snowflake-external-lambda-functions/snowflake"
      },
      "Action": "execute-api:Invoke",
      "Resource": "arn:aws:execute-api:eu-west-2:012345678910:xxxxxxxxxx/*/POST/snowflake-product"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:sts::012345678910:assumed-role/snowflake-external-lambda-functions/snowflake"
      },
      "Action": "execute-api:Invoke",
      "Resource": "arn:aws:execute-api:eu-west-2:012345678910:xxxxxxxxxx/*/POST/snowflake-flood-monitoring-station-readings"
    }
  ]
}
Make your changes, and remember to save!
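Since every statement in the policy has the same shape, adding a new one is mechanical. As a sketch of that idea, the following Python builds the statement from the template and appends it to an existing policy held as a dictionary. The helper name add_invoke_statement is our own, and the code only manipulates JSON locally; it does not call AWS:

```python
import copy
import json


def add_invoke_statement(policy, account_id, iam_role, method_arn):
    """Return a copy of `policy` with an Allow statement for `method_arn`.

    The statement follows the general template above. The original policy
    is left untouched, and duplicate statements are not added twice.
    """
    statement = {
        "Effect": "Allow",
        "Principal": {
            "AWS": "arn:aws:sts::{0}:assumed-role/{1}/snowflake".format(account_id, iam_role)
        },
        "Action": "execute-api:Invoke",
        "Resource": method_arn,
    }

    updated = copy.deepcopy(policy)
    if statement not in updated["Statement"]:
        updated["Statement"].append(statement)
    return updated


existing_policy = {"Version": "2012-10-17", "Statement": []}

new_policy = add_invoke_statement(
    existing_policy,
    account_id="012345678910",
    iam_role="snowflake-external-lambda-functions",
    method_arn="arn:aws:execute-api:eu-west-2:012345678910:xxxxxxxxxx/*/POST/snowflake-flood-monitoring-station-readings",
)

print(json.dumps(new_policy, indent=2))
```

The printed JSON can then be pasted into the resource policy editor and saved as usual.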
Deploy New Resource, Method and Resource Policy to API Gateway Stage
This step is nice and simple. Navigate to Actions and select Deploy API to deploy the API:
Select the existing stage from the dropdown, enter an appropriate description and click Deploy:
After deploying the API, locate and record the Invoke URL for the newly staged method:
Lambda Service Role: snowflake-lambda-service-role
Account ID: 012345678910
IAM Account Role: snowflake-external-lambda-functions
API Integration Name: aws_lambda
Function ARN: arn:aws:lambda:eu-west-2:012345678910:function:snowflake-flood-monitoring-station-readings
Method ARN: arn:aws:execute-api:eu-west-2:012345678910:xxxxxxxxxx/*/POST/snowflake-flood-monitoring-station-readings
Method Invoke URL: https://xxxxxxxxxx.execute-api.eu-west-2.amazonaws.com/snowflake-external-function-stage/snowflake-flood-monitoring-station-readings
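As a quick sanity check on the value you record, the Invoke URL can be reassembled from its parts. This is a sketch only: build_invoke_url is a hypothetical helper, and the layout is inferred from the Invoke URL above:

```python
def build_invoke_url(api_id, region, stage, resource):
    """Assemble an API gateway Invoke URL for a staged resource.

    Layout inferred from the Method Invoke URL recorded above:
    https://<api id>.execute-api.<region>.amazonaws.com/<stage>/<resource>
    """
    return "https://{0}.execute-api.{1}.amazonaws.com/{2}/{3}".format(
        api_id, region, stage, resource
    )


print(build_invoke_url(
    api_id="xxxxxxxxxx",
    region="eu-west-2",
    stage="snowflake-external-function-stage",
    resource="snowflake-flood-monitoring-station-readings",
))
```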
Create a New External Function Object in Snowflake
The final step for us is to add this function in Snowflake. This is achieved with a single SQL statement:
CREATE [OR REPLACE] external function <database>.<schema>.<function name>(variables)
  returns variant
  api_integration = <api integration object>
  as '<method invoke url>'
So for our example, this would be:
CREATE OR REPLACE external function external_functions.lambda.retrieve_flood_monitoring_station_readings(station_id string)
  returns variant
  api_integration = aws_lambda
  as 'https://xxxxxxxxxx.execute-api.eu-west-2.amazonaws.com/snowflake-external-function-stage/snowflake-flood-monitoring-station-readings'
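If you expect to add several external functions, it may be convenient to render this statement from the list of variables rather than editing it by hand. The following is a rough sketch that only builds the SQL text. The function name render_external_function_sql is our own, and it performs no escaping or validation, so it should only be fed values you trust:

```python
def render_external_function_sql(function_name, arguments, api_integration, invoke_url):
    """Render a CREATE OR REPLACE external function statement as text.

    Follows the template above. No quoting or validation is performed.
    """
    return (
        "CREATE OR REPLACE external function {0}({1})\n"
        "  returns variant\n"
        "  api_integration = {2}\n"
        "  as '{3}'"
    ).format(function_name, arguments, api_integration, invoke_url)


print(render_external_function_sql(
    function_name="external_functions.lambda.retrieve_flood_monitoring_station_readings",
    arguments="station_id string",
    api_integration="aws_lambda",
    invoke_url="https://xxxxxxxxxx.execute-api.eu-west-2.amazonaws.com/snowflake-external-function-stage/snowflake-flood-monitoring-station-readings",
))
```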
Once we have created our external function, we can test it very simply:
SELECT parse_json(external_functions.lambda.retrieve_flood_monitoring_station_readings('690250'));
If you are seeing an error similar to the below, it could be that your API gateway resource policy is incorrect or has not been deployed:
Request failed for external function RETRIEVE_FLOOD_MONITORING_STATION_READINGS. Error: 403 '{"Message":"User: arn:aws:sts::012345678910:assumed-role/Snowflake-External-Lambda-Functions/snowflake is not authorized to perform: execute-api:Invoke on resource: arn:aws:execute-api:eu-west-2:********xxxx:xxxxxxxxxx/snowflake-external-function-stage/POST/snowflake-flood-monitoring-station-readings"}'
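Notice that the resource in the error message names the actual stage, whereas the Resource entries in our policy use a * wildcard in that position. The sketch below approximates that matching locally with Python's fnmatch module so that you can check whether a policy covers a given method. This is a simplification under our own assumptions: policy_allows is a hypothetical helper, it ignores the Principal entirely, and fnmatch is not the same as AWS's own policy evaluation:

```python
from fnmatch import fnmatchcase


def policy_allows(policy, resource_arn):
    """Roughly check whether any Allow statement covers `resource_arn`.

    Only Effect, Action and Resource are considered. Wildcards are matched
    with fnmatchcase, which approximates the behaviour of * in a Resource.
    """
    for statement in policy.get("Statement", []):
        if statement.get("Effect") != "Allow":
            continue
        if statement.get("Action") != "execute-api:Invoke":
            continue

        resources = statement.get("Resource", [])
        if isinstance(resources, str):
            resources = [resources]

        if any(fnmatchcase(resource_arn, pattern) for pattern in resources):
            return True

    return False


policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": "arn:aws:sts::012345678910:assumed-role/snowflake-external-lambda-functions/snowflake"},
            "Action": "execute-api:Invoke",
            "Resource": "arn:aws:execute-api:eu-west-2:012345678910:xxxxxxxxxx/*/POST/snowflake-sum",
        }
    ],
}

requested = "arn:aws:execute-api:eu-west-2:012345678910:xxxxxxxxxx/snowflake-external-function-stage/POST/snowflake-flood-monitoring-station-readings"

# The policy above only covers snowflake-sum, so the new method is not allowed
print(policy_allows(policy, requested))  # False
```

If this prints False for your own policy and method, the new statement is most likely missing. If it prints True, check that the policy has actually been deployed to the stage.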
Once your resource policy is correct and deployed (which should have occurred in step 4 above), your test should result in a table such as the following:
This gives us everything we need. We can now replace our original analysis view with a similar view that can also retrieve the latest metrics, directly parsing the datetime value while we are at it:
CREATE OR REPLACE VIEW analysis.stations
AS
SELECT
    items.value:"RLOIid"::number as rloi_id
  , items.value:"label"::string as label
  , items.value:"town"::string as town
  , items.value:"riverName"::string as river_name
  , items.value:"catchmentName"::string as catchment_name
  , items.value:"notation"::string as notation
  , to_timestamp(parse_json(external_functions.lambda.retrieve_flood_monitoring_station_readings(notation)):"datetime"::string, 'YYYY-MM-DD"T"HH24:MI:SS"Z"') as datetime
  , parse_json(external_functions.lambda.retrieve_flood_monitoring_station_readings(notation)):"value"::float as value
  , items.value:"@id"::string as id
  , items.value:"dateOpened"::date as date_opened
  , items.value:"easting"::number as easting
  , items.value:"northing"::number as northing
  , items.value:"lat"::float as lat
  , items.value:"long"::float as long
  , items.value:"stationReference"::string as station_reference
  , items.value:"status"::string as status
  , items.value:"wiskiID"::string as wiski_id
FROM raw.raw_stations
  , lateral flatten (body:"items") as items
-- WHERE clause to remove some problematic entries
-- which are not relevant to this example
WHERE NOT is_array(items.value:"northing")
;
Once this is set up, we can run a quick query on our view to retrieve the latest metrics for our nine flood-monitoring stations along the River Irwell:
SELECT * FROM analysis.stations WHERE river_name = 'River Irwell' ;
You should see a result similar to the following:
And there we have it! Whenever we need the latest data, we can query this view and the API will be accessed to retrieve the latest readings.
In Review
To recap: To add a new external function to our existing list, the following steps must be followed:
- Create a new AWS Lambda function which leverages the same service role
- Add a new resource and method to the API gateway that leverages this new AWS Lambda function
- Update the resource policy for the API gateway to allow access to the new method
- Deploy the new resource, method and resource policy to the API gateway stage
- Create a new External Function object in Snowflake
After creating your Lambda function, these steps are all quite quick to complete.
More Insights to Come
Hopefully this post and its predecessor have been useful for you and have provided a strong foundation for you to make your own Snowflake external functions with AWS Lambda. In the next post, we will be discussing some of the pros and cons of external functions, along with a few things to keep in mind. For a taster, I recommend playing around querying this final view; you may notice a few flaws or things to consider.