Nirvana C# .NET: Transactional Queue Publisher
This example publishes events transactionally to a Nirvana Queue. A Nirvana transaction can contain one or more events. The events which make up the transaction are only made available by the Nirvana server if the entire transaction has been committed successfully.
Usage
ntxpushq <rname> <queue name> [count] [size] [tx size] <Required Arguments> <rname> - the rname of the server to connect to <queue name> - Queue name parameter for the queue to publish to [Optional Arguments] [count] - The number of events to publish (default: 1000) [size] - The size (bytes) of the event to publish (default: 1024) [tx size] - The number of events per transaction (default: 1)
Application Source Code
See also:
using System;
using System.Collections.Generic;
using System.Threading;
using com.pcbsys.nirvana.client;
namespace com.pcbsys.nirvana.apps
{
/**
* Pushes transactional events to a nirvana queue.
*
* Command line: ntxpushq <rname> <queue name> [count] [size] [tx size]
* Arguments are read from Environment.GetCommandLineArgs(), so index 0 is the
* program name and the first user argument is at index 1.
*/
class txpushq : nSampleApp
{
    // Defaults applied by processArgs when the optional arguments are omitted.
    // Usage() prints these same constants so the help text cannot drift.
    private const int DefaultCount = 1000;
    private const int DefaultSize = 1024;
    private const int DefaultTxSize = 1;

    // Printed whenever a transaction has to be published again after a failover.
    private const String RepublishMessage =
        "Got error trying to commit transaction as it had started on another Nirvana realm, republishing event..";

    private static txpushq mySelf = null;
    private static readonly System.Text.UTF8Encoding encoding = new System.Text.UTF8Encoding();

    /**
    * This method demonstrates the Nirvana API calls necessary to publish to
    * a queue transactionally.
    * It is called after all command line arguments have been received and
    * validated.
    *
    * Exactly count events are published: full transactions of txSize events,
    * followed by one smaller transaction when count is not a multiple of txSize.
    *
    * @param realmDetails a String[] containing the possible RNAME values
    * @param aqueueName the queue name to publish to
    * @param count the number of messages to publish
    * @param size the nominal size (bytes) of each message; only used for the
    *             reported Bytes/Sec figure (see NOTE in buildEvents)
    * @param txSize the number of messages to be published in each transaction (must be >= 1)
    */
    private void doit(String[] realmDetails, String aqueueName, int count, int size, int txSize)
    {
        // A transaction size below 1 could never make progress (and used to
        // trigger a division by zero), so reject it before connecting.
        if (txSize < 1)
        {
            throw new ArgumentOutOfRangeException("txSize", "At least one event per transaction is required.");
        }

        constructSession(realmDetails);

        //Publishes to the specified queue
        try
        {
            //Create a channel attributes object
            nChannelAttributes nca = new nChannelAttributes();
            nca.setName(aqueueName);

            //Obtain a reference to the queue
            nQueue myQueue = mySession.findQueue(nca);

            //Create the List of messages that is re-used for every full transaction.
            //Never build more events than will actually be sent.
            List<nConsumeEvent> vevents = buildEvents(Math.Min(txSize, Math.Max(count, 0)));

            //Inform the user that publishing is about to start
            Console.WriteLine("Starting publish of " + count + " events of size " + size);

            //Start a monotonic timer used to calculate the message publishing rates
            System.Diagnostics.Stopwatch timer = System.Diagnostics.Stopwatch.StartNew();

            //Create a transaction attributes object based on the queue we wish to publish to.
            //NOTE(review): the second argument is presumably the transaction TTL in ms - confirm against the API docs.
            nTransactionAttributes txAttrib = new nTransactionAttributes(myQueue, 1000);

            //Keep publishing until every requested event has been committed.
            //A transaction that could not be committed leaves 'remaining'
            //untouched, so it is simply published again on the next pass.
            long published = 0;
            int remaining = count;
            while (remaining > 0)
            {
                int batchSize = Math.Min(txSize, remaining);
                List<nConsumeEvent> batch = (batchSize == vevents.Count)
                    ? vevents
                    : vevents.GetRange(0, batchSize);

                if (publishTransaction(txAttrib, batch))
                {
                    remaining -= batchSize;
                    published += batchSize;
                }
            }

            //Round trip to the server; according to the original sample this
            //also ensures that all client queues have been flushed.
            mySession.getServerTime();
            timer.Stop();

            //Use the total elapsed time, clamped to 1ms so a very fast run
            //cannot divide by zero.
            long elapsedMs = Math.Max(1L, timer.ElapsedMilliseconds);

            //Calculate the events / sec rate (long arithmetic avoids int overflow)
            long eventPerSec = (published * 1000L) / elapsedMs;

            //Calculate the bytes / sec rate
            long bytesPerSec = eventPerSec * size;

            //Inform the user of the resulting rates
            Console.WriteLine("Events = " + published + " Events/sec = " + eventPerSec + " Bytes/Sec = " + bytesPerSec);
        }
        //Handle errors
        catch (nChannelNotFoundException)
        {
            Console.WriteLine("The queue specified could not be found.");
            Console.WriteLine("Please ensure that the queue exists in the REALM you connect to.");
            Environment.Exit(1);
        }
        catch (nSecurityException)
        {
            Console.WriteLine("Unsufficient permissions for the requested operation.");
            Console.WriteLine("Please check the ACL settings on the server.");
            Environment.Exit(1);
        }
        catch (nSessionNotConnectedException)
        {
            Console.WriteLine("The session object used is not physically connected to the Nirvana realm.");
            Console.WriteLine("Please ensure the realm is up and check your RNAME value.");
            Environment.Exit(1);
        }
        catch (nUnexpectedResponseException)
        {
            Console.WriteLine("The Nirvana REALM has returned an unexpected response.");
            Console.WriteLine("Please ensure the Nirvana REALM and client API used are compatible.");
            Environment.Exit(1);
        }
        catch (nUnknownRemoteRealmException)
        {
            Console.WriteLine("The queue specified resided in a remote realm which could not be found.");
            Console.WriteLine("Please ensure the queue name specified is correct.");
            Environment.Exit(1);
        }
        catch (nRequestTimedOutException)
        {
            Console.WriteLine("The requested operation has timed out waiting for a response from the REALM.");
            Console.WriteLine("If this is a very busy REALM ask your administrator to increase the client timeout values.");
            Environment.Exit(1);
        }
        catch (nTransactionAlreadyCommittedException)
        {
            Console.WriteLine("The transaction you attempted to commit has already been commited on this REALM.");
            Environment.Exit(1);
        }
        catch (nTransactionNotStartedException)
        {
            Console.WriteLine("The transaction you attempted to commit has not yet started on this REALM.");
            Environment.Exit(1);
        }
        catch (nBaseClientException)
        {
            Console.WriteLine("An error occured while creating the Channel Attributes object.");
            Environment.Exit(1);
        }

        //Close the session we opened. This is best effort: a failure here is
        //reported but must not prevent the shutdown below.
        try
        {
            nSessionFactory.close(mySession);
        }
        catch (Exception ex)
        {
            Console.WriteLine("Ignoring error while closing the session: " + ex.Message);
        }

        //Close any other sessions within this process so that we can exit
        nSessionFactory.shutdown();
    }

    /**
    * Builds the events that make up one transaction. Each event carries a
    * "floatval" property and the UTF-8 bytes of "TXID-<index>".
    *
    * NOTE(review): the original sample also filled a byte[size] buffer but never
    * attached it to any event, so the size argument does not influence what is
    * sent. The dead buffer has been removed; confirm which nConsumeEvent
    * overload should carry the payload before wiring a sized buffer in.
    *
    * @param eventCount the number of events to create
    */
    private static List<nConsumeEvent> buildEvents(int eventCount)
    {
        List<nConsumeEvent> events = new List<nConsumeEvent>(eventCount);
        for (int x = 0; x < eventCount; x++)
        {
            nEventProperties props = new nEventProperties();
            props.put("floatval", (float)1.2345);
            // Note the Tag does NOT need the TX
            events.Add(new nConsumeEvent(props, encoding.GetBytes("TXID-" + x)));
        }
        return events;
    }

    /**
    * Publishes and commits one transaction containing the given events.
    *
    * If the session drops while doing so, this waits for the session to
    * reconnect and tries to commit the same transaction once more.
    *
    * @param txAttrib the attributes used to create the transaction
    * @param events the events to publish in this transaction
    * @return true if the transaction was committed, false if the caller has to
    *         publish the same events again
    */
    private bool publishTransaction(nTransactionAttributes txAttrib, List<nConsumeEvent> events)
    {
        // Local to this attempt, so a failure inside create() can never end up
        // re-committing a transaction left over from an earlier attempt.
        nTransaction tx = null;
        try
        {
            //Create a new transaction object
            tx = nTransactionFactory.create(txAttrib);
            //Publish the List of messages created previously
            tx.publish(events);
            //Commit the transaction
            tx.commit();
            return true;
        }
        catch (nSessionNotConnectedException)
        {
            waitForReconnect();

            if (tx == null)
            {
                Console.WriteLine(RepublishMessage);
                return false;
            }

            try
            {
                tx.commit();
                Console.WriteLine("Commited tx after failover...");
                return true;
            }
            catch (Exception ex)
            {
                // Any failure here means the transaction is unusable on the
                // realm we are now connected to, so ask for a republish.
                Console.WriteLine(RepublishMessage);
                Console.WriteLine("Cause: " + ex.Message);
                return false;
            }
        }
    }

    /**
    * Blocks, polling once per second, until mySession reports that it is
    * connected again.
    */
    private void waitForReconnect()
    {
        while (!mySession.isConnected())
        {
            Console.WriteLine("Disconnected from Nirvana, Sleeping for 1 second...");
            try
            {
                Thread.Sleep(1000);
            }
            catch (ThreadInterruptedException)
            {
                // An interrupt only cuts the sleep short; the loop re-checks
                // the connection state straight away.
            }
        }
    }

    /**
    * Reads the command line and runs the sample.
    *
    * @param args the full command line; args[0] is the program name, args[1]
    *             the rname, args[2] the queue name, followed by the optional
    *             count, size and tx size
    */
    protected override void processArgs(String[] args)
    {
        // Need a min of 3 entries: program name, rname, queue name
        if (args.Length < 3)
        {
            exitWithUsage();
        }

        String rname = args[1];
        String queueName = args[2];
        int count = parseOptionalInt(args, 3, "count", DefaultCount, 0);
        int size = parseOptionalInt(args, 4, "size", DefaultSize, 0);
        int txSize = parseOptionalInt(args, 5, "tx size", DefaultTxSize, 1);

        //
        // Run the sample app
        //
        doit(parseRealmProperties(rname), queueName, count, size, txSize);
    }

    /**
    * Returns the integer found at args[index], or defaultValue when that
    * argument was not supplied. A value that is not an integer, or is below
    * minimum, prints an error plus the usage text and exits with code 2.
    *
    * @param args the full command line
    * @param index the position of the argument to read
    * @param name the argument name used in the error message
    * @param defaultValue the value returned when the argument is absent
    * @param minimum the smallest accepted value
    */
    private int parseOptionalInt(String[] args, int index, String name, int defaultValue, int minimum)
    {
        if (args.Length <= index)
        {
            return defaultValue;
        }

        int value;
        if (!int.TryParse(args[index], out value) || value < minimum)
        {
            Console.WriteLine("Invalid value '" + args[index] + "' for [" + name + "]; expected an integer >= " + minimum + ".");
            exitWithUsage();
        }
        return value;
    }

    /**
    * Prints the usage and environment help and terminates with exit code 2.
    */
    private void exitWithUsage()
    {
        Usage();
        UsageEnv();
        Environment.Exit(2);
    }

    /**
    * Program entry point. The args parameter is not used; the command line is
    * taken from Environment.GetCommandLineArgs(), which includes the program
    * name at index 0.
    */
    public static void Main(String[] args)
    {
        //Create an instance for this class
        mySelf = new txpushq();
        //Process command line arguments
        mySelf.processArgs(Environment.GetCommandLineArgs());
    }

    /**
    * Prints the usage message for this class
    */
    private static void Usage()
    {
        Console.WriteLine("Usage ...\n");
        Console.WriteLine("ntxpushq <rname> <queue name> [count] [size] [tx size] \n");
        Console.WriteLine(
            "<Required Arguments> \n");
        Console.WriteLine(
            "<rname> - the rname of the server to connect to");
        Console.WriteLine(
            "<queue name> - Queue name parameter for the queue to publish to");
        Console.WriteLine(
            "\n[Optional Arguments] \n");
        Console.WriteLine(
            "[count] -The number of events to publish (default: " + DefaultCount + ")");
        Console.WriteLine(
            "[size] - The size (bytes) of the event to publish (default: " + DefaultSize + ")");
        Console.WriteLine(
            "[tx size] - The number of events per transaction (default: " + DefaultTxSize + ")");
    }
}
}
EXAMPLE_SOURCE_END
