Upgrade Guide¶
Upgrading from CP 3.3.x (Kafka 0.11.0.x-cp1) to CP 4.0.0 (Kafka 1.0.0-cp1)¶
Compatibility¶
Kafka Streams applications built with CP 4.0.0 (Kafka 1.0.0-cp1) are forward and backward compatible with certain Kafka clusters.
- Forward-compatible to CP 4.0.0 clusters (Kafka 1.0.0-cp1): Existing Kafka Streams applications built with CP 3.0.x (Kafka 0.10.0.x-cp1), CP 3.1.x (Kafka 0.10.1.x-cp2), CP 3.2.x (Kafka 0.10.2.x-cp1) or CP 3.3.x (Kafka 0.11.0.x-cp1) will work with upgraded Kafka clusters running CP 4.0.0 (Kafka 1.0.0-cp1).
- Backward-compatible to CP 3.1.x, CP 3.2.x and CP 3.3.x clusters (Kafka 0.10.1.x-cp2, 0.10.2.x-cp1 and 0.11.0.x-cp1): New Kafka Streams applications built with CP 4.0.0 (Kafka 1.0.0-cp1) will work with older Kafka clusters running CP 3.1.x (Kafka 0.10.1.x-cp2), CP 3.2.x (Kafka 0.10.2.x-cp1) or CP 3.3.x (Kafka 0.11.0.x-cp1). However, if you require the exactly-once processing guarantee, your Kafka cluster needs to be upgraded to at least CP 3.3.x (Kafka 0.11.0.x-cp1). Note that the exactly-once feature is disabled by default, so a rolling bounce upgrade of your Streams application is possible as long as you don't enable this new feature explicitly. Kafka clusters running CP 3.0.x (Kafka 0.10.0.x-cp1) are not compatible with new CP 4.0.0 Kafka Streams applications.
Compatibility Matrix:

| Streams API (rows) \ Kafka Broker (columns) | 3.0.x / 0.10.0.x | 3.1.x / 0.10.1.x | 3.2.x / 0.10.2.x | 3.3.x / 0.11.0.x | 4.0.x / 1.0.x |
|---|---|---|---|---|---|
| 3.0.x / 0.10.0.x | compatible | compatible | compatible | compatible | compatible |
| 3.1.x / 0.10.1.x | | compatible | compatible | compatible | compatible |
| 3.2.x / 0.10.2.x | | compatible | compatible | compatible | compatible |
| 3.3.x / 0.11.0.x | | mostly compatible (exactly-once processing requires CP 3.3.x) | mostly compatible (exactly-once processing requires CP 3.3.x) | compatible | compatible |
| 4.0.x / 1.0.x | | mostly compatible (exactly-once processing requires CP 3.3.x) | mostly compatible (exactly-once processing requires CP 3.3.x) | compatible | compatible |
The Streams API is not compatible with Kafka clusters running older Kafka versions (0.7, 0.8, 0.9).
Upgrading your Kafka Streams applications to CP 4.0.0¶
To make use of Confluent Platform 4.0.0 (Kafka 1.0.x-cp1), you just need to update the Kafka Streams dependency of your application to version 1.0.0-cp1 and then recompile your application. For example, in your pom.xml file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <!-- update version to 1.0.0-cp1 -->
    <version>1.0.0-cp1</version>
</dependency>
Note that there are some Streams API changes in CP 4.0.0, so it is recommended, though not required, to update your code when using the new version.
API changes¶
Kafka Streams and its API were improved and modified since the release of CP 3.3.x. All of these changes are backward compatible, so you do not need to update the code of your Kafka Streams applications immediately. However, some methods were deprecated and it is recommended to update your code eventually to allow for future upgrades. In this section we focus on deprecated APIs.
Building and running a topology¶
The two main classes to specify a topology, KStreamBuilder and TopologyBuilder, were deprecated and replaced by StreamsBuilder and Topology. Note that both new classes are in package org.apache.kafka.streams and that StreamsBuilder does not extend Topology, i.e., the class hierarchy is different now. This change also affects the KafkaStreams constructors, which now only accept a Topology. If you use StreamsBuilder you can obtain the constructed topology via StreamsBuilder#build(). The new classes have basically the same methods as the old ones to build a topology via DSL or Processor API. However, some internal methods that were public in KStreamBuilder and TopologyBuilder, but not part of the actual API, are no longer included in the new classes.
// old API
KStreamBuilder builder = new KStreamBuilder(); // for DSL
// or
TopologyBuilder builder = new TopologyBuilder(); // for Processor API
Properties config = new Properties();
KafkaStreams streams = new KafkaStreams(builder, config);
// new API
StreamsBuilder builder = new StreamsBuilder(); // for DSL
// ... specify computational logic
Topology topology = builder.build();
// or
Topology topology = new Topology(); // for Processor API
Properties config = new Properties();
KafkaStreams streams = new KafkaStreams(topology, config);
Describing topology and stream task metadata¶
KafkaStreams#toString() and KafkaStreams#toString(final String indent), which were previously used to retrieve the user-specified processor topology information as well as runtime stream tasks metadata, are deprecated in 4.0.0. Instead, a new method of KafkaStreams, namely localThreadsMetadata(), was added. It returns an org.apache.kafka.streams.processor.ThreadMetadata object for each of the local stream threads that describes the runtime state of the thread as well as its currently assigned tasks metadata. Such information is very helpful for debugging and monitoring your streams applications. For retrieving the specified processor topology information, users can now call Topology#describe(), which returns an org.apache.kafka.streams.TopologyDescription object that contains the detailed description of the topology (DSL users need to call StreamsBuilder#build() to get the Topology object first).
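For example, a minimal sketch (the topic names are illustrative, and mandatory configs are omitted for brevity):
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").to("output-topic");
Topology topology = builder.build();

// print the static description of the topology
System.out.println(topology.describe());

Properties config = new Properties(); // omitting mandatory configs for brevity
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();

// inspect the local stream threads and their currently assigned tasks at runtime
for (ThreadMetadata threadMetadata : streams.localThreadsMetadata()) {
    System.out.println(threadMetadata);
}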
Punctuation functions¶
The Processor API was extended to allow users to schedule punctuate functions either based on stream-time (i.e., PunctuationType.STREAM_TIME) or wall-clock-time (i.e., PunctuationType.WALL_CLOCK_TIME). Before this, users could only schedule based on stream-time and hence the punctuate function was data-driven only. As a result, the original ProcessorContext#schedule is deprecated in favor of a new overload. In addition, the punctuate function inside Processor is also deprecated, and is replaced by the newly added Punctuator#punctuate interface.
// old API (punctuate defined in Processor, and schedule only with stream-time)
public class WordCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;

        // call this processor's punctuate() method every 1000 milliseconds
        this.context.schedule(1000);

        // retrieve the key-value store named "Counts"
        kvStore = (KeyValueStore) context.getStateStore("Counts");
    }

    @Override
    public void punctuate(long timestamp) {
        KeyValueIterator<String, Long> iter = this.kvStore.all();
        while (iter.hasNext()) {
            KeyValue<String, Long> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }
        iter.close();

        // commit the current processing progress
        context.commit();
    }

    // .. other functions
}
// new API (punctuate defined in Punctuator, and schedule can be either stream-time or wall-clock-time)
public class WordCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;

        // retrieve the key-value store named "Counts"
        kvStore = (KeyValueStore) context.getStateStore("Counts");

        // schedule a punctuate() method every 1000 milliseconds based on stream time
        this.context.schedule(1000, PunctuationType.STREAM_TIME, (timestamp) -> {
            KeyValueIterator<String, Long> iter = this.kvStore.all();
            while (iter.hasNext()) {
                KeyValue<String, Long> entry = iter.next();
                context.forward(entry.key, entry.value.toString());
            }
            iter.close();

            // commit the current processing progress
            context.commit();
        });
    }

    // .. other functions
}
Streams Configuration¶
You can now override the configs that are used to create internal repartition and changelog topics. You provide these configs via the StreamsConfig by adding the topic configs with the prefix as defined by StreamsConfig#topicPrefix(String). Any properties in the StreamsConfig with this prefix will be applied when creating internal topics; any configs that aren't topic configs will be ignored. If you are already using StateStoreSupplier or Materialized to provide configs for changelogs, those configs will take precedence over the ones supplied in the StreamsConfig.
For example:
Properties streamsProps = ...;
// use cleanup.policy=delete for internal topics
streamsProps.put(StreamsConfig.topicPrefix("cleanup.policy"), "delete");
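If you additionally provide changelog configs through the DSL, those take precedence for the corresponding store. A minimal sketch of the Materialized variant (assuming stream is a KStream<String, String>; the store name "Counts" and the count() aggregation are purely illustrative):
// configs passed via Materialized override the prefixed configs from StreamsConfig for this store's changelog
Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("min.cleanable.dirty.ratio", "0.3");
KTable<String, Long> counts = stream
    .groupByKey()
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("Counts")
                       .withLoggingEnabled(changelogConfig));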
New classes for optional DSL parameters¶
Several new classes were introduced, i.e., Serialized, Consumed, Produced etc., to reduce the number of overloads in the DSL. These classes mostly provide a static method named with to create an instance, e.g., Serialized.with(Serdes.Long(), Serdes.String()). Scala users should be aware that they will need to enclose with in backticks, because with is a reserved word in Scala.
For example:
// When using Scala: enclose "with" with backticks
Serialized.`with`(Serdes.Long(), Serdes.String())
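In Java, these classes are simply passed as additional DSL parameters. A minimal sketch (assuming a StreamsBuilder named builder; the topic names are illustrative):
// read a topic with explicit serdes, aggregate, and write the result with explicit serdes
KStream<Long, String> source = builder.stream("input-topic", Consumed.with(Serdes.Long(), Serdes.String()));
source.groupByKey(Serialized.with(Serdes.Long(), Serdes.String()))
      .count()
      .toStream()
      .to("output-topic", Produced.with(Serdes.Long(), Serdes.Long()));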
Full upgrade workflow¶
A typical workflow for upgrading Kafka Streams applications from CP 3.3.x to CP 4.0.0 has the following steps:
- Upgrade your application: See upgrade instructions above.
- Stop the old application: Stop the old version of your application, i.e. stop all the application instances that are still running the old version of the application.
- Optionally, upgrade your Kafka cluster: See the Kafka upgrade instructions. Note that if you want to use exactly-once processing semantics, upgrading your cluster to at least CP 3.3.x is mandatory.
- Start the upgraded application: Start the upgraded version of your application, with as many instances as needed. By default, the upgraded application will resume processing its input data from the point when the old version was stopped (see previous step).
Upgrading older Kafka Streams applications to CP 4.0.0¶
It is also possible to upgrade CP 3.0.x, CP 3.1.x, CP 3.2.x, CP 3.3.x applications to CP 4.0.0. Some of the API improvements introduced in CP 3.1, CP 3.2 and CP 3.3 may require you to update the code of your Kafka Streams applications. In the following sections we focus on these changes.
API changes (from CP 3.2 to CP 3.3)¶
Kafka Streams and its API were improved and modified since the release of CP 3.2.x. All of these changes are backward compatible, so you do not need to update the code of your Kafka Streams applications immediately. However, some methods and configuration parameters were deprecated and it is recommended to update your code eventually to allow for future upgrades. In this section we focus on deprecated APIs.
Streams Configuration¶
The following configuration parameters were renamed and their old names were deprecated:
- key.serde renamed to default.key.serde
- value.serde renamed to default.value.serde
- timestamp.extractor renamed to default.timestamp.extractor

Thus, StreamsConfig#KEY_SERDE_CONFIG, StreamsConfig#VALUE_SERDE_CONFIG, and StreamsConfig#TIMESTAMP_EXTRACTOR_CONFIG were deprecated, too.
Additionally, the following method changes apply:
- method keySerde() was deprecated and replaced by defaultKeySerde()
- method valueSerde() was deprecated and replaced by defaultValueSerde()
- new method defaultTimestampExtractor() was added
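For example, the new parameter names can be set via their StreamsConfig constants (a minimal sketch; the chosen serdes and extractor are illustrative):
Properties config = new Properties(); // omitting mandatory configs for brevity
// new parameter names (the old ones are deprecated but still work)
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);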
Local timestamp extractors¶
The Streams API was extended to allow users to specify a per-stream/table timestamp extractor. This simplifies the usage of different timestamp extractor logic for different streams/tables. Before, users needed to apply an if-then-else pattern within the default timestamp extractor to apply different logic to different input topics. The old behavior introduced unnecessary dependencies and thus limited code modularity and code reuse. To enable the new feature, the methods KStreamBuilder#stream(), KStreamBuilder#table(), KStreamBuilder#globalTable(), TopologyBuilder#addSource(), and TopologyBuilder#addGlobalStore() have new overloads that allow you to specify a "local" timestamp extractor that is solely applied to the corresponding input topics.
// old API (single default TimestampExtractor that is applied globally)
public class MyTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord record, long previousTimestamp) {
        long timestamp;
        String topic = record.topic();
        switch (topic) {
            case "streamInputTopic":
                timestamp = record.value().getDataTimestamp(); // assuming that value type has a method #getDataTimestamp()
                break;
            default:
                timestamp = record.timestamp();
        }

        if (timestamp < 0) {
            throw new RuntimeException("Invalid negative timestamp.");
        }

        return timestamp;
    }
}

KStreamBuilder builder = new KStreamBuilder();
KStream stream = builder.stream(keySerde, valueSerde, "streamInputTopic");
KTable table = builder.table("tableInputTopic");

Properties config = new Properties(); // omitting mandatory configs for brevity
// set MyTimestampExtractor as global default extractor for all topics
config.put("default.timestamp.extractor", MyTimestampExtractor.class);

KafkaStreams streams = new KafkaStreams(builder, config);
// new API (custom TimestampExtractor for topic "streamInputTopic" only; returns value embedded timestamp)
public class StreamTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord record, long previousTimestamp) {
        long timestamp = record.value().getDataTimestamp(); // assuming that value type has a method #getDataTimestamp()

        if (timestamp < 0) {
            throw new RuntimeException("Invalid negative timestamp.");
        }

        return timestamp;
    }
}

KStreamBuilder builder = new KStreamBuilder();

// set StreamTimestampExtractor explicitly for "streamInputTopic"
KStream stream = builder.stream(new StreamTimestampExtractor(), keySerde, valueSerde, "streamInputTopic");
KTable table = builder.table("tableInputTopic");

Properties config = new Properties(); // omitting mandatory configs for brevity
KafkaStreams streams = new KafkaStreams(builder, config);
KTable Changes¶
The following methods have been deprecated on the KTable interface:
- void foreach(final ForeachAction<? super K, ? super V> action)
- void print()
- void print(final String streamName)
- void print(final Serde<K> keySerde, final Serde<V> valSerde)
- void print(final Serde<K> keySerde, final Serde<V> valSerde, final String streamName)
- void writeAsText(final String filePath)
- void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde)
- void writeAsText(final String filePath, final String streamName)
- void writeAsText(final String filePath, final String streamName, final Serde<K> keySerde, final Serde<V> valSerde)
These methods have been deprecated in favor of using the Interactive Queries API.
If you want to query the current content of the state store backing the KTable, use the following approach:
- Make a call to KafkaStreams.store(String storeName, QueryableStoreType<T> queryableStoreType) followed by a call to ReadOnlyKeyValueStore.all() to iterate over the keys of a KTable.

If you want to view the changelog stream of the KTable, you could do something along the lines of the following:
- Call KTable.toStream() and then call KStream#print().
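A minimal sketch of both approaches (assuming a running KafkaStreams instance named streams and a KTable named table that is materialized to a store named "Counts"):
// query the current content of the state store backing the KTable via Interactive Queries
ReadOnlyKeyValueStore<String, Long> store =
    streams.store("Counts", QueryableStoreTypes.<String, Long>keyValueStore());
KeyValueIterator<String, Long> iter = store.all();
while (iter.hasNext()) {
    KeyValue<String, Long> entry = iter.next();
    System.out.println(entry.key + ": " + entry.value);
}
iter.close();

// view the changelog stream of the KTable
table.toStream().print();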
API changes (from CP 3.1 to CP 3.2)¶
Kafka Streams and its API were improved and modified since the release of CP 3.1.x. Some of these changes are breaking changes that require you to update the code of your Kafka Streams applications. In this section we focus on only these breaking changes.
Handling Negative Timestamps and Timestamp Extractor Interface¶
Kafka Streams behavior with regard to invalid (i.e., negative) timestamps was improved. By default you will still get an exception on an invalid timestamp. However, you can now reconfigure your application to react more gracefully to invalid timestamps, which was not possible before.

Even if you do not use a custom timestamp extractor, you need to recompile your application code, because the TimestampExtractor interface was changed in an incompatible way.

The internal behavior of Kafka Streams with regard to negative timestamps was changed. Instead of raising an exception if the timestamp extractor returns a negative timestamp, the corresponding record will be dropped silently and not be processed. This allows processing of topics for which only a few records cannot provide a valid timestamp. Furthermore, the TimestampExtractor interface was changed and now has one additional parameter. This parameter provides a timestamp that can be used, for example, to return an estimated timestamp if no valid timestamp can be extracted from the current record.

The old default timestamp extractor ConsumerRecordTimestampExtractor was replaced with FailOnInvalidTimestamp, and two new extractors which both extract a record's built-in timestamp were added (LogAndSkipOnInvalidTimestamp and UsePreviousTimeOnInvalidTimestamp). The new default extractor (FailOnInvalidTimestamp) raises an exception in case of a negative built-in record timestamp such that Kafka Streams' default behavior is kept (i.e., fail-fast on negative timestamps). The two newly added extractors allow handling negative timestamps more gracefully by implementing a log-and-skip and a timestamp-estimation strategy, respectively.
// old interface
public interface TimestampExtractor {

    // returning -1 results in an exception
    long extract(ConsumerRecord<Object, Object> record);
}

// new interface
public interface TimestampExtractor {

    // provides a timestamp that could be used as a timestamp estimation,
    // if no valid timestamp can be extracted from the current record
    //
    // allows returning -1, which tells Kafka Streams to not process the record (it will be dropped silently)
    long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
}
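For example, to switch from the fail-fast default to the log-and-skip strategy (a minimal sketch):
Properties config = new Properties(); // omitting mandatory configs for brevity
// drop records with an invalid timestamp and log a warning instead of failing
config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class);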
Metrics¶
If you provide custom metrics by implementing the interface StreamsMetrics you need to update your code, as the interface has many new methods that allow registering finer-grained metrics than before. More details are available in KIP-114.
// old interface
public interface StreamsMetrics {

    // Add the latency sensor.
    Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);

    // Record the given latency value of the sensor.
    void recordLatency(Sensor sensor, long startNs, long endNs);
}

// new interface
public interface StreamsMetrics {

    // Get read-only handle on global metrics registry.
    Map<MetricName, ? extends Metric> metrics();

    // Add a latency and throughput sensor for a specific operation.
    Sensor addLatencyAndThroughputSensor(final String scopeName,
                                         final String entityName,
                                         final String operationName,
                                         final Sensor.RecordingLevel recordingLevel,
                                         final String... tags);

    // Record the given latency value of the sensor.
    void recordLatency(final Sensor sensor,
                       final long startNs,
                       final long endNs);

    // Add a throughput sensor for a specific operation.
    Sensor addThroughputSensor(final String scopeName,
                               final String entityName,
                               final String operationName,
                               final Sensor.RecordingLevel recordingLevel,
                               final String... tags);

    // Record the throughput value of a sensor.
    void recordThroughput(final Sensor sensor,
                          final long value);

    // Generic method to create a sensor.
    Sensor addSensor(final String name,
                     final Sensor.RecordingLevel recordingLevel);

    // Generic method to create a sensor with parent sensors.
    Sensor addSensor(final String name,
                     final Sensor.RecordingLevel recordingLevel,
                     final Sensor... parents);

    // Remove a sensor.
    void removeSensor(final Sensor sensor);
}
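For example, a custom Processor can register and use such a sensor via the ProcessorContext. A minimal sketch (the scope, entity, and operation names are illustrative):
// inside Processor#init(ProcessorContext context)
StreamsMetrics metrics = context.metrics();
Sensor sensor = metrics.addLatencyAndThroughputSensor(
    "my-scope", "my-entity", "my-operation", Sensor.RecordingLevel.DEBUG);

// inside Processor#process(), record the measured latency of the actual work
long startNs = System.nanoTime();
// ... do the actual processing ...
metrics.recordLatency(sensor, startNs, System.nanoTime());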
Scala¶
Starting with 0.10.2.0, if your application is written in Scala, you may need to declare types explicitly in order for the code to compile. The StreamToTableJoinScalaIntegrationTest has an example where the types of return variables are explicitly declared.
API changes (from CP 3.0 to CP 3.1)¶
Stream grouping and aggregation¶
Grouping (i.e., repartitioning) and aggregation of the KStream API was significantly changed to be aligned with the KTable API. Instead of using a single method with many parameters, grouping and aggregation is now split into two steps. First, a KStream is transformed into a KGroupedStream, which is a repartitioned copy of the original KStream. Afterwards, an aggregation can be performed on the KGroupedStream, resulting in a new KTable that contains the result of the aggregation.

Thus, the methods KStream#aggregateByKey(...), KStream#reduceByKey(...), and KStream#countByKey(...) were replaced by KStream#groupBy(...) and KStream#groupByKey(...), which return a KGroupedStream. While KStream#groupByKey(...) groups on the current key, KStream#groupBy(...) sets a new key and re-partitions the data to build groups on the new key. The new class KGroupedStream provides the corresponding methods aggregate(...), reduce(...), and count(...).
KStream stream = builder.stream(...);
Reducer reducer = new Reducer() { /* ... */ };
// old API
KTable newTable = stream.reduceByKey(reducer, name);
// new API, Group by existing key
KTable newTable = stream.groupByKey().reduce(reducer, name);
// or Group by a different key
KTable otherTable = stream.groupBy((key, value) -> value).reduce(reducer, name);
Auto Repartitioning¶
Previously, when performing a KStream#join(...), KStream#outerJoin(...), or KStream#leftJoin(...) operation after a key-changing operation, i.e., KStream#map(...), KStream#flatMap(...), or KStream#selectKey(...), the developer was required to call KStream#through(...) to repartition the mapped KStream. This is no longer required: repartitioning now happens automatically for all join operations.
KStream streamOne = builder.stream(...);
KStream streamTwo = builder.stream(...);
KeyValueMapper selector = new KeyValueMapper() { /* ... */ };
ValueJoiner joiner = new ValueJoiner() { /* ... */ };
JoinWindows windows = JoinWindows.of("the-join").within(60 * 1000);

// old API
KStream oldJoined = streamOne.selectKey(selector)
                             .through("repartitioned-topic")
                             .join(streamTwo,
                                   joiner,
                                   windows);

// new API
KStream newJoined = streamOne.selectKey((key, value) -> value)
                             .join(streamTwo,
                                   joiner,
                                   windows);
TopologyBuilder¶
Two public method signatures have been changed on TopologyBuilder: TopologyBuilder#sourceTopics(String applicationId) and TopologyBuilder#topicGroups(String applicationId). These methods no longer take applicationId as a parameter; instead you should call TopologyBuilder#setApplicationId(String applicationId) before calling one of these methods.
TopologyBuilder topologyBuilder = new TopologyBuilder();
...
// old API
Set<String> topics = topologyBuilder.sourceTopics("applicationId");
Map<Integer, TopicsInfo> topicGroups = topologyBuilder.topicGroups("applicationId");
// new API
topologyBuilder.setApplicationId("applicationId");
Set<String> topics = topologyBuilder.sourceTopics();
Map<Integer, TopicsInfo> topicGroups = topologyBuilder.topicGroups();
DSL: New parameters to specify state store names¶
Apache Kafka 0.10.1 introduces Interactive Queries, which allow you to directly query state stores of a Kafka Streams application. This new feature required a few changes to the operators in the DSL. Starting with Kafka 0.10.1, state stores must always be "named", which includes both explicitly used state stores (e.g., defined by the user) and internally used state stores (e.g., created behind the scenes by operations such as count()). This naming is a prerequisite for making state stores queryable. As a result of this, the previous "operator name" is now the state store name. This change affects KStreamBuilder#table(...) and the windowed aggregates KGroupedStream#count(...), #reduce(...), and #aggregate(...).
// old API
builder.table("topic");
builder.table(keySerde, valSerde, "topic");
table2 = table1.through("topic");
stream.countByKey(TimeWindows.of("windowName", 1000)); // window has a name
// new API
builder.table("topic", "storeName"); // requires to provide a store name to make KTable queryable
builder.table(keySerde, valSerde, "topic", "storeName"); // requires to provide a store name to make KTable queryable
table2 = table1.through("topic", "storeName"); // requires to provide a store name to make KTable queryable
// for changes of countByKey() -> groupByKey().count(...), please see example above
// for changes of TimeWindows.of(...), please see example below
stream.groupByKey().count(TimeWindows.of(1000), "countStoreName"); // window name removed, store name added
Windowing¶
The API for JoinWindows was improved. It is no longer possible to define a window with a default size (of zero). Furthermore, windows are not named anymore. Rather, any such naming is now done for state stores (see section "DSL: New parameters to specify state store names" above).
// old API
JoinWindows.of("name"); // defines window with size zero
JoinWindows.of("name").within(60 * 1000L);
TimeWindows.of("name", 60 * 1000L);
UnlimitedWindows.of("name", 60 * 1000L);
// new API, no name, requires window size
JoinWindows.of(0); // no name; set window size explicitly to zero
JoinWindows.of(60 * 1000L); // no name
TimeWindows.of(60 * 1000L); // not required to specify a name anymore
UnlimitedWindows.of(); // not required to specify a name anymore