
Summarizing Data

Summarizing data refers to obtaining aggregates in an incremental manner for a specified set of time periods.

Performing clock-time-based summarization

Performing clock-time-based summarization involves two steps:

  1. Calculating the aggregations for the selected time granularities and storing the results.

  2. Retrieving previously calculated aggregations for selected time granularities.

Calculating the aggregations for the selected time granularities and storing the results

To understand this, consider a scenario where the production statistics generated by the Sweet Production Factory are processed. The results need to be summarized for different time granularities and saved so that they can be retrieved later for periodic production analysis. To do this, you can create a Siddhi application as shown below.

@App:name('ProductionAggregatesApp')

define stream ProductionStream (name string, amount double, timestamp long);

@store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/Production", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver")
define aggregation ProductionAggregation
from ProductionStream
select name, amount, sum(amount) as total, avg(amount) as average 
group by name 
aggregate by timestamp every sec ... year;

Observe the following in the above Siddhi application:

  • The stream

    In addition to the name and amount attributes that capture the name of the product and the amount produced, the stream has an attribute named timestamp to capture the time at which the production run takes place. The aggregations are executed based on this time. This attribute's value can either be a long value (reflecting the Unix timestamp in milliseconds), or a string value adhering to one of the following formats:

    • <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z>: This format can be used if the timezone needs to be specified explicitly. Here, <Z> is the ISO 8601 UTC offset (e.g., +05:30 for the India time zone). If the time is not in GMT, this value must be provided.

    • <yyyy>-<MM>-<dd> <HH>:<mm>:<ss>: This format can be used if the time is in GMT.
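
    For example, the following timestamp values (sample values for illustration) all denote the same instant. Note that the string forms require the timestamp attribute to be declared as string instead of long:

        1602489419000                  -- Unix timestamp in milliseconds
        "2020-10-12 07:56:59 +00:00"   -- explicit UTC offset
        "2020-10-12 07:56:59"          -- time assumed to be in GMT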

  • The aggregation

    You are defining the ProductionAggregation aggregation to store the aggregated values.

    A store is connected to it via the @store annotation. If the store definition is not provided, the data is stored in memory; in that case, the stored aggregations can be lost when the Siddhi application is stopped (see the sketch below).
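
    For example, an in-memory variant is simply the same definition from above with the @store annotation omitted:

        define aggregation ProductionAggregation
        from ProductionStream
        select name, amount, sum(amount) as total, avg(amount) as average
        group by name
        aggregate by timestamp every sec ... year;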

  • Siddhi query

    The Siddhi query gets the production events from the ProductionStream stream, calculates the total and the average, and aggregates them every sec...year. This means the production total and the average are calculated per second, per minute, per hour, per day, per month, and per year.
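
    If you do not need all of these granularities, you can narrow the range. For example (an illustrative variation, not part of the application above), the following aggregates only from minutes up to days:

        aggregate by timestamp every min ... day;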

  • Persisted aggregations

With persisted aggregation, the aggregations for higher granularities are executed on top of the database at the end of each time granularity (day: at the end of the day, month: at the end of the month, year: at the end of the year). This is the recommended approach if the aggregation's group by attributes have many unique combinations.

  • Enabling persisted aggregation

    Persisted aggregation can be enabled by adding the @persistedAggregation(enable="true") annotation on top of the aggregation definition. To execute the aggregation queries, the cud function of the siddhi-store-rdbms extension is used, so CUD operations must be enabled by adding the following configuration to the deployment.yaml file.

       siddhi:
         extensions:
           -
             extension:
               name: cud
               namespace: rdbms
               properties:
                 perform.CUD.operations: true
    

    To use persisted aggregation, a datasource needs to be configured in the deployment.yaml file and referenced in the @store annotation of the aggregation definition, as in the sketch below.
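
    For example, a minimal sketch (assuming a datasource named Production_DB has been defined under datasources in deployment.yaml):

        @store(type = 'rdbms', datasource = "Production_DB")
        @persistedAggregation(enable = "true")
        define aggregation ProductionAggregation
        from ProductionStream
        select name, sum(amount) as total, avg(amount) as average
        group by name
        aggregate by timestamp every sec ... year;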

    Furthermore, when using persisted aggregation with MySQL, provide the aggregation processing time zone in the JDBC URL, because by default MySQL uses the server time zone for some of the time-related conversions performed in the aggregation query.

        jdbc:mysql://localhost:3306/TEST_DB?useSSL=false&useLegacyDatetimeCode=false&serverTimezone=UTC
    

    Also, when using persisted aggregation with Oracle, add the following to the datasource configuration:

    connectionInitSql: alter session set NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'
    
    For example:
    
     - name: APIM_ANALYTICS_DB
       description: "The datasource used for APIM statistics aggregated data."
       jndiConfig:
         name: jdbc/APIM_ANALYTICS_DB
         definition:
           type: RDBMS
           configuration:
             jdbcUrl: 'jdbc:oracle:thin:@localhost:1521:XE'
             username: 'root'
             password: '123'
             driverClassName: oracle.jdbc.OracleDriver
             maxPoolSize: 50
             idleTimeout: 60000
             connectionTestQuery: SELECT 1 FROM DUAL
             connectionInitSql: alter session set NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'
             validationTimeout: 30000
             isAutoCommit: false
    
    For example, consider the sample aggregation below; at the end of each minute, the corresponding aggregation query is executed directly on the database to update the table for the higher granularity.

    @persistedAggregation(enable="true")
    define aggregation ResponseStreamAggregation
    from processedResponseStream
    select api, version, apiPublisher, applicationName, protocol, consumerKey, userId, context, resourcePath, responseCode, 
        avg(serviceTime) as avg_service_time, avg(backendTime) as avg_backend_time, sum(responseTime) as total_response_count, time1, epoch, eventTime
    group by api, version, apiPublisher, applicationName, protocol, consumerKey, userId, context, resourcePath, responseCode, time1, epoch
    aggregate by eventTime every min;
    

Retrieving previously calculated aggregations for selected time granularities

To retrieve the aggregates stored via the Siddhi application in the previous section, you need to create a new stream for data retrieval and join it with the aggregation that you previously created. In this example, let's assume that you need the production statistics for the period from 12th October 2020 to 16th October 2020.

For this, you can update the ProductionAggregatesApp Siddhi application that you previously created as follows:

  1. Define a stream in which the event (request) to retrieve data is generated, as follows.

    define stream ProductionSummaryRetrievalStream (name string);
    
  2. Define a query that specifies the criteria for retrieving data as follows.

    @info(name = 'RetrievingAggregates') 
    from ProductionSummaryRetrievalStream as b join ProductionAggregation as a
    on a.name == b.name 
    within "2020-10-12 00:00:00 +00:00", "2020-10-17 00:00:00 +00:00" 
    per "days" 
    select a.name, a.total, a.average 
    insert into ProductionSummaryStream;
    
    Observe the following in the above Siddhi query:

    • The join

      The above query joins the ProductionSummaryRetrievalStream stream and the ProductionAggregation aggregation. The ProductionSummaryRetrievalStream stream is assigned b as the short name, and the aggregation is assigned a. Therefore, a.name == b.name specifies that a matching event is identified when both have the same value for the name attribute.

      For more information about how to perform joins, see Enriching Data.

    • within clause

      This specifies the time interval for which the aggregates should be retrieved. You are requesting data for the period between 00:00 of 12th October 2020 and 00:00 of 17th October 2020, so that the days 12th, 13th, 14th, 15th, and 16th of October are covered.

    • per clause

      This specifies that the aggregates should be summarized per day.

    • select clause

      This selects the name, total, and average attributes from the aggregation to be included in the output event.

    The output event is inserted into the ProductionSummaryStream stream.
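
    The within and per values can be varied to retrieve other granularities. For example (an illustrative variation, not part of the application above), the following query retrieves hourly aggregates for a single day, using the wildcard form of the within clause:

        from ProductionSummaryRetrievalStream as b join ProductionAggregation as a
        on a.name == b.name
        within "2020-10-12 **:**:**"
        per "hours"
        select a.name, a.total, a.average
        insert into ProductionSummaryStream;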

Try it out

To try out the example given above, follow the procedure below:

  1. Download and install MySQL. Then start the MySQL server and create a new database in it by issuing the following command:

    CREATE SCHEMA production;

    Then open the <SI_TOOLING_HOME>/conf/server/deployment.yaml file and add the following datasource configuration under datasources.

      - name: Production_DB
        description: The datasource used for Production Statistics
        jndiConfig:
          name: jdbc/production
        definition:
          type: RDBMS
          configuration:
            jdbcUrl: 'jdbc:mysql://localhost:3306/production?useSSL=false'
            username: root
            password: root
            driverClassName: com.mysql.jdbc.Driver
            minIdle: 5
            maxPoolSize: 50
            idleTimeout: 60000
            connectionTestQuery: SELECT 1
            validationTimeout: 30000
            isAutoCommit: false
    
  2. Start and Access Streaming Integrator Tooling.

  3. Open a new file in Streaming Integrator Tooling. Then add and save the following Siddhi application.

    @App:name('ProductionAggregatesApp')
    @App:description('Description of the plan')
    
    define stream ProductionStream (name string, amount double, timestamp long);
    
    define stream ProductionSummaryRetrievalStream (name string);
    
    @sink(type = 'log', prefix = "Production Summary",
        @map(type = 'text'))
    define stream ProductionSummaryStream (name string, total double, average double);
    
    
    @store(type = 'rdbms', jdbc.url = "jdbc:mysql://localhost:3306/production?useSSL=false", username = "root", password = "root", jdbc.driver.name = "com.mysql.jdbc.Driver")
    define aggregation ProductionAggregation
    from ProductionStream
    select name, amount, sum(amount) as total, avg(amount) as average
        group by name
        aggregate by timestamp every seconds...years;
    
    @info(name = 'RetrievingAggregates')
    from ProductionSummaryRetrievalStream as b 
    join ProductionAggregation as a 
        on a.name == b.name
    within "2020-10-12 00:00:00 +00:00", "2020-10-17 00:00:00 +00:00"
    per "days" 
    select a.name as name, a.total as total, a.average as average 
    insert into ProductionSummaryStream;
    

This is the complete ProductionAggregatesApp Siddhi application with the queries given in the examples to store and retrieve aggregates. A sink of the log type is annotated to the ProductionSummaryStream stream, to which the retrieved aggregates are sent, so that you can view the retrieved information in the terminal logs.

  4. To store aggregates, simulate five events with the following values for the ProductionStream stream via the Event Simulator tool. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.

    name amount timestamp
    brownie 90 1602489419000
    brownie 90 1602488519000
    eclairs 95 1602661319000
    brownie 100 1602747719000
    brownie 120 1602834119000

    The above events are stored in the production database that you previously defined. (These Unix timestamps fall on 12th, 14th, 15th, and 16th October 2020, within the retrieval period used by the RetrievingAggregates query.)

  5. To retrieve the information you stored, simulate an event for the ProductionSummaryRetrievalStream stream with brownie as the value for name. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.

    The Streaming Integrator Tooling terminal displays the following logs.

    Aggregate Logs

Performing short-term summarizations

This section explains how to apply Siddhi logic to process a subset of the events received by a stream, based on time or on the number of events. This is achieved via Siddhi windows. A window can apply to a batch of events (tumbling) or in a sliding manner.

The following are a few examples of how short-term summarizations can be performed based on time or on the number of events.

  • Performing a time-based summarization in a sliding manner

    This involves selecting a subset of events based on a specified duration of time in a sliding manner as illustrated via an example in the diagram below.

    Time Sliding Window

    For example, consider that the factory foreman of a sweet factory wants to calculate the production total and average per product every four minutes in a sliding manner. To address this, you can write a query as follows.

    from ProductionStream#window.time(4 min)
    select name, sum(amount) as pastFourMinTotal, avg(amount) as pastFourMinAvg
    group by name
    insert into TimeSlidingOutputStream;
    
    Here, #window.time(4 min) represents a sliding time window of four minutes. Based on this, the total for the last four minutes is calculated and presented as pastFourMinTotal, and the average for the last four minutes is calculated and presented as pastFourMinAvg.

  • Performing a time-based summarization in a tumbling manner

    This involves selecting a subset of events based on a specified duration of time in a tumbling manner as illustrated via an example in the diagram below.

    Time Batch Window

    For example, consider that the factory foreman of a sweet factory wants to calculate the production total and average per product every four minutes in a tumbling manner. To address this, you can write a query as follows.

    from ProductionStream#window.timeBatch(4 min)
    select name, sum(amount) as pastFourMinTotal, avg(amount) as pastFourMinAvg
    group by name
    insert into TimeBatchOutputStream;
    
    Here, #window.timeBatch(4 min) represents a tumbling time window of four minutes. Based on this, the total for the last four minutes is calculated and presented as pastFourMinTotal, and the average for the last four minutes is calculated and presented as pastFourMinAvg.

  • Performing a length-based summarization in a sliding manner

    This involves selecting a batch of events based on the number of events specified in a sliding manner as illustrated via an example in the diagram below.

    Length Sliding Window

    For example, consider that the factory foreman of a sweet factory wants to calculate the production total and average per product for every three events in a sliding manner. To address this, you can write a query as follows.

    from ProductionStream#window.length(3)
    select name, sum(amount) as lastBatchTotal, avg(amount) as lastBatchAvg
    group by name
    insert into LengthSlidingOutputStream;
    
    Here, #window.length(3) represents a sliding length window of 3 events. Based on this, the total for the last three events is calculated and presented as lastBatchTotal, and the average for the last three events is calculated and presented as lastBatchAvg.

  • Performing a length-based summarization in a tumbling manner

    This involves selecting a batch of events based on the number of events specified in a tumbling manner as illustrated via an example in the diagram below.

    Length Batch Window

    For example, consider that the factory foreman of a sweet factory wants to calculate the production total and average per product for every three events in a tumbling manner. To address this, you can write a query as follows.

    from ProductionStream#window.lengthBatch(3)
    select name, sum(amount) as lastBatchTotal, avg(amount) as lastBatchAvg
    group by name
    insert into LengthBatchOutputStream;
    
    Here, #window.lengthBatch(3) represents a tumbling length window of three events. Based on this, the total for each batch of three events is calculated and presented as lastBatchTotal, and the average for each batch of three events is calculated and presented as lastBatchAvg.
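
The time-based windows above operate on wall-clock time at the moment events arrive. If you instead want the window to be driven by the event's own timestamp attribute, Siddhi provides external time windows. A minimal sketch, assuming the ProductionStream definition used above (ExternalTimeOutputStream is a hypothetical output stream):

    from ProductionStream#window.externalTime(timestamp, 4 min)
    select name, sum(amount) as pastFourMinTotal, avg(amount) as pastFourMinAvg
    group by name
    insert into ExternalTimeOutputStream;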

Try it out

To try out the four sample queries given above, follow the steps below:

  1. Start and Access Streaming Integrator Tooling.

  2. Open a new file. Then add and save the following Siddhi application.

    @App:name('ProductionSummaryApp')
    
    
    @sink(type = 'log', prefix = "Four Minute Summary",
        @map(type = 'text'))
    define stream TimeSlidingOutputStream (name string, pastFourMinTotal double, pastFourMinAvg double);
    
    @sink(type = 'log', prefix = "Three Production Run Summary",
        @map(type = 'passThrough'))
    define stream LengthSlidingOutputStream (name string, lastBatchTotal double, lastBatchAvg double);
    
    define stream ProductionStream (name string, amount double, timestamp long);
    
    @sink(type = 'log', prefix = "Four Minute Summary - Batch",
        @map(type = 'text'))
    define stream TimeBatchOutputStream (name string, pastFourMinTotal double, pastFourMinAvg double);
    
    @sink(type = 'log', prefix = "Three Production Run Summary - Batch",
        @map(type = 'passThrough'))
    define stream LengthBatchOutputStream (name string, lastBatchTotal double, lastBatchAvg double);
    
    @info(name = 'query1')
    from ProductionStream#window.time(4 min) 
    select name, sum(amount) as pastFourMinTotal, avg(amount) as pastFourMinAvg 
        group by name 
    insert into TimeSlidingOutputStream;
    
    @info(name = 'query2')
    from ProductionStream#window.timeBatch(4 min) 
    select name, sum(amount) as pastFourMinTotal, avg(amount) as pastFourMinAvg 
        group by name 
    insert into TimeBatchOutputStream;
    
    @info(name = 'query3')
    from ProductionStream#window.length(3) 
    select name, sum(amount) as lastBatchTotal, avg(amount) as lastBatchAvg 
        group by name 
    insert into LengthSlidingOutputStream;
    
    @info(name = 'query4')
    from ProductionStream#window.lengthBatch(3) 
    select name, sum(amount) as lastBatchTotal, avg(amount) as lastBatchAvg 
        group by name 
    insert into LengthBatchOutputStream;
    

The above Siddhi application has all four sample queries used as examples in this section. Those queries insert their output into four different output streams connected to log sinks to log the output of each query.

  3. Simulate eight events for the ProductionStream input stream of the above Siddhi application as follows. For instructions to simulate events, see Testing Siddhi Applications - Simulating Events.

    name amount timestamp
    doughnuts 10 1602486060000
    doughnuts 10 1602486120000
    doughnuts 10 1602486180000
    doughnuts 10 1602486240000
    doughnuts 20 1602486300000
    doughnuts 20 1602486360000
    doughnuts 20 1602486420000
    doughnuts 30 1602486480000

    The above simulation results in the following logs.

    Summarization Logs

Supported methods of summarization

WSO2 Streaming Integrator supports the following methods of summarization via Siddhi extensions. For more information about a summarization method, click on the relevant Siddhi link.