I have been working with Kafka lately while building a microservices solution that uses Kafka for various tasks. I wanted a simple administration tool to help me manage Kafka on my development environments. Basically, I wanted the tool to do the following:
- Cluster
  - Describe the Kafka cluster
  - Describe the configuration
  - Change the configuration of the cluster
- Manage ACLs
  - Create
  - Delete
  - Describe
- Manage Delegation Tokens
  - Create a delegation token
  - Describe delegation token
  - Renew delegation token
  - Expire delegation token
- Manage Topics
  - List topics
  - Describe topics
  - Create topics
  - Delete topics
  - Increase the number of topic partitions
- Manage Consumer Groups
  - List consumers and the topics they subscribed to
  - List consumer offsets
The list could go on and on. To keep the code simple, I started this as a console application in Scala, using IntelliJ IDEA Community Edition. I have not finished the tool yet, but it has reached a usable level and already helps me with most of my day-to-day tasks.
I thought it might be helpful to share the code; maybe it helps others. Please do not hesitate to contact me with your comments or thoughts about this tool.
Figure 1 shows the project structure.
Figure 1: Project Structure
When you run the tool, you get the following main menu:
********************************************************************************
***** Kafka Admin Tool *****
***** (c) Genetic Thought Software Inc 2018 *****
********************************************************************************
***** Note: set Kafka environment in environment.properties *****
***** Connecting to server: localhost:9092 *****
***** Connected to server: localhost:9092 *****
********************************************************************************
***** Commands: *****
***** 1- Describe Cluster *****
***** 2- Topics *****
***** 3- Consumers *****
***** 4- Exit *****
********************************************************************************
***** Enter selection: *****
If, for example, you select the Topics menu and then enter 1 to list the topics, you get the following:
********************************************************************************
***** Topics Commands: *****
***** 1- List Topics *****
***** 2- Describe Topics *****
***** 3- Create Topics *****
***** 4- Delete Topic *****
***** 5- Increase Topic Partitions *****
***** 6- Return *****
********************************************************************************
***** Enter selection: *****
1
***** Topics in this cluster are: *****
***** Topic: US *****
***** Topic: China *****
***** Topic: Mexico *****
***** Topic: Korai *****
***** Topic: Canada *****
***** Topic: India *****
***** Topic: Italy *****
***** Topic: Germany *****
********************************************************************************
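The Printer object that draws these boxes is not listed in this post. As a rough idea of how such output can be produced, here is a minimal sketch. The names BoxSketch, border and boxed are hypothetical, and the 80-column width and five-star frame are assumptions read off the sample output above, not the tool's actual implementation.

```scala
object BoxSketch {
  // Assumed total line width, matching the 80-character border in the sample output
  val Width = 80

  // A full line of asterisks used above and below each menu
  def border: String = "*" * Width

  // Frame the text with "***** " and " *****", padding or truncating it
  // so that every line comes out exactly Width characters long
  def boxed(text: String): String = {
    val inner = Width - 12 // five stars plus one space on each side
    "***** " + text.take(inner).padTo(inner, ' ') + " *****"
  }
}

object BoxSketchDemo extends App {
  println(BoxSketch.border)
  println(BoxSketch.boxed("Topics Commands:"))
  println(BoxSketch.boxed("1- List Topics"))
  println(BoxSketch.border)
}
```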
The mainApp object extends the App trait. Here I just set up the logger and load the properties from the environment.properties file, where you set bootstrap.servers to the Kafka node. It then initializes the KafkaAdmin object admin and starts the controller's mainMenu, which controls the rest of the lifecycle until the user selects exit.
/* *********************************************************************
(c) Genetic Thought Software Inc.
*********************************************************************
Author: Moustafa Refaat
email: MRefaat@GeneticThought.com
**********************************************************************/
import com.geneticthought.Kafka.Administration.KafkaAdmin
import com.geneticthought.Utilities.{Helpers, Printer}
import org.slf4j.LoggerFactory

object mainApp extends App {
  val logger = LoggerFactory.getLogger("MainApp")
  logger.info("Starting")
  // Connection settings (bootstrap.servers) come from environment.properties
  val props = Helpers.getProperties("environment.properties")
  Printer.printHeader
  Printer.printBoxedString("Connecting to server: " + props.getProperty("bootstrap.servers"), 1)
  val admin = new KafkaAdmin(props)
  Printer.printBoxedString("Connected to server: " + props.getProperty("bootstrap.servers"), 1)
  // Hand control to the menu loop until the user selects exit
  Controller.mainMenu(admin)
  Printer.printBoxedString("Disconnecting from server: " + props.getProperty("bootstrap.servers"), 1)
  admin.close
}
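Helpers.getProperties is not listed in this post. A minimal sketch of what such a loader could look like is shown here, assuming environment.properties is a standard Java properties file containing a line such as bootstrap.servers=localhost:9092. The names PropertiesSketch, fromFile and fromString are hypothetical; fromString exists only so the example can be tried without a file on disk.

```scala
import java.io.{FileInputStream, StringReader}
import java.util.Properties

object PropertiesSketch {
  // Hypothetical stand-in for Helpers.getProperties: load key=value pairs from a file
  def fromFile(path: String): Properties = {
    val props = new Properties()
    val in = new FileInputStream(path)
    try props.load(in) finally in.close()
    props
  }

  // Same parsing, but from an in-memory string
  def fromString(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }
}

object PropertiesSketchDemo extends App {
  val props = PropertiesSketch.fromString("bootstrap.servers=localhost:9092\n")
  println(props.getProperty("bootstrap.servers")) // prints localhost:9092
}
```

The resulting Properties object can be passed directly to AdminClient.create, which is what the KafkaAdmin class does.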
Below is the Controller object listing, a very simple implementation that takes advantage of some of Scala's functional capabilities.
/* *********************************************************************
(c) Genetic Thought Software Inc.
*********************************************************************
Author: Moustafa Refaat
email: MRefaat@GeneticThought.com
**********************************************************************/
import com.geneticthought.Kafka.Administration.KafkaAdmin
import com.geneticthought.Utilities.{Helpers, Printer}

import scala.io.StdIn

object Controller {
  def mainMenu(admin: KafkaAdmin): Unit = menu(4, admin, Printer.printCommandsMenu)(mainMenuCommands)

  // Generic menu loop: print the menu, read a selection and dispatch it
  // until the exit command is entered
  def menu(exCommand: Int, admin: KafkaAdmin, mn: () => Unit)(cmd: (Int, KafkaAdmin) => Unit): Unit = {
    var command: Int = 0
    while (command != exCommand) {
      mn()
      Printer.printBoxedString("Enter selection: ", 2)
      val str = StdIn.readLine()
      command = Helpers.toInt(str)
      cmd(command, admin)
    }
  }

  def mainMenuCommands(cmd: Int, admin: KafkaAdmin): Unit = {
    cmd match {
      case 1 => describeCluster(admin)
      case 2 => TopicsCommand(admin)
      case 3 => ConsumerCommands(admin)
      case 4 => Printer.printBoxedString("Exiting ", 2)
      case _ => Printer.printBoxedString("Invalid selection: ", 2)
    }
  }

  def describeCluster(admin: KafkaAdmin): Unit = {
    val cluster = admin.descibeCluster
    println(cluster.toString)
  }

  def TopicsCommand(admin: KafkaAdmin): Unit = menu(6, admin, Printer.printTopicsMenu)(topicsMenuCommands)

  def topicsMenuCommands(cmd: Int, admin: KafkaAdmin): Unit = {
    cmd match {
      case 1 => listTopics(admin)
      case 2 => describeTopics(admin)
      case 3 => createTopics(admin)
      case 4 => deleteTopics(admin)
      case 5 => increaseTopicPartations(admin)
      case 6 => Printer.printBoxedString("Return to main Menu ", 2)
      case _ => Printer.printBoxedString("Invalid selection: ", 2)
    }
  }

  def listTopics(admin: KafkaAdmin): Unit = {
    val topics = admin.getTopics
    if (topics.nonEmpty) Printer.printBoxedString("Topics in this cluster are:", 1)
    else Printer.printBoxedString("No Topics in this cluster defined yet", 1)
    // Reuse the fetched set instead of querying the cluster a second time
    for (x <- topics) Printer.printBoxedString(s"Topic: $x", 2)
  }

  def describeTopics(admin: KafkaAdmin): Unit = {
    // Topic names are read from the console
    val topics = getTopics
    if (topics.size <= 0) {
      Printer.printBoxedString(s"No Topics to Describe:$topics ", 2)
      return
    }
    Printer.printBoxedString(s"Describe Topics:$topics ", 2)
    val tpdes = admin.describeTopics(topics)
    for (tpd <- tpdes)
      Printer.printBoxedString(tpd.toString, 4)
    Printer.printBoxedString(s"Describe Topics:$topics Done", 2)
  }

  def createTopics(admin: KafkaAdmin): Unit = {
    // Every topic is created with 1 partition and a replication factor of 1
    val topics = getTopics.map(x => (x, 1, 1.toShort))
    if (topics.size <= 0) {
      Printer.printBoxedString(s"No Topics to Create:$topics ", 2)
      return
    }
    admin.createTopics(topics)
    Printer.printBoxedString("Created Topics: ", 2)
    for (t <- topics) Printer.printBoxedString(s"(topicName,Partitions,Replication):$t ", 4)
  }

  def deleteTopics(admin: KafkaAdmin): Unit = {
    val topics = getTopics
    if (topics.size <= 0) {
      Printer.printBoxedString(s"No Topics to Delete:$topics ", 2)
      return
    }
    admin.deleteTopics(topics)
    Printer.printBoxedString(s"Deleted Topics:$topics ", 2)
  }

  def increaseTopicPartations(admin: KafkaAdmin): Unit = {
    val topics = getTopics
    if (topics.size <= 0) {
      Printer.printBoxedString(s"No Topics to Increase Partitions:$topics ", 2)
      return
    }
    Printer.printBoxedString("Enter Total Partitions No: ", 2)
    val str = StdIn.readLine()
    val partNo = Helpers.toInt(str)
    if (partNo <= 1) {
      Printer.printBoxedString(s"Partitions Number has to be > 1 you entered :$partNo ", 2)
      return
    }
    // The same new total partition count is applied to every topic entered
    val topicsIncrease: Map[String, Int] = topics.map(x => x -> partNo).toMap
    admin.increasePartitions(topicsIncrease)
    Printer.printBoxedString(s"Increased Topics $topics partitions to :$partNo ", 2)
  }
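
  // Reads a comma-separated list of topic names from the console
  def getTopics: List[String] = {
    var cont = true
    var result: List[String] = Nil
    do
      try {
        Printer.printBoxedString("Enter topic names separated by commas", 1)
        // readLine returns null at end of input, so treat that as an empty line
        val input = Option(StdIn.readLine()).getOrElse("")
        // Trim each name and drop empty entries so "a, b," yields List("a", "b")
        result = input.split(",").map(_.trim).filter(_.nonEmpty).toList
        cont = false
      }
      catch {
        case scala.util.control.NonFatal(_) => Printer.printBoxedString("Invalid Topics List", 2)
      }
    while (cont)
    result
  }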

  def ConsumerCommands(admin: KafkaAdmin): Unit = menu(2, admin, Printer.printConsumersMenu)(consumersMenuCommands)

  def consumersMenuCommands(cmd: Int, admin: KafkaAdmin): Unit = {
    cmd match {
      case 1 => listConsumers(admin)
      case 2 => Printer.printBoxedString("Return to main Menu ", 2)
      case _ => Printer.printBoxedString("Invalid selection: ", 2)
    }
  }

  def listConsumers(admin: KafkaAdmin): Unit = {
    val consumers = admin.listConsumers
    Printer.printBoxedString("Consumers List: ", 2)
    consumers.foreach(x => Printer.printBoxedString(x, 4))
    Printer.printBoxedString("end of Consumers List ", 2)
  }
}
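The menu function above is the functional part of the Controller: one curried loop serves every menu, and each menu only supplies its own dispatch function. The sketch below isolates that pattern so it can be run without Kafka or a console. MenuSketch and its readLine parameter are hypothetical, and toInt is only a guess at what Helpers.toInt does (Helpers is not listed here); it assumes that input which is not a number maps to -1 so that it falls through to the invalid-selection case.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

object MenuSketch {
  // Hypothetical stand-in for Helpers.toInt: anything that is not a number becomes -1
  def toInt(s: String): Int = Try(s.trim.toInt).getOrElse(-1)

  // Same shape as Controller.menu, but the input source is passed in
  // so the loop can be driven by canned input
  def menu(exitCommand: Int, readLine: () => String)(dispatch: Int => Unit): Unit = {
    var command = 0
    while (command != exitCommand) {
      command = toInt(readLine())
      dispatch(command)
    }
  }
}

object MenuSketchDemo extends App {
  val inputs = Iterator("1", "abc", "4")
  val seen = ListBuffer[String]()
  // The second parameter list takes the dispatcher as a pattern-matching block
  MenuSketch.menu(4, () => inputs.next()) {
    case 1 => seen += "describe"
    case 4 => seen += "exit"
    case _ => seen += "invalid"
  }
  println(seen.mkString(", ")) // prints describe, invalid, exit
}
```

Putting the dispatcher in its own parameter list is what lets a call site read like a small control structure, the same way mainMenu and TopicsCommand pass their command functions.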
Below is the KafkaAdmin class listing, which implements the main functionality of performing actions and queries against the Kafka cluster.
/* *********************************************************************
(c) Genetic Thought Software Inc.
*********************************************************************
Author: Moustafa Refaat
email: MRefaat@GeneticThought.com
**********************************************************************/
package com.geneticthought.Kafka.Administration

import java.util.Properties

import org.apache.kafka.clients.admin._

import scala.collection.JavaConverters

class KafkaAdmin(val props: Properties) {
  // A single AdminClient is shared by all operations and released in close
  private val client: AdminClient = AdminClient.create(props)

  def close: Unit = this.client.close()

  /* Cluster operations */
  def descibeCluster: ClusterInfo = new ClusterInfo(this.client.describeCluster())

  /* Topics region */
  def getTopics: Set[String] = JavaConverters.asScalaSet(this.client.listTopics.names.get()).toSet

  // Each tuple is (topic name, number of partitions, replication factor)
  def createTopics(topics: List[(String, Int, Short)]): Unit = {
    val kafkaTopics = topics.map { case (name, partitions, replication) =>
      new NewTopic(name, partitions, replication)
    }
    val opResult = this.client.createTopics(JavaConverters.asJavaCollection(kafkaTopics))
    // get() blocks until the request completes and rethrows any failure
    opResult.all().get()
  }

  def deleteTopics(topics: List[String]): Unit = {
    val result = this.client.deleteTopics(JavaConverters.asJavaCollection(topics))
    // Wait for the deletion to complete so errors are not silently dropped
    result.all().get()
  }

  def describeTopics(topics: List[String]): Iterable[TopicDescription] = {
    val describeResult = this.client.describeTopics(JavaConverters.asJavaCollection(topics))
    val topicsDesc = describeResult.all().get()
    JavaConverters.collectionAsScalaIterable(topicsDesc.values())
  }

  // Maps each topic name to its new total number of partitions
  def increasePartitions(partitions: Map[String, Int]): Unit = {
    val partitionsRequest: Map[String, NewPartitions] =
      partitions.map { case (topic, total) => topic -> NewPartitions.increaseTo(total) }
    val requestResult = this.client.createPartitions(JavaConverters.mapAsJavaMap(partitionsRequest))
    // Wait for the change to complete so errors are not silently dropped
    requestResult.all().get()
  }

  /* Configuration region */
  /* Records region */
  /* consumers region */
  def listConsumers: List[String] = {
    val consumers = this.client.listConsumerGroups().all().get().toArray()
    consumers.map(x => x.toString).toList
  }
}
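The AdminClient methods used in KafkaAdmin hand back their results as KafkaFuture values, and calling get() on one reports a failed request by throwing an ExecutionException that wraps the real cause. One possible way to surface those failures to the menu code is to turn them into values. The sketch below is only an illustration: FutureHelpers and awaitResult are hypothetical names, not part of the tool. Because KafkaFuture implements java.util.concurrent.Future, the helper needs no Kafka import and is demonstrated here with a plain CompletableFuture.

```scala
import java.util.concurrent.{CompletableFuture, ExecutionException, Future}

import scala.util.control.NonFatal

object FutureHelpers {
  // Hypothetical helper: block on the future and return either the result
  // or the underlying cause of the failure
  def awaitResult[T](future: Future[T]): Either[Throwable, T] =
    try Right(future.get())
    catch {
      // get() wraps the real error in an ExecutionException, so unwrap it
      case e: ExecutionException if e.getCause != null => Left(e.getCause)
      // NonFatal does not match InterruptedException, so interruption still propagates
      case NonFatal(e) => Left(e)
    }
}

object FutureHelpersDemo extends App {
  val ok = CompletableFuture.completedFuture("done")
  val failed = new CompletableFuture[String]()
  failed.completeExceptionally(new IllegalStateException("topic already exists"))

  println(FutureHelpers.awaitResult(ok))
  println(FutureHelpers.awaitResult(failed).left.map(_.getMessage))
}
```

With a helper like this, a call such as client.createTopics(...).all() could be wrapped in awaitResult and the Controller could print the error message instead of letting the exception end the menu loop.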
The rest of the code is available for download.
I hope this helps.