From Space to Database: Real-Time ISS Tracking Using Kafka and Snowpipe in Snowflake


Ever tracked astronomical objects at a planetarium, watched real-time flights via Flightradar24 or followed your pizza delivery make its way to your door? Each of these experiences relies on the magic of streaming data, specifically benefiting from row-level data ingestion to minimize latency and maximize efficiency.

With Snowflake’s Snowpipe, you can make data quickly accessible by automatically ingesting files, a process that is much faster than manually running COPY INTO commands for batch-loading data. Yet, when even faster data updates are essential, the Snowpipe Streaming API steps in. This API writes individual rows, rather than files, directly to Snowflake tables, which results in faster data availability and lower associated costs, making it an ideal solution for real-time data streaming applications.

In this article, we’re going to guide you through the process of streaming real-time location data of the International Space Station (ISS) into Snowflake using the Snowflake Connector for Kafka with the Snowpipe Streaming API. The guide includes four main steps:

  1. First, we’ll set up a Kafka server.
  2. Next, we’ll produce data by calling an API.
  3. Then, we’ll set up the Snowflake Kafka connector.
  4. Finally, we’ll establish the data stream between Kafka and Snowflake.

To round things off, we’ll conclude with a screen recording that shows what such a real-time data stream might look like in a Streamlit application.

Step 1: Setting up a Kafka Server

Just like a radio station broadcasting different shows on different frequencies, Kafka manages different streams of data through topics. Producers send data into these topics and consumers tune in to receive it, allowing for real-time data processing. Unlike traditional radio, which is generally one-way, Kafka enables two-way communication, letting data be both sent and received for more complex operations.

Create an EC2 Instance

Kafka requires a virtual machine (VM) or server with at least 2GB of RAM to operate. You can use an AWS EC2 instance, a virtual machine from Azure or Google Cloud, or even your own local machine. For this example, we are using an EC2 instance equipped with 2GB of RAM and running Amazon Linux.
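If you prefer the AWS CLI over the console, a launch command might look roughly like the sketch below. The AMI ID, key pair name and security group are placeholders you would replace with your own values, and t2.small is just one instance type that meets the 2GB requirement.

aws ec2 run-instances --image-id ami-xxxxxxxxxxxxxxxxx --instance-type t2.small --key-name my-key --security-group-ids sg-xxxxxxxx --count 1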

Install Kafka

After your EC2 instance is up and running, connect to it using SSH. Setting up SSH for the first time can be challenging; for guidance on Windows, please refer to our related article.
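On macOS or Linux, the connection itself is a single command. The key file name and public DNS below are placeholders; ec2-user is the default user on Amazon Linux:

ssh -i my-key.pem ec2-user@ec2-xx-xx-xx-xx.compute-1.amazonaws.com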

Next, you need to download Kafka. Make sure to download the binary version to avoid errors like: Classpath is empty. Please build the project first, e.g., by running 'gradlew jarAll'. First, visit the Apache Kafka downloads page and locate the link for the latest binary release. Then, use the wget command within your EC2 instance to download it. For example:

wget https://downloads.apache.org/kafka/3.5.1/kafka_2.12-3.5.1.tgz

After downloading, extract the file using the following command. Make sure to modify the file name if you are using a different Kafka version.

tar -xzf kafka_2.12-3.5.1.tgz
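Then switch into the extracted directory; adjust the folder name if you downloaded a different version:

cd kafka_2.12-3.5.1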

Initialize Zookeeper

If Kafka were a radio station broadcasting various channels (topics) of data, then Zookeeper would be the control tower coordinating all the transmissions. To start Zookeeper, navigate first to the directory where Kafka is installed and launch Zookeeper with the following command:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka Server

Open a new Command Line Interface (CLI) window, reconnect to your EC2 instance via SSH and switch to the Kafka directory. Start the Kafka server (also known as the broker) with the following command:

bin/kafka-server-start.sh config/server.properties

Create a Kafka Topic

If you haven’t already, make sure Java is installed on your system, since Kafka runs on the Java platform. For instance, you can install Java 8 (Amazon Corretto) on Amazon Linux using the following command:

sudo dnf install java-1.8.0-amazon-corretto -y

To initialize a Kafka topic where data will be published, you’ll need to open another CLI window, reconnect to your EC2 instance via SSH, switch to the Kafka directory, and execute the following command:

bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  • --topic: Specifies the name of the topic.
  • --bootstrap-server: Specifies the server for the initial connection to the Kafka cluster.
  • 9092: This is the default port for Kafka.
  • --partitions: Specifies the number of partitions for the topic.
  • --replication-factor: Sets the replication factor for the topic, which is useful for fault tolerance.

For detailed information on the flags and options for creating a topic, see the Kafka documentation.
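To confirm the topic was created, you can describe it:

bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092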

Step 2: Data Ingestion into Kafka Topic

The Kafka console offers two key utilities for topic interactions:

  • kafka-console-producer.sh: Publishes data to Kafka topics.
  • kafka-console-consumer.sh: Consumes data from Kafka topics.
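For example, to peek at the messages arriving in our topic from another SSH session, you could run:

bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092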

However, for this project, we’re using a Python script for more customized data handling. The script utilizes the confluent_kafka library to send messages to Kafka topics. To fetch data, it employs the requests library to regularly call an API provided by Open Notify. This open-source project offers an easy-to-use API for accessing various NASA data related to space and spacecraft.

Save the provided script on your EC2 instance as my_topic_producer.py. In our demo, the script calls the API every 6 seconds, and the Kafka connector flushes the buffered rows (around 10 at a time) to Snowflake.

from confluent_kafka import Producer
import requests
import time
import json

def delivery_callback(err, msg):
    """Callback to handle message delivery reports."""
    if err:
        print(f"Failed to deliver message: {err}")
    else:
        print(f"Message sent to topic {msg.topic()}")

kafka_config = {'bootstrap.servers': 'localhost:9092',}     # Kafka settings
producer = Producer(kafka_config)                           # Initialize Kafka producer
api_url = "http://api.open-notify.org/iss-now"              # ISS API URL

while True:
    response = requests.get(api_url)                        # Fetch ISS location data from API
    if response.status_code == 200:
        api_data = response.json()                          # Convert API response to JSON
        producer.produce('my_topic', value=json.dumps(api_data), callback=delivery_callback)  # Send data to Kafka topic
        producer.flush()                                    # Wait for message to be sent
    else:
        print(f"API call failed. Status code: {response.status_code}")
    time.sleep(6)                                           # Wait 6 seconds before the next API call
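For reference, the ISS location endpoint returns a small JSON document along these lines (the values below are illustrative; the exact shape is defined by the Open Notify API):

{"message": "success", "timestamp": 1693900000, "iss_position": {"latitude": "48.1234", "longitude": "11.5678"}}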

Step 3: Setting up the Snowflake Kafka Connector

Download the Snowflake Kafka Connector

First, navigate to the Snowflake Kafka Connector page on the Maven Repository and choose the most recent version. Copy the link address of the JAR file.

Then, switch to the command line interface connected to your EC2 instance. To download the connector, navigate to the libs directory inside your Kafka folder and use the following wget command:

wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/2.0.1/snowflake-kafka-connector-2.0.1.jar

Create and Configure SF_connect.properties

The connector is configured via a properties file specifying details like Snowflake credentials, topic names and table names. Secure this file properly, as it contains sensitive information such as a private key. The connector uses key pair authentication, which requires an RSA key pair of at least 2048 bits. Generate the key pair using OpenSSL and assign the public key to the Snowflake user. For more on key pair authentication, refer to our related article.
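As a rough sketch, you could generate an unencrypted 2048-bit key pair on the EC2 instance with OpenSSL:

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

Then, in a Snowflake worksheet, assign the public key (the contents of rsa_key.pub without header, footer or line breaks) to the user the connector will authenticate as. KAFKA_USER and the abbreviated key value below are placeholders:

ALTER USER KAFKA_USER SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';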

Follow these steps:

  1. Go to the config directory within your Kafka folder.
  2. Create a new file called SF_connect.properties.
  3. Open the file and enter the necessary settings, replacing the placeholder values with your own details (a sample configuration is sketched after this list).
  4. Save the file.
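For orientation, a minimal SF_connect.properties for Snowpipe Streaming might look like the sketch below. The connector name, account URL, user, role, database and schema are placeholders, and the private key must be pasted as a single line; the converter classes and ingestion method follow Snowflake’s documentation for the connector with Snowpipe Streaming.

name=iss_kafka_connector
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=my_topic
buffer.flush.time=10
snowflake.url.name=<account_identifier>.snowflakecomputing.com:443
snowflake.user.name=KAFKA_USER
snowflake.role.name=KAFKA_ROLE
snowflake.database.name=MY_DATABASE
snowflake.schema.name=MY_SCHEMA
snowflake.private.key=<private key on a single line, without header and footer>
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
snowflake.ingestion.method=SNOWPIPE_STREAMING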

Notes:

  • To use an encrypted private key, omit the -nocrypt flag when generating it. You’ll then need to add the snowflake.private.key.passphrase setting to your SF_connect.properties file.
  • Remove the header, footer and any line breaks from your private key so it sits on a single line.
  • Ensure you’re using a user and role with sufficient permissions to access the database and schema and to create tables.
  • You don’t have to create a table or pipe manually; the connector takes care of it.
  • The lowest setting for buffer.flush.time is 10 seconds, which is how long the connector waits before sending cached records to Snowflake.

Update connect-standalone.properties

For the Snowflake connector (and any other connector plugins) to work correctly, you need to point the Kafka Connect utility to the directory where the plugins are stored. Follow these steps within the config directory:

  1. Open the connect-standalone.properties file to edit.
  2. Uncomment the plugin.path line.
  3. Enter the path leading to the libs folder inside your Kafka directory.
  4. Save the changes and close the file.

For this demonstration, plugin.path points to the libs folder inside the Kafka directory.
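Assuming Kafka was extracted into the ec2-user home directory as in the earlier steps, the line would look something like:

plugin.path=/home/ec2-user/kafka_2.12-3.5.1/libs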

Step 4: Establishing the Data Stream

To set up the data stream between Kafka and Snowflake, run the following command:

bin/connect-standalone.sh ./config/connect-standalone.properties ./config/SF_connect.properties

When you run this command, it starts the Snowflake Kafka connector using the settings in your pre-configured properties files. The connector then auto-generates a table with two VARIANT columns, named RECORD_METADATA and RECORD_CONTENT. In the same automated fashion, it also establishes an internal stage and a pipe. These components work together to funnel data from the stage to the newly created table, either immediately if the topic already contains data or upon the arrival of the first record.

All that’s left is to start generating data. Open a new command line window, navigate to the Python script created earlier and run it:

python3 my_topic_producer.py
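Once the producer is running, rows should appear in Snowflake after the first flush. By default, the connector names the table after the topic, so a quick check might look like the query below (the JSON field names assume the Open Notify payload shown earlier):

SELECT
    RECORD_CONTENT:iss_position.latitude::FLOAT   AS latitude,
    RECORD_CONTENT:iss_position.longitude::FLOAT  AS longitude,
    TO_TIMESTAMP(RECORD_CONTENT:timestamp::NUMBER) AS observed_at
FROM my_topic
ORDER BY observed_at DESC
LIMIT 10;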

Bonus

Once the connection between Snowflake and the Kafka server is successfully set up, you unlock countless opportunities for business analytics, application integration and even machine learning. In our specific use case, we’ve employed a Streamlit app to display the International Space Station’s (ISS) location. A screen recording, accelerated to 20x speed, is provided below to give you a glimpse of the app’s functionality.

We’re running the Streamlit app locally, but keep in mind that Streamlit is also available directly in Snowflake (in public preview at the time of writing). Our app uses pydeck for advanced visualization capabilities and updates every minute.
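Our app is more elaborate, but a minimal sketch of the idea, assuming the table and column structure from the previous step and placeholder Snowflake credentials, could look like this:

import pandas as pd
import pydeck as pdk
import snowflake.connector
import streamlit as st

st.title("Where is the ISS right now?")

# Placeholder credentials: replace with your own account, user and authentication details
conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<user>",
    password="<password>",
    warehouse="<warehouse>",
    database="<database>",
    schema="<schema>",
)

# Fetch the most recent positions streamed in by the Kafka connector
query = """
    SELECT
        RECORD_CONTENT:iss_position.latitude::FLOAT  AS lat,
        RECORD_CONTENT:iss_position.longitude::FLOAT AS lon
    FROM my_topic
    ORDER BY RECORD_CONTENT:timestamp::NUMBER DESC
    LIMIT 50
"""
df = pd.DataFrame(conn.cursor().execute(query).fetchall(), columns=["lat", "lon"])

# Plot the positions on a world map with pydeck, centered on the latest point
st.pydeck_chart(
    pdk.Deck(
        initial_view_state=pdk.ViewState(latitude=df["lat"][0], longitude=df["lon"][0], zoom=2),
        layers=[
            pdk.Layer(
                "ScatterplotLayer",
                data=df,
                get_position="[lon, lat]",
                get_radius=200000,
                get_fill_color=[255, 75, 75],
            )
        ],
    )
)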

Wrap Up

In this article, we’ve shown how to use the Snowflake Connector for Kafka with Snowpipe Streaming. This is a quick and effective way to move rows of data from Apache Kafka right into Snowflake tables. With as little as a ten-second delay, it’s almost like getting the data in real time, which is great for many different use cases.

To bring the concept to life, we gave a real-world example by showing how to send location data from the International Space Station (ISS) into Snowflake. This shows you just one of the many ways you could use this feature for your own needs.

If you have any questions or want to know how to stream your own specific data, feel free to reach out. Let’s discuss how this can work for you.
