Nirvana C# .NET: Channel Publisher

This example publishes events onto a Nirvana Channel.

Usage

publish <rname> <channel name> [count] [size] 

<Required Arguments> 

<rname> - the rname of the server to connect to
<channel name> - Channel name parameter for the channel to publish to

[Optional Arguments] 

[count] - The number of events to publish (default: 1000)
[size] - The size (bytes) of the event to publish (default: 1024)

Application Source Code

See also:

using System;
using System.Threading;
using com.pcbsys.nirvana.client;
namespace com.pcbsys.nirvana.apps
{
    /**
     *  Publishes reliably to a nirvana channel.
     *
     *  Command line: publish <rname> <channel name> [count] [size]
     *
     *  The members mySession, constructSession, parseRealmProperties and UsageEnv
     *  are not declared in this class; presumably they are inherited from nSampleApp.
     */
    class publish : nSampleApp
    {
        // Values used when the optional command line arguments are omitted.
        // Usage() prints these same constants so the help text cannot drift from the behaviour.
        private const int DefaultCount = 1000;
        private const int DefaultSize = 1024;

        // NOTE(review): nothing in this class ever sets isOk to false or assigns
        // asyncException, so the asynchronous-failure checks below never fire as written.
        // They are kept in case an asynchronous exception callback is wired in later.
        private bool isOk = true;
        private nBaseClientException asyncException;
        private static publish mySelf = null;

        /**
         * This method demonstrates the Nirvana API calls necessary to publish to
         * a channel.
         * It is called after all command line arguments have been received and
         * validated
         *
         * The session is closed and the session factory shut down in a finally
         * block. The handlers that call Environment.Exit terminate the process
         * directly, exactly as before.
         *
         * @param realmDetails a String[] containing the possible RNAME values
         * @param achannelName the channel name to publish to
         * @param count the number of messages to publish
         * @param size the size (bytes) of each message to be published
         */
        private void doit(String[] realmDetails, String achannelName, int count, int size)
        {
            constructSession(realmDetails);
            try
            {
                nChannelAttributes nca = new nChannelAttributes();
                nca.setName(achannelName);
                nChannel myChannel = mySession.findChannel(nca);

                byte[] buffer = buildPayload(size);

                //Construct a sample nEventProperties object and add some sample properties
                // NOTE(review): props is populated but never attached to the event below,
                // so the published event carries only the tag and the byte[] payload.
                // Confirm whether the properties were meant to be published as well.
                nEventProperties props = new nEventProperties();
                props.put("key0string", "1");
                props.put("key1int", (int)1);
                props.put("key2long", (long)-11);
                props.put("key3bool", true);
                props.put("key4short", (short)1);
                props.put("key5float", (float)1.123456);
                props.put("key6double", (double)1.123456);
                props.put("key7char", '1');
                nConsumeEvent evt1 = new nConsumeEvent("Tag", buffer);

                //Inform the user that publishing is about to start
                Console.WriteLine("Starting publish of " + count + " events with a size of " + size + " bytes each");
                //Obtain the channel's last event ID prior to us publishing anything
                long startEid = myChannel.getLastEID();
                //Use a monotonic timer for the rate calculation; wall-clock time can jump
                System.Diagnostics.Stopwatch timer = System.Diagnostics.Stopwatch.StartNew();

                publishEvents(myChannel, evt1, count);

                //Calculate the actual number of events published by obtaining
                //the channel's last eid after our publishing and subtracting
                //the channel's last eid before our publishing.
                //This also ensures that all client queues have been flushed.
                Console.WriteLine("Get Last EID");
                long events = myChannel.getLastEID() - startEid;
                timer.Stop();

                reportRates(events, timer.Elapsed, size);
            }
            catch (nSessionPausedException)
            {
                Console.WriteLine("Session has been paused, please resume the session");
                Environment.Exit(1);
            }
            catch (nSecurityException)
            {
                Console.WriteLine("Unsufficient permissions for the requested operation.");
                Console.WriteLine("Please check the ACL settings on the server.");
                Environment.Exit(1);
            }
            catch (nSessionNotConnectedException)
            {
                Console.WriteLine("The session object used is not physically connected to the Nirvana realm.");
                Console.WriteLine("Please ensure the realm is up and check your RNAME value.");
                Environment.Exit(1);
            }
            catch (nUnexpectedResponseException)
            {
                Console.WriteLine("The Nirvana REALM has returned an unexpected response.");
                Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible.");
                Environment.Exit(1);
            }
            catch (nRequestTimedOutException)
            {
                Console.WriteLine("The requested operation has timed out waiting for a response from the REALM.");
                Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values.");
                Environment.Exit(1);
            }
            finally
            {
                closeSession();
            }
        }

        /**
         * Creates the event payload: a byte array of the requested length filled
         * with the repeating byte values 32..121. This could be the result of a
         * string-to-bytes conversion in a real world scenario.
         *
         * @param size the number of bytes to generate
         */
        private static byte[] buildPayload(int size)
        {
            byte[] buffer = new byte[size];
            for (int x = 0; x < size; x++)
            {
                buffer[x] = (byte)((x % 90) + 32);
            }
            return buffer;
        }

        /**
         * Publishes the same event onto the channel until count publishes have succeeded.
         * If the session is not connected, waits for it to reconnect and retries the
         * publish that failed. Any other client exception is reported and rethrown.
         *
         * @param channel the channel to publish to
         * @param evt the event to publish
         * @param count the number of successful publishes required
         */
        private void publishEvents(nChannel channel, nConsumeEvent evt, int count)
        {
            int published = 0;
            while (published < count)
            {
                try
                {
                    //Publish the event; only a successful publish advances the counter,
                    //so a publish interrupted by a disconnect is repeated
                    channel.publish(evt);
                    published++;
                }
                catch (nSessionNotConnectedException)
                {
                    waitForReconnect();
                }
                catch (nBaseClientException ex)
                {
                    Console.WriteLine("Publish.java : Exception : " + ex.Message);
                    //Rethrow with "throw;" so the original stack trace is preserved
                    throw;
                }

                throwIfAsyncFailure();
            }
            throwIfAsyncFailure();
        }

        /**
         * Blocks, polling once per second, until the session reports that it is connected.
         */
        private void waitForReconnect()
        {
            while (!mySession.isConnected())
            {
                Console.WriteLine("Disconnected from Nirvana, Sleeping for 1 second...");
                try
                {
                    Thread.Sleep(1000);
                }
                catch (Exception)
                {
                    //Deliberately ignored: an interrupted sleep only means we poll again sooner
                }
            }
        }

        /**
         * Throws the recorded asynchronous exception if one has been flagged through isOk.
         */
        private void throwIfAsyncFailure()
        {
            if (!isOk)
            {
                throw asyncException;
            }
        }

        /**
         * Prints the publish duration, the event and byte rates, and the session's byte counters.
         *
         * @param events the number of events the channel gained while publishing
         * @param elapsed the time taken to publish
         * @param size the size (bytes) of each published event
         */
        private void reportRates(long events, TimeSpan elapsed, int size)
        {
            //A run shorter than one millisecond would truncate to 0 and divide by zero,
            //so the divisor is clamped to a minimum of 1 ms
            long elapsedMs = Math.Max(1L, (long)elapsed.TotalMilliseconds);
            long eventPerSec = (events * 1000) / elapsedMs;
            long bytesPerSec = eventPerSec * size;
            Console.WriteLine("Publish Completed in " + elapsed.TotalMilliseconds + "ms :");
            Console.WriteLine("[Events Published = " + events + "]  [Events/Second = " + eventPerSec + "]  [Bytes/Second = " + bytesPerSec + "]");
            Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession.getInputByteCount() + "]");
        }

        /**
         * Closes the session opened by doit and then shuts down the session factory
         * so that the process can exit. A failure to close is reported but does not
         * prevent the shutdown.
         */
        private void closeSession()
        {
            try
            {
                nSessionFactory.close(mySession);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Warning: failed to close the session cleanly: " + ex.Message);
            }
            //Close any other sessions within this process so that we can exit
            nSessionFactory.shutdown();
        }

        /**
         * Validates the command line and runs the sample.
         *
         * args is expected to be the array returned by Environment.GetCommandLineArgs(),
         * whose first element is the program name; the rname and channel name are
         * therefore read from args[1] and args[2], and at least 3 elements are required.
         *
         * @param args program name, rname, channel name, then the optional count and size
         */
        protected override void processArgs(String[] args)
        {
            if (args.Length < 3)
            {
                Usage();
                UsageEnv();
                Environment.Exit(2);
            }
            String RNAME = args[1];
            String channelName = args[2];

            int count = DefaultCount;
            if (args.Length > 3)
            {
                count = parseNonNegative(args[3], "count");
            }
            int size = DefaultSize;
            if (args.Length > 4)
            {
                size = parseNonNegative(args[4], "size");
            }
            //
            // Run the sample app
            //
            doit(parseRealmProperties(RNAME), channelName, count, size);
        }

        /**
         * Parses an optional numeric argument. If the text is not an integer, does not
         * fit in an int, or is negative, prints the problem and the usage message and
         * exits with code 2 instead of failing later with an unhandled exception.
         *
         * @param text the raw command line value
         * @param argName the argument name shown in the error message
         */
        private int parseNonNegative(String text, String argName)
        {
            int value;
            if (!int.TryParse(text, out value) || value < 0)
            {
                Console.WriteLine("Invalid value '" + text + "' for [" + argName + "]: expected a non-negative integer.\n");
                Usage();
                UsageEnv();
                Environment.Exit(2);
            }
            return value;
        }

        /**
         * Program entry point: creates the sample instance and hands it the process
         * command line (including the program name as the first element).
         */
        public static void Main(String[] args)
        {
            //Create an instance for this class
            mySelf = new publish();
            //Process command line arguments
            mySelf.processArgs(Environment.GetCommandLineArgs());
        }

        /**
         * Prints the usage message for this class
         */
        private static void Usage()
        {
            Console.WriteLine("Usage ...\n");
            Console.WriteLine("publish <rname> <channel name> [count] [size] \n");
            Console.WriteLine(
              "<Required Arguments> \n");
            Console.WriteLine(
              "<rname> - the rname of the server to connect to");
            Console.WriteLine(
              "<channel name> - Channel name parameter for the channel to publish to");
            Console.WriteLine(
              "\n[Optional Arguments] \n");
            Console.WriteLine(
              "[count] -The number of events to publish (default: " + DefaultCount + ")");
            Console.WriteLine(
              "[size] - The size (bytes) of the event to publish (default: " + DefaultSize + ")");
        }
    }
}
EXAMPLE_SOURCE_END
Share this page with others:
Tell Your Tweets Facebook It! Add to Delicious Reddit! Digg It! Stumble Upon Add to Your Faves Mixx it
Follow Us:
Keep up with my-Channels on Twitter Become a fan on Facebook LinkedIn Profile Recent Highlights RSS Feed