Nirvana C# .NET: Synchronous Queue Consumer

This example shows how to synchronously consume events from a Nirvana Queue. See also: Asynchronous Queue Subscription

Usage

npopq <rname> <queue name> [timeout] [debug] [count] [selector] 

<Required Arguments> 

<rname> - the rname of the server to connect to
<queue name> - Queue name parameter for the queue to pop from

[Optional Arguments] 

[timeout] - The timeout for the dequeue operation
[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All
[count] - The number of events to wait before printing out summary information
[selector] - The event filter string to use

Application Source Code

See also:

using System;
using System.IO;
using System.Threading;
using com.pcbsys.nirvana.client;

namespace com.pcbsys.nirvana.apps
{
    /**
     * Creates a synchronous queue reader and pops the queue
     */
    class qreader : nSampleApp, nEventListener
    {
        private static System.Text.UTF8Encoding encoding = new System.Text.UTF8Encoding();
        private long timeout;
        private long lastEID = 0;
        private DateTime startTime;
        private long byteCount = 0;
        private int logLevel = 0;
        private int count = -1;
        private int totalMsgs = 0;
        private int reportCount = 1000;
        private bool isTx = false;
        private nQueue myQueue;
        private nQueueSyncReader reader = null;
        private Thread qPopper = null;
        private static qreader mySelf = null;
        /**
         * This method demonstrates the Nirvana API calls necessary to create a 
         * synchronous queue reader
         *
         * It is called after all command line arguments have been received and
         * validated
         *
         * @param realmDetails a String[] containing the possible RNAME values
         * @param aqueueName the queue name to pop
         * @param selector the pop selector filter
         * @param startEid the eid to start popping from
         * @param loglvl the specified log level
         * @param repCount the specified report count
         */
        private void doit(String[] realmDetails, String aqueueName, long time, int loglvl, int repCount, String sel)
        {
            logLevel = loglvl;
            reportCount = repCount;
            timeout = time;
            mySelf.constructSession(realmDetails);
            //Subscribes to the specified queue
            try
            {
                //Create a channel attributes object
                var nca = new nChannelAttributes();
                nca.setName(aqueueName);
                //Obtain the queue reference
                myQueue = mySession.findQueue(nca);
                //output queue details
                nQueueDetails details = myQueue.getDetails();
                Console.WriteLine("Current queue size = " + details.getNoOfEvents());
                Console.WriteLine("Current queue age  = " + (details.getLastEventTime() - details.getFirstEventTime()));
                Console.WriteLine("Current storage size = " + details.getTotalMemorySize());
                Console.WriteLine("Current readers    = " + details.getNoOfReaders());
                //create the queue reader
                reader = myQueue.createReader(new nQueueReaderContext(this, sel));
                ThreadStart ts = new ThreadStart(run);
                qPopper = new Thread(ts);
                qPopper.IsBackground = true;
                qPopper.Start();
                qPopper.Name = "Q Reader thread";
                //Stay subscribed until the user presses any key
                Console.ReadLine();
                Console.WriteLine("Finished. Consumed total of " + totalMsgs);
                //Destroy the queue reader
                nQueue.destroyReader(reader);
            }
            //Handle errors
            catch (nChannelNotFoundException cnfe)
            {
                Console.WriteLine("The queue specified could not be found.");
                Console.WriteLine("Please ensure that the queue exists in the REALM you connect to.");
                Environment.Exit(1);
            }
            catch (nSecurityException se)
            {
                Console.WriteLine("Unsufficient permissions for the requested operation.");
                Console.WriteLine("Please check the ACL settings on the server.");
                Environment.Exit(1);
            }
            catch (nSessionNotConnectedException snce)
            {
                Console.WriteLine("The session object used is not physically connected to the Nirvana realm.");
                Console.WriteLine("Please ensure the realm is up and check your RNAME value.");
                Environment.Exit(1);
            }
            catch (nUnexpectedResponseException ure)
            {
                Console.WriteLine("The Nirvana REALM has returned an unexpected response.");
                Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible.");
                Environment.Exit(1);
            }
            catch (nUnknownRemoteRealmException urre)
            {
                Console.WriteLine("The queue specified resided in a remote realm which could not be found.");
                Console.WriteLine("Please ensure the queue name specified is correct.");
                Environment.Exit(1);
            }
            catch (nRequestTimedOutException rtoe)
            {
                Console.WriteLine("The requested operation has timed out waiting for a response from the REALM.");
                Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values.");
                Environment.Exit(1);
            }
            catch (nBaseClientException nbce)
            {
                Console.WriteLine("An error occured while creating the Channel Attributes object.");
                Environment.Exit(1);
            }
            catch (IOException e)
            {
                Console.WriteLine(e.StackTrace);
            }
            //Close the session we opened
            try
            {
                nSessionFactory.close(mySession);
            }
            catch (Exception ex) { }
            //Close any other sessions within this JVM so that we can exit
            nSessionFactory.shutdown();
        }
        protected override void processArgs(String[] args)
        {
            //
            // Need a min of 2, rname, channel name
            if (args.Length < 3)
            {
                Usage();
                UsageEnv();
                Environment.Exit(2);
            }
            String RNAME = args[1]; ;
            var queueName = args[2];

            var time = 10000;
            if (args.Length > 3)
            {
                time = Convert.ToInt32(args[3]);
            }
            var loglvl = 3;
            if (args.Length > 4)
            {
                loglvl = Convert.ToInt32(args[4]);
            }
            //
            // Optional Parameters
            //
            var report = 1000;
            //Check for a selector message filter value
            if (args.Length > 5)
            {
                report = Convert.ToInt32(args[5]);
            }
            String sel = null;
            if (args.Length > 6)
            {
                sel = args[6];
            }
            //
            // Run the sample app
            //
            mySelf.doit(parseRealmProperties(RNAME), queueName, time, loglvl, report, sel);
        }
        public static void Main(String[] args)
        {
            //Create an instance for this class
            mySelf = new qreader();
            mySelf.processArgs(Environment.GetCommandLineArgs());
        }
        /**
         * A callback is received by the API to this method each time an event is received from
         * the nirvana channel. Be carefull not to spend too much time processing the message
         * inside this method, as until it exits the next message can not be pushed.
         *
         * @param evt An nConsumeEvent object containing the message received from the channel
         */
        public void go(nConsumeEvent evt)
        {
            if (isTx)
            {
                ((nQueueSyncTransactionReader)reader).commit(evt.getEventID());
            }
            //If this is the first message we receive
            if (count == -1)
            {
                //Get a timestamp to be used for message rate calculations
                startTime = DateTime.Now;
                count = 0;
            }
            //Increment he counter
            count++;
            totalMsgs++;
            //Have we reached the point where we need to report the rates?
            if (count == reportCount)
            {
                //Reset the counter
                count = 0;
                //Get a timestampt to calculate the rates
                DateTime end = DateTime.Now;
                // Does the specified log level permits us to print on the screen?
                if (logLevel >= 1)
                {
                    //Dump the rates on the screen
                    if (!end.Equals(startTime))
                    {
                        TimeSpan ts = end - startTime;
                        Console.WriteLine("Received " + reportCount + " in " + (ts.Milliseconds) + " Evt/Sec = " + ((reportCount * 1000) / (ts.Milliseconds)) + " Bytes/sec=" + ((byteCount * 1000) / (ts.Milliseconds)));
                        Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession.getInputByteCount() + "]");
                    }
                    else
                    {
                        Console.WriteLine("Received " + reportCount + " faster than the system millisecond counter");
                    }
                }
                //Set the startTime for the next report equal to the end timestamp for the previous one
                startTime = end;
                //Reset the byte counter
                byteCount = 0;
            }
            //If the last EID counter is not equal to the current event ID
            if (lastEID != evt.getEventID())
            {
                //If yes, maybe we have missed an event, so print a message on the screen.
                //This message could be printed for a number of other reasons.
                //One of them would be someone purging a range creating an 'eid gap'.
                //As eids are never reused within a channel you could have a situation
                //where this gets printed but nothing is missed.
                Console.WriteLine("Expired event range " + (lastEID) + " - " + (evt.getEventID() - 1));
                //Reset the last eid counter
                lastEID = evt.getEventID() + 1;
            }
            else
            {
                //Increment the last eid counter
                lastEID++;
            }
            //Get the data of the message
            byte[] buffer = evt.getEventData();
            if (buffer != null)
            {
                //Add its length to the byte counter
                byteCount += buffer.Length;
            }
            //If the loglevel permits printing on the screen
            if (logLevel >= 2)
            {
                //Print the eid
                Console.WriteLine("Event id : " + evt.getEventID());
                if (evt.isEndOfChannel())
                {
                    Console.WriteLine("End of channel reached");
                }
                //If the loglevel permits printing on the screen
                if (logLevel >= 3)
                {
                    //Print the message tag
                    Console.WriteLine("Event tag : " + evt.getEventTag());
                    //Print the message data
                    Console.WriteLine("Event data : " + encoding.GetString(evt.getEventData()));
                    if (evt.hasAttributes())
                    {
                        displayEventAttributes(evt.getAttributes());
                    }
                    nEventProperties prop = evt.getProperties();
                    if (prop != null)
                    {
                        displayEventProperties(prop);
                    }
                }
            }
        }
        public void run()
        {
            while (true)
            {
                try
                {
                    nConsumeEvent evt = reader.pop(timeout);
                    if (evt != null)
                    {
                        go(evt);
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine("Exception in pop....exiting!");
                    Console.WriteLine(e.StackTrace);
                    break;
                }
            }
            Environment.Exit(1);
        }
        /**
         * Prints the usage message for this class
         */
        private static void Usage()
        {
            Console.WriteLine("Usage ...\n");
            Console.WriteLine("npopq <rname> <queue name> [timeout] [debug] [count] [selector] \n");
            Console.WriteLine(
              "<Required Arguments> \n");
            Console.WriteLine(
              "<rname> - the rname of the server to connect to");
            Console.WriteLine(
              "<queue name> - Queue name parameter for the queue to pop from");
            Console.WriteLine(
              "\n[Optional Arguments] \n");
            Console.WriteLine(
              "[timeout] - The timeout for the dequeue operation");
            Console.WriteLine(
              "[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All");
            Console.WriteLine(
              "[count] - The number of events to wait before printing out summary information");
            Console.WriteLine(
              "[selector] - The event filter string to use");
        }
    }
}
EXAMPLE_SOURCE_END
Share this page with others:
Tell Your Tweets Facebook It! Add to Delicious Reddit! Digg It! Stumble Upon Add to Your Faves Mixx it
Follow Us:
Keep up with my-Channels on Twitter Become a fan on Facebook LinkedIn Profile Recent Highlights RSS Feed