Building Efficient Event-Driven ETL Processes on Google Cloud: Best Practices, Correlation ID Tracking and Testing
ETL (Extract, Transform, Load) is a concept that, according to Wikipedia, became popular in the 1970s. It provides a mechanism to extract raw data, transform it and load it as information that is ready to be consumed.
You could compare ETL with a supply chain or a factory: raw materials come in and a ready-for-sale product comes out. In the same way, an ETL process takes in raw data from various sources, transforms it into a format that data analysts and other end-users can work with, and stores the result in a database or another storage system.
Cloud platforms, like Google Cloud Platform (GCP), offer great ways to encapsulate ETL processes. In this article, we’ll dive deep into the techniques behind ETL processes, discuss some tips & tricks, show a very basic Python implementation and bring it to the cloud.
Enjoy reading and feel free to reach out if you have any questions or inquiries.
Event-Driven Architecture
In an event-driven architecture, code gets executed by means of events.
The code is orchestrated as functions in so-called workers, which are typically stateless processes. A worker doesn’t (and shouldn’t) have to know about its surrounding environment. It should only be concerned with incoming events (their structure/signature), processing them and passing the result on, whether that means emitting a new event or storing results in a database. An individual worker can already be thought of as a little ETL process by itself.
Queues and chaining events
Events are chunks of structured data that can be consumed and emitted. Events can, apart from being stored in a database, also be put on a queue for further processing. Other processes or workers can subscribe to such a queue to get notified and do their respective work. When a worker is done, it can emit a new event onto another queue, for another worker to work on. This way we can create a chain of multiple workers, processing a so-called event stream.
As we will find out later, when structuring the (meta) data of events, it’s a good habit to include a field for a correlation ID. This value is passed on to each new event so that the progress of an individual chain can be traced. It is also useful for mapping existing chains, which can get quite complex, and helps operations quickly diagnose and resolve issues.
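As a small illustration, the metadata of a single event could look like the sketch below (the field names mirror the Event structure used later in this article; the values are made up):

# A sketch of an event's (meta) data; every event gets its own id,
# while the correlation_id is copied onto each derived event.
event = {
    "id": "2f6d1c3e-0a1b-4c5d-8e9f-000000000001",
    "correlation_id": "b42d6c90-1111-4222-8333-444444444444",
    "occurred_on": "2023-01-01T12:00:00+00:00",
    "data": {"number": 2},
}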
Library of workers
As we will also find out later, all workers will eventually be part of a library. This is similar to any other code library (or the language’s standard library), except that its building blocks are standalone workers.
As we will explain in the testing section below, where this becomes essential, workers should only know at runtime how they interact with their surrounding environment and thus their place in the event stream (between which queues they sit). “At runtime” means that the previous and next queue, worker or process are injected, e.g., via environment variables, when the worker is started.
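A minimal sketch of that runtime wiring, assuming the queue names are injected via (hypothetical) environment variables SUBSCRIBE_QUEUE and PUBLISH_QUEUE:

import os

# Injected by the deployment environment, never hard-coded in the worker.
SUBSCRIBE_QUEUE = os.getenv("SUBSCRIBE_QUEUE", "")
PUBLISH_QUEUE = os.getenv("PUBLISH_QUEUE", "")


def worker(event: dict) -> dict:
    # Transform the incoming event; the worker neither knows nor cares
    # which chain it is part of, only which queues it talks to.
    return {**event, "processed": True}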
This also means that new chains can be built from existing workers in the library. All coding guidelines, best practices and principles that apply to regular functions also hold for workers. Keep them clean, concise and focused on their intended purpose. Think of workers as performing specific, isolated tasks rather than acting as mini-applications.
Cloud environments
In a cloud environment, workers are typically implemented as serverless functions, and the number of workers can be adjusted automatically to the level of congestion in the queues. With these techniques, a solid ETL process can be implemented. It’s a good practice to make use of such cloud features, like scaling up when encountering congestion, in your event stream architecture.
Cloud platforms usually offer a wide range of products, each claiming to be best suited for a certain, often very niche, job. In reality, while these products might offer specific features, when it comes to processing events they all boil down to serverless workers (or functions).
GCP
In this article, we will focus on the Google Cloud Platform (GCP). In GCP, Cloud Functions can be used as serverless workers. In Cloud Functions, you can write your stateless function in either .NET, Go, Java, Node.js, PHP, Python or Ruby. Within a single chain, multiple different programming languages may be used in different workers. However, in our examples, we will solely focus on Python.
Python is a common choice for ETL processes because of its flexibility, ease of use, and wide range of libraries and tools available for data manipulation and processing.
As a queueing mechanism, GCP offers Cloud Pub/Sub. Pub/Sub consists of topics and subscriptions. Events can be published onto a topic and subscriptions to that topic will automatically get a copy of that event and store it in a list (a queue) to be fetched by a worker and popped (removed) from the queue.
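Topics and subscriptions can be created via the console or gcloud, but as a sketch, the Pub/Sub Python client can also do it (the project and names below are placeholders):

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Fully qualified resource paths for a placeholder project.
topic_path = publisher.topic_path("my-project", "demo-topic")
subscription_path = subscriber.subscription_path("my-project", "demo-subscription")

# Create the topic and attach a subscription to it.
publisher.create_topic(request={"name": topic_path})
subscriber.create_subscription(
    request={"name": subscription_path, "topic": topic_path}
)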
A typical (part of a) chain, expressed in GCP components, will look like this:
While the Functions might be clear to you by now, the Pub/Sub component probably needs some more explanation. Let’s start with the concepts Topic and Subscription, using an illustration that resembles the Pub/Sub logo:
As can be seen in the image above, Function A publishes events onto a Topic. A Topic can have multiple Subscriptions; here, Function B is subscribed to one of them. The other two Subscriptions are optional and can be used for other functions. This way you can fork a chain into multiple sub-chains, each with its own responsibilities and output.
So events get published onto a topic and functions subscribe via a subscription. Hence the name Pub/Sub, which refers to the publish-subscribe pattern that describes exactly this.
To illustrate the Pub/Sub component further, have a look at the image below:
In the image above, the Subscription is shown as a queue. New events get pushed in from the left side and the oldest event gets popped from the right side: M1 first, M2 second, etc.
In this particular setup, the popped events are consumed by workers “exactly once”, meaning an event will only be processed by one worker. Function B is illustrated as a parallel process with two workers, B1 and B2. As said before, this might happen when there’s congestion in the queue. In turn, both of these workers can write to a single Topic again to create a chain. In essence, Cloud Functions should always be thought of as a set of parallel workers.
Local development environment
The cloud (any cloud platform) offers a vast amount of ready-to-use services and features to get up and running with these kinds of architectures. This can get very complex quite quickly.
Serverless functions are a great way to efficiently process data in the cloud, but on a local machine there is no such serverless environment unless we build it ourselves.
Event-driven architectures are, given their asynchronous nature, also not very easy to comprehend and maintain.
Building and designing event-driven architectures requires careful consideration of the complexities involved, and proper documentation, implementation, testing, monitoring and management to ensure their effectiveness and maintainability.
A local development environment should therefore be more than just a bunch of code that will eventually be deployed to Cloud Functions and do its magic there.
Testing
Any codebase should be covered with (unit) tests; we all know this. Preferably, the test suite can be executed on the developer’s local machine before git commit. We can, and must, unit test all individual serverless functions here. This is easily done by encapsulating the function in a mocked Pub/Sub object that feeds the function a prepared event and asserts the expected outcome in the resulting event. Doing so immediately results in (essential) documentation.
This is just step one. To properly test an event-driven architecture, we must also validate the correctness of the event chain(s). This can be covered with an integration test, which should also be executable on a local machine (before going live).
Each function knows (at runtime) which queue (or topic) it is subscribed to and which queue it emits its own events to. As said before, knowledge of the queues surrounding the current function should not be part of the function itself, but rather be external, e.g., in a configuration file or in environment variables. The good news is that a function only has to know which queue to listen to and which queue to send to, so this is easy to implement.
In a test suite, environment variables can be easily mocked. This way, an integration test is relatively easy to set up.
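With pytest, for example, mocking those environment variables is a one-liner per variable (the names SUBSCRIBE_QUEUE and PUBLISH_QUEUE are again just an illustration):

def test_worker_is_wired_up_via_environment_variables(monkeypatch):
    # Point the worker at test-only queues before running it.
    monkeypatch.setenv("SUBSCRIBE_QUEUE", "test-in")
    monkeypatch.setenv("PUBLISH_QUEUE", "test-out")
    # ...run the chain against in-memory queues and assert the final event...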
Correlation ID tracking
With a correlation ID, you can track through which functions a stream of events has passed. In a linear architecture, where events flow sequentially through a single path, a correlation ID is less useful. But in a more complex tree-like (or graph-like) architecture, where events get “forked”, this becomes crucial to keep a grip on the process.
A correlation ID serves as a unique identifier that can be attached to events as they are emitted from a function, and can be used to trace the flow of events across different branches, helping to understand the sequence of events and their processing.
Apart from testing, a correlation ID helps to track and correlate events as they pass through different functions or components, enabling better visibility, traceability, and troubleshooting of event flows in the system.
Hands-on with Python
Let’s start with some code.
Consider the following code as our event structure:
import uuid
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class Event():
    id: str
    correlation_id: str
    occurred_on: datetime
    data: dict

    @staticmethod
    def create(*, data: dict, correlation_id: str = ""):
        if not correlation_id:
            correlation_id = str(uuid.uuid4())
        return Event(
            id=str(uuid.uuid4()),
            correlation_id=correlation_id,
            occurred_on=datetime.now(timezone.utc),
            data=data,
        )


queues = defaultdict(list)
We define the class Event that will be passed around via the queues.
Now consider the following functions:
def start(*, subscribe: str = "", publish: str = "", value: int):
    event = Event.create(data={"number": value})
    queues[publish].insert(0, event)


def add_2(*, subscribe: str = "", publish: str = ""):
    event = queues[subscribe].pop()
    new_event = Event.create(
        data={"number": event.data["number"] + 2},
        correlation_id=event.correlation_id,
    )
    queues[publish].insert(0, new_event)


def end(*, subscribe: str = "", publish: str = ""):
    event = queues[subscribe].pop()
    print(event.data)
Function start will be the entry point. It generates the initial event and publishes it to a queue.
Function add_2 pops an event from a queue, creates a new event by adding 2 to the number while reusing the correlation ID, and publishes the new event onto another queue.
Function end pops an event from a queue and prints its data.
Respectively, start, add_2 and end are the “E”, “T” and “L” of ETL.
To run the process, consider the following code:
if __name__ == "__main__":
    start(publish="start", value=2)
    add_2(subscribe="start", publish="middle")
    add_2(subscribe="middle", publish="end")
    end(subscribe="end")
This will print {'number': 6} to the console.
Note that the function add_2 is called twice. Earlier in this article we talked about a library of workers and about functions not knowing about their environment and place in the chain. This is such an example of reuse and the reason why functions (or workers) shouldn’t know about their place in the process. The unit test of add_2 should assert the addition of 2 to the provided number, while an integration test of the whole process should assert that this happens twice (so assert 6 with 2 as the starting value).
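As a sketch, using pytest and the in-memory queues from the example, those two tests could look like this:

def test_add_2_adds_2_and_keeps_the_correlation_id():
    event = Event.create(data={"number": 2})
    queues["unit-in"].insert(0, event)

    add_2(subscribe="unit-in", publish="unit-out")

    result = queues["unit-out"].pop()
    assert result.data["number"] == 4
    assert result.correlation_id == event.correlation_id


def test_chain_applies_add_2_twice():
    start(publish="start", value=2)
    add_2(subscribe="start", publish="middle")
    add_2(subscribe="middle", publish="end")

    result = queues["end"].pop()
    assert result.data["number"] == 6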
At its core, this is a full example of a distributed ETL process. In this example we used the “pull” mechanism: we first run the function and then check whether there are events on the queue. When the queue is empty, the function should simply end (note that in Python, pop() on an empty list raises an IndexError rather than returning None, so a small guard is needed). The alternative to “pull” is “push”, where the queue itself triggers the subscribed functions. That is what we will use below in our GCP example code.
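In the in-memory example, such a guard could look like this (a sketch; the push-based GCP variant below does not need it):

def add_2_pull(*, subscribe: str = "", publish: str = ""):
    if not queues[subscribe]:
        # Nothing to pull yet: simply end; a scheduler can run us again later.
        return
    event = queues[subscribe].pop()
    new_event = Event.create(
        data={"number": event.data["number"] + 2},
        correlation_id=event.correlation_id,
    )
    queues[publish].insert(0, new_event)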
The power of the correlation ID
Before we dive into GCP, we first show how to utilize correlation ID tracking.
Consider the following code:
paths = defaultdict(list)


def start(*, subscribe: str = "", publish: str = "", value: int):
    ...
    paths[event.correlation_id].append("start")


def add_2(*, subscribe: str = "", publish: str = ""):
    ...
    paths[event.correlation_id].append("add_2")


def end(*, subscribe: str = "", publish: str = ""):
    ...
    paths[event.correlation_id].append("end")


if __name__ == "__main__":
    ...
    print(paths)
Each function will append its own name (footprint) to a list that belongs to the correlation ID. The example code above would print something like: {'691a0790-357c-414a-abcc-c931785106e9': ['start', 'add_2', 'add_2', 'end']}.
Of course this is just a simple example, but with many thousands of events crossing the ETL process, this concept makes reporting and debugging a feasible task.
Hands-on with GCP
In this section we’re just going to explain how to build the ETL process on GCP. We’re not going to explain how to set up a GCP project, enable all required APIs or install the Cloud SDK. You should thus have sufficient knowledge of the Google Cloud Console to be able to reproduce the stack described below.
In essence, building an ETL process on GCP is identical to the Python example above, but with a bit more boilerplate. We swap the queues = defaultdict(list) for Pub/Sub and wrap the functions in Cloud Functions. The start function will have an HTTP entry point, add_2 will subscribe to a Pub/Sub topic and publish to another, and end will write to a database, Firestore in our case. Furthermore, we will assign the respective topics to the Cloud Functions via environment variables.
Let’s start with the environment variables. We need three of them, declared right beneath the imports (of which we now need a few more than before):
import base64
import json
import os
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "")
PROJECT_ID = os.getenv("PROJECT_ID", "")
PUBLISH_TOPIC = os.getenv("PUBLISH_TOPIC", "")
The COLLECTION_NAME is the name of the collection in the Firestore database. The PROJECT_ID is the name of the project you’ve created in GCP. The PUBLISH_TOPIC is the Pub/Sub topic each respective Cloud Function will publish to.
Next we need a projector to publish events to Pub/Sub:
class PubSubEventProjector:
    def project(self, event: Event):
        from google.cloud import pubsub_v1

        publisher = pubsub_v1.PublisherClient()
        topic_path = publisher.topic_path(PROJECT_ID, PUBLISH_TOPIC)

        def default(obj):
            if isinstance(obj, datetime):
                return obj.isoformat()

        data = json.dumps(event.__dict__, default=default).encode("utf-8")
        publisher.publish(topic_path, data=data).result()
The project function takes an Event as a parameter; it’s the same Event class as before. In the project function we connect to Pub/Sub, define the absolute path to the Pub/Sub topic, convert the event into a JSON byte string (while taking care of datetime objects) and publish the data to the topic.
Since we’re going to store data in Firestore, we also need a repository class:
if COLLECTION_NAME:
    import firebase_admin
    from firebase_admin import firestore

    firebase_admin.initialize_app()
    client = firestore.client()


class FirestoreEventRepository:
    def add(self, event: Event) -> Event:
        collection = client.collection(COLLECTION_NAME)
        collection.document(event.id).set(event.__dict__)
        return event
Notice that we connect to Firestore only when a COLLECTION_NAME is defined, and that this happens outside of the class. It has to be done only once for the Cloud Function, not each time we instantiate the class.
In the class we only define the function add. In reality you would also have functions like update, delete, find, etc., but in our example we don’t need them. Feel free to play with this yourself.
Inside the add function we connect to a collection and add the event that was passed as a parameter.
Now the functions:
def start(request):
    data = request.get_json()
    event = Event.create(
        data={"number": data["number"]},
    )
    projector = PubSubEventProjector()
    projector.project(event)
    return "ok"


def add_2(event, context):
    pubsub_message = base64.b64decode(event["data"]).decode("utf-8")
    event = Event(**json.loads(pubsub_message))
    new_event = Event.create(
        data={"number": event.data["number"] + 2},
        correlation_id=event.correlation_id,
    )
    projector = PubSubEventProjector()
    projector.project(new_event)


def end(event, context):
    pubsub_message = base64.b64decode(event["data"]).decode("utf-8")
    event = Event(**json.loads(pubsub_message))
    repository = FirestoreEventRepository()
    repository.add(event)
Notice that they don’t differ much from our earlier functions. The start function is now a Flask route, and in the request body we find our input value as JSON.
The functions add_2 and end are now Pub/Sub subscription handlers (using the “push” mechanism). They receive the event data as a base64-encoded byte string; after decoding it we can reconstruct the Event and, respectively, project it onto another topic or add it to Firestore.
The only thing still missing is the topics the functions subscribe to. This is not implemented in the Python code but is part of the deployment process.
Let’s say all the code above resides in a file called main.py. We also need a requirements.txt file, and both reside in a folder called demo_gcp. The folder structure should look like this:
demo_gcp/
├── main.py
└── requirements.txt
The contents of the requirements.txt file are as follows:
firebase-admin
google-cloud-pubsub
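Before deploying, the add_2 handler can already be unit tested locally by patching away the Pub/Sub projector. A sketch with pytest (run from the demo_gcp folder so that main is importable; pytest itself is only needed locally and does not belong in requirements.txt):

import base64
import json
from unittest.mock import patch

import main


def test_add_2_publishes_a_new_event_with_the_same_correlation_id():
    incoming = main.Event.create(data={"number": 2})
    message = {
        "data": base64.b64encode(
            json.dumps(incoming.__dict__, default=str).encode("utf-8")
        )
    }

    # Replace the real Pub/Sub projector with a mock and capture the
    # event that add_2 would have published.
    with patch.object(main, "PubSubEventProjector") as projector_cls:
        main.add_2(message, context=None)

    published = projector_cls.return_value.project.call_args.args[0]
    assert published.data["number"] == 4
    assert published.correlation_id == incoming.correlation_id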
Now we can deploy the Cloud Functions. To do this we can use a handy bash script:
#!/bin/bash -ex
if [[ -z $1 ]]; then
    echo "No parameter passed. Usage: ./deploy-etl-demo-gcp.sh PROJECT_ID"
    exit 1
fi
PROJECT_ID=$1
REGION="europe-west1"
SOURCE="demo_gcp"
gcloud functions deploy demo-etl-start --region=$REGION --source=$SOURCE --entry-point=start --runtime=python311 --ingress-settings=internal-only --trigger-http --set-env-vars=PUBLISH_TOPIC=demo-etl-start,PROJECT_ID=$PROJECT_ID &
gcloud functions deploy demo-etl-add_2_a --region=$REGION --source=$SOURCE --entry-point=add_2 --runtime=python311 --ingress-settings=internal-only --trigger-topic=demo-etl-start --set-env-vars=PUBLISH_TOPIC=demo-etl-middle,PROJECT_ID=$PROJECT_ID &
gcloud functions deploy demo-etl-add_2_b --region=$REGION --source=$SOURCE --entry-point=add_2 --runtime=python311 --ingress-settings=internal-only --trigger-topic=demo-etl-middle --set-env-vars=PUBLISH_TOPIC=demo-etl-end,PROJECT_ID=$PROJECT_ID &
gcloud functions deploy demo-etl-end --region=$REGION --source=$SOURCE --entry-point=end --runtime=python311 --ingress-settings=internal-only --trigger-topic=demo-etl-end --set-env-vars=COLLECTION_NAME=demo-etl-events,PROJECT_ID=$PROJECT_ID &
In the example we will deploy the code into region europe-west1 of GCP. Feel free to adjust this to your preferred region.
The gcloud commands each deploy a different Cloud Function. They all deploy the same source code, but the parameter --entry-point dictates which function is called.
Another important parameter is --trigger-topic, which dictates the topic each respective Cloud Function subscribes to (if applicable). Notice that the topic to publish to is provided as an environment variable via --set-env-vars.
Notice that each gcloud command ends with &. This means they’ll all run in parallel in the background. This will save you quite some time as each deploy takes around 2 minutes.
Let’s put the bash script in a file called deploy-etl-demo-gcp.sh. Make sure this file is executable by running: chmod +x deploy-etl-demo-gcp.sh.
Your folder structure should now look like this:
src/
├── demo_gcp/
│ ├── main.py
│ └── requirements.txt
└── deploy-etl-demo-gcp.sh
Now execute the script, pass the name of your GCP project and have patience: ./deploy-etl-demo-gcp.sh project-id. If you’re on Windows, you’ll obviously need another way to run the commands.
When done, navigate to the start function (change the region if necessary): https://console.cloud.google.com/functions/details/europe-west1/demo-etl-start?tab=testing.
Enter {"number": 2} in the textarea and hit “test the function”.
Meanwhile head over to https://console.cloud.google.com/firestore/databases/-default-/data/panel/demo-etl-events and notice the processed event appearing in the collection. You might need to reload the page a few times. In the end it should look like this:
Congratulations! You now have a fully working, distributed, event-driven ETL process implemented on GCP!
Next steps
The next steps for an ETL process on GCP include reporting and monitoring around the correlation ID. We could use another Firestore collection with a record per correlation ID and update it in every function. It could contain a list of the function names the event stream has passed through, similar to our Python example above.
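A sketch of such an update, using Firestore’s ArrayUnion so each function can append its footprint without reading the document first (the collection name demo-etl-paths is just an example; including a timestamp per entry prevents repeated functions, like our double add_2, from being deduplicated):

from datetime import datetime, timezone

from google.cloud import firestore


def record_footprint(correlation_id: str, function_name: str):
    client = firestore.Client()
    document = client.collection("demo-etl-paths").document(correlation_id)
    # merge=True creates the document on the first write; ArrayUnion
    # appends the footprint to the existing list on subsequent writes.
    document.set(
        {
            "path": firestore.ArrayUnion(
                [{"function": function_name, "at": datetime.now(timezone.utc)}]
            )
        },
        merge=True,
    )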
Another step to think about is setting aside events that encountered an error in a certain function. This is called dead-lettering. A dead-letter queue is an additional queue next to the original (operational) queue (or topic). Events on a dead-letter queue can be investigated and eventually retried on the regular queue. Whether an erroneous event should be retried depends on business rules; for a financial application this will most probably be different than for a temperature sensor.
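On GCP, dead-lettering is configured on the subscription. A sketch with the Pub/Sub Python client (topic and subscription names are placeholders; note that the Pub/Sub service account needs permission to publish to the dead-letter topic):

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()

subscriber.create_subscription(
    request={
        "name": "projects/my-project/subscriptions/demo-etl-middle-sub",
        "topic": "projects/my-project/topics/demo-etl-middle",
        "dead_letter_policy": {
            "dead_letter_topic": "projects/my-project/topics/demo-etl-dead-letter",
            # After 5 failed delivery attempts the event is parked on the
            # dead-letter topic for inspection and a possible retry.
            "max_delivery_attempts": 5,
        },
    }
)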
Conclusion
Event-driven architectures can be hard to understand and can get complex very quickly. This gets exponentially worse when a single function contains too much (business) logic, when the system is too distributed, and/or when it isn’t documented or tested.
In this article the event stream was very straightforward, but imagine business logic that dynamically decides the next topic to publish to. This too will increase complexity drastically.
Try to keep functions as simple as possible and split them to leverage cloud features like scaling up in case of congestion. But don’t overdo the splitting; there should be a healthy balance between operational excellence and pragmatism.
Keep in mind that a distributed system that can only run in a remote environment like GCP, and cannot be executed separately, e.g., on a local machine for integration testing purposes, is a red flag for maintainability. You will need to invest a lot of time and money into these kinds of systems to keep them manageable, especially when new team members are onboarded and the original developers leave (another red flag).
Some final notes about testing. Tests are crucial for event-driven architectures, and for distributed systems in general, because of their complexity. Developers who get onboarded onto an existing process have a hard time understanding it and feeling confident enough to even touch the code and make changes. Tests are their safety net and a form of documentation at the same time.
For event-driven systems, unit testing is actually relatively easy, because you just have to mock the incoming and outgoing topics and assert the events published to them. The correlation ID will be of great help when writing integration tests.
Feel free to reach out if you have any questions, inquiries or remarks regarding this article. I’m happy to help. You can reach out to me here or on my personal website.