Nirvana C# .NET: Asynchronous Named Channel Consumer

This example shows how to asynchronously subscribe to events on a Nirvana Channel using a named object.

Usage

namedsubscriber <rname> <channel name> [name] [start eid] [debug] [count] [auto ack] [cluster wide] [persistent] [selector] [priority]

<Required Arguments> 

<rname> - the rname of the server to connect to
<channel name> - Channel name parameter for the channel to subscribe to

[Optional Arguments] 

[name] - Specifies the unique name to be used for a named subscription (default: OS username) 
[start eid] - The Event ID to start subscribing from; only used when the named subscriber does not exist yet and has to be created (default: 0)
[debug] - The level of output from each event: 0 - none, 1 - summary, 2 - EIDs, 3 - All (default: 1)
[count] - The number of events to wait before printing out summary information (default: 1000) 
[auto ack] - Specifies whether each event will be automatically acknowledged by the api (default: true) 
[cluster wide] - Specifies whether the named object is to be used across a cluster (default: false) 
[persistent] - Specifies whether the named object state is to be stored to disk or held in server memory (default: false) 
[selector] - The event filter string to use
[priority] - Whether priority is enabled for this named subscriber (default: false)

Application Source Code

See also:

using System;
using com.pcbsys.nirvana.client;
namespace com.pcbsys.nirvana.apps
{
    class namedsubscriber : nSampleApp, nEventListener, nNamedPriorityListener
    {
        //Event ID handed to createNamedObject when the named object has to be created
        private static long startEid;
        //Event filter string passed to addSubscriber
        private static String selector;
        //Name of the named object; Main defaults it to the OS user name
        private static String subname;
        //Realm RNAME that Main passes to parseRealmProperties
        private String RNAME;
        //Name of the channel to subscribe to
        private String channelName;
        //Passed to addSubscriber; when false, go() acknowledges each event itself
        private static bool autoAck = true;
        //Passed to createNamedObject as the cluster wide flag
        private static bool cluster = false;
        //Passed to createNamedObject as the persistent flag
        private static bool persistent = false;
        //Passed to createNamedObject as the priority flag
        private static bool priority = false;
        //Event ID that go() expects next; used to detect gaps in the event stream
        private long lastEID;
        //Timestamp taken at the start of the current reporting window
        private long startTime;
        //Bytes of event data accumulated since the last report
        private long byteCount;
        //Events seen in the current reporting window; -1 means no event received yet
        private int count = -1;
        //Total number of events received by go()
        private int totalMsgs;
        //Output level checked in go(): 1 enables rate reports, 2 adds event ids, 3 adds event details
        private int logLevel = 1;
        //Number of events between rate reports
        private int reportCount = 1000;
        //Channel reference obtained in doit()
        private nChannel myChannel;
        //The single instance created in Main
        private static namedsubscriber mySelf;
        //Named object used for the subscription; looked up or created in doit()
        private nNamedObject named;
        /**
         * This method demonstrates the Nirvana API calls necessary to subscribe to
         * a channel as a named subscriber.
         * It is called after all command line arguments have been received and
         * validated.
         *
         * It constructs the session, finds the channel, looks up the named object
         * (creating it when the lookup yields nothing), subscribes this instance,
         * waits for console input, then removes the subscriber and closes the session.
         * Any Nirvana client exception raised along the way is reported on the
         * console and the process exits with code 1.
         *
         * @param realmDetails a String[] containing the possible RNAME values
         * @param achannelName the name of the channel to subscribe to
         */
        private void doit(String[] realmDetails, String achannelName)
        {
            //Build the session on this instance rather than going through the static reference
            constructSession(realmDetails);
            //Subscribes to the specified channel
            try
            {
                //Create a channel attributes object
                nChannelAttributes nca = new nChannelAttributes();
                nca.setName(achannelName);
                //Obtain the channel reference
                myChannel = mySession.findChannel(nca);
                //Look up the named object first so an existing subscription is reused
                try
                {
                    named = myChannel.getNamedObject(subname);
                }
                catch (Exception)
                {
                    //Deliberately ignored: a failed lookup is treated the same as
                    //"no such named object", so we fall through and create one below
                }
                if (named == null)
                {
                    //The start eid and the flags only take effect on creation
                    named = myChannel.createNamedObject(subname, startEid, persistent, cluster, priority);
                }
                Console.WriteLine("Using selector : " + selector);
                //Subscribe with this instance as both the event listener and the priority listener
                myChannel.addSubscriber(this, named, selector, autoAck, this);
                //Stay subscribed until console input arrives
                Console.WriteLine("Press enter key to quit !");
                Console.Read();
                Console.WriteLine("Finished. Consumed total of " + totalMsgs);
                //Remove this subscriber
                myChannel.removeSubscriber(this);
            }
            //Handle errors, most specific types first; nBaseClientException is the catch-all
            catch (nChannelNotFoundException cnfe)
            {
                reportErrorAndExit(cnfe,
                    "The channel specified could not be found.",
                    "Please ensure that the channel exists in the REALM you connect to.");
            }
            catch (nSecurityException se)
            {
                reportErrorAndExit(se,
                    "Unsufficient permissions for the requested operation.",
                    "Please check the ACL settings on the server.");
            }
            catch (nSessionNotConnectedException snce)
            {
                reportErrorAndExit(snce,
                    "The session object used is not physically connected to the Nirvana Realm.",
                    "Please ensure the realm is running and check your RNAME value.");
            }
            catch (nUnexpectedResponseException ure)
            {
                reportErrorAndExit(ure,
                    "The Nirvana REALM has returned an unexpected response.",
                    "Please ensure the Nirvana REALM and client API used are compatible.");
            }
            catch (nUnknownRemoteRealmException urre)
            {
                reportErrorAndExit(urre,
                    "The channel specified resided in a remote realm which could not be found.",
                    "Please ensure the channel name specified is correct.");
            }
            catch (nRequestTimedOutException rtoe)
            {
                reportErrorAndExit(rtoe,
                    "The requested operation has timed out waiting for a response from the REALM.",
                    "If this is a very busy REALM ask your administrator to increase the client timeout values.");
            }
            catch (nChannelAlreadySubscribedException chase)
            {
                reportErrorAndExit(chase,
                    "You are already subscribed to this channel.");
            }
            catch (nSelectorParserException spe)
            {
                reportErrorAndExit(spe,
                    "An error occured while parsing the selector filter specified.",
                    "Please check the JMS documentation on how to write a valid selector.");
            }
            catch (nBaseClientException nbce)
            {
                reportErrorAndExit(nbce,
                    "An error occured while creating the Channel Attributes object.");
            }
            //Close the session we opened
            try
            {
                nSessionFactory.close(mySession);
            }
            catch (Exception)
            {
                //Best effort: we are shutting down anyway
            }
            //Close any other sessions so that we can exit
            nSessionFactory.shutdown();
        }
        /**
         * Prints the given explanation lines followed by the stack trace of the
         * error, then terminates the process with exit code 1.
         *
         * @param error the exception that aborted the subscription
         * @param messages the explanation lines to print, in order
         */
        private static void reportErrorAndExit(Exception error, params String[] messages)
        {
            foreach (String message in messages)
            {
                Console.WriteLine(message);
            }
            Console.WriteLine(error.StackTrace);
            Environment.Exit(1);
        }
        /**
         * Reads the command line into the subscriber settings.
         *
         * Positional arguments, in order: rname, channel name, name, start eid,
         * debug, count, auto ack, cluster wide, persistent, selector, priority.
         * The first two are required; the rest keep their defaults when omitted.
         * Boolean arguments are true only when they equal "true" (ignoring case).
         * When fewer than 2 or more than 11 arguments are supplied the usage
         * message is printed and the process exits.
         *
         * @param args the command line arguments as received by Main
         */
        protected override void processArgs(String[] args)
        {
            int supplied = args.Length;
            //Both <rname> and <channel name> are required, and at most 11 arguments are understood
            if (supplied < 2 || supplied > 11)
            {
                Usage();
                Environment.Exit(0);
                return;
            }
            //Required arguments
            RNAME = args[0];
            channelName = args[1];
            //Optional arguments: each one is only read when it was actually supplied
            if (supplied > 2)
            {
                subname = args[2];
            }
            if (supplied > 3)
            {
                startEid = Convert.ToInt64(args[3]);
            }
            if (supplied > 4)
            {
                logLevel = Convert.ToInt32(args[4]);
            }
            if (supplied > 5)
            {
                reportCount = Convert.ToInt32(args[5]);
            }
            //Ordinal comparison keeps the flag parsing independent of the current culture
            if (supplied > 6)
            {
                autoAck = "true".Equals(args[6], StringComparison.OrdinalIgnoreCase);
            }
            if (supplied > 7)
            {
                cluster = "true".Equals(args[7], StringComparison.OrdinalIgnoreCase);
            }
            if (supplied > 8)
            {
                persistent = "true".Equals(args[8], StringComparison.OrdinalIgnoreCase);
            }
            if (supplied > 9)
            {
                selector = args[9];
            }
            if (supplied > 10)
            {
                priority = "true".Equals(args[10], StringComparison.OrdinalIgnoreCase);
            }
        }

        /**
         * Application entry point: creates the subscriber instance, reads the
         * command line, parses the realm details and starts the subscription.
         *
         * @param args the command line arguments
         */
        public static void Main(String[] args)
        {
            //Create an instance for this class
            mySelf = new namedsubscriber();
            //Default the subscription name to the OS user; processArgs overrides it when a name is given
            subname = Environment.UserName;
            //Process command line arguments
            mySelf.processArgs(args);
            //Process the local REALM RNAME details
            String[] rproperties = parseRealmProperties(mySelf.RNAME);
            //Subscribe to the channel specified
            mySelf.doit(rproperties, mySelf.channelName);
        }
        /**
         * A callback is received by the API to this method each time an event is received from
         * the nirvana channel. Be careful not to spend too much time processing the message
         * inside this method, as until it exits the next message can not be pushed.
         *
         * For each event this method acknowledges it when auto ack is off, updates
         * the counters, prints a rate report every reportCount events, reports
         * gaps in the event id sequence and, depending on the log level, prints
         * the event id, tag, data, attributes and properties.
         *
         * @param evt An nConsumeEvent object containing the message received from the channel
         */
        public void go(nConsumeEvent evt)
        {
            //If not autoAck, then acknowledge the event ourselves
            try
            {
                if (!autoAck)
                {
                    evt.ack();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.StackTrace);
            }
            //If this is the first message we receive
            if (count == -1)
            {
                //Get a timestamp to be used for message rate calculations.
                //Ticks are converted to whole milliseconds; DateTime.Millisecond would only
                //give the 0-999 component of the current second.
                startTime = DateTime.UtcNow.Ticks / TimeSpan.TicksPerMillisecond;
                count = 0;
            }
            //Increment the counters
            count++;
            totalMsgs++;
            //Have we reached the point where we need to report the rates?
            if (count == reportCount)
            {
                //Reset the counter
                count = 0;
                //Get a timestamp to calculate the rates
                long end = DateTime.UtcNow.Ticks / TimeSpan.TicksPerMillisecond;
                // Does the specified log level permit us to print on the screen?
                if (logLevel >= 1)
                {
                    long elapsed = end - startTime;
                    //Dump the rates on the screen
                    if (elapsed != 0)
                    {
                        //1000L forces 64-bit arithmetic so a large report count cannot overflow
                        Console.WriteLine("Received " + reportCount + " in " + elapsed + " Evt/Sec = " + ((reportCount * 1000L) / elapsed) + " Bytes/sec=" + ((byteCount * 1000) / elapsed));
                        Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession.getInputByteCount() + "]");
                    }
                    else
                    {
                        Console.WriteLine("Received " + reportCount + " faster than the system millisecond counter");
                    }
                }
                //Set the startTime for the next report equal to the end timestamp for the previous one
                startTime = end;
                //Reset the byte counter
                byteCount = 0;
            }
            long eid = evt.getEventID();
            //If the last EID counter is not equal to the current event ID
            if (lastEID != eid)
            {
                //If yes, maybe we have missed an event, so print a message on the screen.
                //This message could be printed for a number of other reasons.
                //One of them would be someone purging a range creating an 'eid gap'.
                //As eids are never reused within a channel you could have a situation
                //where this gets printed but nothing is missed.
                Console.WriteLine("Expired event range " + (lastEID) + " - " + (eid - 1));
                //Reset the last eid counter
                lastEID = eid + 1;
            }
            else
            {
                //Increment the last eid counter
                lastEID++;
            }
            //Get the data of the message
            byte[] buffer = evt.getEventData();
            if (buffer != null)
            {
                //Add its Length to the byte counter
                byteCount += buffer.Length;
            }
            //If the loglevel permits printing on the screen
            if (logLevel >= 2)
            {
                //Print the eid
                Console.WriteLine("Event id : " + eid);
                //If the loglevel permits printing the full event
                if (logLevel >= 3)
                {
                    //Print the message tag
                    Console.WriteLine("Event tag : " + evt.getEventTag());
                    //Print the message data as text; Convert.ToString on a byte[] only yields the type name.
                    //NOTE(review): assumes the payload is UTF-8 text - confirm against the publisher
                    String data = (buffer == null) ? "" : System.Text.Encoding.UTF8.GetString(buffer);
                    Console.WriteLine("Event data : " + data);
                    if (evt.hasAttributes())
                    {
                        displayEventAttributes(evt.getAttributes());
                    }
                    nEventProperties prop = evt.getProperties();
                    if (prop != null)
                    {
                        displayEventProperties(prop);
                    }
                }
            }
        }
        /**
         * Writes the command line help for this sample to the console,
         * one entry per line, required arguments first.
         */
        private static void Usage()
        {
            //Each entry is written with its own WriteLine; embedded \n adds the blank separator lines
            String[] helpLines =
            {
                "Usage ...\n",
                "namedsubscriber <rname> <channel name> [name] [start eid] [debug] [count] [auto ack] [cluster wide] [persistent] [selector] [priority]\n",
                "<Required Arguments> \n",
                "<rname> - the rname of the server to connect to",
                "<channel name> - Channel name parameter for the channel to subscribe to",
                "\n[Optional Arguments] \n",
                "[name] - Specifies the unique name to be used for a named subscription (default: OS username) ",
                "[start eid] - The Event ID to start subscribing from if the named subscriber needs to be created (doesn't exist)",
                "[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All",
                "[count] - The number of events to wait before printing out summary information (default: 1000) ",
                "[auto ack] - Specifies whether each event will be automatically acknowledged by the api (default: true) ",
                "[cluster wide] - Specifies whether the named object is to be used across a cluster (default: false) ",
                "[persistent] - Specifies whether the named object state is to be stored to disk or held in server memory (default: false) ",
                "[selector] - The event filter string to use",
                "[priority] - Whether priority is enabled for this named subscriber (default: false)"
            };
            foreach (String helpLine in helpLines)
            {
                Console.WriteLine(helpLine);
            }
        }
   
        #region nNamedPriorityListener Members
        /**
         * nNamedPriorityListener callback; prints a notice on the console when invoked.
         */
        void nNamedPriorityListener.havePriority()
        {
            String notice = "Got named priority listener....";
            Console.Out.WriteLine(notice);
        }
        #endregion
    }
}
EXAMPLE_SOURCE_END
Share this page with others:
Tell Your Tweets Facebook It! Add to Delicious Reddit! Digg It! Stumble Upon Add to Your Faves Mixx it
Follow Us:
Keep up with my-Channels on Twitter Become a fan on Facebook LinkedIn Profile Recent Highlights RSS Feed