[Image: 25 de Abril Bridge]

Introduction

The Go package Shopify/sarama introduced its transactional API in release v1.37.2. It implements the Kafka transactional API, enabling us to develop more consistent workflows in our applications.

In short, if you are a sarama user you can now leverage concepts like idempotent producers, achieve exactly-once semantics for your consumers and generally improve how you handle producing a batch of messages synchronously, with no headaches.

I work in the Development Experience organisation at Cloudflare and, among other things, I maintain a package that uses sarama to interact with Kafka. At Cloudflare, we’re big Kafka adopters and there have been cases in the past where a transactional API would have helped us.

I’ve decided to explore these concepts leveraging sarama at Cloudflare and to share my findings with you all.

Contents

TL;DR show me the source code

You are impatient, eh? You can check the code that I used to write this blog post here - use it with caution!

The boring part

Before getting our hands dirty it’s important to understand what the Kafka transactional API is and how it is implemented in sarama.

Some concepts about Kafka Transactional APIs

I strongly recommend checking out this blog post from confluent.io.

Sarama’s Transactional APIs

In release v1.37.2, sarama introduced its transactional API. It had been in the works for a while.

The following methods have been added to the producers’ contracts (this is true for both synchronous and asynchronous producers):

type (A)SyncProducer interface {
    // TxnStatus returns the current producer transaction status.
    TxnStatus() ProducerTxnStatusFlag

    // IsTransactional returns true when the current producer is transactional.
    IsTransactional() bool

    // BeginTxn marks the current transaction as ready.
    BeginTxn() error

    // CommitTxn commits the current transaction.
    CommitTxn() error

    // AbortTxn aborts the current transaction.
    AbortTxn() error

    // AddOffsetsToTxn adds the associated offsets to the current transaction.
    AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error

    // AddMessageToTxn adds message offsets to the current transaction.
    AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
}

On the consumer’s side nothing has changed from an API point of view, but there are some gotchas that we’ll explore later.

Let’s get dirty

There are some examples available in sarama for both producers and consumers, but I find them a bit unclear.

Let’s try to revisit them to make these concepts clearer.

The idea

We’ll try to test different scenarios by having different applications producing to the same topic and having two consumers reading from it:

  • sync-simple-1: It will produce one message at a time and commit it. This should be consumed.
  • sync-simple-2: It should exit with an error log because its transactional ID conflicts with sync-simple-1’s. It should not produce any messages.
  • sync-multiple: It will produce two messages at a time and commit them. These should be consumed.
  • sync-abort: It will produce one message at a time and abort it. This should not be consumed.
  • sync-abort-multiple: Like sync-abort but with two messages at a time being produced.
  • async-simple: It will produce two messages at a time and commit them. These should be consumed.
  • async-abort: It will produce two messages at a time and abort them. These should not be consumed.
  • consumer-committed: It will consume only committed messages.
  • consumer-all: It will consume all messages, whether they were committed or not.

This should give us a good understanding of how this API behaves and what implications it has.

Prerequisites

To run the linked code you should only need to clone github.com/andream16/sarama-transactional-api-demo, install Docker and use docker-compose.

After cloning the project, just run docker-compose up. You should see some logs like:

[Image: docker-compose logs]

Great! We’ll understand what’s going on later.

What’s going on

Let’s understand the different components used to put together this experiment.

Infrastructure

We’re running Kafka, ZooKeeper, Prometheus and our different services using docker-compose.

You can check the docker-compose.yaml configuration here and the Prometheus configuration here.

We’re not going to cover this in detail. Just keep in mind that:

  • we need Kafka to emit our messages. We’ll leverage the txs topic
  • we need ZooKeeper to keep offsets information. We could avoid using it, but I’m familiar with this setup and will keep it simple
  • we need Prometheus to collect & visualise our metrics
  • the Go applications are customised using environment variables injected from the env/ folder into each application

Producer

We’ll cover both synchronous and asynchronous producers. The first blocks until a message is delivered to the broker, while the second publishes to the broker in a non-blocking way.

You should use one or the other based on your use case and how you want to handle errors.

Synchronous producer

Synchronous producers allow implementing consistent workflows, as they block until a message is delivered to the broker. This makes it possible to properly handle errors and potentially retry the operation if it fails.
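
As a hedged sketch (the helper is mine, not from the demo repository), an inline retry around a blocking send could look like this, using the SendMessage method we’ll introduce just below:

func sendWithRetry(producer sarama.SyncProducer, msg *sarama.ProducerMessage, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		// SendMessage blocks until the broker acknowledges the message,
		// so we know the outcome of each attempt before retrying.
		if _, _, err = producer.SendMessage(msg); err == nil {
			return nil
		}
		time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
	}
	return err
}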

These producers leverage two methods to send messages:

  • SendMessage: allows sending one message at a time. It returns an error related to that message if something goes wrong.
  • SendMessages: allows sending a batch of messages. Behind the scenes, these messages are still sent individually to the broker. It returns a slice of errors, in the worst case one per message in the batch, as each error is linked back to a single message. This allows handling each message’s outcome individually - see the sketch after this list.
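
To make the batched errors more concrete, here’s a hedged sketch (the helper name is mine): when some messages fail, the error returned by SendMessages is a sarama.ProducerErrors, a slice of *sarama.ProducerError linking each failure back to the message that caused it.

func logBatchErrors(producer sarama.SyncProducer, msgs []*sarama.ProducerMessage) {
	err := producer.SendMessages(msgs)
	if err == nil {
		return
	}
	producerErrs, ok := err.(sarama.ProducerErrors)
	if !ok {
		log.Printf("unexpected error: %v", err)
		return
	}
	for _, pErr := range producerErrs {
		// Each ProducerError carries the failed message and its cause.
		log.Printf("message to topic %s failed: %v", pErr.Msg.Topic, pErr.Err)
	}
}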

Regardless of which method you choose to send messages (it should be based on your use case), the approach used to enrich these workflows with transactions is the same.

In order, you want to:

  1. Start a transaction: to do so, leverage the BeginTxn method. All the messages sent after a transaction is initiated will be attached to it.
  2. Send your message(s): use the methods mentioned above to do so.
  3. Commit/abort the transaction: if no retryable errors are returned when sending messages, use CommitTxn to signal that these messages were correctly sent. On the other hand, if you received non-retryable errors, use AbortTxn to signal that they were not sent correctly. These two methods are important, as they tell the consumer which messages to consume based on the IsolationLevel setting - we’ll cover this later.
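
Stripped of logging and metrics, the whole flow boils down to something like this sketch (the full version used by the demo services follows):

func produceInTransaction(producer sarama.SyncProducer, msg *sarama.ProducerMessage) error {
	// 1. Start a transaction: messages sent from now on are attached to it.
	if err := producer.BeginTxn(); err != nil {
		return err
	}
	// 2. Send the message(s).
	if _, _, err := producer.SendMessage(msg); err != nil {
		// 3a. Abort on a non-retryable failure so that ReadCommitted
		// consumers never see the message.
		if abortErr := producer.AbortTxn(); abortErr != nil {
			return abortErr
		}
		return err
	}
	// 3b. Commit on success so the message becomes visible to
	// ReadCommitted consumers.
	return producer.CommitTxn()
}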

Let’s write down some code to spin up our producers and emit some messages.

We’ll only include the important code that creates a producer and produces transactions.

We’ll also handle errors and enrich the workflow with metrics:

package main

import (
	"log"
	"os"
	"time"
	
	"github.com/andream16/sarama-transactional-api-demo/internal/kafka"
	"github.com/andream16/sarama-transactional-api-demo/internal/metric"

	"github.com/Shopify/sarama"
	"golang.org/x/sync/errgroup"
)

func main() {
	...

	var (
		appName = os.Getenv("TRANSACTION_ID")
		// Let's create a new configuration.
		config = sarama.NewConfig()
	)

	config.ClientID = appName
	// This is very important to enable transactionality.
	config.Producer.Idempotent = true
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	// Backoff is a time.Duration: a bare 10 would mean 10 nanoseconds.
	config.Producer.Transaction.Retry.Backoff = 10 * time.Millisecond
	// This is very important to avoid collisions.
	config.Producer.Transaction.ID = appName
	config.Net.MaxOpenRequests = 1

	client := kafka.NewClient(config)

	defer client.Close()

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		log.Fatalf("could not create sarama producer: %v", err)
	}

	defer producer.Close()

	// Let's create an errgroup
	g, ctx := errgroup.WithContext(ctx)

	// Let's spin up a goroutine using an errgroup and emit a message every 5 seconds.
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return nil
			case <-time.After(5 * time.Second):
				if err := producer.BeginTxn(); err != nil {
					log.Printf("could not begin transaction: %v", err)
					continue
				}
				
				_, _, err := producer.SendMessage(&sarama.ProducerMessage{
					Topic: "txs", 
					Value: sarama.StringEncoder(appName),
				})

				if err != nil {
					if err := producer.AbortTxn(); err != nil {
						log.Printf("could not abort transaction: %v", err)
						metric.IncMessagesSent(appName, 0, false, false, false)
						continue
					}
					metric.IncMessagesSent(appName, 0, false, false, true)
					continue
				}
				
				if err := producer.CommitTxn(); err != nil {
					log.Printf("could not commit transaction: %v\n", err)
					if err := producer.AbortTxn(); err != nil {
						log.Printf("could not abort transaction: %v", err)
						metric.IncMessagesSent(appName, 1, true, false, false)
						continue
					}
					metric.IncMessagesSent(appName, 1, true, false, true)
					continue
				}
				
				metric.IncMessagesSent(appName, 1, true, true, false)
			}
		}
	})

	_ = g.Wait()
}

This snippet is used in the following services with slight variations to commit or abort messages:

  • sync-simple-1
  • sync-simple-2
  • sync-multiple
  • sync-abort
  • sync-abort-multiple

It’s important to note that each producer must have a unique Transaction.ID, which sarama uses to implement idempotent producers. Creating another producer with the same Transaction.ID will cause the second one to fail to start. We’ll spawn a second producer with the same Transaction.ID to highlight this behaviour.

Let’s take a look at the produced metrics by going to Prometheus and running the following query:

sum(messages_sent{instance=~"sync-multiple:8082|sync-simple:8082|sync-abort:8082|sync-abort-multiple:8082"}) by (committed, instance, success)

[Image: sync producer metrics]

We can see how sync-simple is correctly producing messages and committing them as we defined.

Looking at the logs, we can also see that the other sync-simple replica instead exited with the following error, because its Transaction.ID conflicts with the one defined for sync-simple-1: could not create sarama producer: kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing.

This allows us to have idempotent producers, as there can’t be two instances of the same producer running at the same time. This, in combination with transactions, is very powerful.

Asynchronous producer

Asynchronous producers allow implementing non-blocking flows for sending messages. They make sense for fire-and-forget workflows, if you need them.

Effectively, you hand a message over to a background goroutine by publishing it to the channel returned by the Input method; the message is then sent asynchronously. You can listen for the related errors in another goroutine and handle them as you wish using the Errors method. Handling these errors is a bit trickier compared to handling the ones returned by a synchronous producer, because you have to think about consistency.
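
One trick that can help here (a hedged sketch, not part of the demo): sarama passes ProducerMessage.Metadata through untouched, so you can attach a correlation key when enqueuing a message and read it back when draining Errors(). The "order-123" key below is purely hypothetical.

func produceWithCorrelation(producer sarama.AsyncProducer) {
	producer.Input() <- &sarama.ProducerMessage{
		Topic:    "txs",
		Value:    sarama.StringEncoder("payload"),
		Metadata: "order-123",
	}

	go func() {
		for pErr := range producer.Errors() {
			// Metadata comes back on the failed message, letting us tie
			// the error to the record that caused it.
			key, _ := pErr.Msg.Metadata.(string)
			log.Printf("message %q failed: %v", key, pErr.Err)
		}
	}()
}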

The transactional semantics for asynchronous producers are no different from the ones shown for synchronous producers:

package main

import (
	"log"
	"os"
	"time"

	"github.com/andream16/sarama-transactional-api-demo/internal/kafka"
	"github.com/andream16/sarama-transactional-api-demo/internal/metric"
	transporthttp "github.com/andream16/sarama-transactional-api-demo/internal/transport/http"

	"github.com/Shopify/sarama"
	"golang.org/x/sync/errgroup"
)

func main() {
	...

	var (
		appName = os.Getenv("TRANSACTION_ID")
		config  = sarama.NewConfig()
	)

	config.ClientID = appName
	config.Producer.Idempotent = true
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	// Backoff is a time.Duration: a bare 10 would mean 10 nanoseconds.
	config.Producer.Transaction.Retry.Backoff = 10 * time.Millisecond
	config.Producer.Transaction.ID = appName
	config.Net.MaxOpenRequests = 1

	client := kafka.NewClient(config)

	defer client.Close()
	
	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		log.Fatalf("could not create sarama producer: %v", err)
	}

	defer producer.Close()

	g, ctx := errgroup.WithContext(ctx)

	// We need to drain the successes channel.
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return nil
			case _, ok := <-producer.Successes():
				if !ok {
					metric.IncMessagesSent(appName, 0, false, false, true)
					return nil
				}
				metric.IncMessagesSent(appName, 1, true, true, false)
			}
		}
	})

	// We also need to drain the errors channel.
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return nil
			case err, ok := <-producer.Errors():
				if !ok {
					return nil
				}
				log.Printf("could not send message: %v", err.Err)
			}
		}
	})

	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return nil
			case <-time.After(5 * time.Second):
				if err := producer.BeginTxn(); err != nil {
					log.Printf("could not begin transaction: %v", err)
					continue
				}
				
				producer.Input() <- &sarama.ProducerMessage{
					Topic: "txs",
					Value: sarama.StringEncoder(appName),
				}

				producer.Input() <- &sarama.ProducerMessage{
					Topic: "txs",
					Value: sarama.StringEncoder(appName),
				}

				if err := producer.CommitTxn(); err != nil {
					log.Printf("could not commit transaction: %v\n", err)
					if err := producer.AbortTxn(); err != nil {
						log.Printf("could not abort transaction: %v", err)
						continue
					}
				}
			}
		}
	})

	_ = g.Wait()
}

This snippet is used in the following services with slight variations to commit or abort messages:

  • async-simple
  • async-abort

The same points highlighted for the synchronous producer are valid here as well.

Let’s take a look at the produced metrics by going to Prometheus and running the following query:

sum(messages_sent{instance=~"async-abort:8082|async-simple:8082"}) by (committed, instance, success)

[Image: async producer metrics]

Consumer

Now that we know how to mark messages as committed/aborted on the producing side, we only need to cover how to consume them.

By default, sarama’s consumers will consume any message, even aborted ones, as the default setting is Consumer.IsolationLevel = sarama.ReadUncommitted. In our case, to make sure that we consume each message exactly once and only when it was committed successfully, we want to set Consumer.IsolationLevel = sarama.ReadCommitted.

That’s really it!

For the sake of showing this behaviour, we’ll define two consumers that do the same thing but set this property differently.

Consume committed messages

package main

import (
	"log"
	"os"
	"strconv"
	
	"github.com/andream16/sarama-transactional-api-demo/internal/kafka"
	"github.com/andream16/sarama-transactional-api-demo/internal/metric"

	"github.com/Shopify/sarama"
	"golang.org/x/sync/errgroup"
)

type consumerHandler struct{}

func (c *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (c *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

func (c *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case <-session.Context().Done():
			return nil
		case message, ok := <-claim.Messages():
			if !ok {
				return nil
			}

			var (
				sender    = string(message.Value)
				offset    = message.Offset
				partition = message.Partition
			)

			log.Printf("offset: %d, partition: %d, value: %v", offset, partition, sender)
			metric.IncMessagesConsumed(sender, offset, partition)
			session.MarkMessage(message, "")
		}
	}
}

func main() {
	...

	var (
		appName                = os.Getenv("TRANSACTION_ID")
		shouldReadCommitted, _ = strconv.ParseBool(os.Getenv("READ_ONLY_COMMITTED"))
		config                 = sarama.NewConfig()
	)

	config.ClientID = "consumer"
	config.Version = sarama.V2_5_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Consumer.IsolationLevel = sarama.ReadUncommitted

	if shouldReadCommitted {
		// This is the important bit to have a consumer reading only committed messages
		config.Consumer.IsolationLevel = sarama.ReadCommitted
	}

	client := kafka.NewClient(config)

	defer client.Close()

	consumer, err := sarama.NewConsumerGroupFromClient(appName, client)
	if err != nil {
		log.Fatalf("could not create consumer: %v", err)
	}

	defer consumer.Close()

	g, ctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		for {
			if err := consumer.Consume(ctx, []string{"txs"}, &consumerHandler{}); err != nil {
				log.Printf("unexpected consumer error: %v\n", err)
			}
			// Exit once the context is cancelled; otherwise re-join the group.
			if ctx.Err() != nil {
				return nil
			}
		}
	})

	_ = g.Wait()
}

Let’s check how our consumer is behaving by going to Prometheus and using the query:

sum(messages_consumed{instance="consumer-committed-msgs:8082"}) by (sender)

[Image: read committed consumer metrics]

We can see how consumer-committed-msgs is consuming only the committed messages coming from the expected services. Pretty cool, right?

This allows us to consume only committed messages, exactly once. There can’t be a case where a message that we didn’t want to consume is consumed.

Consume all messages

By using the default Consumer.IsolationLevel = sarama.ReadUncommitted, we can see how consumer-all-msgs is consuming all messages.

We can see it from our metrics using the following query:

sum(messages_consumed{instance="consumer-all-msgs:8082"}) by (sender)

[Image: read uncommitted consumer metrics]

Gotchas

  • Leveraging transactions with asynchronous producers is tricky, and you should do it only if you know what you are doing.
  • Producers must each have a distinct Transaction.ID to avoid collisions and achieve idempotency.
  • Consumers must specify whether they want to consume aborted messages or not by setting Consumer.IsolationLevel.
  • An extra topic, __transaction_state, is used to store transactional information on the broker(s). The same topic is used when running locally. This can lead to an increase in topic size. More information here.
  • Transactions enable you to implement consistent workflows, especially when producing batches of messages. They also enable you to consume each message exactly once.

What we’re not covering

We’re not going to cover how the AddOffsetsToTxn and AddMessageToTxn methods are used, as they apply to workflows where a consumer has to produce to some other topic.

We can cover them separately.
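
That said, here’s a rough, hedged sketch of the shape such a workflow takes (the output topic is made up): the produced message and the consumed offset are committed in the same transaction, so either both happen or neither does.

func processAndForward(producer sarama.SyncProducer, msg *sarama.ConsumerMessage, groupID string) error {
	if err := producer.BeginTxn(); err != nil {
		return err
	}
	if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "txs-out", // hypothetical output topic
		Value: sarama.ByteEncoder(msg.Value),
	}); err != nil {
		_ = producer.AbortTxn()
		return err
	}
	// Attach the consumed message's offset to the transaction so it is
	// committed atomically with the produced message.
	if err := producer.AddMessageToTxn(msg, groupID, nil); err != nil {
		_ = producer.AbortTxn()
		return err
	}
	return producer.CommitTxn()
}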

Conclusion

We’ve explored Kafka’s transactional capabilities and how these are implemented in sarama.

We played with these APIs and understood how to leverage them to implement idempotent producers and to make sure that we consume each message exactly once.

I hope you found this helpful.

If you have any questions feel free to email me!

[Image: Miradouro das Portas do Sol in Lisbon]