librdkafka
The Apache Kafka C/C++ client library
rdkafka.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2013 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
43 /* @cond NO_DOC */
44 #pragma once
45 
46 #include <stdio.h>
47 #include <inttypes.h>
48 #include <sys/types.h>
49 
50 #ifdef __cplusplus
51 extern "C" {
52 #if 0
53 } /* Restore indent */
54 #endif
55 #endif
56 
57 #ifdef _MSC_VER
58 #include <basetsd.h>
59 typedef SSIZE_T ssize_t;
60 #define RD_UNUSED
61 #define RD_INLINE __inline
62 #define RD_DEPRECATED
63 #undef RD_EXPORT
64 #ifdef LIBRDKAFKA_EXPORTS
65 #define RD_EXPORT __declspec(dllexport)
66 #else
67 #define RD_EXPORT __declspec(dllimport)
68 #endif
69 
70 #else
71 #define RD_UNUSED __attribute__((unused))
72 #define RD_INLINE inline
73 #define RD_EXPORT
74 #define RD_DEPRECATED __attribute__((deprecated))
75 #endif
76 /* @endcond */
77 
78 
79 
101 #define RD_KAFKA_VERSION 0x000902ff
102 
111 RD_EXPORT
112 int rd_kafka_version(void);
113 
119 RD_EXPORT
120 const char *rd_kafka_version_str (void);
121 
140 typedef enum rd_kafka_type_t {
144 
145 
156 
157 
158 
165 RD_EXPORT
166 const char *rd_kafka_get_debug_contexts(void);
167 
175 #define RD_KAFKA_DEBUG_CONTEXTS \
176  "all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature"
177 
178 
179 /* @cond NO_DOC */
180 /* Private types to provide ABI compatibility */
181 typedef struct rd_kafka_s rd_kafka_t;
182 typedef struct rd_kafka_topic_s rd_kafka_topic_t;
183 typedef struct rd_kafka_conf_s rd_kafka_conf_t;
184 typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;
185 typedef struct rd_kafka_queue_s rd_kafka_queue_t;
186 /* @endcond */
187 
188 
201 typedef enum {
202  /* Internal errors to rdkafka: */
274 
277 
278  /* Kafka broker errors: */
353 
354  RD_KAFKA_RESP_ERR_END_ALL,
356 
357 
365  const char *name;
366  const char *desc;
367 };
368 
369 
373 RD_EXPORT
374 void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
375  size_t *cntp);
376 
377 
378 
379 
385 RD_EXPORT
386 const char *rd_kafka_err2str (rd_kafka_resp_err_t err);
387 
388 
389 
395 RD_EXPORT
396 const char *rd_kafka_err2name (rd_kafka_resp_err_t err);
397 
398 
421 RD_EXPORT
423 
424 
446 RD_EXPORT
448 
449 
459 RD_EXPORT
460 int rd_kafka_errno (void);
461 
462 
463 
479 typedef struct rd_kafka_topic_partition_s {
480  char *topic;
481  int32_t partition;
482  int64_t offset;
483  void *metadata;
484  size_t metadata_size;
485  void *opaque;
487  void *_private;
490 
491 
496 RD_EXPORT
498 
499 
504 typedef struct rd_kafka_topic_partition_list_s {
505  int cnt;
506  int size;
509 
510 
525 RD_EXPORT
527 
528 
532 RD_EXPORT
533 void
535 
545 RD_EXPORT
548  const char *topic, int32_t partition);
549 
550 
559 RD_EXPORT
560 void
562  *rktparlist,
563  const char *topic,
564  int32_t start, int32_t stop);
565 
566 
567 
579 RD_EXPORT
580 int
582  const char *topic, int32_t partition);
583 
584 
592 RD_EXPORT
593 int
596  int idx);
597 
598 
606 RD_EXPORT
609 
610 
611 
612 
620 RD_EXPORT
623  const char *topic, int32_t partition, int64_t offset);
624 
625 
626 
632 RD_EXPORT
635  const char *topic, int32_t partition);
636 
649 // FIXME: This doesn't show up in docs for some reason
650 // "Compound rd_kafka_message_t is not documented."
651 
665 typedef struct rd_kafka_message_s {
667  rd_kafka_topic_t *rkt;
668  int32_t partition;
669  void *payload;
673  size_t len;
676  void *key;
678  size_t key_len;
680  int64_t offset;
690  void *_private;
695 
696 
700 RD_EXPORT
702 
703 
704 
705 
710 static RD_INLINE const char *
711 RD_UNUSED
713  if (!rkmessage->err)
714  return NULL;
715 
716  if (rkmessage->payload)
717  return (const char *)rkmessage->payload;
718 
719  return rd_kafka_err2str(rkmessage->err);
720 }
721 
722 
723 
735 RD_EXPORT
736 int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
737  rd_kafka_timestamp_type_t *tstype);
738 
739 
740 
756 typedef enum {
761 
762 
793 RD_EXPORT
794 rd_kafka_conf_t *rd_kafka_conf_new(void);
795 
796 
800 RD_EXPORT
801 void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);
802 
803 
807 RD_EXPORT
808 rd_kafka_conf_t *rd_kafka_conf_dup(const rd_kafka_conf_t *conf);
809 
810 
820 RD_EXPORT
821 rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
822  const char *name,
823  const char *value,
824  char *errstr, size_t errstr_size);
825 
826 
832 RD_EXPORT
833 void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events);
834 
835 
839 RD_EXPORT
840 void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf,
841  void (*dr_cb) (rd_kafka_t *rk,
842  void *payload, size_t len,
844  void *opaque, void *msg_opaque));
845 
860 RD_EXPORT
861 void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf,
862  void (*dr_msg_cb) (rd_kafka_t *rk,
863  const rd_kafka_message_t *
864  rkmessage,
865  void *opaque));
866 
867 
872 RD_EXPORT
873 void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
874  void (*consume_cb) (rd_kafka_message_t *
875  rkmessage,
876  void *opaque));
877 
937 RD_EXPORT
939  rd_kafka_conf_t *conf,
940  void (*rebalance_cb) (rd_kafka_t *rk,
943  void *opaque));
944 
945 
946 
961 RD_EXPORT
963  rd_kafka_conf_t *conf,
964  void (*offset_commit_cb) (rd_kafka_t *rk,
967  void *opaque));
968 
969 
978 RD_EXPORT
979 void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf,
980  void (*error_cb) (rd_kafka_t *rk, int err,
981  const char *reason,
982  void *opaque));
983 
998 RD_EXPORT
999 void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
1000  void (*throttle_cb) (
1001  rd_kafka_t *rk,
1002  const char *broker_name,
1003  int32_t broker_id,
1004  int throttle_time_ms,
1005  void *opaque));
1006 
1007 
1018 RD_EXPORT
1019 void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf,
1020  void (*log_cb) (const rd_kafka_t *rk, int level,
1021  const char *fac, const char *buf));
1022 
1023 
1040 RD_EXPORT
1041 void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf,
1042  int (*stats_cb) (rd_kafka_t *rk,
1043  char *json,
1044  size_t json_len,
1045  void *opaque));
1046 
1047 
1048 
1061 RD_EXPORT
1062 void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf,
1063  int (*socket_cb) (int domain, int type,
1064  int protocol,
1065  void *opaque));
1066 
1067 
1068 #ifndef _MSC_VER
1069 
1081 RD_EXPORT
1082 void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf,
1083  int (*open_cb) (const char *pathname,
1084  int flags, mode_t mode,
1085  void *opaque));
1086 #endif
1087 
1091 RD_EXPORT
1092 void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque);
1093 
1097 RD_EXPORT
1098 void *rd_kafka_opaque(const rd_kafka_t *rk);
1099 
1100 
1101 
1107 RD_EXPORT
1108 void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf,
1109  rd_kafka_topic_conf_t *tconf);
1110 
1111 
1112 
1128 RD_EXPORT
1129 rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf,
1130  const char *name,
1131  char *dest, size_t *dest_size);
1132 
1133 
1139 RD_EXPORT
1140 rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf,
1141  const char *name,
1142  char *dest, size_t *dest_size);
1143 
1144 
1153 RD_EXPORT
1154 const char **rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp);
1155 
1156 
1165 RD_EXPORT
1166 const char **rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf,
1167  size_t *cntp);
1168 
1173 RD_EXPORT
1174 void rd_kafka_conf_dump_free(const char **arr, size_t cnt);
1175 
1180 RD_EXPORT
1181 void rd_kafka_conf_properties_show(FILE *fp);
1182 
1200 RD_EXPORT
1201 rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void);
1202 
1203 
1207 RD_EXPORT
1208 rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t
1209  *conf);
1210 
1211 
1215 RD_EXPORT
1216 void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf);
1217 
1218 
1227 RD_EXPORT
1228 rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf,
1229  const char *name,
1230  const char *value,
1231  char *errstr, size_t errstr_size);
1232 
1237 RD_EXPORT
1238 void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque);
1239 
1240 
1255 RD_EXPORT
1256 void
1257 rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf,
1258  int32_t (*partitioner) (
1259  const rd_kafka_topic_t *rkt,
1260  const void *keydata,
1261  size_t keylen,
1262  int32_t partition_cnt,
1263  void *rkt_opaque,
1264  void *msg_opaque));
1265 
1273 RD_EXPORT
1274 int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt,
1275  int32_t partition);
1276 
1277 
1278 /*******************************************************************
1279  * *
1280  * Partitioners provided by rdkafka *
1281  * *
1282  *******************************************************************/
1283 
1292 RD_EXPORT
1293 int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt,
1294  const void *key, size_t keylen,
1295  int32_t partition_cnt,
1296  void *opaque, void *msg_opaque);
1297 
1306 RD_EXPORT
1307 int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
1308  const void *key, size_t keylen,
1309  int32_t partition_cnt,
1310  void *opaque, void *msg_opaque);
1311 
1322 RD_EXPORT
1323 int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt,
1324  const void *key, size_t keylen,
1325  int32_t partition_cnt,
1326  void *opaque, void *msg_opaque);
1327 
1328 
1369 RD_EXPORT
1370 rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,
1371  char *errstr, size_t errstr_size);
1372 
1373 
1379 RD_EXPORT
1380 void rd_kafka_destroy(rd_kafka_t *rk);
1381 
1382 
1383 
1387 RD_EXPORT
1388 const char *rd_kafka_name(const rd_kafka_t *rk);
1389 
1390 
1401 RD_EXPORT
1402 char *rd_kafka_memberid (const rd_kafka_t *rk);
1403 
1404 
1426 RD_EXPORT
1427 rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic,
1428  rd_kafka_topic_conf_t *conf);
1429 
1430 
1431 
1436 RD_EXPORT
1437 void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt);
1438 
1439 
1443 RD_EXPORT
1444 const char *rd_kafka_topic_name(const rd_kafka_topic_t *rkt);
1445 
1446 
1450 RD_EXPORT
1451 void *rd_kafka_topic_opaque (const rd_kafka_topic_t *rkt);
1452 
1453 
1460 #define RD_KAFKA_PARTITION_UA ((int32_t)-1)
1461 
1462 
1484 RD_EXPORT
1485 int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms);
1486 
1487 
1498 RD_EXPORT
1499 void rd_kafka_yield (rd_kafka_t *rk);
1500 
1501 
1502 
1503 
1511 RD_EXPORT rd_kafka_resp_err_t
1512 rd_kafka_pause_partitions (rd_kafka_t *rk,
1513  rd_kafka_topic_partition_list_t *partitions);
1514 
1515 
1516 
1524 RD_EXPORT rd_kafka_resp_err_t
1525 rd_kafka_resume_partitions (rd_kafka_t *rk,
1526  rd_kafka_topic_partition_list_t *partitions);
1527 
1528 
1529 
1530 
1539 RD_EXPORT rd_kafka_resp_err_t
1540 rd_kafka_query_watermark_offsets (rd_kafka_t *rk,
1541  const char *topic, int32_t partition,
1542  int64_t *low, int64_t *high, int timeout_ms);
1543 
1544 
1561 RD_EXPORT rd_kafka_resp_err_t
1562 rd_kafka_get_watermark_offsets (rd_kafka_t *rk,
1563  const char *topic, int32_t partition,
1564  int64_t *low, int64_t *high);
1565 
1566 
1567 
1581 RD_EXPORT
1582 void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr);
1583 
1584 
1608 RD_EXPORT
1609 rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk);
1610 
1614 RD_EXPORT
1615 void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu);
1616 
1617 
1624 RD_EXPORT
1625 rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk);
1626 
1627 
1637 RD_EXPORT
1638 rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk);
1639 
1640 
1647 RD_EXPORT
1648 void rd_kafka_queue_forward (rd_kafka_queue_t *src, rd_kafka_queue_t *dst);
1649 
1650 
1654 RD_EXPORT
1655 size_t rd_kafka_queue_length (rd_kafka_queue_t *rkqu);
1656 
1657 
1673 RD_EXPORT
1674 void rd_kafka_queue_io_event_enable (rd_kafka_queue_t *rkqu, int fd,
1675  const void *payload, size_t size);
1676 
1687 #define RD_KAFKA_OFFSET_BEGINNING -2
1689 #define RD_KAFKA_OFFSET_END -1
1691 #define RD_KAFKA_OFFSET_STORED -1000
1693 #define RD_KAFKA_OFFSET_INVALID -1001
1697 #define RD_KAFKA_OFFSET_TAIL_BASE -2000 /* internal: do not use */
1698 
1705 #define RD_KAFKA_OFFSET_TAIL(CNT) (RD_KAFKA_OFFSET_TAIL_BASE - (CNT))
1706 
1740 RD_EXPORT
1741 int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition,
1742  int64_t offset);
1743 
1758 RD_EXPORT
1759 int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition,
1760  int64_t offset, rd_kafka_queue_t *rkqu);
1761 
1775 RD_EXPORT
1776 int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition);
1777 
1778 
1779 
1794 RD_EXPORT
1795 rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *rkt,
1796  int32_t partition,
1797  int64_t offset,
1798  int timeout_ms);
1799 
1800 
1822 RD_EXPORT
1823 rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition,
1824  int timeout_ms);
1825 
1826 
1827 
1850 RD_EXPORT
1851 ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition,
1852  int timeout_ms,
1853  rd_kafka_message_t **rkmessages,
1854  size_t rkmessages_size);
1855 
1856 
1857 
1878 RD_EXPORT
1879 int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition,
1880  int timeout_ms,
1881  void (*consume_cb) (rd_kafka_message_t
1882  *rkmessage,
1883  void *opaque),
1884  void *opaque);
1885 
1886 
1903 RD_EXPORT
1904 rd_kafka_message_t *rd_kafka_consume_queue(rd_kafka_queue_t *rkqu,
1905  int timeout_ms);
1906 
1912 RD_EXPORT
1913 ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu,
1914  int timeout_ms,
1915  rd_kafka_message_t **rkmessages,
1916  size_t rkmessages_size);
1917 
1923 RD_EXPORT
1924 int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu,
1925  int timeout_ms,
1926  void (*consume_cb) (rd_kafka_message_t
1927  *rkmessage,
1928  void *opaque),
1929  void *opaque);
1930 
1931 
1957 RD_EXPORT
1958 rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt,
1959  int32_t partition, int64_t offset);
1982 RD_EXPORT rd_kafka_resp_err_t
1983 rd_kafka_subscribe (rd_kafka_t *rk,
1984  const rd_kafka_topic_partition_list_t *topics);
1985 
1986 
1990 RD_EXPORT
1991 rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk);
1992 
1993 
2003 RD_EXPORT rd_kafka_resp_err_t
2004 rd_kafka_subscription (rd_kafka_t *rk,
2006 
2007 
2008 
2027 RD_EXPORT
2028 rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms);
2029 
2045 RD_EXPORT
2047 
2048 
2049 
2063 RD_EXPORT rd_kafka_resp_err_t
2064 rd_kafka_assign (rd_kafka_t *rk,
2065  const rd_kafka_topic_partition_list_t *partitions);
2066 
2076 RD_EXPORT rd_kafka_resp_err_t
2077 rd_kafka_assignment (rd_kafka_t *rk,
2078  rd_kafka_topic_partition_list_t **partitions);
2079 
2080 
2081 
2082 
2098 RD_EXPORT rd_kafka_resp_err_t
2099 rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets,
2100  int async);
2101 
2102 
2108 RD_EXPORT rd_kafka_resp_err_t
2109 rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
2110  int async);
2111 
2112 
2134 RD_EXPORT rd_kafka_resp_err_t
2135 rd_kafka_commit_queue (rd_kafka_t *rk,
2136  const rd_kafka_topic_partition_list_t *offsets,
2137  rd_kafka_queue_t *rkqu,
2138  void (*cb) (rd_kafka_t *rk,
2139  rd_kafka_resp_err_t err,
2141  void *opaque),
2142  void *opaque);
2143 
2144 
2157 RD_EXPORT rd_kafka_resp_err_t
2158 rd_kafka_committed (rd_kafka_t *rk,
2159  rd_kafka_topic_partition_list_t *partitions,
2160  int timeout_ms);
2161 
2162 
2163 
2176 RD_EXPORT rd_kafka_resp_err_t
2177 rd_kafka_position (rd_kafka_t *rk,
2178  rd_kafka_topic_partition_list_t *partitions);
2179 
2180 
2196 #define RD_KAFKA_MSG_F_FREE 0x1
2197 #define RD_KAFKA_MSG_F_COPY 0x2
2198 #define RD_KAFKA_MSG_F_BLOCK 0x4
2274 RD_EXPORT
2275 int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
2276  int msgflags,
2277  void *payload, size_t len,
2278  const void *key, size_t keylen,
2279  void *msg_opaque);
2280 
2281 
2282 
2304 RD_EXPORT
2305 int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition,
2306  int msgflags,
2307  rd_kafka_message_t *rkmessages, int message_cnt);
2308 
2309 
2310 
2311 
2323 RD_EXPORT
2324 rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms);
2325 
2326 
2341 typedef struct rd_kafka_metadata_broker {
2342  int32_t id;
2343  char *host;
2344  int port;
2346 
2350 typedef struct rd_kafka_metadata_partition {
2351  int32_t id;
2352  rd_kafka_resp_err_t err;
2353  int32_t leader;
2354  int replica_cnt;
2355  int32_t *replicas;
2356  int isr_cnt;
2357  int32_t *isrs;
2359 
2363 typedef struct rd_kafka_metadata_topic {
2364  char *topic;
2365  int partition_cnt;
2366  struct rd_kafka_metadata_partition *partitions;
2374 typedef struct rd_kafka_metadata {
2375  int broker_cnt;
2376  struct rd_kafka_metadata_broker *brokers;
2378  int topic_cnt;
2379  struct rd_kafka_metadata_topic *topics;
2381  int32_t orig_broker_id;
2382  char *orig_broker_name;
2384 
2385 
2402 RD_EXPORT
2404 rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
2405  rd_kafka_topic_t *only_rkt,
2406  const struct rd_kafka_metadata **metadatap,
2407  int timeout_ms);
2408 
2412 RD_EXPORT
2413 void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata);
2414 
2415 
2436  char *member_id;
2437  char *client_id;
2438  char *client_host;
2439  void *member_metadata;
2441  int member_metadata_size;
2442  void *member_assignment;
2444  int member_assignment_size;
2445 };
2446 
2451  struct rd_kafka_metadata_broker broker;
2452  char *group;
2454  char *state;
2455  char *protocol_type;
2456  char *protocol;
2457  struct rd_kafka_group_member_info *members;
2458  int member_cnt;
2459 };
2460 
2467  struct rd_kafka_group_info *groups;
2468  int group_cnt;
2469 };
2488 RD_EXPORT
2490 rd_kafka_list_groups (rd_kafka_t *rk, const char *group,
2491  const struct rd_kafka_group_list **grplistp,
2492  int timeout_ms);
2493 
2497 RD_EXPORT
2498 void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist);
2499 
2500 
2541 RD_EXPORT
2542 int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist);
2543 
2544 
2545 
2546 
2559 RD_EXPORT RD_DEPRECATED
2560 void rd_kafka_set_logger(rd_kafka_t *rk,
2561  void (*func) (const rd_kafka_t *rk, int level,
2562  const char *fac, const char *buf));
2563 
2564 
2572 RD_EXPORT
2573 void rd_kafka_set_log_level(rd_kafka_t *rk, int level);
2574 
2575 
2579 RD_EXPORT
2580 void rd_kafka_log_print(const rd_kafka_t *rk, int level,
2581  const char *fac, const char *buf);
2582 
2583 
2587 RD_EXPORT
2588 void rd_kafka_log_syslog(const rd_kafka_t *rk, int level,
2589  const char *fac, const char *buf);
2590 
2591 
2604 RD_EXPORT
2605 int rd_kafka_outq_len(rd_kafka_t *rk);
2606 
2607 
2608 
2615 RD_EXPORT
2616 void rd_kafka_dump(FILE *fp, rd_kafka_t *rk);
2617 
2618 
2619 
2625 RD_EXPORT
2626 int rd_kafka_thread_cnt(void);
2627 
2628 
2638 RD_EXPORT
2639 int rd_kafka_wait_destroyed(int timeout_ms);
2640 
2641 
2659 RD_EXPORT
2661 
2662 
2678 typedef int rd_kafka_event_type_t;
2679 #define RD_KAFKA_EVENT_NONE 0x0
2680 #define RD_KAFKA_EVENT_DR 0x1
2681 #define RD_KAFKA_EVENT_FETCH 0x2
2682 #define RD_KAFKA_EVENT_LOG 0x4
2683 #define RD_KAFKA_EVENT_ERROR 0x8
2684 #define RD_KAFKA_EVENT_REBALANCE 0x10
2685 #define RD_KAFKA_EVENT_OFFSET_COMMIT 0x20
2688 typedef struct rd_kafka_op_s rd_kafka_event_t;
2689 
2690 
2697 RD_EXPORT
2698 rd_kafka_event_type_t rd_kafka_event_type (const rd_kafka_event_t *rkev);
2706 RD_EXPORT
2707 const char *rd_kafka_event_name (const rd_kafka_event_t *rkev);
2708 
2709 
2719 RD_EXPORT
2720 void rd_kafka_event_destroy (rd_kafka_event_t *rkev);
2721 
2722 
2735 RD_EXPORT
2736 const rd_kafka_message_t *rd_kafka_event_message_next (rd_kafka_event_t *rkev);
2737 
2738 
2749 RD_EXPORT
2750 size_t rd_kafka_event_message_array (rd_kafka_event_t *rkev,
2751  const rd_kafka_message_t **rkmessages,
2752  size_t size);
2753 
2754 
2762 RD_EXPORT
2763 size_t rd_kafka_event_message_count (rd_kafka_event_t *rkev);
2764 
2765 
2772 RD_EXPORT
2773 rd_kafka_resp_err_t rd_kafka_event_error (rd_kafka_event_t *rkev);
2774 
2775 
2784 RD_EXPORT
2785 const char *rd_kafka_event_error_string (rd_kafka_event_t *rkev);
2786 
2787 
2788 
2795 RD_EXPORT
2796 void *rd_kafka_event_opaque (rd_kafka_event_t *rkev);
2797 
2798 
2807 RD_EXPORT
2808 int rd_kafka_event_log (rd_kafka_event_t *rkev,
2809  const char **fac, const char **str, int *level);
2810 
2811 
2822 rd_kafka_event_topic_partition_list (rd_kafka_event_t *rkev);
2823 
2824 
2834 RD_EXPORT rd_kafka_topic_partition_t *
2835 rd_kafka_event_topic_partition (rd_kafka_event_t *rkev);
2836 
2837 
2845 RD_EXPORT
2846 rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms);
2847 
2848 
2851 #ifdef __cplusplus
2852 }
2853 #endif
void * _private
Definition: rdkafka.h:487
rd_kafka_resp_err_t
Error codes.
Definition: rdkafka.h:201
rd_kafka_topic_t * rkt
Definition: rdkafka.h:667
Definition: rdkafka.h:282
RD_EXPORT int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset)
Start consuming messages for topic rkt and partition at offset offset which may either be an absolute...
Definition: rdkafka.h:263
rd_kafka_resp_err_t err
Definition: rdkafka.h:666
rd_kafka_conf_res_t
Configuration result type.
Definition: rdkafka.h:756
RD_EXPORT char * rd_kafka_memberid(const rd_kafka_t *rk)
Returns this client&#39;s broker-assigned group member id.
RD_EXPORT void rd_kafka_queue_forward(rd_kafka_queue_t *src, rd_kafka_queue_t *dst)
Forward/re-route queue src to dst. If dst is NULL the forwarding is removed.
int cnt
Definition: rdkafka.h:505
RD_EXPORT int rd_kafka_thread_cnt(void)
Retrieve the current number of threads in use by librdkafka.
RD_EXPORT void rd_kafka_conf_set_consume_cb(rd_kafka_conf_t *conf, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque))
Consumer: Set consume callback for use with rd_kafka_consumer_poll()
rd_kafka_topic_partition_t * elems
Definition: rdkafka.h:507
RD_EXPORT rd_kafka_resp_err_t rd_kafka_subscription(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **topics)
Returns the current topic subscription.
RD_EXPORT int32_t rd_kafka_msg_partitioner_consistent(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Consistent partitioner.
RD_EXPORT void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf, void(*dr_msg_cb)(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque))
Producer: Set delivery report callback in provided conf object.
RD_EXPORT const char * rd_kafka_event_name(const rd_kafka_event_t *rkev)
RD_EXPORT void rd_kafka_topic_partition_list_destroy(rd_kafka_topic_partition_list_t *rkparlist)
Free all resources used by the list and the list itself.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_resume_partitions(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Resume producing consumption for the provided list of partitions.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
Sets a single rd_kafka_topic_conf_t value by property name.
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_topic_partition_list_copy(const rd_kafka_topic_partition_list_t *src)
Make a copy of an existing list.
RD_EXPORT void rd_kafka_topic_conf_set_partitioner_cb(rd_kafka_topic_conf_t *topic_conf, int32_t(*partitioner)(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque))
Producer: Set partitioner callback in provided topic conf object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_poll_set_consumer(rd_kafka_t *rk)
Redirect the main (rd_kafka_poll()) queue to the KafkaConsumer&#39;s queue (rd_kafka_consumer_poll()).
RD_EXPORT void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage)
Frees resources for rkmessage and hands ownership back to rdkafka.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_topic_conf_get(const rd_kafka_topic_conf_t *conf, const char *name, char *dest, size_t *dest_size)
Retrieve topic configuration value for property name.
Definition: rdkafka.h:237
Definition: rdkafka.h:352
RD_EXPORT rd_kafka_resp_err_t rd_kafka_position(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Retrieve current positions (offsets) for topics+partitions.
Definition: rdkafka.h:251
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_get_consumer(rd_kafka_t *rk)
size_t key_len
Definition: rdkafka.h:678
Definition: rdkafka.h:231
RD_EXPORT rd_kafka_event_type_t rd_kafka_event_type(const rd_kafka_event_t *rkev)
Definition: rdkafka.h:758
RD_EXPORT rd_kafka_t * rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)
Creates a new Kafka handle and starts its operation according to the specified type (RD_KAFKA_CONSUME...
RD_EXPORT void rd_kafka_get_err_descs(const struct rd_kafka_err_desc **errdescs, size_t *cntp)
Returns the full list of error codes.
Group information.
Definition: rdkafka.h:2464
char * group
Definition: rdkafka.h:2466
Partition information.
Definition: rdkafka.h:2364
Definition: rdkafka.h:220
RD_EXPORT int rd_kafka_event_log(rd_kafka_event_t *rkev, const char **fac, const char **str, int *level)
Extract log message from the event.
RD_EXPORT const char * rd_kafka_version_str(void)
Returns the librdkafka version as string.
RD_EXPORT void * rd_kafka_opaque(const rd_kafka_t *rk)
Retrieves the opaque pointer previously set with rd_kafka_conf_set_opaque()
const char * name
Definition: rdkafka.h:365
Definition: rdkafka.h:227
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_topic_partition_list_new(int size)
Create a new list/vector Topic+Partition container.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit_message(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async)
Commit message&#39;s offset on broker for the message&#39;s partition.
RD_EXPORT ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size)
Consume up to rkmessages_size from topic rkt and partition putting a pointer to each message in the a...
RD_EXPORT void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque)
Sets the application&#39;s opaque pointer that will be passed to all topic callbacks as the rkt_opaque ar...
Definition: rdkafka.h:247
RD_EXPORT void rd_kafka_log_print(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
Builtin (default) log sink: print to stderr.
Definition: rdkafka.h:255
RD_EXPORT int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu, int timeout_ms, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque), void *opaque)
Consume multiple messages from queue with callback.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
Sets a configuration property.
RD_EXPORT int rd_kafka_outq_len(rd_kafka_t *rk)
Returns the current out queue length.
Definition: rdkafka.h:273
RD_EXPORT const rd_kafka_message_t * rd_kafka_event_message_next(rd_kafka_event_t *rkev)
RD_EXPORT rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset)
Store offset offset for topic rkt partition partition.
RD_EXPORT void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf, void(*error_cb)(rd_kafka_t *rk, int err, const char *reason, void *opaque))
Set error callback in provided conf object.
size_t len
Definition: rdkafka.h:673
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async)
Commit offsets on broker for the provided list of partitions.
RD_EXPORT void rd_kafka_conf_set_default_topic_conf(rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *tconf)
Definition: rdkafka.h:330
RD_EXPORT int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque)
Produce and send a single message to broker.
Definition: rdkafka.h:241
RD_EXPORT const char * rd_kafka_event_error_string(rd_kafka_event_t *rkev)
Definition: rdkafka.h:332
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_event_topic_partition(rd_kafka_event_t *rkev)
RD_EXPORT void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu)
RD_EXPORT size_t rd_kafka_queue_length(rd_kafka_queue_t *rkqu)
Definition: rdkafka.h:259
rd_kafka_resp_err_t code
Definition: rdkafka.h:364
RD_EXPORT void rd_kafka_destroy(rd_kafka_t *rk)
Destroy Kafka handle.
RD_EXPORT void rd_kafka_set_log_level(rd_kafka_t *rk, int level)
Specifies the maximum logging level produced by internal kafka logging and debugging.
RD_EXPORT int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque), void *opaque)
Consumes messages from topic rkt and partition, calling the provided callback for each consumed messs...
RD_EXPORT rd_kafka_resp_err_t rd_kafka_metadata(rd_kafka_t *rk, int all_topics, rd_kafka_topic_t *only_rkt, const struct rd_kafka_metadata **metadatap, int timeout_ms)
Request Metadata from broker.
RD_EXPORT int rd_kafka_wait_destroyed(int timeout_ms)
Wait for all rd_kafka_t objects to be destroyed.
Definition: rdkafka.h:316
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit_queue(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, rd_kafka_queue_t *rkqu, void(*cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque), void *opaque)
Commit offsets on broker for the provided list of partitions.
Definition: rdkafka.h:304
int64_t offset
Definition: rdkafka.h:680
Definition: rdkafka.h:308
RD_EXPORT int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist)
Adds one or more brokers to the kafka handle&#39;s list of initial bootstrap brokers. ...
RD_EXPORT int rd_kafka_errno(void)
Returns the thread-local system errno.
Definition: rdkafka.h:290
Definition: rdkafka.h:216
Definition: rdkafka.h:280
RD_EXPORT void rd_kafka_log_syslog(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
Builtin log sink: print to syslog.
Definition: rdkafka.h:204
Group member information.
Definition: rdkafka.h:2449
void * key
Definition: rdkafka.h:676
RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_get(const rd_kafka_conf_t *conf, const char *name, char *dest, size_t *dest_size)
Retrieve configuration value for property name.
RD_EXPORT rd_kafka_topic_conf_t * rd_kafka_topic_conf_new(void)
Create topic configuration object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_get_watermark_offsets(rd_kafka_t *rk, const char *topic, int32_t partition, int64_t *low, int64_t *high)
Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
RD_EXPORT void rd_kafka_dump(FILE *fp, rd_kafka_t *rk)
Dumps rdkafka&#39;s internal state for handle rk to stream fp.
Definition: rdkafka.h:229
A growable list of Topic+Partitions.
Definition: rdkafka.h:504
int rd_kafka_event_type_t
Event types.
Definition: rdkafka.h:2692
Topic information.
Definition: rdkafka.h:2377
RD_EXPORT void rd_kafka_conf_set_throttle_cb(rd_kafka_conf_t *conf, void(*throttle_cb)(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque))
Set throttle callback.
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_get_main(rd_kafka_t *rk)
RD_EXPORT void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf, void(*log_cb)(const rd_kafka_t *rk, int level, const char *fac, const char *buf))
Set logger callback.
Definition: rdkafka.h:218
RD_EXPORT rd_kafka_resp_err_t rd_kafka_list_groups(rd_kafka_t *rk, const char *group, const struct rd_kafka_group_list **grplistp, int timeout_ms)
List and describe client groups in cluster.
int32_t partition
Definition: rdkafka.h:481
Definition: rdkafka.h:214
Definition: rdkafka.h:243
Definition: rdkafka.h:212
void * opaque
Definition: rdkafka.h:485
const char * desc
Definition: rdkafka.h:366
RD_EXPORT rd_kafka_resp_err_t rd_kafka_assign(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *partitions)
Atomic assignment of partitions to consume.
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_dup(const rd_kafka_conf_t *conf)
Creates a copy/duplicate of configuration object conf.
RD_EXPORT int64_t rd_kafka_message_timestamp(const rd_kafka_message_t *rkmessage, rd_kafka_timestamp_type_t *tstype)
Returns the message timestamp for a consumed message.
RD_EXPORT int rd_kafka_topic_partition_list_del_by_idx(rd_kafka_topic_partition_list_t *rktparlist, int idx)
Delete partition from list by elems[] index.
Definition: rdkafka.h:302
RD_EXPORT void rd_kafka_mem_free(rd_kafka_t *rk, void *ptr)
Free pointer returned by librdkafka.
static RD_INLINE const char *RD_UNUSED rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage)
Returns the error string for an errored rd_kafka_message_t or NULL if there was no error...
Definition: rdkafka.h:712
RD_EXPORT rd_kafka_resp_err_t rd_kafka_pause_partitions(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Pause producing or consumption for the provided list of partitions.
RD_EXPORT void rd_kafka_group_list_destroy(const struct rd_kafka_group_list *grplist)
Release list memory.
RD_EXPORT void rd_kafka_event_destroy(rd_kafka_event_t *rkev)
Destroy an event.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_committed(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions, int timeout_ms)
Retrieve committed offsets for topics+partitions.
Definition: rdkafka.h:154
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_new(void)
Create configuration object.
RD_EXPORT rd_kafka_event_t * rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms)
Poll a queue for an event for max timeout_ms.
RD_EXPORT int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition)
Stop consuming messages for topic rkt and partition, purging all messages currently in the local queu...
RD_EXPORT const char * rd_kafka_err2name(rd_kafka_resp_err_t err)
Returns the error code name (enum name).
RD_EXPORT int rd_kafka_version(void)
Returns the librdkafka version as integer.
Definition: rdkafka.h:350
RD_EXPORT int rd_kafka_topic_partition_list_del(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Delete partition from list.
int size
Definition: rdkafka.h:506
RD_EXPORT rd_kafka_resp_err_t rd_kafka_subscribe(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)
Subscribe to topic set using balanced consumer groups.
RD_EXPORT void rd_kafka_topic_partition_destroy(rd_kafka_topic_partition_t *rktpar)
Destroy a rd_kafka_topic_partition_t.
RD_EXPORT void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque)
Sets the application&#39;s opaque pointer that will be passed to callbacks.
Definition: rdkafka.h:759
RD_EXPORT const char ** rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp)
Dump the configuration properties and values of conf to an array with "key", "value" pairs...
RD_EXPORT rd_kafka_resp_err_t rd_kafka_event_error(rd_kafka_event_t *rkev)
Definition: rdkafka.h:271
char * topic
Definition: rdkafka.h:480
RD_EXPORT const char * rd_kafka_get_debug_contexts(void)
Retrieve supported debug contexts for use with the "debug" configuration property. (runtime)
RD_EXPORT void rd_kafka_conf_set_offset_commit_cb(rd_kafka_conf_t *conf, void(*offset_commit_cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque))
Consumer: Set offset commit callback for use with consumer groups.
RD_EXPORT void rd_kafka_conf_set_rebalance_cb(rd_kafka_conf_t *conf, void(*rebalance_cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque))
Consumer: Set rebalance callback for use with coordinated consumer group balancing.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms)
Wait until all outstanding produce requests, et.al, are completed. This should typically be done prio...
RD_EXPORT int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms)
Polls the provided kafka handle for events.
RD_EXPORT int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt, int32_t partition)
Check if partition is available (has a leader broker).
Definition: rdkafka.h:141
Definition: rdkafka.h:320
RD_EXPORT void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf, int(*socket_cb)(int domain, int type, int protocol, void *opaque))
Set socket callback.
Definition: rdkafka.h:142
Definition: rdkafka.h:153
RD_EXPORT rd_kafka_resp_err_t rd_kafka_errno2err(int errnox)
Converts the system errno value errnox to a rd_kafka_resp_err_t error code upon failure from the foll...
Definition: rdkafka.h:239
RD_EXPORT void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events)
Enable event sourcing. events is a bitmask of RD_KAFKA_EVENT_* of events to enable for consumption by...
RD_EXPORT void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf, void(*dr_cb)(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err, void *opaque, void *msg_opaque))
Definition: rdkafka.h:235
RD_EXPORT rd_kafka_topic_conf_t * rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t *conf)
Creates a copy/duplicate of topic configuration object conf.
Definition: rdkafka.h:284
RD_EXPORT RD_DEPRECATED void rd_kafka_set_logger(rd_kafka_t *rk, void(*func)(const rd_kafka_t *rk, int level, const char *fac, const char *buf))
Set logger function.
Definition: rdkafka.h:210
Metadata container.
Definition: rdkafka.h:2388
Definition: rdkafka.h:269
RD_EXPORT rd_kafka_resp_err_t rd_kafka_unsubscribe(rd_kafka_t *rk)
Unsubscribe from the current subscription set.
RD_EXPORT void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata)
Release metadata memory.
RD_EXPORT int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, rd_kafka_message_t *rkmessages, int message_cnt)
Produce multiple messages.
RD_EXPORT void rd_kafka_conf_set_open_cb(rd_kafka_conf_t *conf, int(*open_cb)(const char *pathname, int flags, mode_t mode, void *opaque))
Set open callback.
Definition: rdkafka.h:261
RD_EXPORT rd_kafka_resp_err_t rd_kafka_seek(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, int timeout_ms)
Seek consumer for topic+partition to offset which is either an absolute or logical offset...
RD_EXPORT void rd_kafka_topic_partition_list_add_range(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t start, int32_t stop)
Add range of partitions from start to stop inclusive.
RD_EXPORT size_t rd_kafka_event_message_count(rd_kafka_event_t *rkev)
Definition: rdkafka.h:757
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_topic_partition_list_add(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Add topic+partition to list.
Definition: rdkafka.h:233
Definition: rdkafka.h:152
Definition: rdkafka.h:257
RD_EXPORT ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size)
Consume batch of messages from queue.
rd_kafka_timestamp_type_t
Definition: rdkafka.h:151
RD_EXPORT void * rd_kafka_topic_opaque(const rd_kafka_topic_t *rkt)
Get the rkt_opaque pointer that was set in the topic configuration.
RD_EXPORT void * rd_kafka_event_opaque(rd_kafka_event_t *rkev)
RD_EXPORT const char * rd_kafka_name(const rd_kafka_t *rk)
Returns Kafka handle name.
A Kafka message as returned by the rd_kafka_consume*() family of functions as well as provided to the...
Definition: rdkafka.h:665
Definition: rdkafka.h:245
RD_EXPORT void rd_kafka_queue_io_event_enable(rd_kafka_queue_t *rkqu, int fd, const void *payload, size_t size)
Enable IO event triggering for queue.
RD_EXPORT void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt)
Destroy topic handle previously created with rd_kafka_topic_new().
int32_t partition
Definition: rdkafka.h:668
Definition: rdkafka.h:276
RD_EXPORT rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk, const char *topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)
Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
RD_EXPORT rd_kafka_message_t * rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms)
Consume a single message from topic rkt and partition.
Definition: rdkafka.h:249
RD_EXPORT rd_kafka_message_t * rd_kafka_consumer_poll(rd_kafka_t *rk, int timeout_ms)
Poll the consumer for messages or events.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_assignment(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **partitions)
Returns the current partition assignment.
Definition: rdkafka.h:225
List of groups.
Definition: rdkafka.h:2480
RD_EXPORT const char * rd_kafka_err2str(rd_kafka_resp_err_t err)
Returns a human readable representation of a kafka error.
void * metadata
Definition: rdkafka.h:483
int64_t offset
Definition: rdkafka.h:482
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_event_topic_partition_list(rd_kafka_event_t *rkev)
Definition: rdkafka.h:208
RD_EXPORT void rd_kafka_conf_destroy(rd_kafka_conf_t *conf)
Destroys a conf object.
Definition: rdkafka.h:267
Broker information.
Definition: rdkafka.h:2355
Error code value, name and description. Typically for use with language bindings to automatically exp...
Definition: rdkafka.h:363
Definition: rdkafka.h:346
Definition: rdkafka.h:326
rd_kafka_resp_err_t err
Definition: rdkafka.h:486
Definition: rdkafka.h:265
Topic+Partition place holder.
Definition: rdkafka.h:479
Definition: rdkafka.h:206
size_t metadata_size
Definition: rdkafka.h:484
RD_EXPORT const char ** rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf, size_t *cntp)
Dump the topic configuration properties and values of conf to an array with "key", "value" pairs.
RD_EXPORT size_t rd_kafka_event_message_array(rd_kafka_event_t *rkev, const rd_kafka_message_t **rkmessages, size_t size)
Extacts size message(s) from the event into the pre-allocated array rkmessages.
rd_kafka_type_t
rd_kafka_t handle type.
Definition: rdkafka.h:140
RD_EXPORT int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Random partitioner.
RD_EXPORT void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf, int(*stats_cb)(rd_kafka_t *rk, char *json, size_t json_len, void *opaque))
Set statistics callback in provided conf object.
Definition: rdkafka.h:296
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_topic_partition_list_find(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Find element by topic and partition.
RD_EXPORT void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf)
Destroys a topic conf object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition, int64_t offset)
Set offset to offset for topic and partition.
void * payload
Definition: rdkafka.h:669
RD_EXPORT void rd_kafka_yield(rd_kafka_t *rk)
Cancels the current callback dispatcher (rd_kafka_poll(), rd_kafka_consume_callback(), etc).
Definition: rdkafka.h:286
RD_EXPORT rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk)
Close down the KafkaConsumer.
RD_EXPORT void rd_kafka_conf_properties_show(FILE *fp)
Prints a table to fp of all supported configuration properties, their default values as well as a des...
RD_EXPORT void rd_kafka_conf_dump_free(const char **arr, size_t cnt)
Frees a configuration dump returned from rd_kafka_conf_dump() or `rd_kafka_topic_conf_dump().
void * _private
Definition: rdkafka.h:690
Definition: rdkafka.h:223
RD_EXPORT int32_t rd_kafka_msg_partitioner_consistent_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Consistent-Random partitioner.
RD_EXPORT const char * rd_kafka_topic_name(const rd_kafka_topic_t *rkt)
Returns the topic name.
RD_EXPORT rd_kafka_message_t * rd_kafka_consume_queue(rd_kafka_queue_t *rkqu, int timeout_ms)
Consume from queue.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_last_error(void)
Returns the last error code generated by a legacy API call in the current thread. ...
RD_EXPORT int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, rd_kafka_queue_t *rkqu)
Same as rd_kafka_consume_start() but re-routes incoming messages to the provided queue rkqu (which mu...
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_new(rd_kafka_t *rk)
Create a new message queue.
RD_EXPORT rd_kafka_topic_t * rd_kafka_topic_new(rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
Creates a new topic handle for topic named topic.