Cliente JMS multithreaded ActiveMQ

Eu estou usando o código abaixo para criar várias sessões JMS para vários consumidores consumirem mensagens. Meu problema é que o código está sendo executado de uma maneira única thread. Mesmo se as mensagens estiverem presentes na fila, a segunda thread não poderá receber nada e apenas manterá a pesquisa. O primeiro thread, entretanto, termina de processar o primeiro lote e volta e consome as mensagens restantes. Há algo de errado com o uso aqui?

static { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616"); connection = connectionFactory.createConnection(); connection.start(); } catch (JMSException e) { LOGGER.error("Unable to initialise JMS Queue.", e); } } public JMSClientReader(boolean isQueue, String name) throws QueueException { init(isQueue,name); } @Override public void init(boolean isQueue, String name) throws QueueException { // Create a Connection try { // Create a Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); if (isQueue) { destination = new ActiveMQQueue(name);// session.createQueue("queue"); } else { destination = new ActiveMQTopic(name);// session.createTopic("topic"); } consumer = session.createConsumer(destination); } catch (JMSException e) { LOGGER.error("Unable to initialise JMS Queue.", e); throw new QueueException(e); } } public String readQueue() throws QueueException { // connection.setExceptionListener(this); // Wait for a message String text = null; Message message; try { message = consumer.receive(1000); if(message==null) return "done"; if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; text = textMessage.getText(); LOGGER.info("Received: " + text); } else { throw new JMSException("Invalid message found"); } } catch (JMSException e) { LOGGER.error("Unable to read message from Queue", e); throw new QueueException(e); } LOGGER.info("Message read is " + text); return text; } 

seu problema é o prefetchPolicy.

 persistent queues (default value: 1000) non-persistent queues (default value: 1000) persistent topics (default value: 100) non-persistent topics (default value: Short.MAX_VALUE - 1) 

todas as mensagens foram despachadas para o primeiro consumidor conectado e quando outro se conecta ele não recebe mensagens, portanto, para mudar esse comportamento se você tiver um consumidor simultâneo para uma fila, será necessário definir o prefetchPolicy como um valor inferior ao padrão. por exemplo, adicione este jms.prefetchPolicy.queuePrefetch=1 à configuração uri em activemq.xml ou configure-o no URL do cliente como este

 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616?jms.prefetchPolicy.queuePrefetch=1"); 

Valores grandes de pré-busca são recomendados para alto desempenho com altos volumes de mensagens. No entanto, para volumes de mensagens mais baixos, em que cada mensagem leva muito tempo para ser processada, a pré-busca deve ser configurada como 1. Isso garante que um consumidor esteja processando apenas uma mensagem por vez. Especificar um limite de pré-busca de zero, no entanto, fará com que o consumidor pesquise mensagens, uma de cada vez, em vez de a mensagem ser enviada ao consumidor.

Dê uma olhada no http://activemq.apache.org/what-is-the-prefetch-limit-for.html

E

http://activemq.apache.org/destination-options.html