Nirvana C# .NET: Asynchronous Named Channel Consumer
This example shows how to asynchronously subscribe to events on a Nirvana Channel using a named object.
Usage
namedsubscriber <rname> <channel name> [name] [start eid] [debug] [count] [auto ack] [cluster wide] [persistent] [selector] [priority]

<Required Arguments>
<rname> - the rname of the server to connect to
<channel name> - Channel name parameter for the channel to subscribe to

[Optional Arguments]
[name] - Specifies the unique name to be used for a named subscription (default: OS username)
[start eid] - The Event ID to start subscribing from if the named subscriber needs to be created (doesn't exist)
[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All
[count] - The number of events to wait before printing out summary information (default: 1000)
[auto ack] - Specifies whether each event will be automatically acknowledged by the api (default: true)
[cluster wide] - Specifies whether the named object is to be used across a cluster (default: false)
[persistent] - Specifies whether the named object state is to be stored to disk or held in server memory (default: false)
[selector] - The event filter string to use
[priority] - Whether priority is enabled for this named subscriber (default: false)
Application Source Code
See also:
using System;
using com.pcbsys.nirvana.client;
namespace com.pcbsys.nirvana.apps
{
class namedsubscriber : nSampleApp, nEventListener, nNamedPriorityListener
{
// Event ID handed to createNamedObject when the named object has to be created
private static long startEid;
// Event filter string passed to addSubscriber; stays null unless set from the command line
private static String selector;
// Name of the named object; Main defaults it to Environment.UserName
private static String subname;
// Realm name string that Main passes to parseRealmProperties
private String RNAME;
// Name of the channel that doit subscribes to
private String channelName;
// Passed to addSubscriber; when false, go() acknowledges each event itself
private static bool autoAck = true;
// Passed to createNamedObject as the cluster-wide flag
private static bool cluster = false;
// Passed to createNamedObject as the persistent flag
private static bool persistent = false;
// Passed to createNamedObject as the priority flag
private static bool priority = false;
// Event ID that go() expects the next event to carry
private long lastEID;
// Time reading taken at the start of the current reporting interval
private long startTime;
// Bytes of event data accumulated since the last rate report
private long byteCount;
// Events seen in the current reporting interval; -1 until the first event arrives
private int count = -1;
// Total number of events received by go()
private int totalMsgs;
// Output level checked in go(): >= 1 rate summaries, >= 2 event IDs, >= 3 tag, data, attributes and properties
private int logLevel = 1;
// Number of events between two rate summaries
private int reportCount = 1000;
// Channel reference obtained from mySession.findChannel in doit
private nChannel myChannel;
// The instance created in Main
private static namedsubscriber mySelf;
// Named object used for the subscription (looked up or created in doit)
private nNamedObject named;
/**
 * Subscribes this object to a Nirvana channel as a named subscriber and
 * stays subscribed until the user presses enter.
 * It is called after all command line arguments have been received and
 * validated.
 *
 * The named object called subname is looked up on the channel and, when the
 * lookup yields nothing, created from startEid, persistent, cluster and
 * priority. If a Nirvana client exception is raised, a diagnostic and the
 * stack trace are printed, the session is closed and the process exits with
 * status 1.
 *
 * @param realmDetails a String[] containing the possible RNAME values
 * @param achannelName the name of the channel to subscribe to
 */
private void doit(String[] realmDetails, String achannelName)
{
    // Use this instance (not the static mySelf) so that the session and the
    // listener registered below always belong to the same object.
    constructSession(realmDetails);

    // Set only once the whole subscribe / wait / unsubscribe sequence has run
    bool completed = false;
    try
    {
        //Create a channel attributes object
        nChannelAttributes nca = new nChannelAttributes();
        nca.setName(achannelName);
        //Obtain the channel reference
        myChannel = mySession.findChannel(nca);
        //Look up the named object; create it below if the lookup yields nothing
        try
        {
            named = myChannel.getNamedObject(subname);
        }
        catch (Exception)
        {
            // Deliberate best effort: a failed lookup is treated as "no such
            // named object"; a genuine problem resurfaces in createNamedObject.
        }
        if (named == null)
        {
            named = myChannel.createNamedObject(subname, startEid, persistent, cluster, priority);
        }
        Console.WriteLine("Using selector : " + selector);
        //this is both the event listener and the named priority listener
        myChannel.addSubscriber(this, named, selector, autoAck, this);
        //Stay subscribed until the user presses enter
        Console.WriteLine("Press enter key to quit !");
        Console.Read();
        Console.WriteLine("Finished. Consumed total of " + totalMsgs);
        //Remove this subscriber
        myChannel.removeSubscriber(this);
        completed = true;
    }
    //Handle errors; the specific types must stay ahead of the general nBaseClientException handler
    catch (nChannelNotFoundException cnfe)
    {
        reportError(cnfe,
            "The channel specified could not be found.",
            "Please ensure that the channel exists in the REALM you connect to.");
    }
    catch (nSecurityException se)
    {
        reportError(se,
            "Unsufficient permissions for the requested operation.",
            "Please check the ACL settings on the server.");
    }
    catch (nSessionNotConnectedException snce)
    {
        reportError(snce,
            "The session object used is not physically connected to the Nirvana Realm.",
            "Please ensure the realm is running and check your RNAME value.");
    }
    catch (nUnexpectedResponseException ure)
    {
        reportError(ure,
            "The Nirvana REALM has returned an unexpected response.",
            "Please ensure the Nirvana REALM and client API used are compatible.");
    }
    catch (nUnknownRemoteRealmException urre)
    {
        reportError(urre,
            "The channel specified resided in a remote realm which could not be found.",
            "Please ensure the channel name specified is correct.");
    }
    catch (nRequestTimedOutException rtoe)
    {
        reportError(rtoe,
            "The requested operation has timed out waiting for a response from the REALM.",
            "If this is a very busy REALM ask your administrator to increase the client timeout values.");
    }
    catch (nChannelAlreadySubscribedException chase)
    {
        reportError(chase,
            "You are already subscribed to this channel.");
    }
    catch (nSelectorParserException spe)
    {
        reportError(spe,
            "An error occured while parsing the selector filter specified.",
            "Please check the JMS documentation on how to write a valid selector.");
    }
    catch (nBaseClientException nbce)
    {
        reportError(nbce,
            "An error occured while creating the Channel Attributes object.");
    }

    try
    {
        //Close the session we opened
        try
        {
            nSessionFactory.close(mySession);
        }
        catch (Exception)
        {
            // Best effort: the session may never have been connected
        }
        //Close any other sessions so that we can exit
        nSessionFactory.shutdown();
    }
    finally
    {
        // Exit with a failure status only after the session cleanup above has
        // been attempted, so an error no longer skips the close/shutdown calls.
        if (!completed)
        {
            Environment.Exit(1);
        }
    }
}

/**
 * Prints each diagnostic line followed by the stack trace of the exception
 * that triggered it.
 *
 * @param cause    the exception whose stack trace is printed
 * @param messages the diagnostic lines, printed in order before the trace
 */
private static void reportError(Exception cause, params String[] messages)
{
    foreach (String message in messages)
    {
        Console.WriteLine(message);
    }
    Console.WriteLine(cause.StackTrace);
}
/**
 * Reads the command line into the fields of this sample.
 *
 * The first two arguments (rname and channel name) are required; up to nine
 * optional arguments may follow, in the order printed by Usage(). When fewer
 * than two or more than eleven arguments are given, the usage text is printed
 * and the process exits.
 *
 * Numeric arguments are converted with Convert.ToInt32 / Convert.ToInt64, so
 * a malformed number raises the exception thrown by those methods.
 *
 * @param args the command line arguments as passed to Main
 */
protected override void processArgs(String[] args)
{
    // Required arguments missing, or more arguments than this sample understands
    if (args.Length < 2 || args.Length > 11)
    {
        Usage();
        Environment.Exit(0);
        return;
    }

    // Required arguments
    RNAME = args[0];
    channelName = args[1];

    // Optional arguments: each one is only read when it is present, the
    // field otherwise keeps its default.
    if (args.Length > 2)
    {
        subname = args[2];
    }
    if (args.Length > 3)
    {
        startEid = Convert.ToInt64(args[3]);
    }
    if (args.Length > 4)
    {
        logLevel = Convert.ToInt32(args[4]);
    }
    if (args.Length > 5)
    {
        reportCount = Convert.ToInt32(args[5]);
    }
    if (args.Length > 6)
    {
        autoAck = parseFlag(args[6]);
    }
    if (args.Length > 7)
    {
        cluster = parseFlag(args[7]);
    }
    if (args.Length > 8)
    {
        persistent = parseFlag(args[8]);
    }
    if (args.Length > 9)
    {
        selector = args[9];
    }
    if (args.Length > 10)
    {
        priority = parseFlag(args[10]);
    }
}

/**
 * Interprets a command line flag: "true" in any letter case means true,
 * every other value means false.
 *
 * @param value the raw command line argument
 * @return true only when value equals "true", ignoring case
 */
private static bool parseFlag(String value)
{
    // Ordinal comparison keeps the result independent of the current culture
    return String.Equals(value, "true", StringComparison.OrdinalIgnoreCase);
}
/**
 * Program entry point: creates the sample instance, reads the command line
 * and subscribes to the requested channel.
 *
 * @param args the command line arguments, see Usage()
 */
public static void Main(String[] args)
{
    //Create an instance for this class
    mySelf = new namedsubscriber();
    //Default subscription name; must be set before processArgs, which may replace it
    subname = Environment.UserName;
    //Process command line arguments
    mySelf.processArgs(args);
    //Process the local REALM RNAME details
    String[] rproperties = parseRealmProperties(mySelf.RNAME);
    //Subscribe to the channel specified
    mySelf.doit(rproperties, mySelf.channelName);
}
/**
 * Callback invoked each time an event is received from the Nirvana channel.
 * Be careful not to spend too much time processing the message inside this
 * method, as the next message cannot be pushed until it returns.
 *
 * The method acknowledges the event when auto acknowledgement is off, keeps
 * the message and byte counters up to date, prints a rate summary every
 * reportCount events, reports gaps in the event ID sequence and, depending
 * on logLevel, prints the event details.
 *
 * @param evt An nConsumeEvent object containing the message received from the channel
 */
public void go(nConsumeEvent evt)
{
    // If not autoAck, then acknowledge the event ourselves
    if (!autoAck)
    {
        try
        {
            evt.ack();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.StackTrace);
        }
    }

    //If this is the first message we receive
    if (count == -1)
    {
        //Get a timestamp to be used for message rate calculations
        startTime = currentTimeMillis();
        count = 0;
    }
    //Increment the counters
    count++;
    totalMsgs++;

    //Have we reached the point where we need to report the rates?
    if (count == reportCount)
    {
        //Reset the counter
        count = 0;
        //Get a timestamp to calculate the rates
        long end = currentTimeMillis();
        //Does the specified log level permit us to print on the screen?
        if (logLevel >= 1)
        {
            long elapsed = end - startTime;
            //Guard against a zero (or, after a clock adjustment, negative) interval
            if (elapsed > 0)
            {
                // 1000L forces 64-bit arithmetic so that a large reportCount cannot overflow
                Console.WriteLine("Received " + reportCount + " in " + elapsed + " Evt/Sec = " + ((reportCount * 1000L) / elapsed) + " Bytes/sec=" + ((byteCount * 1000) / elapsed));
                Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession.getInputByteCount() + "]");
            }
            else
            {
                Console.WriteLine("Received " + reportCount + " faster than the system millisecond counter");
            }
        }
        //Set the startTime for the next report equal to the end timestamp for the previous one
        startTime = end;
        //Reset the byte counter
        byteCount = 0;
    }

    long eventId = evt.getEventID();
    //If the expected event ID is not equal to the current event ID
    if (lastEID != eventId)
    {
        //Maybe we have missed an event, so print a message on the screen.
        //This message could be printed for a number of other reasons.
        //One of them would be someone purging a range creating an 'eid gap'.
        //As eids are never reused within a channel you could have a situation
        //where this gets printed but nothing is missed.
        Console.WriteLine("Expired event range " + (lastEID) + " - " + (eventId - 1));
        //Expect the event after this one next
        lastEID = eventId + 1;
    }
    else
    {
        //Increment the expected event ID
        lastEID++;
    }

    //Get the data of the message
    byte[] buffer = evt.getEventData();
    if (buffer != null)
    {
        //Add its length to the byte counter
        byteCount += buffer.Length;
    }

    //If the log level permits printing on the screen
    if (logLevel >= 2)
    {
        //Print the eid
        Console.WriteLine("Event id : " + eventId);
        if (logLevel >= 3)
        {
            //Print the message tag
            Console.WriteLine("Event tag : " + evt.getEventTag());
            //Print the message data. Convert.ToString on a byte[] only yields the
            //type name, so the bytes are decoded instead.
            //NOTE(review): assumes the payload is UTF-8 text - confirm against the publisher
            String data = buffer == null ? "" : System.Text.Encoding.UTF8.GetString(buffer);
            Console.WriteLine("Event data : " + data);
            if (evt.hasAttributes())
            {
                displayEventAttributes(evt.getAttributes());
            }
            nEventProperties prop = evt.getProperties();
            if (prop != null)
            {
                displayEventProperties(prop);
            }
        }
    }
}

/**
 * Returns the current UTC time as a count of milliseconds.
 * DateTime.Millisecond must not be used for this: it is only the 0-999
 * millisecond component of the time, not an elapsed-time value.
 *
 * @return the current UTC time in milliseconds
 */
private static long currentTimeMillis()
{
    return DateTime.UtcNow.Ticks / TimeSpan.TicksPerMillisecond;
}
/**
 * Writes the command line synopsis of this sample, followed by a description
 * of every required and optional argument, to the console.
 */
private static void Usage()
{
    // One entry per console line, in the order in which they are printed
    String[] helpLines =
    {
        "Usage ...\n",
        "namedsubscriber <rname> <channel name> [name] [start eid] [debug] [count] [auto ack] [cluster wide] [persistent] [selector] [priority]\n",
        "<Required Arguments> \n",
        "<rname> - the rname of the server to connect to",
        "<channel name> - Channel name parameter for the channel to subscribe to",
        "\n[Optional Arguments] \n",
        "[name] - Specifies the unique name to be used for a named subscription (default: OS username) ",
        "[start eid] - The Event ID to start subscribing from if the named subscriber needs to be created (doesn't exist)",
        "[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All",
        "[count] - The number of events to wait before printing out summary information (default: 1000) ",
        "[auto ack] - Specifies whether each event will be automatically acknowledged by the api (default: true) ",
        "[cluster wide] - Specifies whether the named object is to be used across a cluster (default: false) ",
        "[persistent] - Specifies whether the named object state is to be stored to disk or held in server memory (default: false) ",
        "[selector] - The event filter string to use",
        "[priority] - Whether priority is enabled for this named subscriber (default: false)"
    };

    for (int i = 0; i < helpLines.Length; i++)
    {
        Console.WriteLine(helpLines[i]);
    }
}
#region nNamedPriorityListener Members
/**
 * nNamedPriorityListener callback; writes a one-line notice to the console.
 */
void nNamedPriorityListener.havePriority()
{
    String notice = "Got named priority listener....";
    Console.WriteLine(notice);
}
#endregion
}
}
EXAMPLE_SOURCE_END
