java - Unable to get ActiveMQ to dequeue -


there might stupid simple answer this, i'm trying use activemq pass messages between producers , consumers. have many producers , many consumers, want each message delivered once among consumers. seem mean cannot use topics, since deliver messages consumers listening, , want 1 consumer receive each message.

my problem able receive messages, messages not dequeued. if restart consumer process, of messages reprocessed. this answer seems pertinent not seem apply since can't create durable queue consumers, durable topic consumers (unless i'm missing in api docs).

my code follows.

topicconnectionfactory factory = new activemqconnectionfactory(props.getproperty("mq.url")); connection conn  = factory.createconnection(); session session = conn.createsession(true, session.client_acknowledge); queue queue = session.createqueue(props.getproperty("mq.source_queue")); conn.start(); messageconsumer consumer = session.createconsumer(queue); 

then later on

message msg = consumer.receive(); msg.acknowledge(); if (!(msg instanceof textmessage)) continue; string msgstr = ((textmessage)msg).gettext(); 

this current code. have tried session.auto_acknowledge , without msg.acknowledge(). working permutation of code seems retrieve messages, when restart consumer, of messages received again, if have been received prior restart.

you created session transacted session , therefore need call, session.commit if want inform broker messages consumed , don't need redelivered. if don't set first argument createsession true ack mode respected otherwise ignored, 1 of oddities of jms api i'm afraid. if this:

connectionfactory factory = new activemqconnectionfactory(props.getproperty("mq.url")); connection conn  = factory.createconnection(); session session = conn.createsession(false, session.client_acknowledge); queue queue = session.createqueue(props.getproperty("mq.source_queue")); conn.start(); messageconsumer consumer = session.createconsumer(queue); 

then work:

message msg = consumer.receive(); msg.acknowledge(); 

otherwise need do:

message msg = consumer.receive(); session.commit();  

but keep in mind single message transactions don't make sense client ack no transaction better option.


Comments

Popular posts from this blog

c# - SVN Error : "svnadmin: E205000: Too many arguments" -

c# - Copy ObservableCollection to another ObservableCollection -

All overlapping substrings matching a java regex -