
A C++17 Data Stream Processing Parallel Library for Multicores and GPUs 

Apache Kafka Integration

WindFlow (from version 3.2.0) incorporates an extension to interface with Apache Kafka, a popular publish-subscribe messaging system. This allows WindFlow applications to subscribe to topics on Apache Kafka (thus receiving messages from it), to transform those messages according to the semantics of the dataflow graph, and then to report results back to Kafka by publishing output messages.

To do that, WindFlow provides the two operators described in the figure below.

[Figure: the Kafka_Source and Kafka_Sink operators]

Kafka_Source

Kafka_Source is an alternative source that can be used to create a WindFlow application. It provides a user-friendly builder class that can be used to configure the operator and its interaction with Apache Kafka in order to properly subscribe to topics.


KafkaSource_Functor source_functor;
Kafka_Source source = KafkaSource_Builder(source_functor)
                                .withName("kafka_source")
                                .withBrokers("hostname:9092")
                                .withTopics("input")
                                .withGroupID("groupid")
                                .withAssignmentPolicy("roundrobin")
                                .withIdleness(seconds(5))
                                .withParallelism(1)
                                .withOffsets(-1) // -1 means from the most recent offset
                                .build();

The builder allows the developer to specify several configuration parameters: a list of address:port strings identifying the Kafka brokers to connect to, the names of the topics to subscribe to (including all of their partitions), and the group_id (all replicas of the Kafka_Source belong to the same consumer group, and they jointly receive the messages of the subscribed topics). Other configuration parameters are more advanced (e.g., the partition assignment policy); interested readers can refer to the API of librdkafka. The withIdleness() method configures a timeout control, so that the user-provided logic is still invoked (with an empty message) whenever a replica of the Kafka_Source does not receive any message for the given time period.

The builder is instantiated by passing to its constructor the deserialization logic, which determines how a raw Kafka message is turned into a data structure exchanged internally by WindFlow operators. The deserialization logic can be expressed as a lambda, a functor object, or a plain function respecting a given signature. Below is an example using a functor object exposing an operator() method.


class KafkaSource_Functor
{
public:
    // deserialization logic: turns a raw Kafka message into a tuple_t
    bool operator()(std::optional<std::reference_wrapper<RdKafka::Message>> msg, Source_Shipper<tuple_t> &shipper)
    {
        if (msg) {
            tuple_t out;
            out.value = static_cast<const char *>(msg->get().payload());
            shipper.push(std::move(out));
            return true;
        }
        else {
            return false; // idleness timeout expired: we terminate listening
        }
    }
};

The Boolean return value states whether the replica of the Kafka_Source should keep waiting for further messages from Apache Kafka (true) or stop doing so (false), in order to start the application termination.
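The snippets above assume a user-defined tuple_t type carrying the message payload. This type is application-defined, not part of the WindFlow API; a minimal sketch consistent with the examples above could be:

```cpp
#include <string>

// Sketch of the application-defined record type exchanged between the
// WindFlow operators in the examples above. The single field 'value'
// holds the message payload as a string; real applications typically
// add further fields (keys, timestamps, and so forth).
struct tuple_t
{
    std::string value;
};
```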

Kafka_Sink

The Kafka_Sink operator allows a WindFlow application to publish its results as messages of specific topics to Apache Kafka. Also in this case, the operator can be configured with its builder class as in the code snippet below.


KafkaSink_Functor sink_functor;
Kafka_Sink sink = KafkaSink_Builder(sink_functor)
                        .withName("kafka_sink")
                        .withParallelism(1)
                        .withBrokers("hostname:9092")
                        .build();

We can configure the Kafka brokers and, as for all operators in the library, the parallelism level, the name, and so forth. The builder is constructed by providing the serialization logic, as done in the code snippet below.


class KafkaSink_Functor
{
public:
    // serialization logic: turns a tuple_t into a Kafka output message
    wf::wf_kafka_sink_msg operator()(tuple_t &out)
    {
        wf::wf_kafka_sink_msg tmp;
        tmp.payload = out.value; // serialized message as a string
        tmp.topic = "output";    // destination topic
        return tmp;
    }
};

The return value of the serialization logic is a message of type wf_kafka_sink_msg, which contains the serialized payload (as a string), the name of the topic, and the index of the partition to be used to publish that message to Apache Kafka.
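To illustrate the serialization step in isolation, the following self-contained sketch mimics the three fields of wf_kafka_sink_msg described above (payload, topic, partition index). The struct and the serialize function here are stand-ins for illustration, not the actual WindFlow definitions:

```cpp
#include <cstdint>
#include <string>

// Stand-in mirroring the fields of wf::wf_kafka_sink_msg described in
// the text: serialized payload, destination topic, partition index.
// (Hypothetical type for illustration only.)
struct kafka_sink_msg_sketch
{
    std::string payload;
    std::string topic;
    int32_t partition = 0; // index of the target partition
};

// Application-defined record type, as in the earlier examples.
struct tuple_t
{
    std::string value;
};

// Serialization logic with the same shape as KafkaSink_Functor above.
kafka_sink_msg_sketch serialize(const tuple_t &out)
{
    kafka_sink_msg_sketch tmp;
    tmp.payload = out.value; // serialized message as a string
    tmp.topic = "output";    // destination topic
    tmp.partition = 0;       // publish to partition 0
    return tmp;
}
```

In the real operator, the returned message is handed to the library, which publishes it to the given topic and partition on the configured brokers.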