
Starting from version 4.0, WindFlow provides integration with the RocksDB key–value store (KVS). This enables a suite of new operators that can maintain per-key state directly on RocksDB. Such operators are particularly useful in larger-than-memory state scenarios, where the state maintained by the application exceeds the available memory of the machine.
The supported persistent operators include the basic ones: P_Map, P_Filter, P_FlatMap, P_Reduce, and P_Sink, as well as a window-based operator (P_Keyed_Windows).
The set of persistent operators is described in the figure below.

We present a short example showing how to integrate a P_FlatMap operator into a WindFlow application. The same approach can be easily applied to the other persistent operators. We begin by defining the types of the input tuple and the associated state object:
struct tuple_t // Struct of the input tuple
{
    size_t key;
    int64_t value;
};
struct state_t // Struct of the state object per key
{
    ...
};
Then, the user defines the business logic of the P_FlatMap, for example by providing a functor object with the signature expected by WindFlow:
class P_FlatMap_Functor // P_FlatMap functor
{
public:
    // operator()
    void operator()(const tuple_t &t, state_t &state, Shipper &shipper)
    {
        ... // Use the state object and the tuple to produce zero, one, or more results.
        ... // The state can be modified: the WindFlow runtime system deserializes it
        ... // before this call and serializes it again afterwards.
    }
};
Before invoking this function for each input tuple, WindFlow’s runtime system deserializes the state object associated with the key of the current tuple and passes it to the user-defined function. After execution, if the state object has been modified, it is serialized again and written back to the KVS.
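The load, call, and write-back cycle described above can be sketched without WindFlow or RocksDB. The following is a minimal illustration, not WindFlow's implementation: `demo_state_t`, `demo_kvs_t`, `demo_serialize`, `demo_deserialize`, and `demo_process` are hypothetical names, and a `std::unordered_map` stands in for the KVS. For simplicity, the sketch writes the state back after every call rather than only when it has changed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <unordered_map>

// Hypothetical per-key state, used only for this illustration.
struct demo_state_t {
    int64_t sum = 0;
    uint64_t count = 0;
};

// Stand-in for the KVS: maps each key to the serialized bytes of its state.
using demo_kvs_t = std::unordered_map<size_t, std::string>;

// Copy the raw bytes of the (trivially copyable) state into a string.
std::string demo_serialize(const demo_state_t &s) {
    return std::string(reinterpret_cast<const char *>(&s), sizeof(s));
}

// Rebuild the state from the bytes produced by demo_serialize.
demo_state_t demo_deserialize(const std::string &bytes) {
    assert(bytes.size() == sizeof(demo_state_t));
    demo_state_t s;
    std::memcpy(&s, bytes.data(), sizeof(s));
    return s;
}

// One processing step for a single input value with the given key:
// load the state (or start from a default one), run the user logic,
// then serialize the state and store it again.
template <typename Logic>
void demo_process(demo_kvs_t &kvs, size_t key, int64_t value, Logic logic) {
    demo_state_t state; // default state for a key seen for the first time
    auto it = kvs.find(key);
    if (it != kvs.end()) {
        state = demo_deserialize(it->second);
    }
    logic(value, state);              // user logic may modify the state
    kvs[key] = demo_serialize(state); // unconditional write-back in this sketch
}
```

A caller would pass a lambda such as `[](int64_t v, demo_state_t &s) { s.sum += v; s.count += 1; }` as `logic`. Each key then accumulates its own running sum and count across calls, which is the per-key behavior a persistent operator provides.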
The user creates the operators and the PipeGraph environment as usual. In addition, the P_FlatMap operator is instantiated using its builder. Before doing so, the serialization and deserialization functions for the state objects must be provided (in the example below, they are implemented as two lambda functions):
int main()
{
    ...
    auto serializer = [](state_t &s) -> std::string {
        ... // serialization function
    };
    auto deserializer = [](std::string &s) -> state_t {
        ... // deserialization function
    };
    ...
    P_FlatMap_Functor func;
    P_FlatMap op = P_FlatMap_Builder(func)
                       .withName("name_op")
                       .withParallelism(3) // example running with three replicas
                       .withOutputBatchSize(64) // each replica will produce 64 outputs per batch
                       .withKeyBy([](const tuple_t &t) -> size_t { return t.key; })
                       .withSerializer(serializer)
                       .withDeserializer(deserializer)
                       .build();
    ...
}
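As an illustration of what the two elided lambdas might contain, the sketch below serializes a state with a variable-length field using a length prefix. It assumes a hypothetical state type `window_state_t` (the actual `state_t` is application-specific), and `window_serializer` and `window_deserializer` are illustrative names. The lambdas only mirror the parameter and return types shown in the example above. Treating empty input as an empty state is an assumption of this sketch, not documented WindFlow behavior.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical state: a list of values collected for one key.
struct window_state_t {
    std::vector<int64_t> values;
};

// Layout: an 8-byte element count followed by the raw elements.
auto window_serializer = [](window_state_t &s) -> std::string {
    uint64_t n = s.values.size();
    std::string out(sizeof(n) + n * sizeof(int64_t), '\0');
    std::memcpy(&out[0], &n, sizeof(n));
    if (n > 0) {
        std::memcpy(&out[sizeof(n)], s.values.data(), n * sizeof(int64_t));
    }
    return out;
};

// Inverse of window_serializer.
auto window_deserializer = [](std::string &bytes) -> window_state_t {
    window_state_t s;
    if (bytes.empty()) {
        return s; // assumption of this sketch: empty input maps to an empty state
    }
    uint64_t n = 0;
    assert(bytes.size() >= sizeof(n));
    std::memcpy(&n, bytes.data(), sizeof(n));
    // Sketch only: production code should report malformed input instead.
    assert(bytes.size() == sizeof(n) + n * sizeof(int64_t));
    s.values.resize(n);
    if (n > 0) {
        std::memcpy(s.values.data(), bytes.data() + sizeof(n), n * sizeof(int64_t));
    }
    return s;
};
```

The encoding copies integers in the host's byte order, so the stored bytes are only portable between machines with the same endianness. A serialization library would be a reasonable alternative when the state is more complex or must be read on a different platform.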
The application is finally run as usual.