ActiveMQ

[ActiveMQ] Producer.send 파헤치기

코리늬 2020. 10. 24. 16:00

분명 producer를 생성하는 로직이 있을텐데 아무리 찾아봐도 내용을 가져와 조립하고 mqAutoResProducer.send(response); 에서 끝나고있었다.

어떻게 producer가 생성되는지 궁금했기 때문에, send 부분을 파헤쳐보기로했다.

 

Producer.send(Response) 과정

    try{
        ...
            mqAutoResProducer.send(response);
        ...
    }
   

    //Producer 시작
    private JmsTemplate jmsTemplate;
    private String destination;

    @Override
    public void send(final Response response) {
        jmsTemplate.convertAndSend(destination, response);
    }
  • 주입 받은 jmsTemplate을 converAndSend를 시작하면서부터가 jmsTemplate의 영역이다.
  • mqAutoResProducer.send(response);는 response, jmsTemplate, destination 등등을 전달하기 위한 출입구 인터페이스 정도로 생각하면 될 것같다.

 

convertAndSend

    //createMessage의 jmsTemplate에 등록되어있는 Converter를 찾아 toMessage를 호출한다.
    //해당 converter를 사용해 전달받은 message와 session값으로 메세지를 바이트형 메세지로 변환한다.
    //그 후 목적지와 변환한 메세지로 다시 send를 한다.
    @Override
    public void convertAndSend(String destinationName, final Object message) throws JmsException {
        send(destinationName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return getRequiredMessageConverter().toMessage(message, session);
            }
        });
    }

    //해당 jmsTemplate에 등록되어있는 messageConverter 클래스를 반환해준다.
    private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
        MessageConverter converter = getMessageConverter();
        if (converter == null) {
            throw new IllegalStateException("No 'messageConverter' specified. Check configuration of JmsTemplate.");
        }
        return converter;
    }
  • reponse 파라미터를 가지고 MessageCreator를 호출해 message를 만든다.
  • 이때 createMessage의 session 값은 jms 인스턴스의 session을 말한다.
  • jmsTemplate을 등록할때 정의해두었던 messageConverter 클래스를 찾아 toMessage로 메세지를 만든다.
    • 이 과정에서 jms Api 전송을 위한 BytesMessage 가 만들어진다.
  • 만들어진 메세지를 다시 던진다 (send)

 

send

    @Override
    public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
        execute(new SessionCallback<Object>() {
            @Override
            public Object doInJms(Session session) throws JMSException {
                Destination destination = resolveDestinationName(session, destinationName);
                doSend(session, destination, messageCreator);
                return null;
            }
        }, false);
    }

    //resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)
    //

    protected Destination resolveDestinationName(Session session, String destinationName) throws JMSException {
        return getDestinationResolver().resolveDestinationName(session, destinationName, isPubSubDomain());
    }
  • resolveDestinationName

    DestinationResolver의 default 값은 DynamicDestinationResolver다.

    간단한 simple String destinationName으로 실제 구현 인스턴스를 확인한다.

    resolver로 부터 destination을 받아왔으니 목적지도 확인했고, 메세지도 변환했고, 실제로 보낸다.

    (doSend)

  • 메소드 파라미터중에 isPubSubDomain()이라는 boolean 값이 있다.

    • 도메인이 pub-sub 형태이면 true, P2P이면 false를 리턴한다.

      Returns:
      the JMS destination (either a topic or a queue)

      ​ pub-sub 모델인지, 큐 모델인지에 따라서 리턴값이 topic or queue로 달라진다.

      ​ pub-sub 모델인지, 큐 모델인지에 따라서 리턴값이 topic or queue로 달라진다.

 

doSend

    /**
     * Send the given JMS message.
     * @param session the JMS Session to operate on
     * @param destination the JMS Destination to send to
     * @param messageCreator callback to create a JMS Message
     * @throws JMSException if thrown by JMS API methods
     */
    protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
            throws JMSException {

        Assert.notNull(messageCreator, "MessageCreator must not be null");
        MessageProducer producer = createProducer(session, destination);
        try {
            Message message = messageCreator.createMessage(session);
            if (logger.isDebugEnabled()) {
                logger.debug("Sending created message: " + message);
            }
            doSend(producer, message);
            // Check commit - avoid commit call within a JTA transaction.
            if (session.getTransacted() && isSessionLocallyTransacted(session)) {
                // Transacted session created by this template -> commit.
                JmsUtils.commitIfNecessary(session);
            }
        }
        finally {
            JmsUtils.closeMessageProducer(producer);
        }
    }


    //deliveryDelay 값이 있다면 스프링에서 제공해주는 리플렉션으로 해당 DeliveryDelay 메소드에 접근하게 해준다.
    //isExplicitQosEnabled()라는 boolean값이 존재하는데, JMS는 ConnectionFactory에서 QOS라는 값을 설정할 수 있다. 값을 지정하게되면 JMS 사양에 지정된 기본값을 커스텀 할 수 있다.
    //옵션 값을 true로 주어 활성화 시키면된다.
    /**
     * Actually send the given JMS message.
     * @param producer the JMS MessageProducer to send with
     * @param message the JMS Message to send
     * @throws JMSException if thrown by JMS API methods
     */
    protected void doSend(MessageProducer producer, Message message) throws JMSException {
        if (this.deliveryDelay >= 0) {
            if (setDeliveryDelayMethod == null) {
                throw new IllegalStateException("setDeliveryDelay requires JMS 2.0");
            }
            ReflectionUtils.invokeMethod(setDeliveryDelayMethod, producer, this.deliveryDelay);
        }
        if (isExplicitQosEnabled()) {
            producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
        }
        else {
            producer.send(message);
        }
    }
  • 프로듀서는 그럼 도대체 어디서 만들어?? 라고 생각했던 부분이 여기에 있었다.

    //session과 destination을 가지고 Producer를 생성한다.

    MessageProducer producer = createProducer(session, destination);

    producer를 생성했다면, 다시 doSend!

  • deliveryDelay 값이 있다면 스프링에서 제공해주는 리플렉션으로 해당 DeliveryDelay 메소드에 접근하게 해준다.
    isExplicitQosEnabled()라는 boolean값이 존재하는데, JMS는 ConnectionFactory에서 QOS라는 값을 설정할 수 있다.

    값을 지정하게되면 JMS 사양에 지정된 기본값을 커스텀 할 수 있다.
    옵션 값을 true로 주어 활성화 시키면된다.

if문을 보니 DeliveryMode, Priority, TimeToLive의 3가지 QOS 값을 커스텀 할 수 있나보다.

if (isExplicitQosEnabled()) {
            producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
        }
        else {
            producer.send(message);
        }

 

JMS API WORK

Send를 하기 까지의 과정을 나타낸 JMS API WORK이다.

ConnectionFactory-> Connection-> Session-> MessageProducer-> Send

JmsTemplate 클래스 인스턴스는 Thread safe 하게 구현된다!

 

참고

DestinationResolver

InvokeMethod

jms 스프링 공식문서