
Starting from version 3.2.0, WindFlow includes an extension that interfaces with Apache Kafka, a widely used publish–subscribe messaging system. This integration allows WindFlow applications to subscribe to Kafka topics (and thus receive messages), transform them according to the application semantics defined in the dataflow graph, and then publish the resulting output messages back to Kafka.
To support this integration, WindFlow provides two operators, Kafka_Source and Kafka_Sink, illustrated in the figure below.

Kafka_Source is an alternative source operator that can be used to build a WindFlow application. It provides a user-friendly builder class that allows the developer to configure the operator and its interaction with Apache Kafka, ensuring correct subscription to the desired topics.
KafkaSource_Functor source_functor;
Kafka_Source source = KafkaSource_Builder(source_functor)
                          .withName("kafka_source")
                          .withBrokers("hostname:9092")
                          .withTopics("input")
                          .withGroupID("groupid")
                          .withAssignmentPolicy("roundrobin")
                          .withIdleness(seconds(5)) // seconds = std::chrono::seconds
                          .withParallelism(1)
                          .withOffsets(-1) // -1 means from the most recent offset
                          .build();
The builder allows the developer to specify several configuration parameters: the address:port strings identifying the Kafka brokers to connect to (withBrokers()), the names of the topics to subscribe to, including all their partitions (withTopics()), and the consumer group identifier (withGroupID()). All replicas of Kafka_Source belong to the same consumer group, so the partitions of the subscribed topics are divided among them and each message is delivered to only one replica. Additional parameters cover more advanced settings, such as the partition assignment policy (withAssignmentPolicy()) and the starting offsets (withOffsets()). Interested readers can refer to the librdkafka API documentation for further details. The withIdleness() method configures a timeout: if a replica of Kafka_Source receives no message for the specified interval, its deserialization logic is invoked with an empty std::optional instead of a message.
The builder is instantiated by passing the deserialization logic to its constructor. This logic determines how a raw Kafka message is transformed into a data structure that can be exchanged internally by WindFlow operators. The deserialization logic can be provided as a lambda, a functor object, or a plain function, as long as it adheres to the required signature. Below is an example that uses a functor object exposing an operator() method.
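#include <functional> // std::reference_wrapper
#include <optional>   // std::optional
#include <string>
#include <utility>    // std::move

// Deserialization logic: turns each raw Kafka message into a tuple_t
// (an application-defined type, assumed here to have a std::string field named value)
class KafkaSource_Functor
{
public:
    bool operator()(std::optional<std::reference_wrapper<RdKafka::Message>> msg, Source_Shipper<tuple_t> &shipper)
    {
        if (msg) {
            tuple_t out;
            // The payload is not guaranteed to be NUL-terminated: copy exactly len() bytes
            out.value = std::string(static_cast<const char *>(msg->get().payload()), msg->get().len());
            shipper.push(std::move(out));
            return true;
        }
        else {
            return false; // if timeout, we terminate listening
        }
    }
};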
The boolean return value indicates whether the Kafka_Source replica should continue waiting for additional messages from Apache Kafka or stop (thereby initiating the application’s termination).
The Kafka_Sink operator allows a WindFlow application to publish its results as messages to specific Apache Kafka topics. As with the source operator, the sink can be configured using its builder class, as shown in the code snippet below.
KafkaSink_Functor sink_functor;
Kafka_Sink sink = KafkaSink_Builder(sink_functor)
                      .withName("kafka_sink")
                      .withParallelism(1)
                      .withBrokers("hostname:9092")
                      .build();
Besides the Kafka brokers, the builder exposes the usual operator parameters provided by the library, such as the name and the parallelism level. As with the source, the builder is instantiated by supplying the serialization logic, as shown in the code snippet below.
class KafkaSink_Functor
{
public:
    // Serialization logic: builds the Kafka message for each result tuple
    wf::wf_kafka_sink_msg operator()(tuple_t &out)
    {
        wf::wf_kafka_sink_msg tmp;
        tmp.payload = out.value; // serialized payload (a string)
        tmp.topic = "output";    // destination topic
        return tmp;
    }
};
The return value of the serialization logic is a message of type wf_kafka_sink_msg, which contains the serialized payload (as a string), the name of the topic, and the index of the partition to which the message should be published in Apache Kafka.