Receiving JSON Events via RabbitMQ
Purpose:
This application demonstrates how to configure WSO2 Streaming Integrator Tooling to receive events into SweetProductionStream
from a RabbitMQ broker in JSON format using the default mapping, and to log the events that reach LowProductionAlertStream
to the output console.
Prerequisites:
- Save this sample.
- Install the RabbitMQ server.
- Enable the RabbitMQ Management Console.
- Start the RabbitMQ service.
Executing the Sample:
- Start the Siddhi application by clicking on 'Run'.
- If the Siddhi application starts successfully, the console shows messages confirming the start.
- Check whether the exchange 'rabbitmq_sample' has been created on the RabbitMQ server. To do so, open the management console at http://localhost:15672/.
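
The exchange check can also be scripted against the management plugin's HTTP API instead of the web UI. The following is a minimal Python sketch, not part of the sample itself. It assumes that the guest/guest credentials from the sample's AMQP URI are also valid for the management API, that the exchange lives in the default virtual host '/', and that the plugin listens on localhost:15672. The helper names `exchange_url` and `exchange_exists` are illustrative.

```python
import base64
import json
import urllib.error
import urllib.parse
import urllib.request


def exchange_url(name, vhost="/", base="http://localhost:15672"):
    """Build the management API URL for a single exchange."""
    # The vhost is one path segment, so "/" must be percent-encoded as %2F.
    return "{}/api/exchanges/{}/{}".format(
        base.rstrip("/"),
        urllib.parse.quote(vhost, safe=""),
        urllib.parse.quote(name, safe=""),
    )


def exchange_exists(name, user="guest", password="guest", **kwargs):
    """Return True if the exchange is found, False on HTTP 404."""
    request = urllib.request.Request(exchange_url(name, **kwargs))
    # Assumption: the management API accepts HTTP basic auth with these credentials.
    token = base64.b64encode("{}:{}".format(user, password).encode()).decode()
    request.add_header("Authorization", "Basic " + token)
    try:
        with urllib.request.urlopen(request, timeout=5) as response:
            return json.load(response).get("name") == name
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return False
        raise
```

With the broker and the Siddhi application running, `exchange_exists("rabbitmq_sample")` should return True once the source has declared the exchange. Any error other than 404 (for example, rejected credentials) is raised rather than reported as a missing exchange.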
Testing the Sample:
Publish events with RabbitMQ sample publisher:
Open a terminal, navigate to the {WSO2SIHome}/samples/sample-clients/rabbitmq-producer directory, and run the ant command.
To publish a custom number of events, run the ant command with the number of events specified.
Viewing the Results:
Messages similar to the following are shown on the console:
INFO {io.siddhi.core.stream.output.sink.LogSink} - ReceiveRabbitmqInJSONFormat : LowProductionAlertStream : Event{timestamp=1513233900122, data=[Lollipop, 6186.0], isExpired=false}
INFO {io.siddhi.core.stream.output.sink.LogSink} - ReceiveRabbitmqInJSONFormat : LowProductionAlertStream : Event{timestamp=1513233901122, data=[Donut, 7904.0], isExpired=false}
INFO {io.siddhi.core.stream.output.sink.LogSink} - ReceiveRabbitmqInJSONFormat : LowProductionAlertStream : Event{timestamp=1513233902124, data=[Honeycomb, 4495.0], isExpired=false}
INFO {io.siddhi.core.stream.output.sink.LogSink} - ReceiveRabbitmqInJSONFormat : LowProductionAlertStream : Event{timestamp=1513233903125, data=[Donut, 1393.0], isExpired=false}
@App:name("ReceiveRabbitmqInJSONFormat")
@App:description("Receives events from the RabbitMQ broker using the AMQP protocol.")
@source(type='rabbitmq', uri = 'amqp://guest:guest@localhost:5672', exchange.name = 'rabbitmq_sample', @map(type='json'))
define stream SweetProductionStream (name string, amount double);
@sink(type='log')
define stream LowProductionAlertStream (name string, amount double);
-- Pass all events from SweetProductionStream through to LowProductionAlertStream
@info(name='query1')
from SweetProductionStream
select *
insert into LowProductionAlertStream;
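
Because the source uses @map(type='json') with no custom attribute mappings, the Siddhi JSON mapper's default format applies: each message body is a JSON object with a single "event" key whose fields match the stream attributes (name and amount here). The following minimal Python sketch builds and reads back such a payload. The helper names and sample values are illustrative, and the exact payload sent by the bundled sample publisher is not shown in this document, so treat this as an assumption about what a compatible message looks like.

```python
import json


def to_default_json_event(name, amount):
    """Wrap the stream attributes in the {"event": {...}} envelope."""
    # amount is declared as double in SweetProductionStream, so send a float.
    return json.dumps({"event": {"name": name, "amount": float(amount)}})


def from_default_json_event(body):
    """Extract (name, amount) from a default-mapped JSON message body."""
    event = json.loads(body)["event"]
    return event["name"], float(event["amount"])


payload = to_default_json_event("Lollipop", 6186.0)
print(payload)
```

A string built this way could be published to the 'rabbitmq_sample' exchange with any AMQP client as the message body. The publishing step is left out here to keep the sketch free of third-party dependencies.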