Receiving JSON Events via JMS
Purpose:
This application demonstrates how to configure WSO2 Streaming Integrator Tooling to receive events to the SweetProductionStream
via Jms transport in Json format using default mapping and log the events in LowProductionAlertStream
to the output console.
Prerequisites:
Setup ActiveMQ
Download activemq-client-5.x.x.jar
(http://central.maven.org/maven2/org/apache/activemq/activemq-client/5.9.0/activemq-client-5.9.0.jar)
Download apache-activemq-5.x.x-bin.zip
(http://archive.apache.org/dist/activemq/apache-activemq/5.9.0/apache-activemq-5.9.0-bin.zip)
ActiveMQ activemq-client-5.x.x.jar
lib to be added and converted to OSGI (See Note: To convert ActiveMQ lib to OSGI).
Unzip the apache-activemq-5.x.x-bin.zip
and copy the following ActiveMQ libs in apache-activemq-5.x.x/lib
to {WSO2SIHome}/samples/sample-clients/lib
and {WSO2SIHome}/lib
.
hawtbuf-1.9.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar
geronimo-jms_1.1_spec-1.1.1.jar
Save this sample.
If there is no syntax error, the following message is shown on the console:
Siddhi App ReceiveJMSInJsonFormat successfully deployed.
Note:
To convert ActiveMQ lib to OSGI,
1. Navigate to {WSO2SIHome}/bin and run the following command:
* For Linux:
./icf-provider.sh org.apache.activemq.jndi.ActiveMQInitialContextFactory <Downloaded Jar Path>/activemq-client-5.x.x.jar <Output Jar Path>
* For Windows:
./icf-provider.bat org.apache.activemq.jndi.ActiveMQInitialContextFactory <Downloaded Jar Path>\a ctivemq-client-5.x.x.jar <Output Jar Path>
* Provide privileges if necessary using chmod +x icf-provider.(sh|bat)
.
* Also, this will register the InitialContextFactory
implementation according to the OSGi JNDI spec.
2. If converted successfully then it will create 'activemq-client-5.x.x' directory in the with OSGi converted and original jars:
- activemq-client-5.x.x.jar
(Original Jar)
- activemq-client-5.x.x_1.0.0.jar
(OSGi converted Jar)
Also, following messages would be shown on the terminal.
- INFO: Executing 'jar uf <absolute_path>/activemq-client-5.x.x/activemq-client-5.x.x.jar -C <absolute_path>/activemq-client-5.x.x /internal/CustomBundleActivator.class'
[timestamp] org.wso2.carbon.tools.spi.ICFProviderTool addBundleActivatorHeader
- INFO: Running jar to bundle conversion [timestamp] org.wso2.carbon.tools.converter.utils.BundleGeneratorUtils convertFromJarToBundle
- INFO: Created the OSGi bundle activemq_client_5.x.x_1.0.0.jar for JAR file <absolute_path>/activemq-client-5.x.x/activemq-client-5.x.x.jar
3) You can find the OSGi converted libs in activemq-client-5.x.x
folder. You can copy activemq-client-5.x.x/activemq_client_5.x.x_1.0.0.jar
to {WSO2SIHome}/lib
and activemq-client-5.x.x/activemq-client-5.x.x.jar
to {WSO2SIHome}/samples/sample-clients/lib
.
Executing the Sample:
Navigate to {apache-activemq-5.x.x}
unzipped directory.
Provide required permissions by executing,
Create system wide configuration defaults, by executing,
sudo bin/activemq setup /etc/default/activemq
Start ActiveMQ server node using 'bin/activemq start'.
Start the Siddhi application by clicking on 'Run'.
If the Siddhi application starts successfully, the following messages are shown on the console:
ReceiveJMSInJsonFormat.siddhi - Started Successfully!
Testing the Sample:
Navigate to {WSO2SIHome}/samples/sample-clients/jms-producer
and run ant
command without arguments.
Viewing the Results:
Messages similar to the following would be shown on the editor console.
- INFO {io.siddhi.core.stream.output.sink.LogSink} - ReceiveJMSInJsonFormat : OutputStream : Event{timestamp=1513617090756, data=[Cream Sandwich, 790.7842348407036], isExpired=false}
@ App : name ( 'ReceiveJMSInJsonFormat' )
@ App : description ( 'Receive events via JMS provider in JSON format with default mapping and view the output on the console.' )
@ source ( type = 'jms' ,
factory . initial = 'org.apache.activemq.jndi.ActiveMQInitialContextFactory' ,
provider . url = 'tcp://localhost:61616' ,
destination = 'jms_result_topic' ,
connection . factory . type = 'topic' ,
connection . factory . jndi . name = 'TopicConnectionFactory' ,
transport . jms . SubscriptionDurable = 'true' ,
transport . jms . DurableSubscriberClientID = 'wso2SPclient1' ,
@ map ( type = 'json' ))
define stream SweetProductionStream ( name string , amount double );
@ sink ( type = 'log' )
define stream LowProductionAlertStream ( name string , amount double );
@ info ( name = 'EventsPassthroughQuery' )
from SweetProductionStream
select *
insert into LowProductionAlertStream ;