Introduction
The Go package Shopify/sarama introduced support for Kafka’s transactional API in release v1.37.2. It implements the Kafka transactional API and enables more consistent workflows in our applications.
In short, if you are a sarama user, you can now leverage concepts like idempotent producers, achieve exactly-once semantics for your consumers and, generally, produce batches of messages synchronously with far fewer headaches.
I work in the Development Experience organisation at Cloudflare and, among other things, I maintain a package that uses sarama to interact with Kafka. At Cloudflare, we’re big Kafka adopters, and there have been cases in the past where a transactional API would have helped us.
I decided to explore these concepts using sarama at Cloudflare and to share my findings with you all.
Contents
- TL;DR show me the source code: explore the source code used to write this blog post. Skip the boring parts!
- The boring part: we’ll explore some core concepts of Kafka’s transactional API and what they mean in sarama
- Let’s get dirty: we’ll write some code to implement idempotent producers and exactly-once consumers
- Gotchas: the most interesting learnings
- What we’re not covering
- Conclusions: we’ll wrap up everything we talked about
TL;DR show me the source code
You are impatient, eh? You can check the code that I used to write this blog post here - use it with caution!
The boring part
Before getting our hands dirty, it’s important to understand what Kafka’s transactional API is and how it is implemented in sarama.
Some concepts about Kafka Transactional APIs
I strongly recommend checking out this blog post from confluent.io.
Sarama’s Transactional APIs
In release v1.37.2, sarama introduced the transactional API, which had been under discussion for a while.
The following methods have been added to the producers’ contracts (this is true for both synchronous and asynchronous producers):
// These methods were added to both the SyncProducer and AsyncProducer interfaces.
type SyncProducer interface {
	// TxnStatus returns the current producer transaction status.
	TxnStatus() ProducerTxnStatusFlag

	// IsTransactional returns true when the current producer is transactional.
	IsTransactional() bool

	// BeginTxn marks the current transaction as ready.
	BeginTxn() error

	// CommitTxn commits the current transaction.
	CommitTxn() error

	// AbortTxn aborts the current transaction.
	AbortTxn() error

	// AddOffsetsToTxn adds the associated offsets to the current transaction.
	AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error

	// AddMessageToTxn adds the message offsets to the current transaction.
	AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
}
On the consumer side, nothing has changed from an API point of view, but there are some gotchas that we’ll explore later.
Let’s get dirty
There are some examples available in sarama for both producers and consumers, but I find them a bit unclear.
Let’s revisit them to make these concepts clearer.
The idea
We’ll test different scenarios by having several applications produce to the same topic and two consumers read from it:
- sync-simple-1: It will produce one message at a time and commit it. This should be consumed.
- sync-simple-2: It should exit with an error log because its transactional ID conflicts with the one used by sync-simple-1. It should not produce any messages.
- sync-multiple: It will produce two messages at a time and commit them. These should be consumed.
- sync-abort: It will produce one message at a time and abort it. This should not be consumed.
- sync-abort-multiple: Like sync-abort, but producing two messages at a time.
- async-simple: It will produce two messages at a time and commit them. These should be consumed.
- async-abort: It will produce two messages at a time and abort them. These should not be consumed.
- consumer-committed: It will consume only committed messages.
- consumer-all: It will consume all messages, whether they were committed or not.
This should give us a good understanding of how this API behaves and what its implications are.
Prerequisites
To run the linked code, you only need to clone github.com/andream16/sarama-transactional-api-demo, install Docker and use docker-compose.
After cloning the project, just run docker-compose up. You should start seeing logs from the different services.
Great! We’ll understand what’s going on later.
What’s going on
Let’s understand the different components used to put together this experiment.
Infrastructure
We’re running kafka, zookeeper, prometheus and our different services using docker-compose.
You can check the docker-compose.yaml configuration here and the prometheus configuration here.
We’re not going to cover this in detail. Just keep in mind that:
- we need kafka to emit our messages. We’ll use the txs topic
- we need zookeeper to coordinate the Kafka cluster and store its metadata. We could avoid it, as newer Kafka versions can run without ZooKeeper, but I’m familiar with this setup and want to keep things simple
- we need prometheus to collect and visualise our metrics
- our Go applications are configured using environment variables injected into each of them from the env/ folder
Producer
We’ll cover both synchronous and asynchronous producers. The former block until a message is acknowledged by the broker, while the latter allow publishing to the broker in a non-blocking way.
You should use one or the other based on your use case and how you want to handle errors.
Synchronous producer
Synchronous producers make it easier to implement consistent workflows, as they block until a message is acknowledged by the broker. This lets you handle errors properly and potentially retry the operation if it failed.
These producers provide two methods to send messages:
- SendMessage: sends one message at a time. It returns an error related to that message if something goes wrong.
- SendMessages: sends a batch of messages. Behind the scenes, each message is still tracked individually. On failure, it returns a sarama.ProducerErrors value, a slice with at most one entry per failed message, so you can tell exactly which messages were not delivered.
Regardless of which method you choose to send messages (this should depend on your use case), the approach used to enrich these workflows with transactions is the same.
In order, you want to:
- Start a transaction: use the BeginTxn method. All the messages sent after a transaction has begun will be attached to it.
- Send your message(s): use the methods mentioned above.
- Commit or abort the transaction: if all messages were sent successfully, use CommitTxn to mark them as correctly sent. On the other hand, if you received a non-retryable error, use AbortTxn to mark them as not sent correctly. These two methods are important, as they tell consumers which messages to consume based on their IsolationLevel setting (we’ll cover this later).
Let’s write some code to spin up our producers and emit some messages.
We’ll include only the important code to create a producer and produce transactions.
We’ll also handle errors and enrich the workflow with metrics:
package main

import (
	"log"
	"os"
	"time"

	"github.com/andream16/sarama-transactional-api-demo/internal/kafka"
	"github.com/andream16/sarama-transactional-api-demo/internal/metric"

	"github.com/Shopify/sarama"
	"golang.org/x/sync/errgroup"
)

func main() {
	...

	var (
		appName = os.Getenv("TRANSACTION_ID")
		// Let's create a new configuration.
		config = sarama.NewConfig()
	)

	config.ClientID = appName
	// This is very important: sarama requires idempotence for transactional producers.
	config.Producer.Idempotent = true
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	// sarama requires WaitForAll for idempotent producers.
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	// Backoff is a time.Duration, so a bare 10 would mean 10 nanoseconds.
	config.Producer.Transaction.Retry.Backoff = 10 * time.Millisecond
	// This is very important to avoid collisions: the transactional ID must be unique.
	config.Producer.Transaction.ID = appName
	// sarama requires at most one in-flight request for idempotent producers.
	config.Net.MaxOpenRequests = 1

	client := kafka.NewClient(config)
	defer client.Close()

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		log.Fatalf("could not create sarama producer: %v", err)
	}
	defer producer.Close()

	// Let's create an errgroup (ctx comes from the setup elided above).
	g, ctx := errgroup.WithContext(ctx)

	// Let's spin up a goroutine using the errgroup and emit a message every 5 seconds.
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return nil
			case <-time.After(5 * time.Second):
				if err := producer.BeginTxn(); err != nil {
					log.Printf("could not begin transaction: %v", err)
					continue
				}

				_, _, err := producer.SendMessage(&sarama.ProducerMessage{
					Topic: "txs",
					Value: sarama.StringEncoder(appName),
				})
				if err != nil {
					log.Printf("could not send message: %v", err)
					// The send failed: abort so that ReadCommitted consumers never see it.
					if err := producer.AbortTxn(); err != nil {
						log.Printf("could not abort transaction: %v", err)
						metric.IncMessagesSent(appName, 0, false, false, false)
						continue
					}
					metric.IncMessagesSent(appName, 0, false, false, true)
					continue
				}

				if err := producer.CommitTxn(); err != nil {
					log.Printf("could not commit transaction: %v", err)
					if err := producer.AbortTxn(); err != nil {
						log.Printf("could not abort transaction: %v", err)
						metric.IncMessagesSent(appName, 1, true, false, false)
						continue
					}
					metric.IncMessagesSent(appName, 1, true, false, true)
					continue
				}

				metric.IncMessagesSent(appName, 1, true, true, false)
			}
		}
	})

	_ = g.Wait()
}
This snippet is used in the following services, with slight variations to commit or abort messages:
- sync-simple-1
- sync-simple-2
- sync-multiple
- sync-abort
- sync-abort-multiple
It’s important to note that each producer must have a unique transactional ID, set through config.Producer.Transaction.ID (our services read it from the TRANSACTION_ID environment variable), which sarama uses to implement idempotent, transactional producers.
Creating another producer with the same transactional ID will cause the second one to fail to start. We’ll spawn a second producer with the same ID to highlight this behaviour.
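If you run several replicas of the same service, one option is to derive each transactional ID from the service name plus a stable replica index. This is a sketch under assumptions: REPLICA_INDEX is a hypothetical variable your deployment would need to provide, and the resulting value is what you would assign to config.Producer.Transaction.ID. Keeping the ID stable across restarts matters because Kafka uses it to fence off a previous incarnation of the same producer; a random ID per start would lose that protection.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// transactionalID builds an ID that is unique per replica but stable across
// restarts of the same replica.
func transactionalID(appName string, replicaIndex int) string {
	return fmt.Sprintf("%s-%d", appName, replicaIndex)
}

func main() {
	// REPLICA_INDEX is a hypothetical variable provided by the deployment.
	idx, err := strconv.Atoi(os.Getenv("REPLICA_INDEX"))
	if err != nil {
		fmt.Println("REPLICA_INDEX must be an integer:", err)
		return
	}
	// This value would be assigned to config.Producer.Transaction.ID.
	fmt.Println(transactionalID(os.Getenv("TRANSACTION_ID"), idx))
}
```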
Let’s take a look at the produced metrics by going to prometheus and running the following query:
sum(messages_sent{instance=~"sync-multiple:8082|sync-simple:8082|sync-abort:8082|sync-abort-multiple:8082"}) by (committed, instance, success)
We can see how sync-simple is correctly producing and committing messages, as we defined.
From the logs, we can also see that the other sync-simple replica exited with the following error, because its transactional ID conflicts with the one defined for sync-simple-1:
could not create sarama producer: kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing
This gives us idempotent producers, as there can’t be two live producers with the same transactional ID at the same time. Combined with transactions, this is very powerful.
Asynchronous producer
Asynchronous producers allow implementing non-blocking flows for sending messages. They make sense for fire-and-forget workflows, if you need them.
Effectively, you publish a message by writing it to the Input() channel, and a background goroutine sends it to the broker asynchronously.
You can then listen for the related errors in a goroutine using the Errors() channel and handle them as you wish. Handling these errors is a bit trickier than handling the ones returned by a synchronous producer, because you have to think about consistency.
The transactional semantics for asynchronous producers are the same as the ones shown for synchronous producers:
package main

import (
	"log"
	"os"
	"time"

	"github.com/andream16/sarama-transactional-api-demo/internal/kafka"
	"github.com/andream16/sarama-transactional-api-demo/internal/metric"
	transporthttp "github.com/andream16/sarama-transactional-api-demo/internal/transport/http"

	"github.com/Shopify/sarama"
	"golang.org/x/sync/errgroup"
)

func main() {
	...

	var (
		appName = os.Getenv("TRANSACTION_ID")
		config  = sarama.NewConfig()
	)

	config.ClientID = appName
	config.Producer.Idempotent = true
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	// Backoff is a time.Duration, so a bare 10 would mean 10 nanoseconds.
	config.Producer.Transaction.Retry.Backoff = 10 * time.Millisecond
	config.Producer.Transaction.ID = appName
	config.Net.MaxOpenRequests = 1

	client := kafka.NewClient(config)
	defer client.Close()

	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		log.Fatalf("could not create sarama producer: %v", err)
	}
	defer producer.Close()

	// ctx comes from the setup elided above.
	g, ctx := errgroup.WithContext(ctx)

	// We need to drain the successes channel, as Return.Successes is enabled.
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return nil
			case _, ok := <-producer.Successes():
				if !ok {
					metric.IncMessagesSent(appName, 0, false, false, true)
					return nil
				}
				metric.IncMessagesSent(appName, 1, true, true, false)
			}
		}
	})

	// We also need to drain the errors channel, as Return.Errors is enabled.
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return nil
			case err, ok := <-producer.Errors():
				if !ok {
					return nil
				}
				log.Printf("could not send message: %v", err.Err)
			}
		}
	})

	// Every 5 seconds, send two messages in a single transaction.
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return nil
			case <-time.After(5 * time.Second):
				if err := producer.BeginTxn(); err != nil {
					log.Printf("could not begin transaction: %v", err)
					continue
				}

				producer.Input() <- &sarama.ProducerMessage{
					Topic: "txs",
					Value: sarama.StringEncoder(appName),
				}
				producer.Input() <- &sarama.ProducerMessage{
					Topic: "txs",
					Value: sarama.StringEncoder(appName),
				}

				if err := producer.CommitTxn(); err != nil {
					log.Printf("could not commit transaction: %v", err)
					if err := producer.AbortTxn(); err != nil {
						log.Printf("could not abort transaction: %v", err)
						continue
					}
				}
			}
		}
	})

	_ = g.Wait()
}
This snippet is used in the following services, with slight variations to commit or abort messages:
- async-simple
- async-abort
The same points highlighted for the synchronous producer are valid here as well.
Let’s take a look at the produced metrics by going to prometheus and running the following query:
sum(messages_sent{instance=~"async-abort:8082|async-simple:8082"}) by (committed, instance, success)
Consumer
Now that we know how to mark messages as committed or aborted on the producing side, we only need to cover how to process them.
By default, sarama’s consumers will consume any message, even aborted ones, because the default setting is config.Consumer.IsolationLevel = sarama.ReadUncommitted.
In our case, to make sure that we only consume messages that were committed successfully, we want to set config.Consumer.IsolationLevel = sarama.ReadCommitted.
That’s really it!
To show this behaviour, we’ll define two consumers that do the same thing but set this property differently.
Consume committed messages
package main

import (
	"log"
	"os"
	"strconv"

	"github.com/andream16/sarama-transactional-api-demo/internal/kafka"
	"github.com/andream16/sarama-transactional-api-demo/internal/metric"

	"github.com/Shopify/sarama"
	"golang.org/x/sync/errgroup"
)

// consumerHandler implements sarama.ConsumerGroupHandler.
type consumerHandler struct{}

func (c *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (c *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

func (c *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case <-session.Context().Done():
			return nil
		case message, ok := <-claim.Messages():
			if !ok {
				return nil
			}

			var (
				sender    = string(message.Value)
				offset    = message.Offset
				partition = message.Partition
			)

			log.Printf("offset: %d, partition: %d, value: %v", offset, partition, sender)
			metric.IncMessagesConsumed(sender, offset, partition)

			// Mark the message as processed so that its offset can be committed.
			session.MarkMessage(message, "")
		}
	}
}

func main() {
	...

	var (
		appName                = os.Getenv("TRANSACTION_ID")
		shouldReadCommitted, _ = strconv.ParseBool(os.Getenv("READ_ONLY_COMMITTED"))
		config                 = sarama.NewConfig()
	)

	config.ClientID = "consumer"
	config.Version = sarama.V2_5_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Consumer.IsolationLevel = sarama.ReadUncommitted
	if shouldReadCommitted {
		// This is the important bit to have a consumer reading only committed messages.
		config.Consumer.IsolationLevel = sarama.ReadCommitted
	}

	client := kafka.NewClient(config)
	defer client.Close()

	consumer, err := sarama.NewConsumerGroupFromClient(appName, client)
	if err != nil {
		log.Fatalf("could not create consumer: %v", err)
	}
	defer consumer.Close()

	// ctx comes from the setup elided above.
	g, ctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		for {
			// Consume returns when the session ends (for example, on a
			// rebalance), so it has to be called in a loop.
			if err := consumer.Consume(ctx, []string{"txs"}, &consumerHandler{}); err != nil {
				log.Printf("unexpected consumer error: %v", err)
			}
			// Stop once the context is cancelled instead of spinning forever.
			if ctx.Err() != nil {
				return nil
			}
		}
	})

	_ = g.Wait()
}
Let’s check how our consumer is behaving by going to prometheus and running the following query:
sum(messages_consumed{instance="consumer-committed-msgs:8082"}) by (sender)
We can see how consumer-committed-msgs is consuming only the committed messages coming from the expected services. Pretty cool, right?
This allows us to consume only committed messages: there is no case where a message that we didn’t want to consume gets consumed.
Consume all messages
By using the default config.Consumer.IsolationLevel = sarama.ReadUncommitted, we can see how consumer-all-msgs is consuming all messages.
We can see it from our metrics using the following query:
sum(messages_consumed{instance="consumer-all-msgs:8082"}) by (sender)
Gotchas
- Leveraging transactions with asynchronous producers is tricky, and you should do it only if you know what you are doing.
- Producers must have different transactional IDs to avoid collisions and achieve idempotency.
- Consumers must specify whether they want to consume aborted messages or not by setting config.Consumer.IsolationLevel.
- An extra topic, __transaction_state, is used to store transactional information on the broker(s). In our local setup, the same topic is used. This can lead to an increase in the topic size. More information here
- Transactions enable you to implement consistent workflows, especially when producing batches of messages. They also enable you to consume a message exactly once.
What we’re not covering
We’re not going to cover the AddOffsetsToTxn and AddMessageToTxn methods, as they apply to workflows where a consumer has to produce to some other topic.
We can cover them separately.
Conclusion
We’ve explored Kafka’s transactional capabilities and how they are implemented in sarama.
We played with these APIs and learned how to leverage them to implement idempotent producers and make sure that we consume each message exactly once.
I hope you found this helpful.
If you have any questions feel free to email me!