001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.Iterator; 022import java.util.List; 023 024import org.apache.activemq.broker.region.MessageReference; 025import org.apache.activemq.broker.region.QueueMessageReference; 026import org.apache.activemq.command.MessageId; 027 028/** 029 * An abstraction that keeps the correct order of messages that need to be dispatched 030 * to consumers, but also hides the fact that there might be redelivered messages that 031 * should be dispatched ahead of any other paged in messages. 032 * 033 * Direct usage of this class is recommended as you can control when redeliveries need 034 * to be added vs regular pending messages (the next set of messages that can be dispatched) 035 * 036 * Created by ceposta 037 * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>. 038 */ 039public class QueueDispatchPendingList implements PendingList { 040 041 private PendingList pagedInPendingDispatch = new OrderedPendingList(); 042 private PendingList redeliveredWaitingDispatch = new OrderedPendingList(); 043 // when true use one PrioritizedPendingList for everything 044 private boolean prioritized = false; 045 046 047 @Override 048 public boolean isEmpty() { 049 return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty(); 050 } 051 052 @Override 053 public void clear() { 054 pagedInPendingDispatch.clear(); 055 redeliveredWaitingDispatch.clear(); 056 } 057 058 /** 059 * Messages added are added directly to the pagedInPendingDispatch set of messages. If 060 * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery() 061 * method 062 * @param message 063 * The MessageReference that is to be added to this list. 064 * 065 * @return the pending node. 066 */ 067 @Override 068 public PendingNode addMessageFirst(MessageReference message) { 069 return pagedInPendingDispatch.addMessageFirst(message); 070 } 071 072 /** 073 * Messages added are added directly to the pagedInPendingDispatch set of messages. If 074 * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery() 075 * method 076 * @param message 077 * The MessageReference that is to be added to this list. 078 * 079 * @return the pending node. 080 */ 081 @Override 082 public PendingNode addMessageLast(MessageReference message) { 083 return pagedInPendingDispatch.addMessageLast(message); 084 } 085 086 @Override 087 public PendingNode remove(MessageReference message) { 088 if (pagedInPendingDispatch.contains(message)) { 089 return pagedInPendingDispatch.remove(message); 090 }else if (redeliveredWaitingDispatch.contains(message)) { 091 return redeliveredWaitingDispatch.remove(message); 092 } 093 return null; 094 } 095 096 @Override 097 public int size() { 098 return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size(); 099 } 100 101 @Override 102 public long messageSize() { 103 return pagedInPendingDispatch.messageSize() + redeliveredWaitingDispatch.messageSize(); 104 } 105 106 @Override 107 public Iterator<MessageReference> iterator() { 108 return new Iterator<MessageReference>() { 109 110 Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator(); 111 Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator(); 112 Iterator<MessageReference> current = redeliveries; 113 114 115 @Override 116 public boolean hasNext() { 117 if (!redeliveries.hasNext() && (current == redeliveries)) { 118 current = pendingDispatch; 119 } 120 return current.hasNext(); 121 } 122 123 @Override 124 public MessageReference next() { 125 return current.next(); 126 } 127 128 @Override 129 public void remove() { 130 current.remove(); 131 } 132 }; 133 } 134 135 @Override 136 public boolean contains(MessageReference message) { 137 return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message); 138 } 139 140 @Override 141 public Collection<MessageReference> values() { 142 List<MessageReference> messageReferences = new ArrayList<MessageReference>(); 143 Iterator<MessageReference> iterator = iterator(); 144 while (iterator.hasNext()) { 145 messageReferences.add(iterator.next()); 146 } 147 return messageReferences; 148 } 149 150 @Override 151 public void addAll(PendingList pendingList) { 152 pagedInPendingDispatch.addAll(pendingList); 153 } 154 155 @Override 156 public MessageReference get(MessageId messageId) { 157 MessageReference rc = pagedInPendingDispatch.get(messageId); 158 if (rc == null) { 159 return redeliveredWaitingDispatch.get(messageId); 160 } 161 return rc; 162 } 163 164 public void setPrioritizedMessages(boolean prioritizedMessages) { 165 prioritized = prioritizedMessages; 166 if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) { 167 pagedInPendingDispatch = new PrioritizedPendingList(); 168 redeliveredWaitingDispatch = new PrioritizedPendingList(); 169 } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) { 170 pagedInPendingDispatch = new OrderedPendingList(); 171 redeliveredWaitingDispatch = new OrderedPendingList(); 172 } 173 } 174 175 public void addMessageForRedelivery(QueueMessageReference qmr) { 176 if (prioritized) { 177 pagedInPendingDispatch.addMessageLast(qmr); 178 } else { 179 redeliveredWaitingDispatch.addMessageLast(qmr); 180 } 181 } 182 183 public boolean hasRedeliveries(){ 184 return prioritized ? !pagedInPendingDispatch.isEmpty() : !redeliveredWaitingDispatch.isEmpty(); 185 } 186}