001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.FutureTask; 034import java.util.concurrent.LinkedBlockingQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.apache.activemq.broker.ConnectionContext; 044import org.apache.activemq.broker.region.BaseDestination; 045import org.apache.activemq.broker.scheduler.JobSchedulerStore; 046import org.apache.activemq.command.ActiveMQDestination; 047import org.apache.activemq.command.ActiveMQQueue; 048import org.apache.activemq.command.ActiveMQTempQueue; 049import org.apache.activemq.command.ActiveMQTempTopic; 050import org.apache.activemq.command.ActiveMQTopic; 051import org.apache.activemq.command.Message; 052import org.apache.activemq.command.MessageAck; 053import org.apache.activemq.command.MessageId; 054import org.apache.activemq.command.ProducerId; 055import org.apache.activemq.command.SubscriptionInfo; 056import org.apache.activemq.command.TransactionId; 057import org.apache.activemq.openwire.OpenWireFormat; 058import org.apache.activemq.protobuf.Buffer; 059import org.apache.activemq.store.AbstractMessageStore; 060import org.apache.activemq.store.IndexListener; 061import org.apache.activemq.store.ListenableFuture; 062import org.apache.activemq.store.MessageRecoveryListener; 063import org.apache.activemq.store.MessageStore; 064import org.apache.activemq.store.MessageStoreStatistics; 065import org.apache.activemq.store.PersistenceAdapter; 066import org.apache.activemq.store.TopicMessageStore; 067import org.apache.activemq.store.TransactionIdTransformer; 068import org.apache.activemq.store.TransactionStore; 069import org.apache.activemq.store.kahadb.MessageDatabase.Metadata; 070import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 071import org.apache.activemq.store.kahadb.data.KahaDestination; 072import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 073import org.apache.activemq.store.kahadb.data.KahaLocation; 074import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 075import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 076import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 077import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 078import org.apache.activemq.store.kahadb.disk.journal.Location; 079import org.apache.activemq.store.kahadb.disk.page.Transaction; 080import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; 081import org.apache.activemq.usage.MemoryUsage; 082import org.apache.activemq.usage.SystemUsage; 083import org.apache.activemq.util.ServiceStopper; 084import org.apache.activemq.util.ThreadPoolUtils; 085import org.apache.activemq.wireformat.WireFormat; 086import org.slf4j.Logger; 087import org.slf4j.LoggerFactory; 088 089public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { 090 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 091 private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; 092 093 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 094 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 095 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 096 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 097 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 098 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 099 100 protected ExecutorService queueExecutor; 101 protected ExecutorService topicExecutor; 102 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 103 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 104 final WireFormat wireFormat = new OpenWireFormat(); 105 private SystemUsage usageManager; 106 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 107 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 108 Semaphore globalQueueSemaphore; 109 Semaphore globalTopicSemaphore; 110 private boolean concurrentStoreAndDispatchQueues = true; 111 // when true, message order may be compromised when cache is exhausted if store is out 112 // or order w.r.t cache 113 private boolean concurrentStoreAndDispatchTopics = false; 114 private final boolean concurrentStoreAndDispatchTransactions = false; 115 private int maxAsyncJobs = MAX_ASYNC_JOBS; 116 private final KahaDBTransactionStore transactionStore; 117 private TransactionIdTransformer transactionIdTransformer; 118 119 public KahaDBStore() { 120 this.transactionStore = new KahaDBTransactionStore(this); 121 this.transactionIdTransformer = new TransactionIdTransformer() { 122 @Override 123 public TransactionId transform(TransactionId txid) { 124 return txid; 125 } 126 }; 127 } 128 129 @Override 130 public String toString() { 131 return "KahaDB:[" + directory.getAbsolutePath() + "]"; 132 } 133 134 @Override 135 public void setBrokerName(String brokerName) { 136 } 137 138 @Override 139 public void setUsageManager(SystemUsage usageManager) { 140 this.usageManager = usageManager; 141 } 142 143 public SystemUsage getUsageManager() { 144 return this.usageManager; 145 } 146 147 /** 148 * @return the concurrentStoreAndDispatch 149 */ 150 public boolean isConcurrentStoreAndDispatchQueues() { 151 return this.concurrentStoreAndDispatchQueues; 152 } 153 154 /** 155 * @param concurrentStoreAndDispatch 156 * the concurrentStoreAndDispatch to set 157 */ 158 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 159 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 160 } 161 162 /** 163 * @return the concurrentStoreAndDispatch 164 */ 165 public boolean isConcurrentStoreAndDispatchTopics() { 166 return this.concurrentStoreAndDispatchTopics; 167 } 168 169 /** 170 * @param concurrentStoreAndDispatch 171 * the concurrentStoreAndDispatch to set 172 */ 173 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 174 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 175 } 176 177 public boolean isConcurrentStoreAndDispatchTransactions() { 178 return this.concurrentStoreAndDispatchTransactions; 179 } 180 181 /** 182 * @return the maxAsyncJobs 183 */ 184 public int getMaxAsyncJobs() { 185 return this.maxAsyncJobs; 186 } 187 188 /** 189 * @param maxAsyncJobs 190 * the maxAsyncJobs to set 191 */ 192 public void setMaxAsyncJobs(int maxAsyncJobs) { 193 this.maxAsyncJobs = maxAsyncJobs; 194 } 195 196 197 @Override 198 protected void configureMetadata() { 199 if (brokerService != null) { 200 metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); 201 wireFormat.setVersion(metadata.openwireVersion); 202 203 if (LOG.isDebugEnabled()) { 204 LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion); 205 } 206 207 } 208 } 209 210 @Override 211 public void doStart() throws Exception { 212 //configure the metadata before start, right now 213 //this is just the open wire version 214 configureMetadata(); 215 216 super.doStart(); 217 218 if (brokerService != null) { 219 // In case the recovered store used a different OpenWire version log a warning 220 // to assist in determining why journal reads fail. 221 if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { 222 LOG.warn("Existing Store uses a different OpenWire version[{}] " + 223 "than the version configured[{}] reverting to the version " + 224 "used by this store, some newer broker features may not work" + 225 "as expected.", 226 metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); 227 228 // Update the broker service instance to the actual version in use. 229 wireFormat.setVersion(metadata.openwireVersion); 230 brokerService.setStoreOpenWireVersion(metadata.openwireVersion); 231 } 232 } 233 234 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 235 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 236 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 237 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 238 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 239 asyncQueueJobQueue, new ThreadFactory() { 240 @Override 241 public Thread newThread(Runnable runnable) { 242 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 243 thread.setDaemon(true); 244 return thread; 245 } 246 }); 247 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 248 asyncTopicJobQueue, new ThreadFactory() { 249 @Override 250 public Thread newThread(Runnable runnable) { 251 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 252 thread.setDaemon(true); 253 return thread; 254 } 255 }); 256 } 257 258 @Override 259 public void doStop(ServiceStopper stopper) throws Exception { 260 // drain down async jobs 261 LOG.info("Stopping async queue tasks"); 262 if (this.globalQueueSemaphore != null) { 263 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 264 } 265 synchronized (this.asyncQueueMaps) { 266 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 267 synchronized (m) { 268 for (StoreTask task : m.values()) { 269 task.cancel(); 270 } 271 } 272 } 273 this.asyncQueueMaps.clear(); 274 } 275 LOG.info("Stopping async topic tasks"); 276 if (this.globalTopicSemaphore != null) { 277 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 278 } 279 synchronized (this.asyncTopicMaps) { 280 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 281 synchronized (m) { 282 for (StoreTask task : m.values()) { 283 task.cancel(); 284 } 285 } 286 } 287 this.asyncTopicMaps.clear(); 288 } 289 if (this.globalQueueSemaphore != null) { 290 this.globalQueueSemaphore.drainPermits(); 291 } 292 if (this.globalTopicSemaphore != null) { 293 this.globalTopicSemaphore.drainPermits(); 294 } 295 if (this.queueExecutor != null) { 296 ThreadPoolUtils.shutdownNow(queueExecutor); 297 queueExecutor = null; 298 } 299 if (this.topicExecutor != null) { 300 ThreadPoolUtils.shutdownNow(topicExecutor); 301 topicExecutor = null; 302 } 303 LOG.info("Stopped KahaDB"); 304 super.doStop(stopper); 305 } 306 307 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 308 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 309 @Override 310 public Location execute(Transaction tx) throws IOException { 311 StoredDestination sd = getStoredDestination(destination, tx); 312 Long sequence = sd.messageIdIndex.get(tx, key); 313 if (sequence == null) { 314 return null; 315 } 316 return sd.orderIndex.get(tx, sequence).location; 317 } 318 }); 319 } 320 321 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 322 StoreQueueTask task = null; 323 synchronized (store.asyncTaskMap) { 324 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 325 } 326 return task; 327 } 328 329 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 330 synchronized (store.asyncTaskMap) { 331 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 332 } 333 this.queueExecutor.execute(task); 334 } 335 336 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 337 StoreTopicTask task = null; 338 synchronized (store.asyncTaskMap) { 339 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 340 } 341 return task; 342 } 343 344 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 345 synchronized (store.asyncTaskMap) { 346 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 347 } 348 this.topicExecutor.execute(task); 349 } 350 351 @Override 352 public TransactionStore createTransactionStore() throws IOException { 353 return this.transactionStore; 354 } 355 356 public boolean getForceRecoverIndex() { 357 return this.forceRecoverIndex; 358 } 359 360 public void setForceRecoverIndex(boolean forceRecoverIndex) { 361 this.forceRecoverIndex = forceRecoverIndex; 362 } 363 364 public class KahaDBMessageStore extends AbstractMessageStore { 365 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 366 protected KahaDestination dest; 367 private final int maxAsyncJobs; 368 private final Semaphore localDestinationSemaphore; 369 370 double doneTasks, canceledTasks = 0; 371 372 public KahaDBMessageStore(ActiveMQDestination destination) { 373 super(destination); 374 this.dest = convert(destination); 375 this.maxAsyncJobs = getMaxAsyncJobs(); 376 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 377 } 378 379 @Override 380 public ActiveMQDestination getDestination() { 381 return destination; 382 } 383 384 @Override 385 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 386 throws IOException { 387 if (isConcurrentStoreAndDispatchQueues()) { 388 StoreQueueTask result = new StoreQueueTask(this, context, message); 389 ListenableFuture<Object> future = result.getFuture(); 390 message.getMessageId().setFutureOrSequenceLong(future); 391 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch 392 result.aquireLocks(); 393 addQueueTask(this, result); 394 if (indexListener != null) { 395 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 396 } 397 return future; 398 } else { 399 return super.asyncAddQueueMessage(context, message); 400 } 401 } 402 403 @Override 404 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 405 if (isConcurrentStoreAndDispatchQueues()) { 406 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 407 StoreQueueTask task = null; 408 synchronized (asyncTaskMap) { 409 task = (StoreQueueTask) asyncTaskMap.get(key); 410 } 411 if (task != null) { 412 if (ack.isInTransaction() || !task.cancel()) { 413 try { 414 task.future.get(); 415 } catch (InterruptedException e) { 416 throw new InterruptedIOException(e.toString()); 417 } catch (Exception ignored) { 418 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 419 } 420 removeMessage(context, ack); 421 } else { 422 synchronized (asyncTaskMap) { 423 asyncTaskMap.remove(key); 424 } 425 } 426 } else { 427 removeMessage(context, ack); 428 } 429 } else { 430 removeMessage(context, ack); 431 } 432 } 433 434 @Override 435 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 436 final KahaAddMessageCommand command = new KahaAddMessageCommand(); 437 command.setDestination(dest); 438 command.setMessageId(message.getMessageId().toProducerKey()); 439 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId()))); 440 command.setPriority(message.getPriority()); 441 command.setPrioritySupported(isPrioritizedMessages()); 442 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 443 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 444 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { 445 // sync add? (for async, future present from getFutureOrSequenceLong) 446 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 447 448 @Override 449 public void sequenceAssignedWithIndexLocked(final long sequence) { 450 message.getMessageId().setFutureOrSequenceLong(sequence); 451 if (indexListener != null) { 452 if (possibleFuture == null) { 453 trackPendingAdd(dest, sequence); 454 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 455 @Override 456 public void run() { 457 trackPendingAddComplete(dest, sequence); 458 } 459 })); 460 } 461 } 462 } 463 }, null); 464 } 465 466 @Override 467 public void updateMessage(Message message) throws IOException { 468 if (LOG.isTraceEnabled()) { 469 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); 470 } 471 KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); 472 KahaAddMessageCommand command = new KahaAddMessageCommand(); 473 command.setDestination(dest); 474 command.setMessageId(message.getMessageId().toProducerKey()); 475 command.setPriority(message.getPriority()); 476 command.setPrioritySupported(prioritizedMessages); 477 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 478 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 479 updateMessageCommand.setMessage(command); 480 store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null); 481 } 482 483 @Override 484 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 485 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 486 command.setDestination(dest); 487 command.setMessageId(ack.getLastMessageId().toProducerKey()); 488 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))); 489 490 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 491 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 492 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 493 } 494 495 @Override 496 public void removeAllMessages(ConnectionContext context) throws IOException { 497 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 498 command.setDestination(dest); 499 store(command, true, null, null); 500 } 501 502 @Override 503 public Message getMessage(MessageId identity) throws IOException { 504 final String key = identity.toProducerKey(); 505 506 // Hopefully one day the page file supports concurrent read 507 // operations... but for now we must 508 // externally synchronize... 509 Location location; 510 indexLock.writeLock().lock(); 511 try { 512 location = findMessageLocation(key, dest); 513 } finally { 514 indexLock.writeLock().unlock(); 515 } 516 if (location == null) { 517 return null; 518 } 519 520 return loadMessage(location); 521 } 522 523 @Override 524 public boolean isEmpty() throws IOException { 525 indexLock.writeLock().lock(); 526 try { 527 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 528 @Override 529 public Boolean execute(Transaction tx) throws IOException { 530 // Iterate through all index entries to get a count of 531 // messages in the destination. 532 StoredDestination sd = getStoredDestination(dest, tx); 533 return sd.locationIndex.isEmpty(tx); 534 } 535 }); 536 } finally { 537 indexLock.writeLock().unlock(); 538 } 539 } 540 541 @Override 542 public void recover(final MessageRecoveryListener listener) throws Exception { 543 // recovery may involve expiry which will modify 544 indexLock.writeLock().lock(); 545 try { 546 pageFile.tx().execute(new Transaction.Closure<Exception>() { 547 @Override 548 public void execute(Transaction tx) throws Exception { 549 StoredDestination sd = getStoredDestination(dest, tx); 550 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 551 sd.orderIndex.resetCursorPosition(); 552 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 553 .hasNext(); ) { 554 Entry<Long, MessageKeys> entry = iterator.next(); 555 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 556 continue; 557 } 558 Message msg = loadMessage(entry.getValue().location); 559 listener.recoverMessage(msg); 560 } 561 } 562 }); 563 } finally { 564 indexLock.writeLock().unlock(); 565 } 566 } 567 568 @Override 569 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 570 indexLock.writeLock().lock(); 571 try { 572 pageFile.tx().execute(new Transaction.Closure<Exception>() { 573 @Override 574 public void execute(Transaction tx) throws Exception { 575 StoredDestination sd = getStoredDestination(dest, tx); 576 Entry<Long, MessageKeys> entry = null; 577 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 578 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { 579 entry = iterator.next(); 580 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 581 continue; 582 } 583 Message msg = loadMessage(entry.getValue().location); 584 msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); 585 listener.recoverMessage(msg); 586 counter++; 587 if (counter >= maxReturned) { 588 break; 589 } 590 } 591 sd.orderIndex.stoppedIterating(); 592 } 593 }); 594 } finally { 595 indexLock.writeLock().unlock(); 596 } 597 } 598 599 protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { 600 int counter = 0; 601 String id; 602 for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { 603 id = iterator.next(); 604 iterator.remove(); 605 Long sequence = sd.messageIdIndex.get(tx, id); 606 if (sequence != null) { 607 if (sd.orderIndex.alreadyDispatched(sequence)) { 608 listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); 609 counter++; 610 if (counter >= maxReturned) { 611 break; 612 } 613 } else { 614 LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); 615 } 616 } else { 617 LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); 618 } 619 } 620 return counter; 621 } 622 623 624 @Override 625 public void resetBatching() { 626 if (pageFile.isLoaded()) { 627 indexLock.writeLock().lock(); 628 try { 629 pageFile.tx().execute(new Transaction.Closure<Exception>() { 630 @Override 631 public void execute(Transaction tx) throws Exception { 632 StoredDestination sd = getExistingStoredDestination(dest, tx); 633 if (sd != null) { 634 sd.orderIndex.resetCursorPosition();} 635 } 636 }); 637 } catch (Exception e) { 638 LOG.error("Failed to reset batching",e); 639 } finally { 640 indexLock.writeLock().unlock(); 641 } 642 } 643 } 644 645 @Override 646 public void setBatch(final MessageId identity) throws IOException { 647 indexLock.writeLock().lock(); 648 try { 649 pageFile.tx().execute(new Transaction.Closure<IOException>() { 650 @Override 651 public void execute(Transaction tx) throws IOException { 652 StoredDestination sd = getStoredDestination(dest, tx); 653 Long location = (Long) identity.getFutureOrSequenceLong(); 654 Long pending = sd.orderIndex.minPendingAdd(); 655 if (pending != null) { 656 location = Math.min(location, pending-1); 657 } 658 sd.orderIndex.setBatch(tx, location); 659 } 660 }); 661 } finally { 662 indexLock.writeLock().unlock(); 663 } 664 } 665 666 @Override 667 public void setMemoryUsage(MemoryUsage memoryUsage) { 668 } 669 @Override 670 public void start() throws Exception { 671 super.start(); 672 } 673 @Override 674 public void stop() throws Exception { 675 super.stop(); 676 } 677 678 protected void lockAsyncJobQueue() { 679 try { 680 if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { 681 throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); 682 } 683 } catch (Exception e) { 684 LOG.error("Failed to lock async jobs for " + this.destination, e); 685 } 686 } 687 688 protected void unlockAsyncJobQueue() { 689 this.localDestinationSemaphore.release(this.maxAsyncJobs); 690 } 691 692 protected void acquireLocalAsyncLock() { 693 try { 694 this.localDestinationSemaphore.acquire(); 695 } catch (InterruptedException e) { 696 LOG.error("Failed to aquire async lock for " + this.destination, e); 697 } 698 } 699 700 protected void releaseLocalAsyncLock() { 701 this.localDestinationSemaphore.release(); 702 } 703 704 @Override 705 public String toString(){ 706 return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); 707 } 708 709 @Override 710 protected void recoverMessageStoreStatistics() throws IOException { 711 try { 712 MessageStoreStatistics recoveredStatistics; 713 lockAsyncJobQueue(); 714 indexLock.writeLock().lock(); 715 try { 716 recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() { 717 @Override 718 public MessageStoreStatistics execute(Transaction tx) throws IOException { 719 MessageStoreStatistics statistics = new MessageStoreStatistics(); 720 721 // Iterate through all index entries to get the size of each message 722 StoredDestination sd = getStoredDestination(dest, tx); 723 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { 724 int locationSize = iterator.next().getKey().getSize(); 725 statistics.getMessageCount().increment(); 726 statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); 727 } 728 return statistics; 729 } 730 }); 731 getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); 732 getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize()); 733 } finally { 734 indexLock.writeLock().unlock(); 735 } 736 } finally { 737 unlockAsyncJobQueue(); 738 } 739 } 740 } 741 742 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 743 private final AtomicInteger subscriptionCount = new AtomicInteger(); 744 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 745 super(destination); 746 this.subscriptionCount.set(getAllSubscriptions().length); 747 if (isConcurrentStoreAndDispatchTopics()) { 748 asyncTopicMaps.add(asyncTaskMap); 749 } 750 } 751 752 @Override 753 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 754 throws IOException { 755 if (isConcurrentStoreAndDispatchTopics()) { 756 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 757 result.aquireLocks(); 758 addTopicTask(this, result); 759 return result.getFuture(); 760 } else { 761 return super.asyncAddTopicMessage(context, message); 762 } 763 } 764 765 @Override 766 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 767 MessageId messageId, MessageAck ack) throws IOException { 768 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); 769 if (isConcurrentStoreAndDispatchTopics()) { 770 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 771 StoreTopicTask task = null; 772 synchronized (asyncTaskMap) { 773 task = (StoreTopicTask) asyncTaskMap.get(key); 774 } 775 if (task != null) { 776 if (task.addSubscriptionKey(subscriptionKey)) { 777 removeTopicTask(this, messageId); 778 if (task.cancel()) { 779 synchronized (asyncTaskMap) { 780 asyncTaskMap.remove(key); 781 } 782 } 783 } 784 } else { 785 doAcknowledge(context, subscriptionKey, messageId, ack); 786 } 787 } else { 788 doAcknowledge(context, subscriptionKey, messageId, ack); 789 } 790 } 791 792 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 793 throws IOException { 794 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 795 command.setDestination(dest); 796 command.setSubscriptionKey(subscriptionKey); 797 command.setMessageId(messageId.toProducerKey()); 798 command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null); 799 if (ack != null && ack.isUnmatchedAck()) { 800 command.setAck(UNMATCHED); 801 } else { 802 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 803 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 804 } 805 store(command, false, null, null); 806 } 807 808 @Override 809 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 810 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 811 .getSubscriptionName()); 812 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 813 command.setDestination(dest); 814 command.setSubscriptionKey(subscriptionKey.toString()); 815 command.setRetroactive(retroactive); 816 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 817 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 818 store(command, isEnableJournalDiskSyncs() && true, null, null); 819 this.subscriptionCount.incrementAndGet(); 820 } 821 822 @Override 823 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 824 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 825 command.setDestination(dest); 826 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); 827 store(command, isEnableJournalDiskSyncs() && true, null, null); 828 this.subscriptionCount.decrementAndGet(); 829 } 830 831 @Override 832 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 833 834 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 835 indexLock.writeLock().lock(); 836 try { 837 pageFile.tx().execute(new Transaction.Closure<IOException>() { 838 @Override 839 public void execute(Transaction tx) throws IOException { 840 StoredDestination sd = getStoredDestination(dest, tx); 841 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 842 .hasNext();) { 843 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 844 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 845 .getValue().getSubscriptionInfo().newInput())); 846 subscriptions.add(info); 847 848 } 849 } 850 }); 851 } finally { 852 indexLock.writeLock().unlock(); 853 } 854 855 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 856 subscriptions.toArray(rc); 857 return rc; 858 } 859 860 @Override 861 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 862 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 863 indexLock.writeLock().lock(); 864 try { 865 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 866 @Override 867 public SubscriptionInfo execute(Transaction tx) throws IOException { 868 StoredDestination sd = getStoredDestination(dest, tx); 869 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 870 if (command == null) { 871 return null; 872 } 873 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 874 .getSubscriptionInfo().newInput())); 875 } 876 }); 877 } finally { 878 indexLock.writeLock().unlock(); 879 } 880 } 881 882 @Override 883 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 884 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 885 indexLock.writeLock().lock(); 886 try { 887 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 888 @Override 889 public Integer execute(Transaction tx) throws IOException { 890 StoredDestination sd = getStoredDestination(dest, tx); 891 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 892 if (cursorPos == null) { 893 // The subscription might not exist. 894 return 0; 895 } 896 897 return (int) getStoredMessageCount(tx, sd, subscriptionKey); 898 } 899 }); 900 } finally { 901 indexLock.writeLock().unlock(); 902 } 903 } 904 905 906 @Override 907 public long getMessageSize(String clientId, String subscriptionName) throws IOException { 908 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 909 indexLock.writeLock().lock(); 910 try { 911 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 912 @Override 913 public Integer execute(Transaction tx) throws IOException { 914 StoredDestination sd = getStoredDestination(dest, tx); 915 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 916 if (cursorPos == null) { 917 // The subscription might not exist. 918 return 0; 919 } 920 921 return (int) getStoredMessageSize(tx, sd, subscriptionKey); 922 } 923 }); 924 } finally { 925 indexLock.writeLock().unlock(); 926 } 927 } 928 929 @Override 930 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 931 throws Exception { 932 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 933 @SuppressWarnings("unused") 934 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 935 indexLock.writeLock().lock(); 936 try { 937 pageFile.tx().execute(new Transaction.Closure<Exception>() { 938 @Override 939 public void execute(Transaction tx) throws Exception { 940 StoredDestination sd = getStoredDestination(dest, tx); 941 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 942 sd.orderIndex.setBatch(tx, cursorPos); 943 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 944 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 945 .hasNext();) { 946 Entry<Long, MessageKeys> entry = iterator.next(); 947 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 948 continue; 949 } 950 listener.recoverMessage(loadMessage(entry.getValue().location)); 951 } 952 sd.orderIndex.resetCursorPosition(); 953 } 954 }); 955 } finally { 956 indexLock.writeLock().unlock(); 957 } 958 } 959 960 @Override 961 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 962 final MessageRecoveryListener listener) throws Exception { 963 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 964 @SuppressWarnings("unused") 965 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 966 indexLock.writeLock().lock(); 967 try { 968 pageFile.tx().execute(new Transaction.Closure<Exception>() { 969 @Override 970 public void execute(Transaction tx) throws Exception { 971 StoredDestination sd = getStoredDestination(dest, tx); 972 sd.orderIndex.resetCursorPosition(); 973 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 974 if (moc == null) { 975 LastAck pos = getLastAck(tx, sd, subscriptionKey); 976 if (pos == null) { 977 // sub deleted 978 return; 979 } 980 sd.orderIndex.setBatch(tx, pos); 981 moc = sd.orderIndex.cursor; 982 } else { 983 sd.orderIndex.cursor.sync(moc); 984 } 985 986 Entry<Long, MessageKeys> entry = null; 987 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 988 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator 989 .hasNext();) { 990 entry = iterator.next(); 991 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 992 continue; 993 } 994 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 995 counter++; 996 } 997 if (counter >= maxReturned || listener.hasSpace() == false) { 998 break; 999 } 1000 } 1001 sd.orderIndex.stoppedIterating(); 1002 if (entry != null) { 1003 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 1004 sd.subscriptionCursors.put(subscriptionKey, copy); 1005 } 1006 } 1007 }); 1008 } finally { 1009 indexLock.writeLock().unlock(); 1010 } 1011 } 1012 1013 @Override 1014 public void resetBatching(String clientId, String subscriptionName) { 1015 try { 1016 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1017 indexLock.writeLock().lock(); 1018 try { 1019 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1020 @Override 1021 public void execute(Transaction tx) throws IOException { 1022 StoredDestination sd = getStoredDestination(dest, tx); 1023 sd.subscriptionCursors.remove(subscriptionKey); 1024 } 1025 }); 1026 }finally { 1027 indexLock.writeLock().unlock(); 1028 } 1029 } catch (IOException e) { 1030 throw new RuntimeException(e); 1031 } 1032 } 1033 } 1034 1035 String subscriptionKey(String clientId, String subscriptionName) { 1036 return clientId + ":" + subscriptionName; 1037 } 1038 1039 @Override 1040 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 1041 String key = key(convert(destination)); 1042 MessageStore store = storeCache.get(key(convert(destination))); 1043 if (store == null) { 1044 final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination)); 1045 store = storeCache.putIfAbsent(key, queueStore); 1046 if (store == null) { 1047 store = queueStore; 1048 } 1049 } 1050 1051 return store; 1052 } 1053 1054 @Override 1055 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 1056 String key = key(convert(destination)); 1057 MessageStore store = storeCache.get(key(convert(destination))); 1058 if (store == null) { 1059 final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 1060 store = storeCache.putIfAbsent(key, topicStore); 1061 if (store == null) { 1062 store = topicStore; 1063 } 1064 } 1065 1066 return (TopicMessageStore) store; 1067 } 1068 1069 /** 1070 * Cleanup method to remove any state associated with the given destination. 1071 * This method does not stop the message store (it might not be cached). 1072 * 1073 * @param destination 1074 * Destination to forget 1075 */ 1076 @Override 1077 public void removeQueueMessageStore(ActiveMQQueue destination) { 1078 } 1079 1080 /** 1081 * Cleanup method to remove any state associated with the given destination 1082 * This method does not stop the message store (it might not be cached). 1083 * 1084 * @param destination 1085 * Destination to forget 1086 */ 1087 @Override 1088 public void removeTopicMessageStore(ActiveMQTopic destination) { 1089 } 1090 1091 @Override 1092 public void deleteAllMessages() throws IOException { 1093 deleteAllMessages = true; 1094 } 1095 1096 @Override 1097 public Set<ActiveMQDestination> getDestinations() { 1098 try { 1099 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 1100 indexLock.writeLock().lock(); 1101 try { 1102 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1103 @Override 1104 public void execute(Transaction tx) throws IOException { 1105 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 1106 .hasNext();) { 1107 Entry<String, StoredDestination> entry = iterator.next(); 1108 //Removing isEmpty topic check - see AMQ-5875 1109 rc.add(convert(entry.getKey())); 1110 } 1111 } 1112 }); 1113 }finally { 1114 indexLock.writeLock().unlock(); 1115 } 1116 return rc; 1117 } catch (IOException e) { 1118 throw new RuntimeException(e); 1119 } 1120 } 1121 1122 @Override 1123 public long getLastMessageBrokerSequenceId() throws IOException { 1124 return 0; 1125 } 1126 1127 @Override 1128 public long getLastProducerSequenceId(ProducerId id) { 1129 indexLock.writeLock().lock(); 1130 try { 1131 return metadata.producerSequenceIdTracker.getLastSeqId(id); 1132 } finally { 1133 indexLock.writeLock().unlock(); 1134 } 1135 } 1136 1137 @Override 1138 public long size() { 1139 try { 1140 return journalSize.get() + getPageFile().getDiskSize(); 1141 } catch (IOException e) { 1142 throw new RuntimeException(e); 1143 } 1144 } 1145 1146 @Override 1147 public void beginTransaction(ConnectionContext context) throws IOException { 1148 throw new IOException("Not yet implemented."); 1149 } 1150 @Override 1151 public void commitTransaction(ConnectionContext context) throws IOException { 1152 throw new IOException("Not yet implemented."); 1153 } 1154 @Override 1155 public void rollbackTransaction(ConnectionContext context) throws IOException { 1156 throw new IOException("Not yet implemented."); 1157 } 1158 1159 @Override 1160 public void checkpoint(boolean sync) throws IOException { 1161 super.checkpointCleanup(sync); 1162 } 1163 1164 // ///////////////////////////////////////////////////////////////// 1165 // Internal helper methods. 1166 // ///////////////////////////////////////////////////////////////// 1167 1168 /** 1169 * @param location 1170 * @return 1171 * @throws IOException 1172 */ 1173 Message loadMessage(Location location) throws IOException { 1174 JournalCommand<?> command = load(location); 1175 KahaAddMessageCommand addMessage = null; 1176 switch (command.type()) { 1177 case KAHA_UPDATE_MESSAGE_COMMAND: 1178 addMessage = ((KahaUpdateMessageCommand)command).getMessage(); 1179 break; 1180 default: 1181 addMessage = (KahaAddMessageCommand) command; 1182 } 1183 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 1184 return msg; 1185 } 1186 1187 // ///////////////////////////////////////////////////////////////// 1188 // Internal conversion methods. 1189 // ///////////////////////////////////////////////////////////////// 1190 1191 KahaLocation convert(Location location) { 1192 KahaLocation rc = new KahaLocation(); 1193 rc.setLogId(location.getDataFileId()); 1194 rc.setOffset(location.getOffset()); 1195 return rc; 1196 } 1197 1198 KahaDestination convert(ActiveMQDestination dest) { 1199 KahaDestination rc = new KahaDestination(); 1200 rc.setName(dest.getPhysicalName()); 1201 switch (dest.getDestinationType()) { 1202 case ActiveMQDestination.QUEUE_TYPE: 1203 rc.setType(DestinationType.QUEUE); 1204 return rc; 1205 case ActiveMQDestination.TOPIC_TYPE: 1206 rc.setType(DestinationType.TOPIC); 1207 return rc; 1208 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1209 rc.setType(DestinationType.TEMP_QUEUE); 1210 return rc; 1211 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1212 rc.setType(DestinationType.TEMP_TOPIC); 1213 return rc; 1214 default: 1215 return null; 1216 } 1217 } 1218 1219 ActiveMQDestination convert(String dest) { 1220 int p = dest.indexOf(":"); 1221 if (p < 0) { 1222 throw new IllegalArgumentException("Not in the valid destination format"); 1223 } 1224 int type = Integer.parseInt(dest.substring(0, p)); 1225 String name = dest.substring(p + 1); 1226 return convert(type, name); 1227 } 1228 1229 private ActiveMQDestination convert(KahaDestination commandDestination) { 1230 return convert(commandDestination.getType().getNumber(), commandDestination.getName()); 1231 } 1232 1233 private ActiveMQDestination convert(int type, String name) { 1234 switch (KahaDestination.DestinationType.valueOf(type)) { 1235 case QUEUE: 1236 return new ActiveMQQueue(name); 1237 case TOPIC: 1238 return new ActiveMQTopic(name); 1239 case TEMP_QUEUE: 1240 return new ActiveMQTempQueue(name); 1241 case TEMP_TOPIC: 1242 return new ActiveMQTempTopic(name); 1243 default: 1244 throw new IllegalArgumentException("Not in the valid destination format"); 1245 } 1246 } 1247 1248 public TransactionIdTransformer getTransactionIdTransformer() { 1249 return transactionIdTransformer; 1250 } 1251 1252 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 1253 this.transactionIdTransformer = transactionIdTransformer; 1254 } 1255 1256 static class AsyncJobKey { 1257 MessageId id; 1258 ActiveMQDestination destination; 1259 1260 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1261 this.id = id; 1262 this.destination = destination; 1263 } 1264 1265 @Override 1266 public boolean equals(Object obj) { 1267 if (obj == this) { 1268 return true; 1269 } 1270 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1271 && destination.equals(((AsyncJobKey) obj).destination); 1272 } 1273 1274 @Override 1275 public int hashCode() { 1276 return id.hashCode() + destination.hashCode(); 1277 } 1278 1279 @Override 1280 public String toString() { 1281 return destination.getPhysicalName() + "-" + id; 1282 } 1283 } 1284 1285 public interface StoreTask { 1286 public boolean cancel(); 1287 1288 public void aquireLocks(); 1289 1290 public void releaseLocks(); 1291 } 1292 1293 class StoreQueueTask implements Runnable, StoreTask { 1294 protected final Message message; 1295 protected final ConnectionContext context; 1296 protected final KahaDBMessageStore store; 1297 protected final InnerFutureTask future; 1298 protected final AtomicBoolean done = new AtomicBoolean(); 1299 protected final AtomicBoolean locked = new AtomicBoolean(); 1300 1301 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1302 this.store = store; 1303 this.context = context; 1304 this.message = message; 1305 this.future = new InnerFutureTask(this); 1306 } 1307 1308 public ListenableFuture<Object> getFuture() { 1309 return this.future; 1310 } 1311 1312 @Override 1313 public boolean cancel() { 1314 if (this.done.compareAndSet(false, true)) { 1315 return this.future.cancel(false); 1316 } 1317 return false; 1318 } 1319 1320 @Override 1321 public void aquireLocks() { 1322 if (this.locked.compareAndSet(false, true)) { 1323 try { 1324 globalQueueSemaphore.acquire(); 1325 store.acquireLocalAsyncLock(); 1326 message.incrementReferenceCount(); 1327 } catch (InterruptedException e) { 1328 LOG.warn("Failed to aquire lock", e); 1329 } 1330 } 1331 1332 } 1333 1334 @Override 1335 public void releaseLocks() { 1336 if (this.locked.compareAndSet(true, false)) { 1337 store.releaseLocalAsyncLock(); 1338 globalQueueSemaphore.release(); 1339 message.decrementReferenceCount(); 1340 } 1341 } 1342 1343 @Override 1344 public void run() { 1345 this.store.doneTasks++; 1346 try { 1347 if (this.done.compareAndSet(false, true)) { 1348 this.store.addMessage(context, message); 1349 removeQueueTask(this.store, this.message.getMessageId()); 1350 this.future.complete(); 1351 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1352 System.err.println(this.store.dest.getName() + " cancelled: " 1353 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1354 this.store.canceledTasks = this.store.doneTasks = 0; 1355 } 1356 } catch (Exception e) { 1357 this.future.setException(e); 1358 } 1359 } 1360 1361 protected Message getMessage() { 1362 return this.message; 1363 } 1364 1365 private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { 1366 1367 private Runnable listener; 1368 public InnerFutureTask(Runnable runnable) { 1369 super(runnable, null); 1370 1371 } 1372 1373 public void setException(final Exception e) { 1374 super.setException(e); 1375 } 1376 1377 public void complete() { 1378 super.set(null); 1379 } 1380 1381 @Override 1382 public void done() { 1383 fireListener(); 1384 } 1385 1386 @Override 1387 public void addListener(Runnable listener) { 1388 this.listener = listener; 1389 if (isDone()) { 1390 fireListener(); 1391 } 1392 } 1393 1394 private void fireListener() { 1395 if (listener != null) { 1396 try { 1397 listener.run(); 1398 } catch (Exception ignored) { 1399 LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored); 1400 } 1401 } 1402 } 1403 } 1404 } 1405 1406 class StoreTopicTask extends StoreQueueTask { 1407 private final int subscriptionCount; 1408 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1409 private final KahaDBTopicMessageStore topicStore; 1410 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1411 int subscriptionCount) { 1412 super(store, context, message); 1413 this.topicStore = store; 1414 this.subscriptionCount = subscriptionCount; 1415 1416 } 1417 1418 @Override 1419 public void aquireLocks() { 1420 if (this.locked.compareAndSet(false, true)) { 1421 try { 1422 globalTopicSemaphore.acquire(); 1423 store.acquireLocalAsyncLock(); 1424 message.incrementReferenceCount(); 1425 } catch (InterruptedException e) { 1426 LOG.warn("Failed to aquire lock", e); 1427 } 1428 } 1429 } 1430 1431 @Override 1432 public void releaseLocks() { 1433 if (this.locked.compareAndSet(true, false)) { 1434 message.decrementReferenceCount(); 1435 store.releaseLocalAsyncLock(); 1436 globalTopicSemaphore.release(); 1437 } 1438 } 1439 1440 /** 1441 * add a key 1442 * 1443 * @param key 1444 * @return true if all acknowledgements received 1445 */ 1446 public boolean addSubscriptionKey(String key) { 1447 synchronized (this.subscriptionKeys) { 1448 this.subscriptionKeys.add(key); 1449 } 1450 return this.subscriptionKeys.size() >= this.subscriptionCount; 1451 } 1452 1453 @Override 1454 public void run() { 1455 this.store.doneTasks++; 1456 try { 1457 if (this.done.compareAndSet(false, true)) { 1458 this.topicStore.addMessage(context, message); 1459 // apply any acks we have 1460 synchronized (this.subscriptionKeys) { 1461 for (String key : this.subscriptionKeys) { 1462 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1463 1464 } 1465 } 1466 removeTopicTask(this.topicStore, this.message.getMessageId()); 1467 this.future.complete(); 1468 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1469 System.err.println(this.store.dest.getName() + " cancelled: " 1470 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1471 this.store.canceledTasks = this.store.doneTasks = 0; 1472 } 1473 } catch (Exception e) { 1474 this.future.setException(e); 1475 } 1476 } 1477 } 1478 1479 public class StoreTaskExecutor extends ThreadPoolExecutor { 1480 1481 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) { 1482 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); 1483 } 1484 1485 @Override 1486 protected void afterExecute(Runnable runnable, Throwable throwable) { 1487 super.afterExecute(runnable, throwable); 1488 1489 if (runnable instanceof StoreTask) { 1490 ((StoreTask)runnable).releaseLocks(); 1491 } 1492 } 1493 } 1494 1495 @Override 1496 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 1497 return new JobSchedulerStoreImpl(); 1498 } 1499}