
A C++17 Data Stream Processing Parallel Library for Multicores and GPUs 

GPU API Example

WindFlow, starting from version 3.0, supports operators targeting NVIDIA GPUs. Two of them are Maps and Filters. On this page, we show how to use them in a simple example representing the skeleton of a FraudDetection application processing credit card transactions (see the DSPBench benchmark suite [1] for further information). The application is composed of four operators, two of them offloaded onto the GPU. The Source is in charge of producing a stream of transactions from an input dataset (already parsed for simplicity). The Filter is in charge of dropping all the empty transactions present in the input stream. The Map operator processes all the transactions belonging to the same customer in order to add a boolean flag stating whether the transaction is classified as a potential fraud or not. Finally, the Sink receives the results with their classification flag.

The logical data-flow graph of the application is reported below. WindFlow allows the use of GPU devices for general-purpose stateful stream processing. This is a shift with respect to the state of the art: GPUs are traditionally used to accelerate specific data analytics tasks that apply associative functions on input batches (e.g., reduceByKey in Apache Spark). But let's address this point step by step.

[Figure: logical data-flow graph of the FraudDetection application (Source, Filter_GPU, Map_GPU, Sink)]

GPU operators come with some additional constraints. In particular, the input and output types, as well as the key attribute type (if any), must be trivially copyable, so that they can be transferred between host-accessible memory and GPU-accessible memory. Below, we define the data type used by the operators; a compile-time check of the trivially-copyable requirement is sketched right after the definition.


#define MAX_LEN_RECORD 256

struct record_t
{
    int customer_id; // identifies the customer
    char record[MAX_LEN_RECORD]; // contains the transaction id and the transaction type
    bool isFraud; // true if the record is classified as a fraud, false otherwise
};
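
One way to catch violations of this requirement early is an explicit compile-time check with standard C++ type traits. This check is our addition for illustration, not part of the WindFlow API:


#include <type_traits>

// compile-time check: record_t must be byte-wise copyable to GPU memory
static_assert(std::is_trivially_copyable<record_t>::value,
              "record_t must be trivially copyable to be used by GPU operators");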

The Source operator is in charge of producing a stream of objects of this type by reading a large sequence of transactions from an already prepared vector.


class Source_Functor
{
private:
    std::vector<record_t> records; // contains the transactions

public:
    // Constructor
    Source_Functor(const std::vector<record_t> &_records):
                   records(_records) {}

    // Functor logic
    void operator()(wf::Source_Shipper<record_t> &shipper)
    {
        // generation loop
        for (auto record: records) {
            record_t output(record);
            shipper.push(output);
        }
    }
};

The first operator is a Filter_GPU instance using the functional logic defined by the functor object below. The operator applies the functor logic to each input and retains all the inputs for which the predicate is true.


class Filter_Functor
{
public:
    // Functor logic: drop empty transactions
    __device__ bool operator()(record_t &input)
    {
        return input.record[0] != '\0';
    }
};

The Map_GPU operator is configured to work in a keyBy manner. This means that all inputs having the same key (in this case the customer_id field) will be processed sequentially, while parallelism is exploited between records belonging to different customers. The API allows GPU operators created with a key extractor to use a state object maintained in GPU memory for each key (and always accessed sequentially by the active CUDA threads). The fragment below shows the definition of the functional logic of the Map_GPU, where the classification code is omitted for brevity.


class Map_Functor
{
public:
    ...
    // Functor logic
    __device__ void operator()(record_t &input, PredictionModel &model)
    {
        model.addRecord(input);
        input.isFraud = model.updateClassification();
    }
};

class PredictionModel
{
private:
    ... // member variables and functions

public:
    // Constructor (a default constructor is required!)
    __device__ PredictionModel() {...}

    // Destructor
    __device__ ~PredictionModel() {...}

    // add a new record to the model
    __device__ void addRecord(record_t &record) {...}

    // evaluate the classification
    __device__ bool updateClassification() {...}
};

The run-time system guarantees that the updates to the prediction model of each customer are executed sequentially, one by one. Therefore, WindFlow allows GPUs to be used for stateful stream processing using the common keyBy paradigm. This approach is quite general and useful for a broad set of real-world applications. Of course, in the case of associative functions, other GPU operators can be used (e.g., Reduce_GPU and FFAT_Windows_GPU, not shown in this example).
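
The functional logic of the Sink is not shown in the fragments above. A minimal sketch is reported below, assuming WindFlow's standard Sink functor signature, which receives each result wrapped in an std::optional (an empty optional signals the end of the stream); the printing logic is just an example:


#include <optional>
#include <iostream>

class Sink_Functor
{
public:
    // Functor logic: an empty optional signals the end of the stream
    void operator()(std::optional<record_t> &input)
    {
        if (input) {
            // consume the classified transaction (printing is just an example)
            std::cout << "customer " << input->customer_id
                      << ", fraud: " << input->isFraud << std::endl;
        }
    }
};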

To conclude this overview of the API, we show how the operators are created with a very high-level, user-friendly builder interface.


Source_Functor source_functor(records);
wf::Source source = wf::Source_Builder(source_functor)
                        .withParallelism(1)
                        .withName("source")
                        .withOutputBatchSize(1000)
                        .build();

Filter_Functor filter_functor;
wf::Filter_GPU filter_gpu = wf::FilterGPU_Builder(filter_functor)
                                .withParallelism(2)
                                .withName("filter_gpu")
                                .build();

Map_Functor map_functor;
wf::Map_GPU map_gpu = wf::MapGPU_Builder(map_functor)
                          .withParallelism(2)
                          .withName("map_gpu")
                          .withKeyBy([] __host__ __device__ (const record_t &record) -> int { return record.customer_id; })
                          .build();

Sink_Functor sink_functor;
wf::Sink sink = wf::Sink_Builder(sink_functor)
                    .withParallelism(3)
                    .withName("sink")
                    .build();

wf::PipeGraph topology("fraudprediction", wf::Execution_Mode_t::DEFAULT, wf::Time_Policy_t::INGRESS_TIME);
topology.add_source(source).add(filter_gpu).add(map_gpu).add_sink(sink);
topology.run(); // synchronous execution of the dataflow
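
Since the program mixes host and device code (including the __host__ __device__ lambda passed to withKeyBy), it must be compiled with nvcc. As a rough sketch, a compilation command might look like the following; the source file name, include path, and library flags are assumptions to adapt to your installation:


nvcc -x cu -std=c++17 --expt-extended-lambda -I<path-to-windflow>/wf fraudprediction.cpp -o fraudprediction -ltbb -lpthread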

Important Aspects

The use of GPU operators requires special care. In the code above, the batch size is set during the creation of the first operator preceding a sequence of GPU operators. In this case, the Source defines batches of 1000 tuples each. Batches of 1000 inputs are processed by the Filter_GPU, and each one is delivered to the following Map_GPU operator (after filtering, a batch can contain fewer than 1000 inputs, depending on the filtering predicate). Choosing the right batch size, as well as the parallelism level of the operators, requires manual tuning by the user to achieve the highest possible performance; a sketch of how such knobs can be exposed is shown below.
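
As a minimal sketch of how these tuning knobs can be exposed, the fragment below reads them from the command line; the argument layout and defaults are our assumptions, not part of the WindFlow API:


// hypothetical command-line layout: ./fraudprediction <batch_size> <gpu_parallelism>
size_t batch_size = (argc > 1) ? std::atoi(argv[1]) : 1000; // tuples per output batch of the Source
size_t gpu_pardegree = (argc > 2) ? std::atoi(argv[2]) : 2; // parallelism of the GPU operators

wf::Source source = wf::Source_Builder(source_functor)
                        .withParallelism(1)
                        .withName("source")
                        .withOutputBatchSize(batch_size)
                        .build();

wf::Filter_GPU filter_gpu = wf::FilterGPU_Builder(filter_functor)
                                .withParallelism(gpu_pardegree)
                                .withName("filter_gpu")
                                .build();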

References

  1. M. V. Bordin, D. Griebler, G. Mencagli, C. F. R. Geyer, L. G. L. Fernandes. DSPBench: A Suite of Benchmark Applications for Distributed Data Stream Processing Systems. IEEE Access, IEEE, 2020. ISSN: 2169-3536. DOI: 10.1109/ACCESS.2020.3043948