Sounds interesting? It did to me and these are the steps to do just that. I will be using the AWS Cloud Development Kit to deploy an Amazon Managed Streaming for Apache Kafka cluster.
By the end of this post you should have an AWS managed Kafka cluster setup and ready to use. You will also have some basic method of validating that it works and a CICD pipeline setup which will allow you to easily deploy changes. I will try to walk you through the reasoning behind each step,
The AWS CDK is a framework meant to allow you to provision infrastructure as code in a variety of languages. It was announced in July of last year and it was a wonderful bit of news for those of us who are not too excited by either HCL or whatever the YAML template language used for Cloud Formation is called. It is very similar in goals to Pulumi and I plan to give an account on my experience with both. This is not that post.
This post is meant to explore how easy it is to use the CDK to achieve something that might be worthwile. That goal will be setting up a proof of concept for a Kafka cluster on AWS.This might become part of a series that will include ingestion and stream processing as well. For now let’s get our hands on a Kafka cluster!
In keeping with the general approach I hope to stick to with these posts, I will try to make this as close to reproducible and production ready as a blog post can be.
Set-up
Prerequisites
We don’t need much to get started with the CDK. The aws docs mention some older versions are compatible, but these are the versions I had installed when I did this, so I know these work:
- python 3.8+
- nodejs v13.5.0+
- aws account credentials with enough privileges to deploy required services
Getting ready
You can follow the official instructions to get started and for the minimum version of the prereqs from above.
Here’s what I did. I already had the prerequisite tools from above installed. All I had to do was to install the CDK, which is a node module so this was easy.
sudo npm install -g aws-cdk
cdk --version # I now have version 1.25.0 (build 5ced526) of the cdk
I created directory named kafka-infra-python
and opened a terminal in it. The next step is to initialize a blank project using the CDK we installed above. We also use the provided scripts to create virtual environment and install any python requirements.
cdk init app --language python
# if you are on a new machine you probably don't have a virtual environment setup
# you can create one by running
# python -m venv .env
source .env/bin/activate
sudo pip install --upgrade pip
pip install -r requirements.txt
Take care of the license and .gitignore. This is not strictly needed but it’s nice for any code you plan to share.
# pick the license you like, I default to GPL v3
wget https://www.gnu.org/licenses/gpl-3.0.txt -O LICENSE
# get the gitignore settings for python and vscode in my case
wget https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore -O .gitignore
wget https://raw.githubusercontent.com/github/gitignore/master/Global/VisualStudioCode.gitignore -O ->> .gitignore
wget https://raw.githubusercontent.com/aws/aws-cdk/master/.gitignore -O ->> .gitignore
And we’re ready for our first commit! Woot!
git add .
git commit -m "inital commit"
# always push! it's Ctrl+S for the 2020s!
git remote add origin https://github.com/mariusfeteanu/kafka-on-cdk.git
git push -u origin master
Noice, a bit of copy pasting and we have our little project that does nothing. Stay in school, kids!
Access to AWS
Before that let’s create a requirements-dev.txt. That is how I like to keep track of all the python tools I use, but that are not required for using the app. Here’s what I put in it:
awscli == 1.18.5
And then just:
pip install -r requirements-dev.txt
You can now configure the aws cli.
I already have an account and user setup as part of another personal project. So I had to just create some credentials and then run:
aws configure
Does it work?
cdk synth
This seems to do something. It generates a bunch of json files, I assume it generates a Cloud Formation template version of what is in app.python
. Since there’s nothing interesting there I will not investigate and assume that not failure == success
for now.
The CDK for python is distributed as a collection of separate packages, each for a top level AWS service. Let’s just install the CDK libraries for AWS msk and ec2. We need ec2 because that defines the vpc component, and msk clusters must be launched inside a vpc.
echo "aws-cdk.aws-msk == 1.25.0" >> requirements.txt
echo "aws-cdk.aws-ec2 == 1.25.0" >> requirements.txt # a bit of foreshadowing here
pip install -r requirements.txt
The prep work is done. Time for fun!
Defining a MSK cluster
Since MSK is not a completely cloud native solution (like Kinesis) I expect we will have to provision quite a few resources that a cluster depends on. I hope not too many, but for now let’s read the startup guide and come back here in a few minutes.
It looks like we need to create a VPC for the Kafka cluster. Apart from the code that was generated for us by the cdk
this is all I had to write. Not bad!
aws_ec2.Vpc(self, 'vpc-msk')
Next we want to create an MSK cluster. This requires a few arguments, including one called broker_node_group_info
that is poorly documented. The python API page for cluster creation defines it’s type as:
broker_node_group_info (Union[Forwardref, IResolvable]) –
AWS::MSK::Cluster.BrokerNodeGroupInfo.
I am unsure what to make of that. The documentation for Cloud Formation is much clearer for that part.
After some digging in the source it seems the correct value is of type aws_msk.CfnCluster.BrokerNodeGroupInfoProperty
. This is documented in the main API, but as of today this returns an error when trying to access the Python section. Still, having access to the source helped so not all is lost - by source I mean the local package named aws_msk
.
We should be ready to create our MSK cluster. The minimum number of brokers is equal to the number of subnets configured, because the number of brokers must be a multiple of the number of subnets.
aws_msk.CfnCluster(
self,
'msk-cluster',
cluster_name='cdk-test',
number_of_broker_nodes=len(vpc.private_subnets),
kafka_version='2.3.1',
broker_node_group_info=aws_msk.CfnCluster.BrokerNodeGroupInfoProperty(
instance_type="kafka.m5.large",
client_subnets=[
subnet.subnet_id
for subnet
in vpc.private_subnets],
)
)
That took a while (more than 20 minutes) to deploy.
This is a good place to stop and take a breath. We got as far as creating the cluster but there is not way to test if this works, or to iterate over this code easily. Once we take care of that we should be ready to build other services on top of this.
CICD
I will use github actions to test and deploy the code above. I chose it because I haven’t used it before, so we might as well learn something new!
Testing infrastructure as code is hard. The CDK makes some provision for this by documenting a broad approach to stack testing. That does look interesting but it seems designed to test the planned output of a stack. This is similar to the approach in Pulumi, but without the awesome policy support. Still, all of this is premature for a proof of concept. It does not seem to answer the main question I care about at this stage: Will this deploy sucessfully?.
As such I will skip all of that and just asume that if I can cdk synth
the stack then that’s as close as I am going to get to the validation I am interested about without deploying. That will be our “test”.
Once that is successful we need to decide if we should deploy. Generally the answer should be yes, except that provisioning the stack above takes 20 minutes and once deployed (and active) it costs more than 15 USD per day. Yikes! I will create the build definition and test it, but then I will disable the deploy and end to end testing steps (more on e2e later).
Here’s a github action config that does all the things we talked about above. There is no step to deploy, but that is the same as the synth
step with a different command. Note that I had to manually add aws_access_key_id
and aws_secret_access_key
to the project secrets in github.
name: Deploy to AWS and Test
on:
push:
branches:
- master
jobs:
build:
name: Deploy to AWS and Test
runs-on: ubuntu-18.04
steps:
- name: checkout code
uses: actions/checkout@v2
- name: setup node
uses: actions/setup-node@v1
with:
node-version: '13.5'
- name: install cdk
run: |
npm install -g aws-cdk
- name: configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.aws_access_key_id }}
aws-secret-access-key: ${{ secrets.aws_secret_access_key }}
aws-region: us-west-1
- name: setup python
uses: actions/setup-python@v1
with:
python-version: 3.8
- name: install python dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: synthesize CloudFormation templates for main stack
run: |
cdk synth kafka-infra-python
End to end tests
We did not create any stack validation tests, for the reasons stated above. Still, we would like to see if the cluster works at all. This is more of a way of validating all those defaults we accepted in return for keeping the code short and sweet.
This doesn’t sound like a big deal at first. However I do not want to run these from CodeBuild or a similar AWS tool, but from github. I also do not want to make the kafka cluster public (by running Kafka proxy or similar in front of it).
A simple(?) plan would be to create a small containerized app that pushes events to the cluster and then reads the result back. This would validate the basic functionality of a the cluster and also allow us to iron out any potential networking/security issues. We will need to know all these details any way later on when we want to build something else on top of the kafka cluster.
Sounds simple enough, still here are all the things I foresee we will need:
- container definition (Dockerfile) for the test app
- something to host the container (EKS cluster using Fargate)
- a role and task definition and so on to allow the container to run
- some high level code/cli command to trigger the container and retrieve the status
- integrating/deploying all of this must also be done using the CDK/GitHub action
Between this and the deployment section above the dev tools part will end up being larger and more complicated than the actual infra code.
Containerized test app
Let’s start with the end goal. We need a small app of some sort that will connect to our kafka cluster and send some events. This is not a comprehensive test but it should be enough to validate that the infrastructure part works for now. Something like the code below, which uses a library called kafka-python
. This is wrapped in a test using the very light pytest
library, but this is not super relevant to the current project.
producer = KafkaProducer(
bootstrap_servers=KAFKA_BROKERS,
security_protocol='SSL')
producer.send(
topic=TOPIC_NAME,
value=b'yes')
Ignore for the moment where we got our KAFKA_BROKERS
config, but notice that we had to set the security protocol to SSL
since the default is plain text. Another thing I noticed is that by default automatic topic creation is disabled in MSK. Rather than enabling it I chose to do a best effort attempt in the tests to create the topic we need.
admin = KafkaAdminClient(
bootstrap_servers=KAFKA_BROKERS,
security_protocol='SSL')
try:
admin.create_topics([
NewTopic(TOPIC_NAME, 3, 2)
])
except TopicAlreadyExistsError:
print(f"Topic '{TOPIC_NAME}' already exists.")
Running that is as simple as invoking pytest
from the terminal. I wrapped that in a a file called entrypoint.sh
but that is not strictly needed. Now we can create a docker file and we should have all we need to to build and deploy the containerized test app.
FROM python:3.8-alpine3.11
RUN mkdir /test
COPY requirements.txt .
RUN pip install -r requirements.txt
WORKDIR /test
COPY test_* .
COPY entrypoint.sh .
RUN command chmod +x entrypoint.sh
ENTRYPOINT [ "./entrypoint.sh" ]
Container orchestration
Setting up some infra to run the test above is not as straigthforward as the other stuff in this post. But if we do it properly we should end up with something that emulates a real app very closely and it’s also very cheap to run. What you can see below is the definition of an ECS cluster with no instances since we plan to use Fargate to run the app without provisioning any servers. The coolest part below is the DockerImageAsset
which creates and publish a docker images to a CDK created container registry in one single step. This is followed by a fargate task definition and by some networking stuff. We will need that security group and subnet info to be exposed in order to allow connections from this app to the kafka cluster.
e2e_cluster = aws_ecs.Cluster(self, 'e2e-cluster',
vpc=vpc,
cluster_name='e2e-cluster')
e2e_image = aws_ecr_assets.DockerImageAsset(self, 'e2e-image',
directory='test/e2e')
e2e_task = aws_ecs.FargateTaskDefinition(self, 'e2e-task',
family='e2e-task')
e2e_task.add_container('e2e-test-kafka',
image=aws_ecs.ContainerImage.from_docker_image_asset(e2e_image),
logging=aws_ecs.AwsLogDriver(stream_prefix='e2e'))
e2e_security_group = aws_ec2.SecurityGroup(self, 'e2e', vpc=vpc)
self.e2e_security_group = e2e_security_group # expose it to give it access to kafka
core.CfnOutput(self,"subnets",
value=','.join([subnet.subnet_id for subnet in vpc.private_subnets]))
core.CfnOutput(self, "securitygroup",
value=e2e_security_group.security_group_id)
We can then use the exported values to allow ingress from this ECS cluster to the MSK one. This is at a lower level than we would like the code to be, but it seems to be the best way at the time I am writing this. Ideally I would like to give an instruction to connect the two clusters, and leave aws to take care of the protocols and ports and such, since those only have one possible correct value.
cluster_sec_group = aws_ec2.SecurityGroup(self, 'msk-cluster-sec-group',
vpc=vpc)
cluster_sec_group.add_ingress_rule(
peer=e2e_security_group,
connection=aws_ec2.Port(
string_representation='kafka',
protocol=aws_ec2.Protocol.TCP,
from_port=DEFAULT_KAFKA_PORT,
to_port=DEFAULT_KAFKA_PORT
))
Tying together in the CICD pipeline
In order to run the above I had to create a pretty hairy python script to retrieve the configuration from cloudformation outputs and wait for tasks to finish etc. The important part of that script is triggering the actual end to end test. Notice the KAFKA_BROKERS
variable is being set here to configure the test from above.
task_run_resp = ecs.run_task(
cluster=e2e_cluster,
taskDefinition=e2e_task_definition,
launchType='FARGATE',
networkConfiguration={
'awsvpcConfiguration': {
'subnets': e2e_subnets,
'securityGroups': [e2e_security_group],
'assignPublicIp': 'DISABLED'
}
},
overrides={'containerOverrides':[{
'name': 'e2e-test-kafka',
'environment':[
{'name': 'KAFKA_BROKERS', 'value': msk_bootstrap_brokers}
]
}]}
)
This is all tied together in the github actions script.
- name: deploy network stack
run: |
cdk deploy --require-approval never kafka-infra-python-network
- name: deploy e2e test stack
run: |
cdk deploy --require-approval never kafka-infra-python-e2e-test
- name: deploy main stack
run: |
cdk deploy --require-approval never kafka-infra-python
- name: run the e2e test
run: |
python test/e2e/run-task.py
You can also use the following commands to destroy the stacks in order once you are done.
cdk destroy --force kafka-infra-python
cdk destroy --force kafka-infra-python-e2e-test
cdk destroy --force kafka-infra-python-network
Conclusion
That was way longer than expected! As usual, writing the main code was the easy and fun part. Putting it all together so that we can have trust in what we build and reuse it later was more difficult.
My feeling at the end of this is that this is much nicer than writing template/markup oriented code in Terraform/Cloudformation. There is a lot of scope for using general programming language tooling and abstraction methods to improve. I am especially interested in exploring how to make new CDK constructs.
There are many open questions around how this can be used to manage large scale infrastructure with many interdependant stacks. Still I will definitely try to use it as often as possible, and would much prefer to start new projects using this approach. I include Pulumi in this as well since it’s very similar in goals and execution.
Issues encountered
- The kafka port for the secure connection is 9094, not 9092. I did not know that.
- If you want to use more avanced CDK functionality you need to issue a
cdk bootsrap
to create some common resources. A example of advanced feature is the automagick container image creation. - The best way of passing resource between stacks that I found was to create them as stack object members (self.xx), retrieve them from there in the app and then pass them on the constructor of the dependent. I am not sure if that is the best practice, but it does feel natural.
- In my case the stacks need to be create in a certain order, e.g. network stack before anything else. I did that by using explicit dependencies, since I did not have time to check if these are detected automatically. It would be cool if that was the case.
- My end to end test (integration test?) turned out to take some time to create. I am unsure if there isn’t some easier way. Still it was a good opportunity to test cross-stack dependecies.
- It seems that not all resources have “native” CDK constructs defined as of today, notable MSK cluster themselves. This was not a big issue since it looks like aws used auto generated code based on the cloudformation API that works quite well.