001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.disk.journal; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.HashMap; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Map; 025 026/** 027 * Used to pool DataFileAccessors. 028 * 029 * @author chirino 030 */ 031public class DataFileAccessorPool { 032 033 private final Journal journal; 034 private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>(); 035 private boolean closed; 036 private int maxOpenReadersPerFile = 5; 037 038 class Pool { 039 040 private final DataFile file; 041 private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>(); 042 private boolean used; 043 private int openCounter; 044 private boolean disposed; 045 046 public Pool(DataFile file) { 047 this.file = file; 048 } 049 050 public DataFileAccessor openDataFileReader() throws IOException { 051 DataFileAccessor rc = null; 052 if (pool.isEmpty()) { 053 rc = new DataFileAccessor(journal, file); 054 } else { 055 rc = pool.remove(pool.size() - 1); 056 } 057 used = true; 058 openCounter++; 059 return rc; 060 } 061 062 public synchronized void closeDataFileReader(DataFileAccessor reader) { 063 openCounter--; 064 if (pool.size() >= maxOpenReadersPerFile || disposed) { 065 reader.dispose(); 066 } else { 067 pool.add(reader); 068 } 069 } 070 071 public synchronized void clearUsedMark() { 072 used = false; 073 } 074 075 public synchronized boolean isUsed() { 076 return used; 077 } 078 079 public synchronized void dispose() { 080 for (DataFileAccessor reader : pool) { 081 reader.dispose(); 082 } 083 pool.clear(); 084 disposed = true; 085 } 086 087 public synchronized int getOpenCounter() { 088 return openCounter; 089 } 090 091 } 092 093 public DataFileAccessorPool(Journal dataManager) { 094 this.journal = dataManager; 095 } 096 097 synchronized void clearUsedMark() { 098 for (Pool pool : pools.values()) { 099 pool.clearUsedMark(); 100 } 101 } 102 103 synchronized void disposeUnused() { 104 for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) { 105 Pool pool = iter.next(); 106 if (!pool.isUsed()) { 107 pool.dispose(); 108 iter.remove(); 109 } 110 } 111 } 112 113 synchronized void disposeDataFileAccessors(DataFile dataFile) { 114 if (closed) { 115 throw new IllegalStateException("Closed."); 116 } 117 Pool pool = pools.get(dataFile.getDataFileId()); 118 if (pool != null) { 119 if (pool.getOpenCounter() == 0) { 120 pool.dispose(); 121 pools.remove(dataFile.getDataFileId()); 122 } else { 123 throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter()); 124 } 125 } 126 } 127 128 synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException { 129 if (closed) { 130 throw new IOException("Closed."); 131 } 132 133 Pool pool = pools.get(dataFile.getDataFileId()); 134 if (pool == null) { 135 pool = new Pool(dataFile); 136 pools.put(dataFile.getDataFileId(), pool); 137 } 138 return pool.openDataFileReader(); 139 } 140 141 synchronized void closeDataFileAccessor(DataFileAccessor reader) { 142 Pool pool = pools.get(reader.getDataFile().getDataFileId()); 143 if (pool == null || closed) { 144 reader.dispose(); 145 } else { 146 pool.closeDataFileReader(reader); 147 } 148 } 149 150 public synchronized void close() { 151 if (closed) { 152 return; 153 } 154 closed = true; 155 for (Pool pool : pools.values()) { 156 pool.dispose(); 157 } 158 pools.clear(); 159 } 160 161}