Как: реализовать BatchMessageListenerContainer для массового использования очереди JMS

Недавно я столкнулся с потребностью в JMS-потребителе в Spring Integration, способном обрабатывать большие объемы пакетов, не нагружая мою целевую базу данных Oracle слишком большим количеством коммитов.

DefaultMessageListenerContainer, похоже, не поддерживает ничего, кроме транзакций сообщения за сообщением.

Я искал решения в Google и нашел пару, но многие из них пострадали от того, что были реализованы не путем наследования от DMLC, а, скорее, путем клонирования и изменения исходного исходного кода из того же самого, что сделало его уязвимым для взлома в случае, если я позже захочу перейти на более свежая версия spring-jms. Кроме того, клонируемый код ссылается на частные свойства DMLC, которые, следовательно, должны быть исключены. И для того, чтобы все это работало, также потребовались пара интерфейсов и настраиваемый прослушиватель сообщений. В общем, я не чувствовал себя комфортно.

Так что делать?


person Jens Krogsboell    schedule 10.03.2015    source источник


Ответы (1)


Что ж - это простое и компактное решение, полностью основанное на одном классе, производном от DefaultMessageListenerContainer.

Я тестировал только канал-адаптер, управляемый сообщениями, и ChainedTransactionManager - поскольку это своего рода базовый сценарий, когда нужно делать что-то подобное.

Это код:

package dk.itealisten.myservice.spring.components;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import java.util.ArrayList;
import java.util.Enumeration;

public class BatchMessageListenerContainer extends DefaultMessageListenerContainer {

    public static final int DEFAULT_BATCH_SIZE = 100;

    public int batchSize = DEFAULT_BATCH_SIZE;

    /**
     * Override the method receiveMessage to return an instance of BatchMessage - an inner class being declared further down.
     */
    @Override
    protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
        BatchMessage batch = new BatchMessage();
        while (!batch.releaseAfterMessage(super.receiveMessage(consumer))) ;
        return batch.messages.size() == 0 ? null : batch;
    }

    /**
     * As BatchMessage implements the javax.jms.Message interface it fits perfectly into the DMLC - only caveat is that SimpleMessageConverter dont know how to convert it to a Spring Integration Message - but that can be helped.
     * As BatchMessage will only serve as a container to carry the actual javax.jms.Message's from DMLC to the MessageListener it need not provide meaningful implementations of the methods of the interface as long as they are there.
     */
    protected class BatchMessage implements Message {

        public ArrayList<Message> messages = new ArrayList<Message>();

        /**
         * Add message to the collection of messages and return true if the batch meets the criteria for releasing it to the MessageListener.
         */
        public boolean releaseAfterMessage(Message message) {
            if (message != null) {
                messages.add(message);
            }
            // Are we ready to release?
            return message == null || messages.size() >= batchSize;
        }

        // Below is only dummy-implementations of the abstract methods of javax.jms.Message

        @Override
        public String getJMSMessageID() throws JMSException {
            return null;
        }

        @Override
        public void setJMSMessageID(String s) throws JMSException {

        }

        @Override
        public long getJMSTimestamp() throws JMSException {
            return 0;
        }

        @Override
        public void setJMSTimestamp(long l) throws JMSException {

        }

        @Override
        public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
            return new byte[0];
        }

        @Override
        public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException {

        }

        @Override
        public void setJMSCorrelationID(String s) throws JMSException {

        }

        @Override
        public String getJMSCorrelationID() throws JMSException {
            return null;
        }

        @Override
        public Destination getJMSReplyTo() throws JMSException {
            return null;
        }

        @Override
        public void setJMSReplyTo(Destination destination) throws JMSException {

        }

        @Override
        public Destination getJMSDestination() throws JMSException {
            return null;
        }

        @Override
        public void setJMSDestination(Destination destination) throws JMSException {

        }

        @Override
        public int getJMSDeliveryMode() throws JMSException {
            return 0;
        }

        @Override
        public void setJMSDeliveryMode(int i) throws JMSException {

        }

        @Override
        public boolean getJMSRedelivered() throws JMSException {
            return false;
        }

        @Override
        public void setJMSRedelivered(boolean b) throws JMSException {

        }

        @Override
        public String getJMSType() throws JMSException {
            return null;
        }

        @Override
        public void setJMSType(String s) throws JMSException {

        }

        @Override
        public long getJMSExpiration() throws JMSException {
            return 0;
        }

        @Override
        public void setJMSExpiration(long l) throws JMSException {

        }

        @Override
        public long getJMSDeliveryTime() throws JMSException {
            return 0;
        }

        @Override
        public void setJMSDeliveryTime(long l) throws JMSException {

        }

        @Override
        public int getJMSPriority() throws JMSException {
            return 0;
        }

        @Override
        public void setJMSPriority(int i) throws JMSException {

        }

        @Override
        public void clearProperties() throws JMSException {

        }

        @Override
        public boolean propertyExists(String s) throws JMSException {
            return false;
        }

        @Override
        public boolean getBooleanProperty(String s) throws JMSException {
            return false;
        }

        @Override
        public byte getByteProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public short getShortProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public int getIntProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public long getLongProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public float getFloatProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public double getDoubleProperty(String s) throws JMSException {
            return 0;
        }

        @Override
        public String getStringProperty(String s) throws JMSException {
            return null;
        }

        @Override
        public Object getObjectProperty(String s) throws JMSException {
            return null;
        }

        @Override
        public Enumeration getPropertyNames() throws JMSException {
            return null;
        }

        @Override
        public void setBooleanProperty(String s, boolean b) throws JMSException {

        }

        @Override
        public void setByteProperty(String s, byte b) throws JMSException {

        }

        @Override
        public void setShortProperty(String s, short i) throws JMSException {

        }

        @Override
        public void setIntProperty(String s, int i) throws JMSException {

        }

        @Override
        public void setLongProperty(String s, long l) throws JMSException {

        }

        @Override
        public void setFloatProperty(String s, float v) throws JMSException {

        }

        @Override
        public void setDoubleProperty(String s, double v) throws JMSException {

        }

        @Override
        public void setStringProperty(String s, String s1) throws JMSException {

        }

        @Override
        public void setObjectProperty(String s, Object o) throws JMSException {

        }

        @Override
        public void acknowledge() throws JMSException {

        }

        @Override
        public void clearBody() throws JMSException {

        }

        @Override
        public <T> T getBody(Class<T> aClass) throws JMSException {
            return null;
        }

        @Override
        public boolean isBodyAssignableTo(Class aClass) throws JMSException {
            return false;
        }
    }
}

Ниже приведен пример, показывающий, как его можно использовать в контексте приложения Spring:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:jms="http://www.springframework.org/schema/integration/jms"
  xmlns:p="http://www.springframework.org/schema/p"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans     
    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-4.0.xsd
    http://www.springframework.org/schema/integration/jms
    http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.0.xsd">

<!-- Plug in the BatchMessageListenerContainer in a message-driven-channel-adapter -->
<jms:message-driven-channel-adapter container-class="dk.itealisten.myservice.spring.components.BatchMessageListenerContainer"
  acknowledge="transacted"
  channel="from.mq"
  concurrent-consumers="5"
  max-concurrent-consumers="15"
  connection-factory="jmsConnectionFactory"
  transaction-manager="transactionManager"
  destination="my.mq.queue"
  />

<!-- Flow processing the BatchMessages being posted on the "from.mq" channel -->
<int:chain input-channel="from.mq" output-channel="nullChannel">
  <int:splitter expression="payload.messages" />
  <!-- This is where we deal with conversion to spring messages as the payload is now a single standard javax.jms.Message implementation -->
  <int:transformer ref="smc" method="fromMessage"/>
  <!-- And finally we persist -->
  <int:service-activator ref="jdbcPublisher" method="persist"/>
</int:chain>

<!-- Various supporting beans -->

<!-- A bean to handle the database persistance --> 
<bean id="jdbcPersistor" class="dk.itealisten.myservice.spring.components.JdbcPersistor" p:dataSource-ref="dataSource" />

<!-- A bean to handle the conversion that could not take place in the MessageListener as it don't know how to convert a BatchMessage -->
<bean id="smc" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>

<!-- Transaction manager must make sure messages are committed outbound (JDBC) before cleaned up inbound (JMS). -->
<bean id="transactionManager" class="org.springframework.data.transaction.ChainedTransactionManager">
  <constructor-arg name="transactionManagers">
    <list>
      <bean class="org.springframework.jms.connection.JmsTransactionManager" p:connectionFactory-ref="jmsConnectionFactory" />
      <bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager" p:dataSource-ref="dataSource" />
    </list>
  </constructor-arg>
</bean>
person Jens Krogsboell    schedule 10.03.2015
comment
Не могли бы вы опубликовать enitire класс BatchMessageListenerContainer? или не могли бы вы поделиться ссылкой? - person Alagammal P; 29.03.2019
comment
@AlagammalP Я добавил полную реализацию BatchMessageListenerContainer :-) - person Jens Krogsboell; 01.04.2019
comment
Спасибо Jens Krogsboell за обновление. Я также хочу вставить часть сообщений в БД после получения сообщений из очереди. Но я не использую весеннюю интеграцию, и я использую контейнер, я думаю, что если использовать этот метод, смогу ли я получить BatchMessage.messages внутри onMessage () прослушивателя сообщений. stackoverflow.com/questions/55107244/ - person Alagammal P; 01.04.2019
comment
@AlagammalP Я не могу дать вам никаких подробностей, так как у меня нет опыта работы с JMS без Spring-интеграции. Но это должно быть возможно. Только - я не уверен, действительно ли вам необходимо использовать BatchMessage - мне это просто нужно, потому что весенняя интеграция не обеспечивает уровень контроля, который мне нужен из коробки. - person Jens Krogsboell; 02.04.2019
comment
@AlagammalP Прошло почти три года с тех пор, как я ушел из компании, где создавал это решение - детали несколько размыты. Но я не припоминаю, чтобы задержки с доставкой сообщений наблюдались. - person Jens Krogsboell; 24.04.2019