Apache Kafka Consumer in Rust

This comprehensive guide discovers how to create a simple but powerful and efficient Kafka consumer using the Rust programming language.

Since its release, Rust has quickly become one of the most widely adopted programming languages. It has also seen lots of development, improvements, and enhancements. With its safety, speed, and concurrency features, Rust is one of the best languages to work with Kafka.

Whether you are new to Rust or Kafka, we will keep it at the basics to allow you to grasp every detail. In addition, we will add some simple code comments and explainers to help you get better at both technologies.

By the end of this tutorial, you will better understand how to produce Kafka messages using the Rust programming language.

Let us dive and build this thing!

Requirements

To follow along with this tutorial, you are required with the following:

  1. Installed and running Apache Kafka on your machine.
  2. The latest version of the Rust compiler on your system.

When you meet the given requirements, we can proceed with the tutorial.

Project Setup

The first step is to create a new Rust Project using Cargo. Then, we can run the command as shown in the following:

$ cargo new kafka-rs --bin

The previous command should initialize a new Rust project with the name “kafka-rs”.

Navigate into the kafka-rs directory and edit the “cargo.toml” file. This allows us to add the required dependencies for the project.

Add the RDKafka Library

We use the rust-rdkafka library to interact with the Kafka cluster. This free and open-source Rust package allows us to interact with a Kafka cluster.

Edit the “cargo.toml” file with your text editor of choice:

$ vim cargo.toml

Add the rust-rdkafka dependency as shown in the following entry:

[dependencies]

rdkafka = { version = "0.25", features = ["cmake-build"] }

Setup a Kafka Producer

Once everything is ready, create a “producer.rs” file in the src directory. This file contains the source code for the Kafka producer.

The source code is as shown in the following:

use rdkafka::producer::{FutureProducer, FutureRecord};

use std::time::Duration;

async fn main() {

let brokers = "localhost:9092";

let topic = "sample_topic";

let producer = FutureProducer::builder()

  .brokers(brokers)

  .build();

let message = "Test Message from Rust";

let record = FutureRecord::to::to(topic).key("").payload(message);

match producer.send(record, Duration::from_secs(0)).await {

Ok(_) => println!("Written: {}", message),

Err(e) => eprintln!("Error writting message: {}", e),

  }

}

The previous code simply writes a simple message to the Kafka cluster that is defined in the “brokers” variable. We also target the sample_topic within the cluster.

Setup the Kafka Consumer

The next step is to define a consumer application which allows us to read the topic’s messages.

Create a “consumer.rs” file in the src directory and add the source code as shown in the following:

$ touch ./src/consumer.rs

Source Code:

use rdkafka::config::ClientConfig;

use rdkafka::consumer::{Consumer, StreamConsumer};

fn main() {

let brokers = "localhost:9092";

let group_id = "local";

let topic = "sample_topic";

let consumer: StreamConsumer = ClientConfig::new()

  .set("group.id", group_id)

  .set("bootstrap.servers", brokers)

  .set("enable.auto.commit", "false")

  .create()

  .expect("Failed to create Kafka consumer");

  consumer.subscribe(&[topic]).expect("We could not subscribe to the defined topic.");

loop {

match consumer.poll(Duration::from_secs(1)) {

Ok(msg) => {

if let Some(Ok(payload)) = msg.payload() {

println!("Message read: {}", std::str::from_utf8(payload).unwrap());

    }

    consumer.commit_message(&msg, rdkafka::consumer::CommitMode::Sync).unwrap();

    }

Err(e) => eprintln!("Error reading message: {}", e),

    _ => {}

  }

 }

}

The previous code should subscribe to the sample_topic in the defined cluster and read the available messages.

Run the App

Once completed, edit the “main.rs” file and add the code for both the consumer and producer as shown in the following:

mod producer;

mod consumer;

fn main() {

producer::main().await;

consumer::main();

}

Once ready, open the terminal and run the producer as shown in the following command:

$ cargo run --bin producer

To run the consumer, run the following command:

$ cargo run --bin consumer

You can build the tool as a package as shown in the following:

cargo run--package kafka-rs --bin kafka-rs

Conclusion

This tutorial illustrates how to quickly set up the basic Kafka producer and consumer applications using the Rust programming language.



from https://ift.tt/ALOvgMU

Post a Comment

0 Comments