
A C++17 Data Stream Processing Parallel Library for Multicores and GPUs 

RocksDB Integration

WindFlow, starting from version 4.0, provides an integration with the RocksDB Key-Value Store (KVS). This integration enables a suite of new persistent operators that keep a state object per key in RocksDB. These operators target applications with a “larger-than-memory” state, i.e., applications where the state of the operators is larger than the memory available on the machine. The supported persistent operators are the basic ones (P_Map, P_Filter, P_FlatMap, P_Reduce, and P_Sink) and a window-based operator (P_Keyed_Windows).

The set of persistent operators is described in the figure below.

[Figure: the persistent operators of WindFlow]

API Example

We show a short example of how to integrate a P_FlatMap operator into a WindFlow application; the same approach can easily be applied to the other persistent operators. First, we define the type of the input tuple and of the state object:


struct tuple_t // Struct of the input tuple
{
    size_t key;
    int64_t value;
};

struct state_t // Struct of the state object per key
{
    // ... fields of the state (defined by the user)
};

Then, the user defines the business logic of the P_FlatMap by providing, for example, a functor object with the signature expected by WindFlow:


class P_FlatMap_Functor // P_FlatMap functor
{
public:
    // operator() invoked for each input tuple; in this example the results
    // have the same type as the input tuples (hence Shipper<tuple_t>)
    void operator()(const tuple_t &t, state_t &state, Shipper<tuple_t> &shipper)
    {
        // Use the state object and the tuple to produce zero, one or more results
        // (e.g., with shipper.push(...)). The state can be modified: it is
        // deserialized before and serialized after the call of this function
        // by the WindFlow runtime system.
    }
};
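As a concrete illustration of the functor, the sketch below fills its body with a hypothetical logic: the state of each key counts the received tuples and accumulates their values, and a result carrying the running sum is emitted every second tuple of the key, so each call produces zero or one output. To keep the sketch compilable without WindFlow, the third parameter is a hypothetical MockShipper that only records the pushed results; in a real application that parameter is WindFlow's Shipper, as in the functor above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Input tuple, as in the example above.
struct tuple_t
{
    size_t key;
    int64_t value;
};

// Hypothetical state object: number of tuples and running sum of one key.
struct state_t
{
    uint64_t count = 0;
    int64_t sum = 0;
};

// Hypothetical stand-in for WindFlow's Shipper: it only stores the pushed
// results, so the functor can be exercised without the WindFlow runtime.
template<typename result_t>
struct MockShipper
{
    std::vector<result_t> outputs;
    void push(result_t &&r) { outputs.push_back(std::move(r)); }
};

// Example business logic: update the per-key state with every tuple and emit
// one result (key, running sum) every second tuple of that key.
class P_FlatMap_Functor
{
public:
    void operator()(const tuple_t &t, state_t &state, MockShipper<tuple_t> &shipper)
    {
        state.count++;         // the runtime serializes the modified state afterwards
        state.sum += t.value;
        if (state.count % 2 == 0) {
            shipper.push(tuple_t{t.key, state.sum});  // one output
        }                                            // otherwise: zero outputs
    }
};
```

With the tuples (1, 10) and (1, 5) of the same key, the first call emits nothing and the second one pushes the result (1, 15).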

For each input tuple, before calling the user function, the WindFlow runtime system deserializes the state object associated with the key of the current tuple and passes it to the function. After the call, since the state object might have been modified, it is serialized and written back to the KVS.
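The read-modify-write cycle just described can be modeled in a few lines of plain C++17. This is a minimal sketch, not WindFlow's implementation: a std::unordered_map stands in for RocksDB, keys are turned into strings with std::to_string, a key seen for the first time starts from a default-constructed state, and the running-sum state_t with its decimal text encoding is a hypothetical choice made only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>

struct tuple_t // input tuple, as in the example above
{
    size_t key;
    int64_t value;
};

struct state_t // hypothetical state: running sum of the values of one key
{
    int64_t sum = 0;
};

// Hypothetical serialization: decimal text encoding of the sum.
std::string serialize(const state_t &s) { return std::to_string(s.sum); }

state_t deserialize(const std::string &str)
{
    state_t s;
    s.sum = std::stoll(str);
    return s;
}

// Simplified model of the cycle performed for each tuple. The unordered_map
// plays the role of RocksDB (serialized key -> serialized state).
class KVSModel
{
public:
    template<typename func_t>
    void process(const tuple_t &t, func_t &&user_func)
    {
        const std::string db_key = std::to_string(t.key);
        auto it = kvs_.find(db_key);
        // 1) Read and deserialize the state of the key (default state if absent)
        state_t state = (it == kvs_.end()) ? state_t{} : deserialize(it->second);
        // 2) Call the user function, which may modify the state
        user_func(t, state);
        // 3) Serialize the state and write it back to the store
        kvs_[db_key] = serialize(state);
    }

    // Serialized state currently stored for a key (throws if the key is absent)
    const std::string &stored(size_t key) const { return kvs_.at(std::to_string(key)); }

    size_t num_keys() const { return kvs_.size(); }

private:
    std::unordered_map<std::string, std::string> kvs_;
};
```

For example, with a lambda add that increments state.sum by t.value, calling process(tuple_t{1, 10}, add) and then process(tuple_t{1, -4}, add) leaves the string "6" stored for key 1. In this model only the state of the key being processed is materialized in memory, and every tuple costs one read and one write to the store.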

The user creates the other operators and the PipeGraph environment as usual, while the P_FlatMap operator is created through its builder. Before building it, the serialization and deserialization functions of the state object must be provided (in the example below, as two lambda functions):


int main()
{
    // ... creation of the PipeGraph and of the other operators (as usual)

    // Serialization function of the state object
    auto serializer = [](state_t &s) -> std::string {
        // ... encode s into a std::string
    };
    // Deserialization function of the state object
    auto deserializer = [](std::string &s) -> state_t {
        // ... rebuild a state_t from s
    };

    // ...

    P_FlatMap_Functor func;
    P_FlatMap op = P_FlatMap_Builder(func)
                        .withName("name_op")
                        .withParallelism(3) // example running with three replicas
                        .withOutputBatchSize(64) // each replica emits its outputs in batches of 64
                        .withKeyBy([](const tuple_t &t) -> size_t { return t.key; }) // one state object per key
                        .withSerializer(serializer)
                        .withDeserializer(deserializer)
                        .build();

    // ...
}
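The bodies of serializer and deserializer depend entirely on the definition of state_t. As one possible choice, assuming a hypothetical state_t made only of fixed-size fields (a counter and a sum), the two lambdas can copy the raw bytes of the object; the signatures are the same as in the example above.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <type_traits>

// Hypothetical state object: number of tuples and running sum of one key.
struct state_t
{
    uint64_t count = 0;
    int64_t sum = 0;
};
static_assert(std::is_trivially_copyable_v<state_t>,
              "a raw byte copy requires a trivially copyable state");

// Serializer: copy the raw bytes of the state into a std::string.
auto serializer = [](state_t &s) -> std::string {
    return std::string(reinterpret_cast<const char *>(&s), sizeof(state_t));
};

// Deserializer: rebuild the state from the bytes produced by serializer.
auto deserializer = [](std::string &str) -> state_t {
    assert(str.size() == sizeof(state_t)); // the value must come from serializer
    state_t s;
    std::memcpy(&s, str.data(), sizeof(state_t));
    return s;
};
```

A raw byte copy is only valid for trivially copyable states (hence the static_assert) and ties the stored bytes to the struct layout and to the platform. If the state contains pointers or containers such as std::vector, or if its definition may change while data is kept in RocksDB, a field-by-field encoding is a safer choice.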

Finally, the application is run as usual.