This is the first post in a two-part series to help you get started with Rust and Kafka. We will be using the rust-rdkafka crate, which is itself based on librdkafka (a C library).

In this post, we will cover the Kafka Producer API.

Initial Setup

Make sure you have a Kafka broker installed; a local setup should suffice. You will also need Rust installed, version 1.45 or above.

Before you begin, clone the GitHub repo:

Check the Cargo.toml file:

...
[dependencies]
rdkafka = { version = "0.25", features = ["cmake-build"] }
...

Note on the cmake-build feature

rust-rdkafka provides a couple of ways to resolve the librdkafka dependency. I chose static linking, wherein librdkafka is compiled from source during the build (the cmake-build feature does this with CMake). Alternatively, you could opt for dynamic linking against a locally installed version of librdkafka.

For more, please refer to this link

Ok, let’s start off with the basics.

Simple Producer

Here is a simple producer based on BaseProducer:

Use the send method to start producing messages. It's done in a tight loop with a thread::sleep in between (not something you would do in production) to make it easier to follow the results. The key, value (payload) and destination Kafka topic are represented in the form of a BaseRecord.

You can check the entire code in the file src/1_producer_simple.rs

To Test if the Producer Is Working …

Run the program:

  • simply rename the file src/1_producer_simple.rs to main.rs
  • execute cargo run

You should see this output:

sending message
sending message
sending message
...

What’s going on? To figure it out, connect to your Kafka topic (I have used rust as the name of the Kafka topic in the above example) using the Kafka CLI consumer (or any other consumer client, e.g. kafkacat). You should see the messages flowing in.

For example:

Producer Callback

We are flying blind right now! Unless we explicitly create a consumer to look at our messages, we have no clue whether they are being sent to Kafka. Let’s fix that by implementing a ProducerContext (trait) to hook into the produce event — it’s like a callback.

Start by creating a struct and an empty implementation for the ClientContext trait (this is mandatory).

Now comes the main part where we implement the delivery function in the ProducerContext trait.

We match against the DeliveryResult (which is a Result after all) to account for success (Ok) and failure (Err) scenarios. All we do is simply log the message in both cases, since this is just an example. You could do pretty much anything you wanted to here (don’t go crazy though!)

We’ve ignored DeliveryOpaque, which is an associated type of the ProducerContext trait.
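The shape of that match can be sketched without a broker. The types below (`ToyMessage`, `ToyError`, `ToyDeliveryResult`) and the `describe` function are local stand-ins invented for illustration, not rdkafka types. They only mirror the idea that the success case carries the message, while the failure case carries the error together with the message that could not be delivered.

```rust
// Local stand-ins for illustration only; none of these are rdkafka types.
struct ToyMessage {
    key: String,
    partition: i32,
    offset: i64,
}

struct ToyError(String);

// Success carries the message; failure carries the error and the message.
type ToyDeliveryResult = Result<ToyMessage, (ToyError, ToyMessage)>;

// Builds the log line a delivery callback might print for each outcome.
fn describe(result: &ToyDeliveryResult) -> String {
    match result {
        Ok(msg) => format!(
            "produced message with key {} in offset {} of partition {}",
            msg.key, msg.offset, msg.partition
        ),
        Err((err, msg)) => format!(
            "failed to produce message with key {} - error: {}",
            msg.key, err.0
        ),
    }
}

fn main() {
    let ok: ToyDeliveryResult = Ok(ToyMessage {
        key: String::from("key-1"),
        partition: 2,
        offset: 6,
    });
    let failed: ToyDeliveryResult = Err((
        ToyError(String::from("unknown topic")),
        ToyMessage {
            key: String::from("key-2"),
            partition: -1,
            offset: -1,
        },
    ));
    println!("{}", describe(&ok));
    println!("{}", describe(&failed));
}
```

In the real callback you would log (or count, or retry) inside the match arms instead of returning a String; returning it here just makes the sketch easy to check.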

We need to make sure that we plug in our ProducerContext implementation. We do this by using the create_with_context method (instead of create) and by specifying the matching type parameter on BaseProducer as well.

let producer: BaseProducer<ProduceCallbackLogger> = ClientConfig::new().set(....)
...
.create_with_context(ProduceCallbackLogger {})
...

How Does the “Callback Get Called”?

Ok, we have the implementation, but we need a way to trigger it! One of the ways is to call flush on the producer. So, we could write our producer as such:

  • add producer.flush(Duration::from_secs(3));, and
  • comment out the sleep (just for now)

Hold On, We Can Do Better!

The send method is non-blocking (by default) but by calling flush after each send, we have now converted this into a synchronous invocation – not recommended from a performance perspective.

We can improve the situation by using a ThreadedProducer. It takes care of invoking the poll method in a background thread to ensure that the delivery callback notifications are delivered. Doing this is very simple — just change the type from BaseProducer to ThreadedProducer!

# before: BaseProducer<ProduceCallbackLogger>
# after: ThreadedProducer<ProduceCallbackLogger>

Also, we don’t need the call to flush anymore.

...
//producer.flush(Duration::from_secs(3));
//println!("flushed message");
thread::sleep(Duration::from_secs(3));
...

The code is available in src/2_threaded_producer.rs
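To make the difference concrete, here is a toy model of the idea using only the standard library. It is not rdkafka code: `ToyProducer`, its `send`/`poll` methods and `send_with_background_poller` are hypothetical names made up for this sketch. The model assumes that `send` only queues a delivery report and that the callback runs when something calls `poll`, which is the job the background thread takes over.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Toy model, not rdkafka: `send` only queues a delivery report, and the
// "callback" (recording the key in `delivered`) runs when `poll` is called.
#[derive(Default)]
struct ToyProducer {
    pending: Mutex<VecDeque<String>>,
    delivered: Mutex<Vec<String>>,
}

impl ToyProducer {
    fn send(&self, key: &str) {
        self.pending.lock().unwrap().push_back(key.to_string());
    }

    // Serves queued delivery reports; returns how many callbacks ran.
    fn poll(&self) -> usize {
        let reports: Vec<String> = self.pending.lock().unwrap().drain(..).collect();
        let count = reports.len();
        self.delivered.lock().unwrap().extend(reports);
        count
    }

    fn delivered_count(&self) -> usize {
        self.delivered.lock().unwrap().len()
    }
}

// Sends `n` messages while a background thread does the polling.
fn send_with_background_poller(n: usize) -> usize {
    let producer = Arc::new(ToyProducer::default());
    let stop = Arc::new(AtomicBool::new(false));

    let poller = {
        let producer = Arc::clone(&producer);
        let stop = Arc::clone(&stop);
        thread::spawn(move || {
            while !stop.load(Ordering::SeqCst) {
                producer.poll();
                thread::sleep(Duration::from_millis(5));
            }
            producer.poll(); // final drain after the stop signal
        })
    };

    for i in 0..n {
        producer.send(&format!("key-{}", i)); // never waits for delivery
    }

    stop.store(true, Ordering::SeqCst);
    poller.join().unwrap();
    producer.delivered_count()
}

fn main() {
    // Without polling, no callback runs.
    let manual = ToyProducer::default();
    manual.send("key-1");
    println!("before poll: {}", manual.delivered_count());
    manual.poll();
    println!("after poll: {}", manual.delivered_count());

    println!("background poller delivered: {}", send_with_background_poller(3));
}
```

The sending loop never waits on delivery here, and the callbacks still fire because another thread keeps polling. That is the trade the article describes: same callback, no per-message flush.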

Run the Program Again

  • Rename the file src/2_threaded_producer.rs to main.rs and
  • execute cargo run

Output:

sending message
sending message
produced message with key key-1 in offset 6 of partition 2
produced message with key key-2 in offset 3 of partition 0
sending message
produced message with key key-3 in offset 7 of partition 

As expected, you should be able to see the producer event callback, denoting that the messages were indeed sent to the Kafka topic. Of course, you can connect to the topic directly and double-check, just like before:

To try a failure scenario, try using an incorrect topic name and notice how the Err variant of the delivery implementation gets invoked.

Sending JSON Messages

So far, we have just been sending Strings as keys and values. JSON is a commonly used message format, so let’s see how to use it.

Assume we want to send User info which will be represented using this struct:


We can then use the serde_json library to serialize this as JSON. All we need is to use the custom derives from serde: Deserialize and Serialize.

Change the producer loop:

  • Create a User instance
  • Serialize it to a JSON string using to_string_pretty
  • Include that in the payload

You can also use to_vec (instead of to_string_pretty) to convert it into a Vec of bytes (Vec<u8>).

To Run the Program…

  • Rename the file src/3_JSON_payload.rs to main.rs, and
  • execute cargo run

Consume from the topic:

You should see messages with a String key (e.g. user-34) and JSON value:

Is There a Better Way?

Yes! If you are used to the declarative serialization/de-serialization approach in the Kafka Java client (and probably others as well), you may not like this “explicit” approach. Just to put things in perspective, this is how you’d do it in Java:

Notice that you simply configure the Producer to use KafkaJsonSchemaSerializer and the User class is serialized to JSON

rust-rdkafka provides something similar with the ToBytes trait. Here is what it looks like:
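As a rough sketch, the trait boils down to one method that lends out a byte slice borrowed from `self`. The block below re-declares it locally so that it compiles without the crate, and adds implementations for `String` and `Vec<u8>` in the same spirit as the ones the crate provides. The local declarations and the `payload_len` helper are written for illustration only.

```rust
// Local re-declaration for illustration; rdkafka ships its own ToBytes trait.
trait ToBytes {
    // Returns a view of the bytes, borrowed from `self`.
    fn to_bytes(&self) -> &[u8];
}

impl ToBytes for String {
    fn to_bytes(&self) -> &[u8] {
        self.as_bytes()
    }
}

impl ToBytes for Vec<u8> {
    fn to_bytes(&self) -> &[u8] {
        self.as_slice()
    }
}

// Hypothetical helper: accepts anything that can lend out its bytes.
fn payload_len<T: ToBytes>(payload: &T) -> usize {
    payload.to_bytes().len()
}

fn main() {
    let key = String::from("user-34");
    let value: Vec<u8> = vec![1, 2, 3];
    println!("{} {}", payload_len(&key), payload_len(&value));
}
```

Note that the return type is a borrowed slice, not an owned buffer. Types that already hold their bytes can implement this trivially; types that must be serialized first cannot.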

Self-explanatory, right? There are existing implementations for String, Vec<u8> etc., so you can use these types as key or value without any additional work. This is exactly what we just did. The catch is that the way we did it was “explicit”, i.e. we converted the User struct into a JSON String and passed it on.

What if we could implement ToBytes for User?

You will see a compiler error:

cannot return value referencing local variable `b`
returns a value referencing data owned by the current function

For additional background, please refer to this GitHub issue (https://github.com/fede1024/rust-rdkafka/issues/128). I would be happy to see another example that can work with ToBytes; please drop a note if you have input on this!

TL;DR is that it’s best to stick to the “explicit” way of doing things unless you have a ToBytes implementation that “does not involve an allocation and cannot fail”.
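One way to read that constraint: the slice returned by to_bytes has to borrow from self, so bytes produced inside the method live in a local that is dropped on return. Below is a minimal sketch of a workaround, assuming a hypothetical `EncodedUser` wrapper that serializes once up front and then only lends out what it owns. The trait is re-declared locally, the `User` fields are assumed, and the hand-written JSON (no escaping) merely stands in for a real serializer such as serde_json.

```rust
// Local re-declaration for illustration; rdkafka ships its own ToBytes trait.
trait ToBytes {
    fn to_bytes(&self) -> &[u8];
}

// Hypothetical fields; the article's actual User struct may differ.
struct User {
    id: i32,
    email: String,
}

// Owns the already-serialized bytes so that `to_bytes` can simply borrow them.
struct EncodedUser {
    bytes: Vec<u8>,
}

impl EncodedUser {
    // The allocating step happens here, once, outside of `to_bytes`.
    // The hand-written JSON (no escaping) stands in for a real serializer.
    fn new(user: &User) -> Self {
        let json = format!("{{\"id\":{},\"email\":\"{}\"}}", user.id, user.email);
        EncodedUser {
            bytes: json.into_bytes(),
        }
    }
}

impl ToBytes for EncodedUser {
    fn to_bytes(&self) -> &[u8] {
        &self.bytes // borrows from `self`, so the slice stays valid after return
    }
}

fn main() {
    let user = User {
        id: 34,
        email: String::from("user-34@example.com"),
    };
    let encoded = EncodedUser::new(&user);
    println!("{}", String::from_utf8_lossy(encoded.to_bytes()));
}
```

This is still the “explicit” approach in disguise, since the conversion happens in `EncodedUser::new` rather than inside the producer. It just shows where the allocation has to live for the borrow checker to accept a ToBytes implementation.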

Wrap Up

That’s it for the first part! Part 2 will cover topics around Kafka consumers.


