001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.plist; 018 019import org.apache.activemq.broker.BrokerService; 020import org.apache.activemq.broker.BrokerServiceAware; 021import org.apache.activemq.openwire.OpenWireFormat; 022import org.apache.activemq.store.JournaledStore; 023import org.apache.activemq.store.PList; 024import org.apache.activemq.store.PListStore; 025import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 026import org.apache.activemq.store.kahadb.disk.journal.Journal; 027import org.apache.activemq.store.kahadb.disk.journal.Location; 028import org.apache.activemq.store.kahadb.disk.page.Page; 029import org.apache.activemq.store.kahadb.disk.page.PageFile; 030import org.apache.activemq.store.kahadb.disk.page.Transaction; 031import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 032import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 033import org.apache.activemq.thread.Scheduler; 034import org.apache.activemq.util.*; 035import org.apache.activemq.wireformat.WireFormat; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import java.io.DataInput; 040import java.io.DataOutput; 041import java.io.File; 042import java.io.IOException; 043import java.util.*; 044import java.util.Map.Entry; 045 046/** 047 * @org.apache.xbean.XBean 048 */ 049public class PListStoreImpl extends ServiceSupport implements BrokerServiceAware, Runnable, PListStore, JournaledStore { 050 static final Logger LOG = LoggerFactory.getLogger(PListStoreImpl.class); 051 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; 052 053 static final int CLOSED_STATE = 1; 054 static final int OPEN_STATE = 2; 055 056 private File directory; 057 PageFile pageFile; 058 private Journal journal; 059 private LockFile lockFile; 060 private boolean failIfDatabaseIsLocked; 061 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 062 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 063 private boolean enableIndexWriteAsync = false; 064 private boolean initialized = false; 065 private boolean lazyInit = true; 066 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 067 MetaData metaData = new MetaData(this); 068 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); 069 Map<String, PListImpl> persistentLists = new HashMap<String, PListImpl>(); 070 final Object indexLock = new Object(); 071 private Scheduler scheduler; 072 private long cleanupInterval = 30000; 073 074 private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE; 075 private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE; 076 private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 077 private boolean indexEnablePageCaching = true; 078 079 public Object getIndexLock() { 080 return indexLock; 081 } 082 083 @Override 084 public void setBrokerService(BrokerService brokerService) { 085 this.scheduler = brokerService.getScheduler(); 086 } 087 088 public int getIndexPageSize() { 089 return indexPageSize; 090 } 091 092 public int getIndexCacheSize() { 093 return indexCacheSize; 094 } 095 096 public int getIndexWriteBatchSize() { 097 return indexWriteBatchSize; 098 } 099 100 public void setIndexPageSize(int indexPageSize) { 101 this.indexPageSize = indexPageSize; 102 } 103 104 public void setIndexCacheSize(int indexCacheSize) { 105 this.indexCacheSize = indexCacheSize; 106 } 107 108 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 109 this.indexWriteBatchSize = indexWriteBatchSize; 110 } 111 112 public boolean getIndexEnablePageCaching() { 113 return indexEnablePageCaching; 114 } 115 116 public void setIndexEnablePageCaching(boolean indexEnablePageCaching) { 117 this.indexEnablePageCaching = indexEnablePageCaching; 118 } 119 120 protected class MetaData { 121 protected MetaData(PListStoreImpl store) { 122 this.store = store; 123 } 124 125 private final PListStoreImpl store; 126 Page<MetaData> page; 127 BTreeIndex<String, PListImpl> lists; 128 129 void createIndexes(Transaction tx) throws IOException { 130 this.lists = new BTreeIndex<String, PListImpl>(pageFile, tx.allocate().getPageId()); 131 } 132 133 void load(Transaction tx) throws IOException { 134 this.lists.setKeyMarshaller(StringMarshaller.INSTANCE); 135 this.lists.setValueMarshaller(new PListMarshaller(this.store)); 136 this.lists.load(tx); 137 } 138 139 void loadLists(Transaction tx, Map<String, PListImpl> lists) throws IOException { 140 for (Iterator<Entry<String, PListImpl>> i = this.lists.iterator(tx); i.hasNext();) { 141 Entry<String, PListImpl> entry = i.next(); 142 entry.getValue().load(tx); 143 lists.put(entry.getKey(), entry.getValue()); 144 } 145 } 146 147 public void read(DataInput is) throws IOException { 148 this.lists = new BTreeIndex<String, PListImpl>(pageFile, is.readLong()); 149 this.lists.setKeyMarshaller(StringMarshaller.INSTANCE); 150 this.lists.setValueMarshaller(new PListMarshaller(this.store)); 151 } 152 153 public void write(DataOutput os) throws IOException { 154 os.writeLong(this.lists.getPageId()); 155 } 156 } 157 158 class MetaDataMarshaller extends VariableMarshaller<MetaData> { 159 private final PListStoreImpl store; 160 161 MetaDataMarshaller(PListStoreImpl store) { 162 this.store = store; 163 } 164 public MetaData readPayload(DataInput dataIn) throws IOException { 165 MetaData rc = new MetaData(this.store); 166 rc.read(dataIn); 167 return rc; 168 } 169 170 public void writePayload(MetaData object, DataOutput dataOut) throws IOException { 171 object.write(dataOut); 172 } 173 } 174 175 class PListMarshaller extends VariableMarshaller<PListImpl> { 176 private final PListStoreImpl store; 177 PListMarshaller(PListStoreImpl store) { 178 this.store = store; 179 } 180 public PListImpl readPayload(DataInput dataIn) throws IOException { 181 PListImpl result = new PListImpl(this.store); 182 result.read(dataIn); 183 return result; 184 } 185 186 public void writePayload(PListImpl list, DataOutput dataOut) throws IOException { 187 list.write(dataOut); 188 } 189 } 190 191 public Journal getJournal() { 192 return this.journal; 193 } 194 195 @Override 196 public File getDirectory() { 197 return directory; 198 } 199 200 @Override 201 public void setDirectory(File directory) { 202 this.directory = directory; 203 } 204 205 public long size() { 206 synchronized (this) { 207 if (!initialized) { 208 return 0; 209 } 210 } 211 try { 212 return journal.getDiskSize() + pageFile.getDiskSize(); 213 } catch (IOException e) { 214 throw new RuntimeException(e); 215 } 216 } 217 218 @Override 219 public PListImpl getPList(final String name) throws Exception { 220 if (!isStarted()) { 221 throw new IllegalStateException("Not started"); 222 } 223 intialize(); 224 synchronized (indexLock) { 225 synchronized (this) { 226 PListImpl result = this.persistentLists.get(name); 227 if (result == null) { 228 final PListImpl pl = new PListImpl(this); 229 pl.setName(name); 230 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 231 public void execute(Transaction tx) throws IOException { 232 pl.setHeadPageId(tx.allocate().getPageId()); 233 pl.load(tx); 234 metaData.lists.put(tx, name, pl); 235 } 236 }); 237 result = pl; 238 this.persistentLists.put(name, pl); 239 } 240 final PListImpl toLoad = result; 241 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 242 public void execute(Transaction tx) throws IOException { 243 toLoad.load(tx); 244 } 245 }); 246 247 return result; 248 } 249 } 250 } 251 252 @Override 253 public boolean removePList(final String name) throws Exception { 254 boolean result = false; 255 synchronized (indexLock) { 256 synchronized (this) { 257 final PList pl = this.persistentLists.remove(name); 258 result = pl != null; 259 if (result) { 260 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 261 public void execute(Transaction tx) throws IOException { 262 metaData.lists.remove(tx, name); 263 pl.destroy(); 264 } 265 }); 266 } 267 } 268 } 269 return result; 270 } 271 272 protected synchronized void intialize() throws Exception { 273 if (isStarted()) { 274 if (this.initialized == false) { 275 if (this.directory == null) { 276 this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); 277 } 278 IOHelper.mkdirs(this.directory); 279 IOHelper.deleteChildren(this.directory); 280 lock(); 281 this.journal = new Journal(); 282 this.journal.setDirectory(directory); 283 this.journal.setMaxFileLength(getJournalMaxFileLength()); 284 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); 285 this.journal.start(); 286 this.pageFile = new PageFile(directory, "tmpDB"); 287 this.pageFile.setEnablePageCaching(getIndexEnablePageCaching()); 288 this.pageFile.setPageSize(getIndexPageSize()); 289 this.pageFile.setWriteBatchSize(getIndexWriteBatchSize()); 290 this.pageFile.setPageCacheSize(getIndexCacheSize()); 291 this.pageFile.load(); 292 293 this.pageFile.tx().execute(new Transaction.Closure<IOException>() { 294 public void execute(Transaction tx) throws IOException { 295 if (pageFile.getPageCount() == 0) { 296 Page<MetaData> page = tx.allocate(); 297 assert page.getPageId() == 0; 298 page.set(metaData); 299 metaData.page = page; 300 metaData.createIndexes(tx); 301 tx.store(metaData.page, metaDataMarshaller, true); 302 303 } else { 304 Page<MetaData> page = tx.load(0, metaDataMarshaller); 305 metaData = page.get(); 306 metaData.page = page; 307 } 308 metaData.load(tx); 309 metaData.loadLists(tx, persistentLists); 310 } 311 }); 312 this.pageFile.flush(); 313 314 if (cleanupInterval > 0) { 315 if (scheduler == null) { 316 scheduler = new Scheduler(PListStoreImpl.class.getSimpleName()); 317 scheduler.start(); 318 } 319 scheduler.executePeriodically(this, cleanupInterval); 320 } 321 this.initialized = true; 322 LOG.info(this + " initialized"); 323 } 324 } 325 } 326 327 @Override 328 protected synchronized void doStart() throws Exception { 329 if (!lazyInit) { 330 intialize(); 331 } 332 LOG.info(this + " started"); 333 } 334 335 @Override 336 protected synchronized void doStop(ServiceStopper stopper) throws Exception { 337 if (scheduler != null) { 338 if (PListStoreImpl.class.getSimpleName().equals(scheduler.getName())) { 339 scheduler.stop(); 340 scheduler = null; 341 } 342 } 343 for (PListImpl pl : this.persistentLists.values()) { 344 pl.unload(null); 345 } 346 if (this.pageFile != null) { 347 this.pageFile.unload(); 348 } 349 if (this.journal != null) { 350 journal.close(); 351 } 352 if (this.lockFile != null) { 353 this.lockFile.unlock(); 354 } 355 this.lockFile = null; 356 this.initialized = false; 357 LOG.info(this + " stopped"); 358 359 } 360 361 public void run() { 362 try { 363 if (isStopping()) { 364 return; 365 } 366 final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId(); 367 final Set<Integer> candidates = journal.getFileMap().keySet(); 368 LOG.trace("Full gc candidate set:" + candidates); 369 if (candidates.size() > 1) { 370 // prune current write 371 for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) { 372 if (iterator.next() >= lastJournalFileId) { 373 iterator.remove(); 374 } 375 } 376 List<PListImpl> plists = null; 377 synchronized (indexLock) { 378 synchronized (this) { 379 plists = new ArrayList<PListImpl>(persistentLists.values()); 380 } 381 } 382 for (PListImpl list : plists) { 383 list.claimFileLocations(candidates); 384 if (isStopping()) { 385 return; 386 } 387 LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates); 388 } 389 LOG.trace("GC Candidate set:" + candidates); 390 this.journal.removeDataFiles(candidates); 391 } 392 } catch (IOException e) { 393 LOG.error("Exception on periodic cleanup: " + e, e); 394 } 395 } 396 397 ByteSequence getPayload(Location location) throws IllegalStateException, IOException { 398 ByteSequence result = null; 399 result = this.journal.read(location); 400 return result; 401 } 402 403 Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { 404 return this.journal.write(payload, sync); 405 } 406 407 private void lock() throws IOException { 408 if (lockFile == null) { 409 File lockFileName = new File(directory, "lock"); 410 lockFile = new LockFile(lockFileName, true); 411 if (failIfDatabaseIsLocked) { 412 lockFile.lock(); 413 } else { 414 while (true) { 415 try { 416 lockFile.lock(); 417 break; 418 } catch (IOException e) { 419 LOG.info("Database " + lockFileName + " is locked... waiting " 420 + (DATABASE_LOCKED_WAIT_DELAY / 1000) 421 + " seconds for the database to be unlocked. Reason: " + e); 422 try { 423 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); 424 } catch (InterruptedException e1) { 425 } 426 } 427 } 428 } 429 } 430 } 431 432 PageFile getPageFile() { 433 this.pageFile.isLoaded(); 434 return this.pageFile; 435 } 436 437 public boolean isFailIfDatabaseIsLocked() { 438 return failIfDatabaseIsLocked; 439 } 440 441 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 442 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 443 } 444 445 public int getJournalMaxFileLength() { 446 return journalMaxFileLength; 447 } 448 449 public void setJournalMaxFileLength(int journalMaxFileLength) { 450 this.journalMaxFileLength = journalMaxFileLength; 451 } 452 453 public int getJournalMaxWriteBatchSize() { 454 return journalMaxWriteBatchSize; 455 } 456 457 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 458 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 459 } 460 461 public boolean isEnableIndexWriteAsync() { 462 return enableIndexWriteAsync; 463 } 464 465 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 466 this.enableIndexWriteAsync = enableIndexWriteAsync; 467 } 468 469 public long getCleanupInterval() { 470 return cleanupInterval; 471 } 472 473 public void setCleanupInterval(long cleanupInterval) { 474 this.cleanupInterval = cleanupInterval; 475 } 476 477 public boolean isLazyInit() { 478 return lazyInit; 479 } 480 481 public void setLazyInit(boolean lazyInit) { 482 this.lazyInit = lazyInit; 483 } 484 485 @Override 486 public String toString() { 487 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; 488 return "PListStore:[" + path + "]"; 489 } 490}