Kafka Cluster
This folder contains a Terraform module for running a cluster of Apache Kafka brokers. Under the hood, the cluster is powered by the server-group module, so it supports attaching ENIs and EBS Volumes, zero-downtime rolling deployment, and auto-recovery of failed nodes.
Quick start
- See the root README for instructions on using Terraform modules.
- See the kafka-zookeeper-standalone-clusters example for sample usage.
- See vars.tf for all the variables you can set on this module.
- See Connecting to Kafka brokers for instructions on reading / writing to Kafka.
Key considerations for using this module
Here are the key things to take into account when using this module:
Kafka AMI
You specify the AMI to run in the cluster using the ami_id input variable. We recommend creating a
Packer template to define the AMI with the following modules installed:
- install-open-jdk: Install OpenJDK. Note that this module is part of terraform-aws-zookeeper.
- install-supervisord: Install Supervisord as a process manager. Note that this module is part of terraform-aws-zookeeper.
- install-kafka: Install Kafka.
- run-kafka: A script used to configure and start Kafka.
See the kafka-ami example for working sample code.
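If your Packer template uses the Gruntwork Installer, the provisioning step might look roughly like the following sketch. The module repos come from the list above, but the version tags are illustrative placeholders; see the kafka-ami example for the real template:
# Sketch of a Packer shell provisioner body that installs the modules listed above.
# The --tag values are illustrative placeholders.
gruntwork-install --module-name 'install-open-jdk' --repo 'https://github.com/gruntwork-io/terraform-aws-zookeeper' --tag '<ZOOKEEPER_MODULE_VERSION>'
gruntwork-install --module-name 'install-supervisord' --repo 'https://github.com/gruntwork-io/terraform-aws-zookeeper' --tag '<ZOOKEEPER_MODULE_VERSION>'
gruntwork-install --module-name 'install-kafka' --repo 'https://github.com/gruntwork-io/terraform-aws-kafka' --tag '<KAFKA_MODULE_VERSION>'
gruntwork-install --module-name 'run-kafka' --repo 'https://github.com/gruntwork-io/terraform-aws-kafka' --tag '<KAFKA_MODULE_VERSION>'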
User Data
When your servers are booting, you need to tell them to start Kafka. The easiest way to do that is to specify a User
Data script via the user_data
input variable that runs the run-kafka script. See
kafka-user-data.sh for an example.
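As a minimal sketch of the wiring (the file name and template variables are illustrative, and templatefile requires Terraform 0.12 or newer; on older versions you'd use the template_file data source instead):
module "kafka_brokers" {
  source = "git::git@github.com:gruntwork-io/terraform-aws-kafka.git//modules/kafka-cluster?ref=v0.0.5"

  # Render a User Data script that calls the run-kafka script baked into the AMI.
  # The file name and template variables here are illustrative.
  user_data = templatefile("${path.module}/kafka-user-data.sh", {
    cluster_name = "example-kafka-brokers"
  })

  # ... (other params omitted)
}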
ZooKeeper
Kafka depends on ZooKeeper to work. The easiest way to run ZooKeeper is with terraform-aws-zookeeper. Check out the kafka-zookeeper-standalone-clusters example for how to run Kafka and ZooKeeper in separate clusters and the kafka-zookeeper-confluent-oss-colocated-cluster example for how to run Kafka and ZooKeeper co-located in the same cluster.
Hardware
The number and type of servers you need for Kafka depends on your use case and the amount of data you expect to process. Here are a few basic rules of thumb:
- Every write to Kafka gets persisted to Kafka's log on disk, so hard drive performance is important. Check out Logs and EBS Volumes for more info.
- Most writes to Kafka are initially buffered in memory by the OS, so you need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate: if you want to be able to buffer for 30 seconds, you need at least write_throughput * 30 MB of memory, where write_throughput is how many MB/s you expect to be written to your Kafka cluster. For example, at 50 MB/s of writes, that works out to roughly 1.5 GB of buffer. Using machines with 32GB+ of memory for Kafka brokers is common.
- Kafka is not particularly CPU intensive, so getting machines with more cores is typically more efficient than machines with higher clock speeds. Note that enabling SSL for Kafka brokers significantly increases CPU usage.
- In general, r3.xlarge or m4.2xlarge are a good choice for Kafka brokers.
For more info, see:
- Kafka Production Deployment
- Kafka Reference Architecture
- Design and Deployment Considerations for Deploying Apache Kafka on AWS
Logs and EBS Volumes
Every write to a Kafka broker is persisted to disk in Kafka's log. We recommend using a separate EBS Volume to store these logs. This ensures the hard drive used for transaction logs does not have to contend with any other disk operations, which can improve Kafka performance. Moreover, if a Kafka broker is replaced (e.g., during a deployment or after a crash), it can reattach the same EBS Volume and catch up on whatever data it missed much faster than if it has to start from scratch (see Design and Deployment Considerations for Deploying Apache Kafka on AWS).
This module creates an EBS Volume for each Kafka server and gives each (server, EBS Volume) a matching
ebs-volume-0 tag. You can use the persistent-ebs-volume
module in the User
Data of each server to find an
EBS Volume with a matching ebs-volume-0 tag and attach it to the server during boot. That way, if a server goes down
and is replaced, its replacement reattaches the same EBS Volume.
See kafka-user-data.sh for an example.
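The ebs_volumes input controls what gets created. For example, to give each broker a dedicated, encrypted log volume (the size below is illustrative; pick one based on your retention settings and write throughput):
ebs_volumes = [
  {
    type      = "gp2"
    size      = 500
    encrypted = true
  }
]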
Health checks
We strongly recommend associating an Elastic Load Balancer
(ELB) with your Kafka cluster and configuring it
to perform TCP health checks on the Kafka broker port (9092 by default). The kafka-cluster module allows you
to associate an ELB with Kafka, using the ELB's health checks to perform zero-downtime
deployments (i.e., ensuring the previous node is passing health checks before deploying the next
one) and to detect when a server is down and needs to be automatically replaced.
Note that we do NOT recommend connecting to Kafka via the ELB. That's because Kafka clients need to connect to specific brokers, depending on which topics and partitions they are using, whereas an ELB will randomly round-robin requests across all brokers.
Check out the kafka-zookeeper-standalone-clusters example for working sample code that includes an ELB.
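As a rough sketch of that wiring (the resource and module names are illustrative, and you'd also need security group rules that allow the ELB to reach the broker port):
# An ELB used only for TCP health checks on the broker port, not for client traffic.
resource "aws_elb" "kafka_health_check" {
  name     = "kafka-brokers-health-check"
  internal = true
  subnets  = var.subnet_ids

  # The ELB requires at least one listener, but Kafka clients should NOT connect through it.
  listener {
    instance_port     = 9092
    instance_protocol = "tcp"
    lb_port           = 9092
    lb_protocol       = "tcp"
  }

  # TCP health check against the Kafka broker port
  health_check {
    target              = "TCP:9092"
    interval            = 15
    healthy_threshold   = 2
    unhealthy_threshold = 2
    timeout             = 5
  }
}

module "kafka_brokers" {
  # ... (other params omitted)

  # Associate the ELB and use its health checks for rolling deploys and auto-recovery
  elb_names         = [aws_elb.kafka_health_check.name]
  health_check_type = "ELB"
}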
Rolling deployments
To deploy updates to a Kafka cluster, such as rolling out a new version of the AMI, you need to do the following:
- Shut down a Kafka broker on one server.
- Deploy the new code on the same server.
- Wait for the new code to come up successfully and start passing health checks.
- Repeat the process with the remaining servers.
This module can do this process for you automatically by using the server-group module's support for zero-downtime rolling deployment.
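If you need to tune that process, the relevant inputs look like this (the values shown are illustrative):
module "kafka_brokers" {
  # ... (other params omitted)

  # Replace two brokers at a time instead of the default of one
  deployment_batch_size = 2

  # Log extra detail from the rolling deploy script while troubleshooting
  script_log_level = "DEBUG"
}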
Data backup
Kafka's primary mechanism for backing up data is the replication within the cluster. Typically, the only backup you may do beyond that is to create a Kafka consumer that dumps all data into a permanent, reliable store such as S3. This functionality is NOT included with this module.
Connecting to Kafka brokers
Once you've used this module to deploy the Kafka brokers, you'll want to connect to them from Kafka clients (e.g.,
Kafka consumers and producers in your apps) to read and write data. To do this, you typically need to configure the
bootstrap.servers property for your Kafka client with the IP addresses of a few of your Kafka brokers (you don't
need all the IPs, as the rest will be discovered automatically via ZooKeeper):
--bootstrap.servers=10.0.0.4:9092,10.0.0.5:9092,10.0.0.6:9092
There are two main ways to get the IP addresses of your Kafka brokers:
Find Kafka brokers by tag
Each Kafka broker deployed using this module will have a tag called ServerGroupName with the value set to the
var.name parameter you pass in. You can automatically discover all the servers with this tag and get their IP
addresses using either the AWS CLI or AWS SDK.
Here's an example using the AWS CLI:
aws ec2 describe-instances \
  --region <REGION> \
  --filters \
    "Name=instance-state-name,Values=running" \
    "Name=tag:ServerGroupName,Values=<KAFKA_CLUSTER_NAME>"
In the command above, you'll need to replace <REGION> with your AWS region (e.g., us-east-1) and
<KAFKA_CLUSTER_NAME> with the name of your Kafka cluster (i.e., the var.name parameter you passed to this module).
The returned data will contain the information about all the Kafka brokers, including their private IP addresses.
Extract these IPs, add the Kafka port to each one (default 9092), and put them into a comma-separated list:
--bootstrap.servers=10.0.0.4:9092,10.0.0.5:9092,10.0.0.6:9092
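If you're scripting this, the AWS CLI can extract just the private IPs for you via a --query expression; you'd then append the port and join the results yourself:
aws ec2 describe-instances \
  --region <REGION> \
  --filters \
    "Name=instance-state-name,Values=running" \
    "Name=tag:ServerGroupName,Values=<KAFKA_CLUSTER_NAME>" \
  --query "Reservations[].Instances[].PrivateIpAddress" \
  --output text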
Find Kafka brokers using ENIs
An alternative option is to attach an Elastic Network Interface
(ENI) to each Kafka broker so that it has a static
IP address. You can enable ENIs using the attach_eni parameter:
module "kafka_brokers" {
source = "git::git@github.com:gruntwork-io/terraform-aws-kafka.git//modules/kafka-cluster?ref=v0.0.5"
cluster_name = "example-kafka-brokers"
attach_eni = true
# (other params omitted)
}
With ENIs enabled, this module will output the list of private IPs for your brokers in the private_ips output
variable. Attach the port number (default 9092) to each of these IPs and pass them on to your Kafka clients:
bootstrap_servers = "${formatlist("%s:9092", module.kafka_brokers.private_ips)}"
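Since formatlist returns a list, you can join the entries into the single comma-separated string that bootstrap.servers expects:
bootstrap_servers = "${join(",", formatlist("%s:9092", module.kafka_brokers.private_ips))}"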
The main downside of using ENIs is if you decide to change the size of your Kafka cluster, and therefore the number of
ENIs, then Kafka clients that have the old list of ENIs won't be updated until you re-deploy them with a
terraform apply. If you increased the size of your cluster, then those older clients may not have access to all the
available ENIs, which is typically not a problem, since they are only used for bootstrapping, and you only need a few
anyway. However, if you decreased the size of your cluster, then those older clients may be trying to connect to ENIs
that are no longer valid.
Sample Usage
# ------------------------------------------------------------------------------------------------------
# DEPLOY GRUNTWORK'S KAFKA-CLUSTER MODULE
# ------------------------------------------------------------------------------------------------------
module "kafka_cluster" {
source = "git::git@github.com:gruntwork-io/terraform-aws-kafka.git//modules/kafka-cluster?ref=v0.11.0"
# ----------------------------------------------------------------------------------------------------
# REQUIRED VARIABLES
# ----------------------------------------------------------------------------------------------------
# A list of CIDR-formatted IP address ranges that will be allowed to connect to
# the Kafka brokers
allowed_inbound_cidr_blocks = <INPUT REQUIRED>
# A list of security group IDs that will be allowed to connect to the Kafka
# brokers
allowed_inbound_security_group_ids = <INPUT REQUIRED>
# The ID of the AMI to run in this cluster. Should be an AMI that has Kafka
# installed by the install-kafka module.
ami_id = <INPUT REQUIRED>
# The AWS region to deploy into.
aws_region = <INPUT REQUIRED>
# The name of the Kafka cluster (e.g. kafka-stage). This variable is used to
# namespace all resources created by this module.
cluster_name = <INPUT REQUIRED>
# The number of brokers to have in the cluster.
cluster_size = <INPUT REQUIRED>
# The type of EC2 Instances to run for each node in the cluster (e.g. t2.micro).
instance_type = <INPUT REQUIRED>
# The number of security group IDs in var.allowed_inbound_security_group_ids. We
# should be able to compute this automatically, but due to a Terraform limitation,
# we can't:
# https://github.com/hashicorp/terraform/issues/14677#issuecomment-302772685
num_allowed_inbound_security_group_ids = <INPUT REQUIRED>
# The subnet IDs into which the EC2 Instances should be deployed. You should
# typically pass in one subnet ID per node in the cluster_size variable. We
# strongly recommend that you run Kafka in private subnets.
subnet_ids = <INPUT REQUIRED>
# A User Data script to execute while the server is booting. We recommend passing in
# a bash script that executes the run-kafka script, which should have been
# installed in the AMI by the install-kafka module.
user_data = <INPUT REQUIRED>
# The ID of the VPC in which to deploy the cluster
vpc_id = <INPUT REQUIRED>
# ----------------------------------------------------------------------------------------------------
# OPTIONAL VARIABLES
# ----------------------------------------------------------------------------------------------------
# A list of Security Group IDs that should be added to the Auto Scaling Group's
# Launch Configuration used to launch the Kafka cluster EC2 Instances.
additional_security_group_ids = []
# A list of CIDR-formatted IP address ranges from which the EC2 Instances will
# allow SSH connections
allowed_ssh_cidr_blocks = []
# A list of security group IDs from which the EC2 Instances will allow SSH
# connections
allowed_ssh_security_group_ids = []
# If set to true, associate a public IP address with each EC2 Instance in the
# cluster. We strongly recommend against making Kafka nodes publicly accessible.
associate_public_ip_address = false
# Set to true to attach an Elastic Network Interface (ENI) to each server. This is
# an IP address that will remain static, even if the underlying servers are
# replaced.
attach_eni = false
# The port the Kafka brokers should listen on
broker_port = 9092
# Custom tags to apply to the Kafka nodes and all related resources (i.e.,
# security groups, EBS Volumes, ENIs).
custom_tags = {}
# How many servers to deploy at a time during a rolling deployment. For example,
# if you have 10 servers and set this variable to 2, then the deployment will a)
# undeploy 2 servers, b) deploy 2 replacement servers, c) repeat the process for
# the next 2 servers.
deployment_batch_size = 1
# The common portion of the DNS name to assign to each ENI in the Confluent Tools
# server group. For example, if confluent.acme.com, this module will create DNS
# records 0.confluent.acme.com, 1.confluent.acme.com, etc. Note that this value
# must be a valid record name for the Route 53 Hosted Zone ID specified in
# var.route53_hosted_zone_id.
dns_name_common_portion = null
# A list of DNS names to assign to the ENIs in the Confluent Tools server group.
# Make sure the list has n entries, where n = var.cluster_size. If this var is
# specified, it will override var.dns_name_common_portion. Example: [0.acme.com,
# 1.acme.com, 2.acme.com]. Note that the list entries must be valid records for
# the Route 53 Hosted Zone ID specified in var.route53_hosted_zone_id.
dns_names = []
# The TTL (Time to Live) to apply to any DNS records created by this module.
dns_ttl = 300
# A list that defines the EBS Volumes to create for each server. Each item in the
# list should be a map that contains the keys 'type' (one of standard, gp2, or
# io1), 'size' (in GB), and 'encrypted' (true or false). We recommend attaching an
# EBS Volume to Kafka to use for Kafka logs.
ebs_volumes = [{"encrypted":true,"size":50,"type":"gp2"}]
# A list of Elastic Load Balancer (ELB) names to associate with the Kafka brokers.
# We recommend using an ELB for health checks. If you're using an Application Load
# Balancer (ALB), use var.target_group_arns instead.
elb_names = []
# Enable detailed CloudWatch monitoring for the servers. This gives you more
# granularity with your CloudWatch metrics, but also costs more money.
enable_detailed_monitoring = false
# If true, create an Elastic IP Address for each ENI and associate it with the
# ENI.
enable_elastic_ips = false
# A list of metrics the ASG should enable for monitoring all instances in a group.
# The allowed values are GroupMinSize, GroupMaxSize, GroupDesiredCapacity,
# GroupInServiceInstances, GroupPendingInstances, GroupStandbyInstances,
# GroupTerminatingInstances, GroupTotalInstances.
enabled_metrics = []
# Time, in seconds, after instance comes into service before checking health.
health_check_grace_period = 300
# Controls how health checking is done. Must be one of EC2 or ELB.
health_check_type = "EC2"
# The port the Kafka Connect Worker should listen on. Set to 0 to disable this
# Security Group Rule.
kafka_connect_port = 8083
# Whether the root volume should be destroyed on instance termination.
root_volume_delete_on_termination = true
# If true, the launched EC2 instance will be EBS-optimized.
root_volume_ebs_optimized = false
# The size, in GB, of the root EBS volume.
root_volume_size = 50
# The type of volume. Must be one of: standard, gp2, or io1.
root_volume_type = "gp2"
# The ID of the Route53 Hosted Zone in which we will create the DNS records
# specified by var.dns_names. Must be non-empty if var.dns_name_common_portion or
# var.dns_names is non-empty.
route53_hosted_zone_id = null
# The log level to use with the rolling deploy script. It can be useful to set
# this to DEBUG when troubleshooting the script.
script_log_level = "INFO"
# If set to true, skip the health check, and start a rolling deployment without
# waiting for the server group to be in a healthy state. This is primarily useful
# if the server group is in a broken state and you want to force a deployment
# anyway.
skip_health_check = false
# If set to true, skip the rolling deployment, and destroy all the servers
# immediately. You should typically NOT enable this in prod, as it will cause
# downtime! The main use case for this flag is to make testing and cleanup easier.
# It can also be handy in case the rolling deployment code has a bug.
skip_rolling_deploy = false
# The name of an EC2 Key Pair that can be used to SSH to the EC2 Instances in this
# cluster. Set to an empty string to not associate a Key Pair.
ssh_key_name = null
# The port used for SSH connections
ssh_port = 22
# A list of target group ARNs of Application Load Balancer (ALB) targets to
# associate with the Kafka brokers. We recommend using an ELB for health checks.
# If you're using an Elastic Load Balancer (AKA ELB Classic), use var.elb_names
# instead.
target_group_arns = []
# The tenancy of the instance. Must be one of: default or dedicated.
tenancy = "default"
# By passing a value to this variable, you can effectively tell this module to
# wait to deploy until the given variable's value is resolved, which is a way to
# require that this module depend on some other module. Note that the actual value
# of this variable doesn't matter.
wait_for = ""
# A maximum duration that Terraform should wait for ASG instances to be healthy
# before timing out. Setting this to '0' causes Terraform to skip all Capacity
# Waiting behavior.
wait_for_capacity_timeout = "10m"
}
Reference
Required
- allowed_inbound_cidr_blocks (list(string)): A list of CIDR-formatted IP address ranges that will be allowed to connect to the Kafka brokers.
- allowed_inbound_security_group_ids (list(string)): A list of security group IDs that will be allowed to connect to the Kafka brokers.
- ami_id (string): The ID of the AMI to run in this cluster. Should be an AMI that has Kafka installed by the install-kafka module.
- aws_region (string): The AWS region to deploy into.
- cluster_name (string): The name of the Kafka cluster (e.g. kafka-stage). This variable is used to namespace all resources created by this module.
- cluster_size (number): The number of brokers to have in the cluster.
- instance_type (string): The type of EC2 Instances to run for each node in the cluster (e.g. t2.micro).
- num_allowed_inbound_security_group_ids (number): The number of security group IDs in allowed_inbound_security_group_ids. We should be able to compute this automatically, but due to a Terraform limitation, we can't: https://github.com/hashicorp/terraform/issues/14677#issuecomment-302772685
- subnet_ids (list(string)): The subnet IDs into which the EC2 Instances should be deployed. You should typically pass in one subnet ID per node in the cluster_size variable. We strongly recommend that you run Kafka in private subnets.
- user_data (string): A User Data script to execute while the server is booting. We recommend passing in a bash script that executes the run-kafka script, which should have been installed in the AMI by the install-kafka module.
- vpc_id (string): The ID of the VPC in which to deploy the cluster.
Optional
- additional_security_group_ids (list(string), default []): A list of Security Group IDs that should be added to the Auto Scaling Group's Launch Configuration used to launch the Kafka cluster EC2 Instances.
- allowed_ssh_cidr_blocks (list(string), default []): A list of CIDR-formatted IP address ranges from which the EC2 Instances will allow SSH connections.
- allowed_ssh_security_group_ids (list(string), default []): A list of security group IDs from which the EC2 Instances will allow SSH connections.
- associate_public_ip_address (bool, default false): If set to true, associate a public IP address with each EC2 Instance in the cluster. We strongly recommend against making Kafka nodes publicly accessible.
- attach_eni (bool, default false): Set to true to attach an Elastic Network Interface (ENI) to each server. This is an IP address that will remain static, even if the underlying servers are replaced.
- broker_port (number, default 9092): The port the Kafka brokers should listen on.
- custom_tags (map(string), default {}): Custom tags to apply to the Kafka nodes and all related resources (i.e., security groups, EBS Volumes, ENIs).
- deployment_batch_size (number, default 1): How many servers to deploy at a time during a rolling deployment. For example, if you have 10 servers and set this variable to 2, then the deployment will a) undeploy 2 servers, b) deploy 2 replacement servers, c) repeat the process for the next 2 servers.
- dns_name_common_portion (string, default null): The common portion of the DNS name to assign to each ENI in the Confluent Tools server group. For example, if confluent.acme.com, this module will create DNS records 0.confluent.acme.com, 1.confluent.acme.com, etc. Note that this value must be a valid record name for the Route 53 Hosted Zone ID specified in route53_hosted_zone_id.
- dns_names (list(string), default []): A list of DNS names to assign to the ENIs in the Confluent Tools server group. Make sure the list has n entries, where n = cluster_size. If this var is specified, it will override dns_name_common_portion. Example: [0.acme.com, 1.acme.com, 2.acme.com]. Note that the list entries must be valid records for the Route 53 Hosted Zone ID specified in route53_hosted_zone_id.
- dns_ttl (number, default 300): The TTL (Time to Live) to apply to any DNS records created by this module.
- ebs_volumes (list(object({ type = string, size = number, encrypted = bool })), default [{ type = "gp2", size = 50, encrypted = true }]): A list that defines the EBS Volumes to create for each server. Each item in the list should be a map that contains the keys 'type' (one of standard, gp2, or io1), 'size' (in GB), and 'encrypted' (true or false). We recommend attaching an EBS Volume to Kafka to use for Kafka logs.
- elb_names (list(string), default []): A list of Elastic Load Balancer (ELB) names to associate with the Kafka brokers. We recommend using an ELB for health checks. If you're using an Application Load Balancer (ALB), use target_group_arns instead.
- enable_detailed_monitoring (bool, default false): Enable detailed CloudWatch monitoring for the servers. This gives you more granularity with your CloudWatch metrics, but also costs more money.
- enable_elastic_ips (bool, default false): If true, create an Elastic IP Address for each ENI and associate it with the ENI.
- enabled_metrics (list(string), default []): A list of metrics the ASG should enable for monitoring all instances in a group. The allowed values are GroupMinSize, GroupMaxSize, GroupDesiredCapacity, GroupInServiceInstances, GroupPendingInstances, GroupStandbyInstances, GroupTerminatingInstances, GroupTotalInstances. Example: enabled_metrics = ["GroupDesiredCapacity", "GroupInServiceInstances", "GroupMaxSize", "GroupMinSize", "GroupPendingInstances", "GroupStandbyInstances", "GroupTerminatingInstances", "GroupTotalInstances"]
- health_check_grace_period (number, default 300): Time, in seconds, after instance comes into service before checking health.
- health_check_type (string, default "EC2"): Controls how health checking is done. Must be one of EC2 or ELB.
- kafka_connect_port (number, default 8083): The port the Kafka Connect Worker should listen on. Set to 0 to disable this Security Group Rule.
- root_volume_delete_on_termination (bool, default true): Whether the root volume should be destroyed on instance termination.
- root_volume_ebs_optimized (bool, default false): If true, the launched EC2 instance will be EBS-optimized.
- root_volume_size (number, default 50): The size, in GB, of the root EBS volume.
- root_volume_type (string, default "gp2"): The type of volume. Must be one of: standard, gp2, or io1.
- route53_hosted_zone_id (string, default null): The ID of the Route53 Hosted Zone in which we will create the DNS records specified by dns_names. Must be non-empty if dns_name_common_portion or dns_names is non-empty.
- script_log_level (string, default "INFO"): The log level to use with the rolling deploy script. It can be useful to set this to DEBUG when troubleshooting the script.
- skip_health_check (bool, default false): If set to true, skip the health check, and start a rolling deployment without waiting for the server group to be in a healthy state. This is primarily useful if the server group is in a broken state and you want to force a deployment anyway.
- skip_rolling_deploy (bool, default false): If set to true, skip the rolling deployment, and destroy all the servers immediately. You should typically NOT enable this in prod, as it will cause downtime! The main use case for this flag is to make testing and cleanup easier. It can also be handy in case the rolling deployment code has a bug.
- ssh_key_name (string, default null): The name of an EC2 Key Pair that can be used to SSH to the EC2 Instances in this cluster. Set to an empty string to not associate a Key Pair.
- ssh_port (number, default 22): The port used for SSH connections.
- target_group_arns (list(string), default []): A list of target group ARNs of Application Load Balancer (ALB) targets to associate with the Kafka brokers. We recommend using an ELB for health checks. If you're using an Elastic Load Balancer (AKA ELB Classic), use elb_names instead.
- tenancy (string, default "default"): The tenancy of the instance. Must be one of: default or dedicated.
- wait_for (string, default ""): By passing a value to this variable, you can effectively tell this module to wait to deploy until the given variable's value is resolved, which is a way to require that this module depend on some other module. Note that the actual value of this variable doesn't matter.
- wait_for_capacity_timeout (string, default "10m"): A maximum duration that Terraform should wait for ASG instances to be healthy before timing out. Setting this to '0' causes Terraform to skip all Capacity Waiting behavior.