Nirvana C# .NET: Channel Publisher
This example publishes events onto a Nirvana Channel.
Usage
publish <rname> <channel name> [count] [size]
<Required Arguments>
<rname> - the rname of the server to connect to
<channel name> - Channel name parameter for the channel to publish to
[Optional Arguments]
[count] - The number of events to publish (default: 1000)
[size] - The size (bytes) of the event to publish (default: 1024)
Application Source Code
See also:
using System;
using System.Threading;
using com.pcbsys.nirvana.client;
namespace com.pcbsys.nirvana.apps
{
/**
* Publishes reliably to a nirvana channel
*/
class publish : nSampleApp
{
// Checked by doit after every publish and once more after the publish loop;
// when false, doit throws asyncException. Nothing in the code shown here sets
// it to false -- presumably an asynchronous error callback elsewhere would.
private bool isOk = true;
// The exception doit rethrows when isOk is false. It is never assigned in the
// code shown here, so it is null unless something else sets it.
private nBaseClientException asyncException;
// The single application instance; assigned in Main before processArgs runs.
private static publish mySelf = null;
// UTF-8 encoder; not referenced anywhere else in the code shown here.
private static System.Text.UTF8Encoding encoding = new System.Text.UTF8Encoding();
/**
* This method demonstrates the Nirvana API calls necessary to publish to
* a channel.
* It is called after all command line arguments have been received and
* validated.
*
* It constructs the session, looks the channel up by name, publishes the
* same event count times and then prints the achieved events/second and
* bytes/second figures. If the session is found to be disconnected while
* publishing, it waits until mySession.isConnected() is true again and
* retries the event that failed. The session is closed and the session
* factory shut down before the method returns or an exception propagates;
* the handled error cases terminate the process with exit code 1 instead.
*
* @param realmDetails a String[] containing the possible RNAME values
* @param achannelName the channel name to publish to
* @param count the number of messages to publish
* @param size the size (bytes) of each message to be published
*/
private void doit(String[] realmDetails, String achannelName, int count, int size)
{
    // Construct the session on this instance, the same instance whose
    // mySession is used below (previously this went through the static mySelf).
    constructSession(realmDetails);
    try
    {
        nChannelAttributes nca = new nChannelAttributes();
        nca.setName(achannelName);
        nChannel myChannel = mySession.findChannel(nca);

        // Create a byte array filled with characters equal to the message
        // size specified. (x % 90) + 32 keeps every byte in the range 32..121.
        // In a real application this would be the actual payload.
        byte[] buffer = new byte[size];
        for (int x = 0; x < size; x++)
        {
            buffer[x] = (byte)((x % 90) + 32);
        }

        // Construct a sample nEventProperties object showing one property of
        // each supported primitive type.
        // NOTE(review): props is NOT attached to the event published below;
        // the event is built from the tag and the byte[] only.
        nEventProperties props = new nEventProperties();
        props.put("key0string", "1");
        props.put("key1int", (int)1);
        props.put("key2long", (long)-11);
        props.put("key3bool", true);
        props.put("key4short", (short)1);
        props.put("key5float", (float)1.123456);
        props.put("key6double", (double)1.123456);
        props.put("key7char", '1');

        // The single event instance that is published count times.
        nConsumeEvent evt1 = new nConsumeEvent("Tag", buffer);

        // Inform the user that publishing is about to start
        Console.WriteLine("Starting publish of " + count + " events with a size of " + size + " bytes each");

        // Obtain the channel's last event ID prior to us publishing anything
        long startEid = myChannel.getLastEID();

        // Use a monotonic timer for the rate calculation; wall-clock time can
        // jump (clock adjustments) and has coarse resolution.
        System.Diagnostics.Stopwatch timer = System.Diagnostics.Stopwatch.StartNew();

        // Loop as many times as the number of messages we want to publish
        for (int x = 0; x < count; x++)
        {
            try
            {
                // Publish the event
                myChannel.publish(evt1);
            }
            catch (nSessionNotConnectedException)
            {
                // Wait for the session to report that it is connected again.
                while (!mySession.isConnected())
                {
                    Console.WriteLine("Disconnected from Nirvana, Sleeping for 1 second...");
                    try
                    {
                        Thread.Sleep(1000);
                    }
                    catch (Exception)
                    {
                        // Best effort: if the sleep is cut short we simply
                        // re-check the connection state straight away.
                    }
                }
                // Repeat the publish for the event that caused the exception
                // by undoing this iteration's increment.
                x--;
            }
            catch (nBaseClientException ex)
            {
                Console.WriteLine("publish : Exception : " + ex.Message);
                // Rethrow without resetting the original stack trace.
                throw;
            }

            // Check if an asynchronous exception has been received
            if (!isOk)
            {
                // If it has, then throw it
                throw asyncException;
            }
        }

        // Check once more after the loop, in case the error was flagged after
        // the last per-event check.
        if (!isOk)
        {
            throw asyncException;
        }

        // Calculate the actual number of events published by subtracting the
        // channel's last eid before our publishing from the one after it.
        // (The original sample notes that this call also ensures all client
        // queues have been flushed.)
        Console.WriteLine("Get Last EID");
        long events = myChannel.getLastEID() - startEid;
        timer.Stop();
        TimeSpan dif = timer.Elapsed;

        // Clamp the divisor to 1 ms: a run that finishes in under a
        // millisecond would otherwise divide by zero.
        long elapsedMs = Math.Max(1L, (long)dif.TotalMilliseconds);
        long eventPerSec = (events * 1000) / elapsedMs;

        // Calculate the bytes / sec rate
        long bytesPerSec = eventPerSec * size;

        // Inform the user of the resulting rates
        Console.WriteLine("Publish Completed in " + dif.TotalMilliseconds + "ms :");
        Console.WriteLine("[Events Published = " + events + "] [Events/Second = " + eventPerSec + "] [Bytes/Second = " + bytesPerSec + "]");
        Console.WriteLine("Bandwidth data : Bytes Tx [" + mySession.getOutputByteCount() + "] Bytes Rx [" + mySession.getInputByteCount() + "]");
    }
    catch (nSessionPausedException)
    {
        Console.WriteLine("Session has been paused, please resume the session");
        Environment.Exit(1);
    }
    catch (nSecurityException)
    {
        Console.WriteLine("Insufficient permissions for the requested operation.");
        Console.WriteLine("Please check the ACL settings on the server.");
        Environment.Exit(1);
    }
    catch (nSessionNotConnectedException)
    {
        Console.WriteLine("The session object used is not physically connected to the Nirvana realm.");
        Console.WriteLine("Please ensure the realm is up and check your RNAME value.");
        Environment.Exit(1);
    }
    catch (nUnexpectedResponseException)
    {
        Console.WriteLine("The Nirvana REALM has returned an unexpected response.");
        Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible.");
        Environment.Exit(1);
    }
    catch (nRequestTimedOutException)
    {
        Console.WriteLine("The requested operation has timed out waiting for a response from the REALM.");
        Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values.");
        Environment.Exit(1);
    }
    finally
    {
        // Runs on normal completion and when an unhandled exception
        // propagates, so the session is not left open in either case.
        try
        {
            // Close the session we opened
            nSessionFactory.close(mySession);
        }
        catch (Exception ex)
        {
            // Cleanup is best effort: report the problem and carry on.
            Console.WriteLine("Ignoring error while closing the session: " + ex.Message);
        }
        // Shut the session factory down (the original sample notes this
        // closes any other sessions so that the process can exit).
        nSessionFactory.shutdown();
    }
}
/**
* Validates the command line and starts the sample.
*
* The array is expected to be the one returned by
* Environment.GetCommandLineArgs() (that is what Main passes in), so
* args[0] is the executable name and the real arguments start at args[1].
* If an argument is missing or malformed, the usage text is printed and the
* process exits with code 2.
*
* @param args executable name, rname, channel name, then optional count and size
*/
protected override void processArgs(String[] args)
{
    // Need a minimum of 2 real arguments (rname, channel name) in addition
    // to the executable name held in args[0].
    if (args.Length < 3)
    {
        Usage();
        UsageEnv();
        Environment.Exit(2);
    }

    String RNAME = args[1];
    String channelName = args[2];

    // Number of events to publish; 1000 unless given on the command line.
    int count = 1000;
    if (args.Length > 3 && !(int.TryParse(args[3], out count) && count >= 0))
    {
        Console.WriteLine("Invalid [count] value '" + args[3] + "': expected a non-negative integer.");
        Usage();
        UsageEnv();
        Environment.Exit(2);
    }

    // Size in bytes of each event; 1024 unless given on the command line.
    // A negative size must be rejected here because it is later used as an
    // array length.
    int size = 1024;
    if (args.Length > 4 && !(int.TryParse(args[4], out size) && size >= 0))
    {
        Console.WriteLine("Invalid [size] value '" + args[4] + "': expected a non-negative integer.");
        Usage();
        UsageEnv();
        Environment.Exit(2);
    }

    //
    // Run the sample app
    //
    doit(parseRealmProperties(RNAME), channelName, count, size);
}
/**
* Program entry point: creates the application instance, records it in
* mySelf and hands it the process command line.
*
* @param args ignored; the command line is re-read via Environment.GetCommandLineArgs()
*/
public static void Main(String[] args)
{
    // Build the one application instance and publish it through mySelf
    publish app = new publish();
    mySelf = app;

    // GetCommandLineArgs() includes the executable name as element 0
    String[] commandLine = Environment.GetCommandLineArgs();
    app.processArgs(commandLine);
}
/**
* Prints the usage message for this class.
* The defaults quoted for [count] and [size] match the initial values that
* processArgs assigns when those arguments are omitted (1000 and 1024).
*/
private static void Usage()
{
    Console.WriteLine("Usage ...\n");
    Console.WriteLine("publish <rname> <channel name> [count] [size] \n");
    Console.WriteLine(
        "<Required Arguments> \n");
    Console.WriteLine(
        "<rname> - the rname of the server to connect to");
    Console.WriteLine(
        "<channel name> - Channel name parameter for the channel to publish to");
    Console.WriteLine(
        "\n[Optional Arguments] \n");
    Console.WriteLine(
        "[count] - The number of events to publish (default: 1000)");
    Console.WriteLine(
        "[size] - The size (bytes) of the event to publish (default: 1024)");
}
}
}
EXAMPLE_SOURCE_END
