# Working with Kafka

## Introduction
The Streaming Integrator can consume from a Kafka topic and publish to a Kafka topic in a streaming manner.
This tutorial takes you through consuming from a Kafka topic, processing the messages, and finally, publishing output to a Kafka topic.
## Before you begin
To prepare the server to consume from or publish to Kafka, follow the steps below:

- Download the Kafka broker from the Apache site and extract it. From here onwards, this directory is referred to as `<KAFKA_HOME>`.

- Create a directory named `source` in a preferred location in your machine and copy the following JARs to it from the `<KAFKA_HOME>/libs` directory.
    - `kafka_2.12-2.3.0.jar`
    - `kafka-clients-2.3.0.jar`
    - `metrics-core-2.2.0.jar`
    - `scala-library-2.12.8.jar`
    - `zkclient-0.11.jar`
    - `zookeeper-3.4.14.jar`

- Create another directory named `destination` in a preferred location in your machine.

- To convert the Kafka JARs you copied to the `source` directory, issue the following command (see the sketch below this item):
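    The exact conversion command is not shown above. The following is a minimal sketch, assuming the standard `jartobundle.sh` tool that ships in the `<SI_HOME>/bin` directory is used to convert the JARs in `source` into OSGi bundles placed in `destination`; replace the paths with the locations you used:

    ```bash
    # Convert the plain Kafka JARs in the source directory into OSGi bundles
    # written to the destination directory (paths are placeholders).
    sh <SI_HOME>/bin/jartobundle.sh /path/to/source /path/to/destination
    ```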
- Copy all the JARs from the `destination` directory to the `<SI_HOME>/lib` directory.
## Consuming data from Kafka

### Step 1: Start Kafka
- Navigate to the `<KAFKA_HOME>` directory and start a ZooKeeper node by issuing the following command.

    ```
    sh bin/zookeeper-server-start.sh config/zookeeper.properties
    ```

- Navigate to the `<KAFKA_HOME>` directory and start a Kafka server node by issuing the following command.

    ```
    sh bin/kafka-server-start.sh config/server.properties
    ```
### Step 2: Start the Streaming Integrator
Navigate to the `<SI_HOME>/bin` directory and issue the following command.

```
sh server.sh
```

The following log appears on the SI console when the server is started successfully.

```
INFO {org.wso2.carbon.kernel.internal.CarbonStartupHandler} - WSO2 Streaming Integrator started in 4.240 sec
```
### Step 3: Consume from a Kafka topic
Let's create a basic Siddhi application to consume messages from a Kafka topic.
- Open a text file and copy-paste the following Siddhi application to it.

    ```
    @App:name("HelloKafka")
    @App:description('Consume events from a Kafka Topic and log the messages on the console.')

    @source(type='kafka',
            topic.list='productions',
            threading.option='single.thread',
            group.id="group1",
            bootstrap.servers='localhost:9092',
            @map(type='json'))
    define stream SweetProductionStream (name string, amount double);

    @sink(type='log')
    define stream OutputStream (name string, amount double);

    -- Query to transform the name to upper case.
    from SweetProductionStream
    select str:upper(name) as name, amount
    insert into OutputStream;
    ```
- Save this file as `HelloKafka.siddhi` in the `<SI_HOME>/wso2/server/deployment/siddhi-files` directory. When the Siddhi application is deployed successfully, a deployment log appears on the SI console.

    Info

    You just created a Siddhi application that listens to a Kafka topic named `productions` and logs any incoming messages. When logging, the `name` attribute of the message is converted to upper case. However, you have still not created this Kafka topic or published any messages to it. To do this, proceed to the next substep.
- Generate some Kafka messages that the Streaming Integrator can receive by following the procedure below (the commands and a sample message are sketched after these substeps):

    - Create a topic named `productions` in the Kafka server. To do this, navigate to `<KAFKA_HOME>` and run the following command.

    - Run the Kafka command line client to push a few messages to the Kafka server.

    - You are prompted to type messages in the console. Type the following in the command prompt.
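    The exact commands and message are not shown above. The following is a minimal sketch of these substeps, assuming Kafka 2.3.x CLI tools and the default JSON event format of the Siddhi `json` mapper; the topic settings and the `amount` value are illustrative assumptions. On older Kafka versions you may need `--zookeeper localhost:2181` instead of `--bootstrap-server`.

    ```bash
    # Create the 'productions' topic (run from <KAFKA_HOME>).
    sh bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
        --replication-factor 1 --partitions 1 --topic productions

    # Start the Kafka console producer and type a JSON event at its prompt, for example:
    sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic productions
    {"event":{"name":"Almond cookie","amount":100.0}}
    ```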
    This pushes a message to the Kafka server. The Siddhi application you deployed in the Streaming Integrator then consumes this message, and the Streaming Integrator logs the resulting event.

    You may notice that the output message has an uppercase name: `ALMOND COOKIE`. This is because of the simple message transformation done in the Siddhi application.
### Step 4: Consume Kafka messages with an offset
Previously, you consumed messages from the `productions` topic without specifying an offset. In other words, the Kafka offset was zero. In this section, instead of consuming with a zero offset, you specify an offset value and consume messages from that offset onwards.

For this purpose, you can configure the `topic.offsets.map` parameter. Let's modify the previous Siddhi application to specify an offset value. Specify an offset value of `2` so that the Siddhi application consumes messages with index `2` and above.
- Open the `<SI_HOME>/wso2/server/deployment/siddhi-files/HelloKafka.siddhi` file and add the new configuration parameter `topic.offsets.map='productions=2'` to the Kafka source. The complete Siddhi application is as follows.

    ```
    @App:name("HelloKafka")
    @App:description('Consume events from a Kafka Topic and log the messages on the console.')

    @source(type='kafka',
            topic.list='productions',
            threading.option='single.thread',
            group.id="group1",
            bootstrap.servers='localhost:9092',
            topic.offsets.map='productions=2',
            @map(type='json'))
    define stream SweetProductionStream (name string, amount double);

    @sink(type='log')
    define stream OutputStream (name string, amount double);

    from SweetProductionStream
    select str:upper(name) as name, amount
    insert into OutputStream;
    ```
- Save the file.

- Push the following message to the Kafka server (see the samples sketched at the end of this step). Note that this is the second message that you pushed (hence bearing index 1), and therefore it is not consumed by the Streaming Integrator.

- Let's push another message (bearing index 2) to the Kafka server. A corresponding log appears in the Streaming Integrator console.

As you configured your Siddhi application to consume messages with offset `2`, all messages bearing index `2` or above are consumed.
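The two messages pushed in this step are not shown above. A minimal sketch of what they could look like, assuming the default JSON event format of the Siddhi `json` mapper; the names and amounts are illustrative assumptions:

```bash
# At the Kafka console producer prompt for the 'productions' topic:
# second message overall (offset 1) - not consumed, because consumption starts at offset 2
{"event":{"name":"Baked alaska","amount":20.0}}
# third message overall (offset 2) - consumed and logged by HelloKafka
{"event":{"name":"Cup cake","amount":300.0}}
```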
### Step 5: Add more consumers to the consumer group
In our `HelloKafka` Siddhi application, note the `group.id` parameter. This parameter defines the Kafka consumer group.

Let's add another Siddhi application, `HelloKafka_2`, to add another Kafka consumer to the same consumer group.
- Open a text file and copy-paste the following Siddhi application to it.

    ```
    @App:name("HelloKafka_2")
    @App:description('Consume events from a Kafka Topic and log the messages on the console.')

    @source(type='kafka',
            topic.list='productions',
            threading.option='single.thread',
            group.id="group1",
            bootstrap.servers='localhost:9092',
            @map(type='json'))
    define stream SweetProductionStream (name string, amount double);

    @sink(type='log')
    define stream OutputStream (name string, amount double);

    from SweetProductionStream
    select str:upper(name) as name, amount
    insert into OutputStream;
    ```

- Save this file as `HelloKafka_2.siddhi` in the `<SI_HOME>/wso2/server/deployment/siddhi-files` directory. When the Siddhi application is successfully deployed, an `INFO` log appears in the Streaming Integrator console.
Navigate to the
<KAFKA_HOME>
directory and run following command.
This adds another partition to the productions
Kafka topic.
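    The exact command is not shown above. A minimal sketch, assuming Kafka 2.3.x CLI tools (on older versions you may need `--zookeeper localhost:2181` instead of `--bootstrap-server`):

    ```bash
    # Increase the partition count of the 'productions' topic to 2 (run from <KAFKA_HOME>).
    sh bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 \
        --topic productions --partitions 2
    ```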
- Push the following messages to the Kafka server using the Kafka Console Producer (see the sample sketched below).
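    These messages are not shown above. Based on the logs that follow, they could look like this, assuming the default JSON event format of the Siddhi `json` mapper (the amounts match the logged events; the original casing of the names is an assumption):

    ```bash
    # At the Kafka console producer prompt for the 'productions' topic:
    {"event":{"name":"Doughnut","amount":500.0}}
    {"event":{"name":"Danish pastry","amount":200.0}}
    {"event":{"name":"Eclair","amount":400.0}}
    {"event":{"name":"Eclair toffee","amount":100.0}}
    ```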
Observe the following logs on the SI console.
```
INFO {io.siddhi.core.stream.output.sink.LogSink} - HelloKafka_2 : OutputStream : Event{timestamp=1562759480019, data=[DOUGHNUT, 500.0], isExpired=false}
INFO {io.siddhi.core.stream.output.sink.LogSink} - HelloKafka : OutputStream : Event{timestamp=1562759494710, data=[DANISH PASTRY, 200.0], isExpired=false}
INFO {io.siddhi.core.stream.output.sink.LogSink} - HelloKafka_2 : OutputStream : Event{timestamp=1562759506252, data=[ECLAIR, 400.0], isExpired=false}
INFO {io.siddhi.core.stream.output.sink.LogSink} - HelloKafka : OutputStream : Event{timestamp=1562759508757, data=[ECLAIR TOFFEE, 100.0], isExpired=false}
```
You can see that the events are received by the two consumers in a round-robin manner. Events received by the first consumer are logged by the `HelloKafka` Siddhi application, whilst events received by the second consumer are logged by the `HelloKafka_2` Siddhi application.
### Step 6: Assign consumers to partitions
In the previous step, you had two partitions for the Kafka topic and two consumers. Instead of assigning the consumers to the partitions, you allowed Kafka to do the assignments. Optionally, you can assign consumers to partitions yourself.

This option is useful if you have multiple consumers with different performance speeds and you need to balance the load among the consumers.

Let's alter the topic to have three partitions. After that, you can assign two partitions to `consumer-1`, and the remaining partition to `consumer-2`.
- Navigate to the `<KAFKA_HOME>` directory and issue the following command (see the sketch below). This adds another partition to the `productions` Kafka topic. As a result, there are three partitions in total.
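    The exact command is not shown above. A minimal sketch, assuming Kafka 2.3.x CLI tools:

    ```bash
    # Increase the partition count of the 'productions' topic to 3 (run from <KAFKA_HOME>).
    sh bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 \
        --topic productions --partitions 3
    ```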
- To assign partitions to the consumers, add the `partition.no.list` parameter as shown below.

    ```
    @App:name("HelloKafka")
    @App:description('Consume events from a Kafka Topic and log the messages on the console.')

    -- consumer-1
    @source(type='kafka',
            topic.list='productions',
            threading.option='single.thread',
            group.id="group1",
            bootstrap.servers='localhost:9092',
            partition.no.list='0,1',
            @map(type='json'))
    define stream SweetProductionStream1 (name string, amount double);

    -- consumer-2
    @source(type='kafka',
            topic.list='productions',
            threading.option='single.thread',
            group.id="group1",
            bootstrap.servers='localhost:9092',
            partition.no.list='2',
            @map(type='json'))
    define stream SweetProductionStream2 (name string, amount double);

    @sink(type='log')
    define stream OutputStream (name string, amount double, id string);

    from SweetProductionStream1
    select str:upper(name) as name, amount, 'consumer-1' as id
    insert into OutputStream;

    from SweetProductionStream2
    select str:upper(name) as name, amount, 'consumer-2' as id
    insert into OutputStream;
    ```

    Note that `consumer-1` is assigned partitions `0` and `1`, while `consumer-2` is assigned partition `2`.
- Publish some messages as follows (see the sample sketched below), and see how the load is distributed among the consumers with the new partition assignments.
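    These messages are not shown above. Based on the logs in the next substep, they could look like this, assuming the default JSON event format of the Siddhi `json` mapper (the amounts match the logged events; the original casing of the names is an assumption):

    ```bash
    # At the Kafka console producer prompt for the 'productions' topic:
    {"event":{"name":"Fortune cookie","amount":100.0}}
    {"event":{"name":"Frozen yogurt","amount":350.0}}
    {"event":{"name":"Gingerbread","amount":450.0}}
    {"event":{"name":"Hot-fudge sundae","amount":150.0}}
    {"event":{"name":"Hot-chocolate pudding","amount":200.0}}
    {"event":{"name":"Ice cream cake","amount":250.0}}
    ```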
- Observe the following Streaming Integrator logs.
    ```
    INFO {io.siddhi.core.stream.output.sink.LogSink} - HelloKafka : OutputStream : Event{timestamp=1562851086792, data=[FORTUNE COOKIE, 100.0, consumer-1], isExpired=false}
    INFO {io.siddhi.core.stream.output.sink.LogSink} - HelloKafka : OutputStream : Event{timestamp=1562851092100, data=[FROZEN YOGURT, 350.0, consumer-1], isExpired=false}
    INFO {io.siddhi.core.stream.output.sink.LogSink} - HelloKafka : OutputStream : Event{timestamp=1562851094459, data=[GINGERBREAD, 450.0, consumer-2], isExpired=false}
    INFO {io.siddhi.core.stream.output.sink.LogSink} - HelloKafka : OutputStream : Event{timestamp=1562851096434, data=[HOT-FUDGE SUNDAE, 150.0, consumer-1], isExpired=false}
    INFO {io.siddhi.core.stream.output.sink.LogSink} - HelloKafka : OutputStream : Event{timestamp=1562851098328, data=[HOT-CHOCOLATE PUDDING, 200.0, consumer-1], isExpired=false}
    INFO {io.siddhi.core.stream.output.sink.LogSink} - HelloKafka : OutputStream : Event{timestamp=1562851100309, data=[ICE CREAM CAKE, 250.0, consumer-2], isExpired=false}
    ```
You can observe a pattern where the load is distributed among `consumer-1` and `consumer-2` in a 2:1 ratio. This is because you assigned two partitions to `consumer-1` and only one partition to `consumer-2`.
### Step 7: Publish to a Kafka topic
Let's create a new Siddhi application to consume from the `productions` topic, filter the incoming messages based on a condition, and then publish the filtered messages to another Kafka topic.
- Create a new topic named `bulk-orders` in the Kafka server.

- To publish the filtered messages to the `bulk-orders` Kafka topic you created, issue the following command.
Next, let's create the Siddhi application. Open a text file, and copy-paste following Siddhi application into it.
@App:name("PublishToKafka") @App:description('Consume events from a Kafka Topic, do basic filtering and publish filtered messages to a Kafka topic.') @source(type='kafka', topic.list='productions', threading.option='single.thread', group.id="group2", bootstrap.servers='localhost:9092', @map(type='json')) define stream SweetProductionStream (name string, amount double); @sink(type='kafka', topic='bulk-orders', bootstrap.servers='localhost:9092', partition.no='0', @map(type='json')) define stream BulkOrdersStream (name string, amount double); from SweetProductionStream[amount > 50] select * insert into BulkOrdersStream;
- Save this file as `PublishToKafka.siddhi` in the `<SI_HOME>/wso2/server/deployment/siddhi-files` directory. When the Siddhi application is successfully deployed, the following `INFO` log appears in the Streaming Integrator console.

    ```
    INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App PublishToKafka deployed successfully
    ```

    Info

    The `PublishToKafka` Siddhi application consumes all the messages from the `productions` topic and populates the `SweetProductionStream` stream. All the sweet production runs where the amount is greater than 50 are inserted into the `BulkOrdersStream` stream. These events are pushed to the `bulk-orders` Kafka topic.
To observe the messages in the
You can see the following message in the Kafka Consumer log. These indicate the production runs of which the amount is greater than 50.bulk-orders
topic, run a Kafka Console Consumer. Then navigate to the<KAFKA_HOME>
directory and issue the following command.
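    The exact command is not shown above. A minimal sketch, assuming Kafka 2.3.x CLI tools:

    ```bash
    # Print the filtered events published to the 'bulk-orders' topic (run from <KAFKA_HOME>).
    sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic bulk-orders --from-beginning
    ```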
### Step 8: Preserve the state of the application through a system failure
Let's try out a scenario in which you deploy a Siddhi application to count the total number of productions.
Info
In this scenario, the SI server is required to remember the current count through system failures so that when the system is restored, the count is not reset to zero.
To achieve this, you can use the state persistence capability in the Streaming Integrator.
- Enable the state persistence feature in the SI server as follows. Open the `<SI_HOME>/conf/server/deployment.yaml` file in a text editor and locate the `state.persistence` section.

    ```
    # Periodic Persistence Configuration
    state.persistence:
      enabled: true
      intervalInMin: 1
      revisionsToKeep: 2
      persistenceStore: org.wso2.carbon.streaming.integrator.core.persistence.FileSystemPersistenceStore
      config:
        location: siddhi-app-persistence
    ```

    Set the `enabled` parameter to `true` and save the file.
Enable state persistence debug logs as follows. Open the
<SI_HOME>/conf/server/log4j2.xml
file on a text editor and locate following line in it.Add following
<Logger>
element below that.Save the file.
- Restart the Streaming Integrator server for the above change to take effect.
- Let's create a new topic named `sandwich_productions` in the Kafka server. To do this, navigate to `<KAFKA_HOME>` and run the following command (see the sketch below).
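    The exact command is not shown above. A minimal sketch, assuming Kafka 2.3.x CLI tools (on older versions you may need `--zookeeper localhost:2181` instead of `--bootstrap-server`):

    ```bash
    # Create the 'sandwich_productions' topic (run from <KAFKA_HOME>).
    sh bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
        --replication-factor 1 --partitions 1 --topic sandwich_productions
    ```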
Open a text file and copy-paste following Siddhi application to it.
@App:name("CountProductions") @App:description('Siddhi application to count the total number of orders.') @source(type='kafka', topic.list='sandwich_productions', threading.option='single.thread', group.id="group3", bootstrap.servers='localhost:9092', partition.no.list='0', @map(type='json')) define stream SandwichProductionStream (name string, amount double); @sink(type='log') define stream OutputStream (totalProductions double); from SandwichProductionStream select sum(amount) as totalProductions insert into OutputStream;
- Save this file as `CountProductions.siddhi` in the `<SI_HOME>/wso2/server/deployment/siddhi-files` directory. When the Siddhi application is successfully deployed, an `INFO` log appears in the Streaming Integrator console.
Run the Kafka command line client to push a few messages to the Kafka server. Navigate to
<KAFKA_HOME>
and run following command: -
You are prompted to type the messages in the console. Type following in the command prompt:
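    Neither the producer command nor the messages are shown above. Based on the explanation that follows (100 bagels and 100 buterbrods), they could look like this, assuming Kafka 2.3.x CLI tools and the default JSON event format of the Siddhi `json` mapper:

    ```bash
    # Start the Kafka console producer for the 'sandwich_productions' topic (run from <KAFKA_HOME>),
    # then type the two events at its prompt:
    sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sandwich_productions
    {"event":{"name":"Bagel","amount":100.0}}
    {"event":{"name":"Buterbrod","amount":100.0}}
    ```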
    The following logs appear on the SI console.

    ```
    INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions : OutputStream : Event{timestamp=1563903034768, data=[100.0], isExpired=false}
    INFO {io.siddhi.core.stream.output.sink.LogSink} - CountProductions : OutputStream : Event{timestamp=1563903034768, data=[200.0], isExpired=false}
    ```
    These logs print the sandwich production count. Note that the current count of sandwich productions is being printed as `200` in the second log. This is because the production count up to now is `200` sandwiches: `100` bagels and `100` buterbrods.
Wait for following log to appear on the SI console
DEBUG {org.wso2.carbon.streaming.integrator.core.persistence.FileSystemPersistenceStore} - Periodic persistence of CountProductions persisted successfully
This log indicates that the current state of the Siddhi application is successfully persisted. Siddhi application state is persisted every minute. Therefore, you can notice this log appearing every minute.
    Next, let's push two sandwich production messages to the Kafka server and shut down the SI server before state persistence happens (i.e., before the above log appears).

    Tip

    It is better to start pushing messages immediately after the state persistence log appears, so that you have plenty of time to push the messages and shut down the server before the next log appears.
- Push the following messages to the Kafka server using the Kafka Console Producer (see the sample sketched below).
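    These messages are not shown above. Based on the Info note that follows (100 Croissants and 100 Croutons), they could look like this, assuming the default JSON event format of the Siddhi `json` mapper:

    ```bash
    # At the Kafka console producer prompt for the 'sandwich_productions' topic:
    {"event":{"name":"Croissant","amount":100.0}}
    {"event":{"name":"Crouton","amount":100.0}}
    ```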
- Shut down the SI server. Here, you are deliberately creating a scenario where the server crashes before the SI server could persist the latest production count.

    Info

    Here the SI server crashes before the state is persisted. Therefore, the SI server cannot persist the latest count (which should include the last two productions: `100` Croissants and `100` Croutons). The good news is that the Kafka source replays the last two messages, thereby allowing the Streaming Integrator to successfully recover from the server crash.
Restart the SI server and wait for about one minute to observe the following logs.
Note that the Kafka source has replayed the last two messages. As a result, the sandwich productions count is correctly restored.