001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.Map; 025import java.util.Map.Entry; 026import java.util.Set; 027 028import org.apache.activemq.broker.BrokerService; 029import org.apache.activemq.broker.BrokerServiceAware; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.broker.scheduler.JobSchedulerStore; 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.ActiveMQQueue; 034import org.apache.activemq.command.ActiveMQTempQueue; 035import org.apache.activemq.command.ActiveMQTempTopic; 036import org.apache.activemq.command.ActiveMQTopic; 037import org.apache.activemq.command.Message; 038import org.apache.activemq.command.MessageAck; 039import org.apache.activemq.command.MessageId; 040import org.apache.activemq.command.ProducerId; 041import org.apache.activemq.command.SubscriptionInfo; 042import org.apache.activemq.command.TransactionId; 043import org.apache.activemq.command.XATransactionId; 044import org.apache.activemq.openwire.OpenWireFormat; 045import org.apache.activemq.protobuf.Buffer; 046import org.apache.activemq.store.AbstractMessageStore; 047import org.apache.activemq.store.MessageRecoveryListener; 048import org.apache.activemq.store.MessageStore; 049import org.apache.activemq.store.PersistenceAdapter; 050import org.apache.activemq.store.TopicMessageStore; 051import org.apache.activemq.store.TransactionRecoveryListener; 052import org.apache.activemq.store.TransactionStore; 053import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 054import org.apache.activemq.store.kahadb.data.KahaDestination; 055import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 056import org.apache.activemq.store.kahadb.data.KahaLocation; 057import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 058import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 059import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 060import org.apache.activemq.store.kahadb.disk.journal.Location; 061import org.apache.activemq.store.kahadb.disk.page.Transaction; 062import org.apache.activemq.usage.MemoryUsage; 063import org.apache.activemq.usage.SystemUsage; 064import org.apache.activemq.util.ByteSequence; 065import org.apache.activemq.wireformat.WireFormat; 066 067public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware { 068 069 private final WireFormat wireFormat = new OpenWireFormat(); 070 private BrokerService brokerService; 071 072 @Override 073 public void setBrokerName(String brokerName) { 074 } 075 @Override 076 public void setUsageManager(SystemUsage usageManager) { 077 } 078 079 @Override 080 public TransactionStore createTransactionStore() throws IOException { 081 return new TransactionStore(){ 082 083 @Override 084 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { 085 if (preCommit != null) { 086 preCommit.run(); 087 } 088 processCommit(txid); 089 if (postCommit != null) { 090 postCommit.run(); 091 } 092 } 093 @Override 094 public void prepare(TransactionId txid) throws IOException { 095 processPrepare(txid); 096 } 097 @Override 098 public void rollback(TransactionId txid) throws IOException { 099 processRollback(txid); 100 } 101 @Override 102 public void recover(TransactionRecoveryListener listener) throws IOException { 103 for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) { 104 XATransactionId xid = (XATransactionId)entry.getKey(); 105 ArrayList<Message> messageList = new ArrayList<Message>(); 106 ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); 107 108 for (Operation op : entry.getValue()) { 109 if( op.getClass() == AddOpperation.class ) { 110 AddOpperation addOp = (AddOpperation)op; 111 Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) ); 112 messageList.add(msg); 113 } else { 114 RemoveOpperation rmOp = (RemoveOpperation)op; 115 MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) ); 116 ackList.add(ack); 117 } 118 } 119 120 Message[] addedMessages = new Message[messageList.size()]; 121 MessageAck[] acks = new MessageAck[ackList.size()]; 122 messageList.toArray(addedMessages); 123 ackList.toArray(acks); 124 listener.recover(xid, addedMessages, acks); 125 } 126 } 127 @Override 128 public void start() throws Exception { 129 } 130 @Override 131 public void stop() throws Exception { 132 } 133 }; 134 } 135 136 public class KahaDBMessageStore extends AbstractMessageStore { 137 protected KahaDestination dest; 138 139 public KahaDBMessageStore(ActiveMQDestination destination) { 140 super(destination); 141 this.dest = convert( destination ); 142 } 143 144 @Override 145 public ActiveMQDestination getDestination() { 146 return destination; 147 } 148 149 @Override 150 public void addMessage(ConnectionContext context, Message message) throws IOException { 151 KahaAddMessageCommand command = new KahaAddMessageCommand(); 152 command.setDestination(dest); 153 command.setMessageId(message.getMessageId().toProducerKey()); 154 processAdd(command, message.getTransactionId(), wireFormat.marshal(message)); 155 } 156 157 @Override 158 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 159 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 160 command.setDestination(dest); 161 command.setMessageId(ack.getLastMessageId().toProducerKey()); 162 processRemove(command, ack.getTransactionId()); 163 } 164 165 @Override 166 public void removeAllMessages(ConnectionContext context) throws IOException { 167 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 168 command.setDestination(dest); 169 process(command); 170 } 171 172 @Override 173 public Message getMessage(MessageId identity) throws IOException { 174 final String key = identity.toProducerKey(); 175 176 // Hopefully one day the page file supports concurrent read operations... but for now we must 177 // externally synchronize... 178 ByteSequence data; 179 synchronized(indexMutex) { 180 data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){ 181 @Override 182 public ByteSequence execute(Transaction tx) throws IOException { 183 StoredDestination sd = getStoredDestination(dest, tx); 184 Long sequence = sd.messageIdIndex.get(tx, key); 185 if( sequence ==null ) { 186 return null; 187 } 188 return sd.orderIndex.get(tx, sequence).data; 189 } 190 }); 191 } 192 if( data == null ) { 193 return null; 194 } 195 196 Message msg = (Message)wireFormat.unmarshal( data ); 197 return msg; 198 } 199 200 @Override 201 public void recover(final MessageRecoveryListener listener) throws Exception { 202 synchronized(indexMutex) { 203 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 204 @Override 205 public void execute(Transaction tx) throws Exception { 206 StoredDestination sd = getStoredDestination(dest, tx); 207 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { 208 Entry<Long, MessageRecord> entry = iterator.next(); 209 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) ); 210 } 211 } 212 }); 213 } 214 } 215 216 long cursorPos=0; 217 218 @Override 219 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 220 synchronized(indexMutex) { 221 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 222 @Override 223 public void execute(Transaction tx) throws Exception { 224 StoredDestination sd = getStoredDestination(dest, tx); 225 Entry<Long, MessageRecord> entry=null; 226 int counter = 0; 227 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 228 entry = iterator.next(); 229 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 230 counter++; 231 if( counter >= maxReturned ) { 232 break; 233 } 234 } 235 if( entry!=null ) { 236 cursorPos = entry.getKey()+1; 237 } 238 } 239 }); 240 } 241 } 242 243 @Override 244 public void resetBatching() { 245 cursorPos=0; 246 } 247 248 249 @Override 250 public void setBatch(MessageId identity) throws IOException { 251 final String key = identity.toProducerKey(); 252 253 // Hopefully one day the page file supports concurrent read operations... but for now we must 254 // externally synchronize... 255 Long location; 256 synchronized(indexMutex) { 257 location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){ 258 @Override 259 public Long execute(Transaction tx) throws IOException { 260 StoredDestination sd = getStoredDestination(dest, tx); 261 return sd.messageIdIndex.get(tx, key); 262 } 263 }); 264 } 265 if( location!=null ) { 266 cursorPos=location+1; 267 } 268 269 } 270 271 @Override 272 public void setMemoryUsage(MemoryUsage memoryUsage) { 273 } 274 @Override 275 public void start() throws Exception { 276 } 277 @Override 278 public void stop() throws Exception { 279 } 280 281 @Override 282 public void recoverMessageStoreStatistics() throws IOException { 283 int count = 0; 284 synchronized(indexMutex) { 285 count = pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ 286 @Override 287 public Integer execute(Transaction tx) throws IOException { 288 // Iterate through all index entries to get a count of messages in the destination. 289 StoredDestination sd = getStoredDestination(dest, tx); 290 int rc=0; 291 for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) { 292 iterator.next(); 293 rc++; 294 } 295 return rc; 296 } 297 }); 298 } 299 getMessageStoreStatistics().getMessageCount().setCount(count); 300 } 301 302 } 303 304 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 305 public KahaDBTopicMessageStore(ActiveMQTopic destination) { 306 super(destination); 307 } 308 309 @Override 310 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 311 MessageId messageId, MessageAck ack) throws IOException { 312 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 313 command.setDestination(dest); 314 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); 315 command.setMessageId(messageId.toProducerKey()); 316 // We are not passed a transaction info.. so we can't participate in a transaction. 317 // Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack 318 // to pass back to the XA recover method. 319 // command.setTransactionInfo(); 320 processRemove(command, null); 321 } 322 323 @Override 324 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 325 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName()); 326 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 327 command.setDestination(dest); 328 command.setSubscriptionKey(subscriptionKey); 329 command.setRetroactive(retroactive); 330 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 331 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 332 process(command); 333 } 334 335 @Override 336 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 337 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 338 command.setDestination(dest); 339 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); 340 process(command); 341 } 342 343 @Override 344 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 345 346 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 347 synchronized(indexMutex) { 348 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 349 @Override 350 public void execute(Transaction tx) throws IOException { 351 StoredDestination sd = getStoredDestination(dest, tx); 352 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) { 353 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 354 SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) ); 355 subscriptions.add(info); 356 357 } 358 } 359 }); 360 } 361 362 SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()]; 363 subscriptions.toArray(rc); 364 return rc; 365 } 366 367 @Override 368 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 369 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 370 synchronized(indexMutex) { 371 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){ 372 @Override 373 public SubscriptionInfo execute(Transaction tx) throws IOException { 374 StoredDestination sd = getStoredDestination(dest, tx); 375 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 376 if( command ==null ) { 377 return null; 378 } 379 return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) ); 380 } 381 }); 382 } 383 } 384 385 @Override 386 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 387 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 388 synchronized(indexMutex) { 389 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ 390 @Override 391 public Integer execute(Transaction tx) throws IOException { 392 StoredDestination sd = getStoredDestination(dest, tx); 393 Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 394 if ( cursorPos==null ) { 395 // The subscription might not exist. 396 return 0; 397 } 398 cursorPos += 1; 399 400 int counter = 0; 401 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 402 iterator.next(); 403 counter++; 404 } 405 return counter; 406 } 407 }); 408 } 409 } 410 411 @Override 412 public long getMessageSize(String clientId, String subscriptionName) throws IOException { 413 return 0; 414 } 415 416 @Override 417 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { 418 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 419 synchronized(indexMutex) { 420 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 421 @Override 422 public void execute(Transaction tx) throws Exception { 423 StoredDestination sd = getStoredDestination(dest, tx); 424 Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 425 cursorPos += 1; 426 427 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 428 Entry<Long, MessageRecord> entry = iterator.next(); 429 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 430 } 431 } 432 }); 433 } 434 } 435 436 @Override 437 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { 438 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 439 synchronized(indexMutex) { 440 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 441 @Override 442 public void execute(Transaction tx) throws Exception { 443 StoredDestination sd = getStoredDestination(dest, tx); 444 Long cursorPos = sd.subscriptionCursors.get(subscriptionKey); 445 if( cursorPos == null ) { 446 cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 447 cursorPos += 1; 448 } 449 450 Entry<Long, MessageRecord> entry=null; 451 int counter = 0; 452 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 453 entry = iterator.next(); 454 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 455 counter++; 456 if( counter >= maxReturned ) { 457 break; 458 } 459 } 460 if( entry!=null ) { 461 sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1); 462 } 463 } 464 }); 465 } 466 } 467 468 @Override 469 public void resetBatching(String clientId, String subscriptionName) { 470 try { 471 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 472 synchronized(indexMutex) { 473 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 474 @Override 475 public void execute(Transaction tx) throws IOException { 476 StoredDestination sd = getStoredDestination(dest, tx); 477 sd.subscriptionCursors.remove(subscriptionKey); 478 } 479 }); 480 } 481 } catch (IOException e) { 482 throw new RuntimeException(e); 483 } 484 } 485 } 486 487 String subscriptionKey(String clientId, String subscriptionName){ 488 return clientId+":"+subscriptionName; 489 } 490 491 @Override 492 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 493 return new KahaDBMessageStore(destination); 494 } 495 496 @Override 497 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 498 return new KahaDBTopicMessageStore(destination); 499 } 500 501 /** 502 * Cleanup method to remove any state associated with the given destination. 503 * This method does not stop the message store (it might not be cached). 504 * 505 * @param destination Destination to forget 506 */ 507 @Override 508 public void removeQueueMessageStore(ActiveMQQueue destination) { 509 } 510 511 /** 512 * Cleanup method to remove any state associated with the given destination 513 * This method does not stop the message store (it might not be cached). 514 * 515 * @param destination Destination to forget 516 */ 517 @Override 518 public void removeTopicMessageStore(ActiveMQTopic destination) { 519 } 520 521 @Override 522 public void deleteAllMessages() throws IOException { 523 } 524 525 526 @Override 527 public Set<ActiveMQDestination> getDestinations() { 528 try { 529 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 530 synchronized(indexMutex) { 531 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 532 @Override 533 public void execute(Transaction tx) throws IOException { 534 for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) { 535 Entry<String, StoredDestination> entry = iterator.next(); 536 rc.add(convert(entry.getKey())); 537 } 538 } 539 }); 540 } 541 return rc; 542 } catch (IOException e) { 543 throw new RuntimeException(e); 544 } 545 } 546 547 @Override 548 public long getLastMessageBrokerSequenceId() throws IOException { 549 return 0; 550 } 551 552 @Override 553 public long size() { 554 if ( !started.get() ) { 555 return 0; 556 } 557 try { 558 return pageFile.getDiskSize(); 559 } catch (IOException e) { 560 throw new RuntimeException(e); 561 } 562 } 563 564 @Override 565 public void beginTransaction(ConnectionContext context) throws IOException { 566 throw new IOException("Not yet implemented."); 567 } 568 @Override 569 public void commitTransaction(ConnectionContext context) throws IOException { 570 throw new IOException("Not yet implemented."); 571 } 572 @Override 573 public void rollbackTransaction(ConnectionContext context) throws IOException { 574 throw new IOException("Not yet implemented."); 575 } 576 577 @Override 578 public void checkpoint(boolean sync) throws IOException { 579 } 580 581 /////////////////////////////////////////////////////////////////// 582 // Internal conversion methods. 583 /////////////////////////////////////////////////////////////////// 584 585 586 587 KahaLocation convert(Location location) { 588 KahaLocation rc = new KahaLocation(); 589 rc.setLogId(location.getDataFileId()); 590 rc.setOffset(location.getOffset()); 591 return rc; 592 } 593 594 KahaDestination convert(ActiveMQDestination dest) { 595 KahaDestination rc = new KahaDestination(); 596 rc.setName(dest.getPhysicalName()); 597 switch( dest.getDestinationType() ) { 598 case ActiveMQDestination.QUEUE_TYPE: 599 rc.setType(DestinationType.QUEUE); 600 return rc; 601 case ActiveMQDestination.TOPIC_TYPE: 602 rc.setType(DestinationType.TOPIC); 603 return rc; 604 case ActiveMQDestination.TEMP_QUEUE_TYPE: 605 rc.setType(DestinationType.TEMP_QUEUE); 606 return rc; 607 case ActiveMQDestination.TEMP_TOPIC_TYPE: 608 rc.setType(DestinationType.TEMP_TOPIC); 609 return rc; 610 default: 611 return null; 612 } 613 } 614 615 ActiveMQDestination convert(String dest) { 616 int p = dest.indexOf(":"); 617 if( p<0 ) { 618 throw new IllegalArgumentException("Not in the valid destination format"); 619 } 620 int type = Integer.parseInt(dest.substring(0, p)); 621 String name = dest.substring(p+1); 622 623 switch( KahaDestination.DestinationType.valueOf(type) ) { 624 case QUEUE: 625 return new ActiveMQQueue(name); 626 case TOPIC: 627 return new ActiveMQTopic(name); 628 case TEMP_QUEUE: 629 return new ActiveMQTempQueue(name); 630 case TEMP_TOPIC: 631 return new ActiveMQTempTopic(name); 632 default: 633 throw new IllegalArgumentException("Not in the valid destination format"); 634 } 635 } 636 637 @Override 638 public long getLastProducerSequenceId(ProducerId id) { 639 return -1; 640 } 641 642 @Override 643 public void setBrokerService(BrokerService brokerService) { 644 this.brokerService = brokerService; 645 } 646 647 @Override 648 public void load() throws IOException { 649 if( brokerService!=null ) { 650 wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); 651 } 652 super.load(); 653 } 654 @Override 655 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 656 throw new UnsupportedOperationException(); 657 } 658}