In this page, we show a simple example of a streaming application in WindFlow. The example is a well-known streaming benchmark: WordCount. The Source operator generates a stream of text lines. The second operator (Splitter) receives the text lines and splits them into single words, which are transmitted to the next operator (Counter). The Counter updates the count of each distinct word received, and sends a (word, count) pair with the current value of the counter to the Sink. While the Splitter and the Sink are stateless, the Counter is a stateful operator working in a keyby manner. The figure below shows a schematic view of the logical data-flow graph of the application.
We report below some code snippets to understand the API. The Source can be defined by expressing its functional logic through a functor, a lambda, or a plain C++ function. In all cases, the logic must respect one of the admissible signatures accepted by the Source operator. In the snippet below, the source logic is developed as a C++ functor; an equivalent lambda is sketched right after.
class Source_Functor
{
private:
    std::vector<std::string> dataset; // contains the text lines as strings

public:
    // Constructor
    Source_Functor(const std::vector<std::string> &_dataset):
                   dataset(_dataset) {}

    // Functor logic
    void operator()(wf::Source_Shipper<std::string> &shipper)
    {
        // generation loop
        for (auto &line: dataset) {
            shipper.push(line);
        }
    }
};
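For comparison, the same logic could also be expressed as a lambda. Below is a minimal sketch, assuming the dataset vector is in scope; the lambda is declared mutable so that its captured copy of the dataset is non-const, as in the functor above.

// Source logic expressed as a lambda capturing a copy of the dataset
auto source_lambda = [dataset](wf::Source_Shipper<std::string> &shipper) mutable {
    // generation loop
    for (auto &line: dataset) {
        shipper.push(line);
    }
};

The lambda can then be passed to the Source_Builder in place of the functor object.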
The Splitter is implemented as a FlatMap operator, which can produce zero or more outputs per input. Again, the functional logic can be provided as a function, a lambda, or a functor object (as in the snippet below) respecting a specific signature; an equivalent plain-function variant is sketched right after the snippet.
class Splitter_Functor
{
public:
    // Functor logic
    void operator()(const std::string &input, wf::Shipper<std::string> &shipper)
    {
        std::istringstream line(input);
        std::string word;
        while (std::getline(line, word, ' ')) {
            shipper.push(std::move(word));
        }
    }
};
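Since the logic is stateless, it could equally be provided as a plain C++ function with the same signature. Below is a minimal sketch mirroring the functor above.

// Splitter logic written as a plain C++ function with the same signature
void splitter_func(const std::string &input, wf::Shipper<std::string> &shipper)
{
    std::istringstream line(input);
    std::string word;
    while (std::getline(line, word, ' ')) {
        shipper.push(std::move(word));
    }
}

The function name would then be passed to the FlatMap_Builder in place of the functor object.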
The Counter can be implemented as a Map operator encapsulating an internal state partitioned by key (a hash table). Each replica of the Counter keeps its own table, and the keyby distribution (configured later in the builder) delivers all occurrences of the same word to the same replica.
class Counter_Functor
{
private:
    std::unordered_map<std::string, int> table;

public:
    // Functor logic
    std::pair<std::string, int> operator()(const std::string &word)
    {
        if (table.find(word) == table.end()) {
            table.insert(std::make_pair(word, 1));
        }
        else {
            table.at(word)++;
        }
        return std::make_pair(word, table.at(word));
    }
};
Finally, we define the functional logic of the Sink as follows.
class Sink_Functor
{
public:
    // Functor logic
    void operator()(std::optional<std::pair<std::string, int>> &input)
    {
        if (input) {
            std::cout << "Received word " << input->first << " with counter " << input->second << std::endl;
        }
        else {
            std::cout << "End of stream" << std::endl;
        }
    }
};
Once the functional logic of each operator has been defined, the operators must be created using the corresponding builder classes. During the creation, the user can specify several configuration parameters per operator, such as its name (used for logging), its parallelism level, and others.
Source_Functor source_functor(dataset);
wf::Source source = wf::Source_Builder(source_functor)
                        .withParallelism(2)
                        .withName("wc_source")
                        .build();

Splitter_Functor splitter_functor;
wf::FlatMap splitter = wf::FlatMap_Builder(splitter_functor)
                           .withParallelism(2)
                           .withName("wc_splitter")
                           .withOutputBatchSize(10)
                           .build();

Counter_Functor counter_functor;
wf::Map counter = wf::Map_Builder(counter_functor)
                      .withParallelism(3)
                      .withName("wc_counter")
                      .withKeyBy([](const std::string &word) -> std::string { return word; })
                      .build();

Sink_Functor sink_functor;
wf::Sink sink = wf::Sink_Builder(sink_functor)
                    .withParallelism(3)
                    .withName("wc_sink")
                    .build();
All the operators (except Sinks) can be declared with a custom output batch size. In the example above, the Splitter operator adopts a batch size of 10, which means that it sends its results in batches of 10 individual outputs. This approach can be used to increase throughput at the expense of latency. If not specified, the batch size is zero (meaning that batching is disabled) and the operators send their outputs individually. Furthermore, we observe that the Counter operator is declared with a key extractor to enable the keyby distribution between the Splitter and the Counter replicas.
Finally, the whole application can be executed by creating the streaming environment and by adding the operators in the proper order. This can be done by leveraging two programming constructs of WindFlow: the PipeGraph and the MultiPipe classes. The PipeGraph can be instantiated by specifying two configuration options: the execution mode and the time policy.
The execution mode states how the processing is done in the data-flow graph with respect to the ordering of inputs. There are three alternative modes: DEFAULT, where inputs may be processed out of order and the progress of time is tracked through watermarks; DETERMINISTIC, where each operator processes its inputs in a deterministic order based on their timestamps; PROBABILISTIC, where such an ordering is respected with high probability.
Regarding the time policy, WindFlow supports two alternatives. With "Ingress Time", timestamps are automatically generated by the Sources (as in the Source functor of this WordCount example), and watermarks are also automatically generated in case of the DEFAULT execution mode. With "Event Time", the user is instead in charge of emitting inputs from the sources with a user-defined timestamp. With this policy, watermarks must be explicitly emitted by the user in the sources in case of the DEFAULT execution mode; a sketch of an event-time Source is shown below.
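To make the Event Time policy concrete, the sketch below adapts the Source functor to emit user-defined timestamps and watermarks. It is a minimal sketch, assuming the pushWithTimestamp() and setNextWatermark() methods of the Source_Shipper; the fixed timestamp increment is purely illustrative.

// Event-time variant of the Source logic (a sketch, not part of the example above).
// Assumes Source_Shipper provides pushWithTimestamp() and setNextWatermark().
class EventTime_Source_Functor
{
private:
    std::vector<std::string> dataset; // contains the text lines as strings

public:
    // Constructor
    EventTime_Source_Functor(const std::vector<std::string> &_dataset):
                             dataset(_dataset) {}

    // Functor logic (event-time variant)
    void operator()(wf::Source_Shipper<std::string> &shipper)
    {
        uint64_t ts = 0;
        for (auto &line: dataset) {
            shipper.pushWithTimestamp(line, ts); // input with a user-defined timestamp
            shipper.setNextWatermark(ts); // watermark emitted explicitly by the user
            ts += 1000; // hypothetical fixed increment between consecutive inputs
        }
    }
};

The PipeGraph would then be instantiated with wf::Time_Policy_t::EVENT_TIME instead of the Ingress Time policy used in this example.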
The code fragment below shows the creation of the PipeGraph and of the whole data-flow graph, which is then run on a multi-core environment.
wf::PipeGraph topology("wc", wf::Execution_Mode_t::DEFAULT, wf::Time_Policy_t::INGRESS_TIME);
if (chaining) {
    topology.add_source(source).chain(splitter).add(counter).chain_sink(sink);
}
else {
    topology.add_source(source).add(splitter).add(counter).add_sink(sink);
}
topology.run(); // synchronous execution of the dataflow
In the code above, based on the value of a boolean flag (chaining), we create the data-flow graph either by chaining the stateless operators with their predecessors (to reduce thread oversubscription) or by keeping every operator separate. This results in two different physical data-flow graphs, running with five (left-hand side figure below) and ten (right-hand side) underlying threads respectively, based on the parallelism levels chosen in the builders.
This example shows only a fraction of the expressive power of WindFlow, which is able to build complex data-flow graphs using split and merge operations on MultiPipes. Interested users can refer to the Doxygen documentation of the API, and can look at the synthetic examples and tests distributed with the source code.