

Post transactional messenger.

Example.

We have a class that implements QueueService on top of Spring's JmsTemplate.
The sendMessage method sends an object to a JMS queue for later handling by message listeners.
The problem is that the message should be placed on the queue only if the surrounding transaction completes successfully.

import org.springframework.jms.core.JmsTemplate;
import com.company.annotations.MessageQueue;
import com.company.jms.domain.QueueRequestHolder;
import com.company.jms.service.QueueService;

public class QueueServiceImpl implements QueueService
{
	private final JmsTemplate	jmsTemplate;

	public QueueServiceImpl(final JmsTemplate template)
	{
		jmsTemplate = template;
	}

	@MessageQueue()
	public QueueRequestHolder sendMessage(final Object objectToSend)
	{
		return new QueueRequestHolder(jmsTemplate, objectToSend);
	}

	public void sendMessageDirect(final Object objectToSend)
	{
		jmsTemplate.convertAndSend(objectToSend);

	}

}


The code above uses a MessageQueue annotation, defined as follows:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MessageQueue
{
}

This annotation serves as a marker for the PostTransactionMessanger aspect.
The aspect collects all messages sent during a transaction and releases them only after the transaction completes successfully.
If the transaction is rolled back, all collected messages are discarded.
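
The buffering behaviour described above can be modelled without Spring or AspectJ. The following is only a minimal sketch of the idea, not the aspect itself: the class name DeferredMessageBuffer and its methods enqueue, release and discard are hypothetical, and a plain int stands in for the transaction key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, framework-free model of the buffering idea (not the real aspect).
class DeferredMessageBuffer
{
	// Pending sends grouped by transaction key. Each list is assumed to be
	// touched only by the thread that runs the corresponding transaction.
	private final Map<Integer, List<Runnable>> pending = new ConcurrentHashMap<>();

	// While the transaction is running: remember the send, do not perform it.
	void enqueue(final int txKey, final Runnable send)
	{
		pending.computeIfAbsent(txKey, k -> new ArrayList<>()).add(send);
	}

	// After a successful transaction: perform the sends in arrival order.
	void release(final int txKey)
	{
		final List<Runnable> sends = pending.remove(txKey);
		if (sends != null)
		{
			sends.forEach(Runnable::run);
		}
	}

	// After a rollback: drop the sends without performing them.
	void discard(final int txKey)
	{
		pending.remove(txKey);
	}
}
```

In this model, enqueue corresponds to the advice on the annotated method, release to the advice that runs after a normal return, and discard to the advice that runs after an exception.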

PostTransactionMessanger code:

import java.util.Hashtable;
import java.util.Vector;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.interceptor.TransactionAspectSupport;

import com.company.jms.domain.QueueRequestHolder;

@Aspect
public class PostTransactionMessanger
{
	Hashtable<Integer, Vector<QueueRequestHolder>>	messages	= new Hashtable<Integer, Vector<QueueRequestHolder>>();

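	// currentTransactionStatus() does not return null outside a transaction;
	// it throws NoTransactionException, so that case is translated to null here.
	private TransactionStatus currentStatus()
	{
		try
		{
			return TransactionAspectSupport.currentTransactionStatus();
		}
		catch (final org.springframework.transaction.NoTransactionException e)
		{
			return null;
		}
	}

	private Integer getTransactionHash()
	{
		final TransactionStatus status = currentStatus();
		return (status != null) ? Integer.valueOf(status.hashCode()) : Integer.valueOf(0);
	}

	private boolean isInTransaction()
	{
		return currentStatus() != null;
	}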

	@AfterThrowing("com.company.service.ServiceLayer.entryPointMethod()")
	public void cleanMessages()
	{
		messages.remove(getTransactionHash());
	}
	// com.company.service.ServiceLayer.entryPointMethod() is the pointcut definition
	// used by the transaction manager
	@AfterReturning("com.company.service.ServiceLayer.entryPointMethod()")
	public void releaseMessages(final JoinPoint joinPoint)
	{
		final Integer hash = getTransactionHash();
		final Vector<QueueRequestHolder> holders = messages.get(hash);
		if (holders != null)
		{
			while (!holders.isEmpty())
			{
				final QueueRequestHolder holder = holders.firstElement();
				holders.remove(holder);
				holder.execute();
			}
			messages.remove(hash);
		}
	}

	@Around(value = "@annotation(com.company.annotations.MessageQueue)")
	public void onMessage(final ProceedingJoinPoint joinPoint) throws Throwable
	{

		final Integer hash = getTransactionHash();
		Vector<QueueRequestHolder> holders = messages.get(hash);
		if (isInTransaction() && (holders == null))
		{
			holders = new Vector<QueueRequestHolder>();
			messages.put(hash, holders);
		}
		final Object ret = joinPoint.proceed(joinPoint.getArgs());
		if (ret instanceof QueueRequestHolder)
		{
			if (isInTransaction())
			{
				holders.add((QueueRequestHolder) ret);
			}
			else
			{
				((QueueRequestHolder) ret).execute();
			}

		}

	}
}

PostTransactionMessanger uses QueueRequestHolder objects to cache the messages.
Each holder stores a pair of values:
the template reference and the message that should be sent through that template.

import java.io.Serializable;

import org.springframework.jms.core.JmsTemplate;

public class QueueRequestHolder implements Serializable
{
	private static final long	serialVersionUID	= 6730618191903049727L;
	JmsTemplate					tempale;
	Object						message;

	public QueueRequestHolder(final JmsTemplate tempale, final Object message)
	{
		super();
		this.tempale = tempale;
		this.message = message;
	}

	public JmsTemplate getTempale()
	{
		return tempale;
	}

	public void setTempale(final JmsTemplate tempale)
	{
		this.tempale = tempale;
	}

	public Object getMessage()
	{
		return message;
	}

	public void setMessage(final Object message)
	{
		this.message = message;
	}

	public void execute()
	{
		if ((tempale != null) && (message != null))
		{
			tempale.convertAndSend(message);
		}
	}

	@Override
	public String toString()
	{
		return "\nTemplate:" + tempale.getDefaultDestinationName() + "\nMessage:" + message;
	}
}
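
The holder is essentially a deferred command: it captures the target and the payload now and performs the send later. As a rough illustration of that pattern only, here is a framework-free sketch in which a java.util.function.Consumer stands in for JmsTemplate. The class name DeferredSend and the boolean return value of execute are assumptions made for this example and are not part of the code above.

```java
import java.util.function.Consumer;

// Hypothetical generic variant of the holder; any Consumer plays the role of the template.
class DeferredSend<T>
{
	private final Consumer<T>	sender;
	private final T				message;

	DeferredSend(final Consumer<T> sender, final T message)
	{
		this.sender = sender;
		this.message = message;
	}

	// Performs the send. Returns false when the sender or the message is missing,
	// mirroring the null checks in QueueRequestHolder.execute().
	boolean execute()
	{
		if ((sender == null) || (message == null))
		{
			return false;
		}
		sender.accept(message);
		return true;
	}
}
```

Nothing is sent when the object is constructed; the send happens only when execute is called, which is what lets the aspect postpone it until the transaction has finished.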
