
Starting from version 3.0, WindFlow supports a set of operators that can target NVIDIA GPUs. Among these operators are Map and Filter. On this page, we demonstrate how to use them through a simple example that represents the skeleton of a FraudDetection application processing credit card transactions (see the DSPBench benchmark suite [1] for more details).
The application consists of four operators, two of which are offloaded to the GPU. The Source produces a stream of transactions from an input dataset (already parsed for simplicity). The Filter discards all empty transactions present in the stream. The Map operator processes all transactions belonging to the same customer and adds a boolean flag indicating whether each transaction is classified as potentially fraudulent. Finally, the Sink receives the results together with their classification flags.
The logical data-flow graph of the application is shown below. WindFlow enables the use of GPU devices for general-purpose, stateful stream processing, marking a significant shift from the state of the art. Traditionally, GPUs are used to accelerate specific data analytics tasks that apply associative functions on input batches (e.g., reduceByKey in Apache Spark).

GPU operators come with additional constraints. In particular, the input and output types—as well as the key attribute type, if present—must be trivially copyable from host-accessible memory to GPU-accessible memory. The data type used by the operators is defined below.
#define MAX_LEN_RECORD 256

struct record_t
{
    int customer_id; // identifies the customer
    char record[MAX_LEN_RECORD]; // contains the transaction id and the transaction type
    bool isFraud; // true if the record is classified as a fraud, false otherwise
};
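Since GPU operators require trivially copyable types, this property can be verified at compile time. The guard below is not part of the original example, but a static_assert of this kind catches violations early if record_t is ever modified:
#include <type_traits>

// compile-time guard (our addition): fails to compile if record_t
// ever stops being trivially copyable, which GPU operators require
static_assert(std::is_trivially_copyable<record_t>::value,
              "record_t must be trivially copyable for GPU operators");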
The Source operator is in charge of producing a stream of objects of this type by reading a large sequence of transactions from an already prepared vector.
class Source_Functor
{
private:
    std::vector<record_t> records; // contains the transactions
public:
    // Constructor
    Source_Functor(const std::vector<record_t> &_records):
                   records(_records) {}

    // Functor logic
    void operator()(wf::Source_Shipper<record_t> &shipper)
    {
        // generation loop
        for (auto record: records) {
            record_t output(record);
            shipper.push(output);
        }
    }
};
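This example runs the graph with ingress time (see the Time_Policy_t::INGRESS_TIME setting in the final code), so timestamps are assigned automatically when tuples enter the system. If explicit event timestamps were needed, the Source could assign them itself. The variant of the generation loop below is only a sketch: it assumes a pushWithTimestamp method on wf::Source_Shipper and a made-up inter-arrival gap; check the WindFlow API reference for the exact signature.
// sketch of a generation loop with explicit event timestamps;
// pushWithTimestamp and the time unit are assumptions here
uint64_t next_ts = 0;
for (auto record: records) {
    record_t output(record);
    shipper.pushWithTimestamp(std::move(output), next_ts);
    next_ts += 10; // hypothetical inter-arrival gap between transactions
}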
The first GPU operator is a Filter_GPU instance that uses the functional logic defined by the functor object shown below. This operator applies the predicate to each input and retains only those inputs for which the predicate evaluates to true.
class Filter_Functor
{
public:
    // Functor logic
    __device__ bool operator()(record_t &input)
    {
        return (input.record)[0] != '\0'; // keep only non-empty transactions
    }
};
The Map_GPU operator is configured to operate in a key-by manner. This means that all inputs sharing the same key—in this case, the customer_id field—are processed sequentially, while parallelism is exploited across records belonging to different customers. The API allows GPU operators that are created with a key extractor to maintain a per-key state object in GPU memory, which is always accessed sequentially by the active CUDA threads. The fragment below shows the definition of the functional logic for Map_GPU, where the classification code has been omitted for brevity.
class Map_Functor
{
public:
    ...
    // Functor logic
    __device__ void operator()(record_t &input, PredictionModel &model)
    {
        model.addRecord(input);
        input.isFraud = model.updateClassification();
    }
};
class PredictionModel
{
private:
    ... // member variables and functions
public:
    // Constructor (a default constructor is required!)
    __device__ PredictionModel() {...}

    // Destructor
    __device__ ~PredictionModel() {...}

    // add a new record to the model
    __device__ void addRecord(record_t &record) {...}

    // evaluate the classification
    __device__ bool updateClassification() {...}
};
The run-time system guarantees that updates to the prediction model for each customer are performed sequentially, one at a time. As a result, WindFlow enables GPUs to be used for stateful stream processing following the common key-by paradigm. This approach is general and applies to a wide range of real-world applications. Naturally, when dealing with associative functions, other GPU operators can be employed (e.g., Reduce_GPU and FFAT_Windows_GPU, which are not shown in this example).
To conclude, the code below shows how all the operators are created and assembled through the high-level, user-friendly builder interface of the API.
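The definition of the Sink functor has not been shown so far. A minimal sketch is reported below, assuming the usual WindFlow sink signature in which each result arrives wrapped in a std::optional (an empty optional marks the end of the stream):
#include <iostream>
#include <optional>

class Sink_Functor
{
public:
    // Functor logic (sketch): consume each classified transaction
    void operator()(std::optional<record_t> &input)
    {
        if (input) {
            std::cout << "customer " << input->customer_id
                      << (input->isFraud ? " -> fraud" : " -> legitimate") << std::endl;
        }
    }
};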
Source_Functor source_functor(records);
wf::Source source = wf::Source_Builder(source_functor)
                        .withParallelism(1)
                        .withName("source")
                        .withOutputBatchSize(1000)
                        .build();

Filter_Functor filter_functor;
wf::Filter_GPU filter_gpu = wf::FilterGPU_Builder(filter_functor)
                                .withParallelism(2)
                                .withName("filter_gpu")
                                .build();

Map_Functor map_functor;
wf::Map_GPU map_gpu = wf::MapGPU_Builder(map_functor)
                          .withParallelism(2)
                          .withName("map_gpu")
                          .withKeyBy([] __host__ __device__ (const record_t &record) -> int { return record.customer_id; })
                          .build();

Sink_Functor sink_functor;
wf::Sink sink = wf::Sink_Builder(sink_functor)
                    .withParallelism(3)
                    .withName("sink")
                    .build();

wf::PipeGraph topology("fraudprediction", wf::Execution_Mode_t::DEFAULT, wf::Time_Policy_t::INGRESS_TIME);
topology.add_source(source).add(filter_gpu).add(map_gpu).add_sink(sink);
topology.run(); // synchronous execution of the dataflow
The use of GPU operators requires special care. In the code above, the batch size is configured when creating the last operator preceding the GPU-executed portion of the pipeline. In this example, the Source sets an output batch size of 1,000 tuples. These batches are processed by Filter_GPU, and each resulting batch (potentially containing fewer than 1,000 inputs, depending on the filtering predicate) is then passed to Map_GPU.
Selecting an appropriate batch size, along with the parallelism level of the operators, requires manual tuning by the user in order to achieve the best possible performance.
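As a final practical note, programs using GPU operators must be compiled with the CUDA compiler, and the __host__ __device__ lambda passed to withKeyBy requires the extended-lambda feature of nvcc. A possible command line is sketched below; the file name and include path are assumptions, so adapt them to your installation:
# sketch: compile a WindFlow program using GPU operators (paths are assumptions)
nvcc -std=c++17 --expt-extended-lambda -I/path/to/windflow/wf fraud_detection.cu -o fraud_detection -lpthread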