Nirvana C# .NET: Synchronous Queue Consumer
This example shows how to synchronously consume events from a Nirvana Queue. See also: Asynchronous Queue Subscription
Usage
npopq <rname> <queue name> [timeout] [debug] [count] [selector] <Required Arguments> <rname> - the rname of the server to connect to <queue name> - Queue name parameter for the queue to pop from [Optional Arguments] [timeout] - The timeout for the dequeue operation [debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All [count] - The number of events to wait before printing out summary information [selector] - The event filter string to use
Application Source Code
See also:
using System;
using System.IO;
using System.Threading;
using com.pcbsys.nirvana.client;
namespace com.pcbsys.nirvana.apps
{
/**
* Creates a synchronous queue reader and pops the queue
*/
class qreader : nSampleApp, nEventListener
{
private static System.Text.UTF8Encoding encoding = new System.Text.UTF8Encoding();
private long timeout;
private long lastEID = 0;
private DateTime startTime;
private long byteCount = 0;
private int logLevel = 0;
private int count = -1;
private int totalMsgs = 0;
private int reportCount = 1000;
private bool isTx = false;
private nQueue myQueue;
private nQueueSyncReader reader = null;
private Thread qPopper = null;
private static qreader mySelf = null;
/**
* This method demonstrates the Nirvana API calls necessary to create a
* synchronous queue reader
*
* It is called after all command line arguments have been received and
* validated
*
* @param realmDetails a String[] containing the possible RNAME values
* @param aqueueName the queue name to pop
* @param selector the pop selector filter
* @param startEid the eid to start popping from
* @param loglvl the specified log level
* @param repCount the specified report count
*/
private void doit(String[] realmDetails, String aqueueName, long time, int loglvl, int repCount, String sel)
{
logLevel = loglvl;
reportCount = repCount;
timeout = time;
mySelf.constructSession(realmDetails);
//Subscribes to the specified queue
try
{
//Create a channel attributes object
var nca = new nChannelAttributes();
nca.setName(aqueueName);
//Obtain the queue reference
myQueue = mySession.findQueue(nca);
//output queue details
nQueueDetails details = myQueue.getDetails();
Console.WriteLine("Current queue size = " + details.getNoOfEvents());
Console.WriteLine("Current queue age = " + (details.getLastEventTime() - details.getFirstEventTime()));
Console.WriteLine("Current storage size = " + details.getTotalMemorySize());
Console.WriteLine("Current readers = " + details.getNoOfReaders());
//create the queue reader
reader = myQueue.createReader(new nQueueReaderContext(this, sel));
ThreadStart ts = new ThreadStart(run);
qPopper = new Thread(ts);
qPopper.IsBackground = true;
qPopper.Start();
qPopper.Name = "Q Reader thread";
//Stay subscribed until the user presses any key
Console.ReadLine();
Console.WriteLine("Finished. Consumed total of " + totalMsgs);
//Destroy the queue reader
nQueue.destroyReader(reader);
}
//Handle errors
catch (nChannelNotFoundException cnfe)
{
Console.WriteLine("The queue specified could not be found.");
Console.WriteLine("Please ensure that the queue exists in the REALM you connect to.");
Environment.Exit(1);
}
catch (nSecurityException se)
{
Console.WriteLine("Unsufficient permissions for the requested operation.");
Console.WriteLine("Please check the ACL settings on the server.");
Environment.Exit(1);
}
catch (nSessionNotConnectedException snce)
{
Console.WriteLine("The session object used is not physically connected to the Nirvana realm.");
Console.WriteLine("Please ensure the realm is up and check your RNAME value.");
Environment.Exit(1);
}
catch (nUnexpectedResponseException ure)
{
Console.WriteLine("The Nirvana REALM has returned an unexpected response.");
Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible.");
Environment.Exit(1);
}
catch (nUnknownRemoteRealmException urre)
{
Console.WriteLine("The queue specified resided in a remote realm which could not be found.");
Console.WriteLine("Please ensure the queue name specified is correct.");
Environment.Exit(1);
}
catch (nRequestTimedOutException rtoe)
{
Console.WriteLine("The requested operation has timed out waiting for a response from the REALM.");
Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values.");
Environment.Exit(1);
}
catch (nBaseClientException nbce)
{
Console.WriteLine("An error occured while creating the Channel Attributes object.");
Environment.Exit(1);
}
catch (IOException e)
{
Console.WriteLine(e.StackTrace);
}
//Close the session we opened
try
{
nSessionFactory.close(mySession);
}
catch (Exception ex) { }
//Close any other sessions within this JVM so that we can exit
nSessionFactory.shutdown();
}
protected override void processArgs(String[] args)
{
//
// Need a min of 2, rname, channel name
if (args.Length < 3)
{
Usage();
UsageEnv();
Environment.Exit(2);
}
String RNAME = args[1]; ;
var queueName = args[2];
var time = 10000;
if (args.Length > 3)
{
time = Convert.ToInt32(args[3]);
}
var loglvl = 3;
if (args.Length > 4)
{
loglvl = Convert.ToInt32(args[4]);
}
//
// Optional Parameters
//
var report = 1000;
//Check for a selector message filter value
if (args.Length > 5)
{
report = Convert.ToInt32(args[5]);
}
String sel = null;
if (args.Length > 6)
{
sel = args[6];
}
//
// Run the sample app
//
mySelf.doit(parseRealmProperties(RNAME), queueName, time, loglvl, report, sel);
}
public static void Main(String[] args)
{
//Create an instance for this class
mySelf = new qreader();
mySelf.processArgs(Environment.GetCommandLineArgs());
}
/**
* A callback is received by the API to this method each time an event is received from
* the nirvana channel. Be carefull not to spend too much time processing the message
* inside this method, as until it exits the next message can not be pushed.
*
* @param evt An nConsumeEvent object containing the message received from the channel
*/
public void go(nConsumeEvent evt)
{
if (isTx)
{
((nQueueSyncTransactionReader)reader).commit(evt.getEventID());
}
//If this is the first message we receive
if (count == -1)
{
//Get a timestamp to be used for message rate calculations
startTime = DateTime.Now;
count = 0;
}
//Increment he counter
count++;
totalMsgs++;
//Have we reached the point where we need to report the rates?
if (count == reportCount)
{
//Reset the counter
count = 0;
//Get a timestampt to calculate the rates
DateTime end = DateTime.Now;
// Does the specified log level permits us to print on the screen?
if (logLevel >= 1)
{
//Dump the rates on the screen
if (!end.Equals(startTime))
{
TimeSpan ts = end - startTime;
Console.WriteLine("Received " + reportCount + " in " + (ts.Milliseconds) + " Evt/Sec = " + ((reportCount * 1000) / (ts.Milliseconds)) + " Bytes/sec=" + ((byteCount * 1000) / (ts.Milliseconds)));
Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession.getInputByteCount() + "]");
}
else
{
Console.WriteLine("Received " + reportCount + " faster than the system millisecond counter");
}
}
//Set the startTime for the next report equal to the end timestamp for the previous one
startTime = end;
//Reset the byte counter
byteCount = 0;
}
//If the last EID counter is not equal to the current event ID
if (lastEID != evt.getEventID())
{
//If yes, maybe we have missed an event, so print a message on the screen.
//This message could be printed for a number of other reasons.
//One of them would be someone purging a range creating an 'eid gap'.
//As eids are never reused within a channel you could have a situation
//where this gets printed but nothing is missed.
Console.WriteLine("Expired event range " + (lastEID) + " - " + (evt.getEventID() - 1));
//Reset the last eid counter
lastEID = evt.getEventID() + 1;
}
else
{
//Increment the last eid counter
lastEID++;
}
//Get the data of the message
byte[] buffer = evt.getEventData();
if (buffer != null)
{
//Add its length to the byte counter
byteCount += buffer.Length;
}
//If the loglevel permits printing on the screen
if (logLevel >= 2)
{
//Print the eid
Console.WriteLine("Event id : " + evt.getEventID());
if (evt.isEndOfChannel())
{
Console.WriteLine("End of channel reached");
}
//If the loglevel permits printing on the screen
if (logLevel >= 3)
{
//Print the message tag
Console.WriteLine("Event tag : " + evt.getEventTag());
//Print the message data
Console.WriteLine("Event data : " + encoding.GetString(evt.getEventData()));
if (evt.hasAttributes())
{
displayEventAttributes(evt.getAttributes());
}
nEventProperties prop = evt.getProperties();
if (prop != null)
{
displayEventProperties(prop);
}
}
}
}
public void run()
{
while (true)
{
try
{
nConsumeEvent evt = reader.pop(timeout);
if (evt != null)
{
go(evt);
}
}
catch (Exception e)
{
Console.WriteLine("Exception in pop....exiting!");
Console.WriteLine(e.StackTrace);
break;
}
}
Environment.Exit(1);
}
/**
* Prints the usage message for this class
*/
private static void Usage()
{
Console.WriteLine("Usage ...\n");
Console.WriteLine("npopq <rname> <queue name> [timeout] [debug] [count] [selector] \n");
Console.WriteLine(
"<Required Arguments> \n");
Console.WriteLine(
"<rname> - the rname of the server to connect to");
Console.WriteLine(
"<queue name> - Queue name parameter for the queue to pop from");
Console.WriteLine(
"\n[Optional Arguments] \n");
Console.WriteLine(
"[timeout] - The timeout for the dequeue operation");
Console.WriteLine(
"[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All");
Console.WriteLine(
"[count] - The number of events to wait before printing out summary information");
Console.WriteLine(
"[selector] - The event filter string to use");
}
}
}
EXAMPLE_SOURCE_END
