librdkafka
The Apache Kafka C/C++ client library
rdkafka.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2013 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
43 /* @cond NO_DOC */
44 #pragma once
45 
46 #include <stdio.h>
47 #include <inttypes.h>
48 #include <sys/types.h>
49 
50 #ifdef __cplusplus
51 extern "C" {
52 #if 0
53 } /* Restore indent */
54 #endif
55 #endif
56 
57 #ifdef _MSC_VER
58 #include <basetsd.h>
59 #ifndef WIN32_MEAN_AND_LEAN
60 #define WIN32_MEAN_AND_LEAN
61 #endif
62 #include <Winsock2.h> /* for sockaddr, .. */
63 typedef SSIZE_T ssize_t;
64 #define RD_UNUSED
65 #define RD_INLINE __inline
66 #define RD_DEPRECATED
67 #undef RD_EXPORT
68 #ifdef LIBRDKAFKA_STATICLIB
69 #define RD_EXPORT
70 #else
71 #ifdef LIBRDKAFKA_EXPORTS
72 #define RD_EXPORT __declspec(dllexport)
73 #else
74 #define RD_EXPORT __declspec(dllimport)
75 #endif
76 #ifndef LIBRDKAFKA_TYPECHECKS
77 #define LIBRDKAFKA_TYPECHECKS 0
78 #endif
79 #endif
80 
81 #else
82 #include <sys/socket.h> /* for sockaddr, .. */
83 
84 #define RD_UNUSED __attribute__((unused))
85 #define RD_INLINE inline
86 #define RD_EXPORT
87 #define RD_DEPRECATED __attribute__((deprecated))
88 
89 #ifndef LIBRDKAFKA_TYPECHECKS
90 #define LIBRDKAFKA_TYPECHECKS 1
91 #endif
92 #endif
93 
94 
100 #if LIBRDKAFKA_TYPECHECKS
101 #define _LRK_TYPECHECK(RET,TYPE,ARG) \
102  ({ if (0) { TYPE __t RD_UNUSED = (ARG); } RET; })
103 
104 #define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) \
105  ({ \
106  if (0) { \
107  TYPE __t RD_UNUSED = (ARG); \
108  TYPE2 __t2 RD_UNUSED = (ARG2); \
109  } \
110  RET; })
111 #else
112 #define _LRK_TYPECHECK(RET,TYPE,ARG) (RET)
113 #define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) (RET)
114 #endif
115 
116 /* @endcond */
117 
118 
140 #define RD_KAFKA_VERSION 0x000905ff
141 
150 RD_EXPORT
151 int rd_kafka_version(void);
152 
158 RD_EXPORT
159 const char *rd_kafka_version_str (void);
160 
179 typedef enum rd_kafka_type_t {
183 
184 
195 
196 
197 
204 RD_EXPORT
205 const char *rd_kafka_get_debug_contexts(void);
206 
214 #define RD_KAFKA_DEBUG_CONTEXTS \
215  "all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature"
216 
217 
218 /* @cond NO_DOC */
219 /* Private types to provide ABI compatibility */
220 typedef struct rd_kafka_s rd_kafka_t;
221 typedef struct rd_kafka_topic_s rd_kafka_topic_t;
222 typedef struct rd_kafka_conf_s rd_kafka_conf_t;
223 typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;
224 typedef struct rd_kafka_queue_s rd_kafka_queue_t;
225 /* @endcond */
226 
227 
240 typedef enum {
241  /* Internal errors to rdkafka: */
317 
320 
321  /* Kafka broker errors: */
412 
413  RD_KAFKA_RESP_ERR_END_ALL,
415 
416 
424  const char *name;
425  const char *desc;
426 };
427 
428 
432 RD_EXPORT
433 void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
434  size_t *cntp);
435 
436 
437 
438 
444 RD_EXPORT
445 const char *rd_kafka_err2str (rd_kafka_resp_err_t err);
446 
447 
448 
454 RD_EXPORT
455 const char *rd_kafka_err2name (rd_kafka_resp_err_t err);
456 
457 
480 RD_EXPORT
482 
483 
505 RD_EXPORT
507 
508 
518 RD_EXPORT
519 int rd_kafka_errno (void);
520 
521 
522 
538 typedef struct rd_kafka_topic_partition_s {
539  char *topic;
540  int32_t partition;
541  int64_t offset;
542  void *metadata;
543  size_t metadata_size;
544  void *opaque;
546  void *_private;
549 
550 
555 RD_EXPORT
557 
558 
563 typedef struct rd_kafka_topic_partition_list_s {
564  int cnt;
565  int size;
568 
569 
584 RD_EXPORT
586 
587 
591 RD_EXPORT
592 void
594 
604 RD_EXPORT
607  const char *topic, int32_t partition);
608 
609 
618 RD_EXPORT
619 void
621  *rktparlist,
622  const char *topic,
623  int32_t start, int32_t stop);
624 
625 
626 
638 RD_EXPORT
639 int
641  const char *topic, int32_t partition);
642 
643 
651 RD_EXPORT
652 int
655  int idx);
656 
657 
665 RD_EXPORT
668 
669 
670 
671 
679 RD_EXPORT
682  const char *topic, int32_t partition, int64_t offset);
683 
684 
685 
691 RD_EXPORT
694  const char *topic, int32_t partition);
695 
696 
704 RD_EXPORT void
706  int (*cmp) (const void *a, const void *b,
707  void *opaque),
708  void *opaque);
709 
710 
728 typedef enum rd_kafka_vtype_t {
739 
740 
749 #define RD_KAFKA_V_END RD_KAFKA_VTYPE_END
750 
754 #define RD_KAFKA_V_TOPIC(topic) \
755  _LRK_TYPECHECK(RD_KAFKA_VTYPE_TOPIC, const char *, topic), \
756  (const char *)topic
757 
760 #define RD_KAFKA_V_RKT(rkt) \
761  _LRK_TYPECHECK(RD_KAFKA_VTYPE_RKT, rd_kafka_topic_t *, rkt), \
762  (rd_kafka_topic_t *)rkt
763 
766 #define RD_KAFKA_V_PARTITION(partition) \
767  _LRK_TYPECHECK(RD_KAFKA_VTYPE_PARTITION, int32_t, partition), \
768  (int32_t)partition
769 
772 #define RD_KAFKA_V_VALUE(VALUE,LEN) \
773  _LRK_TYPECHECK2(RD_KAFKA_VTYPE_VALUE, void *, VALUE, size_t, LEN), \
774  (void *)VALUE, (size_t)LEN
775 
778 #define RD_KAFKA_V_KEY(KEY,LEN) \
779  _LRK_TYPECHECK2(RD_KAFKA_VTYPE_KEY, const void *, KEY, size_t, LEN), \
780  (void *)KEY, (size_t)LEN
781 
784 #define RD_KAFKA_V_OPAQUE(opaque) \
785  _LRK_TYPECHECK(RD_KAFKA_VTYPE_OPAQUE, void *, opaque), \
786  (void *)opaque
787 
791 #define RD_KAFKA_V_MSGFLAGS(msgflags) \
792  _LRK_TYPECHECK(RD_KAFKA_VTYPE_MSGFLAGS, int, msgflags), \
793  (int)msgflags
794 
797 #define RD_KAFKA_V_TIMESTAMP(timestamp) \
798  _LRK_TYPECHECK(RD_KAFKA_VTYPE_TIMESTAMP, int64_t, timestamp), \
799  (int64_t)timestamp
800 
812 // FIXME: This doesn't show up in docs for some reason
813 // "Compound rd_kafka_message_t is not documented."
814 
828 typedef struct rd_kafka_message_s {
830  rd_kafka_topic_t *rkt;
831  int32_t partition;
832  void *payload;
836  size_t len;
839  void *key;
841  size_t key_len;
843  int64_t offset;
853  void *_private;
858 
859 
863 RD_EXPORT
865 
866 
867 
868 
873 static RD_INLINE const char *
874 RD_UNUSED
876  if (!rkmessage->err)
877  return NULL;
878 
879  if (rkmessage->payload)
880  return (const char *)rkmessage->payload;
881 
882  return rd_kafka_err2str(rkmessage->err);
883 }
884 
885 
886 
898 RD_EXPORT
899 int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
900  rd_kafka_timestamp_type_t *tstype);
901 
902 
903 
919 typedef enum {
924 
925 
956 RD_EXPORT
957 rd_kafka_conf_t *rd_kafka_conf_new(void);
958 
959 
963 RD_EXPORT
964 void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);
965 
966 
970 RD_EXPORT
971 rd_kafka_conf_t *rd_kafka_conf_dup(const rd_kafka_conf_t *conf);
972 
973 
990 RD_EXPORT
991 rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
992  const char *name,
993  const char *value,
994  char *errstr, size_t errstr_size);
995 
996 
1002 RD_EXPORT
1003 void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events);
1004 
1005 
1009 RD_EXPORT
1010 void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf,
1011  void (*dr_cb) (rd_kafka_t *rk,
1012  void *payload, size_t len,
1013  rd_kafka_resp_err_t err,
1014  void *opaque, void *msg_opaque));
1015 
1030 RD_EXPORT
1031 void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf,
1032  void (*dr_msg_cb) (rd_kafka_t *rk,
1033  const rd_kafka_message_t *
1034  rkmessage,
1035  void *opaque));
1036 
1037 
1042 RD_EXPORT
1043 void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
1044  void (*consume_cb) (rd_kafka_message_t *
1045  rkmessage,
1046  void *opaque));
1047 
1107 RD_EXPORT
1109  rd_kafka_conf_t *conf,
1110  void (*rebalance_cb) (rd_kafka_t *rk,
1111  rd_kafka_resp_err_t err,
1112  rd_kafka_topic_partition_list_t *partitions,
1113  void *opaque));
1114 
1115 
1116 
1131 RD_EXPORT
1133  rd_kafka_conf_t *conf,
1134  void (*offset_commit_cb) (rd_kafka_t *rk,
1135  rd_kafka_resp_err_t err,
1137  void *opaque));
1138 
1139 
1148 RD_EXPORT
1149 void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf,
1150  void (*error_cb) (rd_kafka_t *rk, int err,
1151  const char *reason,
1152  void *opaque));
1153 
1168 RD_EXPORT
1169 void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
1170  void (*throttle_cb) (
1171  rd_kafka_t *rk,
1172  const char *broker_name,
1173  int32_t broker_id,
1174  int throttle_time_ms,
1175  void *opaque));
1176 
1177 
1194 RD_EXPORT
1195 void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf,
1196  void (*log_cb) (const rd_kafka_t *rk, int level,
1197  const char *fac, const char *buf));
1198 
1199 
1216 RD_EXPORT
1217 void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf,
1218  int (*stats_cb) (rd_kafka_t *rk,
1219  char *json,
1220  size_t json_len,
1221  void *opaque));
1222 
1223 
1224 
1239 RD_EXPORT
1240 void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf,
1241  int (*socket_cb) (int domain, int type,
1242  int protocol,
1243  void *opaque));
1244 
1245 
1246 
1259 RD_EXPORT void
1260 rd_kafka_conf_set_connect_cb (rd_kafka_conf_t *conf,
1261  int (*connect_cb) (int sockfd,
1262  const struct sockaddr *addr,
1263  int addrlen,
1264  const char *id,
1265  void *opaque));
1266 
1274 RD_EXPORT void
1275 rd_kafka_conf_set_closesocket_cb (rd_kafka_conf_t *conf,
1276  int (*closesocket_cb) (int sockfd,
1277  void *opaque));
1278 
1279 
1280 
1281 #ifndef _MSC_VER
1282 
1296 RD_EXPORT
1297 void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf,
1298  int (*open_cb) (const char *pathname,
1299  int flags, mode_t mode,
1300  void *opaque));
1301 #endif
1302 
1306 RD_EXPORT
1307 void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque);
1308 
1312 RD_EXPORT
1313 void *rd_kafka_opaque(const rd_kafka_t *rk);
1314 
1315 
1316 
1322 RD_EXPORT
1323 void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf,
1324  rd_kafka_topic_conf_t *tconf);
1325 
1326 
1327 
1347 RD_EXPORT
1348 rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf,
1349  const char *name,
1350  char *dest, size_t *dest_size);
1351 
1352 
1358 RD_EXPORT
1359 rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf,
1360  const char *name,
1361  char *dest, size_t *dest_size);
1362 
1363 
1372 RD_EXPORT
1373 const char **rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp);
1374 
1375 
1384 RD_EXPORT
1385 const char **rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf,
1386  size_t *cntp);
1387 
1392 RD_EXPORT
1393 void rd_kafka_conf_dump_free(const char **arr, size_t cnt);
1394 
1399 RD_EXPORT
1400 void rd_kafka_conf_properties_show(FILE *fp);
1401 
1419 RD_EXPORT
1420 rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void);
1421 
1422 
1426 RD_EXPORT
1427 rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t
1428  *conf);
1429 
1430 
1434 RD_EXPORT
1435 void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf);
1436 
1437 
1446 RD_EXPORT
1447 rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf,
1448  const char *name,
1449  const char *value,
1450  char *errstr, size_t errstr_size);
1451 
1456 RD_EXPORT
1457 void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque);
1458 
1459 
1474 RD_EXPORT
1475 void
1476 rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf,
1477  int32_t (*partitioner) (
1478  const rd_kafka_topic_t *rkt,
1479  const void *keydata,
1480  size_t keylen,
1481  int32_t partition_cnt,
1482  void *rkt_opaque,
1483  void *msg_opaque));
1484 
1492 RD_EXPORT
1493 int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt,
1494  int32_t partition);
1495 
1496 
1497 /*******************************************************************
1498  * *
1499  * Partitioners provided by rdkafka *
1500  * *
1501  *******************************************************************/
1502 
1511 RD_EXPORT
1512 int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt,
1513  const void *key, size_t keylen,
1514  int32_t partition_cnt,
1515  void *opaque, void *msg_opaque);
1516 
1525 RD_EXPORT
1526 int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
1527  const void *key, size_t keylen,
1528  int32_t partition_cnt,
1529  void *opaque, void *msg_opaque);
1530 
1541 RD_EXPORT
1542 int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt,
1543  const void *key, size_t keylen,
1544  int32_t partition_cnt,
1545  void *opaque, void *msg_opaque);
1546 
1547 
1588 RD_EXPORT
1589 rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,
1590  char *errstr, size_t errstr_size);
1591 
1592 
1598 RD_EXPORT
1599 void rd_kafka_destroy(rd_kafka_t *rk);
1600 
1601 
1602 
1606 RD_EXPORT
1607 const char *rd_kafka_name(const rd_kafka_t *rk);
1608 
1609 
1620 RD_EXPORT
1621 char *rd_kafka_memberid (const rd_kafka_t *rk);
1622 
1623 
1645 RD_EXPORT
1646 rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic,
1647  rd_kafka_topic_conf_t *conf);
1648 
1649 
1650 
1659 RD_EXPORT
1660 void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt);
1661 
1662 
1666 RD_EXPORT
1667 const char *rd_kafka_topic_name(const rd_kafka_topic_t *rkt);
1668 
1669 
1673 RD_EXPORT
1674 void *rd_kafka_topic_opaque (const rd_kafka_topic_t *rkt);
1675 
1676 
1683 #define RD_KAFKA_PARTITION_UA ((int32_t)-1)
1684 
1685 
1707 RD_EXPORT
1708 int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms);
1709 
1710 
1721 RD_EXPORT
1722 void rd_kafka_yield (rd_kafka_t *rk);
1723 
1724 
1725 
1726 
1734 RD_EXPORT rd_kafka_resp_err_t
1735 rd_kafka_pause_partitions (rd_kafka_t *rk,
1736  rd_kafka_topic_partition_list_t *partitions);
1737 
1738 
1739 
1747 RD_EXPORT rd_kafka_resp_err_t
1748 rd_kafka_resume_partitions (rd_kafka_t *rk,
1749  rd_kafka_topic_partition_list_t *partitions);
1750 
1751 
1752 
1753 
1762 RD_EXPORT rd_kafka_resp_err_t
1763 rd_kafka_query_watermark_offsets (rd_kafka_t *rk,
1764  const char *topic, int32_t partition,
1765  int64_t *low, int64_t *high, int timeout_ms);
1766 
1767 
1784 RD_EXPORT rd_kafka_resp_err_t
1785 rd_kafka_get_watermark_offsets (rd_kafka_t *rk,
1786  const char *topic, int32_t partition,
1787  int64_t *low, int64_t *high);
1788 
1789 
1790 
1809 RD_EXPORT rd_kafka_resp_err_t
1810 rd_kafka_offsets_for_times (rd_kafka_t *rk,
1812  int timeout_ms);
1813 
1814 
1828 RD_EXPORT
1829 void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr);
1830 
1831 
1855 RD_EXPORT
1856 rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk);
1857 
1861 RD_EXPORT
1862 void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu);
1863 
1864 
1871 RD_EXPORT
1872 rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk);
1873 
1874 
1884 RD_EXPORT
1885 rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk);
1886 
1897 RD_EXPORT
1898 rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk,
1899  const char *topic,
1900  int32_t partition);
1901 
1912 RD_EXPORT
1913 void rd_kafka_queue_forward (rd_kafka_queue_t *src, rd_kafka_queue_t *dst);
1914 
1931 RD_EXPORT
1933  rd_kafka_queue_t *rkqu);
1934 
1935 
1939 RD_EXPORT
1940 size_t rd_kafka_queue_length (rd_kafka_queue_t *rkqu);
1941 
1942 
1958 RD_EXPORT
1959 void rd_kafka_queue_io_event_enable (rd_kafka_queue_t *rkqu, int fd,
1960  const void *payload, size_t size);
1961 
1972 #define RD_KAFKA_OFFSET_BEGINNING -2
1974 #define RD_KAFKA_OFFSET_END -1
1976 #define RD_KAFKA_OFFSET_STORED -1000
1978 #define RD_KAFKA_OFFSET_INVALID -1001
1982 #define RD_KAFKA_OFFSET_TAIL_BASE -2000 /* internal: do not use */
1983 
1990 #define RD_KAFKA_OFFSET_TAIL(CNT) (RD_KAFKA_OFFSET_TAIL_BASE - (CNT))
1991 
2025 RD_EXPORT
2026 int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition,
2027  int64_t offset);
2028 
2043 RD_EXPORT
2044 int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition,
2045  int64_t offset, rd_kafka_queue_t *rkqu);
2046 
2060 RD_EXPORT
2061 int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition);
2062 
2063 
2064 
2079 RD_EXPORT
2080 rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *rkt,
2081  int32_t partition,
2082  int64_t offset,
2083  int timeout_ms);
2084 
2085 
2107 RD_EXPORT
2108 rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition,
2109  int timeout_ms);
2110 
2111 
2112 
2135 RD_EXPORT
2136 ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition,
2137  int timeout_ms,
2138  rd_kafka_message_t **rkmessages,
2139  size_t rkmessages_size);
2140 
2141 
2142 
2163 RD_EXPORT
2164 int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition,
2165  int timeout_ms,
2166  void (*consume_cb) (rd_kafka_message_t
2167  *rkmessage,
2168  void *opaque),
2169  void *opaque);
2170 
2171 
2188 RD_EXPORT
2189 rd_kafka_message_t *rd_kafka_consume_queue(rd_kafka_queue_t *rkqu,
2190  int timeout_ms);
2191 
2197 RD_EXPORT
2198 ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu,
2199  int timeout_ms,
2200  rd_kafka_message_t **rkmessages,
2201  size_t rkmessages_size);
2202 
2208 RD_EXPORT
2209 int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu,
2210  int timeout_ms,
2211  void (*consume_cb) (rd_kafka_message_t
2212  *rkmessage,
2213  void *opaque),
2214  void *opaque);
2215 
2216 
2242 RD_EXPORT
2243 rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt,
2244  int32_t partition, int64_t offset);
2271 RD_EXPORT rd_kafka_resp_err_t
2272 rd_kafka_subscribe (rd_kafka_t *rk,
2273  const rd_kafka_topic_partition_list_t *topics);
2274 
2275 
2279 RD_EXPORT
2280 rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk);
2281 
2282 
2292 RD_EXPORT rd_kafka_resp_err_t
2293 rd_kafka_subscription (rd_kafka_t *rk,
2295 
2296 
2297 
2316 RD_EXPORT
2317 rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms);
2318 
2334 RD_EXPORT
2336 
2337 
2338 
2352 RD_EXPORT rd_kafka_resp_err_t
2353 rd_kafka_assign (rd_kafka_t *rk,
2354  const rd_kafka_topic_partition_list_t *partitions);
2355 
2365 RD_EXPORT rd_kafka_resp_err_t
2366 rd_kafka_assignment (rd_kafka_t *rk,
2367  rd_kafka_topic_partition_list_t **partitions);
2368 
2369 
2370 
2371 
2386 RD_EXPORT rd_kafka_resp_err_t
2387 rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets,
2388  int async);
2389 
2390 
2396 RD_EXPORT rd_kafka_resp_err_t
2397 rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
2398  int async);
2399 
2400 
2422 RD_EXPORT rd_kafka_resp_err_t
2423 rd_kafka_commit_queue (rd_kafka_t *rk,
2424  const rd_kafka_topic_partition_list_t *offsets,
2425  rd_kafka_queue_t *rkqu,
2426  void (*cb) (rd_kafka_t *rk,
2427  rd_kafka_resp_err_t err,
2429  void *opaque),
2430  void *opaque);
2431 
2432 
2445 RD_EXPORT rd_kafka_resp_err_t
2446 rd_kafka_committed (rd_kafka_t *rk,
2447  rd_kafka_topic_partition_list_t *partitions,
2448  int timeout_ms);
2449 
2450 
2451 
2464 RD_EXPORT rd_kafka_resp_err_t
2465 rd_kafka_position (rd_kafka_t *rk,
2466  rd_kafka_topic_partition_list_t *partitions);
2467 
2468 
2484 #define RD_KAFKA_MSG_F_FREE 0x1
2485 #define RD_KAFKA_MSG_F_COPY 0x2
2486 #define RD_KAFKA_MSG_F_BLOCK 0x4
2562 RD_EXPORT
2563 int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
2564  int msgflags,
2565  void *payload, size_t len,
2566  const void *key, size_t keylen,
2567  void *msg_opaque);
2568 
2569 
2580 RD_EXPORT
2581 rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...);
2582 
2583 
2605 RD_EXPORT
2606 int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition,
2607  int msgflags,
2608  rd_kafka_message_t *rkmessages, int message_cnt);
2609 
2610 
2611 
2612 
2624 RD_EXPORT
2625 rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms);
2626 
2627 
2642 typedef struct rd_kafka_metadata_broker {
2643  int32_t id;
2644  char *host;
2645  int port;
2647 
2651 typedef struct rd_kafka_metadata_partition {
2652  int32_t id;
2653  rd_kafka_resp_err_t err;
2654  int32_t leader;
2655  int replica_cnt;
2656  int32_t *replicas;
2657  int isr_cnt;
2658  int32_t *isrs;
2660 
2664 typedef struct rd_kafka_metadata_topic {
2665  char *topic;
2666  int partition_cnt;
2667  struct rd_kafka_metadata_partition *partitions;
2675 typedef struct rd_kafka_metadata {
2676  int broker_cnt;
2677  struct rd_kafka_metadata_broker *brokers;
2679  int topic_cnt;
2680  struct rd_kafka_metadata_topic *topics;
2682  int32_t orig_broker_id;
2683  char *orig_broker_name;
2685 
2686 
2703 RD_EXPORT
2705 rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
2706  rd_kafka_topic_t *only_rkt,
2707  const struct rd_kafka_metadata **metadatap,
2708  int timeout_ms);
2709 
2713 RD_EXPORT
2714 void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata);
2715 
2716 
2737  char *member_id;
2738  char *client_id;
2739  char *client_host;
2740  void *member_metadata;
2742  int member_metadata_size;
2743  void *member_assignment;
2745  int member_assignment_size;
2746 };
2747 
2752  struct rd_kafka_metadata_broker broker;
2753  char *group;
2755  char *state;
2756  char *protocol_type;
2757  char *protocol;
2758  struct rd_kafka_group_member_info *members;
2759  int member_cnt;
2760 };
2761 
2768  struct rd_kafka_group_info *groups;
2769  int group_cnt;
2770 };
2789 RD_EXPORT
2791 rd_kafka_list_groups (rd_kafka_t *rk, const char *group,
2792  const struct rd_kafka_group_list **grplistp,
2793  int timeout_ms);
2794 
2798 RD_EXPORT
2799 void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist);
2800 
2801 
2842 RD_EXPORT
2843 int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist);
2844 
2845 
2846 
2847 
2860 RD_EXPORT RD_DEPRECATED
2861 void rd_kafka_set_logger(rd_kafka_t *rk,
2862  void (*func) (const rd_kafka_t *rk, int level,
2863  const char *fac, const char *buf));
2864 
2865 
2873 RD_EXPORT
2874 void rd_kafka_set_log_level(rd_kafka_t *rk, int level);
2875 
2876 
2880 RD_EXPORT
2881 void rd_kafka_log_print(const rd_kafka_t *rk, int level,
2882  const char *fac, const char *buf);
2883 
2884 
2888 RD_EXPORT
2889 void rd_kafka_log_syslog(const rd_kafka_t *rk, int level,
2890  const char *fac, const char *buf);
2891 
2892 
2905 RD_EXPORT
2906 int rd_kafka_outq_len(rd_kafka_t *rk);
2907 
2908 
2909 
2916 RD_EXPORT
2917 void rd_kafka_dump(FILE *fp, rd_kafka_t *rk);
2918 
2919 
2920 
2926 RD_EXPORT
2927 int rd_kafka_thread_cnt(void);
2928 
2929 
2939 RD_EXPORT
2940 int rd_kafka_wait_destroyed(int timeout_ms);
2941 
2942 
2960 RD_EXPORT
2962 
2963 
2979 typedef int rd_kafka_event_type_t;
2980 #define RD_KAFKA_EVENT_NONE 0x0
2981 #define RD_KAFKA_EVENT_DR 0x1
2982 #define RD_KAFKA_EVENT_FETCH 0x2
2983 #define RD_KAFKA_EVENT_LOG 0x4
2984 #define RD_KAFKA_EVENT_ERROR 0x8
2985 #define RD_KAFKA_EVENT_REBALANCE 0x10
2986 #define RD_KAFKA_EVENT_OFFSET_COMMIT 0x20
2989 typedef struct rd_kafka_op_s rd_kafka_event_t;
2990 
2991 
2998 RD_EXPORT
2999 rd_kafka_event_type_t rd_kafka_event_type (const rd_kafka_event_t *rkev);
3007 RD_EXPORT
3008 const char *rd_kafka_event_name (const rd_kafka_event_t *rkev);
3009 
3010 
3020 RD_EXPORT
3021 void rd_kafka_event_destroy (rd_kafka_event_t *rkev);
3022 
3023 
3036 RD_EXPORT
3037 const rd_kafka_message_t *rd_kafka_event_message_next (rd_kafka_event_t *rkev);
3038 
3039 
3050 RD_EXPORT
3051 size_t rd_kafka_event_message_array (rd_kafka_event_t *rkev,
3052  const rd_kafka_message_t **rkmessages,
3053  size_t size);
3054 
3055 
3063 RD_EXPORT
3064 size_t rd_kafka_event_message_count (rd_kafka_event_t *rkev);
3065 
3066 
3073 RD_EXPORT
3074 rd_kafka_resp_err_t rd_kafka_event_error (rd_kafka_event_t *rkev);
3075 
3076 
3085 RD_EXPORT
3086 const char *rd_kafka_event_error_string (rd_kafka_event_t *rkev);
3087 
3088 
3089 
3096 RD_EXPORT
3097 void *rd_kafka_event_opaque (rd_kafka_event_t *rkev);
3098 
3099 
3108 RD_EXPORT
3109 int rd_kafka_event_log (rd_kafka_event_t *rkev,
3110  const char **fac, const char **str, int *level);
3111 
3112 
3123 rd_kafka_event_topic_partition_list (rd_kafka_event_t *rkev);
3124 
3125 
3135 RD_EXPORT rd_kafka_topic_partition_t *
3136 rd_kafka_event_topic_partition (rd_kafka_event_t *rkev);
3137 
3138 
3146 RD_EXPORT
3147 rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms);
3148 
3157 RD_EXPORT
3158 int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms);
3159 
3160 
3163 #ifdef __cplusplus
3164 }
3165 #endif
void * _private
Definition: rdkafka.h:546
rd_kafka_resp_err_t
Error codes.
Definition: rdkafka.h:240
rd_kafka_topic_t * rkt
Definition: rdkafka.h:830
Definition: rdkafka.h:325
RD_EXPORT int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset)
Start consuming messages for topic rkt and partition at offset offset which may either be an absolute...
Definition: rdkafka.h:302
rd_kafka_resp_err_t err
Definition: rdkafka.h:829
rd_kafka_conf_res_t
Configuration result type.
Definition: rdkafka.h:919
RD_EXPORT char * rd_kafka_memberid(const rd_kafka_t *rk)
Returns this client&#39;s broker-assigned group member id.
RD_EXPORT void rd_kafka_queue_forward(rd_kafka_queue_t *src, rd_kafka_queue_t *dst)
Forward/re-route queue src to dst. If dst is NULL the forwarding is removed.
int cnt
Definition: rdkafka.h:564
RD_EXPORT int rd_kafka_thread_cnt(void)
Retrieve the current number of threads in use by librdkafka.
RD_EXPORT void rd_kafka_conf_set_consume_cb(rd_kafka_conf_t *conf, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque))
Consumer: Set consume callback for use with rd_kafka_consumer_poll()
rd_kafka_topic_partition_t * elems
Definition: rdkafka.h:566
RD_EXPORT rd_kafka_resp_err_t rd_kafka_subscription(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **topics)
Returns the current topic subscription.
RD_EXPORT int32_t rd_kafka_msg_partitioner_consistent(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Consistent partitioner.
RD_EXPORT void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf, void(*dr_msg_cb)(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque))
Producer: Set delivery report callback in provided conf object.
RD_EXPORT const char * rd_kafka_event_name(const rd_kafka_event_t *rkev)
Definition: rdkafka.h:737
RD_EXPORT void rd_kafka_topic_partition_list_destroy(rd_kafka_topic_partition_list_t *rkparlist)
Free all resources used by the list and the list itself.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_resume_partitions(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Resume producing consumption for the provided list of partitions.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
Sets a single rd_kafka_topic_conf_t value by property name.
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_topic_partition_list_copy(const rd_kafka_topic_partition_list_t *src)
Make a copy of an existing list.
RD_EXPORT void rd_kafka_topic_conf_set_partitioner_cb(rd_kafka_topic_conf_t *topic_conf, int32_t(*partitioner)(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque))
Producer: Set partitioner callback in provided topic conf object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_poll_set_consumer(rd_kafka_t *rk)
Redirect the main (rd_kafka_poll()) queue to the KafkaConsumer&#39;s queue (rd_kafka_consumer_poll()).
RD_EXPORT void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage)
Frees resources for rkmessage and hands ownership back to rdkafka.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_topic_conf_get(const rd_kafka_topic_conf_t *conf, const char *name, char *dest, size_t *dest_size)
Retrieve topic configuration value for property name.
Definition: rdkafka.h:276
Definition: rdkafka.h:395
Definition: rdkafka.h:730
RD_EXPORT rd_kafka_resp_err_t rd_kafka_position(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Retrieve current positions (offsets) for topics+partitions.
Definition: rdkafka.h:290
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_get_consumer(rd_kafka_t *rk)
size_t key_len
Definition: rdkafka.h:841
Definition: rdkafka.h:270
RD_EXPORT rd_kafka_event_type_t rd_kafka_event_type(const rd_kafka_event_t *rkev)
Definition: rdkafka.h:921
RD_EXPORT rd_kafka_t * rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)
Creates a new Kafka handle and starts its operation according to the specified type (RD_KAFKA_CONSUME...
RD_EXPORT void rd_kafka_get_err_descs(const struct rd_kafka_err_desc **errdescs, size_t *cntp)
Returns the full list of error codes.
Group information.
Definition: rdkafka.h:2765
char * group
Definition: rdkafka.h:2767
Partition information.
Definition: rdkafka.h:2665
Definition: rdkafka.h:259
RD_EXPORT int rd_kafka_event_log(rd_kafka_event_t *rkev, const char **fac, const char **str, int *level)
Extract log message from the event.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_offsets_for_times(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *offsets, int timeout_ms)
Look up the offsets for the given partitions by timestamp.
RD_EXPORT const char * rd_kafka_version_str(void)
Returns the librdkafka version as string.
RD_EXPORT void * rd_kafka_opaque(const rd_kafka_t *rk)
Retrieves the opaque pointer previously set with rd_kafka_conf_set_opaque()
const char * name
Definition: rdkafka.h:424
Definition: rdkafka.h:266
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_topic_partition_list_new(int size)
Create a new list/vector Topic+Partition container.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit_message(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async)
Commit message&#39;s offset on broker for the message&#39;s partition.
RD_EXPORT ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size)
Consume up to rkmessages_size from topic rkt and partition putting a pointer to each message in the a...
Definition: rdkafka.h:735
RD_EXPORT void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque)
Sets the application&#39;s opaque pointer that will be passed to all topic callbacks as the rkt_opaque ar...
Definition: rdkafka.h:286
RD_EXPORT void rd_kafka_log_print(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
Builtin (default) log sink: print to stderr.
Definition: rdkafka.h:294
RD_EXPORT int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu, int timeout_ms, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque), void *opaque)
Consume multiple messages from queue with callback.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
Sets a configuration property.
RD_EXPORT int rd_kafka_outq_len(rd_kafka_t *rk)
Returns the current out queue length.
Definition: rdkafka.h:312
rd_kafka_vtype_t
Var-arg tag types.
Definition: rdkafka.h:728
RD_EXPORT const rd_kafka_message_t * rd_kafka_event_message_next(rd_kafka_event_t *rkev)
RD_EXPORT rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset)
Store offset offset for topic rkt partition partition.
RD_EXPORT void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf, void(*error_cb)(rd_kafka_t *rk, int err, const char *reason, void *opaque))
Set error callback in provided conf object.
size_t len
Definition: rdkafka.h:836
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async)
Commit offsets on broker for the provided list of partitions.
RD_EXPORT void rd_kafka_conf_set_default_topic_conf(rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *tconf)
Definition: rdkafka.h:373
RD_EXPORT int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque)
Produce and send a single message to broker.
Definition: rdkafka.h:280
RD_EXPORT const char * rd_kafka_event_error_string(rd_kafka_event_t *rkev)
Definition: rdkafka.h:375
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_event_topic_partition(rd_kafka_event_t *rkev)
RD_EXPORT void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu)
RD_EXPORT size_t rd_kafka_queue_length(rd_kafka_queue_t *rkqu)
Definition: rdkafka.h:298
rd_kafka_resp_err_t code
Definition: rdkafka.h:423
RD_EXPORT void rd_kafka_destroy(rd_kafka_t *rk)
Destroy Kafka handle.
RD_EXPORT void rd_kafka_set_log_level(rd_kafka_t *rk, int level)
Specifies the maximum logging level produced by internal kafka logging and debugging.
RD_EXPORT int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque), void *opaque)
Consumes messages from topic rkt and partition, calling the provided callback for each consumed messs...
Definition: rdkafka.h:736
RD_EXPORT rd_kafka_resp_err_t rd_kafka_metadata(rd_kafka_t *rk, int all_topics, rd_kafka_topic_t *only_rkt, const struct rd_kafka_metadata **metadatap, int timeout_ms)
Request Metadata from broker.
RD_EXPORT int rd_kafka_wait_destroyed(int timeout_ms)
Wait for all rd_kafka_t objects to be destroyed.
Definition: rdkafka.h:359
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit_queue(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, rd_kafka_queue_t *rkqu, void(*cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque), void *opaque)
Commit offsets on broker for the provided list of partitions.
Definition: rdkafka.h:347
int64_t offset
Definition: rdkafka.h:843
Definition: rdkafka.h:351
RD_EXPORT int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist)
Adds one or more brokers to the kafka handle&#39;s list of initial bootstrap brokers. ...
RD_EXPORT int rd_kafka_errno(void)
Returns the thread-local system errno.
Definition: rdkafka.h:333
Definition: rdkafka.h:255
Definition: rdkafka.h:323
RD_EXPORT void rd_kafka_log_syslog(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
Builtin log sink: print to syslog.
Definition: rdkafka.h:243
Group member information.
Definition: rdkafka.h:2750
void * key
Definition: rdkafka.h:839
RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_get(const rd_kafka_conf_t *conf, const char *name, char *dest, size_t *dest_size)
Retrieve configuration value for property name.
RD_EXPORT rd_kafka_topic_conf_t * rd_kafka_topic_conf_new(void)
Create topic configuration object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_get_watermark_offsets(rd_kafka_t *rk, const char *topic, int32_t partition, int64_t *low, int64_t *high)
Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
RD_EXPORT void rd_kafka_dump(FILE *fp, rd_kafka_t *rk)
Dumps rdkafka&#39;s internal state for handle rk to stream fp.
Definition: rdkafka.h:732
Definition: rdkafka.h:268
RD_EXPORT void rd_kafka_conf_set_closesocket_cb(rd_kafka_conf_t *conf, int(*closesocket_cb)(int sockfd, void *opaque))
Set close socket callback.
A growable list of Topic+Partitions.
Definition: rdkafka.h:563
int rd_kafka_event_type_t
Event types.
Definition: rdkafka.h:2993
Topic information.
Definition: rdkafka.h:2678
RD_EXPORT void rd_kafka_conf_set_throttle_cb(rd_kafka_conf_t *conf, void(*throttle_cb)(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque))
Set throttle callback.
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_get_main(rd_kafka_t *rk)
RD_EXPORT void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf, void(*log_cb)(const rd_kafka_t *rk, int level, const char *fac, const char *buf))
Set logger callback.
Definition: rdkafka.h:257
RD_EXPORT rd_kafka_resp_err_t rd_kafka_list_groups(rd_kafka_t *rk, const char *group, const struct rd_kafka_group_list **grplistp, int timeout_ms)
List and describe client groups in cluster.
Definition: rdkafka.h:731
int32_t partition
Definition: rdkafka.h:540
Definition: rdkafka.h:253
Definition: rdkafka.h:282
Definition: rdkafka.h:251
void * opaque
Definition: rdkafka.h:544
const char * desc
Definition: rdkafka.h:425
RD_EXPORT rd_kafka_resp_err_t rd_kafka_assign(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *partitions)
Atomic assignment of partitions to consume.
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_dup(const rd_kafka_conf_t *conf)
Creates a copy/duplicate of configuration object conf.
RD_EXPORT int64_t rd_kafka_message_timestamp(const rd_kafka_message_t *rkmessage, rd_kafka_timestamp_type_t *tstype)
Returns the message timestamp for a consumed message.
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_get_partition(rd_kafka_t *rk, const char *topic, int32_t partition)
RD_EXPORT int rd_kafka_topic_partition_list_del_by_idx(rd_kafka_topic_partition_list_t *rktparlist, int idx)
Delete partition from list by elems[] index.
Definition: rdkafka.h:345
RD_EXPORT void rd_kafka_conf_set_connect_cb(rd_kafka_conf_t *conf, int(*connect_cb)(int sockfd, const struct sockaddr *addr, int addrlen, const char *id, void *opaque))
Set connect callback.
RD_EXPORT void rd_kafka_mem_free(rd_kafka_t *rk, void *ptr)
Free pointer returned by librdkafka.
static RD_INLINE const char *RD_UNUSED rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage)
Returns the error string for an errored rd_kafka_message_t or NULL if there was no error...
Definition: rdkafka.h:875
RD_EXPORT rd_kafka_resp_err_t rd_kafka_pause_partitions(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Pause producing or consumption for the provided list of partitions.
RD_EXPORT void rd_kafka_group_list_destroy(const struct rd_kafka_group_list *grplist)
Release list memory.
Definition: rdkafka.h:405
RD_EXPORT void rd_kafka_event_destroy(rd_kafka_event_t *rkev)
Destroy an event.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_committed(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions, int timeout_ms)
Retrieve committed offsets for topics+partitions.
Definition: rdkafka.h:193
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_new(void)
Create configuration object.
RD_EXPORT rd_kafka_event_t * rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms)
Poll a queue for an event for max timeout_ms.
RD_EXPORT int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition)
Stop consuming messages for topic rkt and partition, purging all messages currently in the local queu...
RD_EXPORT const char * rd_kafka_err2name(rd_kafka_resp_err_t err)
Returns the error code name (enum name).
RD_EXPORT int rd_kafka_version(void)
Returns the librdkafka version as integer.
Definition: rdkafka.h:393
RD_EXPORT int rd_kafka_topic_partition_list_del(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Delete partition from list.
int size
Definition: rdkafka.h:565
RD_EXPORT rd_kafka_resp_err_t rd_kafka_subscribe(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)
Subscribe to topic set using balanced consumer groups.
RD_EXPORT void rd_kafka_topic_partition_destroy(rd_kafka_topic_partition_t *rktpar)
Destroy a rd_kafka_topic_partition_t.
RD_EXPORT void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque)
Sets the application&#39;s opaque pointer that will be passed to callbacks.
Definition: rdkafka.h:922
RD_EXPORT const char ** rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp)
Dump the configuration properties and values of conf to an array with "key", "value" pairs...
RD_EXPORT rd_kafka_resp_err_t rd_kafka_event_error(rd_kafka_event_t *rkev)
Definition: rdkafka.h:310
char * topic
Definition: rdkafka.h:539
RD_EXPORT const char * rd_kafka_get_debug_contexts(void)
Retrieve supported debug contexts for use with the "debug" configuration property. (runtime)
Definition: rdkafka.h:316
RD_EXPORT void rd_kafka_conf_set_offset_commit_cb(rd_kafka_conf_t *conf, void(*offset_commit_cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque))
Consumer: Set offset commit callback for use with consumer groups.
RD_EXPORT void rd_kafka_conf_set_rebalance_cb(rd_kafka_conf_t *conf, void(*rebalance_cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque))
Consumer: Set rebalance callback for use with coordinated consumer group balancing.
Definition: rdkafka.h:729
RD_EXPORT int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms)
Poll a queue for events served through callbacks for max timeout_ms.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms)
Wait until all outstanding produce requests, et.al, are completed. This should typically be done prio...
RD_EXPORT int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms)
Polls the provided kafka handle for events.
RD_EXPORT int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt, int32_t partition)
Check if partition is available (has a leader broker).
Definition: rdkafka.h:180
Definition: rdkafka.h:409
Definition: rdkafka.h:363
RD_EXPORT void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf, int(*socket_cb)(int domain, int type, int protocol, void *opaque))
Set socket callback.
Definition: rdkafka.h:181
Definition: rdkafka.h:192
RD_EXPORT rd_kafka_resp_err_t rd_kafka_errno2err(int errnox)
Converts the system errno value errnox to a rd_kafka_resp_err_t error code upon failure from the foll...
Definition: rdkafka.h:278
RD_EXPORT void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events)
Enable event sourcing. events is a bitmask of RD_KAFKA_EVENT_* of events to enable for consumption by...
RD_EXPORT void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf, void(*dr_cb)(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err, void *opaque, void *msg_opaque))
Definition: rdkafka.h:274
RD_EXPORT rd_kafka_topic_conf_t * rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t *conf)
Creates a copy/duplicate of topic configuration object conf.
Definition: rdkafka.h:327
RD_EXPORT RD_DEPRECATED void rd_kafka_set_logger(rd_kafka_t *rk, void(*func)(const rd_kafka_t *rk, int level, const char *fac, const char *buf))
Set logger function.
Definition: rdkafka.h:249
Metadata container.
Definition: rdkafka.h:2689
Definition: rdkafka.h:308
RD_EXPORT rd_kafka_resp_err_t rd_kafka_unsubscribe(rd_kafka_t *rk)
Unsubscribe from the current subscription set.
RD_EXPORT void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata)
Release metadata memory.
RD_EXPORT int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, rd_kafka_message_t *rkmessages, int message_cnt)
Produce multiple messages.
RD_EXPORT void rd_kafka_conf_set_open_cb(rd_kafka_conf_t *conf, int(*open_cb)(const char *pathname, int flags, mode_t mode, void *opaque))
Set open callback.
Definition: rdkafka.h:300
RD_EXPORT rd_kafka_resp_err_t rd_kafka_producev(rd_kafka_t *rk,...)
Produce and send a single message to broker.
Definition: rdkafka.h:407
RD_EXPORT rd_kafka_resp_err_t rd_kafka_seek(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, int timeout_ms)
Seek consumer for topic+partition to offset which is either an absolute or logical offset...
RD_EXPORT void rd_kafka_topic_partition_list_add_range(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t start, int32_t stop)
Add range of partitions from start to stop inclusive.
RD_EXPORT size_t rd_kafka_event_message_count(rd_kafka_event_t *rkev)
Definition: rdkafka.h:920
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_topic_partition_list_add(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Add topic+partition to list.
Definition: rdkafka.h:272
Definition: rdkafka.h:191
Definition: rdkafka.h:296
RD_EXPORT ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size)
Consume batch of messages from queue.
rd_kafka_timestamp_type_t
Definition: rdkafka.h:190
RD_EXPORT void * rd_kafka_topic_opaque(const rd_kafka_topic_t *rkt)
Get the rkt_opaque pointer that was set in the topic configuration.
RD_EXPORT void * rd_kafka_event_opaque(rd_kafka_event_t *rkev)
RD_EXPORT const char * rd_kafka_name(const rd_kafka_t *rk)
Returns Kafka handle name.
A Kafka message as returned by the rd_kafka_consume*() family of functions as well as provided to the...
Definition: rdkafka.h:828
Definition: rdkafka.h:284
RD_EXPORT void rd_kafka_queue_io_event_enable(rd_kafka_queue_t *rkqu, int fd, const void *payload, size_t size)
Enable IO event triggering for queue.
RD_EXPORT void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt)
Loose application&#39;s topic handle refcount as previously created with rd_kafka_topic_new().
int32_t partition
Definition: rdkafka.h:831
Definition: rdkafka.h:319
RD_EXPORT rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk, const char *topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)
Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
RD_EXPORT rd_kafka_message_t * rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms)
Consume a single message from topic rkt and partition.
Definition: rdkafka.h:288
RD_EXPORT rd_kafka_message_t * rd_kafka_consumer_poll(rd_kafka_t *rk, int timeout_ms)
Poll the consumer for messages or events.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_assignment(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **partitions)
Returns the current partition assignment.
Definition: rdkafka.h:264
List of groups.
Definition: rdkafka.h:2781
Definition: rdkafka.h:734
RD_EXPORT const char * rd_kafka_err2str(rd_kafka_resp_err_t err)
Returns a human readable representation of a kafka error.
void * metadata
Definition: rdkafka.h:542
RD_EXPORT void rd_kafka_topic_partition_list_sort(rd_kafka_topic_partition_list_t *rktparlist, int(*cmp)(const void *a, const void *b, void *opaque), void *opaque)
Sort list using comparator cmp.
int64_t offset
Definition: rdkafka.h:541
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_event_topic_partition_list(rd_kafka_event_t *rkev)
Definition: rdkafka.h:247
RD_EXPORT void rd_kafka_conf_destroy(rd_kafka_conf_t *conf)
Destroys a conf object.
Definition: rdkafka.h:306
Broker information.
Definition: rdkafka.h:2656
Error code value, name and description. Typically for use with language bindings to automatically exp...
Definition: rdkafka.h:422
Definition: rdkafka.h:389
Definition: rdkafka.h:369
rd_kafka_resp_err_t err
Definition: rdkafka.h:545
Definition: rdkafka.h:304
Topic+Partition place holder.
Definition: rdkafka.h:538
Definition: rdkafka.h:245
size_t metadata_size
Definition: rdkafka.h:543
RD_EXPORT const char ** rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf, size_t *cntp)
Dump the topic configuration properties and values of conf to an array with "key", "value" pairs.
RD_EXPORT size_t rd_kafka_event_message_array(rd_kafka_event_t *rkev, const rd_kafka_message_t **rkmessages, size_t size)
Extacts size message(s) from the event into the pre-allocated array rkmessages.
Definition: rdkafka.h:733
rd_kafka_type_t
rd_kafka_t handle type.
Definition: rdkafka.h:179
RD_EXPORT int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Random partitioner.
RD_EXPORT void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf, int(*stats_cb)(rd_kafka_t *rk, char *json, size_t json_len, void *opaque))
Set statistics callback in provided conf object.
Definition: rdkafka.h:339
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_topic_partition_list_find(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Find element by topic and partition.
RD_EXPORT void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf)
Destroys a topic conf object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition, int64_t offset)
Set offset to offset for topic and partition.
Definition: rdkafka.h:399
void * payload
Definition: rdkafka.h:832
RD_EXPORT void rd_kafka_yield(rd_kafka_t *rk)
Cancels the current callback dispatcher (rd_kafka_poll(), rd_kafka_consume_callback(), etc).
Definition: rdkafka.h:329
RD_EXPORT rd_kafka_resp_err_t rd_kafka_set_log_queue(rd_kafka_t *rk, rd_kafka_queue_t *rkqu)
Forward librdkafka logs (and debug) to the specified queue for serving with one of the ...
RD_EXPORT rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk)
Close down the KafkaConsumer.
RD_EXPORT void rd_kafka_conf_properties_show(FILE *fp)
Prints a table to fp of all supported configuration properties, their default values as well as a des...
RD_EXPORT void rd_kafka_conf_dump_free(const char **arr, size_t cnt)
Frees a configuration dump returned from rd_kafka_conf_dump() or `rd_kafka_topic_conf_dump().
void * _private
Definition: rdkafka.h:853
Definition: rdkafka.h:262
RD_EXPORT int32_t rd_kafka_msg_partitioner_consistent_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Consistent-Random partitioner.
RD_EXPORT const char * rd_kafka_topic_name(const rd_kafka_topic_t *rkt)
Returns the topic name.
RD_EXPORT rd_kafka_message_t * rd_kafka_consume_queue(rd_kafka_queue_t *rkqu, int timeout_ms)
Consume from queue.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_last_error(void)
Returns the last error code generated by a legacy API call in the current thread. ...
RD_EXPORT int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, rd_kafka_queue_t *rkqu)
Same as rd_kafka_consume_start() but re-routes incoming messages to the provided queue rkqu (which mu...
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_new(rd_kafka_t *rk)
Create a new message queue.
RD_EXPORT rd_kafka_topic_t * rd_kafka_topic_new(rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
Creates a new topic handle for topic named topic.