001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.discovery.simple; 018 019import java.io.IOException; 020import java.net.URI; 021import java.util.concurrent.atomic.AtomicBoolean; 022 023import org.apache.activemq.command.DiscoveryEvent; 024import org.apache.activemq.thread.TaskRunnerFactory; 025import org.apache.activemq.transport.discovery.DiscoveryAgent; 026import org.apache.activemq.transport.discovery.DiscoveryListener; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * A simple DiscoveryAgent that allows static configuration of the discovered 032 * services. 033 * 034 * 035 */ 036public class SimpleDiscoveryAgent implements DiscoveryAgent { 037 038 private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class); 039 private long initialReconnectDelay = 1000; 040 private long maxReconnectDelay = 1000 * 30; 041 private long backOffMultiplier = 2; 042 private boolean useExponentialBackOff=true; 043 private int maxReconnectAttempts; 044 private final Object sleepMutex = new Object(); 045 private long minConnectTime = 5000; 046 private DiscoveryListener listener; 047 private String services[] = new String[] {}; 048 private final AtomicBoolean running = new AtomicBoolean(false); 049 private TaskRunnerFactory taskRunner; 050 051 class SimpleDiscoveryEvent extends DiscoveryEvent { 052 053 private int connectFailures; 054 private long reconnectDelay = initialReconnectDelay; 055 private long connectTime = System.currentTimeMillis(); 056 private final AtomicBoolean failed = new AtomicBoolean(false); 057 058 public SimpleDiscoveryEvent(String service) { 059 super(service); 060 } 061 062 public SimpleDiscoveryEvent(SimpleDiscoveryEvent copy) { 063 super(copy); 064 connectFailures = copy.connectFailures; 065 reconnectDelay = copy.reconnectDelay; 066 connectTime = copy.connectTime; 067 failed.set(copy.failed.get()); 068 } 069 070 @Override 071 public String toString() { 072 return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]"; 073 } 074 } 075 076 @Override 077 public void setDiscoveryListener(DiscoveryListener listener) { 078 this.listener = listener; 079 } 080 081 @Override 082 public void registerService(String name) throws IOException { 083 } 084 085 @Override 086 public void start() throws Exception { 087 taskRunner = new TaskRunnerFactory(); 088 taskRunner.init(); 089 090 running.set(true); 091 for (int i = 0; i < services.length; i++) { 092 listener.onServiceAdd(new SimpleDiscoveryEvent(services[i])); 093 } 094 } 095 096 @Override 097 public void stop() throws Exception { 098 running.set(false); 099 100 if (taskRunner != null) { 101 taskRunner.shutdown(); 102 } 103 104 // TODO: Should we not remove the services on the listener? 105 106 synchronized (sleepMutex) { 107 sleepMutex.notifyAll(); 108 } 109 } 110 111 public String[] getServices() { 112 return services; 113 } 114 115 public void setServices(String services) { 116 this.services = services.split(","); 117 } 118 119 public void setServices(String services[]) { 120 this.services = services; 121 } 122 123 public void setServices(URI services[]) { 124 this.services = new String[services.length]; 125 for (int i = 0; i < services.length; i++) { 126 this.services[i] = services[i].toString(); 127 } 128 } 129 130 @Override 131 public void serviceFailed(DiscoveryEvent devent) throws IOException { 132 133 final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent; 134 if (running.get() && sevent.failed.compareAndSet(false, true)) { 135 136 listener.onServiceRemove(sevent); 137 taskRunner.execute(new Runnable() { 138 @Override 139 public void run() { 140 SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent); 141 142 // We detect a failed connection attempt because the service 143 // fails right away. 144 if (event.connectTime + minConnectTime > System.currentTimeMillis()) { 145 LOG.debug("Failure occurred soon after the discovery event was generated. It will be classified as a connection failure: "+event); 146 147 event.connectFailures++; 148 149 if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) { 150 LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled for: " + event); 151 return; 152 } 153 154 synchronized (sleepMutex) { 155 try { 156 if (!running.get()) { 157 LOG.debug("Reconnecting disabled: stopped"); 158 return; 159 } 160 161 LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect."); 162 sleepMutex.wait(event.reconnectDelay); 163 } catch (InterruptedException ie) { 164 LOG.debug("Reconnecting disabled: " + ie); 165 Thread.currentThread().interrupt(); 166 return; 167 } 168 } 169 170 if (!useExponentialBackOff) { 171 event.reconnectDelay = initialReconnectDelay; 172 } else { 173 // Exponential increment of reconnect delay. 174 event.reconnectDelay *= backOffMultiplier; 175 if (event.reconnectDelay > maxReconnectDelay) { 176 event.reconnectDelay = maxReconnectDelay; 177 } 178 } 179 180 } else { 181 event.connectFailures = 0; 182 event.reconnectDelay = initialReconnectDelay; 183 } 184 185 if (!running.get()) { 186 LOG.debug("Reconnecting disabled: stopped"); 187 return; 188 } 189 190 event.connectTime = System.currentTimeMillis(); 191 event.failed.set(false); 192 listener.onServiceAdd(event); 193 } 194 }, "Simple Discovery Agent"); 195 } 196 } 197 198 public long getBackOffMultiplier() { 199 return backOffMultiplier; 200 } 201 202 public void setBackOffMultiplier(long backOffMultiplier) { 203 this.backOffMultiplier = backOffMultiplier; 204 } 205 206 public long getInitialReconnectDelay() { 207 return initialReconnectDelay; 208 } 209 210 public void setInitialReconnectDelay(long initialReconnectDelay) { 211 this.initialReconnectDelay = initialReconnectDelay; 212 } 213 214 public int getMaxReconnectAttempts() { 215 return maxReconnectAttempts; 216 } 217 218 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 219 this.maxReconnectAttempts = maxReconnectAttempts; 220 } 221 222 public long getMaxReconnectDelay() { 223 return maxReconnectDelay; 224 } 225 226 public void setMaxReconnectDelay(long maxReconnectDelay) { 227 this.maxReconnectDelay = maxReconnectDelay; 228 } 229 230 public long getMinConnectTime() { 231 return minConnectTime; 232 } 233 234 public void setMinConnectTime(long minConnectTime) { 235 this.minConnectTime = minConnectTime; 236 } 237 238 public boolean isUseExponentialBackOff() { 239 return useExponentialBackOff; 240 } 241 242 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 243 this.useExponentialBackOff = useExponentialBackOff; 244 } 245}