.. _streams_upgrade-guide:
Upgrade Guide
=============
**Table of Contents**

.. contents::
   :local:
Upgrading from CP 3.1.x (Kafka 0.10.1.x-cp2) to CP 3.2.x (Kafka 0.10.2.x-cp1)
-----------------------------------------------------------------------------
.. _streams_upgrade-guide_3.2.x-compatibility:
Compatibility
^^^^^^^^^^^^^
Kafka Streams applications built with CP 3.2.x (Kafka 0.10.2.x-cp1) are forward and backward compatible with certain Kafka clusters.
* **Forward-compatible to CP 3.2.x clusters (Kafka 0.10.2.x-cp1):**
  Existing Kafka Streams applications built with CP 3.0.x (Kafka 0.10.0.x-cp1) or CP 3.1.x (Kafka 0.10.1.x-cp2) will work with upgraded Kafka clusters
  running CP 3.2.x (Kafka 0.10.2.x-cp1).
* **Backward-compatible to CP 3.1.x clusters (Kafka 0.10.1.x-cp2):**
  This is the first release that allows you to upgrade your Kafka Streams applications without a broker upgrade.
  New Kafka Streams applications built with CP 3.2.x (Kafka 0.10.2.x-cp1) will work with older Kafka clusters running CP 3.1.x (Kafka 0.10.1.x-cp2).
  Note, however, that Kafka clusters running CP 3.0.x (Kafka 0.10.0.x-cp1) are *not* compatible with new CP 3.2.x Kafka Streams applications.
Compatibility Matrix:

+------------------------+--------------------------------------------------------+
|                        | **Kafka Broker (columns)**                             |
+------------------------+------------------+------------------+------------------+
| **Streams API (rows)** | 3.0.x / 0.10.0.x | 3.1.x / 0.10.1.x | 3.2.x / 0.10.2.x |
+------------------------+------------------+------------------+------------------+
| 3.0.x / 0.10.0.x       | compatible       | compatible       | compatible       |
+------------------------+------------------+------------------+------------------+
| 3.1.x / 0.10.1.x       |                  | compatible       | compatible       |
+------------------------+------------------+------------------+------------------+
| 3.2.x / 0.10.2.x       |                  | compatible       | compatible       |
+------------------------+------------------+------------------+------------------+
.. _streams_upgrade-guide_3.2.x-upgrade-apps:
Upgrading your Kafka Streams applications to CP 3.2.x
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To make use of CP 3.2.x (Kafka 0.10.2.x-cp1), you just need to update the Kafka Streams dependency of your application
to use the version number ``0.10.2.x-cp1``, and then recompile your application.
For example, in your ``pom.xml`` file:
.. sourcecode:: xml

   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-streams</artifactId>
       <version>0.10.2.1-cp1</version>
   </dependency>
API changes
^^^^^^^^^^^
Kafka Streams and its API were improved and modified since the release of CP 3.1.x.
Some of these changes are breaking changes that require you to update the code of your Kafka Streams applications.
This section focuses only on these breaking changes.
Handling Negative Timestamps and Timestamp Extractor Interface
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
The behavior of Kafka Streams with regard to invalid (i.e., negative) timestamps was improved.
By default you will still get an exception on an invalid timestamp.
However, you can now reconfigure your application to react more gracefully to invalid timestamps, which was not possible before.
Even if you do not use a custom timestamp extractor, you need to recompile your application code,
because the ``TimestampExtractor`` interface was changed in an incompatible way.
The internal behavior of Kafka Streams with regard to negative timestamps was changed:
instead of the library raising an exception if the timestamp extractor returns a negative timestamp,
the corresponding record is dropped silently and not processed (the fail-fast check now lives in the default timestamp extractor itself).
This allows you to process topics in which only a few records cannot provide a valid timestamp.
Furthermore, the ``TimestampExtractor`` interface was changed and now has one additional parameter.
This parameter provides a timestamp that can be used, for example, to return an estimated timestamp,
if no valid timestamp can be extracted from the current record.
The old default timestamp extractor ``ConsumerRecordTimestampExtractor`` was replaced with
``FailOnInvalidTimestamp``, and two new extractors which both extract a record's built-in timestamp
were added (``LogAndSkipOnInvalidTimestamp`` and ``UsePreviousTimeOnInvalidTimestamp``).
The new default extractor (``FailOnInvalidTimestamp``) raises an exception in case of a negative built-in
record timestamp such that Kafka Streams' default behavior is kept (i.e., fail-fast on negative timestamp).
The two newly added extractors allow you to handle negative timestamps more gracefully by implementing
a log-and-skip strategy and a timestamp-estimation strategy, respectively.
.. literalinclude:: upgrade-guide_timestamp-extractor.java
   :language: java
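
The following is a minimal sketch of a custom extractor written against the new two-parameter interface.
The class name ``FallbackOnPreviousTimestamp`` is illustrative; the built-in ``UsePreviousTimeOnInvalidTimestamp``
extractor implements essentially the same strategy.

.. sourcecode:: java

   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.streams.processor.TimestampExtractor;

   // Illustrative example: fall back to the previously extracted timestamp
   // whenever the record's built-in timestamp is invalid (i.e., negative).
   public class FallbackOnPreviousTimestamp implements TimestampExtractor {

       @Override
       public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
           final long timestamp = record.timestamp();
           if (timestamp < 0) {
               // No valid embedded timestamp; estimate it as the latest valid
               // timestamp seen so far (the new second parameter).
               return previousTimestamp;
           }
           return timestamp;
       }
   }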
Metrics
"""""""
If you provide custom metrics by implementing the ``StreamsMetrics`` interface, you need to update your code, as the
interface has several new methods that allow you to register finer-grained metrics than before.
More details are available in KIP-114.
.. literalinclude:: upgrade-guide_metrics.java
   :language: java
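
As a rough sketch of the new registration methods, the following hypothetical processor registers a custom
throughput sensor via ``StreamsMetrics`` (all names such as ``MeteredProcessor`` and ``my-scope`` are illustrative):

.. sourcecode:: java

   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.streams.StreamsMetrics;
   import org.apache.kafka.streams.processor.AbstractProcessor;
   import org.apache.kafka.streams.processor.ProcessorContext;

   public class MeteredProcessor extends AbstractProcessor<String, String> {

       private StreamsMetrics metrics;
       private Sensor processedSensor;

       @Override
       public void init(final ProcessorContext context) {
           super.init(context);
           metrics = context.metrics();
           // Register a throughput sensor scoped to this processor; recording
           // levels (INFO/DEBUG) are new in this release.
           processedSensor = metrics.addThroughputSensor(
               "my-scope", "my-processor", "records-processed",
               Sensor.RecordingLevel.INFO);
       }

       @Override
       public void process(final String key, final String value) {
           // ... actual processing logic would go here ...
           metrics.recordThroughput(processedSensor, 1L);
       }
   }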
Scala
"""""
Starting with 0.10.2.0, if your application is written in Scala, you may need to declare types explicitly in order for
the code to compile. The
:cp-examples:`StreamToTableJoinScalaIntegrationTest|src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala`
has an example where the types of return variables are explicitly declared.
Full upgrade workflow
^^^^^^^^^^^^^^^^^^^^^
A typical workflow for upgrading Kafka Streams applications from CP 3.1.x to CP 3.2.x has the following steps:
#. **Upgrade your application:** See :ref:`upgrade instructions <streams_upgrade-guide_3.2.x-upgrade-apps>` above.
#. **Stop the old application:** Stop the old version of your application, i.e. stop all the application instances
   that are still running the old version of the application.
#. **Optional, upgrade your Kafka cluster:** See the Kafka upgrade instructions.
#. **Start the upgraded application:** Start the upgraded version of your application, with as many instances as needed.
   By default, the upgraded application will resume processing its input data from the point when the old version was stopped (see previous step), as sketched below.
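
To illustrate the last step: the upgraded application resumes from the committed offsets of the old version
because it keeps the same ``application.id``. A minimal configuration sketch (the application id and broker
address below are illustrative):

.. sourcecode:: java

   import java.util.Properties;

   import org.apache.kafka.streams.StreamsConfig;

   // Keep the application.id unchanged across the upgrade so the new version
   // resumes from the offsets committed by the old version.
   Properties streamsSettings = new Properties();
   streamsSettings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
   streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");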
Known issues and workarounds
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
We have observed the following issues with CP 3.2.x; a workaround is given for each:
#. **RocksDB behavior in 1-core environments:** There is a known issue with RocksDB
   when running in environments with just one CPU core. In some scenarios, the
   application may run very slowly or become unresponsive.
   The workaround is to set a particular RocksDB configuration using
   the RocksDB config setter as follows:
   .. sourcecode:: java

      import java.util.Map;
      import java.util.Properties;

      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.state.RocksDBConfigSetter;
      import org.rocksdb.Options;

      public static class CustomRocksDBConfig implements RocksDBConfigSetter {

          @Override
          public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
              // Workaround: We must ensure that the parallelism is set to >= 2.
              final int compactionParallelism = Math.max(Runtime.getRuntime().availableProcessors(), 2);
              // Set number of compaction threads (but not flush threads).
              options.setIncreaseParallelism(compactionParallelism);
          }
      }

      Properties streamsSettings = new Properties();
      streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);