001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.broker.scheduler.memory; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Date; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.Timer; 028import java.util.TimerTask; 029import java.util.TreeMap; 030import java.util.concurrent.CopyOnWriteArrayList; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.locks.ReentrantReadWriteLock; 033 034import javax.jms.MessageFormatException; 035 036import org.apache.activemq.broker.scheduler.CronParser; 037import org.apache.activemq.broker.scheduler.Job; 038import org.apache.activemq.broker.scheduler.JobListener; 039import org.apache.activemq.broker.scheduler.JobScheduler; 040import org.apache.activemq.broker.scheduler.JobSupport; 041import org.apache.activemq.util.ByteSequence; 042import org.apache.activemq.util.IdGenerator; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * Implements an in-memory JobScheduler instance. 048 */ 049public class InMemoryJobScheduler implements JobScheduler { 050 051 private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobScheduler.class); 052 053 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 054 055 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 056 private final String name; 057 private final TreeMap<Long, ScheduledTask> jobs = new TreeMap<Long, ScheduledTask>(); 058 private final AtomicBoolean started = new AtomicBoolean(false); 059 private final AtomicBoolean dispatchEnabled = new AtomicBoolean(false); 060 private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>(); 061 private final Timer timer = new Timer(); 062 063 public InMemoryJobScheduler(String name) { 064 this.name = name; 065 } 066 067 @Override 068 public String getName() throws Exception { 069 return name; 070 } 071 072 public void start() throws Exception { 073 if (started.compareAndSet(false, true)) { 074 startDispatching(); 075 LOG.trace("JobScheduler[{}] started", name); 076 } 077 } 078 079 public void stop() throws Exception { 080 if (started.compareAndSet(true, false)) { 081 stopDispatching(); 082 timer.cancel(); 083 jobs.clear(); 084 LOG.trace("JobScheduler[{}] stopped", name); 085 } 086 } 087 088 public boolean isStarted() { 089 return started.get(); 090 } 091 092 public boolean isDispatchEnabled() { 093 return dispatchEnabled.get(); 094 } 095 096 @Override 097 public void startDispatching() throws Exception { 098 dispatchEnabled.set(true); 099 } 100 101 @Override 102 public void stopDispatching() throws Exception { 103 dispatchEnabled.set(false); 104 } 105 106 @Override 107 public void addListener(JobListener listener) throws Exception { 108 this.jobListeners.add(listener); 109 } 110 111 @Override 112 public void removeListener(JobListener listener) throws Exception { 113 this.jobListeners.remove(listener); 114 } 115 116 @Override 117 public void schedule(String jobId, ByteSequence payload, long delay) throws Exception { 118 doSchedule(jobId, payload, "", 0, delay, 0); 119 } 120 121 @Override 122 public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception { 123 doSchedule(jobId, payload, cronEntry, 0, 0, 0); 124 } 125 126 @Override 127 public void schedule(String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws Exception { 128 doSchedule(jobId, payload, cronEntry, delay, period, repeat); 129 } 130 131 @Override 132 public void remove(long time) throws Exception { 133 doRemoveRange(time, time); 134 } 135 136 @Override 137 public void remove(String jobId) throws Exception { 138 doRemoveJob(jobId); 139 } 140 141 @Override 142 public void removeAllJobs() throws Exception { 143 doRemoveRange(0, Long.MAX_VALUE); 144 } 145 146 @Override 147 public void removeAllJobs(long start, long finish) throws Exception { 148 doRemoveRange(start, finish); 149 } 150 151 @Override 152 public long getNextScheduleTime() throws Exception { 153 long nextExecutionTime = -1L; 154 155 lock.readLock().lock(); 156 try { 157 if (!jobs.isEmpty()) { 158 nextExecutionTime = jobs.entrySet().iterator().next().getKey(); 159 } 160 } finally { 161 lock.readLock().unlock(); 162 } 163 return nextExecutionTime; 164 } 165 166 @Override 167 public List<Job> getNextScheduleJobs() throws Exception { 168 List<Job> result = new ArrayList<Job>(); 169 lock.readLock().lock(); 170 try { 171 if (!jobs.isEmpty()) { 172 result.addAll(jobs.entrySet().iterator().next().getValue().getAllJobs()); 173 } 174 } finally { 175 lock.readLock().unlock(); 176 } 177 return result; 178 } 179 180 @Override 181 public List<Job> getAllJobs() throws Exception { 182 final List<Job> result = new ArrayList<Job>(); 183 this.lock.readLock().lock(); 184 try { 185 for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) { 186 result.addAll(entry.getValue().getAllJobs()); 187 } 188 } finally { 189 this.lock.readLock().unlock(); 190 } 191 192 return result; 193 } 194 195 @Override 196 public List<Job> getAllJobs(long start, long finish) throws Exception { 197 final List<Job> result = new ArrayList<Job>(); 198 this.lock.readLock().lock(); 199 try { 200 for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) { 201 long jobTime = entry.getKey(); 202 if (start <= jobTime && jobTime <= finish) { 203 result.addAll(entry.getValue().getAllJobs()); 204 } 205 } 206 } finally { 207 this.lock.readLock().unlock(); 208 } 209 return result; 210 } 211 212 @Override 213 public int hashCode() { 214 return name.hashCode(); 215 } 216 217 @Override 218 public String toString() { 219 return "JobScheduler: " + name; 220 } 221 222 private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat) throws IOException { 223 long startTime = System.currentTimeMillis(); 224 long executionTime = 0; 225 // round startTime - so we can schedule more jobs at the same time 226 startTime = (startTime / 1000) * 1000; 227 if (cronEntry != null && cronEntry.length() > 0) { 228 try { 229 executionTime = CronParser.getNextScheduledTime(cronEntry, startTime); 230 } catch (MessageFormatException e) { 231 throw new IOException(e.getMessage()); 232 } 233 } 234 235 if (executionTime == 0) { 236 // start time not set by CRON - so it it to the current time 237 executionTime = startTime; 238 } 239 240 if (delay > 0) { 241 executionTime += delay; 242 } else { 243 executionTime += period; 244 } 245 246 InMemoryJob newJob = new InMemoryJob(jobId); 247 newJob.setStart(startTime); 248 newJob.setCronEntry(cronEntry); 249 newJob.setDelay(delay); 250 newJob.setPeriod(period); 251 newJob.setRepeat(repeat); 252 newJob.setNextTime(executionTime); 253 newJob.setPayload(payload.getData()); 254 255 LOG.trace("JobScheduler adding job[{}] to fire at: {}", jobId, JobSupport.getDateTime(executionTime)); 256 257 lock.writeLock().lock(); 258 try { 259 ScheduledTask task = jobs.get(executionTime); 260 if (task == null) { 261 task = new ScheduledTask(executionTime); 262 task.add(newJob); 263 jobs.put(task.getExecutionTime(), task); 264 timer.schedule(task, new Date(newJob.getNextTime())); 265 } else { 266 task.add(newJob); 267 } 268 } finally { 269 lock.writeLock().unlock(); 270 } 271 } 272 273 private void doReschedule(InMemoryJob job, long nextExecutionTime) { 274 job.setNextTime(nextExecutionTime); 275 job.incrementExecutionCount(); 276 if (!job.isCron()) { 277 job.decrementRepeatCount(); 278 } 279 280 LOG.trace("JobScheduler rescheduling job[{}] to fire at: {}", job.getJobId(), JobSupport.getDateTime(nextExecutionTime)); 281 282 lock.writeLock().lock(); 283 try { 284 ScheduledTask task = jobs.get(nextExecutionTime); 285 if (task == null) { 286 task = new ScheduledTask(nextExecutionTime); 287 task.add(job); 288 jobs.put(task.getExecutionTime(), task); 289 timer.schedule(task, new Date(task.getExecutionTime())); 290 } else { 291 task.add(job); 292 } 293 } finally { 294 lock.writeLock().unlock(); 295 } 296 } 297 298 private void doRemoveJob(String jobId) throws IOException { 299 this.lock.writeLock().lock(); 300 try { 301 Iterator<Map.Entry<Long, ScheduledTask>> scheduled = jobs.entrySet().iterator(); 302 while (scheduled.hasNext()) { 303 Map.Entry<Long, ScheduledTask> entry = scheduled.next(); 304 ScheduledTask task = entry.getValue(); 305 if (task.remove(jobId)) { 306 LOG.trace("JobScheduler removing job[{}]", jobId); 307 if (task.isEmpty()) { 308 task.cancel(); 309 scheduled.remove(); 310 } 311 return; 312 } 313 } 314 } finally { 315 this.lock.writeLock().unlock(); 316 } 317 } 318 319 private void doRemoveRange(long start, long end) throws IOException { 320 this.lock.writeLock().lock(); 321 try { 322 Iterator<Map.Entry<Long, ScheduledTask>> scheduled = jobs.entrySet().iterator(); 323 while (scheduled.hasNext()) { 324 Map.Entry<Long, ScheduledTask> entry = scheduled.next(); 325 long executionTime = entry.getKey(); 326 if (start <= executionTime && executionTime <= end) { 327 ScheduledTask task = entry.getValue(); 328 task.cancel(); 329 scheduled.remove(); 330 } 331 332 // Don't look beyond the end range. 333 if (end < executionTime) { 334 break; 335 } 336 } 337 } finally { 338 this.lock.writeLock().unlock(); 339 } 340 } 341 342 private boolean canDispatch() { 343 return isStarted() && isDispatchEnabled(); 344 } 345 346 private long calculateNextExecutionTime(InMemoryJob job, long currentTime, int repeat) throws MessageFormatException { 347 long result = currentTime; 348 String cron = job.getCronEntry(); 349 if (cron != null && cron.length() > 0) { 350 result = CronParser.getNextScheduledTime(cron, result); 351 } else if (job.getRepeat() != 0) { 352 result += job.getPeriod(); 353 } 354 return result; 355 } 356 357 private void dispatch(InMemoryJob job) throws IllegalStateException, IOException { 358 if (canDispatch()) { 359 LOG.debug("Firing: {}", job); 360 for (JobListener l : jobListeners) { 361 l.scheduledJob(job.getJobId(), new ByteSequence(job.getPayload())); 362 } 363 } 364 } 365 366 /* 367 * A TimerTask instance that can aggregate the execution of a number 368 * scheduled Jobs and handle rescheduling the jobs that require it. 369 */ 370 private class ScheduledTask extends TimerTask { 371 372 private final Map<String, InMemoryJob> jobs = new TreeMap<String, InMemoryJob>(); 373 private final long executionTime; 374 375 public ScheduledTask(long executionTime) { 376 this.executionTime = executionTime; 377 } 378 379 public long getExecutionTime() { 380 return executionTime; 381 } 382 383 /** 384 * @return a Collection containing all the managed jobs for this task. 385 */ 386 public Collection<InMemoryJob> getAllJobs() { 387 return new ArrayList<InMemoryJob>(jobs.values()); 388 } 389 390 /** 391 * @return true if the internal list of jobs has become empty. 392 */ 393 public boolean isEmpty() { 394 return jobs.isEmpty(); 395 } 396 397 /** 398 * Adds the job to the internal list of scheduled Jobs managed by this task. 399 * 400 * @param newJob 401 * the new job to add to the list of Jobs. 402 */ 403 public void add(InMemoryJob newJob) { 404 this.jobs.put(newJob.getJobId(), newJob); 405 } 406 407 /** 408 * Removes the job from the internal list of scheduled Jobs managed by this task. 409 * 410 * @param jobId 411 * the job ID to remove from the list of Jobs. 412 * 413 * @return true if the job was removed from the list of managed jobs. 414 */ 415 public boolean remove(String jobId) { 416 return jobs.remove(jobId) != null; 417 } 418 419 @Override 420 public void run() { 421 if (!isStarted()) { 422 return; 423 } 424 425 try { 426 long currentTime = System.currentTimeMillis(); 427 lock.writeLock().lock(); 428 try { 429 // Remove this entry as it will now fire any scheduled jobs, if new 430 // jobs or rescheduled jobs land in the same time slot we want them 431 // to go into a new ScheduledTask in the Timer instance. 432 InMemoryJobScheduler.this.jobs.remove(executionTime); 433 } finally { 434 lock.writeLock().unlock(); 435 } 436 437 long nextExecutionTime = 0; 438 439 for (InMemoryJob job : jobs.values()) { 440 441 if (!isStarted()) { 442 break; 443 } 444 445 int repeat = job.getRepeat(); 446 nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat); 447 if (!job.isCron()) { 448 dispatch(job); 449 if (repeat != 0) { 450 // Reschedule for the next time, the scheduler will take care of 451 // updating the repeat counter on the update. 452 doReschedule(job, nextExecutionTime); 453 } 454 } else { 455 if (repeat == 0) { 456 // This is a non-repeating Cron entry so we can fire and forget it. 457 dispatch(job); 458 } 459 460 if (nextExecutionTime > currentTime) { 461 // Reschedule the cron job as a new event, if the cron entry signals 462 // a repeat then it will be stored separately and fired as a normal 463 // event with decrementing repeat. 464 doReschedule(job, nextExecutionTime); 465 466 if (repeat != 0) { 467 // we have a separate schedule to run at this time 468 // so the cron job is used to set of a separate schedule 469 // hence we won't fire the original cron job to the 470 // listeners but we do need to start a separate schedule 471 String jobId = ID_GENERATOR.generateId(); 472 ByteSequence payload = new ByteSequence(job.getPayload()); 473 schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat()); 474 } 475 } 476 } 477 } 478 } catch (Throwable e) { 479 LOG.error("Error while processing scheduled job(s).", e); 480 } 481 } 482 } 483}