001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.File; 020import java.io.IOException; 021import java.util.Date; 022import java.util.HashSet; 023import java.util.Set; 024import java.util.TreeSet; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027 028import org.apache.activemq.broker.Broker; 029import org.apache.activemq.broker.ConnectionContext; 030import org.apache.activemq.command.Message; 031import org.apache.activemq.command.MessageAck; 032import org.apache.activemq.command.MessageId; 033import org.apache.activemq.command.TransactionId; 034import org.apache.activemq.command.XATransactionId; 035import org.apache.activemq.store.AbstractMessageStore; 036import org.apache.activemq.store.ListenableFuture; 037import org.apache.activemq.store.MessageStore; 038import org.apache.activemq.store.PersistenceAdapter; 039import org.apache.activemq.store.ProxyMessageStore; 040import org.apache.activemq.store.ProxyTopicMessageStore; 041import org.apache.activemq.store.TopicMessageStore; 042import org.apache.activemq.store.TransactionRecoveryListener; 043import org.apache.activemq.store.TransactionStore; 044import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 045import org.apache.activemq.store.kahadb.data.KahaEntryType; 046import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 047import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 048import org.apache.activemq.store.kahadb.disk.journal.Journal; 049import org.apache.activemq.store.kahadb.disk.journal.Location; 050import org.apache.activemq.util.DataByteArrayInputStream; 051import org.apache.activemq.util.DataByteArrayOutputStream; 052import org.apache.activemq.util.IOHelper; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056public class MultiKahaDBTransactionStore implements TransactionStore { 057 static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class); 058 final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter; 059 final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>(); 060 final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>(); 061 private Journal journal; 062 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 063 private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 064 065 public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) { 066 this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter; 067 } 068 069 public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) { 070 return new ProxyMessageStore(messageStore) { 071 @Override 072 public void addMessage(ConnectionContext context, final Message send) throws IOException { 073 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 074 } 075 076 @Override 077 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException { 078 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 079 } 080 081 @Override 082 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 083 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message); 084 } 085 086 @Override 087 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 088 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message); 089 } 090 091 @Override 092 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 093 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack); 094 } 095 096 @Override 097 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 098 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack); 099 } 100 }; 101 } 102 103 public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) { 104 return new ProxyTopicMessageStore(messageStore) { 105 @Override 106 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException { 107 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 108 } 109 110 @Override 111 public void addMessage(ConnectionContext context, final Message send) throws IOException { 112 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 113 } 114 115 @Override 116 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 117 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message); 118 } 119 120 @Override 121 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 122 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message); 123 } 124 125 @Override 126 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 127 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack); 128 } 129 130 @Override 131 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 132 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack); 133 } 134 135 @Override 136 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 137 MessageId messageId, MessageAck ack) throws IOException { 138 MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId, 139 subscriptionName, messageId, ack); 140 } 141 }; 142 } 143 144 public void deleteAllMessages() { 145 IOHelper.deleteChildren(getDirectory()); 146 } 147 148 public int getJournalMaxFileLength() { 149 return journalMaxFileLength; 150 } 151 152 public void setJournalMaxFileLength(int journalMaxFileLength) { 153 this.journalMaxFileLength = journalMaxFileLength; 154 } 155 156 public int getJournalMaxWriteBatchSize() { 157 return journalWriteBatchSize; 158 } 159 160 public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) { 161 this.journalWriteBatchSize = journalWriteBatchSize; 162 } 163 164 public class Tx { 165 private final Set<TransactionStore> stores = new HashSet<TransactionStore>(); 166 private int prepareLocationId = 0; 167 168 public void trackStore(TransactionStore store) { 169 stores.add(store); 170 } 171 172 public Set<TransactionStore> getStores() { 173 return stores; 174 } 175 176 public void trackPrepareLocation(Location location) { 177 this.prepareLocationId = location.getDataFileId(); 178 } 179 180 public int getPreparedLocationId() { 181 return prepareLocationId; 182 } 183 } 184 185 public Tx getTx(TransactionId txid) { 186 Tx tx = inflightTransactions.get(txid); 187 if (tx == null) { 188 tx = new Tx(); 189 inflightTransactions.put(txid, tx); 190 } 191 return tx; 192 } 193 194 public Tx removeTx(TransactionId txid) { 195 return inflightTransactions.remove(txid); 196 } 197 198 @Override 199 public void prepare(TransactionId txid) throws IOException { 200 Tx tx = getTx(txid); 201 for (TransactionStore store : tx.getStores()) { 202 store.prepare(txid); 203 } 204 } 205 206 @Override 207 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) 208 throws IOException { 209 210 if (preCommit != null) { 211 preCommit.run(); 212 } 213 214 Tx tx = getTx(txid); 215 if (wasPrepared) { 216 for (TransactionStore store : tx.getStores()) { 217 store.commit(txid, true, null, null); 218 } 219 } else { 220 // can only do 1pc on a single store 221 if (tx.getStores().size() == 1) { 222 for (TransactionStore store : tx.getStores()) { 223 store.commit(txid, false, null, null); 224 } 225 } else { 226 // need to do local 2pc 227 for (TransactionStore store : tx.getStores()) { 228 store.prepare(txid); 229 } 230 persistOutcome(tx, txid); 231 for (TransactionStore store : tx.getStores()) { 232 store.commit(txid, true, null, null); 233 } 234 persistCompletion(txid); 235 } 236 } 237 removeTx(txid); 238 if (postCommit != null) { 239 postCommit.run(); 240 } 241 } 242 243 public void persistOutcome(Tx tx, TransactionId txid) throws IOException { 244 tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))))); 245 } 246 247 public void persistCompletion(TransactionId txid) throws IOException { 248 store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))); 249 } 250 251 private Location store(JournalCommand<?> data) throws IOException { 252 int size = data.serializedSizeFramed(); 253 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 254 os.writeByte(data.type().getNumber()); 255 data.writeFramed(os); 256 Location location = journal.write(os.toByteSequence(), true); 257 journal.setLastAppendLocation(location); 258 return location; 259 } 260 261 @Override 262 public void rollback(TransactionId txid) throws IOException { 263 Tx tx = removeTx(txid); 264 if (tx != null) { 265 for (TransactionStore store : tx.getStores()) { 266 store.rollback(txid); 267 } 268 } 269 } 270 271 @Override 272 public void start() throws Exception { 273 journal = new Journal() { 274 @Override 275 protected void cleanup() { 276 super.cleanup(); 277 txStoreCleanup(); 278 } 279 }; 280 journal.setDirectory(getDirectory()); 281 journal.setMaxFileLength(journalMaxFileLength); 282 journal.setWriteBatchSize(journalWriteBatchSize); 283 IOHelper.mkdirs(journal.getDirectory()); 284 journal.start(); 285 recoverPendingLocalTransactions(); 286 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 287 } 288 289 private void txStoreCleanup() { 290 Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet()); 291 for (Tx tx : inflightTransactions.values()) { 292 knownDataFileIds.remove(tx.getPreparedLocationId()); 293 } 294 try { 295 journal.removeDataFiles(knownDataFileIds); 296 } catch (Exception e) { 297 LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds); 298 } 299 } 300 301 private File getDirectory() { 302 return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore"); 303 } 304 305 @Override 306 public void stop() throws Exception { 307 journal.close(); 308 journal = null; 309 } 310 311 private void recoverPendingLocalTransactions() throws IOException { 312 Location location = journal.getNextLocation(null); 313 while (location != null) { 314 process(load(location)); 315 location = journal.getNextLocation(location); 316 } 317 recoveredPendingCommit.addAll(inflightTransactions.keySet()); 318 LOG.info("pending local transactions: " + recoveredPendingCommit); 319 } 320 321 public JournalCommand<?> load(Location location) throws IOException { 322 DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location)); 323 byte readByte = is.readByte(); 324 KahaEntryType type = KahaEntryType.valueOf(readByte); 325 if (type == null) { 326 throw new IOException("Could not load journal record. Invalid location: " + location); 327 } 328 JournalCommand<?> message = (JournalCommand<?>) type.createMessage(); 329 message.mergeFramed(is); 330 return message; 331 } 332 333 public void process(JournalCommand<?> command) throws IOException { 334 switch (command.type()) { 335 case KAHA_PREPARE_COMMAND: 336 KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command; 337 getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())); 338 break; 339 case KAHA_COMMIT_COMMAND: 340 KahaCommitCommand commitCommand = (KahaCommitCommand) command; 341 removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo())); 342 break; 343 case KAHA_TRACE_COMMAND: 344 break; 345 default: 346 throw new IOException("Unexpected command in transaction journal: " + command); 347 } 348 } 349 350 351 @Override 352 public synchronized void recover(final TransactionRecoveryListener listener) throws IOException { 353 354 for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) { 355 adapter.createTransactionStore().recover(new TransactionRecoveryListener() { 356 @Override 357 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) { 358 try { 359 getTx(xid).trackStore(adapter.createTransactionStore()); 360 } catch (IOException e) { 361 LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e); 362 } 363 listener.recover(xid, addedMessages, acks); 364 } 365 }); 366 } 367 368 try { 369 Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker(); 370 // force completion of local xa 371 for (TransactionId txid : broker.getPreparedTransactions(null)) { 372 if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) { 373 try { 374 if (recoveredPendingCommit.contains(txid)) { 375 LOG.info("delivering pending commit outcome for tid: " + txid); 376 broker.commitTransaction(null, txid, false); 377 378 } else { 379 LOG.info("delivering rollback outcome to store for tid: " + txid); 380 broker.forgetTransaction(null, txid); 381 } 382 persistCompletion(txid); 383 } catch (Exception ex) { 384 LOG.error("failed to deliver pending outcome for tid: " + txid, ex); 385 } 386 } 387 } 388 } catch (Exception e) { 389 LOG.error("failed to resolve pending local transactions", e); 390 } 391 } 392 393 void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 394 throws IOException { 395 if (message.getTransactionId() != null) { 396 getTx(message.getTransactionId()).trackStore(transactionStore); 397 } 398 destination.addMessage(context, message); 399 } 400 401 ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 402 throws IOException { 403 if (message.getTransactionId() != null) { 404 getTx(message.getTransactionId()).trackStore(transactionStore); 405 destination.addMessage(context, message); 406 return AbstractMessageStore.FUTURE; 407 } else { 408 return destination.asyncAddQueueMessage(context, message); 409 } 410 } 411 412 ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 413 throws IOException { 414 415 if (message.getTransactionId() != null) { 416 getTx(message.getTransactionId()).trackStore(transactionStore); 417 destination.addMessage(context, message); 418 return AbstractMessageStore.FUTURE; 419 } else { 420 return destination.asyncAddTopicMessage(context, message); 421 } 422 } 423 424 final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack) 425 throws IOException { 426 if (ack.getTransactionId() != null) { 427 getTx(ack.getTransactionId()).trackStore(transactionStore); 428 } 429 destination.removeMessage(context, ack); 430 } 431 432 final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack) 433 throws IOException { 434 if (ack.getTransactionId() != null) { 435 getTx(ack.getTransactionId()).trackStore(transactionStore); 436 } 437 destination.removeAsyncMessage(context, ack); 438 } 439 440 final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination, 441 final String clientId, final String subscriptionName, 442 final MessageId messageId, final MessageAck ack) throws IOException { 443 if (ack.getTransactionId() != null) { 444 getTx(ack.getTransactionId()).trackStore(transactionStore); 445 } 446 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 447 } 448 449}