Context setting
What is franz-go and why does it matter?
I was looking at exporting data from OpenTelemetry Collectors to Kafka, and realised that there is an exporter using a franz-go client. I’ve always been keen to poke around the internals of Kafka, so I thought I’d use this chance to snoop into the internals of this client, which some Redditors dub the best Kafka client written in Go.
Brief facts about Kafka Producers in general
Here are some characteristics of a Kafka Producer that will help us understand specific implementation details which we’ll cover in a later segment:
- Application code sends the Kafka Producer multiple pieces of data to sink into Kafka. The Producer partitions the data (according to some key) and batches it over to the Kafka broker(s) as an optimisation. The broker will only give a response after the data has been replicated.
Once the request has been fully replicated, the broker will take the request object out of purgatory, generate a response object, and place it on the response queue. (Source here)
- This means that the application code using the Producer should not expect an immediate response, since material time will elapse between when the application code passes the data to the Producer and when the broker actually responds. To handle this, the application code can pass a promise to the Producer, which the latter will invoke whenever the broker responds. Here’s a built-in promise in the client itself, which appends the result to a slice (a usage sketch follows the snippet):
promise = func(r *Record, err error) {
    results = append(results, ProduceResult{r, err})
    wg.Done()
}
Source here
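To make the promise mechanism concrete, here is a minimal sketch of application code passing its own promise to Produce. The broker address and topic name are placeholders; the calls themselves (NewClient, SeedBrokers, Produce) are from the kgo package.

package main

import (
    "context"
    "fmt"
    "sync"

    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    // Placeholder broker address and topic.
    cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
    if err != nil {
        panic(err)
    }
    defer cl.Close()

    var wg sync.WaitGroup
    wg.Add(1)

    // Produce is asynchronous: the promise fires only once the broker
    // has responded (i.e. after replication, per the quote above).
    cl.Produce(context.Background(), &kgo.Record{
        Topic: "example-topic",
        Value: []byte("hello"),
    }, func(r *kgo.Record, err error) {
        defer wg.Done()
        if err != nil {
            fmt.Println("produce failed:", err)
            return
        }
        fmt.Println("produced to partition", r.Partition, "at offset", r.Offset)
    })

    wg.Wait()
}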
What’s interesting about the internals of this franz-go Producer?
What happens when the Producer’s buffer fills up?
The producer’s buffer state is global, and is managed via sync.Cond with a mutex over it (see func (p *producer) init(cl *Client), where p.c = sync.NewCond(&p.mu)).
When the buffer is full, we start a goroutine that waits on the cond and checks whether the buffer has freed up. After firing off that goroutine, which monitors the buffer, we proceed with other tasks.
How do we know that said monitoring goroutine has succeeded? It’s from the wait channel.
wait := make(chan struct{})
var quit bool
go func() {
    defer close(wait)
    p.mu.Lock()
    defer p.mu.Unlock()
    calcNums()
    for !quit && (overMaxRecs || overMaxBytes) {
        p.c.Wait()
        calcNums()
    }
    p.blocked.Add(-1)
    p.blockedBytes -= userSize
}()
Source here
Indeed, further down we see this, which is right before we exit the if block when the buffer is full.
select {
case <-wait:
    cl.cfg.logger.Log(LogLevelDebug,
        "Produce block awoken, we now have space to produce, continuing to partition and produce")
...
Source here
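To see the same blocking pattern in isolation, here is a minimal, self-contained sketch (not the franz-go code itself) of a bounded buffer guarded by a sync.Cond: an add blocks in a wait loop until a drain frees space and broadcasts, mirroring the wait goroutine above.

package main

import (
    "fmt"
    "sync"
    "time"
)

type boundedBuffer struct {
    mu   sync.Mutex
    c    *sync.Cond
    recs []string
    max  int
}

func newBoundedBuffer(max int) *boundedBuffer {
    b := &boundedBuffer{max: max}
    b.c = sync.NewCond(&b.mu) // same cond-over-mutex shape as producer.init
    return b
}

// add blocks until the buffer has space, mirroring the wait loop above.
func (b *boundedBuffer) add(r string) {
    b.mu.Lock()
    defer b.mu.Unlock()
    for len(b.recs) >= b.max {
        b.c.Wait() // releases the mutex while waiting, reacquires on wake-up
    }
    b.recs = append(b.recs, r)
}

// drain empties the buffer and wakes any blocked producers.
func (b *boundedBuffer) drain() []string {
    b.mu.Lock()
    defer b.mu.Unlock()
    out := b.recs
    b.recs = nil
    b.c.Broadcast()
    return out
}

func main() {
    b := newBoundedBuffer(2)
    b.add("r1")
    b.add("r2")

    go func() {
        time.Sleep(100 * time.Millisecond)
        fmt.Println("drained:", b.drain()) // frees space, wakes the blocked add
    }()

    b.add("r3") // blocks until the drain above runs
    fmt.Println("r3 buffered after the drain freed space")
}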
What happens while this goroutine is monitoring? We should expect flushing of the buffer and updates to the producer’s buffer state. That is indeed achieved by the call cl.unlingerDueToMaxRecsBuffered(), which we’ll proceed to unpack:
- We loop over each partition within each topic and get the corresponding recBuf, which will invoke unlingerAndManuallyDrain.
- Within unlingerAndManuallyDrain, the recBuf will retrieve its sink, and sink.drain() begins, which chiefly invokes sink.produce().
- A bunch of things happen in sink.produce(), including getting a transaction ID if this is in fact a transaction and constructing the produceRequest. It culminates with s.doSequenced(req kmsg.Request, promise func(*broker, kmsg.Response, error)). (Remember the promise mentioned above? It’s in doSequenced’s signature, and should also update the global state for Producers.) A simplified sketch of this call chain follows the list.
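The following is a heavily simplified paraphrase of that control flow, not the actual franz-go code: the type and method names are borrowed from the walkthrough above, but the bodies are placeholders purely to show who calls whom.

// Paraphrased control flow; the real franz-go types carry far more state.
package sketch

type recBuf struct{ sink *sink }

type sink struct{}

type produceRequest struct{} // would hold the batches per topic/partition

// unlingerDueToMaxRecsBuffered (simplified): loop over every topic's
// partitions and manually drain each partition's record buffer.
func unlingerDueToMaxRecsBuffered(bufs []*recBuf) {
    for _, rb := range bufs {
        rb.unlingerAndManuallyDrain()
    }
}

func (rb *recBuf) unlingerAndManuallyDrain() {
    rb.sink.drain() // stop lingering and hand off to the buffer's sink
}

func (s *sink) drain() {
    s.produce() // the real drain loops while data remains buffered
}

func (s *sink) produce() {
    // Build the request (and fetch a transactional ID when needed), then
    // issue it with a promise that will later update producer state.
    req := &produceRequest{}
    s.doSequenced(req, func() {
        // update buffered-record counts, complete user promises, ...
    })
}

func (s *sink) doSequenced(req *produceRequest, promise func()) {
    // Covered in the next section: ring buffers keep requests and
    // responses ordered while still allowing concurrency.
    _ = req
    _ = promise
}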
For responses, how do we have concurrency and in-order guarantees?
In this section, we step thru doSequenced, which is where we left off above. The notable thing is how franz-go achieves concurrency as well as in-order guarantees.
- Within doSequenced, we load up the request & response ring buffers, and have them communicate with each other thru a wait channel.
- Critically, to achieve in-order guarantees, the function clearing each ring buffer is triggered only on the first element (a self-contained code sketch of this pattern follows the list):
  - For the request ring buffer, the broker’s handleReqs goes thru the ring buffer and also runs the user-defined promise, only if it is the first element of the ring buffer.
  - Likewise for the response ring buffer, handleSeqResps gets triggered only on the first element in the ring buffer, and dropPeeks into the ring buffer to clear it.
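Here is a minimal, self-contained sketch of that idea (not franz-go’s actual ring buffer; orderedQueue is a hypothetical type): pushes report whether they were the first element, and only the first push spawns the goroutine that drains the queue in order. The in-flight element stays in the queue until it has been handled, so later pushes never start a second drainer.

package main

import (
    "fmt"
    "sync"
)

// orderedQueue drains items strictly in push order. Only the push that
// finds the queue empty starts the drain goroutine; later pushes simply
// append and rely on the already-running drainer.
type orderedQueue struct {
    mu    sync.Mutex
    items []string
    wg    sync.WaitGroup
}

// push returns true if the pushed item was the first element,
// i.e. the caller is responsible for starting the drain.
func (q *orderedQueue) push(v string) (first bool) {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.items = append(q.items, v)
    return len(q.items) == 1
}

// handle processes items one at a time, in order, until the queue is empty.
func (q *orderedQueue) handle() {
    defer q.wg.Done()
    for {
        q.mu.Lock()
        v := q.items[0] // peek: the element stays in place while it is handled
        q.mu.Unlock()

        fmt.Println("handled", v)

        q.mu.Lock()
        q.items = q.items[1:] // "dropPeek": remove it only after handling
        empty := len(q.items) == 0
        q.mu.Unlock()
        if empty {
            return
        }
    }
}

func main() {
    q := &orderedQueue{}
    for _, v := range []string{"req-1", "req-2", "req-3"} {
        if q.push(v) { // only the first element triggers the handler
            q.wg.Add(1)
            go q.handle()
        }
    }
    q.wg.Wait()
}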
The following sketch illustrates what we’ve just gone thru in words:
Other observations
The inline documentation in the codebase is quite delightful. The docs in the following snippet elaborate on why the lingering timer is located on recBuf instead of sink. These are the decisions that a maintainer would appreciate!
// lingering is a timer that avoids starting maybeDrain until expiry,
// allowing for more records to be buffered in a single batch.
//
// Note that if something else starts a drain, if the first batch of
// this buffer fits into the request, it will be used.
//
// This is on recBuf rather than Sink to avoid some complicated
// interactions of triggering the sink to loop or not. Ideally, with
// the sticky partition hashers, we will only have a few partitions
// lingering and that this is on a RecBuf should not matter.
lingering *time.Timer
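As a side note, lingering is something the user opts into via client options. A minimal sketch, using the kgo options ProducerLinger and MaxBufferedRecords; the values here are arbitrary.

package main

import (
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    cl, err := kgo.NewClient(
        kgo.SeedBrokers("localhost:9092"),
        kgo.ProducerLinger(5*time.Millisecond), // linger a little to build fuller batches
        kgo.MaxBufferedRecords(250_000),        // the buffer whose "full" path we walked through
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()
}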