/*
 * HelloKafka
 *
 * Reads JSON events from a Kafka topic and logs the average number of
 * events received per second, computed over 5-second batches.
 */
@App:name("HelloKafka")
@App:description('Consume events from a Kafka Topic and log the consumption throughput')

-- Kafka consumer: topic 'kafka_topic', partition 0, consumer group "group".
-- NOTE(review): the broker address is hard-coded; confirm it is still the
-- intended endpoint before deploying.
@source(type='kafka',
        topic.list='kafka_topic',
        partition.no.list='0',
        threading.option='single.thread',
        group.id="group",
        bootstrap.servers='54.177.187.50:9092',
        @map(type='json'))
define stream SweetProductionStream (name string, amount double);

-- Throughput figures are written to the log sink only.
@sink(type='log')
define stream KafkaSourceThroughputStream (count long);

-- Count the events collected in each 5-second batch and divide by the batch
-- length to get events per second. The divisor 5 must be kept in step with
-- the timeBatch duration.
from SweetProductionStream#window.timeBatch(5 sec)
select count(name) / 5 as count
insert into KafkaSourceThroughputStream;
/*
 * HttpSource
 *
 * Accepts JSON events over HTTP and logs the average number of events
 * received per second, computed over 5-second batches.
 */
@App:name("HttpSource")
@App:description('Consume events from http clients')

-- HTTP receiver listening on all interfaces, port 8082, path /service.
@source(type='http',
        worker.count='20',
        receiver.url='http://0.0.0.0:8082/service',
        @map(type='json'))
define stream SweetProductionStream (name string, amount double);

-- Throughput figures are written to the log sink only.
@sink(type='log')
define stream HttpSourceThroughputStream (tps long);

-- Count the events collected in each 5-second batch and divide by the batch
-- length to get events per second. The divisor 5 must be kept in step with
-- the timeBatch duration.
from SweetProductionStream#window.timeBatch(5 sec)
select count(amount) / 5 as tps
insert into HttpSourceThroughputStream;