Cleansing Data¶
When you receive input data via the Streaming Integrator, it may consist of data that is not required to generate the required output, null values for certain attributes, etc. Cleansing data refers to refining the input data received by assigning values where there are missing values (if there are applicable values), filtering out the data that is not required, etc.
Filtering data based on conditions¶
You can filter data based on the exact match of attribute, based on a regex pattern or based on multiple criteria
To understand this, consider a scenario where you receive the temperature of multiple rooms in a streaming manner.
-
Filtering based on exact match of attribute:
If you want the temperature readings only for a specific room, you can add a query with a filter as follows.
With theroomNo=='2233'
filter, you are filtering the temperature readings for the room number2233
. These readings are then inserted into a separate stream namedRoomAnalysisStream
. -
Filtering based on regex pattern
In the example of processing temperature readings, assume that you need to filter the temperature readings for a specific range of devices located in the Southern wing and used for purpose B. Also assume that this can be derived from the device ID because the first three characters of the device ID represent the wing, and the eighth character represents the purpose. e.g., in device ID
SOU5438B765
, the first three charactersSOU
represent the Southern wing, and the eighth characterB
represents purpose B.You can achieve this by adding a filter with a regex pattern as follows:
The@info(name = 'FilteredRoomRange') from TempStream select regex.find(SOU*B*) as deviceID, roomNo, temp insert into FilteredResultsStream;
regex.find(SOU*B*)
function filters room IDs that start with the three charactersSOU
and has the characterB
with one or more characters before it as well as after it. -
Filtering based on multiple criteria
In the example of processing temperature readings, assume that you need to filter the readings for a range of rooms (e.g., rooms 100-210) where the temperature is greater than 40. For this, you can add a filter as follows.
Try it out¶
To try out the query used in the above example, let's include it in a Siddhi Application and run it.
-
Open a new file. Then add and save the following Siddhi application.
3. Open the event simulator and simulate three events for the@App:name('TemperatureApp') define stream TempStream (deviceID string, roomNo int, temp double); @sink(type = 'log', prefix = "FilteredResult", @map(type = 'passThrough')) define stream RoomAnalysisStream (deviceID string, roomNo int, temp double); @info(name = 'Filtering2233') from TempStream[roomNo == '2233'] select * insert into RoomAnalysisStream;
TempStream
input stream of theTemperatureApp
Siddhi application with the values for the attributes as given below. For instructions to simulate single events, see Testing Siddhi Applications - Simulating a single event.Event deviceID roomNo temp 1 SOU5438B765
2233
30
2 WES1827A876
1121
27
3 NOR1633B231
0899
28
Only the first event is logged in the terminal as follows. This is because only the first event has
2233
as the value for theroomNo
attribute.4. Now remove theINFO {io.siddhi.core.stream.output.sink.LogSink} - FilteredResult : Event{timestamp=1604494352744, data=[SOU5438B765, 2233, 30.0], isExpired=false}
Filtering2233
query and replace it with the following query that filters based on multiple criteria. The complete Siddhi application is as follows:
@App:name('TemperatureApp')
define stream TempStream (deviceID string, roomNo int, temp double);
@sink(type = 'log', prefix = "FilteredResult",
@map(type = 'passThrough'))
define stream RoomAnalysisStream (deviceID string, roomNo int, temp double);
from TempStream [(roomNo >= 100 and roomNo < 210) and temp > 40]
select *
insert into RoomAnalysisStream;
-
Open the event simulator and simulate three events for the
TempStream
input stream of theTemperatureApp
Siddhi application with the values for the attributes as given below. For instructions to simulate single events, see Testing Siddhi Applications - Simulating a single event.Event deviceID roomNo temp 1 SOU5438B765
183
30
2 WES1827A876
136
42
3 NOR1633B231
899
41
Only the second event is logged because only that event matched both the criteria. The event is logged as follows:
Modifying, removing, and replacing attributes¶
The input data may include attributes that are not required in order to generate the required output, attributes with values that need to be updated or replaced before further processing.
Assume that in the previous example, you do not need the device ID for further processing, and you need to present the room numbers as string values instead of integer values. To do this, follow the procedure below:
@info(name = 'CleaningData')
from FilteredResultsStream
select cast(roomNo string) as roomNo, temp
insert into CleansedDataStream;
cast()
function presents the value for the roomNo
attribute as a string value although it is received as an integer value. The select
clause excludes the deviceID
attribute.
Try it out¶
To try out the above example, follow the steps below:
-
Open a new file. Then add and save the following Siddhi application.
In this Siddhi application, the@App:name('TemperatureApp') define stream TempStream (deviceID string, roomNo int, temp double); @sink(type = 'log', prefix = "CleanedData", @map(type = 'passThrough')) define stream CleansedDataStream (roomNo string, temp double); @info(name = 'CleaningData') from TempStream select cast(roomNo, "string") as roomNo, temp insert into CleansedDataStream;
Temp Stream
has an attribute nameddeviceID
, but it is not selected to be included in the output events. TheroomNo
attribute is cast as an string value viacast(roomNo, "string")
. This means although the value for this attribute is received as an integer, it is presented as a string value in the output. -
Open the event simulator and simulate an event for the
TempStream
input stream of theTemperatureApp
Siddhi application with the values for the attributes as given below. For instructions to simulate single events, see Testing Siddhi Applications - Simulating a single event.deviceID roomNo temp SOU5438B765
183
30
The output is logged as follows:
Handling attributes with null values¶
In the example of processing temperature readings, assume that some events arrive with null values for the deviceID
attribute, and you want to assign the value unknown
in such scenarios. This can be achieved by writing a query as follows:
@info(name = 'AddingMissingValues')
from FilteredResultsStream
select ifThenElse(deviceID is null, "UNKNOWN", deviceID) as deviceID, roomNo, temp
insert into CleansedDataStream
Try it out.¶
To try out the above example, follow the steps below:
-
Open a new file. Then add and save the following Siddhi application.
In this Siddhi application, the@App:name("TemperatureApp") @App:description("Description of the plan") define stream TempStream (deviceID string, roomNo string, temp double); @sink(type = 'log', prefix = "Cleansed Data", @map(type = 'passThrough')) define stream CleansedDataStream (deviceID string, roomNo string, temp double); @info(name = 'AddingMissingValues') from TempStream select ifThenElse(deviceID is null, "UNKNOWN", deviceID) as deviceID, roomNo, temp insert into CleansedDataStream;
Temp Stream
has an attribute nameddeviceID
, but it is not selected to be included in the output events. TheroomNo
attribute is cast as an string value viacast(roomNo, "string")
. This means although the value for this attribute is received as an integer, it is presented as a string value in the output. -
Open the event simulator and simulate an event for the
TempStream
input stream of theTemperatureApp
Siddhi application with the values for the attributes as given below. For instructions to simulate single events, see Testing Siddhi Applications - Simulating a single event.deviceID roomNo temp Select the Is Null check box for this sttribute 183
30
The output is logged as follows: