Nirvana C# .NET: Multiplex a Session
Multiplex two Nirvana sessions over one channel.
Usage
multiplex <channel name> [start eid] [debug] [count] [selector] <Required Arguments> <channel name> - Channel name parameter for the channel to subscribe to. [Optional Arguments] [start eid] - The Event ID to start subscribing from. [debug] - The level of output from each event: 0 - none, 1 - summary, 2 - EIDs, 3 - All. [count] - The number of events to wait before printing out summary information. [selector] - The event filter string to use. Note: -? provides help on environment variables.
Application Source Code
See also:
/*
* ----------------------------------------------------------------------------------
*
* PCB Systems Limited License Version 1.1
* Copyright © PCB Systems Limited. All rights reserved
*
* In the event that you should download or otherwise use this software
* ( the "Software" ) you hereby acknowledge and agree that:
*
* 1. The Software is the property of PCB Systems Limited: Title, Copyright and all
* other proprietary rights, interest and benefit in and to the Software is and
* shall be owned by PCB Systems Limited;
*
* 2. You will not make copies of the Software whatsoever other than, if you should
* so wish, a single copy for archival purposes only;
*
* 3. You will not modify, reverse assemble, decompile, reverse engineer or otherwise
* translate the Software;
*
* 4. You will not redistribute, copy, forward electronically or circulate the Software
* to any person for any purpose whatsoever without the prior written consent of
* PCB Systems Limited;
*
* 5. You will not charge for, market or provide any managed service or product that
* is based upon or includes the Software or any variant of it; and
*
* 6. You will not use the Software for any purpose apart from your own personal,
* noncommercial and lawful use;
*
* You hereby agree that the software is used by you on an "as is" basis, without
* warranty of any kind. PCB Systems Limited hereby expressly disclaim all warranties
* and conditions, either expressed or implied, including but not limited to any
* implied warranties or conditions or merchantability and fitness for a particular
* purpose.
*
* You agree that you are solely responsible for determining the appropriateness of
* using the Software and assume all risks associated with it including but not
* limited to the risks of program errors, damage to or loss of of data, programs or
* equipment and unavailability or interruption of operations.
*
* PCB Systems Limited will not be liable for any direct damages or for any, special,
* incidental or indirect damages or for any economic consequential damages ( including
* lost profits or savings ), or any damage howsoever arising.
*
* ------------------------------------------------------------------------
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using com.pcbsys.nirvana.client;
namespace com.pcbsys.nirvana.apps
{
class multiplex : nSampleApp, nEventListener
{
// Static start event ID and selector. Neither is assigned after its declaration
// anywhere in this class, so they hold their defaults (0 and null).
static long startEid;
static String selector = null;
// Name of the channel to subscribe to; filled by processArgs from args[1].
String channelName;
// Start event ID filled by processArgs from args[2]; -1 when not supplied.
int startEID = -1;
// Event ID that go() expects to see next; used to detect gaps in the EID sequence.
private long lastEID = 0;
// Timestamp (milliseconds) marking the start of the current rate-reporting interval.
private long startTime = 0;
// Bytes of event data accumulated by go() during the current reporting interval.
private long byteCount = 0;
// Realm name filled by processArgs from args[0] and handed to parseRealmProperties in Main.
private String rname;
// Output level: go() prints rate summaries at >= 1, event IDs at >= 2, full event detail at >= 3.
private int logLevel = 0;
// Dual purpose: processArgs stores the parsed [count] argument here, and go() uses the
// same field as its running event counter, where -1 means "no event received yet".
private int count = -1;
// Total number of events received by go() since start-up.
private int totalMsgs = 0;
// Number of events between rate reports; doit updates it from its repCount argument.
private int reportCount = 10000;
// Channel reference obtained through the first (non-multiplexed) session in doit.
private nChannel myChannel;
// The single instance of this sample, created in Main.
private static multiplex mySelf = null;
// Event selector (filter) string filled by processArgs from args[5].
public String Selector;
/**
 * Creates a session to the realm, multiplexes a second session over it, subscribes this
 * object to the named channel through both sessions and then blocks until console input
 * is available. On the way out it unsubscribes from both channels and closes both sessions.
 * Any Nirvana client exception is reported on the console and terminates the process
 * with exit code 1.
 *
 * @param realmDetails realm connection details handed to nSessionAttributes
 * @param achannelName name of the channel to look up on both sessions
 * @param selector     event filter string handed to addSubscriber
 * @param startEid     event ID to subscribe from; -1 means "use the channel's last EID"
 * @param loglvl       output level later consulted by go()
 * @param repCount     number of events between rate reports; values <= 0 leave the
 *                     current reportCount (default 10000) untouched
 */
private void doit(String[] realmDetails, String achannelName, String selector, long startEid, int loglvl, int repCount) {
    logLevel = loglvl;
    // Main passes the parsed [count] argument, which is -1 when it was not supplied.
    // Only a positive value is a usable report interval; otherwise keep the default.
    if (repCount > 0) {
        reportCount = repCount;
    }
    // processArgs stores the parsed [count] in the same field that go() uses as its
    // running counter. Restore the "no event received yet" sentinel so that go()
    // initialises startTime on the first event and counts up from zero.
    count = -1;

    nSession otherSession = null;
    try {
        nsa = new nSessionAttributes(realmDetails, 2);
        //Construct first session.
        mySession = nSessionFactory.create(nsa, this);
        mySession.init();
        //Construct second session by multiplexing the first session.
        otherSession = nSessionFactory.createMultiplexed(mySession);
        otherSession.init();
        //Describe the channel to look up
        nChannelAttributes nca = new nChannelAttributes();
        nca.setName(achannelName);
        //Obtain the channel reference once per session
        myChannel = mySession.findChannel( nca );
        nChannel otherChannel = otherSession.findChannel(nca);
        //if the latest event has been implied (by specifying -1)
        if (startEid == -1) {
            //Get the last eid on the channel and reset the start eid with that value
            startEid = myChannel.getLastEID();
        }
        //Add this object as a subscriber on both channel references, using the
        //specified message selector and start eid
        myChannel.addSubscriber( this, selector, startEid );
        otherChannel.addSubscriber( this, selector, startEid );
        //Stay subscribed until the user presses any key
        Console.WriteLine( "Press any key to quit !" );
        Console.Read();
        Console.WriteLine( "Finished. Consumed total of "+totalMsgs );
        //Remove this subscriber from both channel references
        myChannel.removeSubscriber( this );
        otherChannel.removeSubscriber( this );
    }
    //Handle errors
    catch (nChannelNotFoundException cnfe) {
        Console.WriteLine("The channel specified could not be found.");
        Console.WriteLine("Please ensure that the channel exists in the REALM you connect to.");
        Console.WriteLine(cnfe.StackTrace);
        Environment.Exit(1);
    }
    catch (nSecurityException se) {
        Console.WriteLine("Unsufficient permissions for the requested operation.");
        Console.WriteLine("Please check the ACL settings on the server.");
        Console.WriteLine(se.StackTrace);
        Environment.Exit(1);
    }
    catch (nSessionNotConnectedException snce) {
        Console.WriteLine("The session object used is not physically connected to the Nirvana Realm.");
        Console.WriteLine("Please ensure the realm is running and check your RNAME value.");
        Console.WriteLine(snce.StackTrace);
        Environment.Exit(1);
    }
    catch (nUnexpectedResponseException ure) {
        Console.WriteLine("The Nirvana REALM has returned an unexpected response.");
        Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible.");
        Console.WriteLine(ure.StackTrace);
        Environment.Exit(1);
    }
    catch (nUnknownRemoteRealmException urre) {
        Console.WriteLine("The channel specified resided in a remote realm which could not be found.");
        Console.WriteLine("Please ensure the channel name specified is correct.");
        Console.WriteLine(urre.StackTrace);
        Environment.Exit(1);
    }
    catch (nRequestTimedOutException rtoe) {
        Console.WriteLine("The requested operation has timed out waiting for a response from the REALM.");
        Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values.");
        Console.WriteLine(rtoe.StackTrace);
        Environment.Exit(1);
    }
    catch (nChannelAlreadySubscribedException chase) {
        Console.WriteLine("You are already subscribed to this channel.");
        Console.WriteLine(chase.StackTrace);
        Environment.Exit(1);
    }
    catch (nSelectorParserException spe) {
        Console.WriteLine("An error occured while parsing the selector filter specified.");
        Console.WriteLine("Please check the JMS documentation on how to write a valid selector.");
        Console.WriteLine(spe.StackTrace);
        Environment.Exit(1);
    }
    catch (nBaseClientException nbce) {
        Console.WriteLine("An error occured while creating the Channel Attributes object.");
        Console.WriteLine(nbce.StackTrace);
        Environment.Exit(1);
    }
    //Close the sessions we opened. Each close is attempted on its own so that a
    //failure on one session does not prevent the other from being closed.
    try {
        if (mySession != null) {
            nSessionFactory.close( mySession );
        }
    }
    catch (Exception ex) {
        Console.WriteLine("Failed to close the first session: " + ex.Message);
    }
    try {
        if (otherSession != null) {
            nSessionFactory.close( otherSession );
        }
    }
    catch (Exception ex) {
        Console.WriteLine("Failed to close the multiplexed session: " + ex.Message);
    }
    //Close any other sessions within this process so that we can exit
    nSessionFactory.shutdown ( );
}
/**
 * Parses the positional command-line arguments into this instance's fields.
 * Expected order: rname, channel name, start eid, debug level, count, selector.
 * Trailing arguments may be omitted; arguments after the sixth are ignored. With no
 * arguments at all nothing is changed.
 *
 * The "-?" help request is checked before any numeric conversion, so usage is printed
 * even when later arguments are not valid numbers.
 *
 * @param args the command-line arguments as received by Main
 */
protected override void processArgs(String[] args)
{
    int supplied = args.Length;
    if (supplied == 0)
    {
        return;
    }

    if (args[0].Equals("-?"))
    {
        Usage();
        UsageEnv();
    }
    rname = args[0];

    if (supplied > 1)
    {
        channelName = args[1];
    }
    if (supplied > 2)
    {
        startEID = Convert.ToInt32(args[2]);
    }
    if (supplied > 3)
    {
        logLevel = Convert.ToInt32(args[3]);
    }
    if (supplied > 4)
    {
        count = Convert.ToInt32(args[4]);
    }
    if (supplied > 5)
    {
        Selector = args[5];
    }
}
/**
 * Program entry point: creates the sample instance, parses the command line and the
 * environment, resolves the realm details from the parsed rname and starts the
 * subscription via doit.
 *
 * @param args the command-line arguments
 */
public static new void Main(String[] args)
{
    //Create an instance for this class
    mySelf = new multiplex();
    //Process command line arguments
    mySelf.processArgs(args);
    nSampleApp.processEnvironmentVariables();
    //Process the local REALM RNAME details
    String[] rproperties = parseRealmProperties(mySelf.rname);
    //Subscribe to the channel specified. The selector and start eid come from the
    //instance fields that processArgs fills in; the static selector/startEid members
    //are never assigned, so using them would discard the command-line values.
    mySelf.doit(rproperties, mySelf.channelName, mySelf.Selector, mySelf.startEID, mySelf.logLevel, mySelf.count);
}
/**
 * A callback is received by the API to this method each time an event is received from
 * the nirvana channel. Be careful not to spend too much time processing the message
 * inside this method, as until it exits the next message can not be pushed.
 *
 * The method keeps running totals, prints a rate report every reportCount events
 * (when logLevel >= 1), reports gaps in the event ID sequence, and prints event
 * details according to logLevel (>= 2: event ID, >= 3: tag, data, attributes, properties).
 *
 * NOTE(review): doit registers this same object as subscriber on two channel references,
 * so each published event may arrive here more than once and the counters are updated
 * without synchronisation - confirm the delivery/threading model of multiplexed sessions.
 *
 * @param evt An nConsumeEvent object containing the message received from the channel
 */
public void go(nConsumeEvent evt) {
    //If this is the first message we receive
    if (count == -1) {
        //Get a timestamp to be used for message rate calculations
        startTime = nConstants.currentTimeMillis();
        count = 0;
    }
    //Increment the counters
    count++;
    totalMsgs++;
    //Have we reached the point where we need to report the rates?
    if (count == reportCount) {
        //Reset the counter
        count = 0;
        //Get a timestamp to calculate the rates
        long end = nConstants.currentTimeMillis();
        //Does the specified log level permit us to print on the screen?
        if (logLevel >= 1) {
            long elapsed = end - startTime;
            if (elapsed != 0) {
                //Multiply as 64-bit (1000L) so a large reportCount cannot overflow int
                Console.WriteLine("Received "+reportCount+" in "+elapsed+" Evt/Sec = "+((reportCount * 1000L)/elapsed)+" Bytes/sec="+((byteCount*1000)/elapsed));
                Console.WriteLine("Bandwidth data : Bytes Tx ["+mySession.getOutputByteCount()+"] Bytes Rx ["+mySession.getInputByteCount()+"]");
            }
            else {
                Console.WriteLine("Received "+reportCount+" faster than the system millisecond counter");
            }
        }
        //Set the startTime for the next report equal to the end timestamp for the previous one
        startTime = end;
        //Reset the byte counter
        byteCount = 0;
    }
    long eventId = evt.getEventID();
    //If the expected EID is not equal to the current event ID we may have missed an
    //event, so print a message on the screen. This message could be printed for a
    //number of other reasons. One of them would be someone purging a range creating
    //an 'eid gap'. As eids are never reused within a channel you could have a
    //situation where this gets printed but nothing is missed.
    if (lastEID != eventId) {
        Console.WriteLine("Expired event range "+(lastEID)+" - "+(eventId-1));
    }
    //In both cases the next expected EID is the one after the current event
    lastEID = eventId + 1;
    //Get the data of the message
    byte[] buffer = evt.getEventData();
    if (buffer != null) {
        //Add its length to the byte counter
        byteCount += buffer.Length;
    }
    //If the loglevel permits printing on the screen
    if (logLevel >= 2) {
        //Print the eid
        Console.WriteLine( "Event id : " + eventId );
        if (evt.isEndOfChannel()) {
            Console.WriteLine("End of channel reached");
        }
        //If the loglevel permits printing the full event
        if (logLevel >= 3) {
            //Print the message tag
            Console.WriteLine( "Event tag : " + evt.getEventTag() );
            //Print the message data. Convert.ToString on a byte[] only yields the type
            //name ("System.Byte[]"), so decode the bytes instead; a missing payload
            //prints as an empty string.
            //NOTE(review): assumes the payload is UTF-8 text - confirm against publishers.
            String text = buffer != null ? Encoding.UTF8.GetString(buffer) : "";
            Console.WriteLine( "Event data : " + text );
            if (evt.hasAttributes()) {
                displayEventAttributes(evt.getAttributes());
            }
            nEventProperties prop = evt.getProperties();
            if (prop != null) {
                displayEventProperties(prop);
            }
        }
    }
}
/**
 * Prints the usage message for this class.
 *
 * The synopsis lists the arguments in the order processArgs reads them: the realm
 * name (args[0]) comes first, followed by the channel name and the optional arguments.
 */
private static void Usage() {
    Console.WriteLine( "Usage ...\n" );
    Console.WriteLine("multiplex <rname> <channel name> [start eid] [debug] [count] [selector] \n");
    Console.WriteLine(
        "<Required Arguments> \n");
    Console.WriteLine(
        "<rname> - The RNAME of the realm to connect to" );
    Console.WriteLine(
        "<channel name> - Channel name parameter for the channel to subscribe to" );
    Console.WriteLine(
        "\n[Optional Arguments] \n");
    Console.WriteLine(
        "[start eid] - The Event ID to start subscribing from" );
    Console.WriteLine(
        "[debug] - The level of output from each event, 0 - none, 1 - summary, 2 - EIDs, 3 - All" );
    Console.WriteLine(
        "[count] - The number of events to wait before printing out summary information");
    Console.WriteLine(
        "[selector] - The event filter string to use");
    Console.WriteLine(
        "\n\nNote: -? provides help on environment variables \n");
}
}
}
EXAMPLE_SOURCE_END
