J2EE Java

/*
 * @(#)TopicSelectors.java  1.3 02/05/02
 * 
 * Copyright (c) 2000-2002 Sun Microsystems, Inc. All Rights Reserved.
 * 
 * Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
 * modify and redistribute this software in source and binary code form,
 * provided that i) this copyright notice and license appear on all copies of
 * the software; and ii) Licensee does not utilize the software in a manner
 * which is disparaging to Sun.
 *
 * This software is provided "AS IS," without a warranty of any kind. ALL
 * EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
 * IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
 * NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
 * LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
 * OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
 * LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
 * INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
 * CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
 * OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGES.
 *
 * This software is not designed or intended for use in on-line control of
 * aircraft, air traffic, aircraft navigation or aircraft communications; or in
 * the design, construction, operation or maintenance of any nuclear
 * facility. Licensee represents and warrants that it will not use or
 * redistribute the Software for such purposes.
 */
import java.util.*;
import javax.jms.*;
/**
 * The TopicSelectors class demonstrates the use of multiple 
 * subscribers and message selectors.
 * 


 * The program contains a Publisher class, a Subscriber class with a listener 
 * class, a main method, and a method that runs the subscriber and publisher
 * threads.
 * 


 * The Publisher class publishes 30 messages of 6 different types, randomly
 * selected, then publishes a "Finished" message.  The program creates four
 * instances of the Subscriber class, one for each of three types and one that 
 * listens for the "Finished" message.  Each subscriber instance uses a 
 * different message selector to fetch messages of only one type.  The publisher
 * displays the messages it sends, and the listener displays the messages that
 * the subscribers receive.  Because all the objects run in threads, the
 * displays are interspersed when the program runs.
 * 


 * Specify a topic name on the command line when you run the program.  The 
 * program also uses a queue named "controlQueue", which should be created  
 * before you run the program.
 * 
 * @author Kim Haase
 * @version 1.6, 08/18/00
 */
public class TopicSelectors {
    final String         CONTROL_QUEUE = "controlQueue";
    String               topicName = null;
    static final String  MESSAGE_TYPES[] = 
                             {"Nation/World", "Metro/Region", "Business",
                              "Sports", "Living/Arts", "Opinion",
                               // always last type
                              "Finished"
                             };
    static final String  END_OF_MESSAGE_STREAM_TYPE = 
                             MESSAGE_TYPES[MESSAGE_TYPES.length-1];
    int                  exitResult = 0;
    /**
     * The Publisher class publishes a number of messages.  For each, it
     * randomly chooses a message type.  It creates a message and sets its
     * text to a message that indicates the message type.
     * It also sets the client property NewsType, which the Subscriber
     * objects use as the message selector.
     * After a pause to allow the subscribers to get all the messages, the
     * publisher sends a final message with a NewsType of "Finished", which 
     * signals the end of the messages.
     *
     * @author Kim Haase
     * @version 1.6, 08/18/00
     */
    public class Publisher extends Thread {
        final int  NUM_SUBSCRIBERS;
        final int  ARRSIZE = 6;
        
        public Publisher(int numSubscribers) {
            NUM_SUBSCRIBERS = numSubscribers;
        }
        /**
         * Chooses a message type by using the random number generator
         * found in java.util.
         *
         * @return  the String representing the message type
         */
        public String chooseType() {
           int     whichMsg;
           Random  rgen = new Random();
           
           whichMsg = rgen.nextInt(ARRSIZE);
           return MESSAGE_TYPES[whichMsg];
        }
        
        /**
         * Runs the thread.
         */
        public void run() {
            ConnectionFactory    connectionFactory = null;
            Connection           connection = null;
            Session              session = null;
            Topic                topic = null;
            MessageProducer      msgProducer = null;
            TextMessage          message = null;
            int                  numMsgs = ARRSIZE * 5;
            String               messageType = null;
            try {
                connectionFactory = 
                    SampleUtilities.getConnectionFactory();
                connection = 
                    connectionFactory.createConnection();
                session = connection.createSession(false, 
                    Session.AUTO_ACKNOWLEDGE);
                topic = SampleUtilities.getTopic(topicName, session);
            } catch (Exception e) {
                System.out.println("Connection problem: " + e.toString());
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException ee) {}
                }
              System.exit(1);
            }
            
            /*
             * After synchronizing with subscriber, create publisher.
             * Create and send news messages.
             * Send end-of-messages message.
             */
            try {
                /*
                 * Synchronize with subscribers.  Wait for messages indicating 
                 * that all subscribers are ready to receive messages.
                 */
                try {
                    SampleUtilities.receiveSynchronizeMessages("PUBLISHER THREAD: ",
                                                               CONTROL_QUEUE, 
                                                               NUM_SUBSCRIBERS);
                } catch (Exception e) {
                    System.out.println("Queue probably missing: " + e.toString());
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (JMSException ee) {}
                    }
                  System.exit(1);
              }
                msgProducer = session.createProducer(topic);
                message = session.createTextMessage();
                for (int i = 0; i < numMsgs; i++) {
                    messageType = chooseType();
                    message.setStringProperty("NewsType", messageType);
                    message.setText("Item " + i + ": " + messageType);
                    System.out.println("PUBLISHER THREAD: Setting message text to: " 
                        + message.getText());
                    msgProducer.send(message);
                }
                
                message.setStringProperty("NewsType", END_OF_MESSAGE_STREAM_TYPE);
                message.setText("That's all the news for today.");
                System.out.println("PUBLISHER THREAD: Setting message text to: " 
                    + message.getText());
                msgProducer.send(message);
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        exitResult = 1;
                    }
                }
            }
        }
    }
    
    /**
     * Each instance of the Subscriber class creates a subscriber that uses
     * a message selector that is based on the string passed to its 
     * constructor.
     * It registers its message listener, then starts listening
     * for messages.  It does not exit until the message listener sets the 
     * variable done to true, which happens when the listener gets the last
     * message.
     *
     * @author Kim Haase
     * @version 1.6, 08/18/00
     */
    public class Subscriber extends Thread {
        String                  whatKind;
        int                     subscriberNumber;
        /**
         * The MultipleListener class implements the MessageListener interface  
         * by defining an onMessage method for the Subscriber class.
         *
         * @author Kim Haase
         * @version 1.6, 08/18/00
         */
        private class MultipleListener implements MessageListener {
            final SampleUtilities.DoneLatch  monitor =
                new SampleUtilities.DoneLatch();
            /**
             * Displays the message text.
             * If the value of the NewsType property is "Finished", the message 
             * listener sets its monitor state to all done processing messages.
             *
             * @param inMessage  the incoming message
             */
            public void onMessage(Message inMessage) {
                TextMessage  msg = (TextMessage) inMessage;
                String       newsType;
                try {
                    System.out.println("SUBSCRIBER " + subscriberNumber 
                                       + " THREAD: Message received: " 
                                       + msg.getText());
                    newsType = msg.getStringProperty("NewsType");
                    if (newsType.equals(TopicSelectors.END_OF_MESSAGE_STREAM_TYPE)) {
                        System.out.println("SUBSCRIBER " + subscriberNumber 
                             + " THREAD: Received finished-publishing message");
                        monitor.allDone();
                    }
                } catch(JMSException e) {
                    System.out.println("Exception in onMessage(): " 
                                       + e.toString());
                }
            }
        }
        
        /**
         * Constructor.  Sets whatKind to indicate the type of
         * message this Subscriber object will listen for; sets
         * subscriberNumber based on Subscriber array index.
         *
         * @param str  a String from the MESSAGE_TYPES array
         * @param num  the index of the Subscriber array
         */
        public Subscriber(String str, int num) {
            whatKind = str;
            subscriberNumber = num + 1;
        }
 
        /**
         * Runs the thread.
         */
        public void run() {
            ConnectionFactory    connectionFactory = null;
            Connection           connection = null;
            Session              session = null;
            Topic                topic = null;
            String               selector = null;
            MessageConsumer      msgConsumer = null;
            MultipleListener     multipleListener = new MultipleListener();
            try {
                connectionFactory = 
                    SampleUtilities.getConnectionFactory();
                connection = 
                    connectionFactory.createConnection();
                session = connection.createSession(false, 
                    Session.AUTO_ACKNOWLEDGE);
                topic = SampleUtilities.getTopic(topicName, session);
            } catch (Exception e) {
                System.out.println("Connection problem: " + e.toString());
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException ee) {}
                }
              System.exit(1);
            } 
            
            /*
             * Create subscriber with message selector.
             * Start message delivery.
             * Send synchronize message to publisher, then wait till all
             * messages have arrived.
             * Listener displays the messages obtained.
             */
            try {
                selector = new String("NewsType = '" + whatKind + "'" + 
                                      " OR NewsType = '" + END_OF_MESSAGE_STREAM_TYPE + "'");
                System.out.println("SUBSCRIBER " + subscriberNumber 
                                    + " THREAD: selector is \"" + selector + "\"");
                msgConsumer = 
                    session.createConsumer(topic, selector, false);
                msgConsumer.setMessageListener(multipleListener);
                connection.start();
                
                // Let publisher know that subscriber is ready.
                try {
                    SampleUtilities.sendSynchronizeMessage("SUBSCRIBER " 
                                                            + subscriberNumber 
                                                            + " THREAD: ", 
                                                            CONTROL_QUEUE);
                } catch (Exception e) {
                    System.out.println("Queue probably missing: " + e.toString());
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (JMSException ee) {}
                    }
                  System.exit(1);
              }
                
                /*
                 * Asynchronously process appropriate news messages.
                 * Block until publisher issues a finished message.
                 */
                multipleListener.monitor.waitTillDone();
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        exitResult = 1;
                    }
                }
            }         
        }
    }
    
    /**
     * Creates an array of Subscriber objects, one for each of three message  
     * types including the Finished type, and starts their threads.
     * Creates a Publisher object and starts its thread.
     * Calls the join method to wait for the threads to die.
     */
    public void run_threads() {
        final       int NUM_SUBSCRIBERS = 3;
        Subscriber  subscriberArray[] = new Subscriber[NUM_SUBSCRIBERS];
        Publisher   publisher = new Publisher(NUM_SUBSCRIBERS);
        subscriberArray[0] = new Subscriber(MESSAGE_TYPES[2], 0);
        subscriberArray[0].start();        
        subscriberArray[1] = new Subscriber(MESSAGE_TYPES[3], 1);
        subscriberArray[1].start();
        subscriberArray[2] = new Subscriber(MESSAGE_TYPES[4], 2);
        subscriberArray[2].start();    
        publisher.start();
        
        for (int i = 0; i < subscriberArray.length; i++) {
            try {
                subscriberArray[i].join();
            } catch (InterruptedException e) {}
        }
        
        try {
            publisher.join();
        } catch (InterruptedException e) {}
    }
    
    /**
     * Reads the topic name from the command line, then calls the
     * run_threads method to execute the program threads.
     *
     * @param args  the topic used by the example
     */
    public static void main(String[] args) {
        TopicSelectors  ts = new TopicSelectors();
        if (args.length != 1) {
          System.out.println("Usage: java TopicSelectors ");
          System.exit(1);
      }
      
        ts.topicName = new String(args[0]);
        System.out.println("Topic name is " + ts.topicName);
      ts.run_threads();
      SampleUtilities.exit(ts.exitResult);
    }
}
        
jms.zip( 65 k)