001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.Iterator; 023import java.util.Map; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026 027import javax.management.ObjectName; 028 029import org.apache.activemq.broker.BrokerService; 030import org.apache.activemq.broker.SslContext; 031import org.apache.activemq.command.DiscoveryEvent; 032import org.apache.activemq.transport.Transport; 033import org.apache.activemq.transport.TransportFactory; 034import org.apache.activemq.transport.discovery.DiscoveryAgent; 035import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 036import org.apache.activemq.transport.discovery.DiscoveryListener; 037import org.apache.activemq.util.IntrospectionSupport; 038import org.apache.activemq.util.ServiceStopper; 039import org.apache.activemq.util.ServiceSupport; 040import org.apache.activemq.util.URISupport; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * A network connector which uses a discovery agent to detect the remote brokers 046 * available and setup a connection to each available remote broker 047 * 048 * @org.apache.xbean.XBean element="networkConnector" 049 * 050 */ 051public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener { 052 private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkConnector.class); 053 054 private DiscoveryAgent discoveryAgent; 055 private Map<String, String> parameters; 056 private final ConcurrentMap<URI, DiscoveryEvent> activeEvents = new ConcurrentHashMap<URI, DiscoveryEvent>(); 057 private URI discoveryUri; 058 public DiscoveryNetworkConnector() { 059 } 060 061 public DiscoveryNetworkConnector(URI discoveryURI) throws IOException { 062 setUri(discoveryURI); 063 } 064 065 public void setUri(URI discoveryURI) throws IOException { 066 this.discoveryUri = discoveryURI; 067 setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI)); 068 try { 069 parameters = URISupport.parseParameters(discoveryURI); 070 // allow discovery agent to grab it's parameters 071 IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters); 072 } catch (URISyntaxException e) { 073 LOG.warn("failed to parse query parameters from discoveryURI: {}", discoveryURI, e); 074 } 075 } 076 077 public URI getUri() { 078 return discoveryUri; 079 } 080 081 @Override 082 public void onServiceAdd(DiscoveryEvent event) { 083 // Ignore events once we start stopping. 084 if (serviceSupport.isStopped() || serviceSupport.isStopping()) { 085 return; 086 } 087 String url = event.getServiceName(); 088 if (url != null) { 089 URI uri; 090 try { 091 uri = new URI(url); 092 } catch (URISyntaxException e) { 093 LOG.warn("Could not connect to remote URI: {} due to bad URI syntax: ", url, e); 094 return; 095 } 096 097 if (localURI.equals(uri)) { 098 LOG.debug("not connecting loopback: {}", uri); 099 return; 100 } 101 102 if (connectionFilter != null && !connectionFilter.connectTo(uri)) { 103 LOG.debug("connectionFilter disallows connection to: {}", uri); 104 return; 105 } 106 107 // Should we try to connect to that URI? 108 if (activeEvents.putIfAbsent(uri, event) != null) { 109 LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: {}", uri); 110 return; 111 } 112 113 URI connectUri = uri; 114 try { 115 connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX); 116 } catch (URISyntaxException e) { 117 LOG.warn("could not apply query parameters: {} to: {}", new Object[]{ parameters, connectUri }, e); 118 } 119 120 LOG.info("Establishing network connection from {} to {}", localURI, connectUri); 121 122 Transport remoteTransport; 123 Transport localTransport; 124 try { 125 // Allows the transport to access the broker's ssl configuration. 126 SslContext.setCurrentSslContext(getBrokerService().getSslContext()); 127 try { 128 remoteTransport = TransportFactory.connect(connectUri); 129 } catch (Exception e) { 130 LOG.warn("Could not connect to remote URI: {}: {}", connectUri, e.getMessage()); 131 LOG.debug("Connection failure exception: ", e); 132 activeEvents.remove(uri); 133 return; 134 } 135 try { 136 localTransport = createLocalTransport(); 137 } catch (Exception e) { 138 ServiceSupport.dispose(remoteTransport); 139 LOG.warn("Could not connect to local URI: {}: {}", localURI, e.getMessage()); 140 LOG.debug("Connection failure exception: ", e); 141 activeEvents.remove(uri); 142 return; 143 } 144 } finally { 145 SslContext.setCurrentSslContext(null); 146 } 147 NetworkBridge bridge = createBridge(localTransport, remoteTransport, event); 148 try { 149 synchronized (bridges) { 150 bridges.put(uri, bridge); 151 } 152 bridge.start(); 153 } catch (Exception e) { 154 ServiceSupport.dispose(localTransport); 155 ServiceSupport.dispose(remoteTransport); 156 LOG.warn("Could not start network bridge between: {} and: {} due to: {}", new Object[]{ localURI, uri, e.getMessage() }); 157 LOG.debug("Start failure exception: ", e); 158 try { 159 // Will remove bridge and active event. 160 discoveryAgent.serviceFailed(event); 161 } catch (IOException e1) { 162 LOG.debug("Discovery agent failure while handling failure event: {}", e1.getMessage(), e1); 163 } 164 } 165 } 166 } 167 168 @Override 169 public void onServiceRemove(DiscoveryEvent event) { 170 String url = event.getServiceName(); 171 if (url != null) { 172 URI uri; 173 try { 174 uri = new URI(url); 175 } catch (URISyntaxException e) { 176 LOG.warn("Could not connect to remote URI: {} due to bad URI syntax: ", url, e); 177 return; 178 } 179 180 // Only remove bridge if this is the active discovery event for the URL. 181 if (activeEvents.remove(uri, event)) { 182 synchronized (bridges) { 183 bridges.remove(uri); 184 } 185 } 186 } 187 } 188 189 public DiscoveryAgent getDiscoveryAgent() { 190 return discoveryAgent; 191 } 192 193 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 194 this.discoveryAgent = discoveryAgent; 195 if (discoveryAgent != null) { 196 this.discoveryAgent.setDiscoveryListener(this); 197 } 198 } 199 200 @Override 201 protected void handleStart() throws Exception { 202 if (discoveryAgent == null) { 203 throw new IllegalStateException("You must configure the 'discoveryAgent' property"); 204 } 205 this.discoveryAgent.start(); 206 super.handleStart(); 207 } 208 209 @Override 210 protected void handleStop(ServiceStopper stopper) throws Exception { 211 for (Iterator<NetworkBridge> i = bridges.values().iterator(); i.hasNext();) { 212 NetworkBridge bridge = i.next(); 213 try { 214 bridge.stop(); 215 } catch (Exception e) { 216 stopper.onException(this, e); 217 } 218 } 219 bridges.clear(); 220 activeEvents.clear(); 221 try { 222 this.discoveryAgent.stop(); 223 } catch (Exception e) { 224 stopper.onException(this, e); 225 } 226 227 super.handleStop(stopper); 228 } 229 230 protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { 231 class DiscoverNetworkBridgeListener extends MBeanNetworkListener { 232 233 public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) { 234 super(brokerService, DiscoveryNetworkConnector.this, connectorName); 235 } 236 237 @Override 238 public void bridgeFailed() { 239 if (!serviceSupport.isStopped()) { 240 try { 241 discoveryAgent.serviceFailed(event); 242 } catch (IOException e) { 243 } 244 } 245 246 } 247 } 248 NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName()); 249 250 DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener); 251 result.setBrokerService(getBrokerService()); 252 return configureBridge(result); 253 } 254 255 @Override 256 public String toString() { 257 return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService(); 258 } 259}