logo_white

A C++17 Data Stream Processing Parallel Library for Multicores and GPUs 

API Example

In this page, we show a simple example of streaming application in WindFlow. The example is a well-known streaming benchmark: WordCount. The Source operator is in charge of generating a stream of text lines. The second operator (Splitter) receives the text lines and splits them into words transmitted to the next operator (Counter). The Counter updates the count of each distinct received word, and sends a pair (word, count) to the Sink with the current value of the counter. While Splitter and Sink are stateless, the Counter is a stateful operator working on a keyby manner. The figure below shows a schematic view of the logical data-flow graph of the application.

WordCount logical data-flow graph

We report below some code snippets to understand the API. The Source can be defined by developing its functional logic through a functor, a lambda, or using a C++ function. In all cases, the logic must respect specific admissible signatures to be used to instantiate a Source operator. In the snippet below the source logic is developed as a C++ functor.


class Source_Functor
{
private:
	std::vector<std::string> dataset; // contains the text lines as strings

public:
	// Constructor
    Source_Functor(const std::vector<std::string> &_dataset):
                   dataset(_dataset) {}

	// Functor logic
	void operator()(wf::Source_Shipper<std::string> &shipper)
	{
    	// generation loop
    	for (auto &line: dataset) {
    		shipper.push(line);
    	}
	}
};

The Splitter is implemented as a FlatMap operator producing more outputs per input. Again, the functional logic can be provided as a function, lambda or a functor object (as in the snippet below) respecting a specific signature.


class Splitter_Functor
{
public:
	// Functor logic
    void operator()(const std::string &input, wf::Shipper<std::string> &shipper)
    {
        std::istringstream line(input);
        std::string word;
        while (getline(input, word, ' ')) {
            shipper.push(std::move(word));
        }
    }
};

The Counter can be implemented by a Map operator encapsulating an internal state partitioned by key (a hash table).


class Counter_Functor
{
private:
	std::unordered_map<std::string, int> table;

public:
	// Functional logic
    std::pair<std::string, int> operator()(const std::string &word)
    {
		if (map.find(word) == map.end()) {
        	map.insert(make_pair(word, 1));
		}
        else {
            map.at(word)++;
        }
        std::pair result(word, map.at(word));
        return result;
    }
};

Finally, we define the Sink functional logic as below.


class Sink_Functor
{
public:
	// Functor logic
    void operator()(std::optional<std::pair<std::string, int>> &input)
    {
        if (input) {
            std::cout << "Received word " << (*input).first << " with counter " << (*input).second << std::endl;
        }
        else {
        	std::cout << "End of stream" << std::endl;
        }
    }
};

Once defined the functional logic of each operator, they must be created by using the corresponding builder classes. During the creation, the user can specify several configuration parameters per operator, like its name (for logging purposes), its parallelism level and others.


Source_Functor source_functor(dataset);
wf::Source source = wf::Source_Builder(source_functor)
                		.withParallelism(2)
                		.withName("wc_source")
                		.build();

Splitter_Functor splitter_functor;
wf::FlatMap splitter = wf::FlatMap_Builder(splitter_functor)
                	   		.withParallelism(2)
                			.withName("wc_splitter")
                			.withOutputBatchSize(10)
                			.build();

Counter_Functor counter_functor;
wf::Map counter = wf::Map_Builder(counter_functor)
              	  		.withParallelism(3)
                		.withName("wc_counter")
                		.withKeyBy([](const std::string &word) -> std::string { return word; })
                		.build();

Sink_Functor sink_functor;
wf::Sink sink = wf::Sink_Builder(sink_functor)
                	.withParallelism(3)
                	.withName("wc_sink")
                	.build();

All the operators (except Sinks) can be declared with a custom output batch size. In the example above, the Splitter operator adopts a batching of 10 outputs, which means that it sends output results in batches of 10 individual outputs. This approach can be used to increase throughput at the expense of latency. If not specified, the batch size is zero (means that batching is disabled) and the operators send outputs individually. Furthermore, we observe that the Counter operator is declared with a key extractor to enable the keyby distribution between the Splitter and the Counter replicas.

Finally, the whole application can be executed by creating the streaming environment and by adding the operators in the proper order. This can be done by leveraging two programming constructs of WindFlow: the PipeGraph and the MultiPipe classes. The PipeGraph can be instantiated by specifying two configuration options: the execution mode and the time policy.

The execution mode states how the processing is done in the data-flow graph with respect to the ordering of inputs. There are three alternative modes:

  1. DETERMINISTIC: in this mode the user is requested to generate inputs in the Sources ordered by timestamps. The run-time environment guarantees that the replicas of all operators will process inputs ordered by timestamp. This execution modes prevents batching to be used and allows a deterministic execution often at the expense of performance.
  2. PROBABILISTIC: in this mode Source replicas can generate outputs in any order, but the run-time system still sorts items before processing the functional logics of the operators. In this case, to respect the ordering, items might be dropped. This execution mode still prevents batching to be used, but can be a reasonable compromise between ordering requirements and performance if some (few) dropped inputs are allowed by the application semantics.
  3. DEFAULT: this is the default mode without ordering guarantees. In this mode, inputs are processed in a non-deterministic ordering and temporal window-based computations are evaluated based on watermarks propagation. This execution mode allows batching to be used in the operators as in the previous code snippets.

For what regards the time policy, WindFlow supports two policies. With "Ingress Time", timestamps are automatically generated by the Sources (as in this WordCount example in the Source functor object) and watermarks are also automatically generated incase of DEFAULT execution mode. With “Event Time”, the user is instead in charge of emitting inputs in the sources with a user-defined timestamp. With this policy, watermarks must explicitly be emitted by the user in the sources in case of DEFAULT execution mode.

The code fragment below shows the creation of the PipeGraph and of the data-flow graph which is then run on a multi-core environment.


wf::PipeGraph topology("wc", Execution_Mode_t::DEFAULT, Time_Policy_t::INGRESS_TIME);
if (chaining) {
	topology.add_source(source).chain(splitter).add(counter).chain_sink(sink);
}
else {
	topology.add_source(source).add(splitter).add(counter).add_sink(sink);
}
topology.run(); // synchronous execution of the dataflow

In the code above, based on the value of a boolean flag (chaining), we create the data-flow graph by chaining stateless operators (to reduce thread oversubscription) or not. This results in two different physical data-flow graphs running respectively with five (left-hand side figure below) and ten (right-hand side) underlying threads (based on the chosen parallelism levels in the builders).

WordCount physical data-flow graphs

This example shows only a fraction of the expressive power of WindFlow, which is actually able to generate complex data-flow graphs using split and merge operations on MultiPipes. Interested users can refer to the doxygen documentation for the API and can look at the synthetic examples and tests distributed with the source code.