Shared Topic Subscription

With JMS 1.1, a subscription on a topic is not permitted to have more than one consumer at a time. That is, if multiple JMS consumers subscribe to a JMS topic, and if a message comes to that topic, multiple copies of the message is forwarded to each consumer. There is no way of sharing messages between consumers that come to the topic.

With the shared subscription feature in JMS 2.0, you can overcome this restriction. When shared subscription is used, a message that comes to a topic is forwarded to only one of the consumers. That is, if multiple JMS consumers subscribe to a JMS topic, consumers can share the messages that come to the topic. The advantage of shared topic subscription is that it allows to share the workload between consumers.

The Micro Integrator can be configured as a shared topic listener that can connect to a shared topic subscription as a message consumer (subscriber) to share workload between other consumers of the subscription.

To demonstrate the sample scenario, let's configure the JMS inbound endpoint in WSO2 Micro Integrator as a shared topic listener using HornetQ as the message broker.

Synapse configurations

Given below are the synapse configurations that are required for mediating the above use case.

See the instructions on how to build and run this example.

<inboundEndpoint name="jms_inbound" sequence="request"onError="fault" protocol="jms" suspend="false">
  <parameters>
    <parameter name="interval">1000</parameter>
    <parameter name="transport.jms.Destination">/topic/exampleTopic</parameter>
    <parameter name="transport.jms.CacheLevel">3</parameter>
    <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>
    <parameter name="sequential">true</parameter>
    <parameter name="java.naming.factory.initial">org.jnp.interfaces.NamingContextFactory</parameter>
    <parameter name="java.naming.provider.url">jnp://localhost:1099</parameter>
    <parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter>
    <parameter name="transport.jms.SessionTransacted">false</parameter>
    <parameter name="transport.jms.ConnectionFactoryType">topic</parameter>
    <parameter name="transport.jms.JMSSpecVersion">2.0</parameter>
    <parameter name="transport.jms.SharedSubscription">true</parameter>
    <parameter name="transport.jms.DurableSubscriberName">mySubscription</parameter>
  </parameters>
</inboundEndpoint>
<registry provider="org.wso2.micro.integrator.registry.MicroIntegratorRegistry">
  <parameter name="cachableDuration">15000</parameter>
</registry>
<taskManager provider="org.wso2.micro.integrator.mediation.ntask.NTaskTaskManager">
  <parameter name="cachableDuration">15000</parameter>
</taskManager>
<sequence name="request" onError="fault">
  <log level="full"/>
  <drop/>
</sequence>
<sequence name="fault">
  <log level="full">
    <property name="MESSAGE" value="Executing default &#34;fault&#34; sequence"/>
    <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
    <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
  </log>
  <drop/>
</sequence>

See the descriptions of the above configurations:

Artifact Description
Inbound Endpoint Make sure to configure the below properties as follows when setting up the inbound endpoint:
  • Set the value of the transport.jms.JMSSpecVersion property as 2.0.
  • Set the value of the transport.jms.SharedSubscription propoerty as true.
  • Specify a subscriber name as the value of the transport.jms.DurableSubscriberName property and use the same for all subscribers.

Build and run

Create the artifacts:

  1. Set up WSO2 Integration Studio.
  2. Create an integration project with an ESB Configs module and an Composite Exporter.
  3. Create the proxy service, registry artifact, scheduled task, and sequences with the configurations given above.
  4. Deploy the artifacts in your Micro Integrator.

Set up the broker:

  1. Configure a broker with your Micro Integrator instance. Let's use HornetQ for this example.

    • Be sure to create a sample topic by editing the HORNET_HOME/config/stand-alone/non-clustered/hornetq-jms.xml file as follows:
      <topic name="sampleTopic">
         <entry name="/topic/exampleTopic"/>
      </topic>
  2. Start HornetQ with the following command:

    • On Windows: HORNETQ_HOME\bin\run.bat --run
    • On MacOS/Linux/Solaris: sh HORNETQ_HOME/bin/run.sh
  3. Start the Micro Integrator (after starting the broker).

Follow the steps given below to create the topic consumer and publisher to run this example.

  1. Create and run the following topic consumer (TopicConsumer.java) and run.

    package SharedTopicSubscribe;
    
    import java.util.Properties;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    
    public class TopicConsumer {
      private static final String DEFAULT_CONNECTION_FACTORY = "TopicConnectionFactory";
      private static final String DEFAULT_DESTINATION = "/topic/exampleTopic";
      private static final String INITIAL_CONTEXT_FACTORY = "org.jnp.interfaces.NamingContextFactory";
      private static final String PROVIDER_URL = "jnp://localhost:1099";
      private static final String SUBSCRIPTION_NAME = "mySubscription";
    
      public static void main(final String[] args) {
        try {
                        runExample();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
            }
    
             public static void runExample() throws Exception {
                    Connection connection = null;
                    Context initialContext = null;
                    try {
                        // /Step 1. Create an initial context to perform the JNDI lookup.
                        final Properties env = new Properties();
                        env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
                        env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, PROVIDER_URL));
                        initialContext = new InitialContext(env);
    
                        // Step 2. perform a lookup on the topic
                        Topic topic = (Topic) initialContext.lookup(DEFAULT_DESTINATION);
    
                        // Step 3. perform a lookup on the Connection Factory
                        ConnectionFactory cf =
                                               (ConnectionFactory) initialContext.lookup(DEFAULT_CONNECTION_FACTORY);
    
                        // Step 4. Create a JMS Connection
                        connection = cf.createConnection();
    
                        // Step 5. Create a JMS Session
                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                        // Step 6. Create a JMS Message Consumer
                        MessageConsumer messageConsumer =
                                                          session.createSharedConsumer(topic, SUBSCRIPTION_NAME);
    
                        // Step 7. Start the Connection
                        connection.start();
                        System.out.println("Shared message consumer started on topic: " + DEFAULT_DESTINATION +
                                           "\n");
    
                        // Step 8. Receive the message
                        TextMessage messageReceived = null;
                        while (true) {
                            messageReceived = (TextMessage) messageConsumer.receive();
                            System.out.println("Consumer received message: " + messageReceived.getText() + "\n");
                        }
    
                    } finally {
    
                        // Step 9. Close JMS resources
                        if (connection != null) {
                            connection.close();
                        }
    
                        // Also the initialContext
                        if (initialContext != null) {
                            initialContext.close();
                        }
                    }
                }
            }
  2. Run the following java file (TopicPublisher.java) to publish 5 messages to the HornetQ topic:

    package SharedTopicSubscribe;
    
    import java.util.Properties;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    
    public class TopicPublisher {
        private static final String DEFAULT_CONNECTION_FACTORY = "TopicConnectionFactory";
        private static final String DEFAULT_DESTINATION = "/topic/exampleTopic";
        private static final String INITIAL_CONTEXT_FACTORY = "org.jnp.interfaces.NamingContextFactory";
        private static final String PROVIDER_URL = "jnp://localhost:1099";
        // Set up all the default values
        private static final String param = "IBM";
    
        public static void main(final String[] args) {
            try {
                runExample();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static boolean runExample() throws Exception {
            Connection connection = null;
            Context initialContext = null;
            try {
                // /Step 1. Create an initial context to perform the JNDI lookup.
                // Set up the namingContext for the JNDI lookup
                final Properties env = new Properties();
                env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
                env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, PROVIDER_URL));
                initialContext = new InitialContext(env);
    
                // Step 2. perform a lookup on the topic
                Topic topic = (Topic) initialContext.lookup(DEFAULT_DESTINATION);
    
                // Step 3. perform a lookup on the Connection Factory
                ConnectionFactory cf =
                        (ConnectionFactory) initialContext.lookup(DEFAULT_CONNECTION_FACTORY);
    
                // Step 4. Create a JMS Connection
                connection = cf.createConnection();
    
                // Step 5. Create a JMS Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                // Step 6. Create a Message Producer
                MessageProducer producer = session.createProducer(topic);
                System.out.println("Publishing 5 messages to topic/exampleTopic");
                for (int i = 0; i < 5; i++) {
    
                    // Step 7. Create a Text Message
                    TextMessage message = session.createTextMessage(getMessage());
    
                    // Step 8. Send the Message
                    producer.send(message);
                }
                return true;
            } finally {
    
                // Step 9. Close JMS resources
                if (connection != null) {
                    connection.close();
                }
    
                // Also the initialContext
                if (initialContext != null) {
                    initialContext.close();
                }
            }
        }
    
        private static double getRandom(double base, double varience, boolean onlypositive) {
            double rand = Math.random();
            return (base + (rand > 0.5 ? 1 : -1) * varience * base * rand) *
                    (onlypositive ? 1 : rand > 0.5 ? 1 : -1);
        }
    
        private static String getMessage() {
            return "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" +
                    "   <soapenv:Header/>\n" + "<soapenv:Body>\n" +
                    "<m:placeOrder xmlns:m=\"http://services.samples\">\n" + "    <m:order>\n" +
                    "        <m:price>" + getRandom(100, 0.9, true) + "</m:price>\n" +
                    "        <m:quantity>" + (int) getRandom(10000, 1.0, true) + "</m:quantity>\n" +
                    "        <m:symbol>" + param + "</m:symbol>\n" + "    </m:order>\n" +
                    "</m:placeOrder>" + "   </soapenv:Body>\n" + "</soapenv:Envelope>";
        }
    }

You will see that the 5 messages are shared between the inbound listener and TopicConsumer.java. This is because both the inbound listener and TopicConsumer.java are configured as shared subscribers.

The total number of consumed messages between the inbound listener and TopicConsumer.java will be equal to the number messages published by TopicPublisher.java.

Top