In today’s fast-paced urban environments, efficient traffic management is crucial for reducing congestion, improving safety and optimising public transport. Real-time data streaming enables city planners and transport companies to make instant, data-driven decisions. While traffic management is a compelling example, it’s just one of many use cases for real-time streaming — others include financial transaction monitoring, IoT device telemetry and online retail analytics.
In this article, I’ll use the traffic scenario as a practical context to illustrate how to build a streaming solution using AWS Kinesis Data Streams, Firehose and Snowflake. There are many ways to approach this scenario; one alternative, for example, can be found in this article, which explains how to use Snowpipe and Kafka together.
Before we get into the implementation itself, let’s define in broad terms the technology stack we’ll use. Amazon Kinesis is a managed service that provides tools for real-time data streaming. Within Kinesis, Kinesis Data Streams allows you to continuously capture and store large volumes of real-time data from sources such as sensors, GPS devices or application logs. To process and deliver this data in batches, you can use Kinesis Data Firehose, which reads data directly from Kinesis Data Streams and automatically delivers it to a specified destination — in our case, Snowflake. For further reading on real-time streaming architectures, see this AWS blog on integrating Firehose with Snowflake, which served as a reference point for some of the ideas in this post.
Above: High Level Architecture
Requirements to follow this article:
- Snowflake account with permission to create users, roles, databases and tables.
- An AWS IAM role with permissions to create and manage Kinesis resources.
- An existing S3 bucket.
Step 1: Snowflake Setup
You can find all the SQL and Python code I will use in the article in this public repository.
First, we’ll provision a service user for AWS to adopt so that Firehose can write directly to Snowflake. We’ll use key pair authentication for enhanced security – you can follow the steps as outlined here. Securely store the private key you generate, as we’ll need it a bit later.
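For reference, the key-pair generation in that guide typically boils down to a couple of openssl commands, shown here for an unencrypted key (the guide also covers encrypted keys):

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

With the key pair in hand, we can create the service user and register the public key: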
-- Use a high-level role with privilege to create users and grant privileges
USE ROLE SECURITYADMIN;

CREATE OR REPLACE USER svc_aws_stream
  PASSWORD = '<your_password>'
  LOGIN_NAME = 'my_service_user'
  DEFAULT_ROLE = public
  MUST_CHANGE_PASSWORD = FALSE
  EMAIL = '<your_email@example.com>';

ALTER USER svc_aws_stream SET RSA_PUBLIC_KEY = '<your_RSA_public_key>';
ALTER USER svc_aws_stream UNSET PASSWORD;

GRANT ROLE SYSADMIN TO USER svc_aws_stream;
Let’s now provision a database, schema and table where we’ll stream the data into:
USE ROLE SYSADMIN;

CREATE DATABASE IF NOT EXISTS traffic_db;
CREATE SCHEMA IF NOT EXISTS traffic_db.raw_data;

CREATE OR REPLACE TABLE traffic_db.raw_data.traffic_events (
    eventId NUMBER,
    sensorType VARCHAR,
    location VARCHAR,
    eventTime TIMESTAMP_NTZ,
    trafficSpeed NUMBER,
    congestionLevel NUMBER,
    incidentType VARCHAR,
    vehicleCount NUMBER,
    avgVehicleLength NUMBER,
    weatherCondition VARCHAR,
    temperature NUMBER,
    laneClosed BOOLEAN,
    reportedBy VARCHAR
);
Finally, let’s make sure we grant the required privileges to the role to be used by Firehose:
-- Use a high-level role with privilege to create roles and grant privileges
USE ROLE SECURITYADMIN;

-- 1. Create the custom role
CREATE ROLE kinesis_firehose_role
  COMMENT = 'Role for AWS Kinesis Firehose to ingest traffic data';

-- 2. Grant required privileges to the role
GRANT USAGE ON DATABASE traffic_db TO ROLE kinesis_firehose_role;
GRANT USAGE ON SCHEMA traffic_db.raw_data TO ROLE kinesis_firehose_role;
GRANT INSERT, SELECT ON TABLE traffic_db.raw_data.traffic_events TO ROLE kinesis_firehose_role;

-- 3. Assign the role to your service user
GRANT ROLE kinesis_firehose_role TO USER svc_aws_stream;
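Before moving on, it’s worth a quick sanity check that the grants landed where we expect:

SHOW GRANTS TO ROLE kinesis_firehose_role;
SHOW GRANTS TO USER svc_aws_stream;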
Great, we now have the Snowflake setup complete. We’ve created a role that follows the least-privilege principle and assigned it to a service user that authenticates with a key pair, which keeps things nice and secure. Snowflake is now ready for us to start streaming data.
Step 2: AWS Setup
Let’s now create the data stream first. I will name this “Traffic-Stream” and leave all the rest of the setup as per the default settings – essentially with on-demand mode and a 1-day data retention period:
Above: Kinesis Data Stream created
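If you prefer to script this step instead of using the console, the equivalent AWS CLI calls should look something like this (on-demand mode and the 24-hour retention period match the console defaults):

aws kinesis create-stream \
    --stream-name Traffic-Stream \
    --stream-mode-details StreamMode=ON_DEMAND

aws kinesis describe-stream-summary --stream-name Traffic-Stream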
Now let’s create the Kinesis Data Firehose stream. I’ve listed below all the items I changed; assume everything else is left at the default settings.
- Source drop-down: Kinesis Data Stream
- Destination drop-down: Snowflake
- Firehose stream name: Leave the default
- Source settings. Kinesis data stream: Browse to select the data stream created earlier.
- Destination settings:
  - Snowflake account URL: Input your Snowflake account details, e.g. xxxxx.snowflakecomputing.com
  - User: svc_aws_stream
  - Private key: Enter the private key generated earlier, excluding the -----BEGIN and -----END header and footer lines. If the key is split across multiple lines, remove the line breaks so it’s all a single long string.
  - Use custom Snowflake role: kinesis_firehose_role
  - Under Database configuration settings, enter the database, schema and table created earlier.
- Backup settings: Browse to select an existing S3 bucket. This will be used solely to log error messages, so it may not be used at all.
With the above, we configured a Kinesis Data Firehose stream to read from an existing Kinesis Data Stream and deliver data directly to Snowflake using our service user and custom role. We also specified the target details and selected an S3 bucket for any error logs. Now click Create and, after a couple of minutes, the stream should be up and running.
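If you want to confirm from the CLI that the Firehose stream is ready, a describe call along these lines should report a DeliveryStreamStatus of ACTIVE once creation finishes (replace <firehose-stream-name> with the name of your stream):

aws firehose describe-delivery-stream --delivery-stream-name <firehose-stream-name>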
Step 3: Testing the stream
Let’s do a bit of basic testing. Firehose does provide a built-in test feature while you are configuring it, but I will just use the command line in AWS CloudShell to simulate live traffic sensor events.
The command I will run is:
aws kinesis put-record \
    --stream-name <stream-name> \
    --partition-key <partition-key> \
    --data '<json-payload>' \
    --cli-binary-format raw-in-base64-out
Which in my case looks like this:
aws kinesis put-record \
    --stream-name Traffic-Stream \
    --partition-key sensor1 \
    --data '{"eventId":101,"sensorType":"loop_detector","location":"Sydney-GeorgeSt-5","eventTime":"2025-05-30T14:00:00Z","trafficSpeed":28,"congestionLevel":3,"incidentType":"accident","vehicleCount":15,"avgVehicleLength":4.2,"weatherCondition":"rain","temperature":17,"laneClosed":true,"reportedBy":"city_traffic_monitor"}' \
    --cli-binary-format raw-in-base64-out
When I enter this command, I get the below response, which confirms the record was successfully sent to the Kinesis stream and shows the shard it landed in and its unique sequence number. If you now query the table in Snowflake, you should see your record inserted.
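A quick way to check on the Snowflake side is a simple query against the target table we created earlier (it can take a moment for the record to appear):

SELECT *
FROM traffic_db.raw_data.traffic_events
ORDER BY eventTime DESC;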
(Optional) Step 4: Assembling a Small Testing Application
In a production scenario, IoT devices, GPS units and vehicle telematics systems would send JSON events to the Kinesis stream. For example, let’s imagine a traffic camera pushing live data every few seconds.
To illustrate this, I will spin up a basic EC2 instance to represent that application. I will deploy that instance in the same region as Kinesis. This instance will also need outbound networking access.
This instance will have a Permission Policy like this:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:<region>:<account>:stream/<stream name>" } ] }
Depending on your Linux distribution, you may need to verify that Python is installed: some systems come with Python 3 pre-installed, while others require a manual install.
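A quick check might look like this; the install command is just an example for Amazon Linux, so adjust it for your distribution (and note the script below shells out to the AWS CLI, so that needs to be present too):

python3 --version             # check whether Python 3 is already available
aws --version                 # the script below shells out to the AWS CLI
sudo yum install -y python3   # only if Python 3 is missing (Amazon Linux example)

Next, I will just create a basic Python script: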
nano send_kinesis_events.py
The script below simulates sending 10 traffic sensor events with random details like speed, location and weather. It sends one event every second, pretending to be a city traffic monitor reporting real-time data. Note that values such as the partition key and reporting source are hard-coded and the rest are simple random picks, but for the purposes of testing it will do.
import json
import random
import time
import subprocess

STREAM_NAME = "Traffic-Stream"

sensor_types = ["loop_detector", "camera", "radar"]
locations = ["Sydney-GeorgeSt-5", "Sydney-PittSt-2", "Sydney-KentSt-9"]
incident_types = ["accident", "construction", "none", "breakdown"]

for i in range(10):
    payload = {
        "eventId": 100 + i,
        "sensorType": random.choice(sensor_types),
        "location": random.choice(locations),
        "eventTime": "2025-05-30T14:00:{:02d}Z".format(i),
        "trafficSpeed": random.randint(20, 80),
        "congestionLevel": random.randint(1, 5),
        "incidentType": random.choice(incident_types),
        "vehicleCount": random.randint(5, 30),
        "avgVehicleLength": round(random.uniform(3.0, 6.0), 1),
        "weatherCondition": random.choice(["clear", "rain", "fog"]),
        "temperature": random.randint(10, 30),
        "laneClosed": random.choice([True, False]),
        "reportedBy": "city_traffic_monitor"
    }

    command = [
        "aws", "kinesis", "put-record",
        "--stream-name", STREAM_NAME,
        "--partition-key", "sensor1",
        "--data", json.dumps(payload),
        "--cli-binary-format", "raw-in-base64-out"
    ]

    print(f"Sending event {i+1}...")
    subprocess.run(command)
    time.sleep(1)
Once the script is ready, I run it and, sure enough, I get a response from Kinesis for each event:
python3 send_kinesis_events.py
And if I now query my table in Snowflake (the one I created earlier and pointed Firehose at), I should see all the records being streamed in:
Conclusion
In this article, we’ve walked through setting up a real-time traffic data pipeline using AWS Kinesis and Snowflake, from configuring your Snowflake environment to streaming and testing live data. We also explored how to simulate real-world scenarios with a simple testing application. This approach can be adapted for a range of smart city and transport analytics needs. If you have any questions or want to discuss your own use case, feel free to reach out.