Nirvana Java Client: Queue Publisher

This example publishes events onto a Nirvana Queue.

Usage

npushq <queue name> [count] [size] 

<Required Arguments> 

<queue name> - Queue name parameter for the queue to publish to

[Optional Arguments] 

[count] -The number of events to publish (default: 10)
[size] - The size (bytes) of the event to publish (default: 100)


Note: -? provides help on environment variables

Application Source Code

See also:

/**
 *
 *  ----------------------------------------------------------------------------------
 *
 *  PCB Systems Limited License Version 1.1
 *  Copyright  PCB Systems Limited. All rights reserved
 *
 *  In the event that you should download or otherwise use this software
 *  ( the "Software" ) you hereby acknowledge and agree that:
 *
 *  1. The Software is the property of PCB Systems Limited: Title, Copyright and all
 *  other proprietary rights, interest and benefit in and to the Software is and
 *  shall be owned by PCB Systems Limited;
 *
 *  2. You will not make copies of the Software whatsoever other than, if you should
 *  so wish, a single copy for archival purposes only;
 *
 *  3. You will not modify, reverse assemble, decompile, reverse engineer or otherwise
 *  translate the Software;
 *
 *  4. You will not redistribute, copy, forward electronically or circulate the Software
 *  to any person for any purpose whatsoever without the prior written consent of
 *  PCB Systems Limited;
 *
 *  5. You will not charge for, market or provide any managed service or product that
 *  is based upon or includes the Software or any variant of it; and
 *
 *  6. You will not use the Software for any purpose apart from your own personal,
 *  noncommercial and lawful use;
 *
 *  You hereby agree that the software is used by you on an "as is" basis, without
 *  warranty of any kind. PCB Systems Limited hereby expressly disclaim all warranties
 *  and conditions, either expressed or implied, including but not limited to any
 *  implied warranties or conditions or merchantability and fitness for a particular
 *  purpose.
 *
 *  You agree that you are solely responsible for determining the appropriateness of
 *  using the Software and assume all risks associated with it including but not
 *  limited to the risks of program errors, damage to or loss of of data, programs or
 *  equipment and unavailability or interruption of operations.
 *
 *  PCB Systems Limited will not be liable for any direct damages or for any, special,
 *  incidental or indirect damages or for any economic consequential damages ( including
 *  lost profits or savings ), or any damage howsoever arising.
 *
 *
 */

package com.pcbsys.nirvana.apps;
import com.pcbsys.nirvana.client.*;

/**
 *  Pushes events to a nirvana queue
 */
public class QPublish extends nSampleApp {
  private boolean isOk = true;
  private nBaseClientException asyncException;
  private static QPublish mySelf=null;
  /**
   * This method demonstrates the Nirvana API calls necessary to publish to
   * a queue.
   * It is called after all command line arguments have been received and
   * validated
   *
   * @param realmDetails a String[] containing the possible RNAME values
   * @param aqueueName the queue name to publish to
   * @param count the number of messages to publish
   * @param size the size (bytes) of each message to be published
   */
  private void doit( String[] realmDetails, String aqueueName, int count, int size ) {
    mySelf.constructSession(realmDetails);
    //Publishes to the specified queue
    try{
      //Create a channel attributes object
      nChannelAttributes nca = new nChannelAttributes();
      nca.setName( aqueueName );
      //Obtain a reference to the queue
      nQueue myQueue = mySession.findQueue( nca );

      //Create a byte array filled with characters equal to
      // the message size specified. This could be a result
      //of String.getBytes() call in a real world scenario.
      byte[] buffer = new byte[size];
      for(int x = 0; x< size;x++){
        buffer[x] = (byte) ((x%90) + 32);
      }
      nEventProperties props = new nEventProperties();
      props.put("test", true);
      //Instantiate the message to be published with the specified TAG and byte[]
      nConsumeEvent evt1 = new nConsumeEvent( props, buffer );
      nEventProperties props1 = new nEventProperties();
      props1.put("test", false);
      //Instantiate the message to be published with the specified TAG and byte[]
      nConsumeEvent evt2 = new nConsumeEvent( props1, buffer );
      evt1.setPersistant(true);
      //Inform the user that publishing is about to start
      System.out.println("Starting push of "+count+" events of size "+size);

      //Get a timestamp to be used to calculate the message publishing rates
      long start = System.currentTimeMillis();
      //Loop as many times as the number of messages we want to publish
      for ( int x = 0; x < count; x++ ) {
        //Publish the event
        myQueue.push(evt1);
        //Check if an asynchronous exception has been received
        if(!isOk){
          //If it has, then throw it
          throw asyncException;
        }
      }
      //Do a synchronous call before exiting the sample to ensure all buffers have been flushed
      myQueue.getDetails().getNoOfEvents();
      //Check if an asynchronous exception has been received
      if(!isOk){
         //If it has, then throw it
        throw asyncException;
      }
      //Get a timestamp to calculate the publishing rates
      long end = System.currentTimeMillis();
      //Calculate the events / sec rate
      long eventPerSec = ( ( (count) * 1000 ) / ( (end + 1)- start ) );
      //Calculate the bytes / sec rate
      long bytesPerSec = eventPerSec * size;
      //Inform the user of the resulting rates
      System.out.println( "Events = "+count+" Events/sec = " + eventPerSec + " Bytes/Sec = "+ bytesPerSec);
    }
    //Handle errors
    catch(nChannelNotFoundException cnfe){
      System.out.println("The queue specified could not be found.");
      System.out.println("Please ensure that the queue exists in the REALM you connect to.");
      cnfe.printStackTrace();
      System.exit(1);
    }
    catch (nSecurityException se) {
      System.out.println("Unsufficient permissions for the requested operation.");
      System.out.println("Please check the ACL settings on the server.");
      se.printStackTrace();
      System.exit(1);
    }
    catch (nSessionNotConnectedException snce) {
      System.out.println("The session object used is not physically connected to the Nirvana realm.");
      System.out.println("Please ensure the realm is up and check your RNAME value.");
      snce.printStackTrace();
      System.exit(1);
    }
    catch (nUnexpectedResponseException ure) {
      System.out.println("The Nirvana REALM has returned an unexpected response.");
      System.out.println("Please ensure the Nirvana REALM and client API used are compatible.");
      ure.printStackTrace();
      System.exit(1);
    }
    catch (nUnknownRemoteRealmException urre) {
      System.out.println("The queue specified resided in a remote realm which could not be found.");
      System.out.println("Please ensure the queue name specified is correct.");
      urre.printStackTrace();
      System.exit(1);
    }
    catch (nRequestTimedOutException rtoe) {
      System.out.println("The requested operation has timed out waiting for a response from the REALM.");
      System.out.println("If this is a very busy REALM ask your administrator to increase the client timeout values.");
      rtoe.printStackTrace();
      System.exit(1);
    }
    catch (nBaseClientException nbce) {
      System.out.println("An error occured while creating the Channel Attributes object.");
      nbce.printStackTrace();
      System.exit(1);
    }
    //Close the session we opened
    try{
      nSessionFactory.close ( mySession );
    }
    catch(Exception ex){}
    //Close any other sessions within this JVM so that we can exit
    nSessionFactory.shutdown ( );
  }
  protected void processArgs(String[] args){
    switch (args.length){
      case 1:
        if (args[0].equals("-?")) {
          Usage();
          UsageEnv();
        }
        System.getProperties().put("QUEUENAME",args[0]);
        break;
      case 2:
        System.getProperties().put("QUEUENAME",args[0]);
        System.getProperties().put("COUNT",args[1]);
        break;
      case 3:
        System.getProperties().put("QUEUENAME",args[0]);
        System.getProperties().put("COUNT",args[1]);
        System.getProperties().put("SIZE",args[2]);
        break;
    }
  }
  public static void main( String[] args ) {
    //Create an instance for this class
    mySelf = new QPublish();
    //Process command line arguments
    mySelf.processArgs(args);
    //Process Environment Variables
    nSampleApp.processEnvironmentVariables();
    //Check the queue name specified
    String queueName=null;
    if ( System.getProperty( "QUEUENAME" ) != null )
      queueName= System.getProperty( "QUEUENAME" ) ;
    else{
      Usage();
      System.exit( 1 );
    }
    int count=10; //default value
    //Check if the number of messages to be published has been specified
    if( System.getProperty( "COUNT" ) != null){
      try{
        count = Integer.parseInt (System.getProperty("COUNT"));
      }
      catch(Exception num){} //Ignore and use the defaults
    }
    int size = 100; //default value
    //Check if the size (bytes) of each message has been specified
    if( System.getProperty( "SIZE" ) != null){
      try{
        size = Integer.parseInt (System.getProperty("SIZE"));
      }
      catch(Exception num){} // Ignore and use the default
    }
    //Check the local realm details
    int idx = 0;
    String RNAME=null;
    if ( System.getProperty( "RNAME" ) != null )
      RNAME= System.getProperty( "RNAME" ) ;
    else{
      Usage();
      System.exit( 1 );
    }
    //Process the local REALM RNAME details
    String[] rproperties = new String[4];
    rproperties=parseRealmProperties(RNAME);
    //Push events to the queue specified
    mySelf.doit(rproperties, queueName, count, size);
  }
  /**
   * Prints the usage message for this class
   */
  private static void Usage() {
    System.out.println( "Usage ...\n" );
    System.out.println("npushq <queue name> [count] [size] \n");
    System.out.println(
      "<Required Arguments> \n");
    System.out.println(
      "<queue name> - Queue name parameter for the queue to publish to" );
    System.out.println(
      "\n[Optional Arguments] \n");
    System.out.println(
      "[count] -The number of events to publish (default: 10)" );
    System.out.println(
      "[size] - The size (bytes) of the event to publish (default: 100)" );
    System.out.println(
      "\n\nNote: -? provides help on environment variables \n");
  }
} // End of publish Class

EXAMPLE_SOURCE_END
Share this page with others:
Tell Your Tweets Facebook It! Add to Delicious Reddit! Digg It! Stumble Upon Add to Your Faves Mixx it
Follow Us:
Keep up with my-Channels on Twitter Become a fan on Facebook LinkedIn Profile Recent Highlights RSS Feed