/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.store.kahadb;

import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.store.SharedFileLocker;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
042import org.slf4j.LoggerFactory; 043 044public abstract class AbstractKahaDBStore extends LockableServiceSupport { 045 046 static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class); 047 048 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 049 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); 050 051 protected File directory; 052 protected PageFile pageFile; 053 protected Journal journal; 054 protected AtomicLong journalSize = new AtomicLong(0); 055 protected boolean failIfDatabaseIsLocked; 056 protected long checkpointInterval = 5*1000; 057 protected long cleanupInterval = 30*1000; 058 protected boolean checkForCorruptJournalFiles = false; 059 protected boolean checksumJournalFiles = true; 060 protected boolean forceRecoverIndex = false; 061 protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 062 protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 063 protected boolean archiveCorruptedIndex = false; 064 protected boolean enableIndexWriteAsync = false; 065 protected boolean enableJournalDiskSyncs = false; 066 protected boolean deleteAllJobs = false; 067 protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 068 protected boolean useIndexLFRUEviction = false; 069 protected float indexLFUEvictionFactor = 0.2f; 070 protected boolean ignoreMissingJournalfiles = false; 071 protected int indexCacheSize = 1000; 072 protected boolean enableIndexDiskSyncs = true; 073 protected boolean enableIndexRecoveryFile = true; 074 protected boolean enableIndexPageCaching = true; 075 protected boolean archiveDataLogs; 076 protected boolean purgeStoreOnStartup; 077 protected File directoryArchive; 078 079 protected AtomicBoolean opened = new AtomicBoolean(); 080 protected Thread checkpointThread; 081 protected final Object checkpointThreadLock = new Object(); 082 protected 
ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); 083 protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); 084 085 /** 086 * @return the name to give this store's PageFile instance. 087 */ 088 protected abstract String getPageFileName(); 089 090 /** 091 * @return the location of the data directory if no set by configuration. 092 */ 093 protected abstract File getDefaultDataDirectory(); 094 095 /** 096 * Loads the store from disk. 097 * 098 * Based on configuration this method can either load an existing store or it can purge 099 * an existing store and start in a clean state. 100 * 101 * @throws IOException if an error occurs during the load. 102 */ 103 public abstract void load() throws IOException; 104 105 /** 106 * Unload the state of the Store to disk and shuts down all resources assigned to this 107 * KahaDB store implementation. 108 * 109 * @throws IOException if an error occurs during the store unload. 110 */ 111 public abstract void unload() throws IOException; 112 113 @Override 114 protected void doStart() throws Exception { 115 this.indexLock.writeLock().lock(); 116 if (getDirectory() == null) { 117 setDirectory(getDefaultDataDirectory()); 118 } 119 IOHelper.mkdirs(getDirectory()); 120 try { 121 if (isPurgeStoreOnStartup()) { 122 getJournal().start(); 123 getJournal().delete(); 124 getJournal().close(); 125 journal = null; 126 getPageFile().delete(); 127 LOG.info("{} Persistence store purged.", this); 128 setPurgeStoreOnStartup(false); 129 } 130 131 load(); 132 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 133 } finally { 134 this.indexLock.writeLock().unlock(); 135 } 136 } 137 138 @Override 139 protected void doStop(ServiceStopper stopper) throws Exception { 140 unload(); 141 } 142 143 public PageFile getPageFile() { 144 if (pageFile == null) { 145 pageFile = createPageFile(); 146 } 147 return pageFile; 148 } 149 150 public Journal getJournal() throws IOException { 151 if (journal == null) { 
152 journal = createJournal(); 153 } 154 return journal; 155 } 156 157 public File getDirectory() { 158 return directory; 159 } 160 161 public void setDirectory(File directory) { 162 this.directory = directory; 163 } 164 165 public boolean isArchiveCorruptedIndex() { 166 return archiveCorruptedIndex; 167 } 168 169 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 170 this.archiveCorruptedIndex = archiveCorruptedIndex; 171 } 172 173 public boolean isFailIfDatabaseIsLocked() { 174 return failIfDatabaseIsLocked; 175 } 176 177 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 178 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 179 } 180 181 public boolean isCheckForCorruptJournalFiles() { 182 return checkForCorruptJournalFiles; 183 } 184 185 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 186 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; 187 } 188 189 public long getCheckpointInterval() { 190 return checkpointInterval; 191 } 192 193 public void setCheckpointInterval(long checkpointInterval) { 194 this.checkpointInterval = checkpointInterval; 195 } 196 197 public long getCleanupInterval() { 198 return cleanupInterval; 199 } 200 201 public void setCleanupInterval(long cleanupInterval) { 202 this.cleanupInterval = cleanupInterval; 203 } 204 205 public boolean isChecksumJournalFiles() { 206 return checksumJournalFiles; 207 } 208 209 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 210 this.checksumJournalFiles = checksumJournalFiles; 211 } 212 213 public boolean isForceRecoverIndex() { 214 return forceRecoverIndex; 215 } 216 217 public void setForceRecoverIndex(boolean forceRecoverIndex) { 218 this.forceRecoverIndex = forceRecoverIndex; 219 } 220 221 public int getJournalMaxFileLength() { 222 return journalMaxFileLength; 223 } 224 225 public void setJournalMaxFileLength(int journalMaxFileLength) { 226 this.journalMaxFileLength = journalMaxFileLength; 227 } 
228 229 public int getJournalMaxWriteBatchSize() { 230 return journalMaxWriteBatchSize; 231 } 232 233 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 234 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 235 } 236 237 public boolean isEnableIndexWriteAsync() { 238 return enableIndexWriteAsync; 239 } 240 241 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 242 this.enableIndexWriteAsync = enableIndexWriteAsync; 243 } 244 245 public boolean isEnableJournalDiskSyncs() { 246 return enableJournalDiskSyncs; 247 } 248 249 public void setEnableJournalDiskSyncs(boolean syncWrites) { 250 this.enableJournalDiskSyncs = syncWrites; 251 } 252 253 public boolean isDeleteAllJobs() { 254 return deleteAllJobs; 255 } 256 257 public void setDeleteAllJobs(boolean deleteAllJobs) { 258 this.deleteAllJobs = deleteAllJobs; 259 } 260 261 /** 262 * @return the archiveDataLogs 263 */ 264 public boolean isArchiveDataLogs() { 265 return this.archiveDataLogs; 266 } 267 268 /** 269 * @param archiveDataLogs the archiveDataLogs to set 270 */ 271 public void setArchiveDataLogs(boolean archiveDataLogs) { 272 this.archiveDataLogs = archiveDataLogs; 273 } 274 275 /** 276 * @return the directoryArchive 277 */ 278 public File getDirectoryArchive() { 279 return this.directoryArchive; 280 } 281 282 /** 283 * @param directoryArchive the directoryArchive to set 284 */ 285 public void setDirectoryArchive(File directoryArchive) { 286 this.directoryArchive = directoryArchive; 287 } 288 289 public int getIndexCacheSize() { 290 return indexCacheSize; 291 } 292 293 public void setIndexCacheSize(int indexCacheSize) { 294 this.indexCacheSize = indexCacheSize; 295 } 296 297 public int getIndexWriteBatchSize() { 298 return indexWriteBatchSize; 299 } 300 301 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 302 this.indexWriteBatchSize = indexWriteBatchSize; 303 } 304 305 public boolean isUseIndexLFRUEviction() { 306 return useIndexLFRUEviction; 307 } 308 
309 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 310 this.useIndexLFRUEviction = useIndexLFRUEviction; 311 } 312 313 public float getIndexLFUEvictionFactor() { 314 return indexLFUEvictionFactor; 315 } 316 317 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 318 this.indexLFUEvictionFactor = indexLFUEvictionFactor; 319 } 320 321 public boolean isEnableIndexDiskSyncs() { 322 return enableIndexDiskSyncs; 323 } 324 325 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { 326 this.enableIndexDiskSyncs = enableIndexDiskSyncs; 327 } 328 329 public boolean isEnableIndexRecoveryFile() { 330 return enableIndexRecoveryFile; 331 } 332 333 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { 334 this.enableIndexRecoveryFile = enableIndexRecoveryFile; 335 } 336 337 public boolean isEnableIndexPageCaching() { 338 return enableIndexPageCaching; 339 } 340 341 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { 342 this.enableIndexPageCaching = enableIndexPageCaching; 343 } 344 345 public boolean isPurgeStoreOnStartup() { 346 return this.purgeStoreOnStartup; 347 } 348 349 public void setPurgeStoreOnStartup(boolean purge) { 350 this.purgeStoreOnStartup = purge; 351 } 352 353 public boolean isIgnoreMissingJournalfiles() { 354 return ignoreMissingJournalfiles; 355 } 356 357 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 358 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 359 } 360 361 public long size() { 362 if (!isStarted()) { 363 return 0; 364 } 365 try { 366 return journalSize.get() + pageFile.getDiskSize(); 367 } catch (IOException e) { 368 throw new RuntimeException(e); 369 } 370 } 371 372 @Override 373 public Locker createDefaultLocker() throws IOException { 374 SharedFileLocker locker = new SharedFileLocker(); 375 locker.setDirectory(this.getDirectory()); 376 return locker; 377 } 378 379 @Override 380 public void init() throws Exception 
{ 381 } 382 383 /** 384 * Store a command in the Journal and process to update the Store index. 385 * 386 * @param command 387 * The specific JournalCommand to store and process. 388 * 389 * @returns the Location where the data was written in the Journal. 390 * 391 * @throws IOException if an error occurs storing or processing the command. 392 */ 393 public Location store(JournalCommand<?> command) throws IOException { 394 return store(command, isEnableIndexDiskSyncs(), null, null, null); 395 } 396 397 /** 398 * Store a command in the Journal and process to update the Store index. 399 * 400 * @param command 401 * The specific JournalCommand to store and process. 402 * @param sync 403 * Should the store operation be done synchronously. (ignored if completion passed). 404 * 405 * @returns the Location where the data was written in the Journal. 406 * 407 * @throws IOException if an error occurs storing or processing the command. 408 */ 409 public Location store(JournalCommand<?> command, boolean sync) throws IOException { 410 return store(command, sync, null, null, null); 411 } 412 413 /** 414 * Store a command in the Journal and process to update the Store index. 415 * 416 * @param command 417 * The specific JournalCommand to store and process. 418 * @param onJournalStoreComplete 419 * The Runnable to call when the Journal write operation completes. 420 * 421 * @returns the Location where the data was written in the Journal. 422 * 423 * @throws IOException if an error occurs storing or processing the command. 424 */ 425 public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException { 426 return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete); 427 } 428 429 /** 430 * Store a command in the Journal and process to update the Store index. 431 * 432 * @param command 433 * The specific JournalCommand to store and process. 434 * @param sync 435 * Should the store operation be done synchronously. 
(ignored if completion passed). 436 * @param before 437 * The Runnable instance to execute before performing the store and process operation. 438 * @param after 439 * The Runnable instance to execute after performing the store and process operation. 440 * 441 * @returns the Location where the data was written in the Journal. 442 * 443 * @throws IOException if an error occurs storing or processing the command. 444 */ 445 public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException { 446 return store(command, sync, before, after, null); 447 } 448 449 /** 450 * All updated are are funneled through this method. The updates are converted to a 451 * JournalMessage which is logged to the journal and then the data from the JournalMessage 452 * is used to update the index just like it would be done during a recovery process. 453 * 454 * @param command 455 * The specific JournalCommand to store and process. 456 * @param sync 457 * Should the store operation be done synchronously. (ignored if completion passed). 458 * @param before 459 * The Runnable instance to execute before performing the store and process operation. 460 * @param after 461 * The Runnable instance to execute after performing the store and process operation. 462 * @param onJournalStoreComplete 463 * Callback to be run when the journal write operation is complete. 464 * 465 * @returns the Location where the data was written in the Journal. 466 * 467 * @throws IOException if an error occurs storing or processing the command. 
468 */ 469 public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException { 470 try { 471 472 if (before != null) { 473 before.run(); 474 } 475 476 ByteSequence sequence = toByteSequence(command); 477 Location location; 478 checkpointLock.readLock().lock(); 479 try { 480 481 long start = System.currentTimeMillis(); 482 location = onJournalStoreComplete == null ? journal.write(sequence, sync) : 483 journal.write(sequence, onJournalStoreComplete); 484 long start2 = System.currentTimeMillis(); 485 486 process(command, location); 487 488 long end = System.currentTimeMillis(); 489 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 490 LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms", 491 (start2-start), (end-start2)); 492 } 493 } finally { 494 checkpointLock.readLock().unlock(); 495 } 496 497 if (after != null) { 498 after.run(); 499 } 500 501 if (checkpointThread != null && !checkpointThread.isAlive()) { 502 startCheckpoint(); 503 } 504 return location; 505 } catch (IOException ioe) { 506 LOG.error("KahaDB failed to store to Journal", ioe); 507 if (brokerService != null) { 508 brokerService.handleIOException(ioe); 509 } 510 throw ioe; 511 } 512 } 513 514 /** 515 * Loads a previously stored JournalMessage 516 * 517 * @param location 518 * The location of the journal command to read. 519 * 520 * @return a new un-marshaled JournalCommand instance. 521 * 522 * @throws IOException if an error occurs reading the stored command. 
523 */ 524 protected JournalCommand<?> load(Location location) throws IOException { 525 ByteSequence data = journal.read(location); 526 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 527 byte readByte = is.readByte(); 528 KahaEntryType type = KahaEntryType.valueOf(readByte); 529 if (type == null) { 530 try { 531 is.close(); 532 } catch (IOException e) { 533 } 534 throw new IOException("Could not load journal record. Invalid location: " + location); 535 } 536 JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); 537 message.mergeFramed(is); 538 return message; 539 } 540 541 /** 542 * Process a stored or recovered JournalCommand instance and update the DB Index with the 543 * state changes that this command produces. This can be called either as a new DB operation 544 * or as a replay during recovery operations. 545 * 546 * @param command 547 * The JournalCommand to process. 548 * @param location 549 * The location in the Journal where the command was written or read from. 550 */ 551 protected abstract void process(JournalCommand<?> command, Location location) throws IOException; 552 553 /** 554 * Perform a checkpoint operation with optional cleanup. 555 * 556 * Called by the checkpoint background thread periodically to initiate a checkpoint operation 557 * and if the cleanup flag is set a cleanup sweep should be done to allow for release of no 558 * longer needed journal log files etc. 559 * 560 * @param cleanup 561 * Should the method do a simple checkpoint or also perform a journal cleanup. 562 * 563 * @throws IOException if an error occurs during the checkpoint operation. 
564 */ 565 protected void checkpointUpdate(final boolean cleanup) throws IOException { 566 checkpointLock.writeLock().lock(); 567 try { 568 this.indexLock.writeLock().lock(); 569 try { 570 pageFile.tx().execute(new Transaction.Closure<IOException>() { 571 @Override 572 public void execute(Transaction tx) throws IOException { 573 checkpointUpdate(tx, cleanup); 574 } 575 }); 576 } finally { 577 this.indexLock.writeLock().unlock(); 578 } 579 580 } finally { 581 checkpointLock.writeLock().unlock(); 582 } 583 } 584 585 /** 586 * Perform the checkpoint update operation. If the cleanup flag is true then the 587 * operation should also purge any unused Journal log files. 588 * 589 * This method must always be called with the checkpoint and index write locks held. 590 * 591 * @param tx 592 * The TX under which to perform the checkpoint update. 593 * @param cleanup 594 * Should the checkpoint also do unused Journal file cleanup. 595 * 596 * @throws IOException if an error occurs while performing the checkpoint. 597 */ 598 protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException; 599 600 /** 601 * Creates a new ByteSequence that represents the marshaled form of the given Journal Command. 602 * 603 * @param command 604 * The Journal Command that should be marshaled to bytes for writing. 605 * 606 * @return the byte representation of the given journal command. 607 * 608 * @throws IOException if an error occurs while serializing the command. 609 */ 610 protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { 611 int size = data.serializedSizeFramed(); 612 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 613 os.writeByte(data.type().getNumber()); 614 data.writeFramed(os); 615 return os.toByteSequence(); 616 } 617 618 /** 619 * Create the PageFile instance and configure it using the configuration options 620 * currently set. 621 * 622 * @return the newly created and configured PageFile instance. 
623 */ 624 protected PageFile createPageFile() { 625 PageFile index = new PageFile(getDirectory(), getPageFileName()); 626 index.setEnableWriteThread(isEnableIndexWriteAsync()); 627 index.setWriteBatchSize(getIndexWriteBatchSize()); 628 index.setPageCacheSize(getIndexCacheSize()); 629 index.setUseLFRUEviction(isUseIndexLFRUEviction()); 630 index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); 631 index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); 632 index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); 633 index.setEnablePageCaching(isEnableIndexPageCaching()); 634 return index; 635 } 636 637 /** 638 * Create a new Journal instance and configure it using the currently set configuration 639 * options. If an archive directory is configured than this method will attempt to create 640 * that directory if it does not already exist. 641 * 642 * @return the newly created an configured Journal instance. 643 * 644 * @throws IOException if an error occurs while creating the Journal object. 645 */ 646 protected Journal createJournal() throws IOException { 647 Journal manager = new Journal(); 648 manager.setDirectory(getDirectory()); 649 manager.setMaxFileLength(getJournalMaxFileLength()); 650 manager.setCheckForCorruptionOnStartup(isCheckForCorruptJournalFiles()); 651 manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles()); 652 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 653 manager.setArchiveDataLogs(isArchiveDataLogs()); 654 manager.setSizeAccumulator(journalSize); 655 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); 656 if (getDirectoryArchive() != null) { 657 IOHelper.mkdirs(getDirectoryArchive()); 658 manager.setDirectoryArchive(getDirectoryArchive()); 659 } 660 return manager; 661 } 662 663 /** 664 * Starts the checkpoint Thread instance if not already running and not disabled 665 * by configuration. 
666 */ 667 protected void startCheckpoint() { 668 if (checkpointInterval == 0 && cleanupInterval == 0) { 669 LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart"); 670 return; 671 } 672 synchronized (checkpointThreadLock) { 673 boolean start = false; 674 if (checkpointThread == null) { 675 start = true; 676 } else if (!checkpointThread.isAlive()) { 677 start = true; 678 LOG.info("KahaDB: Recovering checkpoint thread after death"); 679 } 680 if (start) { 681 checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { 682 @Override 683 public void run() { 684 try { 685 long lastCleanup = System.currentTimeMillis(); 686 long lastCheckpoint = System.currentTimeMillis(); 687 // Sleep for a short time so we can periodically check 688 // to see if we need to exit this thread. 689 long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); 690 while (opened.get()) { 691 Thread.sleep(sleepTime); 692 long now = System.currentTimeMillis(); 693 if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) { 694 checkpointCleanup(true); 695 lastCleanup = now; 696 lastCheckpoint = now; 697 } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) { 698 checkpointCleanup(false); 699 lastCheckpoint = now; 700 } 701 } 702 } catch (InterruptedException e) { 703 // Looks like someone really wants us to exit this thread... 704 } catch (IOException ioe) { 705 LOG.error("Checkpoint failed", ioe); 706 brokerService.handleIOException(ioe); 707 } 708 } 709 }; 710 711 checkpointThread.setDaemon(true); 712 checkpointThread.start(); 713 } 714 } 715 } 716 717 /** 718 * Called from the worker thread to start a checkpoint. 719 * 720 * This method ensure that the store is in an opened state and optionaly logs information 721 * related to slow store access times. 722 * 723 * @param cleanup 724 * Should a cleanup of the journal occur during the checkpoint operation. 
725 * 726 * @throws IOException if an error occurs during the checkpoint operation. 727 */ 728 protected void checkpointCleanup(final boolean cleanup) throws IOException { 729 long start; 730 this.indexLock.writeLock().lock(); 731 try { 732 start = System.currentTimeMillis(); 733 if (!opened.get()) { 734 return; 735 } 736 } finally { 737 this.indexLock.writeLock().unlock(); 738 } 739 checkpointUpdate(cleanup); 740 long end = System.currentTimeMillis(); 741 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 742 LOG.info("Slow KahaDB access: cleanup took {}", (end - start)); 743 } 744 } 745}