Simple Java Messaging
- Remote procedure call (RPC) systems, including Java RMI, are synchronous -- the caller must block and wait until the called method completes execution, and thus offer no potential for developing loosely coupled enterprise applications without the use of multiple threads. In other words, RPC systems require the client and the server to be available at the same time.
- However, such tight coupling may not be possible or desired in some applications. Message-Oriented Middleware (MOM) systems provide solutions to such problems. They are based on the asynchronous interaction model, and provide the abstraction of a message queue that can be accessed across a network. Note, however, that messaging here refers to asynchronous requests or events that are consumed by enterprise applications and not humans as in electronic mail (email). These messages contain formatted data that describe specific business actions.
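The Java Message Service (JMS) API is the standard Java interface to such messaging systems. A JMS application is made up of the following parts:
- A JMS provider: A messaging system that implements the JMS specification.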
- JMS clients: Java applications that send and receive messages.
- Messages: Objects that are used to communicate information between JMS clients.
- Administered objects: Preconfigured JMS objects that are created by an administrator for the use of JMS clients.
JMS supports two different message delivery models:
- Point-to-Point (Queue destination): In this model, a message is delivered from a producer to one consumer. The messages are delivered to the destination, which is a queue, and then delivered to one of the consumers registered for the queue. While any number of producers can send messages to the queue, each message is guaranteed to be delivered, and consumed by one consumer. If no consumers are registered to consume the messages, the queue holds them until a consumer registers to consume them.
- Publish/Subscribe (Topic destination): In this model, a message is delivered from a producer to any number of consumers. Messages are delivered to the topic destination, and then to all active consumers who have subscribed to the topic. Any number of producers can send messages to a topic destination, and each message can be delivered to any number of subscribers. If no consumers are registered, the topic destination doesn't hold messages unless it has a durable subscription for inactive consumers. A durable subscription represents a consumer that is registered with the topic destination but may be inactive at the time the messages are sent to the topic.
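To make the difference between the two models concrete, here is a minimal in-memory sketch. It is a toy model and not the JMS API: SimpleQueue and SimpleTopic are hypothetical names made up for illustration, and there is no broker, no network, and no durable subscription involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class DeliveryModels {

    /** Point-to-point (hypothetical model): each message goes to exactly one consumer. */
    public static class SimpleQueue {
        private final Queue<String> pending = new ArrayDeque<>();

        public void send(String message) {
            pending.add(message);           // held until some consumer asks for it
        }

        /** Returns the next message, or null if the queue is empty. */
        public String receive() {
            return pending.poll();          // removing it means no other consumer sees it
        }
    }

    /** Publish/subscribe (hypothetical model): each message is copied to every current subscriber. */
    public static class SimpleTopic {
        private final List<List<String>> inboxes = new ArrayList<>();

        /** Registers a subscriber and returns its inbox. */
        public List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        public void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);         // with no subscribers, the message is simply dropped
            }
        }
    }

    public static void main(String[] args) {
        SimpleQueue queue = new SimpleQueue();
        queue.send("order-1");
        System.out.println("consumer A got: " + queue.receive());
        System.out.println("consumer B got: " + queue.receive());

        SimpleTopic topic = new SimpleTopic();
        topic.publish("sent before anyone subscribed");
        List<String> a = topic.subscribe();
        List<String> b = topic.subscribe();
        topic.publish("price-update");
        System.out.println("subscriber A inbox: " + a);
        System.out.println("subscriber B inbox: " + b);
    }
}
```

With the queue, consumer A gets order-1 and consumer B gets nothing. With the topic, both subscribers get price-update, and neither sees the message that was published before they subscribed.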
A JMS message has three parts:
- The header, which is required for every message, contains information that is used for routing and identifying messages. Some of these fields are set automatically by the JMS provider while a message is produced and delivered, and others are set by the client on a message-by-message basis.
- Properties, which are optional, provide values that clients can use to filter messages. They provide additional information about the data, such as which process created it and when it was created. Properties can be considered an extension to the header, and consist of property name/value pairs. Using properties, clients can fine-tune their selection of messages by specifying certain values that act as selection criteria.
- The body, which is also optional, contains the actual data to be exchanged.
The JMS specification defines six types (classes) of messages that a JMS provider must support:
- Message: This represents a message without a message body.
- StreamMessage: A message whose body contains a stream of Java primitive types. It is written and read sequentially.
- MapMessage: A message whose body contains a set of name/value pairs. The order of entries is not defined.
- TextMessage: A message whose body contains a Java String, such as an XML document.
- ObjectMessage: A message whose body contains a serialized Java object.
- BytesMessage: A message whose body contains a stream of uninterpreted bytes.
The JMS API is built from a handful of objects, each covered in its own section below:
- Connection factories
- Connections
- Sessions
- Destinations
- Messages
- Producers
- Consumers
Connection Factories
A ConnectionFactory is a factory for Connections (a little obvious and probably a trifle confusing since we haven't talked about connections yet). The key thing to know about connection factories is that they exist to hide all the proprietary stuff that a JMS provider may want you to set up, for example how to connect to the server. What happens is that you set up a configuration using JNDI which is specific to the implementation that you are using, and then everything in your code is generic. That way, if you switch to use someone else, all you have to do is change those settings. I'll talk about how to set up JNDI and use imqobjmgr (the MQ specific tool for doing that configuration) in a future blog.
For now I am going to cheat and use the MQ specific API to create it. (From what I've seen, other providers have these convenience APIs as well but, in the end, you want to use JNDI to retrieve it because it makes your application portable.)
ConnectionFactory cf = new com.sun.messaging.ConnectionFactory();
Connections
The JMS Specification defines a Connection as "the client's active connection to a provider". So a connection carries the stream of messages to and from the provider (in this case, MQ). In our case, it represents a TCP connection to MQ's server (called the Broker) that handles all of the heavy lifting of routing and persisting messages. When I create a connection, I have the option of passing in a username and password which is authenticated by the provider. (How I actually set that up depends on the specific implementation of JMS being used - for MQ you use imqusermgr if you are using our simple file-based version, or else LDAP through JNDI.) Since I haven't set up any authentication, I'll just use the defaults and not pass anything in.
I can also start and stop the connection, which starts and stops the flow of messages.
Connection connection = cf.createConnection();
connection.start();
Sessions
A session is a single-threaded stream of control of messages. This means that a session cannot be used by two threads at the same time. (Warning - once you start feeling good about JMS and decide to fork some of the work on to other threads, you are going to break this and get strange behavior - I think everyone does it at least once.) Sessions have cool properties. Message order is guaranteed. You can acknowledge or commit (depending on whether or not you are using transactions) several messages at a time. You can roll back or recover the messages if you need to reprocess them.
Sessions are going to be important because everything else starts from here. You create messages, producers, consumers, and destinations from the Session.
The first thing you need to know about sessions is acknowledge modes. Acknowledgement is the way the client implementation (whether the piece supplied by MQ or another provider) or the client application (what you write) tells the system (aka notifies the broker) that you are done processing a message.
AUTO_ACKNOWLEDGE -> The client implementation informs the broker it's done once the client has seen the message.
DUPS_OK_ACKNOWLEDGE -> The client implementation informs the broker whenever it feels like it that the client has seen the message (so you may occasionally see a duplicate).
CLIENT_ACKNOWLEDGE -> The application has control: it calls acknowledge() on the message to say that it's been seen. Note: since the acknowledge() method is on Message, another easy mistake is to assume that only that message is acknowledged when the routine is called - nope - it also acknowledges the messages seen on the client before that message.
Transacted -> identifies the session as part of a single-phase transaction, so messages on the session (produced and consumed) are handled as a single block - sent or not sent together.
At this point, I am going to create a simple session in auto acknowledge mode.
Session session = connection.createSession(false /* not transacted */, Session.AUTO_ACKNOWLEDGE);
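The CLIENT_ACKNOWLEDGE gotcha is easier to see in a toy model. The sketch below is not the JMS API: ModelSession and ModelMessage are hypothetical stand-ins, and the only thing they imitate is that acknowledge() lives on the message while the bookkeeping lives on the session, so one call settles everything the session has handed out so far.

```java
import java.util.ArrayList;
import java.util.List;

public class ClientAckModel {

    /** Hypothetical stand-in for a session in CLIENT_ACKNOWLEDGE mode. */
    public static class ModelSession {
        private final List<String> unacknowledged = new ArrayList<>();

        /** Simulates the session handing a message to the application. */
        public ModelMessage deliver(String body) {
            unacknowledged.add(body);
            return new ModelMessage(this, body);
        }

        /** Returns a copy of the bodies delivered but not yet acknowledged. */
        public List<String> unacknowledged() {
            return new ArrayList<>(unacknowledged);
        }

        void acknowledgeAll() {
            unacknowledged.clear();
        }
    }

    /** Hypothetical stand-in for a delivered message. */
    public static class ModelMessage {
        private final ModelSession session;
        public final String body;

        ModelMessage(ModelSession session, String body) {
            this.session = session;
            this.body = body;
        }

        /** Acknowledges everything the session has delivered so far, not just this message. */
        public void acknowledge() {
            session.acknowledgeAll();
        }
    }

    public static void main(String[] args) {
        ModelSession session = new ModelSession();
        session.deliver("first");
        session.deliver("second");
        ModelMessage third = session.deliver("third");

        System.out.println("before ack: " + session.unacknowledged());
        third.acknowledge();   // called on one message only
        System.out.println("after ack:  " + session.unacknowledged());
    }
}
```

After the single acknowledge() call the unacknowledged list is empty, even though the application never touched the first two messages. If you need to reprocess an earlier message after a failure, acknowledging a later one first is exactly the mistake to avoid.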
JMS has two models for receiving messages. An application can use a "synchronous" api (receive) and explicitly call for a message where it wants it or it can register a message listener (which is a callback that is triggered when a message comes in). For now, I'm not going to set a message listener.
Destinations
I talked a little about destinations in my last entry. I'm not planning to go back over that; a destination is just a storehouse for sending and retrieving messages. It has a type (queue or topic) and a name. Similar to connection factories, you can retrieve an already created destination using JNDI, but I'm going to go ahead and use the simple APIs and create it on the fly (this time it's not cheating, this is a standard part of the API).
I'll just create a queue, since that seems to lend itself better to a hello world example.
Destination destination = session.createQueue("HelloWorld");
Messages
Messages are the pieces that are passed back and forth - the core of the messaging. There are three pieces to a message:
- header - stuff stuck on by the provider
- properties - name/value pairs that tag simple information on to the message. This data can add additional information or even be used to pick out specific messages through what are called selectors (which is a kind of SQL-lite language used to help a receiver identify a specific message)
- body - the content of the message. It can be text, a Java object, a map (so name/value pairs), or bytes. At the point I create the message, I determine its type.
TextMessage message = session.createTextMessage();
message.setText("Hello World");
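Since properties and selectors are easy to describe and harder to picture, here is a small in-memory sketch of the idea. It is not the JMS API and it does not parse selector strings: ModelMessage and select() are hypothetical names, and a plain Java Predicate stands in for what a selector expression would say.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class SelectorModel {

    /** Hypothetical message: a text body plus name/value properties. */
    public static class ModelMessage {
        public final String body;
        public final Map<String, Object> properties = new HashMap<>();

        public ModelMessage(String body) {
            this.body = body;
        }

        /** Sets a property and returns this message so calls can be chained. */
        public ModelMessage with(String name, Object value) {
            properties.put(name, value);
            return this;
        }
    }

    /** Returns the bodies of the messages whose properties satisfy the selector. */
    public static List<String> select(List<ModelMessage> messages,
                                      Predicate<Map<String, Object>> selector) {
        List<String> matched = new ArrayList<>();
        for (ModelMessage m : messages) {
            if (selector.test(m.properties)) {   // only the properties are inspected, never the body
                matched.add(m.body);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<ModelMessage> messages = new ArrayList<>();
        messages.add(new ModelMessage("restock widgets").with("region", "EU").with("urgent", true));
        messages.add(new ModelMessage("restock gears").with("region", "US").with("urgent", true));
        messages.add(new ModelMessage("monthly report").with("region", "EU").with("urgent", false));

        // Roughly what a selector such as  region = 'EU' AND urgent = TRUE  expresses.
        Predicate<Map<String, Object>> euUrgent =
                p -> "EU".equals(p.get("region")) && Boolean.TRUE.equals(p.get("urgent"));

        System.out.println(select(messages, euUrgent));   // prints [restock widgets]
    }
}
```

The point of the model is that filtering happens on the properties alone, which is why anything you want to select on has to be set as a property rather than buried in the body.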
Producers
Producers put messages into the system. You can specify a lot of information about how the message is handled - how long it will live, whether or not it is persistent (stored on disk), how important it is. Since this is a simple example, I'm going to use the easiest method and not set any of those.
MessageProducer producer = session.createProducer(destination);
producer.send(message);
Consumers
Consumers get messages from the system. You can create a consumer with a selector to pull out a specific set of messages, but I'm going to go for the simple approach (if nothing else because it always takes me 3 iterations to get the selector right). Since I didn't use a MessageListener (see Sessions), I'm also going to call receive to get the message.
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive();
Putting it together
Now I'm going to put together a pair of simple applications based on what we just discussed. The first will send a message to a queue called HelloWorld. The second will receive the message off of that queue.
The Producing application
This simple application will:
- Create a connection factory (cheating)
- Create a connection
- Create a session - non-transacted, AUTO_ACKNOWLEDGE
- Create the queue HelloWorld
- Create a message producer
- Start the connection
- Create a HelloWorld message
- Send the message
- Clean everything up
import javax.jms.*;

public class HelloProducer
{
    public HelloProducer() {
        try {
            // MQ-specific shortcut; a portable application would look this up through JNDI.
            ConnectionFactory cf = new com.sun.messaging.ConnectionFactory();
            Connection connection = cf.createConnection();
            // Non-transacted session in auto-acknowledge mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("HelloWorld");
            MessageProducer producer = session.createProducer(destination);
            connection.start();
            TextMessage message = session.createTextMessage();
            message.setText("Hello World");
            System.out.println("Sending Hello World");
            producer.send(message);
            // Clean everything up.
            producer.close();
            session.close();
            connection.close();
        } catch (JMSException ex) {
            System.out.println("Error running program");
            ex.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new HelloProducer();
    }
}
The Consuming application
This simple application will:
- Create a connection factory (cheating)
- Create a connection
- Create a session - non-transacted, AUTO_ACKNOWLEDGE
- Create the queue HelloWorld
- Create a message consumer
- Start the connection
- Call receive to get the message from the queue
- Print the body of the message
- Clean everything up
import javax.jms.*;

public class HelloConsumer
{
    public HelloConsumer() {
        try {
            // MQ-specific shortcut; a portable application would look this up through JNDI.
            ConnectionFactory cf = new com.sun.messaging.ConnectionFactory();
            Connection connection = cf.createConnection();
            // Non-transacted session in auto-acknowledge mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("HelloWorld");
            MessageConsumer consumer = session.createConsumer(destination);
            // Messages are only delivered once the connection has been started.
            connection.start();
            // receive() blocks until a message arrives on the queue.
            TextMessage m = (TextMessage) consumer.receive();
            System.out.println(m.getText());
            // Clean everything up.
            consumer.close();
            session.close();
            connection.close();
        } catch (JMSException ex) {
            System.out.println("Error running program");
            ex.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new HelloConsumer();
    }
}
Running it with MQ
To run the application we just created, we need to do four things:
- Compile the classes
- Start the broker (MQ's server)
- Run the producing client to send a message
- Run the consuming client to receive a message
Compiling
To compile you need to have imq.jar and jms.jar in the class path.
% javac -classpath /usr/share/lib/imq.jar:/usr/share/lib/jms.jar HelloConsumer.java HelloProducer.java
Starting the broker
To start the broker, you run the command imqbrokerd. When you see a string that says Broker ready, it's ready to receive messages.
[19/Oct/2007:16:00:35 PDT]
================================================================================
Sun Java(tm) System Message Queue 4.1
Sun Microsystems, Inc.
Version: 4.1 (Build 34-b)
Compile: Thu Jun 28 22:33:50 PDT 2007
Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
This product includes code licensed from RSA Data Security.
================================================================================
Java Runtime: 1.5.0_09 Sun Microsystems Inc. /usr/jdk/instances/jdk1.5.0/jre
[19/Oct/2007:16:00:35 PDT] IMQ_HOME=/
[19/Oct/2007:16:00:35 PDT] IMQ_VARHOME=/var/imq
[19/Oct/2007:16:00:35 PDT] SunOS 5.10 sparc hostname (2 cpu) username
[19/Oct/2007:16:00:35 PDT] Max file descriptors: 65536 (65536)
[19/Oct/2007:16:00:35 PDT] Java Heap Size: max=174784k, current=35328k
[19/Oct/2007:16:00:35 PDT] Arguments:
[19/Oct/2007:16:00:35 PDT] [B1060]: Loading persistent data...
[19/Oct/2007:16:00:35 PDT] Using built-in file-based persistent store: /var/imq/instances/imqbroker/
[19/Oct/2007:16:00:36 PDT] [B1039]: Broker "imqbroker@hostname:7676" ready.
Running the producing client
Now I'll start the producing client, which sends a message.
% java -classpath .:/usr/share/lib/imq.jar:/usr/share/lib/jms.jar HelloProducer
Sending Hello World
Running the consuming client
Finally, I'll run the consumer.
% java -classpath .:/usr/share/lib/imq.jar:/usr/share/lib/jms.jar HelloConsumer
Hello World
Web Services HTTP vs. JMS
Web Services Protocol Choices
HTTP is the most common Web Services protocol. However, web services are currently built on multiple transports, each with different communications Quality of Service (QoS) and message semantics. The platforms that support these transports, the most common being HTTP(S) and JMS, also provide different performance, scalability and reliability characteristics.
Message semantics include synchronous/asynchronous, request/reply, transactional, publish/subscribe and guaranteed delivery. The choice of protocol should depend on messaging requirements. For example, a common requirement is point-to-point versus one-to-many, which drives the request/reply or publish/subscribe decision. Another example would be a fast message producer paired with a slow message consumer, which is easily implemented with a message queue.
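The fast-producer, slow-consumer case can be sketched with nothing more than a queue between two threads. This is a minimal in-process illustration using java.util.concurrent, not a JMS or web services example; BufferedHandoff and run() are hypothetical names, and the sleep simply stands in for slow processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BufferedHandoff {

    /** Sends count messages in a burst and returns them in the order the slow consumer handled them. */
    public static List<String> run(int count, long consumerDelayMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    String message = queue.take();       // blocks until a message is available
                    Thread.sleep(consumerDelayMillis);   // simulate slow processing
                    processed.add(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 0; i < count; i++) {
            queue.add("message-" + i);                   // returns at once; the queue absorbs the burst
        }

        try {
            consumer.join();                             // wait for the consumer to drain the queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the consumer", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 20));
    }
}
```

The producer finishes its loop almost immediately while the consumer works through the backlog at its own pace. With plain request/reply, the producer would instead have to wait for each message to be handled.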
HTTP is the de facto standard for web services and should be considered first, but it may be ruled out by the requirements. For example, HTTP would be eliminated if the requirement is one-to-many communications. There are WS standard proposals to close some of the gaps, such as transactions, routing and security, but these standards are not widely implemented, and it may be simpler to use JMS.
Since HTTP has limitations, many SOA implementations use JMS internally and web services externally by implementing an internal service bus. For example, it is straightforward to consume web services and publish JMS messages with an ESB mediation.
Consider using SOAP over HTTP for:
• Externally facing web services (e.g. customers or suppliers)
• For simple point-to-point and stateless services
• Where you need a thin client with no MOM installations
• Where speed matters: the protocol tends to be faster than JMS (depending on JMS configuration) as it's point-to-point with less routing overhead
Consider using SOAP over JMS for:
• High-volume distributed messaging
• Asynchronous messaging
• Where a transaction boundary is needed in the middleware
• Where the message consumers are slower than the producers
• Guaranteed delivery and/or once-only delivery of messages
• Publish/subscribe
• Where your vendor provides an implementation of SOAP over JMS (there are still some vendor-specific items in how endpoint binding is done)
• Where JMS is generally supported
• Distributed peer systems that might at times be disconnected
Note also that you may not need the SOAP envelope inside the firewall if there are no WS policies to communicate. For example, you could switch to content-based routing (message routing based on the content of the message) inside the firewall.
Steps to write Publisher (sender) application :-
• Look up the TopicConnectionFactory through JNDI and use it to create a TopicConnection.
• Create a TopicSession by invoking createTopicSession() on the TopicConnection.
• Create a Topic object by invoking createTopic() on the TopicSession.
• Create a TopicPublisher for the Topic by invoking createPublisher(javax.jms.Topic t) on the TopicSession (t is a Topic object that specifies the Topic we want to publish to).
• Create a TextMessage object and set the text to be published.
• Publish the message by invoking publish() on the TopicPublisher.
Steps to write subscriber (receiver) application :-
• Look up the TopicConnectionFactory through JNDI and use it to create a TopicConnection.
• Create a TopicSession by invoking createTopicSession() on the TopicConnection.
• Create a Topic object by invoking createTopic() on the TopicSession.
• Create a TopicSubscriber for the Topic by invoking createSubscriber(javax.jms.Topic) or createDurableSubscriber(javax.jms.Topic t, String name, String messageSelector, boolean noLocal) on the TopicSession.
- t is a Topic object that specifies the Topic we want to subscribe to.
- name is a String that indicates the name under which to maintain the durable subscription to the specified topic.
- messageSelector is a String that defines selection criteria.
- noLocal is a boolean; if it is true, the subscriber will not receive messages that were published through its own connection.
Durable subscription :-
A durable subscription indicates that the client wants to receive all the messages published to a topic, including messages published while the client connection is not active.
Non-durable subscription :-
A non-durable subscription does not receive the messages published while the client connection is not active.
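The difference between the two kinds of subscription can be sketched with a small in-memory model. This is not the JMS API: ModelTopic and its methods are hypothetical, and the only behaviour modelled is that a durable subscription keeps collecting messages while its client is away, whereas a non-durable one ends when the client disconnects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DurableSubscriptionModel {

    /** Hypothetical in-memory topic that tracks subscriptions by name. */
    public static class ModelTopic {

        private static class Subscription {
            final boolean durable;
            final List<String> pending = new ArrayList<>();

            Subscription(boolean durable) {
                this.durable = durable;
            }
        }

        private final Map<String, Subscription> subscriptions = new HashMap<>();

        public void subscribe(String name, boolean durable) {
            subscriptions.put(name, new Subscription(durable));
        }

        /** The client goes offline; only a durable subscription survives. */
        public void disconnect(String name) {
            Subscription s = subscriptions.get(name);
            if (s != null && !s.durable) {
                subscriptions.remove(name);
            }
        }

        public void publish(String message) {
            for (Subscription s : subscriptions.values()) {
                s.pending.add(message);   // also kept for durable subscribers that are offline
            }
        }

        /** Returns and clears whatever has been kept for the named subscription. */
        public List<String> receiveAll(String name) {
            Subscription s = subscriptions.get(name);
            if (s == null) {
                return new ArrayList<>();
            }
            List<String> out = new ArrayList<>(s.pending);
            s.pending.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        ModelTopic topic = new ModelTopic();
        topic.subscribe("audit", true);        // durable
        topic.subscribe("dashboard", false);   // non-durable

        topic.disconnect("audit");
        topic.disconnect("dashboard");
        topic.publish("published while both were offline");

        topic.subscribe("dashboard", false);   // coming back means a fresh non-durable subscription
        topic.publish("published after reconnect");

        System.out.println("audit:     " + topic.receiveAll("audit"));
        System.out.println("dashboard: " + topic.receiveAll("dashboard"));
    }
}
```

In this run the durable "audit" subscription receives both messages, while "dashboard" only receives the one published after it subscribed again.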
Steps to write Sender application :-
• Look up the QueueConnectionFactory through JNDI and use it to create a QueueConnection.
• Create a QueueSession by invoking createQueueSession() on the QueueConnection.
• Create a Queue object by invoking createQueue() on the QueueSession.
• Create a QueueSender for the Queue by invoking createSender(javax.jms.Queue q) on the QueueSession.
• Create a TextMessage object and set the text to be sent.
• Send the message by invoking send() on the QueueSender.
Steps to write Receiver application :-
• Look up the QueueConnectionFactory through JNDI and use it to create a QueueConnection.
• Create a QueueSession by invoking createQueueSession() on the QueueConnection.
• Create a Queue object by invoking createQueue() on the QueueSession.
• Create a QueueReceiver for the Queue by invoking createReceiver(javax.jms.Queue) on the QueueSession.