
Integrating APItoolkit with Kafka

This guide demonstrates how to integrate APItoolkit with Kafka producers and consumers using OpenTelemetry for comprehensive API monitoring.


Prerequisites

  • Apache Kafka cluster
  • Kafka client application (producer/consumer)
  • APItoolkit account with an API key

Setting Up OpenTelemetry for Kafka

1. Configure Environment Variables

Set the following OpenTelemetry environment variables in the environment where your Kafka client application runs:

# Specifies the endpoint URL for the OpenTelemetry collector
export OTEL_EXPORTER_OTLP_ENDPOINT="http://otelcol.apitoolkit.io:4317"

# Specifies the name of the service
export OTEL_SERVICE_NAME="kafka-client-service"

# Adds your API KEY to the resource
export OTEL_RESOURCE_ATTRIBUTES="at-project-key=YOUR_API_KEY"

# Specifies the protocol to use for the OpenTelemetry exporter
export OTEL_EXPORTER_OTLP_PROTOCOL="grpc"

Replace YOUR_API_KEY with your actual APItoolkit project key.
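
The examples below call GlobalOpenTelemetry.getTracer(...), which only works if an OpenTelemetry SDK has been registered at startup. If you run your client with the OpenTelemetry Java agent, this happens automatically. Otherwise, here is a minimal bootstrap sketch, assuming the opentelemetry-sdk-extension-autoconfigure dependency is on the classpath (the OtelBootstrap class name is illustrative):

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;

public final class OtelBootstrap {
    // Builds an SDK from the OTEL_* environment variables configured above
    // and registers it globally so GlobalOpenTelemetry.getTracer(...) works
    // in the producer and consumer examples below. Call this once at startup.
    public static void init() {
        OpenTelemetrySdk sdk = AutoConfiguredOpenTelemetrySdk.initialize()
                .getOpenTelemetrySdk();
        GlobalOpenTelemetry.set(sdk);
    }
}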

2. Instrument Kafka Producers

Java Example

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    private final Tracer tracer = GlobalOpenTelemetry.getTracer("kafka-producer-instrumentation");
    private final Producer<String, String> producer;

    public KafkaProducerExample() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(props);
    }

    public void sendMessage(String topic, String key, String value) {
        Span span = tracer.spanBuilder("kafka.produce").startSpan();
        span.setAttribute("messaging.system", "kafka");
        span.setAttribute("messaging.destination", topic);
        span.setAttribute("messaging.destination_kind", "topic");
        span.setAttribute("messaging.kafka.key", key);

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        // End the span in the send callback so the partition/offset attributes
        // (or the exception) are recorded before the span closes.
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                span.recordException(exception);
            } else {
                span.setAttribute("messaging.kafka.partition", metadata.partition());
                span.setAttribute("messaging.kafka.offset", metadata.offset());
            }
            span.end();
        });
    }
}
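
A short usage sketch; the topic name, key, and payload below are placeholders:

public class ProducerDemo {
    public static void main(String[] args) {
        KafkaProducerExample example = new KafkaProducerExample();
        // Produces one record and emits one "kafka.produce" span.
        example.sendMessage("orders", "order-42", "{\"status\":\"created\"}");
    }
}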

3. Instrument Kafka Consumers

Java Example

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    private final Tracer tracer = GlobalOpenTelemetry.getTracer("kafka-consumer-instrumentation");
    private final Consumer<String, String> consumer;

    public KafkaConsumerExample() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic"));
    }

    public void consumeMessages() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                Span span = tracer.spanBuilder("kafka.consume").startSpan();
                try {
                    span.setAttribute("messaging.system", "kafka");
                    span.setAttribute("messaging.destination", record.topic());
                    span.setAttribute("messaging.destination_kind", "topic");
                    span.setAttribute("messaging.kafka.key", record.key());
                    span.setAttribute("messaging.kafka.partition", record.partition());
                    span.setAttribute("messaging.kafka.offset", record.offset());

                    // Process the record
                    processRecord(record);
                } catch (Exception e) {
                    span.recordException(e);
                    throw e;
                } finally {
                    span.end();
                }
            }
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // Your message processing logic here
    }
}
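
The consumer span above starts a new trace for each record. If your producers write trace context into the record headers (a matching injection sketch appears in the interceptor section below), you can parent the consumer span on the producing trace instead. Here is a minimal sketch, assuming the default W3C trace context propagator is configured; KafkaContextExtractor is an illustrative helper, not part of the OpenTelemetry or APItoolkit SDKs:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public final class KafkaContextExtractor {
    // Reads propagation fields (e.g. "traceparent") from the record headers.
    private static final TextMapGetter<ConsumerRecord<String, String>> GETTER =
            new TextMapGetter<ConsumerRecord<String, String>>() {
                @Override
                public Iterable<String> keys(ConsumerRecord<String, String> record) {
                    List<String> keys = new ArrayList<>();
                    for (Header header : record.headers()) {
                        keys.add(header.key());
                    }
                    return keys;
                }

                @Override
                public String get(ConsumerRecord<String, String> record, String key) {
                    Header header = record.headers().lastHeader(key);
                    return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
                }
            };

    // Use the returned context as the parent when building the consumer span:
    // tracer.spanBuilder("kafka.consume").setParent(KafkaContextExtractor.extract(record)).startSpan()
    public static Context extract(ConsumerRecord<String, String> record) {
        return GlobalOpenTelemetry.getPropagators()
                .getTextMapPropagator()
                .extract(Context.current(), record, GETTER);
    }
}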

Using Kafka Interceptors with OpenTelemetry

Kafka clients also support interceptors, which let you add OpenTelemetry instrumentation without modifying your producer and consumer code:

Producer Interceptor

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class OpenTelemetryProducerInterceptor implements ProducerInterceptor<String, String> {
    private Tracer tracer;

    @Override
    public void configure(Map<String, ?> configs) {
        tracer = GlobalOpenTelemetry.getTracer("kafka-producer-interceptor");
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        Span span = tracer.spanBuilder("kafka.produce").startSpan();
        span.setAttribute("messaging.system", "kafka");
        span.setAttribute("messaging.destination", record.topic());
        span.setAttribute("messaging.kafka.key", record.key());

        // Store span context in record headers
        // Implementation details omitted for brevity

        span.end();
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Handle acknowledgment with metrics
    }

    @Override
    public void close() {
        // Cleanup resources
    }
}
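
The onSend hook above leaves storing the span context in the record headers as a comment. Here is a minimal sketch of that step, assuming the default W3C trace context propagator is configured; KafkaContextInjector is an illustrative helper, not part of the OpenTelemetry or APItoolkit SDKs:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapSetter;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

public final class KafkaContextInjector {
    // Writes propagation fields (e.g. "traceparent") into the record headers.
    private static final TextMapSetter<ProducerRecord<String, String>> SETTER =
            (record, key, value) ->
                    record.headers().add(key, value.getBytes(StandardCharsets.UTF_8));

    // Call from onSend after starting the span and before span.end().
    public static void inject(Span span, ProducerRecord<String, String> record) {
        Context context = Context.current().with(span);
        GlobalOpenTelemetry.getPropagators()
                .getTextMapPropagator()
                .inject(context, record, SETTER);
    }
}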

Add the interceptor to your Kafka producer configuration:

Properties props = new Properties();
// ... other configuration
props.put("interceptor.classes", "com.example.OpenTelemetryProducerInterceptor");

Using the OpenTelemetry Kafka Instrumentation Library

For automatic instrumentation, you can use the OpenTelemetry Kafka instrumentation library:

Maven Dependency

<dependency>
    <groupId>io.opentelemetry.instrumentation</groupId>
    <artifactId>opentelemetry-kafka-clients-2.6</artifactId>
    <version>1.29.0-alpha</version>
</dependency>

Gradle Dependency

implementation 'io.opentelemetry.instrumentation:opentelemetry-kafka-clients-2.6:1.29.0-alpha'
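
With the library on the classpath, the usual pattern is to wrap your existing clients with its KafkaTelemetry helper. The sketch below shows that pattern; note that the exact package name (shown here as io.opentelemetry.instrumentation.kafkaclients.v2_6) varies between library versions, so check the version you depend on:

import io.opentelemetry.api.GlobalOpenTelemetry;
// The package name differs across library versions; adjust it to match your dependency.
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

public final class InstrumentedClients {
    private static final KafkaTelemetry telemetry =
            KafkaTelemetry.create(GlobalOpenTelemetry.get());

    // Returns a producer that records a span for every send() call.
    public static Producer<String, String> wrapProducer(Producer<String, String> producer) {
        return telemetry.wrap(producer);
    }

    // Returns a consumer that records spans for the records it polls.
    public static Consumer<String, String> wrapConsumer(Consumer<String, String> consumer) {
        return telemetry.wrap(consumer);
    }
}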

Verifying the Setup

After setting up OpenTelemetry with your Kafka applications:

  1. Send and receive a few test messages through your Kafka topics

  2. Check your APItoolkit dashboard to see the incoming telemetry data from your Kafka clients

  3. Look for metrics such as:
     • Message production and consumption rates
     • Message processing time
     • Error rates

Next Steps

  • Set up alerting in APItoolkit for Kafka performance issues
  • Configure custom metrics for specific Kafka monitoring needs
  • Correlate Kafka telemetry with other services in your architecture