Published on 08/16/2022
Last updated on 04/05/2024

Taming of the queue: How to manage your data pipelines


Building dynamic data pipelines using AWS DocumentDB, MSK, and Lambda


There are many data-driven applications that require online processing of data as well as storing the raw data. Examples include recommendation engines, IoT processors, event-driven services, and more. If you’ve ever needed to build a data processing pipeline or workflow, you’re probably familiar with the challenges of handling multiple data types while accounting for the possibility of new data types coming in during the lifetime of your system.

In this article, I will discuss a solution I came up with for a recent project using AWS DocumentDB, MSK, and Lambda functions, and will provide instructions for deploying a simple pipeline along with useful Go code snippets.

What is a data pipeline?

A quick note on data pipelines: they allow you to process, manage, and store your data. Just as a pipeline helps you manage water flow automatically, your data pipelines are the conduits through which your information flows. The better you can construct these pipelines, the better your enterprise technology will support all of your business processes.

The design of your data processing pipelines

When building a data processing pipeline, one typically needs two components: data transmission and processing blocks. Processing blocks are manipulations applied to the incoming data in a given sequence that may change depending on the type of data, while the data transmission is how data is moved between the various processing blocks. You can think of this as a production line where the conveyor belt moves the products in their various stages between the various stations until the final product comes out the end of the line. 

The conveyor belt is our data transmission and the stations are the processing blocks. More often than not, you also want a means of storing the raw data because you never know what you may want to do with it later on, or if some error would force you to run your data through the pipeline again. My design requirements were simple:

  1. Build a fully managed system
  2. Keep a copy of all the raw data
  3. Process data in near real-time and with low latency
  4. Account for the possibility of new data types coming in over time without requiring system downtime

After looking into several options, I settled on the following design for my data pipeline:

[caption id="attachment_2079" align="aligncenter" width="744"]TamingQueue2 Architecture Diagram[/caption]

Let's start by reviewing the building blocks we will use, what they are, and how we can use them:

AWS DocumentDB is Amazon’s managed, MongoDB-compatible document database service, supporting the MongoDB 3.6 and 4.0 APIs. As such, it can store, query, and index JSON and BSON data.

MongoDB is a source-available NoSQL JSON database that uses JavaScript as the basis for its query language, which also allows you to run JavaScript functions server-side. Within a MongoDB deployment, one can define multiple databases with multiple collections in each one. One useful feature introduced in MongoDB 3.6 is Change Streams. Change streams allow applications to access real-time data changes by registering for events on specific collections. The event notifications can be configured to include the deltas or full documents and capture “insert,” “update,” and “delete” events.

AWS MSK (Managed Streaming for Apache Kafka) is Amazon’s managed Kafka service. Apache Kafka is a widely used open-source distributed event store and stream-processing platform designed for high-throughput, low-latency, real-time data processing. With Kafka you define “topics” to which key/value messages are published. Multiple publishers can publish to a topic and multiple consumers can consume it. Kafka topics are managed using a ZooKeeper cluster, while publishing and consuming are done via Kafka brokers.

AWS Lambda is a serverless, event-driven compute service that lets you run code in response to various events and triggers while automatically managing the underlying compute resources required by your code. Lambda supports many programming languages including Node.js, Python, Java, Ruby, C#, and Go. A very useful feature of Lambda is the ability to trigger functions on Kafka topics, which makes it ideal for use as part of a data processing pipeline.

AWS VPC lets you create virtual private networks within AWS. This allows you to contain services in a more secure environment and gives you total control over who and what has access to the network and the resources within it.

AWS EC2 is Amazon’s Elastic Compute Cloud, where you can deploy virtual or physical instances (computers) and manage their security and networking properties.

AWS S3 is Amazon’s Simple Storage Service. With S3 object storage you can create buckets and store files and folders within them, with full access control and security.

AWS IAM is Amazon’s Identity and Access Management system. IAM lets you manage users, roles, and policies so you can achieve fine-grained access control over your resources and grant access to other users and AWS accounts in a secure way.

AWS CloudWatch is Amazon’s observability platform, where you can aggregate logs from AWS services and easily filter and search through them.

Putting things together

The idea is that DocumentDB is used as the entry point into the pipeline while MSK acts as the data transmission. Each path between processing blocks is implemented using a Kafka topic. One processing block publishes its output to the topic while the next block in line consumes the topic to get its input. 

The first processing block will act as a “router” that analyzes the new data and decides its type. It publishes the data to a dedicated topic for that data type so the proper processing blocks can be applied to it. I start by inserting a new piece of raw data into DocumentDB. Next, I use an MSK connector to register to a change stream for my DocumentDB collection and push the newly inserted documents into an initial MSK topic, which is the input to the “router.”

Then, I configure a Lambda-based “router” function to consume the initial MSK topic, analyze the messages and publish each one to a dedicated MSK topic. Finally, for each data-type dedicated topic, I would have a specific Lambda function that knows how to process that data. I can then continue building more processing elements on the pipeline as required. 

Once all the pieces are in place, all I have to do to run new data through the pipeline is simply insert it into my DocumentDB collection. From that point on, everything happens automatically. Moreover, by combining Kafka topics and Lambda functions, I can dynamically create topics for new types of data messages and then define handlers to process them. The messages would wait in the topic until I build a processor that can handle them, and as soon as I deploy the new processor, it can start processing the messages, which means messages are never lost. 

This design also allows me to dynamically change the layout of my processing pipeline over time. To configure DocumentDB and MSK, I use a bastion instance that I deploy on EC2. This instance allows me to connect to my VPC using a secure SSH connection as well as port forwarding to give my local environment access to the VPC. I use an S3 bucket to store the Kafka connector package and my Lambda functions’ code package. In addition, I use IAM to create the required execution roles for the Kafka connector and the Lambda functions. 

Finally, I use CloudWatch to gain visibility into what the Lambda functions are doing by funneling the Lambda logs into CloudWatch log groups. Let us now go over each of the components and see how to provision and/or deploy them.

VPC gateway

DocumentDB as well as MSK are both deployed only in a VPC. To connect to them from your local machine for development, testing, and debugging, you need to create a gateway into your VPC. We will use the default VPC, but any VPC can be used instead. 

Please refer to the AWS documentation for information on how to create a new VPC in case the default one is not appropriate. We start by creating an EC2 instance that we will use as our gateway. Simply launch a new EC2 instance in the default VPC and choose Ubuntu 20.04 as the OS image (there is no support for the mongo CLI in Ubuntu 22.04 at the time of writing this article):

[caption id="attachment_2078" align="aligncenter" width="624"]TamingQueue3 Launch New EC2 Instance[/caption]
 

Next, create an SSH key-pair that you will use to access the new instance from your local machine. Click on “Create new key pair” to create a new key pair and download the public key, or choose an existing one:

[caption id="attachment_2077" align="aligncenter" width="741"]TamingQueue4 New EC2 Instance SSH Key Pair[/caption]

Next, we look at the “Network Settings” section. Make sure you select the VPC you wish to use and the security group:

[caption id="attachment_2076" align="aligncenter" width="897"]TamingQueue5 New EC2 Instance Network Settings[/caption]

Finally, launch your new instance. Once your instance is up, you can SSH into the new instance:

[caption id="attachment_2075" align="aligncenter" width="975"]TamingQueue6 SSH Into the New Instance[/caption]

DocumentDB

Now that we have a VPC, we can start looking into deploying DocumentDB

TamingQueue7 Architecture — DocumentDB

Start by going into the Amazon DocumentDB dashboard and click on “Create Cluster.” Give your cluster a name, make sure the selected engine version (MongoDB version) is “4.0.0” and select the desired instance class. The connectors we will use to let MSK register to the DocumentDB change stream require that the MongoDB deployment be part of a replica set, so make sure the number of instances is greater than 1:

[caption id="attachment_2073" align="aligncenter" width="975"]TamingQueue8 DocumentDB — Launch New Instance[/caption]

Next, under the authentication section, fill in the admin username and password you would like to use in your cluster. Now, click on the “Show advanced settings” toggle at the bottom to open the network settings and make sure that the selected VPC is the same as the one in which you deployed your EC2 instance:

[caption id="attachment_2072" align="aligncenter" width="975"]TamingQueue9 DocumentDB — New Instance Network Settings[/caption]

Tweak any other settings and then click on the “Create cluster” button at the bottom to launch the new cluster. The process takes a few minutes and then you will see the following or similar according to your choices:

[caption id="attachment_2071" align="aligncenter" width="1024"]TamingQueue10 AWS Document DB Cluster[/caption]

To test our new cluster, we need to install the mongo client in our EC2 gateway instance. Follow these instructions to do so: https://www.mongodb.com/docs/mongodb-shell/install/ 

Next, go into the cluster details in the AWS console and follow the instructions to download the CA certificate to your EC2 instance. Then, run the newly installed mongo client to connect to your new cluster:

[caption id="attachment_2070" align="aligncenter" width="1008"]TamingQueue11 Connecting to DocumentDB using mongo shell[/caption]

If the connection fails, you may need to manage the DocumentDB cluster’s security group to allow access to your EC2 instance’s security group. To do so, go into the cluster’s details and scroll down to the “Security Groups” section:

[caption id="attachment_2069" align="alignnone" width="1024"]TamingQueue12 AWS Security Groups[/caption]

Select the security group and then go into the “Inbound rules” tab:

[caption id="attachment_2068" align="aligncenter" width="1024"]TamingQueue13 AWS Security Group Inbound Rules[/caption]

Click on the “Edit inbound rules” button to edit the inbound rules and then add a rule that allows the traffic type you need (if you need to specify a port, use 27017) from a “Custom” source. In the search box, search for and select the security group you used for the EC2 instance:

[caption id="attachment_2067" align="aligncenter" width="1024"]TamingQueue14 Adding Inbound Rule to AWS Security Group[/caption]

Finally, save the rules. You should now have access from your instance to DocumentDB. To make development and debugging easier, you may want to use a tool such as Robo 3T, which provides an intuitive, easy-to-use GUI for viewing and managing MongoDB data. You will need to forward port 27017 from your local machine to DocumentDB via the EC2 instance using SSH:

[caption id="attachment_2066" align="aligncenter" width="1024"]TamingQueue15 SSH to the Gateway with Port Forwarding to DocumentDB[/caption]

Now you can configure your local Robo 3T or mongo client to access port 27017 on your local machine:

[caption id="attachment_2065" align="aligncenter" width="1024"]TamingQueue16 Robo 3T — New Connection[/caption]

For Robo 3T, make sure to allow invalid hostnames because your local hostname is different than the one in the CA certificate:

[caption id="attachment_2064" align="aligncenter" width="1024"]TamingQueue17 Robo 3T — setting CA Certificate[/caption]

Now, we can create a new database called “pipeline” and, in it, a collection called “intake”. We also create a new user called “puser” that has read permissions for the “pipeline” database:

[caption id="attachment_2063" align="aligncenter" width="608"]TamingQueue18 Robo 3T — After Creating A Collection and A User[/caption]
 

The last thing we need to do is enable change streams on our new collection. To do that we need to connect to DocumentDB as we did above and then run the following command:

db.adminCommand({modifyChangeStreams: 1,
                 database: "pipeline",
                 collection: "intake", 
                 enable: true});

If you are using Robo 3T as I do, right-click on the "pipeline" database in the tree on the left and then select "Open Shell." Now you can enter the above command and use CTRL+ENTER to execute it.

MSK

Now that we have DocumentDB set up, we can move on to MSK.

[caption id="attachment_2062" align="aligncenter" width="1024"]TamingQueue19 Architecture — MSK[/caption]

We will be deploying our MSK cluster using the “Quick create” option. For this little demo, we will use the “kafka.t3.small” flavor and allocate only 1GB of space. If you need to change the network settings to choose a different VPC, zones, and subnets, you will have to switch from “Quick create” to “Custom create." In any case, ensure that your MSK cluster is in the same VPC and subnets as the gateway EC2 instance. Otherwise, you would have to start configuring routing between the VPCs or subnets, which we will NOT cover in this article. When done, click on the “Create cluster” button and wait until your cluster is up:

[caption id="attachment_2061" align="aligncenter" width="1400"]TamingQueue20 MSK Cluster[/caption]

To test our cluster, we need to get the brokers’ addresses. Click on the cluster, select the “Properties” tab and scroll down to the “Brokers” section. There, you will find a list of the brokers that have been deployed as part of the cluster:

[caption id="attachment_2060" align="aligncenter" width="1024"]TamingQueue21 MSK Cluster Brokers[/caption]

Managing a Kafka cluster is done using the Kafka CLI tools. The CLI tools require the Java runtime, as Kafka is written in Scala, which runs on a JVM (Java Virtual Machine). We are using openjdk-8-jre. Now, download the Kafka package from https://kafka.apache.org/downloads to the EC2 instance and extract it. For this document, we are using Kafka 2.13-3.1.0. Next, use the “kafka-topics” command to get the list of existing topics. You need to provide a bootstrap server, which can be any of the brokers in the cluster (we use the first one):

[caption id="attachment_2059" align="alignnone" width="1024"]TamingQueue22 List Kafka Topics[/caption]

Please note that port 9092 does NOT use TLS. If you wish to use a secure TLS connection, you should follow these steps:

  1. Create a client profile:

    TamingQueue23
  2. Create the initial trust store:

    TamingQueue24

Note that the location of the “cacerts” file will change according to the JRE you installed on your machine.

  3. Finally, run the command as follows:

    TamingQueue25
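Put together, the three steps above look roughly like this; the truststore path and broker address are placeholders, and the cacerts path depends on the JRE you installed:

# 1. create a client profile that enables TLS and points to the trust store
cat > client.properties <<EOF
security.protocol=SSL
ssl.truststore.location=/home/ubuntu/kafka.client.truststore.jks
EOF

# 2. create the initial trust store from the JRE's cacerts file
cp /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/security/cacerts /home/ubuntu/kafka.client.truststore.jks

# 3. run the command against the TLS port (9094) using the client profile
bin/kafka-topics.sh --list \
    --bootstrap-server <BROKER_1_ADDRESS>:9094 \
    --command-config client.properties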

Now that we have our MSK cluster deployed and accessible, we can create our initial topic:

TamingQueue26
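Creating the topic is a one-liner; the partition and replication-factor values below are just example choices, so pick whatever fits your cluster:

bin/kafka-topics.sh --create \
    --topic pipeline.intake.inserts \
    --partitions 3 \
    --replication-factor 2 \
    --bootstrap-server <BROKER_1_ADDRESS>:9092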

Let us publish a test message to our new topic:

TamingQueue27
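The console producer and consumer commands are along these lines (the broker address is a placeholder):

# publish a test message (type it on stdin, then CTRL+C to exit)
bin/kafka-console-producer.sh \
    --topic pipeline.intake.inserts \
    --bootstrap-server <BROKER_1_ADDRESS>:9092

# read everything currently in the topic
bin/kafka-console-consumer.sh \
    --topic pipeline.intake.inserts \
    --from-beginning \
    --bootstrap-server <BROKER_1_ADDRESS>:9092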

We can see that our new message was, indeed, published and can be consumed as well. But how do we clear out a topic? There is no way to directly delete messages from a topic. Instead, we have to change the retention policy and wait for Kafka to delete all the expired messages for us before we can restore our original retention policy. First, we get the current settings and see that there is no policy set:

TamingQueue28

Thus, the default retention policy applies, which is 7 days (https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html). We now change that policy to one second:

TamingQueue29

It takes a little while for the new policy to take effect, but once it does, we can run our consumer and see that there is nothing in the topic:

TamingQueue30

Finally, we can restore the default policy:

TamingQueue31
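For reference, the whole retention dance can be done with kafka-configs along these lines (broker address is a placeholder):

# show the current topic-level overrides (empty by default)
bin/kafka-configs.sh --describe \
    --entity-type topics --entity-name pipeline.intake.inserts \
    --bootstrap-server <BROKER_1_ADDRESS>:9092

# temporarily expire messages after one second
bin/kafka-configs.sh --alter --add-config retention.ms=1000 \
    --entity-type topics --entity-name pipeline.intake.inserts \
    --bootstrap-server <BROKER_1_ADDRESS>:9092

# restore the default retention once the topic is empty
bin/kafka-configs.sh --alter --delete-config retention.ms \
    --entity-type topics --entity-name pipeline.intake.inserts \
    --bootstrap-server <BROKER_1_ADDRESS>:9092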

S3

In order to create a Kafka MongoDB connector and to make reusing Lambda code easier, we need to create an S3 bucket where the code packages will be kept. To do that, start by going to the S3 dashboard and create a new bucket by clicking the “Create bucket” button. Fill in the name for your new bucket, select the requested region, and then scroll down and click on the “Create bucket” button. We keep all the default options for now, but you can play with them later if you wish to change anything.
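If you prefer the CLI over the console, the same can be achieved with something like this (bucket name and region are placeholders):

aws s3 mb s3://<YOUR_BUCKET_NAME> --region <REGION>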

IAM execution role

Our next challenge is tying DocumentDB to Kafka so that inserting new documents into DocumentDB automatically puts notifications with the full document data into a given Kafka topic. For this, we will use a Kafka connector that will register for a Mongo change stream for our collection and then publish the new documents to the chosen Kafka topic. 

We will start by creating an IAM execution role for our new connector. Note that when creating a connector, AWS will allow you to create an execution role. However, it turns out that due to some changes made by AWS to how execution roles work, using this option results in a Service Linked role that is not usable by MSK Connect. AWS is aware of this issue but has not fixed it as of the time of writing this article. So, we need to create our own role manually.

Go to the IAM console, select the “Roles” section on the left and then click on “Create role” on the top right:

[caption id="attachment_2049" align="aligncenter" width="1024"]TamingQueue32 IAM — Create New Role[/caption]

Next, select the “AWS account” option and then click “Next." At this point you can select a policy to use. None of these policies are good for us so just click “Next” again. Now give your role a name and description and then scroll to the bottom and click “Create role”. Now that we have a role, we need to configure the proper permissions so find your role in the list of roles and click on it. Under the “Permissions” tab click on the “Add permissions” button to open the drop-down menu and select “Create inline policy”:

[caption id="attachment_2048" align="aligncenter" width="1024"]TamingQueue33 IAM — Creating a New Policy For MSK Connect[/caption]

We would now like to manually enter a policy so select the “JSON” tab and then replace the existing empty policy with the following one:

{
	"Version": "2012-10-17",
	"Statement": [{
			"Sid": "VisualEditor0",
			"Effect": "Allow",
			"Action": "ec2:CreateNetworkInterface",
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"aws:RequestTag/AmazonMSKConnectManaged": "true"
				},
				"ForAllValues:StringEquals": {
					"aws:TagKeys": "AmazonMSKConnectManaged"
				}
			}
		},
		{
			"Sid": "VisualEditor1",
			"Effect": "Allow",
			"Action": "ec2:CreateTags",
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:CreateAction": "CreateNetworkInterface"
				}
			}
		},
		{
			"Sid": "VisualEditor2",
			"Effect": "Allow",
			"Action": [
				"ec2:DetachNetworkInterface",
				"ec2:CreateNetworkInterfacePermission",
				"ec2:DeleteNetworkInterface",
				"ec2:AttachNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:ResourceTag/AmazonMSKConnectManaged": "true"
				}
			}
		},
		{
			"Sid": "VisualEditor3",
			"Effect": "Allow",
			"Action": "ec2:CreateNetworkInterface",
			"Resource": [
				"arn:aws:ec2:*:*:subnet/*",
				"arn:aws:ec2:*:*:security-group/*"
			]
		},
		{
			"Sid": "VisualEditor4",
			"Effect": "Allow",
			"Action": "sts:*",
			"Resource": "*"
		},
		{
			"Sid": "VisualEditor5",
			"Effect": "Allow",
			"Action": "ec2:DescribeNetworkInterfaces",
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:ResourceTag/AmazonMSKConnectManaged": "true"
				}
			}
		}
	]
}

Now click on “Review policy” and then give your new policy a name:

[caption id="attachment_2047" align="aligncenter" width="1024"]TamingQueue34 IAM — Review Policy[/caption]

Note that this policy grants many permissions. We do this for simplicity, but you may want to experiment and limit the permissions you grant for better security. Finally, click on the “Create policy” button at the bottom. You will now be able to see your new policy listed in your role:

[caption id="attachment_2046" align="aligncenter" width="1024"]TamingQueue35 IAM — New Inline Policy[/caption]
 

Next, we need to add the proper trust policy, so click on the “Trust relationships” tab and then on “Edit trust policy”. In the editor that opens, replace all the text with the following:

{
	"Version": "2012-10-17",
	"Statement": [{
		"Effect": "Allow",
		"Principal": {
			"Service": "kafkaconnect.amazonaws.com"
		},
		"Action": "sts:AssumeRole"
	}]
}

Finally, click on “Update policy” to finalize the new role.

CloudWatch

To keep track of what is going on with our connector and Lambda functions, we need a place to keep our logs. We will use CloudWatch so we need to create a log group. Go to the CloudWatch dashboard, select “Logs” on the left and then “Log groups”. Click on “Create log group” on the right, give your log group a name and a retention setting, and then click on “Create” on the bottom and you are done. That was simple!
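If you would rather script it, the equivalent CLI calls are roughly these (the log group name and retention period are placeholders):

aws logs create-log-group --log-group-name <YOUR_LOG_GROUP_NAME>
aws logs put-retention-policy --log-group-name <YOUR_LOG_GROUP_NAME> --retention-in-days 14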

MSK Security Group

Another thing to tackle before we can create a Kafka connector is configuring our security group to allow internal communications. Go to your MSK cluster’s configurations, click on the “Properties” tab, and then click on the security group that is applied. If you followed this article, you should only have a single security group applied. 

Once you get to the EC2 dashboard and to the security group settings, you need to click on the “Edit inbound rules” button to add the required rule. Now, click on the “Add rule” button on the bottom left, select “All traffic” for the rule type and then find your security group in the custom “Source” search box. Make sure to select the same security group as the one you are currently editing. Note that the name of the security group appears in the navigation bar on the top left of the page. Finally, click on “Save rules” and you should be set.

MSK and DocumentDB security

In the interest of developing these data pipelines properly, I have to point out a slight problem with the DocumentDB connection. We used the default DocumentDB configuration, which enables TLS. This means that in order to connect to DocumentDB, we needed to supply the client with the CA file we downloaded from the DocumentDB dashboard.

However, since MSK is a managed service, we have no way of installing these certificates in the new plugin. Furthermore, while there is a way to specify the CA file within the MongoDB URI, the current MongoDB driver used within both the Confluent and Debezium connectors simply ignores this option and/or the CA file if we try to include it in the JAR file or in a ZIP file that holds both. If any readers are aware of a way to do this, please let me know so I can update this document. 

The only other option would be to implement our own connector that would contain the certificates and use them without relying on external files or certificate registries, which is out of the scope of this article. Thus, we first need to turn off TLS in our DocumentDB. For this, go back to the DocumentDB dashboard, select “Parameter groups” from the left side menu and then click on the “Create” button on the right.

[caption id="attachment_2045" align="aligncenter" width="1024"]TamingQueue36 DocumentDB — Creating a New Parameter Group[/caption]

Fill in a name for the new parameter group, add a description, and click “Create.” Next, click on the new group in the list, then select “tls” from the list that opens, and click on the “Edit” button on the top right of the screen:

[caption id="attachment_2044" align="aligncenter" width="1024"]TamingQueue37 DocumentDB — Modify the “tls” Parameter[/caption]

Set the selection to “disabled” and click on “Modify cluster parameter”. Now, click on “Clusters” from the left side menu and then click on your cluster. Go to the “Configuration” tab and click on the “Modify” button within that tab:

[caption id="attachment_2043" align="aligncenter" width="1024"]TamingQueue38 DocumentDB — Modify Cluster Options[/caption]

Under the “Cluster options” section, select the new parameter group that we just created and then scroll down, click on the “Continue” button, and finally click on the “Modify cluster” button. This will modify the settings and take you back to the cluster list. However, the new settings will not take effect until you reboot the cluster. If you click on the cluster again, you will see that the summary section indicates “pending-reboot”:

[caption id="attachment_2042" align="aligncenter" width="1024"]TamingQueue40 DocumentDB — Cluster Pending Reboot[/caption]

Go back to the cluster list, select the cluster by clicking on the checkbox next to it, then click the “Actions” button to open the menu and select “Reboot." The cluster will now reboot, and in a few minutes it will be ready for work.

Kafka MongoDB connector

[caption id="attachment_2041" align="aligncenter" width="1024"]TamingQueue41 Architecture — Kafka MongoDB Source Connector[/caption]

There are two options we can use: the Confluent connector and the Debezium connector. Both are Java-based, but the Confluent connector is easier to use, so we will focus on that one and mention the differences in the Debezium connector briefly.

Do NOT go to https://www.confluent.io/hub/mongodb/kafka-connect-mongodb/ to download the connector from there. Although we ARE going to use this connector, the version you will find there is designed specifically for Confluent Cloud and is therefore missing some dependencies required by MSK that Confluent Cloud normally provides.

Instead, go to the Maven repo at https://search.maven.org/search?q=a:mongo-kafka-connect, click on the “Download” icon on the right and select “all.” This will download a JAR file that includes all the required dependencies. Upload this JAR file to your S3 bucket. In MSK, you first need to create a plugin, and then a connector which is an instance of the plugin. In our case, MSK does not have a built-in MongoDB plugin so we need to create a custom plugin. Fortunately for us, MSK can wrap the process of creating both plugin and connector into a single sequence. 

Go to the MSK dashboard, select “Connectors” from the left side menu and then click on the “Create connector” button. You can see that MSK takes you to the “Custom plugin” screen to first create the new custom plugin. Select the “Create custom plugin” option, and then click on the “Browse S3” button to find your S3 bucket and select the JAR you just uploaded. Next, give your plugin a name and add a description, and then click on “Next” to start creating the connector.

[caption id="attachment_2040" align="aligncenter" width="1024"]TamingQueue42 MSK — Create a Custom Plugin[/caption]

To create a connector, start by choosing a name and adding a description. Then choose your MSK cluster from the “Apache Kafka Cluster” list and the “None” authentication method, as our plugin does not support IAM authentication.

[caption id="attachment_2039" align="aligncenter" width="827"]TamingQueue43 MSK — Create a New Connector[/caption]

Now we need to configure our connector. You can find detailed configuration information in the MongoDB connectors documentation site and more information about MongoDB and change stream settings in the MongoDB documentation. We want to monitor the “intake” collection in the “pipeline” database and publish new documents to the pipeline.intake.inserts topic. We also want to poll the change stream every second (this may be more frequent than your application needs, so consider reducing the polling frequency accordingly) and get the results in JSON format. The following configurations specify these choices:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
name=<YOUR_CONNECTOR_NAME>
connection.uri=mongodb://puser:<YOUR_PASSWORD>@medium-db.cluster-calpleq1h0kg.eu-west-2.docdb.amazonaws.com:27017
database=pipeline
collection=intake
publish.full.document.only=true
poll.await.time.ms=1000
tasks.max=1
batch.size=10
poll.max.batch.size=1
output.format.key=json
output.format.value=json
pipeline=[{"$match": {"operationType": "insert"}}]
topic.suffix=inserts

Note that for <YOUR_CONNECTOR_NAME> you have to use the exact same name you chose for your connector. Also make sure to use your actual password instead of <YOUR_PASSWORD> in the URI. We leave the rest of the settings on the page at their defaults, but you can change them according to your needs.

We need to give our connector access permissions using an AWS IAM role, so we choose the IAM execution role we created before and then click on the “Next” button. The next section deals with security, and the defaults here are good for us, so we touch nothing and simply click on the “Next” button again.

Now we need to choose where to send logging information. We previously created a CloudWatch log group and now is the time to use it. So, choose “Deliver to Amazon CloudWatch Logs” and then select the log group using the “Browse” button.

[caption id="attachment_2038" align="aligncenter" width="1024"]TamingQueue44 MSK — Sending Connector Logs to AWS CloudWatch[/caption]

Click “Next” one more time to get to the “Review and create” screen. This screen shows you a summary of your choices and configurations and gives you the ability to edit things you missed. After making sure everything is as it should be, click the “Create connector” button to finish the process. Your new connector will now be created. This process can take a few minutes.

You can go to the CloudWatch console and select your log group to watch for progress. First, you will see a new log stream titled “log_stream_created_by_aws_to_validate_log_delivery_subscriptions” appearing, which indicates that the connector has permissions to log to CloudWatch. If you never see this, you need to go back and check the execution role settings to make sure you got them right. After a couple more minutes, you should see a log stream titled something like “medium-connector-33190fb9-ae60-471b-8a8f-412186b023ce-3”. If you click on this log stream, you will be able to see all the output from your new connector as it initializes.

If you see any errors during initialization, which may be in the form of Java exceptions and stack traces, you probably missed some of the steps above, so go back and make sure you configured everything correctly. Note that you can NOT modify an existing connector, so you would need to delete it once it reaches a “failed” state and create a new one instead. If everything works and the connector was able to connect to DocumentDB and initialize the change stream, you will see messages like these appearing:

[caption id="attachment_2037" align="aligncenter" width="1024"]TamingQueue45 MSK COnnector Logs in CloudWatch[/caption]

Your plugin is now ready for work! We can run the CLI Kafka consumer as before and then use our MongoDB client to insert some documents:

[caption id="attachment_2036" align="alignnone" width="1024"]TamingQueue46 Inserting New Documents to DocumentDB using Robo 3T[/caption]

The consumer will then show us that the connector picked up the documents and published them to our chosen topic:

[caption id="attachment_2035" align="alignnone" width="1024"]TamingQueue47 Kafka CLI Consumer Showing the New Documents in the Topic[/caption]

The connector is working as expected! As for the Debezium connector, the documentation and download link can be found here. Once downloaded, extract the archive and upload the JAR to S3 so it can be used in MSK, as with the Confluent connector. The configurations are a bit different for the Debezium connector:

connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=10.101.5.244:27017
mongodb.user=puser
mongodb.password=<YOUR_PASSWORD>
collection.include.list=pipeline.intake
capturemode=change_streams_update_full
skipped.operations=u,d
mongodb.name=inserts
snapshot.fetch.size=1
mongodb.poll.interval.ms=1000
tasks.max=1
provide.transaction.metadata=false

Unlike the Confluent connector, the Debezium one does not let you set a suffix for the topic name, but rather uses the “mongodb.name” logical name as a prefix. Thus, we cannot use a topic like pipeline.intake.inserts. These configs will actually cause the connector to try and publish to a topic named inserts.pipeline.intake, so make sure to name your topic correctly if you wish to use this connector. Otherwise, testing should be done in the same way as before.

Lambda

This is where we start building our actual processing data pipeline and graph. We need to create a new Lambda function and set an MSK based trigger for it.

TamingQueue48

Start by going to the Lambda dashboard and click on the “Create function” button on the top right. Choose “Author from scratch” and fill in a name for your function. We are going to use Go code so select “Go 1.x” from the “Runtime” list.

[caption id="attachment_2033" align="aligncenter" width="1024"]TamingQueue49 Lambda — Create a New Function[/caption]

Next, expand the “Change default execution role” section, select “Create a new role from AWS policy templates” and give the role a name. This will create a new “service-role” to be used as the execution role for the Lambda function. Once created, we will need to tweak the permissions. Expand the “Advanced settings” section and tick the “Enable VPC” box. We need our Lambda function to have access to the MSK cluster so the trigger can read from a topic and so we can publish to the next topic in line. Choose your VPC from the list, ALL the subnets where the MSK brokers are deployed and finally the security group as we defined previously:

[caption id="attachment_2032" align="aligncenter" width="857"]TamingQueue50 Lambda — Function VPC Settings[/caption]

Create the function by clicking on “Create function”. Once the function is created, click on it and then go to the “Configuration” tab and select “Permissions” from the left menu:

[caption id="attachment_2031" align="aligncenter" width="854"]TamingQueue51 Lambda — Execution Role[/caption]

This shows you the execution role created for you, and you can browse the list of permissions it gives your Lambda function. Click on the role to open it in the IAM dashboard, remove the current policy and then create a new one that has the following permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:<REGION>:<ACCOUNT_ID>:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:<REGION>:<ACCOUNT_ID>:log-group:/aws/lambda/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeGroup",
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeClusterDynamicConfiguration",
                "kafka:*",
                "ec2:*"
            ],
            "Resource": "*"
        }
    ]
}

Note: replace <REGION> with your selected region and <ACCOUNT_ID> with your AWS account ID. Go back to the Lambda function’s configurations, click on the “Monitor” tab and then click on “View logs in CloudWatch”. This should take you to the log group that was created for your Lambda function. If, for some reason, this group was not automatically created, you will get an error message like this one:

[caption id="attachment_2030" align="aligncenter" width="1024"]TamingQueue52 CloudWatch — Log Group Missing Error[/caption]

You will need to manually create the log group. To do this, note the group’s required name in the error message, which in this case is /aws/lambda/medium-pipline-router. Now click on “Log groups” from the navigation bar under the error message or by expanding the left sidebar and clicking on “Log groups” there.

[caption id="attachment_2029" align="aligncenter" width="1024"]TamingQueue53 CloudWatch — Showing Log Groups[/caption]

Now click on “Create log group”, fill in the required name and then click on the “Create” button:

[caption id="attachment_2028" align="alignnone" width="1024"]TamingQueue54 CloudEWatch — Creating a New Log Group[/caption]

VPC revisited

Another thing we need to take care of at this point is making sure we have network connectivity to some required AWS services for our trigger. Depending on your VPC of choice, you may not have connectivity to the STS, Lambda, and/or Secrets Manager services. We can fix this by adding VPC endpoints for each of these to our VPC. 

If you get an error message about this when setting up the MSK trigger for the Lambda function, follow these instructions: Go to the VPC dashboard and select “Endpoints” from the left side menu. Then click on “Create Endpoint” on the top right. Fill in a descriptive name for your endpoint and then search and select the com.amazonaws.<REGION>.sts service from the “Services” list. 

Remember to replace <REGION> with your region. Now select your VPC from the VPC list, select ALL the subnets where MSK brokers are deployed, and for each, select the subnet ID from the combobox. Select the “IPv4” IP address type and then select the security group we set up for our VPC:

[caption id="attachment_2027" align="alignnone" width="955"]TamingQueue55 Creating VPC Endpoints[/caption]

Leave the rest as is and click on “Create Endpoint” at the bottom. Repeat the process for the lambda and secretsmanager services as well.

Lambda router

Now that you have a Lambda function, we can write code for it and configure a trigger. Here is example Go code that will take care of everything our Lambdas will do:

package main

import (
	"context"
	"crypto/tls"
	b64 "encoding/base64"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"reflect"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	runtime "github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/lambdacontext"
	"github.com/segmentio/kafka-go"
)

const (
	// kafkaTopicEnvVar names the topic(s) to publish to when calling PublishToKafka
	kafkaTopicEnvVar = "KAFKATOPIC"

	// kafkaBrokersEnvVar is a comma-separated list of TLS supporting Kafka brokers to use
	kafkaBrokersEnvVar = "KAFKABROKERS"

	// An environment variable that indicates the role of the node within the pipeline
	nodeRoleEnvVar = "NODE_ROLE"

	// The router role
	nodeRoleRouter = "ROUTER"

	// The worker role
	nodeRoleWorker = "WORKER"

	// The tester role
	nodeRoleTester = "TESTER"
)

// IsTerminator reports whether the topics or brokers are unset (i.e., this node does not publish further)
func (m *MediumPipeline) IsTerminator() bool {
	return m.topics == nil || m.brokers == nil
}

// PublishToKafka publishes the given data to the topic and brokers set using the environment variables
func (m *MediumPipeline) PublishToKafka(ctx context.Context, data interface{}) error {
	return m.PublishToKafkaTopic(ctx, m.topics, data)
}

// PublishToKafkaTopic publishes the given data to the given topic (overriding the KAFKATOPIC environment variable if given)
func (m *MediumPipeline) PublishToKafkaTopic(ctx context.Context, topics []string, data interface{}) error {

	if topics == nil || m.brokers == nil {
		return fmt.Errorf("kafka topic or brokers not set")
	}

	// prep the message data
	bytes, err := json.Marshal(data)
	if err != nil {
		log.Println("Error converting data into a JSON: " + err.Error())
		return err
	}

	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		TLS:       &tls.Config{},
	}

	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  m.brokers,
		Topic:    "",
		Balancer: &kafka.Hash{},
		Dialer:   dialer,
	})

	for _, topic := range topics {

		err = w.WriteMessages(ctx, kafka.Message{
			Topic: topic,
			Key:   nil,
			Value: bytes,
		})
		if err != nil {
			log.Printf("Error writing message(s) to topic '%s': %s", topic,
				err.Error())
			continue
		}

		// log a confirmation once the message is written
		fmt.Println("Published to kafka topic: " + topic)
	}

	w.Close()

	return nil
}

// handlerFunc is the type of a user-supplied event handler
type handlerFunc func(pipeline *MediumPipeline, data interface{}) error

// MediumPipeline is a Medium Pipeline object
type MediumPipeline struct {
	handler  handlerFunc
	datatype interface{}
	topics   []string
	brokers  []string
}

// NewPipeline creates a new Medium Pipeline object with the given handler and datatype
func NewPipeline(handler handlerFunc, datatype interface{}) *MediumPipeline {
	instance := &MediumPipeline{
		handler:  handler,
		datatype: datatype,
	}

	if env := os.Getenv(kafkaTopicEnvVar); env != "" {
		instance.topics = strings.Split(env, ",")
	}
	if env := os.Getenv(kafkaBrokersEnvVar); env != "" {
		instance.brokers = strings.Split(env, ",")
	}

	return instance
}

// Start runs the pipeline function
func (m *MediumPipeline) Start() {
	runtime.Start(m.handleRequest)
}

// getRequest returns the encapsulated request in the user-given datatype
func (m *MediumPipeline) getRequest(record *events.KafkaRecord) (interface{}, error) {

	// decode the value
	value, err := b64.StdEncoding.DecodeString(record.Value)
	if err != nil {
		return nil, err
	}

	// convert to a map
	// Get a reflection of the given object so we can extract its type
	val := reflect.ValueOf(m.datatype)

	// create new instance of given type
	document := reflect.New(val.Type()).Interface()

	err = json.Unmarshal(value, &document)
	if err != nil {
		return nil, err
	}

	return document, nil
}

// handleRequest is the internal handler that is actually run by the Lambda mechanism
func (m *MediumPipeline) handleRequest(ctx context.Context, event events.KafkaEvent) error {

	// start by parsing the incoming event
	eventstr, err := json.Marshal(event)
	if err != nil {
		log.Printf("Error parsing incoming event: %s\n", err.Error())
		return err
	}

	log.Printf("EVENT:\n%s\n", eventstr)

	// environment variables
	log.Printf("REGION: %s", os.Getenv("AWS_REGION"))
	log.Println("ALL ENV VARS:")
	for _, element := range os.Environ() {
		log.Println(element)
	}
	// request context
	lc, _ := lambdacontext.FromContext(ctx)
	log.Printf("REQUEST ID: %s", lc.AwsRequestID)

	// global variable
	log.Printf("FUNCTION NAME: %s", lambdacontext.FunctionName)

	// context method
	deadline, _ := ctx.Deadline()

	log.Printf("DEADLINE: %s", deadline)

	for k, s := range event.Records {
		log.Printf("Processing recordset '%s'", k)

		for _, v := range s {
			request, err := m.getRequest(&v)

			if err != nil {
				log.Printf("Got error: %s", err.Error())
			}

			if err = m.handler(m, request); err != nil {
				return err
			}

		}
	}

	return nil
}

type MyRequest struct {
	Name  string `json:"name"`
	Age   int    `json:"age"`
	Color string `json:"color"`
}

func MediumWorkerHandler(pipeline *MediumPipeline, data interface{}) error {
	request := data.(*MyRequest)

	log.Printf("Hello %s, you are %d years old and your color is %s!",
		request.Name, request.Age, request.Color)

	if !pipeline.IsTerminator() {
		return pipeline.PublishToKafka(context.TODO(), data)
	} else {
		return nil
	}
}

func MediumRouterHandler(pipeline *MediumPipeline, data interface{}) error {
	request := data.(*MyRequest)

	log.Printf("Hello %s, you are %d years old and your color is %s!",
		request.Name, request.Age, request.Color)

	return pipeline.PublishToKafkaTopic(context.TODO(),
		[]string{"pipeline.type." + request.Color}, data)
}

func MediumTestHandler(pipeline *MediumPipeline, data interface{}) error {
	request := MyRequest{
		Name:  "Tester",
		Age:   100,
		Color: "blue",
	}

	log.Printf("Hello %s, you are %d years old and your color is %s!",
		request.Name, request.Age, request.Color)

	return pipeline.PublishToKafkaTopic(context.TODO(),
		[]string{"pipeline.type." + request.Color}, request)
}

func main() {
	var pipeline *MediumPipeline

	switch os.Getenv(nodeRoleEnvVar) {
	case nodeRoleRouter:
		pipeline = NewPipeline(MediumRouterHandler, MyRequest{})
	case nodeRoleWorker:
		pipeline = NewPipeline(MediumWorkerHandler, MyRequest{})
	case nodeRoleTester:
		pipeline = NewPipeline(MediumTestHandler, MyRequest{})
		MediumTestHandler(pipeline, nil)
		return
	default:
		return
	}

	pipeline.Start()
}

As can be seen, this code encapsulates the Lambda Go SDK functionality required to run the proper handler function when the Lambda function is activated. The handlers specifically expect a Kafka event structure (this is deserialized for us by the AWS Lambda SDK), and we then take the JSON items contained within and convert them to our own Go data structure.

Finally, if specified using the KAFKATOPIC and KAFKABROKERS environment variables, we also publish to a Kafka topic. This code contains handlers for both a router and a worker, and chooses the correct one based on the NODE_ROLE environment variable. Please note that while the Confluent MSK connector we used publishes the data to Kafka in JSON format, the Debezium connector uses a Struct-style serialization instead. The resulting structure looks like this:

Struct{
    after={
        "_id": {
            "$oid": "61c39e264f43cf91d9708d23"
        },
        "name": "Alice",
        "age": 30
    },
    source=Struct{
        version=1.8.0.Final,
        connector=mongodb,
        name=pipeline,
        ts_ms=1640209958000,
        db=medium,
        rs=medium0,
        collection=intake,
        ord=1
    },
    op=c,
    ts_ms=1640209958636
}

This means we will need to replace the JSON deserialization code with something like serde-go, which we will not cover in this article. To build your code for Lambda, you need to specify some Go build parameters like this:

GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o medium-lambda-node

and then zip the resulting binary. If you wish to easily reuse your code, it is recommended to upload the zip file to S3 at this point. Now go to your Lambda function’s settings and select the “Code” tab. Click on “Upload from”, select “.zip file”, click “Upload” in the dialog that pops up, and select your zip file. If you uploaded to S3, select “Amazon S3 location” instead and paste the S3 link URL to your zip file. Next, under the “Runtime settings” section, click the “Edit” button and enter your compiled binary’s filename in the “Handler” box:

[caption id="attachment_2026" align="aligncenter" width="1024"]TamingQueue56 Lambda — Uploading Code to the function[/caption]

Click on “Save” and give your lambda a few minutes to load the new code.
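For reference, the packaging and upload steps might look like this; the zip and bucket names are placeholders, and the binary name matches the build command above:

# package the compiled binary for the Go 1.x runtime
zip medium-lambda-node.zip medium-lambda-node

# optionally upload the package to S3 for reuse
aws s3 cp medium-lambda-node.zip s3://<YOUR_BUCKET_NAME>/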

Lambda MSK trigger

Next, we will set up the MSK trigger. This trigger will connect to the MSK cluster, listen on our initial topic, and then trigger the Lambda on new incoming messages.

TamingQueue57

You can either click on “+ Add Trigger” from the “Function overview” section, or go to the “Configuration” tab, select the “Triggers” section on the left and then click on “Add trigger” on the right. In the new dialog, select the “MSK” trigger type. This will open the trigger’s settings. Select the MSK cluster, set the desired batch size and your “Batch window” (the max polling time in seconds for the topic), and then fill in the topic name.

By default, the trigger will handle only new incoming messages. If you want it to process ALL the messages in the topic, select “Trim horizon” from the “Starting position” box. This is the desired behavior for us, as one of our requirements was that we do not want to lose messages, so we can dynamically plug components into our pipeline to start handling previously unknown message types.

In this article we are not taking care of authentication. However, there is a fine point to note here. We created the MSK cluster with the default authentication settings, which means that both “Unauthenticated” and “IAM role-based authentication” are enabled. The trigger will default to using “IAM role-based authentication”, for which we previously added the “kafka-cluster” actions to our Lambda execution role’s policy. If these permissions are missing, you will get a “SASL authentication failed” error message from the trigger. We did NOT enable SASL authentication in our MSK cluster, so why are we getting this error? Behind the scenes, AWS implements the “IAM role-based authentication” mechanism using SASL. As we are not tackling authentication here, just go ahead and click the “Add” button to create the new trigger:

[caption id="attachment_2024" align="aligncenter" width="975"]TamingQueue59 Lambda — Creating an MSK Trigger[/caption]

Deploying the trigger can take a few minutes, and once deployed we can configure and test our router. Under the “Configuration” section, select “Environment variables” on the left, then click the “Edit” button and add the NODE_ROLE variable with ROUTER as the value. If we now add a new document to our DocumentDB, it will be published to the pipeline.intake.inserts topic by the MSK connector as we saw before. The trigger will then pick it up and trigger the Lambda. Our sample code will output the information read from the message to CloudWatch, so if you go to the log group we created above, you will see a new log entry appear containing something like this:

[caption id="attachment_2023" align="alignnone" width="1024"]TamingQueue60 Lambda — Successful RunCloudWatch Output[/caption]

To finalize the router’s work, we want to demonstrate that it can route messages to different topics based on their type. The Lambda code will use the color field as the type of the message and will route the message to a Kafka topic named pipeline.type.<COLOR>. So, create two new topics: pipeline.type.blue and pipeline.type.green. Next, we need to tell our Lambda where MSK is so it can actually publish messages to the various topics. Go back to the environment variables and add a variable called KAFKABROKERS and set its value to a comma-separated list of the brokers in your MSK cluster. For example:

b-1.medium-msk.i78wus.c3.kafka.eu-west-2.amazonaws.com:9094,b-2.medium-msk.i78wus.c3.kafka.eu-west-2.amazonaws.com:9094,b-3.medium-msk.i78wus.c3.kafka.eu-west-2.amazonaws.com:9094

You may note that we are using port 9094 here, as our nodes connect to MSK securely using TLS. This will tell our example code to publish the data to the proper topic in MSK in addition to printing to the log. Now, if we add a new document with the same message as before, we will be able to use the console consumer as we did before to listen on the pipeline.type.blue topic, and we will see the message appear under that topic within a few seconds of creation:

[caption id="attachment_2022" align="aligncenter" width="1024"]TamingQueue61 Kafka CLI Consumer[/caption]

Our router picked up the new document, determined that the message type is blue and published the data to the pipeline.type.blue queue as desired.

Lambda processor

The final piece of the puzzle is the processor node, which is just another Lambda function that uses the same code as before. Go ahead and replicate the process we used for the router Lambda to create a new Lambda with an MSK trigger listening on the pipeline.type.blue topic. This time, set the NODE_ROLE environment variable to WORKER so the worker handler function is used. Finally, we can test the full E2E mechanism by adding a new document as before, and we will see a new log entry in CloudWatch for the new Lambda function with the content of the document. If we create another document with green as the color, we will see the new message appearing under the pipeline.type.green topic, but no Lambda function will be activated as we do not have a trigger listening on this topic. The messages will accumulate in this topic until we create a new Lambda function capable of processing “green” messages and add a trigger for it that listens on the pipeline.type.green topic. Our sample pipeline is now complete!

Improving your data pipelines one step at a time

In this article, we demonstrated how to configure various AWS components (VPC, EC2, DocumentDB, S3, IAM, CloudWatch, MSK and Lambda) and tie them together to create an E2E processing pipeline that triggers automatically when new documents are inserted into the DB.

For this article, we chose the smallest available instance sizes for the various services, so our EC2 instance and our DocumentDB run at minimal cost, while our Lambda functions are only billed when running (i.e., from when they are triggered by a new document published into the proper topic until they finish processing it). Our pipeline can be dynamically configured to accommodate any processing structure by creating Kafka topics as edges in our graph and Lambda functions as nodes with triggers for the specific Kafka topics. Restructuring can be done while the system is running without losing any data. We also covered various CLI, shell, and GUI tools for working with MongoDB and Kafka, as well as remotely connecting to our resources via SSH port forwarding.

Hopefully, this document provides a good starting point for anyone interested in building fully managed cloud-based processing pipelines using AWS services. Want more information about how data management can change your enterprise? Check out our post on why data models rule the enterprise world.

Thanks

I wish to thank Julia Valenti for her amazing assistance in reviewing and editing this article. ❤️
