Receiving Data via TCP and Preprocessing¶
Purpose¶
This application demonstrates how to receive events via the TCP transport and carry out data pre-processing with Siddhi extensions (e.g., the string extension and the time extension). For more information on Siddhi extensions, refer to https://wso2.github.io/siddhi/extensions/. In this sample, a composite ID is obtained by concatenating the lower-cased name with the current timestamp, and the current date is stored with each event.
Prerequisites¶
- Ensure that MySQL is installed on your machine.
- Add the MySQL JDBC driver into {WSO2_SI_HOME}/lib as follows:
  - Download the JDBC driver from: https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.45.tar.gz.
  - Extract the archive.
  - Copy mysql-connector-java-5.1.45-bin.jar to the {WSO2_SI_HOME}/lib directory.
- Create a database named sampleDB in MySQL. This database is referred to with the jdbc:mysql://localhost:3306/sampleDB URL.
- In the store configuration of this application, replace the 'username' and 'password' values with your MySQL credentials.
- Save this sample.
Executing the Sample¶
- Start the Siddhi application by clicking 'Run'.
- If the Siddhi application starts successfully, startup messages are shown on the console.
Testing the Sample¶
Navigate to {WSO2_SI_HOME}/samples/sample-clients/tcp-client and run the ant command.
To publish a custom number of events, run the ant command with the desired event count specified.
Viewing the Results¶
Check the ProcessedSweetProductionTable created in sampleDB. You should see the pre-processed data written to the table.
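To make the expected table contents concrete, the following sketch uses Python's built-in sqlite3 module as a stand-in for MySQL. This is an assumption made only so the example runs without a database server. The table and column names come from this sample; the column types and the sample row are illustrative and hypothetical.

```python
import sqlite3

# In-memory SQLite database standing in for sampleDB
# (assumption: the real store used by the sample is MySQL).
conn = sqlite3.connect(":memory:")

# Same table and column names as the Siddhi table definition;
# the column types here are illustrative only.
conn.execute(
    "CREATE TABLE ProcessedSweetProductionTable ("
    "compositeID TEXT PRIMARY KEY, amount REAL, date TEXT)"
)

# Hypothetical row shaped like the output of the sample query.
conn.execute(
    "INSERT INTO ProcessedSweetProductionTable VALUES (?, ?, ?)",
    ("toffee::2018-01-15 10:30:00", 45.5, "2018-01-15"),
)

# The kind of statement to run against sampleDB when checking the results.
QUERY = "SELECT compositeID, amount, date FROM ProcessedSweetProductionTable"
rows = conn.execute(QUERY).fetchall()
for row in rows:
    print(row)
```

Against the real database, the same SELECT statement can be issued from any MySQL client connected to sampleDB.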
@App:name("SweetProductionDataPreprocessing")
@App:description('Collect data via TCP transport and pre-process')
@source(type='tcp',
    context='SweetProductionStream',
    port='9892',
    @map(type='binary'))
define stream SweetProductionStream (name string, amount double);
@Store(type="rdbms",
    jdbc.url="jdbc:mysql://localhost:3306/sampleDB",
    username="root",
    password="mysql",
    jdbc.driver.name="com.mysql.jdbc.Driver")
@PrimaryKey("compositeID")
define table ProcessedSweetProductionTable (compositeID string, amount double, date string);
-- Pre-process sweet production data by building a composite ID from the name and the current timestamp, and recording the current date
@info(name='query1')
from SweetProductionStream
select str:concat(str:lower(name), "::", time:currentTimestamp()) as compositeID, amount, time:currentDate() as date
insert into ProcessedSweetProductionTable;
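
The select clause of query1 can be pictured with a small Python sketch. It is not part of the Siddhi application, and the function name `preprocess` is hypothetical. It assumes that time:currentTimestamp() yields a `yyyy-MM-dd HH:mm:ss` string and time:currentDate() yields `yyyy-MM-dd`; check the time extension documentation for your version before relying on these formats.

```python
from datetime import datetime


def preprocess(name, amount, now=None):
    """Mimic the select clause of query1 for a single event (illustrative only)."""
    now = now or datetime.now()
    # Assumed format of time:currentTimestamp()
    timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
    # str:concat(str:lower(name), "::", time:currentTimestamp())
    composite_id = name.lower() + "::" + timestamp
    # Assumed format of time:currentDate()
    date = now.strftime("%Y-%m-%d")
    return {"compositeID": composite_id, "amount": amount, "date": date}


# A fixed clock value is passed in so the result is reproducible.
row = preprocess("Toffee", 45.5, now=datetime(2018, 1, 15, 10, 30, 0))
print(row)
```

Under the assumed second-level timestamp format, two events with the same name arriving within the same second would produce the same compositeID, which is worth keeping in mind because compositeID is the table's primary key.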