Summarizing Data¶
Summarizing data refers to obtaining aggregates in an incremental manner for a specified set of time periods.
Performing clock-time based summarization¶
Performing clock time-based summarizations involve two steps:
-
Calculating the aggregations for the selected time granularities and storing the results.
-
Retrieving previously calculated aggregations for selected time granularities.
Calculating the aggregations for the selected time granularities and storing the results¶
To understand this, consider a scenario where the production statistics generated by the Sweet Production Factory processed. The results need to be summarized for different time granularities and saved so that they can be later retrieved for periodical production analysis. To do this, you can create a Siddhi application as shown below.
@App:name('ProductionAggregatesApp')
define stream ProductionStream (name string, amount double, timestamp long);
@store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/Production", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver")
define aggregation ProductionAggregation
from ProductionStream
select name, amount, sum(amount) as total, avg(amount) as average
group by name
aggregate by timestamp every sec ... year;
-
The stream
In addition to the
name
andamount
attributes to capture the name of the product and the amount produced, the stream has an attribute namedtimestamp
to capture the time at which the production run takes place. he aggregations are executed based on this time. This attribute's value could either be a long value (reflecting the Unix timestamp in milliseconds), or a string value adhering to one of the following formats:-
<YYYY>-<MM>-<dd> <HH>:<mm>:<ss> <Z>
: This format can be used if the timezone needs to be specified explicitly. Here the ISO 8601 UTC offset must be provided. e.g., +05:30 reflects the India Time Zone. If time is not in GMT, this value must be provided. -
<yyyy>-<MM>-<dd> <HH>:<mm>:<ss>
: This format can be used if the timezone is in GMT.
-
-
The aggregation
You are defining the
ProductionAggregation
aggregation to store the aggregated values.A store is connected to it via the
@store
annotation. If the store definition is not provided, the data is stored in-memory. The aggregations stored in-memory can be lost when the Siddhi application is stopped. -
Siddhi query
The Siddhi query gets the production events from the
ProductionStream
stream, calculates the total and the average, and aggregates them everysec...year
. This means the production total and the average is calculated per second, per minute, per hour, per day, per month, and per year. -
Persisted Aggregations.
With Persisted aggregation, the aggregation for higher granularities will be executing on top of the database at the end of each time granularity(Day - at the end of the day, Month - at the end of the month, Year - at the end of the year). This is the recommended approach if the aggregation group by elements have lots of unique combinations.
-
Enabling Persisted Aggregation
The persisted aggregation can be enabled by adding the @persistedAggregation(enable="true") annotation on top of the aggregation definition. Furthermore, in order to execute the aggregation query, the cud function which is there in siddhi-store-rdbms is used. So in order to enable the "cud" operations, please add the following configuration on the deployment.yaml file.
siddhi: extensions: - extension: name: cud namespace: rdbms properties: perform.CUD.operations: true
In order to use persisted aggregation, A datasource needs to configured through deployment.yaml file and it should be pointed out in @store annotation of the aggregation definition.
Furthermore when using persisted aggregation with MySQL, please provide the aggregation processing timezone in JDBC URL since by default MySQL database will use server timezone for some time-related conversions which are there in an aggregation query.
Also when using persisted aggregation with Oracle, add below configuration in the datasource configuration,
For an example please refer to the following query which will be executed on the database to update the table for below sample Aggregation ,connectionInitSql: alter session set NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS' eg: - name: APIM_ANALYTICS_DB description: "The datasource used for APIM statistics aggregated data." jndiConfig: name: jdbc/APIM_ANALYTICS_DB definition: type: RDBMS configuration: jdbcUrl: 'jdbc:oracle:thin:@localhost:1521:XE' username: 'root' password: '123' driverClassName: oracle.jdbc.OracleDriver maxPoolSize: 50 idleTimeout: 60000 connectionTestQuery: SELECT 1 FROM DUAL connectionInitSql: alter session set NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS' validationTimeout: 30000 isAutoCommit: false
@persistedAggregation(enable="true") define aggregation ResponseStreamAggregation from processedResponseStream select api, version, apiPublisher, applicationName, protocol, consumerKey, userId, context, resourcePath, responseCode, avg(serviceTime) as avg_service_time, avg(backendTime) as avg_backend_time, sum(responseTime) as total_response_count, time1, epoch, eventTime group by api, version, apiPublisher, applicationName, protocol, consumerKey, userId, context, resourcePath, responseCode, time1, epoch aggregate by eventTime every min;
Retrieving previously calculated aggregations for selected time granularities¶
To retrieve the aggregates stored via the Siddhi application in the previous section, you need to create a new stream for data retrieval and join it with the aggregation that you previously created. In this example, let's assume that you need to production statistics for the period 12th October 2020 to 16th October 2020.
For this, you can update the ProductionAggregatesApp
Siddhi application that you previously created as follows:
-
Define a stream in which you want to generate the event (request) to retrieve data as follows.
-
Define a query that specifies the criteria for retrieving data as follows.
Observe the following in the above Siddhi query:@info(name = 'RetrievingAggregates') from ProductionSummaryRetrievalStream as b join ProductionAggregation as a on a.name == b.name within "2020-10-12 00:00:00 +00:00", "2020-10-17 00:00:00 +00:00" per "days" select a.name, a.total, a.average insert into ProductionSummaryStream;
-
The join
The above query joins the
ProductionsSummaryRetyrievalStream
stream and theProductionAggregation
aggregation. TheProductionsSummaryRetyrievalStream
stream is assignedb
as the short name, and the aggregation is assigneda
. Therefore,a.name == b.name
specifies that a matching event is identified when the value for thename
attribute is the same.For more information about how to perform joins, see Enriching Data.
-
within
clauseThis specifies the time interval for which the aggregates should be retrieved. You are requesting data for the period between 00.00 AM of 12th October 2020 and 00.00 AM of 17th October 2020 so that the days 12th, 13th, 14th, 15th, and the 16th of October are covered.
-
per
clauseThis specifies that the aggregates should be summarized per day.
-
select
clauseThis selects the
name
,total
andaverage
attributes to be selected from the aggregate to be included in the output event.
The output event is inserted into the
ProductionSummaryStream
stream. -
Try it out¶
To try out the example given above, follow the procedure below:
-
Download and install MySQL. Then start the MySQL server and create a new database in it by issuing the following command:
CREATE SCHEMA production;
Then open the
<SI_TOOLING_HOME>/conf/server/deployment.yaml
file and add the following datasource configuration underdatasources
.- name: Production_DB description: The datasource used for Production Statistics jndiConfig: name: jdbc/production definition: type: RDBMS configuration: jdbcUrl: 'jdbc:mysql://localhost:3306/production?useSSL=false' username: root password: root driverClassName: com.mysql.jdbc.Driver minIdle: 5 maxPoolSize: 50 idleTimeout: 60000 connectionTestQuery: SELECT 1 validationTimeout: 30000 isAutoCommit: false
-
Open a new file in Streaming Integrator Tooling. Then add and save the following Siddhi application.
@App:name('ProductionAggregatesApp') @App:description('Description of the plan') define stream ProductionStream (name string, amount double, timestamp long); define stream ProductionSummaryRetrievalStream (name string); @sink(type = 'log', prefix = "Production Summary", @map(type = 'text')) define stream ProductionSummaryStream (name string, total double, average double); @store(type = 'rdbms', jdbc.url = "jdbc:mysql://localhost:3306/production?useSSL=false", username = "root", password = "root", jdbc.driver.name = "com.mysql.jdbc.Driver") define aggregation ProductionAggregation from ProductionStream select name, amount, sum(amount) as total, avg(amount) as average aggregate by timestamp every seconds...years; @info(name = 'RetrievingAggregates') from ProductionSummaryRetrievalStream as b join ProductionAggregation as a on a.name == b.name within "2020-10-12 00:00:00 +00:00", "2020-10-17 00:00:00 +00:00" per "days" select a.name as name, a.total as total, a.average as average insert into ProductionSummaryStream;
This is the complete ProductionAggregatesApp
Siddhi application with the queries given in the examples to store and retrieve aggregates. You are annotating a sink of the log
type to the ProductionSummaryStream
stream to which the retrieved aggregates are sent so that you can view the retrieved information in the terminal logs.
-
To store aggregates, simulate five events with the following values for the
ProductionStream
stream via the Event Simulator tool. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.name amount timestamp brownie
90
1602489419000
brownie
90
1602488519000
eclairs
95
1602661319000
brownie
100
1602747719000
brownie
120
1602834119000
The above events are stored in the
production
database that you previously defined. -
To retrieve the information you stored, simulate an event for the
ProductionSummaryRetrievalStream
stream withbrownie
as the value for `name'. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.The Streaming Integrator Tooling terminal displays the following logs.
Performing short term summarizations¶
This section explains how to apply Siddhi logic to process a subset of events received to a stream based on time or the number of events. This is achieved via Siddi Windows. The window can apply to a batch of events or in a sliding manner.
The following are a few examples of how short time summarizations can be performed based on time or the number of events.
-
Performing a time-based summarization in a sliding manner
This involves selecting a subset of events based on a specified duration of time in a sliding manner as illustrated via an example in the diagram below.
For example, consider that the factory foreman of a sweet factory wants to calculate the production total and average per product every four minutes in a sliding manner. To address this, you can write a query as follows.
Here,from ProductionStream#window.time(4 min) select name, sum(amount) as pastFourMinTotal, avg(amount) as pastFourMinAvg group by name insert into TimeSlidingOutputStream;
#window.time(4 min)
represents a sliding time window of four minutes. Based on this, the total for the last four minutes is calculated and presented aspastFourMinTotal
, and the average for the last four minutes is calculated and presented aspastFourMinAvg
. -
Performing a time-based summarization in a tumbling manner
This involves selecting a subset of events based on a specified duration of time in a tumbling manner as illustrated via an example in the diagram below.
For example, consider that the factory foreman of a sweet factory wants to calculate the production total and average per product every four minutes in a tumbling manner. To address this, you can write a query as follows.
Here,from ProductionStream#window.timeBatch(4 min) select name, sum(amount) as pastFourMinTotal, avg(amount) as pastFourMinAvg group by name insert into TimeBatchOutputStream;
#window.timeBatch(4 min)
represents a tumbling time window of four minutes. Based on this, the total for the last four minutes is calculated and presented aspastFourMinTotal
, and the average for the last four minutes is calculated and presented aspastFourMinAvg
. -
Performing a length-based summarization in a sliding manner
This involves selecting a batch of events based on the number of events specified in a sliding manner as illustrated via an example in the diagram below.
For example, consider that the factory foreman of a sweet factory wants to calculate the production total and average per product for every three events in a sliding manner. To address this, you can write a query as follows.
Here,from ProductionStream#window.length(3) select name, sum(amount) as lastBatchTotal, avg(amount) as lastBatchAvg group by name insert into LengthSlidingOutputStream;
#window.length(3)
represents a sliding length window of 3 events. Based on this, the total for the last three events is calculated and presented aslastBatchTotal
, and the average for the last three events is calculated and presented aslastBatchAvg
. -
Performing a length-based summarization to a batch of events
This involves selecting a batch of events based on the number of events specified in a sliding manner as illustrated via an example in the diagram below.
For example, consider that the factory foreman of a sweet factory wants to calculate the production total and average per product for every three events in a sliding manner. To address this, you can write a query as follows.
Here,from ProductionStream#window.lengthBatch(3) select name, sum(amount) as lastBatchTotal, avg(amount) as lastBatchAvg group by name insert into LengthBatchOutputStream;
#window.lengthBatch(3)
represents a sliding length window of 3 events. Based on this, the total for the last three events is calculated and presented aslastBatchTotal
, and the average for the last three events is calculated and presented aslastBatchAvg
.
Try it out¶
To try out the four sample queries given above, follow the steps below:
-
Open a new file. Then add and save the following Siddhi application.
@App:name('ProductionSummaryApp') @sink(type = 'log', prefix = "Four Minute Summary", @map(type = 'text')) define stream TimeSlidingOutputStream (name string, pastFourMinTotal double, pastFourMinAvg double); @sink(type = 'log', prefix = "Three Production Run Summary", @map(type = 'passThrough')) define stream LengthSlidingOutputStream (name string, lastBatchTotal double, lastBatchAvg double); define stream ProductionStream (name string, amount double, timestamp long); @sink(type = 'log', prefix = "Four Minute Summary - Batch", @map(type = 'text')) define stream TimeBatchOutputStream (name string, pastFourMinTotal double, pastFourMinAvg double); @sink(type = 'log', prefix = "Three Production Run Summary - Batch", @map(type = 'passThrough')) define stream LengthBatchOutputStream (name string, lastBatchTotal double, lastBatchAvg double); @info(name = 'query1') from ProductionStream#window.time(4 min) select name, sum(amount) as pastFourMinTotal, avg(amount) as pastFourMinAvg group by name insert into TimeSlidingOutputStream; @info(name = 'query2') from ProductionStream#window.timeBatch(4 min) select name, sum(amount) as pastFourMinTotal, avg(amount) as pastFourMinAvg group by name insert into TimeBatchOutputStream; @info(name = 'query3') from ProductionStream#window.length(3) select name, sum(amount) as lastBatchTotal, avg(amount) as lastBatchAvg group by name insert into LengthSlidingOutputStream; @info(name = 'query4') from ProductionStream#window.lengthBatch(3) select name, sum(amount) as lastBatchTotal, avg(amount) as lastBatchAvg group by name insert into LengthBatchOutputStream;
The above Siddhi application has all four sample queries used as examples in this section. Those queries insert their output into four different output streams connected to log sinks to log the output of each query.
-
Simulate eight events for the
ProductionStream
input stream of the above Siddhi application as follows. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.name amount timestamp doughnuts
10
1602486060000
doughnuts
10
1602486120000
doughnuts
10
1602486180000
doughnuts
10
1602486240000
doughnuts
20
1602486300000
doughnuts
20
1602486360000
doughnuts
20
1602486420000
doughnuts
30
1602486480000
The above simulation results in the following logs.
Supported methods of summarization¶
WSO2 Streaming Integrator supports the following methods of summarization via Siddhi extensions. For more information about a summarization method, click on the relevant Siddhi link.