001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.List;
023import java.util.concurrent.CountDownLatch;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import javax.jms.JMSException;
028
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
032import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
033import org.apache.activemq.command.ConsumerControl;
034import org.apache.activemq.command.ConsumerInfo;
035import org.apache.activemq.command.Message;
036import org.apache.activemq.command.MessageAck;
037import org.apache.activemq.command.MessageDispatch;
038import org.apache.activemq.command.MessageDispatchNotification;
039import org.apache.activemq.command.MessageId;
040import org.apache.activemq.command.MessagePull;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.thread.Scheduler;
043import org.apache.activemq.transaction.Synchronization;
044import org.apache.activemq.transport.TransmitCallback;
045import org.apache.activemq.usage.SystemUsage;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * A subscription that honors the pre-fetch option of the ConsumerInfo.
051 */
052public abstract class PrefetchSubscription extends AbstractSubscription {
053
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
    /** Scheduler obtained from the broker in the constructor; pullMessage uses it to schedule pull timeouts. */
    protected final Scheduler scheduler;

    /** Cursor of messages added to this subscription but not yet dispatched. Mutations in this class synchronize on pendingLock. */
    protected PendingMessageCursor pending;
    /** Messages that were moved out of {@code pending} and not yet removed by an ack. Mutations in this class synchronize on dispatchLock. */
    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
    /** Extra dispatch allowance on top of the consumer's prefetch size; read by isFull() and countBeforeFull(). */
    protected final AtomicInteger prefetchExtension = new AtomicInteger();
    /** When false, the ack handling in this class does not grow or shrink prefetchExtension for non-pull consumers. */
    protected boolean usePrefetchExtension = true;
    // NOTE(review): the two audit settings below are not referenced in the visible part of
    // this class; presumably they configure duplicate auditing on the cursor - confirm.
    private int maxProducersToAudit=32;
    private int maxAuditDepth=2048;
    /** System usage handed to the pending cursor by setPending(). */
    protected final SystemUsage usageManager;
    /** Monitor guarding the pending cursor. */
    protected final Object pendingLock = new Object();
    /** Monitor guarding the dispatched list; taken inside pendingLock where both are needed. */
    protected final Object dispatchLock = new Object();
    // acknowledge() ignores acks while this latch is still closed. The count-down is not in
    // the visible part of this class - presumably done once dispatching has started; confirm.
    private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
067
068    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException {
069        super(broker,context, info);
070        this.usageManager=usageManager;
071        pending = cursor;
072        try {
073            pending.start();
074        } catch (Exception e) {
075            throw new JMSException(e.getMessage());
076        }
077        this.scheduler = broker.getScheduler();
078    }
079
080    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
081        this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
082    }
083
084    /**
085     * Allows a message to be pulled on demand by a client
086     */
087    @Override
088    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
089        // The slave should not deliver pull messages.
090        // TODO: when the slave becomes a master, He should send a NULL message to all the
091        // consumers to 'wake them up' in case they were waiting for a message.
092        if (getPrefetchSize() == 0) {
093            prefetchExtension.set(pull.getQuantity());
094            final long dispatchCounterBeforePull = getSubscriptionStatistics().getDispatched().getCount();
095
096            // Have the destination push us some messages.
097            for (Destination dest : destinations) {
098                dest.iterate();
099            }
100            dispatchPending();
101
102            synchronized(this) {
103                // If there was nothing dispatched.. we may need to setup a timeout.
104                if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
105                    // immediate timeout used by receiveNoWait()
106                    if (pull.getTimeout() == -1) {
107                        // Null message indicates the pull is done or did not have pending.
108                        prefetchExtension.set(1);
109                        add(QueueMessageReference.NULL_MESSAGE);
110                        dispatchPending();
111                    }
112                    if (pull.getTimeout() > 0) {
113                        scheduler.executeAfterDelay(new Runnable() {
114                            @Override
115                            public void run() {
116                                pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone());
117                            }
118                        }, pull.getTimeout());
119                    }
120                }
121            }
122        }
123        return null;
124    }
125
126    /**
127     * Occurs when a pull times out. If nothing has been dispatched since the
128     * timeout was setup, then send the NULL message.
129     */
130    final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) {
131        synchronized (pendingLock) {
132            if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || alwaysSignalDone) {
133                try {
134                    prefetchExtension.set(1);
135                    add(QueueMessageReference.NULL_MESSAGE);
136                    dispatchPending();
137                } catch (Exception e) {
138                    context.getConnection().serviceException(e);
139                } finally {
140                    prefetchExtension.set(0);
141                }
142            }
143        }
144    }
145
146    @Override
147    public void add(MessageReference node) throws Exception {
148        synchronized (pendingLock) {
149            // The destination may have just been removed...
150            if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) {
151                // perhaps we should inform the caller that we are no longer valid to dispatch to?
152                return;
153            }
154
155            // Don't increment for the pullTimeout control message.
156            if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
157                getSubscriptionStatistics().getEnqueues().increment();
158            }
159            pending.addMessageLast(node);
160        }
161        dispatchPending();
162    }
163
164    @Override
165    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
166        synchronized(pendingLock) {
167            try {
168                pending.reset();
169                while (pending.hasNext()) {
170                    MessageReference node = pending.next();
171                    node.decrementReferenceCount();
172                    if (node.getMessageId().equals(mdn.getMessageId())) {
173                        // Synchronize between dispatched list and removal of messages from pending list
174                        // related to remove subscription action
175                        synchronized(dispatchLock) {
176                            pending.remove();
177                            createMessageDispatch(node, node.getMessage());
178                            dispatched.add(node);
179                            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
180                            onDispatch(node, node.getMessage());
181                        }
182                        return;
183                    }
184                }
185            } finally {
186                pending.release();
187            }
188        }
189        throw new JMSException(
190                "Slave broker out of sync with master: Dispatched message ("
191                        + mdn.getMessageId() + ") was not in the pending list for "
192                        + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
193    }
194
195    @Override
196    public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
197        // Handle the standard acknowledgment case.
198        boolean callDispatchMatched = false;
199        Destination destination = null;
200
201        if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
202            // suppress unexpected ack exception in this expected case
203            LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack);
204            return;
205        }
206
207        LOG.trace("ack: {}", ack);
208
209        synchronized(dispatchLock) {
210            if (ack.isStandardAck()) {
211                // First check if the ack matches the dispatched. When using failover this might
212                // not be the case. We don't ever want to ack the wrong messages.
213                assertAckMatchesDispatched(ack);
214
215                // Acknowledge all dispatched messages up till the message id of
216                // the acknowledgment.
217                boolean inAckRange = false;
218                List<MessageReference> removeList = new ArrayList<MessageReference>();
219                for (final MessageReference node : dispatched) {
220                    MessageId messageId = node.getMessageId();
221                    if (ack.getFirstMessageId() == null
222                            || ack.getFirstMessageId().equals(messageId)) {
223                        inAckRange = true;
224                    }
225                    if (inAckRange) {
226                        // Don't remove the nodes until we are committed.
227                        if (!context.isInTransaction()) {
228                            getSubscriptionStatistics().getDequeues().increment();
229                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
230                            removeList.add(node);
231                        } else {
232                            registerRemoveSync(context, node);
233                        }
234                        acknowledge(context, ack, node);
235                        if (ack.getLastMessageId().equals(messageId)) {
236                            destination = (Destination) node.getRegionDestination();
237                            callDispatchMatched = true;
238                            break;
239                        }
240                    }
241                }
242                for (final MessageReference node : removeList) {
243                    dispatched.remove(node);
244                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
245                }
246                // this only happens after a reconnect - get an ack which is not
247                // valid
248                if (!callDispatchMatched) {
249                    LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack);
250                }
251            } else if (ack.isIndividualAck()) {
252                // Message was delivered and acknowledge - but only delete the
253                // individual message
254                for (final MessageReference node : dispatched) {
255                    MessageId messageId = node.getMessageId();
256                    if (ack.getLastMessageId().equals(messageId)) {
257                        // Don't remove the nodes until we are committed - immediateAck option
258                        if (!context.isInTransaction()) {
259                            getSubscriptionStatistics().getDequeues().increment();
260                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
261                            dispatched.remove(node);
262                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
263                        } else {
264                            registerRemoveSync(context, node);
265                        }
266
267                        if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) {
268                            // allow transaction batch to exceed prefetch
269                            while (true) {
270                                int currentExtension = prefetchExtension.get();
271                                int newExtension = Math.max(currentExtension, currentExtension + 1);
272                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
273                                    break;
274                                }
275                            }
276                        }
277
278                        acknowledge(context, ack, node);
279                        destination = (Destination) node.getRegionDestination();
280                        callDispatchMatched = true;
281                        break;
282                    }
283                }
284            }else if (ack.isDeliveredAck()) {
285                // Message was delivered but not acknowledged: update pre-fetch
286                // counters.
287                int index = 0;
288                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
289                    final MessageReference node = iter.next();
290                    Destination nodeDest = (Destination) node.getRegionDestination();
291                    if (ack.getLastMessageId().equals(node.getMessageId())) {
292                        if (usePrefetchExtension && getPrefetchSize() != 0) {
293                            // allow  batch to exceed prefetch
294                            while (true) {
295                                int currentExtension = prefetchExtension.get();
296                                int newExtension = Math.max(currentExtension, index + 1);
297                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
298                                    break;
299                                }
300                            }
301                        }
302                        destination = nodeDest;
303                        callDispatchMatched = true;
304                        break;
305                    }
306                }
307                if (!callDispatchMatched) {
308                    throw new JMSException(
309                            "Could not correlate acknowledgment with dispatched message: "
310                                    + ack);
311                }
312            } else if (ack.isExpiredAck()) {
313                // Message was expired
314                int index = 0;
315                boolean inAckRange = false;
316                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
317                    final MessageReference node = iter.next();
318                    Destination nodeDest = (Destination) node.getRegionDestination();
319                    MessageId messageId = node.getMessageId();
320                    if (ack.getFirstMessageId() == null
321                            || ack.getFirstMessageId().equals(messageId)) {
322                        inAckRange = true;
323                    }
324                    if (inAckRange) {
325                        if (node.isExpired()) {
326                            if (broker.isExpired(node)) {
327                                Destination regionDestination = nodeDest;
328                                regionDestination.messageExpired(context, this, node);
329                            }
330                            iter.remove();
331                            nodeDest.getDestinationStatistics().getInflight().decrement();
332                        }
333                        if (ack.getLastMessageId().equals(messageId)) {
334                            if (usePrefetchExtension && getPrefetchSize() != 0) {
335                                // allow  batch to exceed prefetch
336                                while (true) {
337                                    int currentExtension = prefetchExtension.get();
338                                    int newExtension = Math.max(currentExtension, index + 1);
339                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
340                                        break;
341                                    }
342                                }
343                            }
344
345                            destination = (Destination) node.getRegionDestination();
346                            callDispatchMatched = true;
347                            break;
348                        }
349                    }
350                }
351                if (!callDispatchMatched) {
352                    throw new JMSException(
353                            "Could not correlate expiration acknowledgment with dispatched message: "
354                                    + ack);
355                }
356            } else if (ack.isRedeliveredAck()) {
357                // Message was re-delivered but it was not yet considered to be
358                // a DLQ message.
359                boolean inAckRange = false;
360                for (final MessageReference node : dispatched) {
361                    MessageId messageId = node.getMessageId();
362                    if (ack.getFirstMessageId() == null
363                            || ack.getFirstMessageId().equals(messageId)) {
364                        inAckRange = true;
365                    }
366                    if (inAckRange) {
367                        if (ack.getLastMessageId().equals(messageId)) {
368                            destination = (Destination) node.getRegionDestination();
369                            callDispatchMatched = true;
370                            break;
371                        }
372                    }
373                }
374                if (!callDispatchMatched) {
375                    throw new JMSException(
376                            "Could not correlate acknowledgment with dispatched message: "
377                                    + ack);
378                }
379            } else if (ack.isPoisonAck()) {
380                // TODO: what if the message is already in a DLQ???
381                // Handle the poison ACK case: we need to send the message to a
382                // DLQ
383                if (ack.isInTransaction()) {
384                    throw new JMSException("Poison ack cannot be transacted: "
385                            + ack);
386                }
387                int index = 0;
388                boolean inAckRange = false;
389                List<MessageReference> removeList = new ArrayList<MessageReference>();
390                for (final MessageReference node : dispatched) {
391                    MessageId messageId = node.getMessageId();
392                    if (ack.getFirstMessageId() == null
393                            || ack.getFirstMessageId().equals(messageId)) {
394                        inAckRange = true;
395                    }
396                    if (inAckRange) {
397                        sendToDLQ(context, node, ack.getPoisonCause());
398                        Destination nodeDest = (Destination) node.getRegionDestination();
399                        nodeDest.getDestinationStatistics()
400                        .getInflight().decrement();
401                        removeList.add(node);
402                        getSubscriptionStatistics().getDequeues().increment();
403                        index++;
404                        acknowledge(context, ack, node);
405                        if (ack.getLastMessageId().equals(messageId)) {
406                            while (true) {
407                                int currentExtension = prefetchExtension.get();
408                                int newExtension = Math.max(0, currentExtension - (index + 1));
409                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
410                                    break;
411                                }
412                            }
413                            destination = nodeDest;
414                            callDispatchMatched = true;
415                            break;
416                        }
417                    }
418                }
419                for (final MessageReference node : removeList) {
420                    dispatched.remove(node);
421                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
422                }
423                if (!callDispatchMatched) {
424                    throw new JMSException(
425                            "Could not correlate acknowledgment with dispatched message: "
426                                    + ack);
427                }
428            }
429        }
430        if (callDispatchMatched && destination != null) {
431            destination.wakeup();
432            dispatchPending();
433
434            if (pending.isEmpty()) {
435                for (Destination dest : destinations) {
436                    dest.wakeup();
437                }
438            }
439        } else {
440            LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
441        }
442    }
443
444    private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
445        // setup a Synchronization to remove nodes from the
446        // dispatched list.
447        context.getTransaction().addSynchronization(
448                new Synchronization() {
449
450                    @Override
451                    public void beforeEnd() {
452                        if (usePrefetchExtension && getPrefetchSize() != 0) {
453                            while (true) {
454                                int currentExtension = prefetchExtension.get();
455                                int newExtension = Math.max(0, currentExtension - 1);
456                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
457                                    break;
458                                }
459                            }
460                        }
461                    }
462
463                    @Override
464                    public void afterCommit()
465                            throws Exception {
466                        Destination nodeDest = (Destination) node.getRegionDestination();
467                        synchronized(dispatchLock) {
468                            getSubscriptionStatistics().getDequeues().increment();
469                            dispatched.remove(node);
470                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
471                            nodeDest.getDestinationStatistics().getInflight().decrement();
472                        }
473                        nodeDest.wakeup();
474                        dispatchPending();
475                    }
476
477                    @Override
478                    public void afterRollback() throws Exception {
479                        synchronized(dispatchLock) {
480                            // poisionAck will decrement - otherwise still inflight on client
481                        }
482                    }
483                });
484    }
485
486    /**
487     * Checks an ack versus the contents of the dispatched list.
488     *  called with dispatchLock held
489     * @param ack
490     * @throws JMSException if it does not match
491     */
492    protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
493        MessageId firstAckedMsg = ack.getFirstMessageId();
494        MessageId lastAckedMsg = ack.getLastMessageId();
495        int checkCount = 0;
496        boolean checkFoundStart = false;
497        boolean checkFoundEnd = false;
498        for (MessageReference node : dispatched) {
499
500            if (firstAckedMsg == null) {
501                checkFoundStart = true;
502            } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
503                checkFoundStart = true;
504            }
505
506            if (checkFoundStart) {
507                checkCount++;
508            }
509
510            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
511                checkFoundEnd = true;
512                break;
513            }
514        }
515        if (!checkFoundStart && firstAckedMsg != null)
516            throw new JMSException("Unmatched acknowledge: " + ack
517                    + "; Could not find Message-ID " + firstAckedMsg
518                    + " in dispatched-list (start of ack)");
519        if (!checkFoundEnd && lastAckedMsg != null)
520            throw new JMSException("Unmatched acknowledge: " + ack
521                    + "; Could not find Message-ID " + lastAckedMsg
522                    + " in dispatched-list (end of ack)");
523        if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
524            throw new JMSException("Unmatched acknowledge: " + ack
525                    + "; Expected message count (" + ack.getMessageCount()
526                    + ") differs from count in dispatched-list (" + checkCount
527                    + ")");
528        }
529    }
530
531    /**
532     *
533     * @param context
534     * @param node
535     * @param poisonCause
536     * @throws IOException
537     * @throws Exception
538     */
539    protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception {
540        broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause);
541    }
542
543    @Override
544    public int getInFlightSize() {
545        return dispatched.size();
546    }
547
548    /**
549     * Used to determine if the broker can dispatch to the consumer.
550     *
551     * @return true if the subscription is full
552     */
553    @Override
554    public boolean isFull() {
555        return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
556    }
557
558    /**
559     * @return true when 60% or more room is left for dispatching messages
560     */
561    @Override
562    public boolean isLowWaterMark() {
563        return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
564    }
565
566    /**
567     * @return true when 10% or less room is left for dispatching messages
568     */
569    @Override
570    public boolean isHighWaterMark() {
571        return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
572    }
573
574    @Override
575    public int countBeforeFull() {
576        return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
577    }
578
579    @Override
580    public int getPendingQueueSize() {
581        return pending.size();
582    }
583
584    @Override
585    public long getPendingMessageSize() {
586        synchronized (pendingLock) {
587            return pending.messageSize();
588        }
589    }
590
591    @Override
592    public int getDispatchedQueueSize() {
593        return dispatched.size();
594    }
595
596    @Override
597    public long getDequeueCounter() {
598        return getSubscriptionStatistics().getDequeues().getCount();
599    }
600
601    @Override
602    public long getDispatchedCounter() {
603        return getSubscriptionStatistics().getDispatched().getCount();
604    }
605
606    @Override
607    public long getEnqueueCounter() {
608        return getSubscriptionStatistics().getEnqueues().getCount();
609    }
610
611    @Override
612    public boolean isRecoveryRequired() {
613        return pending.isRecoveryRequired();
614    }
615
616    public PendingMessageCursor getPending() {
617        return this.pending;
618    }
619
620    public void setPending(PendingMessageCursor pending) {
621        this.pending = pending;
622        if (this.pending!=null) {
623            this.pending.setSystemUsage(usageManager);
624            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
625        }
626    }
627
628    @Override
629    public void add(ConnectionContext context, Destination destination) throws Exception {
630        synchronized(pendingLock) {
631            super.add(context, destination);
632            pending.add(context, destination);
633        }
634    }
635
636    @Override
637    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
638        return remove(context, destination, dispatched);
639    }
640
641    public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
642        List<MessageReference> rc = new ArrayList<MessageReference>();
643        synchronized(pendingLock) {
644            super.remove(context, destination);
645            // Here is a potential problem concerning Inflight stat:
646            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
647            // Except if each commit or rollback callback action comes before remove of subscriber.
648            rc.addAll(pending.remove(context, destination));
649
650            if (dispatched == null) {
651                return rc;
652            }
653
654            // Synchronized to DispatchLock if necessary
655            if (dispatched == this.dispatched) {
656                synchronized(dispatchLock) {
657                    updateDestinationStats(rc, destination, dispatched);
658                }
659            } else {
660                updateDestinationStats(rc, destination, dispatched);
661            }
662        }
663        return rc;
664    }
665
666    private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) {
667        ArrayList<MessageReference> references = new ArrayList<MessageReference>();
668        for (MessageReference r : dispatched) {
669            if (r.getRegionDestination() == destination) {
670                references.add(r);
671                getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
672            }
673        }
674        rc.addAll(references);
675        destination.getDestinationStatistics().getInflight().subtract(references.size());
676        dispatched.removeAll(references);
677    }
678
    /**
     * Walks the pending cursor and dispatches as many messages as
     * {@code countBeforeFull()} allows.
     *
     * The whole pass holds {@code pendingLock}; each individual message is
     * removed from the cursor and dispatched while also holding
     * {@code dispatchLock}. When {@code countBeforeFull()} is not positive and
     * the subscription is not yet flagged as slow, it is flagged and every
     * destination is notified through {@code slowConsumer}. The cursor is
     * always released on exit.
     *
     * Made public so it can be used in MQTTProtocolConverter.
     *
     * @throws IOException if {@code canDispatch} or {@code dispatch} fails
     */
    public void dispatchPending() throws IOException {
        synchronized(pendingLock) {
            try {
                int numberToDispatch = countBeforeFull();
                if (numberToDispatch > 0) {
                    // There is room again, so the consumer is no longer considered slow.
                    setSlowConsumer(false);
                    setPendingBatchSize(pending, numberToDispatch);
                    int count = 0;
                    pending.reset();
                    while (pending.hasNext() && !isFull() && count < numberToDispatch) {
                        MessageReference node = pending.next();
                        if (node == null) {
                            break;
                        }

                        // Synchronize between dispatched list and remove of message from pending list
                        // related to remove subscription action
                        synchronized(dispatchLock) {
                            pending.remove();
                            if (!isDropped(node) && canDispatch(node)) {

                                // Message may have been sitting in the pending
                                // list a while waiting for the consumer to ack the message.
                                if (node != QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
                                    // An expired message does not use up a dispatch slot,
                                    // so increment the number to dispatch.
                                    numberToDispatch++;
                                    if (broker.isExpired(node)) {
                                        ((Destination)node.getRegionDestination()).messageExpired(context, this, node);
                                    }

                                    // Non-browsers never receive the expired message: release the
                                    // reference here, because the continue below skips the
                                    // decrement at the bottom of the loop.
                                    if (!isBrowser()) {
                                        node.decrementReferenceCount();
                                        continue;
                                    }
                                }
                                dispatch(node);
                                count++;
                            }
                        }
                        // decrement after dispatch has taken ownership to avoid usage jitter
                        node.decrementReferenceCount();
                    }
                } else if (!isSlowConsumer()) {
                    // No capacity left: flag the consumer as slow once and tell each destination.
                    setSlowConsumer(true);
                    for (Destination dest :destinations) {
                        dest.slowConsumer(context, this);
                    }
                }
            } finally {
                pending.release();
            }
        }
    }
733
734    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
735        pending.setMaxBatchSize(numberToDispatch);
736    }
737
738    // called with dispatchLock held
739    protected boolean dispatch(final MessageReference node) throws IOException {
740        final Message message = node.getMessage();
741        if (message == null) {
742            return false;
743        }
744
745        okForAckAsDispatchDone.countDown();
746
747        MessageDispatch md = createMessageDispatch(node, message);
748        if (node != QueueMessageReference.NULL_MESSAGE) {
749            getSubscriptionStatistics().getDispatched().increment();
750            dispatched.add(node);
751            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
752        }
753        if (getPrefetchSize() == 0) {
754            while (true) {
755                int currentExtension = prefetchExtension.get();
756                int newExtension = Math.max(0, currentExtension - 1);
757                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
758                    break;
759                }
760            }
761        }
762        if (info.isDispatchAsync()) {
763            md.setTransmitCallback(new TransmitCallback() {
764
765                @Override
766                public void onSuccess() {
767                    // Since the message gets queued up in async dispatch, we don't want to
768                    // decrease the reference count until it gets put on the wire.
769                    onDispatch(node, message);
770                }
771
772                @Override
773                public void onFailure() {
774                    Destination nodeDest = (Destination) node.getRegionDestination();
775                    if (nodeDest != null) {
776                        if (node != QueueMessageReference.NULL_MESSAGE) {
777                            nodeDest.getDestinationStatistics().getDispatched().increment();
778                            nodeDest.getDestinationStatistics().getInflight().increment();
779                            LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
780                        }
781                    }
782                    if (node instanceof QueueMessageReference) {
783                        ((QueueMessageReference) node).unlock();
784                    }
785                }
786            });
787            context.getConnection().dispatchAsync(md);
788        } else {
789            context.getConnection().dispatchSync(md);
790            onDispatch(node, message);
791        }
792        return true;
793    }
794
795    protected void onDispatch(final MessageReference node, final Message message) {
796        Destination nodeDest = (Destination) node.getRegionDestination();
797        if (nodeDest != null) {
798            if (node != QueueMessageReference.NULL_MESSAGE) {
799                nodeDest.getDestinationStatistics().getDispatched().increment();
800                nodeDest.getDestinationStatistics().getInflight().increment();
801                LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
802            }
803        }
804
805        if (info.isDispatchAsync()) {
806            try {
807                dispatchPending();
808            } catch (IOException e) {
809                context.getConnection().serviceExceptionAsync(e);
810            }
811        }
812    }
813
814    /**
815     * inform the MessageConsumer on the client to change it's prefetch
816     *
817     * @param newPrefetch
818     */
819    @Override
820    public void updateConsumerPrefetch(int newPrefetch) {
821        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
822            ConsumerControl cc = new ConsumerControl();
823            cc.setConsumerId(info.getConsumerId());
824            cc.setPrefetch(newPrefetch);
825            context.getConnection().dispatchAsync(cc);
826        }
827    }
828
829    /**
830     * @param node
831     * @param message
832     * @return MessageDispatch
833     */
834    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
835        MessageDispatch md = new MessageDispatch();
836        md.setConsumerId(info.getConsumerId());
837
838        if (node == QueueMessageReference.NULL_MESSAGE) {
839            md.setMessage(null);
840            md.setDestination(null);
841        } else {
842            Destination regionDestination = (Destination) node.getRegionDestination();
843            md.setDestination(regionDestination.getActiveMQDestination());
844            md.setMessage(message);
845            md.setRedeliveryCounter(node.getRedeliveryCounter());
846        }
847
848        return md;
849    }
850
    /**
     * Use when a matched message is about to be dispatched to the client.
     * {@link #dispatchPending()} calls this under {@code dispatchLock}, after
     * the node has been removed from the pending cursor.
     *
     * @param node the message reference being considered for dispatch
     * @return false if the message should not be dispatched to the client
     *         (another sub may have already dispatched it for example).
     * @throws IOException if the implementation fails while making the decision
     */
    protected abstract boolean canDispatch(MessageReference node) throws IOException;
860
    /**
     * Tells whether a pending message reference has been dropped.
     * {@link #dispatchPending()} removes such a node from the pending cursor
     * without dispatching it. What counts as "dropped" is defined by the
     * implementing subclass.
     *
     * @param node the message reference taken from the pending cursor
     * @return true if the node must not be dispatched
     */
    protected abstract boolean isDropped(MessageReference node);
862
    /**
     * Used during acknowledgment to remove the message.
     *
     * @param context the connection context the acknowledgment arrived on
     * @param ack the acknowledgment command being processed
     * @param node the message reference being acknowledged
     * @throws IOException if the implementation fails while removing the message
     */
    protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
869
870
871    public int getMaxProducersToAudit() {
872        return maxProducersToAudit;
873    }
874
875    public void setMaxProducersToAudit(int maxProducersToAudit) {
876        this.maxProducersToAudit = maxProducersToAudit;
877        if (this.pending != null) {
878            this.pending.setMaxProducersToAudit(maxProducersToAudit);
879        }
880    }
881
882    public int getMaxAuditDepth() {
883        return maxAuditDepth;
884    }
885
886    public void setMaxAuditDepth(int maxAuditDepth) {
887        this.maxAuditDepth = maxAuditDepth;
888        if (this.pending != null) {
889            this.pending.setMaxAuditDepth(maxAuditDepth);
890        }
891    }
892
893    public boolean isUsePrefetchExtension() {
894        return usePrefetchExtension;
895    }
896
897    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
898        this.usePrefetchExtension = usePrefetchExtension;
899    }
900
901    protected int getPrefetchExtension() {
902        return this.prefetchExtension.get();
903    }
904
905    @Override
906    public void setPrefetchSize(int prefetchSize) {
907        this.info.setPrefetchSize(prefetchSize);
908        try {
909            this.dispatchPending();
910        } catch (Exception e) {
911            LOG.trace("Caught exception during dispatch after prefetch change.", e);
912        }
913    }
914}