Upgrade Guide

Table of Contents

Upgrading from CP 3.1.x (Kafka 0.10.1.x-cp2) to CP 3.2.x (Kafka 0.10.2.x-cp1)

Compatibility

Kafka Streams applications built with CP 3.2.x (Kafka 0.10.2.x-cp1) are forward and backward compatible with certain Kafka clusters.

  • Forward-compatible to CP 3.2.x clusters (Kafka 0.10.2.x-cp1): Existing Kafka Streams applications built with CP 3.0.x (Kafka 0.10.0.x-cp1) or CP 3.1.x (Kafka 0.10.1.x-cp2) will work with upgraded Kafka clusters running CP 3.2.x (Kafka 0.10.2.x-cp1).
  • Backward-compatible to CP 3.1.x clusters (Kafka 0.10.1.x-cp2): This is the first release that allows you to upgrade your Kafka Streams application without also upgrading your Kafka brokers. New Kafka Streams applications built with CP 3.2.x (Kafka 0.10.2.x-cp1) will work with older Kafka clusters running CP 3.1.x (Kafka 0.10.1.x-cp2). Note, however, that Kafka clusters running CP 3.0.x (Kafka 0.10.0.x-cp1) are not compatible with new CP 3.2.x Kafka Streams applications.

Compatibility Matrix:

                        Kafka Broker (columns)
  Streams API (rows)    3.0.x / 0.10.0.x    3.1.x / 0.10.1.x    3.2.x / 0.10.2.x
  3.0.x / 0.10.0.x      compatible          compatible          compatible
  3.1.x / 0.10.1.x                          compatible          compatible
  3.2.x / 0.10.2.x                          compatible          compatible

Upgrading your Kafka Streams applications to CP 3.2.x

To make use of CP 3.2.x (Kafka 0.10.2.x-cp1), you need only update the Kafka Streams dependency of your application to version 0.10.2.x-cp1 and then recompile your application.

For example, in your pom.xml file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <!-- update version from 0.10.1.x-cp2 to 0.10.2.1-cp1 -->
    <version>0.10.2.1-cp1</version>
</dependency>

API changes

Kafka Streams and its API have been improved and modified since the release of CP 3.1.x. Some of these changes are breaking changes that require you to update the code of your Kafka Streams applications. This section focuses only on these breaking changes.

Handling Negative Timestamps and Timestamp Extractor Interface

Kafka Streams behavior with regard to invalid (i.e., negative) timestamps was improved. By default you will still get an exception on an invalid timestamp. However, you can now reconfigure your application to react more gracefully to invalid timestamps, which was not possible before.

Even if you do not use a custom timestamp extractor, you need to recompile your application code, because the TimestampExtractor interface was changed in an incompatible way.

The internal behavior of Kafka Streams with regard to negative timestamps was changed. Instead of raising an exception when the timestamp extractor returns a negative timestamp, the corresponding record is now dropped silently and not processed. This allows you to process topics in which only a few records cannot provide a valid timestamp. Furthermore, the TimestampExtractor interface was changed and now has one additional parameter. This parameter provides a timestamp that can be used, for example, to return an estimated timestamp if no valid timestamp can be extracted from the current record.

The old default timestamp extractor ConsumerRecordTimestampExtractor was replaced with FailOnInvalidTimestamp, and two new extractors, which both extract a record’s built-in timestamp, were added (LogAndSkipOnInvalidTimestamp and UsePreviousTimeOnInvalidTimestamp). The new default extractor (FailOnInvalidTimestamp) raises an exception in case of a negative built-in record timestamp, so Kafka Streams’ default fail-fast behavior on negative timestamps is preserved. The two newly added extractors allow you to handle negative timestamps more gracefully by implementing a log-and-skip strategy and a timestamp-estimation strategy, respectively.
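
For example, a minimal configuration sketch for switching to the log-and-skip strategy (using the timestamp.extractor setting, i.e. StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, and the LogAndSkipOnInvalidTimestamp extractor from the org.apache.kafka.streams.processor package):

Properties streamsSettings = new Properties();
// ... other required settings such as application.id and bootstrap.servers ...
// Log and skip records with invalid timestamps instead of failing (the default).
streamsSettings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class);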

// old interface
public interface TimestampExtractor {
  // returning -1 results in an exception
  long extract(ConsumerRecord<Object, Object> record);
}

// new interface
public interface TimestampExtractor {
  // provides a timestamp that could be used as a timestamp estimation,
  // if no valid timestamp can be extracted from the current record
  //
  // allows to return -1, which tells Kafka Streams to not process the record (it will be dropped silently)
  long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
}
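
If you use a custom timestamp extractor, you must adapt it to the new two-argument method. The following is a minimal sketch (the class name MyEventTimeExtractor and its fallback logic are illustrative, not part of the API) that uses the record’s built-in timestamp when it is valid and otherwise falls back to the previously extracted timestamp:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class MyEventTimeExtractor implements TimestampExtractor {

  @Override
  public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    final long builtInTimestamp = record.timestamp();
    if (builtInTimestamp >= 0) {
      // The record carries a valid built-in timestamp, so use it.
      return builtInTimestamp;
    }
    // Invalid (negative) built-in timestamp: fall back to the previously extracted
    // timestamp as an estimate. If that is also negative, the record is dropped silently.
    return previousTimestamp;
  }
}

Like the built-in extractors, a custom extractor is registered via the timestamp.extractor configuration shown above.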

Metrics

If you provide custom metrics by implementing the StreamsMetrics interface, you need to update your code, as the interface has many new methods that allow you to register finer-grained metrics than before. More details are available in KIP-114.

// old interface
public interface StreamsMetrics {
  // Add the latency sensor.
  Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);

  // Record the given latency value of the sensor.
  void recordLatency(Sensor sensor, long startNs, long endNs);
}

// new interface
public interface StreamsMetrics {
  // Get read-only handle on global metrics registry.
  Map<MetricName, ? extends Metric> metrics();

  // Add a latency and throughput sensor for a specific operation
  Sensor addLatencyAndThroughputSensor(final String scopeName,
                                       final String entityName,
                                       final String operationName,
                                       final Sensor.RecordingLevel recordingLevel,
                                       final String... tags);

  // Record the given latency value of the sensor.
  void recordLatency(final Sensor sensor,
                     final long startNs,
                     final long endNs);

  // Add a throughput sensor for a specific operation:
  Sensor addThroughputSensor(final String scopeName,
                             final String entityName,
                             final String operationName,
                             final Sensor.RecordingLevel recordingLevel,
                             final String... tags);

  // Record the throughput value of a sensor.
  void recordThroughput(final Sensor sensor,
                        final long value);

  // Generic method to create a sensor.
  Sensor addSensor(final String name,
                   final Sensor.RecordingLevel recordingLevel);

  // Generic method to create a sensor with parent sensors.
  Sensor addSensor(final String name,
                   final Sensor.RecordingLevel recordingLevel,
                   final Sensor... parents);

  // Remove a sensor.
  void removeSensor(final Sensor sensor);
}
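
As a sketch of how the new methods can be used from a custom processor (the processor class and the sensor names such as "my-scope" and "my-operation" below are illustrative), you could register a combined latency/throughput sensor in init() and record measurements in process():

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class InstrumentedProcessor extends AbstractProcessor<String, String> {

  private StreamsMetrics metrics;
  private Sensor operationSensor;

  @Override
  public void init(final ProcessorContext context) {
    super.init(context);
    metrics = context.metrics();
    // Register a sensor that tracks both latency and throughput for this operation.
    operationSensor = metrics.addLatencyAndThroughputSensor(
        "my-scope", "my-entity", "my-operation", Sensor.RecordingLevel.INFO);
  }

  @Override
  public void process(final String key, final String value) {
    final long startNs = System.nanoTime();
    // ... the actual processing logic goes here ...
    context().forward(key, value);
    // Record how long processing this record took.
    metrics.recordLatency(operationSensor, startNs, System.nanoTime());
  }
}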

Scala

Starting with 0.10.2.0, if your application is written in Scala, you may need to declare types explicitly in order for the code to compile. The StreamToTableJoinScalaIntegrationTest has an example where the types of return variables are explicitly declared.

Full upgrade workflow

A typical workflow for upgrading Kafka Streams applications from CP 3.1.x to CP 3.2.x has the following steps:

  1. Upgrade your application: See upgrade instructions above.
  2. Stop the old application: Stop the old version of your application, i.e., stop all application instances that are still running the old version.
  3. Optionally, upgrade your Kafka cluster: See the Kafka upgrade instructions.
  4. Start the upgraded application: Start the upgraded version of your application, with as many instances as needed. By default, the upgraded application will resume processing its input data from the point when the old version was stopped (see previous step).

Known issues and workarounds

We have observed the following issues with CP 3.2.x, each followed by a workaround:

  1. RocksDB behavior in 1-core environments: There is a known issue with RocksDB when running in environments with just one CPU core. In some scenarios, the symptom is that the application may become very slow or unresponsive. The workaround is to set a particular RocksDB configuration using the RocksDB config setter, as follows:
public static class CustomRocksDBConfig implements RocksDBConfigSetter {
  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
    // Workaround: We must ensure that the compaction parallelism is set to >= 2.
    final int compactionParallelism = Math.max(Runtime.getRuntime().availableProcessors(), 2);
    // Set the number of compaction threads (but not flush threads).
    options.setIncreaseParallelism(compactionParallelism);
  }
}

Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);