API Reference
Overview
Content Types
The REST proxy uses content types for both requests and responses to indicate three properties of the data: the serialization format (e.g. json), the version of the API (e.g. v1), and the embedded format (e.g. binary or avro). Currently, the only serialization format supported is json and the only version of the API is v1.
The embedded format is the format of the data you are producing or consuming, which is embedded into requests or responses in the serialization format. For example, you can provide binary data in a json-serialized request; in this case the data should be provided as a base64-encoded string. The proxy also supports avro, in which case a JSON form of the data can be embedded directly and a schema (or schema ID) should be included with the request.
The format for the content type is:
application/vnd.kafka[.embedded_format].[api_version]+[serialization_format]
The embedded format can be omitted when there are no embedded messages (i.e. for metadata requests). The preferred content type is application/vnd.kafka.[embedded_format].v1+json. However, other less specific content types are permitted, including application/vnd.kafka+json to indicate no specific API version requirement (the most recent stable version will be used), application/json, and application/octet-stream. The latter two are only supported for compatibility and ease of use. In all cases, if the embedded format is omitted, binary is assumed. Although using these less specific values is permitted, to remain compatible with future versions you should specify preferred content types in requests and check the content types of responses.
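The content type template can be assembled mechanically from its parts. The Python sketch below shows one way to do it; kafka_content_type is a hypothetical helper for illustration, not something the proxy provides.

```python
from typing import Optional


def kafka_content_type(embedded_format: Optional[str] = None,
                       api_version: str = "v1",
                       serialization_format: str = "json") -> str:
    """Build application/vnd.kafka[.embedded_format].[api_version]+[serialization_format].

    Passing no embedded_format gives the form used when no messages are
    embedded, such as metadata requests.
    """
    parts = ["application/vnd.kafka"]
    if embedded_format is not None:
        parts.append(embedded_format)
    parts.append(api_version)
    return ".".join(parts) + "+" + serialization_format


# Preferred type when producing base64-encoded binary data.
print(kafka_content_type("binary"))  # application/vnd.kafka.binary.v1+json
# No embedded messages, e.g. listing topics.
print(kafka_content_type())          # application/vnd.kafka.v1+json
```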
Your requests should specify the most specific format and version information possible via the HTTP Accept header:
Accept: application/vnd.kafka.v1+json
The server also supports content negotiation, so you may include multiple, weighted preferences:
Accept: application/vnd.kafka.v1+json; q=0.9, application/json; q=0.5
which can be useful when, for example, a new version of the API is preferred but you cannot be certain it is available yet.
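A weighted Accept header is a comma-separated list of media types, each optionally followed by a q parameter. A minimal sketch, with accept_header as a hypothetical helper name, that reproduces the header shown above:

```python
def accept_header(preferences):
    """Join (media_type, q) pairs into a weighted Accept header value.

    A q of None omits the parameter, which HTTP treats as q=1.
    """
    items = []
    for media_type, q in preferences:
        items.append(media_type if q is None else f"{media_type}; q={q}")
    return ", ".join(items)


header = accept_header([("application/vnd.kafka.v1+json", 0.9),
                        ("application/json", 0.5)])
print(header)  # application/vnd.kafka.v1+json; q=0.9, application/json; q=0.5
```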
Errors
All API endpoints use a standard error message format for any requests that return an HTTP status indicating an error (any 4xx or 5xx status). For example, a request entity that omits a required field may generate the following response:
HTTP/1.1 422 Unprocessable Entity
Content-Type: application/vnd.kafka.v1+json

{
  "error_code": 422,
  "message": "records may not be empty"
}
Although it is good practice to check the status code, you may safely parse the response of any non-DELETE API call and check for the presence of an error_code field to detect errors.
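Because error bodies share one shape, a client can centralize detection. The sketch below assumes the body of a non-DELETE response has already been read as text (a DELETE returns 204 with no body to parse); KafkaRestError and parse_response are hypothetical names.

```python
import json


class KafkaRestError(Exception):
    """Raised when a proxy response body carries a top-level error_code."""

    def __init__(self, error_code, message):
        super().__init__(f"{error_code}: {message}")
        self.error_code = error_code
        self.message = message


def parse_response(body: str):
    """Decode a response body, raising KafkaRestError for error messages.

    Error bodies are JSON objects with error_code and message fields;
    successful bodies may be JSON objects or arrays.
    """
    data = json.loads(body)
    if isinstance(data, dict) and "error_code" in data:
        raise KafkaRestError(data["error_code"], data.get("message"))
    return data
```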
Some error codes are used frequently across the entire API and you will probably want to have general purpose code to handle these, whereas most other error codes will need to be handled on a per-request basis.
ANY *
Status Codes: - 404 Not Found –
- Error code 40401 – Topic not found.
- Error code 40402 – Partition not found.
- 422 Unprocessable Entity – The request payload is either improperly formatted or contains semantic errors.
- 500 Internal Server Error –
- Error code 50001 – Zookeeper error.
- Error code 50002 – Kafka error.
- Error code 50003 – Retriable Kafka error. Although the operation failed, it’s possible that retrying the request will be successful.
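General-purpose handling for the shared codes might look like the following sketch. It uses only the codes listed above and treats 50003 as the single retriable case. The fallback that reads the HTTP status from the first three digits of a five-digit code is an assumption drawn from the pattern of the codes in this reference, not a documented rule; all names are hypothetical.

```python
# Error codes that any endpoint may return, as listed above.
COMMON_ERRORS = {
    40401: "Topic not found",
    40402: "Partition not found",
    50001: "Zookeeper error",
    50002: "Kafka error",
    50003: "Retriable Kafka error",
}

# Only 50003 is documented as possibly succeeding when retried.
RETRIABLE_CODES = {50003}


def should_retry(error_code: int) -> bool:
    """Return True if the request may succeed when sent again."""
    return error_code in RETRIABLE_CODES


def describe_error(error_code: int) -> str:
    """Label a common code, or fall back to the HTTP status it implies."""
    if error_code in COMMON_ERRORS:
        return COMMON_ERRORS[error_code]
    # Assumption: five-digit codes start with their HTTP status (42201 -> 422);
    # shorter codes such as 422 or 500 are used as the status directly.
    status = error_code // 100 if error_code >= 10000 else error_code
    return f"request-specific error (HTTP {status})"
```

Per-request codes such as 42201 then fall through to endpoint-specific handling.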
Topics
The topics resource provides information about the topics in your Kafka cluster and their current state. It also lets you produce messages by making POST requests to specific topics.
GET /topics
Get a list of Kafka topics.
Response JSON Array of Strings: - Names of the topics in the cluster
Example request:
GET /topics HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

["topic1", "topic2"]
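Using only the Python standard library, the example request can be built as below. The helper list_topics_request is hypothetical and the host is the placeholder from the example. The request is constructed but not sent; the commented-out lines show how it might be sent to a running proxy.

```python
import json
import urllib.request

PROXY = "http://kafkaproxy.example.com"  # placeholder host from the example


def list_topics_request(base_url: str = PROXY) -> urllib.request.Request:
    """Build (but do not send) the GET /topics request shown above."""
    return urllib.request.Request(
        base_url + "/topics",
        method="GET",
        headers={"Accept": "application/vnd.kafka.v1+json, "
                           "application/vnd.kafka+json, application/json"},
    )


req = list_topics_request()
# Sending it requires a reachable proxy:
# with urllib.request.urlopen(req) as resp:
#     topics = json.loads(resp.read())  # e.g. ["topic1", "topic2"]
```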
GET /topics/(string: topic_name)
Get metadata about a specific topic.
Parameters: - topic_name (string) – Name of the topic to get metadata about
Response JSON Object: - name (string) – Name of the topic
- configs (map) – Per-topic configuration overrides
- partitions (array) – List of partitions for this topic
- partitions[i].partition (int) – the ID of this partition
- partitions[i].leader (int) – the broker ID of the leader for this partition
- partitions[i].replicas (array) – list of replicas for this partition, including the leader
- partitions[i].replicas[j].broker (int) – broker ID of the replica
- partitions[i].replicas[j].leader (boolean) – true if this replica is the leader for the partition
- partitions[i].replicas[j].in_sync (boolean) – true if this replica is currently in sync with the leader
Status Codes: - 404 Not Found –
- Error code 40401 – Topic not found
Example request:
GET /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "name": "test",
  "configs": {
    "cleanup.policy": "compact"
  },
  "partitions": [
    {
      "partition": 1,
      "leader": 1,
      "replicas": [
        {"broker": 1, "leader": true, "in_sync": true},
        {"broker": 2, "leader": false, "in_sync": true}
      ]
    },
    {
      "partition": 2,
      "leader": 2,
      "replicas": [
        {"broker": 1, "leader": false, "in_sync": true},
        {"broker": 2, "leader": true, "in_sync": true}
      ]
    }
  ]
}
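As an example of using this metadata, the sketch below reports replicas that have fallen out of sync with their leader, based on the partitions[i].replicas[j] fields listed above. The function name out_of_sync_replicas is hypothetical, and the input is assumed to be the already-decoded JSON object.

```python
def out_of_sync_replicas(topic_metadata: dict) -> dict:
    """Map partition ID -> broker IDs of replicas whose in_sync flag is false.

    Partitions whose replicas are all in sync are left out of the result.
    """
    result = {}
    for partition in topic_metadata["partitions"]:
        lagging = [r["broker"] for r in partition["replicas"] if not r["in_sync"]]
        if lagging:
            result[partition["partition"]] = lagging
    return result
```

For the example response above, every replica is in sync, so the result would be an empty dict.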
POST /topics/(string: topic_name)
Produce messages to a topic, optionally specifying keys or partitions for the messages. For the avro embedded format, you must provide information about schemas, and the REST proxy must be configured with the URL to access the schema registry (schema.registry.connect). Schemas may be provided as the full schema encoded as a string or, after the initial request, as the schema ID returned with the first response.
Parameters: - topic_name (string) – Name of the topic to produce the messages to
Request JSON Object: - key_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data)
- key_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
- value_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data)
- value_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
- records (array) – A list of records to produce to the topic.
- records[i].key (object) – The message key, formatted according to the embedded format, or null to omit a key (optional)
- records[i].value (object) – The message value, formatted according to the embedded format
- records[i].partition (int) – Partition to store the message in (optional)
Response JSON Object: - key_schema_id (int) – The ID for the schema used to produce keys, or null if keys were not used
- value_schema_id (int) – The ID for the schema used to produce values.
- offsets (array) – List of partitions and offsets the messages were published to
- offsets[i].partition (int) – Partition the message was published to, or null if publishing the message failed
- offsets[i].offset (long) – Offset of the message, or null if publishing the message failed
- offsets[i].error_code (long) – An error code classifying the reason this operation failed, or null if it succeeded.
- 1 - Non-retriable Kafka exception
- 2 - Retriable Kafka exception; the message might be sent successfully if retried
- offsets[i].error (string) – An error message describing why the operation failed, or null if it succeeded
Status Codes: - 404 Not Found –
- Error code 40401 – Topic not found
- 422 Unprocessable Entity –
- Error code 42201 – Request includes keys and uses a format that requires schemas, but does not include the key_schema or key_schema_id fields
- Error code 42202 – Request includes values and uses a format that requires schemas, but does not include the value_schema or value_schema_id fields
Example binary request:
POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.binary.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "records": [
    {"key": "a2V5", "value": "Y29uZmx1ZW50"},
    {"value": "a2Fma2E=", "partition": 1},
    {"value": "bG9ncw=="}
  ]
}
Example binary response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {"partition": 2, "offset": 100},
    {"partition": 1, "offset": 101},
    {"partition": 2, "offset": 102}
  ]
}
Example Avro request:
POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}",
  "records": [
    {"value": 12},
    {"value": 24, "partition": 1}
  ]
}
Example Avro response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {"partition": 2, "offset": 103},
    {"partition": 1, "offset": 104}
  ]
}
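For the binary embedded format, each key and value must be base64-encoded before it goes into the request. The sketch below, using a hypothetical binary_produce_body helper and assuming raw bytes as input, builds a body in the records form; with the inputs shown it yields the same encoded strings as the binary example request.

```python
import base64
import json


def binary_produce_body(records):
    """Build a binary-format produce request body.

    records is a list of (key, value, partition) tuples: key and value are
    bytes, partition is an int. A key or partition of None is omitted.
    """
    out = []
    for key, value, partition in records:
        rec = {}
        if key is not None:
            rec["key"] = base64.b64encode(key).decode("ascii")
        rec["value"] = base64.b64encode(value).decode("ascii")
        if partition is not None:
            rec["partition"] = partition
        out.append(rec)
    return json.dumps({"records": out})


body = binary_produce_body([
    (b"key", b"confluent", None),
    (None, b"kafka", 1),
    (None, b"logs", None),
])
# Send with Content-Type: application/vnd.kafka.binary.v1+json
```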
Partitions
The partitions resource provides per-partition metadata, including the current leaders and replicas for each partition. It also allows you to produce messages to a single partition using POST requests.
GET /topics/(string: topic_name)/partitions
Get a list of partitions for the topic.
Parameters: - topic_name (string) – the name of the topic
Response JSON Array of Objects: - partition (int) – ID of the partition
- leader (int) – Broker ID of the leader for this partition
- replicas (array) – List of brokers acting as replicas for this partition
- replicas[i].broker (int) – Broker ID of the replica
- replicas[i].leader (boolean) – true if this broker is the leader for the partition
- replicas[i].in_sync (boolean) – true if the replica is in sync with the leader
Status Codes: - 404 Not Found –
- Error code 40401 – Topic not found
Example request:
GET /topics/test/partitions HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

[
  {
    "partition": 1,
    "leader": 1,
    "replicas": [
      {"broker": 1, "leader": true, "in_sync": true},
      {"broker": 2, "leader": false, "in_sync": true},
      {"broker": 3, "leader": false, "in_sync": false}
    ]
  },
  {
    "partition": 2,
    "leader": 2,
    "replicas": [
      {"broker": 1, "leader": false, "in_sync": true},
      {"broker": 2, "leader": true, "in_sync": true},
      {"broker": 3, "leader": false, "in_sync": false}
    ]
  }
]
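A client that routes work by partition might reduce this response to a leader map and an in-sync replica count. A small sketch over the decoded array, with hypothetical helper names:

```python
def leaders_by_partition(partitions: list) -> dict:
    """Map partition ID -> leader broker ID."""
    return {p["partition"]: p["leader"] for p in partitions}


def in_sync_count(partition: dict) -> int:
    """Count replicas whose in_sync flag is true."""
    return sum(1 for r in partition["replicas"] if r["in_sync"])
```

For the example response, the leader map is {1: 1, 2: 2} and each partition has two in-sync replicas out of three.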
GET /topics/(string: topic_name)/partitions/(int: partition_id)
Get metadata about a single partition in the topic.
Parameters: - topic_name (string) – Name of the topic
- partition_id (int) – ID of the partition to inspect
Response JSON Object: - partition (int) – ID of the partition
- leader (int) – Broker ID of the leader for this partition
- replicas (array) – List of brokers acting as replicas for this partition
- replicas[i].broker (int) – Broker ID of the replica
- replicas[i].leader (boolean) – true if this broker is the leader for the partition
- replicas[i].in_sync (boolean) – true if the replica is in sync with the leader
Status Codes: - 404 Not Found –
- Error code 40401 – Topic not found
- Error code 40402 – Partition not found
Example request:
GET /topics/test/partitions/1 HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "partition": 1,
  "leader": 1,
  "replicas": [
    {"broker": 1, "leader": true, "in_sync": true},
    {"broker": 2, "leader": false, "in_sync": true},
    {"broker": 3, "leader": false, "in_sync": false}
  ]
}
POST /topics/(string: topic_name)/partitions/(int: partition_id)
Produce messages to one partition of the topic. For the avro embedded format, you must provide information about schemas. Schemas may be provided as the full schema encoded as a string or, after the initial request, as the schema ID returned with the first response.
Parameters: - topic_name (string) – Topic to produce the messages to
- partition_id (int) – Partition to produce the messages to
Request JSON Object: - key_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data)
- key_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
- value_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data)
- value_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
- records (array) – A list of records to produce to the partition.
- records[i].key (object) – The message key, formatted according to the embedded format, or null to omit a key (optional)
- records[i].value (object) – The message value, formatted according to the embedded format
Response JSON Object: - key_schema_id (int) – The ID for the schema used to produce keys, or null if keys were not used
- value_schema_id (int) – The ID for the schema used to produce values.
- offsets (array) – List of partitions and offsets the messages were published to
- offsets[i].partition (int) – Partition the message was published to. This will be the same as the partition_id parameter and is provided only to maintain consistency with responses from producing to a topic
- offsets[i].offset (long) – Offset of the message
- offsets[i].error_code (long) – An error code classifying the reason this operation failed, or null if it succeeded.
- 1 - Non-retriable Kafka exception
- 2 - Retriable Kafka exception; the message might be sent successfully if retried
- offsets[i].error (string) – An error message describing why the operation failed, or null if it succeeded
Status Codes: - 404 Not Found –
- Error code 40401 – Topic not found
- Error code 40402 – Partition not found
- 422 Unprocessable Entity –
- Error code 42201 – Request includes keys and uses a format that requires schemas, but does not include the key_schema or key_schema_id fields
- Error code 42202 – Request includes values and uses a format that requires schemas, but does not include the value_schema or value_schema_id fields
Example binary request:
POST /topics/test/partitions/1 HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.binary.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "records": [
    {"key": "a2V5", "value": "Y29uZmx1ZW50"},
    {"value": "a2Fma2E="}
  ]
}
Example binary response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {"partition": 1, "offset": 100},
    {"partition": 1, "offset": 101}
  ]
}
Example Avro request:
POST /topics/test/partitions/1 HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}",
  "records": [
    {"value": 25},
    {"value": 26}
  ]
}
Example Avro response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {"partition": 1, "offset": 100},
    {"partition": 1, "offset": 101}
  ]
}
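Because each entry in offsets carries its own error_code, a single produce request can partly succeed. The sketch below separates records worth retrying (error code 2) from those that failed permanently (error code 1). It assumes that offsets[i] describes records[i], i.e. results come back in request order, which this reference does not state explicitly; partition_results is a hypothetical name.

```python
def partition_results(records: list, offsets: list):
    """Split produce results into (retriable, fatal) lists of records.

    error_code 2 = retriable, 1 = non-retriable, None or absent = success.
    Assumes offsets[i] is the result for records[i].
    """
    retriable, fatal = [], []
    for record, result in zip(records, offsets):
        code = result.get("error_code")
        if code == 2:
            retriable.append(record)
        elif code is not None:
            fatal.append(record)
    return retriable, fatal
```

Retriable records can be resent in a new request body; fatal ones should be surfaced together with their error message.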
Consumers
The consumers resource provides access to the current state of consumer groups, allows you to create a consumer in a consumer group and consume messages from topics and partitions. The proxy can convert data stored in Kafka in serialized form into a JSON-compatible embedded format. Currently two formats are supported: raw binary data is encoded as base64 strings and Avro data is converted into embedded JSON objects.
Because consumers are stateful, any consumer instances created with the REST API are tied to a specific REST proxy instance. A full URL is provided when the instance is created, and it should be used to construct any subsequent requests. Failing to use the returned URL for future consumer requests will end up adding new consumers to the group. If a REST proxy instance is shut down, it will attempt to cleanly destroy any consumers before it is terminated.
Consumers may not change the set of topics they are subscribed to once they have started consuming messages. For example, if a consumer is created without specifying topic subscriptions, the first read from a topic will subscribe the consumer to that topic and attempting to read from another topic will cause an error.
POST /consumers/(string: group_name)
Create a new consumer instance in the consumer group. The format parameter controls the deserialization of data from Kafka and the content type that must be used in the Accept header of subsequent read API requests performed against this consumer. For example, if the creation request specifies avro for the format, subsequent read requests should use Accept: application/vnd.kafka.avro.v1+json.
Note that the response includes a full URL, including the host, because the consumer is stateful and tied to a specific REST proxy instance. Subsequent examples in this section use a Host header for this specific REST proxy instance.
Parameters: - group_name (string) – The name of the consumer group to join
Request JSON Object: - id (string) – Unique ID for the consumer instance in this group. If omitted, one will be automatically generated using the REST proxy ID and an auto-incrementing number
- format (string) – The format of consumed messages, which is used to convert messages into a JSON-compatible form. Valid values: “binary”, “avro”. If unspecified, defaults to “binary”.
- auto.offset.reset (string) – Sets the auto.offset.reset setting for the consumer
- auto.commit.enable (string) – Sets the auto.commit.enable setting for the consumer
Response JSON Object: - instance_id (string) – Unique ID for the consumer instance in this group. If provided in the initial request, this will be identical to id.
- base_uri (string) – Base URI used to construct URIs for subsequent requests against this consumer instance. This will be of the form http://hostname:port/consumers/consumer_group/instances/instance_id.
Status Codes: - 422 Unprocessable Entity –
- Error code 42204 – Invalid consumer configuration. One of the settings specified in the request contained an invalid value.
Example request:
POST /consumers/testgroup/ HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "id": "my_consumer",
  "format": "binary",
  "auto.offset.reset": "smallest",
  "auto.commit.enable": "false"
}
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "instance_id": "my_consumer",
  "base_uri": "http://proxy-instance.kafkaproxy.example.com/consumers/testgroup/instances/my_consumer"
}
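The consumer lifecycle depends on using base_uri for every later call. The sketch below builds the creation request, then derives a read request from the returned base_uri so that reads reach the proxy instance holding the consumer, with an Accept header matching the chosen format. Helper names are hypothetical, and sending the creation body with Content-Type: application/vnd.kafka.v1+json is an assumption, since the example request does not show a Content-Type header.

```python
import json
import urllib.request


def create_consumer_request(proxy_url, group, consumer_id=None, fmt="binary"):
    """Build (but do not send) POST /consumers/{group}."""
    body = {"format": fmt}
    if consumer_id is not None:
        body["id"] = consumer_id
    return urllib.request.Request(
        f"{proxy_url}/consumers/{group}",
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        # Assumed content type for a JSON body with no embedded data.
        headers={"Content-Type": "application/vnd.kafka.v1+json",
                 "Accept": "application/vnd.kafka.v1+json"},
    )


def read_request(create_response: dict, topic: str, fmt: str = "binary"):
    """Build a read request from base_uri; Accept must match the consumer format."""
    return urllib.request.Request(
        f"{create_response['base_uri']}/topics/{topic}",
        headers={"Accept": f"application/vnd.kafka.{fmt}.v1+json"},
    )
```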
POST /consumers/(string: group_name)/instances/(string: instance)/offsets
Commit offsets for the consumer. Returns a list of the partitions with the committed offsets.
The body of this request is empty. The offsets are determined by the current state of the consumer instance on the proxy. The returned state includes both consumed and committed offsets. After a successful commit, these should be identical; however, both are included so the output format is consistent with other API calls that return the offsets.
Note that this request must be made to the specific REST proxy instance holding the consumer instance.
Parameters: - group_name (string) – The name of the consumer group
- instance (string) – The ID of the consumer instance
Response JSON Array of Objects: - topic (string) – Name of the topic for which an offset was committed
- partition (int) – Partition ID for which an offset was committed
- consumed (long) – The offset of the most recently consumed message
- committed (long) – The committed offset value. If the commit was successful, this should be identical to consumed.
Status Codes: - 404 Not Found –
- Error code 40403 – Consumer instance not found
- 500 Internal Server Error –
- Error code 500 – General consumer error response, caused by an exception during the operation. An error message is included in the standard format which explains the cause.
Example request:
POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

[
  {"topic": "test", "partition": 1, "consumed": 100, "committed": 100},
  {"topic": "test", "partition": 2, "consumed": 200, "committed": 200},
  {"topic": "test2", "partition": 1, "consumed": 50, "committed": 50}
]
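Since consumed and committed should match after a successful commit, a client can check the commit by comparing them. A sketch over the decoded response, with hypothetical helpers; commit_url assumes the base_uri returned when the consumer was created.

```python
def commit_url(base_uri: str) -> str:
    """POST target for committing offsets of the consumer at base_uri."""
    return base_uri + "/offsets"


def uncommitted_partitions(commit_response: list) -> list:
    """Return (topic, partition) pairs whose committed offset differs from consumed."""
    return [
        (entry["topic"], entry["partition"])
        for entry in commit_response
        if entry["committed"] != entry["consumed"]
    ]
```

For the example response above, uncommitted_partitions returns an empty list.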
DELETE /consumers/(string: group_name)/instances/(string: instance)
Destroy the consumer instance.
Note that this request must be made to the specific REST proxy instance holding the consumer instance.
Parameters: - group_name (string) – The name of the consumer group
- instance (string) – The ID of the consumer instance
Status Codes: - 404 Not Found –
- Error code 40403 – Consumer instance not found
Example request:
DELETE /consumers/testgroup/instances/my_consumer HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json
Example response:
HTTP/1.1 204 No Content
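Destroying the consumer should also go through the returned base_uri so the request reaches the right proxy instance. A minimal sketch with a hypothetical helper; note that a 204 response has no body, so nothing should be parsed from it.

```python
import urllib.request


def delete_consumer_request(base_uri: str) -> urllib.request.Request:
    """Build (but do not send) the DELETE that destroys a consumer instance."""
    return urllib.request.Request(
        base_uri,
        method="DELETE",
        headers={"Accept": "application/vnd.kafka.v1+json"},
    )


# Sending it requires a reachable proxy instance:
# with urllib.request.urlopen(delete_consumer_request(base_uri)) as resp:
#     assert resp.status == 204  # no body to decode
```

Issuing this in a finally block after reading avoids leaving idle consumers in the group if the client fails partway.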
GET /consumers/(string: group_name)/instances/(string: instance)/topics/(string: topic_name)
Consume messages from a topic. If the consumer is not yet subscribed to the topic, this adds it as a subscriber, possibly causing a consumer rebalance.
The format of the embedded data returned by this request is determined by the format specified in the initial consumer instance creation request and must match the format of the Accept header. Mismatches will result in error code 40601.
Note that this request must be made to the specific REST proxy instance holding the consumer instance.
Parameters: - group_name (string) – The name of the consumer group
- instance (string) – The ID of the consumer instance
- topic_name (string) – The topic to consume messages from.
Query Parameters: - max_bytes – The maximum number of bytes of unencoded keys and values that should be included in the response. This provides approximate control over the size of responses and the amount of memory required to store the decoded response. The actual limit will be the minimum of this setting and the server-side configuration consumer.request.max.bytes. Default is unlimited.
Response JSON Array of Objects: - key (string) – The message key, formatted according to the embedded format
- value (string) – The message value, formatted according to the embedded format
- partition (int) – Partition of the message
- offset (long) – Offset of the message
Status Codes: - 404 Not Found –
- Error code 40401 – Topic not found
- Error code 40403 – Consumer instance not found
- 406 Not Acceptable –
- Error code 40601 – Consumer format does not match the embedded format requested by the Accept header.
- 409 Conflict –
- Error code 40901 – Consumer has already initiated a subscription. Consumers may subscribe to multiple topics, but all subscriptions must be initiated in a single request.
- 500 Internal Server Error –
- Error code 500 – General consumer error response, caused by an exception during the operation. An error message is included in the standard format which explains the cause.
Example binary request:
GET /consumers/testgroup/instances/my_consumer/topics/test_topic HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.binary.v1+json
Example binary response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.binary.v1+json

[
  {"key": "a2V5", "value": "Y29uZmx1ZW50", "partition": 1, "offset": 100},
  {"key": "a2V5", "value": "a2Fma2E=", "partition": 2, "offset": 101}
]
Example Avro request:
GET /consumers/avrogroup/instances/my_avro_consumer/topics/test_avro_topic HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.avro.v1+json
Example Avro response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.avro.v1+json

[
  {"key": 1, "value": {"id": 1, "name": "Bill"}, "partition": 1, "offset": 100},
  {"key": 2, "value": {"id": 2, "name": "Melinda"}, "partition": 2, "offset": 101}
]
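With the binary format, keys and values arrive base64-encoded, so a client decodes them before use. A sketch over the decoded JSON array, with decode_binary_messages as a hypothetical helper; null keys or values are passed through as None.

```python
import base64


def decode_binary_messages(messages: list) -> list:
    """Return (partition, offset, key, value) tuples with key/value as bytes."""
    out = []
    for m in messages:
        key = None if m.get("key") is None else base64.b64decode(m["key"])
        value = None if m.get("value") is None else base64.b64decode(m["value"])
        out.append((m["partition"], m["offset"], key, value))
    return out
```

Applied to the binary example response, this yields (1, 100, b"key", b"confluent") and (2, 101, b"key", b"kafka"). Avro responses need no such step, since the proxy already embeds them as JSON.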
Brokers
The brokers resource provides access to the current state of Kafka brokers in the cluster.
GET /brokers
Get a list of brokers.
Response JSON Object: - brokers (array) – List of broker IDs
Example request:
GET /brokers HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "brokers": [1, 2, 3]
}