.. _streams_upgrade-guide:
Upgrade Guide
=============
Upgrading from CP 3.0.x (Kafka 0.10.0.x-cp2) to CP 3.1.2 (Kafka 0.10.1.1-cp1)
-----------------------------------------------------------------------------
.. _streams_upgrade-guide_3.1.2-compatibility:
Compatibility
^^^^^^^^^^^^^
Kafka Streams applications built with CP 3.1.2 (Kafka 0.10.1.1-cp1) are only compatible with Kafka clusters running CP 3.1.2 (Kafka 0.10.1.1-cp1).
* **Forward-compatible to CP 3.1.2 clusters (Kafka 0.10.1.1-cp1):**
  Existing Kafka Streams applications built with CP 3.0.x (Kafka 0.10.0.x-cp2) will work with upgraded Kafka clusters
  running CP 3.1.2 (Kafka 0.10.1.1-cp1).
.. _streams_upgrade-guide_3.1.2-upgrade-apps:
Upgrading your Kafka Streams applications to CP 3.1.2
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To make use of CP 3.1.2 (Kafka 0.10.1.1-cp1), you just need to update the Kafka Streams dependency of your application
to use the version number ``0.10.1.1-cp1``, and then recompile your application.
For example, in your ``pom.xml`` file:

.. sourcecode:: xml

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.1.1-cp1</version>
    </dependency>

API changes
^^^^^^^^^^^
Kafka Streams and its API have been improved and modified since the release of CP 3.0.x.
Some of these changes are breaking changes that require you to update the code of your Kafka Streams applications.
This section focuses only on these breaking changes.
Stream grouping and aggregation
"""""""""""""""""""""""""""""""
Grouping (i.e., repartitioning) and aggregation in the ``KStream`` API were significantly changed to align with the ``KTable`` API.
Instead of using a single method with many parameters, grouping and aggregation are now split into two steps.
First, a ``KStream`` is transformed into a ``KGroupedStream`` that is a repartitioned copy of the original ``KStream``.
Afterwards, an aggregation can be performed on the ``KGroupedStream``, resulting in a new ``KTable`` that contains the result of the aggregation.
Thus, the methods ``KStream#aggregateByKey(...)``, ``KStream#reduceByKey(...)``, and ``KStream#countByKey(...)`` were replaced by ``KStream#groupBy(...)`` and ``KStream#groupByKey(...)`` which return a ``KGroupedStream``.
While ``KStream#groupByKey(...)`` groups on the current key, ``KStream#groupBy(...)`` sets a new key and re-partitions the data to build groups on the new key.
The new class ``KGroupedStream`` provides the corresponding methods ``aggregate(...)``, ``reduce(...)``, and ``count(...)``.
.. literalinclude:: upgrade-guide_grouping.java
   :language: java
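
The two-step model can also be illustrated without the Kafka Streams library.
The following is a minimal plain-Java sketch, not Kafka Streams code: the class ``GroupingModel`` and its methods are hypothetical names chosen to mirror ``groupBy``, ``groupByKey``, and ``count``, and in-memory collections stand in for the ``KGroupedStream`` and the resulting ``KTable``.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of "group first, then aggregate". All names are hypothetical.
class GroupingModel {

    // Step 1, analogous to groupBy(...): select a grouping key and bucket the values under it.
    static <K, V, G> Map<G, List<V>> groupBy(List<Map.Entry<K, V>> records,
                                             BiFunction<K, V, G> keySelector) {
        Map<G, List<V>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, V> rec : records) {
            G groupKey = keySelector.apply(rec.getKey(), rec.getValue());
            grouped.computeIfAbsent(groupKey, k -> new ArrayList<>()).add(rec.getValue());
        }
        return grouped;
    }

    // Step 1, analogous to groupByKey(): group on the key the records already have.
    static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> records) {
        return GroupingModel.<K, V, K>groupBy(records, (key, value) -> key);
    }

    // Step 2, analogous to count(): produce one aggregate per group.
    static <K, V> Map<K, Long> count(Map<K, List<V>> grouped) {
        Map<K, Long> counts = new LinkedHashMap<>();
        for (Map.Entry<K, List<V>> group : grouped.entrySet()) {
            counts.put(group.getKey(), (long) group.getValue().size());
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = Arrays.asList(
                new SimpleEntry<String, String>("alice", "eu"),
                new SimpleEntry<String, String>("bob", "us"),
                new SimpleEntry<String, String>("alice", "us"));

        // Group on the existing key (user), then count.
        Map<String, List<String>> groupedByUser = groupByKey(records);
        Map<String, Long> perUser = count(groupedByUser);
        System.out.println(perUser);   // {alice=2, bob=1}

        // Group on a new key (region), then count.
        Map<String, List<String>> groupedByRegion = groupBy(records, (user, region) -> region);
        Map<String, Long> perRegion = count(groupedByRegion);
        System.out.println(perRegion); // {eu=1, us=2}
    }
}
```

In this sketch, grouping on the existing key yields counts per user, while grouping with a key selector yields counts per region, which mirrors the distinction between ``KStream#groupByKey(...)`` and ``KStream#groupBy(...)`` described above.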
Auto Repartitioning
"""""""""""""""""""
Previously, when performing ``KStream#join(...)``, ``KStream#outerJoin(...)``, or ``KStream#leftJoin(...)`` operations after a key-changing operation
(i.e., ``KStream#map(...)``, ``KStream#flatMap(...)``, or ``KStream#selectKey(...)``), the developer was required to call ``KStream#through(...)`` to repartition the mapped ``KStream``.
This is no longer required. Repartitioning now happens automatically for all join operations.
.. literalinclude:: upgrade-guide_repartitioning.java
   :language: java
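
A repartitioning step is needed because a join can only match records that live in the same partition, while a key-changing operation leaves each record in the partition that was chosen for its old key.
The following plain-Java sketch models this under simplifying assumptions: the class ``RepartitionModel`` and its members are hypothetical, and ``hashCode`` modulo the partition count stands in for Kafka's actual partitioner. It is not Kafka Streams code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of why a key change calls for repartitioning. All names are hypothetical.
class RepartitionModel {

    // Assumption: a simple hash-modulo partitioner, not Kafka's real one.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A record together with the partition it currently lives in.
    static final class Placed {
        final String key;
        final String value;
        final int partition;

        Placed(String key, String value, int partition) {
            this.key = key;
            this.value = value;
            this.partition = partition;
        }

        // Key-changing operation: the key changes, the partition does not.
        Placed withKey(String newKey) {
            return new Placed(newKey, value, partition);
        }

        // Repartitioning: re-derive the partition from the current key.
        Placed repartitioned(int numPartitions) {
            return new Placed(key, value, partitionFor(key, numPartitions));
        }
    }

    // True if all records that share a key also share a partition.
    static boolean coLocatedByKey(List<Placed> records) {
        Map<String, Integer> partitionOfKey = new HashMap<>();
        for (Placed r : records) {
            Integer previous = partitionOfKey.putIfAbsent(r.key, r.partition);
            if (previous != null && previous != r.partition) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int n = 2;
        Placed first = new Placed("a", "v1", partitionFor("a", n));
        Placed second = new Placed("b", "v2", partitionFor("b", n));

        // Both records get the same new key but stay where they were.
        List<Placed> rekeyed = Arrays.asList(first.withKey("x"), second.withKey("x"));
        System.out.println(coLocatedByKey(rekeyed));       // false

        // After repartitioning, records with equal keys share a partition again.
        List<Placed> repartitioned = Arrays.asList(
                rekeyed.get(0).repartitioned(n), rekeyed.get(1).repartitioned(n));
        System.out.println(coLocatedByKey(repartitioned)); // true
    }
}
```

In the model, ``repartitioned(...)`` plays the role that the explicit ``KStream#through(...)`` call used to play, and that join operations now take care of automatically.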
TopologyBuilder
"""""""""""""""
Two public method signatures on ``TopologyBuilder`` have changed: ``TopologyBuilder#sourceTopics(String applicationId)`` and ``TopologyBuilder#topicGroups(String applicationId)``.
These methods no longer take ``applicationId`` as a parameter; instead, call ``TopologyBuilder#setApplicationId(String applicationId)`` before calling either of them.
.. literalinclude:: upgrade-guide_topology-builder.java
   :language: java
.. _streams_upgrade-guide_dsl-store-names:
DSL: New parameters to specify state store names
""""""""""""""""""""""""""""""""""""""""""""""""
Apache Kafka ``0.10.1`` introduces :ref:`Interactive Queries `,
which allow you to directly query state stores of a Kafka Streams application.
This new feature required a few changes to the operators in the DSL.
Starting with Kafka ``0.10.1``, state stores must always be "named",
which includes both explicitly used state stores (e.g., defined by the user)
and internally used state stores (e.g., created behind the scenes by operations such as ``count()``).
This naming is a prerequisite to make state stores queryable.
As a result of this, the previous "operator name" is now the state store name.
This change affects ``KStreamBuilder#table(...)`` and *windowed* aggregates ``KGroupedStream#count(...)``, ``#reduce(...)``, and ``#aggregate(...)``.
.. literalinclude:: upgrade-guide_operator-names.java
   :language: java
Windowing
"""""""""
The API for ``JoinWindows`` was improved.
It is no longer possible to define a window with a default size (of zero).
Furthermore, windows are no longer named.
Rather, any such naming is now done for state stores
(see section :ref:`DSL: New parameters to specify state store names <streams_upgrade-guide_dsl-store-names>` above).
.. literalinclude:: upgrade-guide_windows.java
   :language: java
Full upgrade workflow
^^^^^^^^^^^^^^^^^^^^^
A typical workflow for upgrading Kafka Streams applications from CP 3.0.x to CP 3.1.2 has the following steps:
#. **Upgrade your application:** See :ref:`upgrade instructions <streams_upgrade-guide_3.1.2-upgrade-apps>` above.
#. **Stop the old application:** Stop the old version of your application, i.e., stop all the application instances
   that are still running the old version of the application.
#. **Upgrade your Kafka cluster:** See :ref:`Kafka upgrade instructions `.
#. **Start the upgraded application:** Start the upgraded version of your application, with as many instances as
   needed. By default, the upgraded application will resume processing its input data from the point where the old
   version was stopped (see previous step).