Nirvana C# .NET: Mutiplex a Session

Multiplex two Nirvana sessions over one channel.

Usage

multiplex <channel name> [start eid] [debug] [count] [selector] 

<Required Arguments> 

<channel name> - Channel name parameter for the channel to subscribe to" );

[Optional Arguments] 

[start eid] - The Event ID to start subscribing from" );
[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All" );
[count] - The number of events to wait before printing out summary information
[selector] - The event filter string to use


Note: -? provides help on environment variables

Application Source Code

See also:

/*
 *  ----------------------------------------------------------------------------------
 *
 *  PCB Systems Limited License Version 1.1
 *  Copyright ? PCB Systems Limited. All rights reserved
 *
 *  In the event that you should download or otherwise use this software
 *  ( the "Software" ) you hereby acknowledge and agree that:
 *
 *  1. The Software is the property of PCB Systems Limited: Title, Copyright and all
 *  other proprietary rights, interest and benefit in and to the Software is and
 *  shall be owned by PCB Systems Limited;
 *
 *  2. You will not make copies of the Software whatsoever other than, if you should
 *  so wish, a single copy for archival purposes only;
 *
 *  3. You will not modify, reverse assemble, decompile, reverse engineer or otherwise
 *  translate the Software;
 *
 *  4. You will not redistribute, copy, forward electronically or circulate the Software
 *  to any person for any purpose whatsoever without the prior written consent of
 *  PCB Systems Limited;
 *
 *  5. You will not charge for, market or provide any managed service or product that
 *  is based upon or includes the Software or any variant of it; and
 *
 *  6. You will not use the Software for any purpose apart from your own personal,
 *  noncommercial and lawful use;
 *
 *  You hereby agree that the software is used by you on an "as is" basis, without
 *  warranty of any kind. PCB Systems Limited hereby expressly disclaim all warranties
 *  and conditions, either expressed or implied, including but not limited to any
 *  implied warranties or conditions or merchantability and fitness for a particular
 *  purpose.
 *
 *  You agree that you are solely responsible for determining the appropriateness of
 *  using the Software and assume all risks associated with it including but not
 *  limited to the risks of program errors, damage to or loss of of data, programs or
 *  equipment and unavailability or interruption of operations.
 *
 *  PCB Systems Limited will not be liable for any direct damages or for any, special,
 *  incidental or indirect damages or for any economic consequential damages ( including
 *  lost profits or savings ), or any damage howsoever arising.
 *
 *  ------------------------------------------------------------------------ 
 */
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using com.pcbsys.nirvana.client;
namespace com.pcbsys.nirvana.apps
{
    class multiplex : nSampleApp, nEventListener
    {
        static long startEid;
        static String selector = null;
        String channelName;
        int startEID = -1;
        private long lastEID = 0;
        private long startTime = 0;
        private long byteCount = 0;
        private String rname;

        private int logLevel = 0;
        private int count = -1;
        private int totalMsgs = 0;
        private int reportCount = 10000;

        private nChannel myChannel;
        private static multiplex mySelf = null;
        public String Selector;
        private void doit(String[] realmDetails, String achannelName, String selector, long startEid, int loglvl, int repCount) {
    logLevel = loglvl;
    reportCount = repCount;
    nSession otherSession =null;
    nChannel otherChannel = null;

    try {
nsa = new nSessionAttributes(realmDetails, 2);
//Construct first session.
    mySession = nSessionFactory.create(nsa, this);
    mySession.init();
   
    //Construct second session by multiplexing the first session.
    otherSession = nSessionFactory.createMultiplexed(mySession);
    otherSession.init();
   

    //Connect to a channel using the first session.
    nChannelAttributes nca = new nChannelAttributes();
    nca.setName(achannelName);

      //Obtain the channel reference
      myChannel = mySession.findChannel( nca );
      otherChannel = otherSession.findChannel(nca);
     

      //if the latest event has been implied (by specifying -1)
      if(startEid == -1){
        //Get the last eid on the channel and reset the start eid with that value
        startEid = myChannel.getLastEID();
      }

      //Add this object as a subscribe to the channel with the specified message selector
      // and start eid
      myChannel.addSubscriber( this, selector, startEid );
      otherChannel.addSubscriber(this,selector,startEid);

      //Stay subscribed until the user presses any key
      Console.WriteLine( "Press any key to quit !" );
      Console.Read();
      Console.WriteLine( "Finished. Consumed total of "+totalMsgs );
      //Remove this subscriber
      myChannel.removeSubscriber( this );
    }
    //Handle errors
    catch(nChannelNotFoundException cnfe){
      Console.WriteLine("The channel specified could not be found.");
      Console.WriteLine("Please ensure that the channel exists in the REALM you connect to.");
      Console.WriteLine(cnfe.StackTrace);
      Environment.Exit(1);
    }
    catch (nSecurityException se) {
      Console.WriteLine("Unsufficient permissions for the requested operation.");
      Console.WriteLine("Please check the ACL settings on the server.");
      Console.WriteLine(se.StackTrace);
      Environment.Exit(1);
    }
    catch (nSessionNotConnectedException snce) {
      Console.WriteLine("The session object used is not physically connected to the Nirvana Realm.");
      Console.WriteLine("Please ensure the realm is running and check your RNAME value.");
      Console.WriteLine(snce.StackTrace);
      Environment.Exit(1);
    }
    catch (nUnexpectedResponseException ure) {
      Console.WriteLine("The Nirvana REALM has returned an unexpected response.");
      Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible.");
      Console.WriteLine(ure.StackTrace);
      Environment.Exit(1);
    }
    catch (nUnknownRemoteRealmException urre) {
      Console.WriteLine("The channel specified resided in a remote realm which could not be found.");
      Console.WriteLine("Please ensure the channel name specified is correct.");
      Console.WriteLine(urre.StackTrace);
      Environment.Exit(1);
    }
    catch (nRequestTimedOutException rtoe) {
      Console.WriteLine("The requested operation has timed out waiting for a response from the REALM.");
      Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values.");
      Console.WriteLine(rtoe.StackTrace);
      Environment.Exit(1);
    }
    catch (nChannelAlreadySubscribedException chase) {
      Console.WriteLine("You are already subscribed to this channel.");
      Console.WriteLine(chase.StackTrace);
      Environment.Exit(1);
    }
    catch (nSelectorParserException spe) {
      Console.WriteLine("An error occured while parsing the selector filter specified.");
      Console.WriteLine("Please check the JMS documentation on how to write a valid selector.");
      Console.WriteLine(spe.StackTrace);
      Environment.Exit(1);
    }
    catch (nBaseClientException nbce) {
      Console.WriteLine("An error occured while creating the Channel Attributes object.");
      Console.WriteLine(nbce.StackTrace);
      Environment.Exit(1);
    }
    //Close the session we opened
    try{
      nSessionFactory.close ( mySession );
      nSessionFactory.close(otherSession);
    }
    catch(Exception ex){}
    //Close any other sessions within this JVM so that we can exit
    nSessionFactory.shutdown ( );
  }

        protected override void processArgs(String[] args)
        {
            switch (args.Length)
            {
                case 6:
                    Selector = args[5];
                    goto case 5;
                case 5:
                    count = Convert.ToInt32(args[4]);
                    goto case 4;
                case 4:
                    logLevel = Convert.ToInt32(args[3]);
                    goto case 3;
                case 3:
                    startEID = Convert.ToInt32(args[2]);
                    goto case 2;
                case 2:
                    channelName = args[1];
                    goto case 1;
                case 1:
                    if (args[0].Equals("-?"))
                    {
                        Usage();
                        UsageEnv();
                    }
                    rname = args[0];
                    break;
            }
        }

        public static new void Main(String[] args)
        {

            //Create an instance for this class
            mySelf = new multiplex();

            //Process command line arguments
            mySelf.processArgs(args);

            nSampleApp.processEnvironmentVariables();
            //Process the local REALM RNAME details
            String[] rproperties = new String[4];
            rproperties = parseRealmProperties(mySelf.rname);

            //Subscribe to the channel specified
            mySelf.doit(rproperties, mySelf.channelName, selector, startEid, mySelf.logLevel, mySelf.count);
        }

        /**
        * A callback is received by the API to this method each time an event is received from
        * the nirvana channel. Be carefull not to spend too much time processing the message
        * inside this method, as until it exits the next message can not be pushed.
        *
        * @param evt An nConsumeEvent object containing the message received from the channel
        */
        public void go(nConsumeEvent evt) {
    //If this is the first message we receive
    if(count == -1){
      //Get a timestamp to be used for message rate calculations
        startTime = nConstants.currentTimeMillis();
      count = 0;
    }

    //Increment he counter
    count++;
    totalMsgs++;

    //Have we reached the point where we need to report the rates?
    if(count == reportCount){

      //Reset the counter
      count = 0;

      //Get a timestampt to calculate the rates
      long end = nConstants.currentTimeMillis();

      // Does the specified log level permits us to print on the screen?
      if(logLevel >= 1){

        //Dump the rates on the screen
        if(end != startTime){
          Console.WriteLine("Received "+reportCount+" in "+(end-startTime)+" Evt/Sec = "+((reportCount * 1000)/(end-startTime))+" Bytes/sec="+((byteCount*1000)/(end-startTime)));
          Console.WriteLine("Bandwidth data : Bytes Tx ["+mySession.getOutputByteCount()+"] Bytes Rx ["+mySession.getInputByteCount()+"]");
        }
        else{
          Console.WriteLine("Received "+reportCount+" faster than the system millisecond counter");
        }
      }
      //Set the startTime for the next report equal to the end timestamp for the previous one
      startTime = end;

      //Reset the byte counter
      byteCount = 0;
    }

    //If the last EID counter is not equal to the current event ID
    if(lastEID != evt.getEventID()){
      //If yes, maybe we have missed an event, so print a message on the screen.
      //This message could be printed for a number of other reasons.
      //One of them would be someone purging a range creating an 'eid gap'.
      //As eids are never reused within a channel you could have a situation
      //where this gets printed but nothing is missed.
      Console.WriteLine("Expired event range "+(lastEID)+" - "+(evt.getEventID()-1));
      //Reset the last eid counter
      lastEID = evt.getEventID()+1;
    }
    else{

      //Increment the last eid counter
      lastEID++;
    }
    //Get the data of the message
    byte[] buffer = evt.getEventData();
    if(buffer != null){
      //Add its length to the byte counter
      byteCount += buffer.Length;
    }
    //If the loglevel permits printing on the screen
    if(logLevel >= 2){
      //Print the eid
      Console.WriteLine( "Event id : " + evt.getEventID() );
      if(evt.isEndOfChannel()){
        Console.WriteLine("End of channel reached");
      }
      //If the loglevel permits printing on the screen
      if(logLevel >= 3){
        //Print the message tag
        Console.WriteLine( "Event tag : " + evt.getEventTag() );
        //Print the message data
        Console.WriteLine( "Event data : " + Convert.ToString( evt.getEventData() ) );
        if(evt.hasAttributes()){
          displayEventAttributes(evt.getAttributes());
        }

        nEventProperties prop = evt.getProperties();
        if(prop != null) {
          displayEventProperties(prop);
        }
      }
    }
  }

        /**
        * Prints the usage message for this class
        */
        private static void Usage() {

    Console.WriteLine( "Usage ...\n" );
    Console.WriteLine("multiplex <channel name> [start eid] [debug] [count] [selector] \n");

    Console.WriteLine(
      "<Required Arguments> \n");
    Console.WriteLine(
      "<channel name> - Channel name parameter for the channel to subscribe to" );
    Console.WriteLine(
      "\n[Optional Arguments] \n");
    Console.WriteLine(
      "[start eid] - The Event ID to start subscribing from" );
    Console.WriteLine(
      "[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All" );
    Console.WriteLine(
      "[count] - The number of events to wait before printing out summary information");
    Console.WriteLine(
      "[selector] - The event filter string to use");
    Console.WriteLine(
      "\n\nNote: -? provides help on environment variables \n");
  }

    }
}
EXAMPLE_SOURCE_END
Share this page with others:
Tell Your Tweets Facebook It! Add to Delicious Reddit! Digg It! Stumble Upon Add to Your Faves Mixx it
Follow Us:
Keep up with my-Channels on Twitter Become a fan on Facebook LinkedIn Profile Recent Highlights RSS Feed