Skip to content

Installing Streaming Integrator Using Kubernetes

WSO2 Streaming Integrator can be deployed natively on Kubernetes via the Siddhi Kubernetes Operator.

The Streaming Integrator can be configured in the <SI-TOOLING_HOME>/wso2/server/resources/decker/export/siddhi-process.yaml file and passed to the CRD(Custom Resource Definition)for deployment. Siddhi logic can be directly written in the <SI-TOOLING_HOME>/wso2/server/resources/decker/export/siddhi-process.yaml file or passed as .siddhi files via configuration maps.

To install WSO2 Streaming Integrator via Kubernetes, follow the steps below:

Before you begin:

Start a Kubernetes cluster. The Kubernetes version must be v1.10.11 or later. To start the cluster, you can use Minikube, GKE(Google Kubernetes Engine) Cluster, Docker for Mac, or any other Kubernetes cluster.

Minikube

You can install Minikube from the Kubernetes/Minikube

Siddhi operator automatically creates NGINX ingress. Therefore, for it to work, you can do one of the following:

- Enable ingress on Minikube by issuing the following command.

minikube addons enable ingress

- Disable automatic ingress creation by the Siddhi operator. For instructions, see Siddhi Kubernetes Microservice Documentation - Deploy Siddhi Applications without Ingress creation.

GKE Cluster

To install Siddhi operator, you have to give cluster admin permission to your account. In order to do this, issue the following command. You need to replace [email protected] with your account email address

kubectl create clusterrolebinding user-cluster-admin-binding --clusterrole=cluster-admin [email protected]

Docker for Mac

Siddhi operator automatically creates NGINX ingress. Therefore, for it to work, you can do one of the following:

- Enable NGINX ingress. For instructions, see NGINX Ingress Controller documentation.

- Disable automatic ingress creation by the Siddhi operator. For instructions, see Siddhi Kubernetes Microservice Documentation - Deploy Siddhi Applications without Ingress creation.

You are also required to have admin privileges to install the Siddhi operator.

Install Siddhi Operator for Streaming Integrator

To install the Siddhi Kubernetes operator for WSO2 Streaming Integrator, issue the following commands.

kubectl apply -f https://github.com/wso2/streaming-integrator/tree/master/modules/kubenetes/00-prereqs.yaml
kubectl apply -f https://github.com/wso2/streaming-integrator/tree/master/modules/kubenetes/01-siddhi-operator.yaml

You can verify the installation by making sure the following deployments are running in your Kubernetes cluster.

$ kubectl get deployment

NAME                 DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
streaming-integrator   1         1         1            1           1m
siddhi-parser          1         1         1            1           1m

Deploy Streaming Integrator

The Siddhi application that contains the streaming integration logic can be deployed in Kubernetes via the Siddhi operator.

To understand how this is done, let's create a very simple Siddhi stream processing application that consumes events via HTTP, filters the input events where the value for deviceType is dryer and the value for power is greater than 600, and then logs the output in the console. This can be created by configuring the <SI-TOOLING_HOME>/wso2/server/resources/decker/export/siddhi-process.yaml file as given below.

    apiVersion: siddhi.io/v1alpha2
    kind: SiddhiProcess
    metadata: 
      name: streaming-integrator
    spec: 
      apps: 
        - script: |
            @App:name("PowerSurgeDetection")
            @App:description("App consumes events from HTTP as a JSON message of { 'deviceType': 'dryer', 'power': 6000 } format and inserts the events into DevicePowerStream, and alerts the user if the power level is greater than or equal to 600W by printing a message in the log.")
            /*
                Input: deviceType string and powerConsuption int(Watt)
                Output: Alert user from printing a log, if there is a power surge in the dryer. In other words, notify when power is greater than or equal to 600W.
            */

            @source(
              type='http',
              receiver.url='${RECEIVER_URL}',
              basic.auth.enabled='false',
              @map(type='json')
            )
            define stream DevicePowerStream(deviceType string, power int);
            @sink(type='log', prefix='LOGGER')  
            define stream PowerSurgeAlertStream(deviceType string, power int); 
            @info(name='surge-detector')  
            from DevicePowerStream[deviceType == 'dryer' and power >= 600] 
            select deviceType, power  
            insert into PowerSurgeAlertStream;
      container: 
        env: 
          - 
            name: RECEIVER_URL
            value: "http://0.0.0.0:8080/checkPower"
          - 
            name: BASIC_AUTH_ENABLED
            value: "false"

To change the default configurations in WSO2 Streaming Integrator that are defined in the <SI-TOOLING_HOME>/conf/server/deployment.yaml file, you need to add he required configurations with the required over-riding values in the SiddhiProcess.yaml file under a section named runner as shown in the example below.

    apiVersion: siddhi.io/v1alpha2
    kind: SiddhiProcess
    metadata: 
      name: streaming-integrator-app
    spec: 
      apps: 
        - script: |
            @App:name("PowerSurgeDetection")
            @App:description("App consumes events from HTTP as a JSON message of { 'deviceType': 'dryer', 'power': 6000 } format and inserts the events into DevicePowerStream, and alerts the user if the power level is greater than or equal to 600W by printing a message in the log.")
            /*
                Input: deviceType string and powerConsuption int(Watt)
                Output: Alert user from printing a log, if there is a power surge in the dryer. In other words, notify when power is greater than or equal to 600W.
            */

            @source(
              type='http',
              receiver.url='${RECEIVER_URL}',
              basic.auth.enabled='false',
              @map(type='json')
            )
            define stream DevicePowerStream(deviceType string, power int);
            @sink(type='log', prefix='LOGGER')  
            define stream PowerSurgeAlertStream(deviceType string, power int); 
            @info(name='surge-detector')  
            from DevicePowerStream[deviceType == 'dryer' and power >= 600] 
            select deviceType, power  
            insert into PowerSurgeAlertStream;
      container: 
        env: 
          - 
            name: RECEIVER_URL
            value: "http://0.0.0.0:8080/checkPower"
          - 
            name: BASIC_AUTH_ENABLED
            value: "false"

      runner: |
        auth.configs:
          type: 'local'        # Type of the IdP client used
          userManager:
            adminRole: admin   # Admin role which is granted all permissions
            userStore:         # User store
              users:
              -
                user:
                  username: root
                  password: YWRtaW4=
                  roles: 1
              roles:
              -
                role:
                  id: 1
                  displayName: root
          restAPIAuthConfigs:
            exclude:
              - /simulation/*
              - /stores/* 

Here, you have included a configuration for auth.configs to over-ride the default values that are applicable to the Streaming Integrator (i.e., values configured under auth.configs in the <SI-TOOLING_HOME>/conf/server/deployment.yaml file.

To apply the configurations in the siddhi-process.yaml to your Kubernetes cluster, save the file in a preferred location and then issue the following command.

kubectl apply -f <PATH_to_siddhi-process.yaml>

Invoke Siddhi Applications

To invoke Siddhi Applications, follow the steps below:

  1. To obtain the external IP of the Ingress load balancer, issue the following command.

    kubectl get ingress

    This generates a response similar to the following:

    NAME      HOSTS     ADDRESS     PORTS     AGE
    siddhi    siddhi    10.0.2.15    80       14d
    
  2. Add the host (i.e., siddhi) and the related external IP (i.e., ADDRESS) in the /etc/hosts file in your machine.

    Info

    For Minikube, you have to use Minikube IP as the external IP. Therefore, issue the minikube ip command to get the IP of the Minikube cluster.

  3. To send events to the PowerSurgeDetection deployed in Kubernetes, issue the following CURL command.

    curl -X POST \
      http://siddhi/streaming-integrator-0/8080/checkPower \
        -H 'Accept: */*' \
        -H 'Content-Type: application/json' \
        -H 'Host: siddhi' \
        -d '{
            "deviceType": "dryer",
            "power": 600
            }'
    
  4. To monitor the associated logs for the above Siddhi application, list down the available pods by executing the following command.

    kubectl get pods

    The pods are listed as shown in the following sample response.

    NAME                                        READY    STATUS    RESTARTS    AGE
    streaming-integrator-app-0-b4dcf85-npgj7     1/1     Running      0        165m
    streaming-integrator-5f9fcb7679-n4zpj        1/1     Running      0        173m
    
  5. Issue the following command in order to monitor the logs of the relevant pod. In this example, let's assume that you need to monitor the logs for the streaming-integrator-app-0-b4dcf85-npgj7 pod.

    kubectl logs -f streaming-integrator-app-0-b4dcf85-npgj7

Info

For more details about the Siddhi Operator, see Siddhi as a Kubernetes Microservice.