[ActiveMQ] Producer.send 파헤치기
분명 producer를 생성하는 로직이 있을텐데 아무리 찾아봐도 내용을 가져와 조립하고 mqAutoResProducer.send(response);
에서 끝나고있었다.
어떻게 producer가 생성되는지 궁금했기 때문에, send 부분을 파헤쳐보기로했다.
Producer.send(Response) 과정
try{
...
mqAutoResProducer.send(response);
...
}
//Producer 시작
private JmsTemplate jmsTemplate;
private String destination;
@Override
public void send(final Response response) {
jmsTemplate.convertAndSend(destination, response);
}
- 주입 받은 jmsTemplate을 converAndSend를 시작하면서부터가 jmsTemplate의 영역이다.
mqAutoResProducer.send(response);
는 response, jmsTemplate, destination 등등을 전달하기 위한 출입구 인터페이스 정도로 생각하면 될 것같다.
convertAndSend
//createMessage의 jmsTemplate에 등록되어있는 Converter를 찾아 toMessage를 호출한다.
//해당 converter를 사용해 전달받은 message와 session값으로 메세지를 바이트형 메세지로 변환한다.
//그 후 목적지와 변환한 메세지로 다시 send를 한다.
@Override
public void convertAndSend(String destinationName, final Object message) throws JmsException {
send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return getRequiredMessageConverter().toMessage(message, session);
}
});
}
//해당 jmsTemplate에 등록되어있는 messageConverter 클래스를 반환해준다.
private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
MessageConverter converter = getMessageConverter();
if (converter == null) {
throw new IllegalStateException("No 'messageConverter' specified. Check configuration of JmsTemplate.");
}
return converter;
}
- reponse 파라미터를 가지고 MessageCreator를 호출해 message를 만든다.
- 이때 createMessage의 session 값은 jms 인스턴스의 session을 말한다.
- jmsTemplate을 등록할때 정의해두었던 messageConverter 클래스를 찾아 toMessage로 메세지를 만든다.
- 이 과정에서 jms Api 전송을 위한 BytesMessage 가 만들어진다.
- 만들어진 메세지를 다시 던진다 (send)
send
@Override
public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
execute(new SessionCallback<Object>() {
@Override
public Object doInJms(Session session) throws JMSException {
Destination destination = resolveDestinationName(session, destinationName);
doSend(session, destination, messageCreator);
return null;
}
}, false);
}
//resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)
//
protected Destination resolveDestinationName(Session session, String destinationName) throws JMSException {
return getDestinationResolver().resolveDestinationName(session, destinationName, isPubSubDomain());
}
-
resolveDestinationName
DestinationResolver의 default 값은
DynamicDestinationResolver
다.간단한 simple String destinationName으로 실제 구현 인스턴스를 확인한다.
resolver로 부터 destination을 받아왔으니 목적지도 확인했고, 메세지도 변환했고, 실제로 보낸다.
(doSend)
-
메소드 파라미터중에
isPubSubDomain()
이라는 boolean 값이 있다.-
도메인이 pub-sub 형태이면 true, P2P이면 false를 리턴한다.
Returns: the JMS destination (either a topic or a queue)
pub-sub 모델인지, 큐 모델인지에 따라서 리턴값이 topic or queue로 달라진다.
pub-sub 모델인지, 큐 모델인지에 따라서 리턴값이 topic or queue로 달라진다.
-
doSend
/**
* Send the given JMS message.
* @param session the JMS Session to operate on
* @param destination the JMS Destination to send to
* @param messageCreator callback to create a JMS Message
* @throws JMSException if thrown by JMS API methods
*/
protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
throws JMSException {
Assert.notNull(messageCreator, "MessageCreator must not be null");
MessageProducer producer = createProducer(session, destination);
try {
Message message = messageCreator.createMessage(session);
if (logger.isDebugEnabled()) {
logger.debug("Sending created message: " + message);
}
doSend(producer, message);
// Check commit - avoid commit call within a JTA transaction.
if (session.getTransacted() && isSessionLocallyTransacted(session)) {
// Transacted session created by this template -> commit.
JmsUtils.commitIfNecessary(session);
}
}
finally {
JmsUtils.closeMessageProducer(producer);
}
}
//deliveryDelay 값이 있다면 스프링에서 제공해주는 리플렉션으로 해당 DeliveryDelay 메소드에 접근하게 해준다.
//isExplicitQosEnabled()라는 boolean값이 존재하는데, JMS는 ConnectionFactory에서 QOS라는 값을 설정할 수 있다. 값을 지정하게되면 JMS 사양에 지정된 기본값을 커스텀 할 수 있다.
//옵션 값을 true로 주어 활성화 시키면된다.
/**
* Actually send the given JMS message.
* @param producer the JMS MessageProducer to send with
* @param message the JMS Message to send
* @throws JMSException if thrown by JMS API methods
*/
protected void doSend(MessageProducer producer, Message message) throws JMSException {
if (this.deliveryDelay >= 0) {
if (setDeliveryDelayMethod == null) {
throw new IllegalStateException("setDeliveryDelay requires JMS 2.0");
}
ReflectionUtils.invokeMethod(setDeliveryDelayMethod, producer, this.deliveryDelay);
}
if (isExplicitQosEnabled()) {
producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
}
else {
producer.send(message);
}
}
-
프로듀서는 그럼 도대체 어디서 만들어?? 라고 생각했던 부분이 여기에 있었다.
//session과 destination을 가지고 Producer를 생성한다.
MessageProducer producer = createProducer(session, destination);
producer를 생성했다면, 다시 doSend!
-
deliveryDelay 값이 있다면 스프링에서 제공해주는 리플렉션으로 해당 DeliveryDelay 메소드에 접근하게 해준다.
isExplicitQosEnabled()라는 boolean값이 존재하는데, JMS는 ConnectionFactory에서 QOS라는 값을 설정할 수 있다.값을 지정하게되면 JMS 사양에 지정된 기본값을 커스텀 할 수 있다.
옵션 값을 true로 주어 활성화 시키면된다.
if문을 보니 DeliveryMode, Priority, TimeToLive의 3가지 QOS 값을 커스텀 할 수 있나보다.
if (isExplicitQosEnabled()) {
producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
}
else {
producer.send(message);
}
JMS API WORK
Send를 하기 까지의 과정을 나타낸 JMS API WORK이다.
ConnectionFactory-> Connection-> Session-> MessageProducer-> Send
JmsTemplate 클래스 인스턴스는 Thread safe
하게 구현된다!
참고
jms 스프링 공식문서