001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.net.URI; 020import java.net.URISyntaxException; 021import java.util.Collection; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Set; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027 028import javax.management.MalformedObjectNameException; 029import javax.management.ObjectName; 030 031import org.apache.activemq.Service; 032import org.apache.activemq.broker.BrokerService; 033import org.apache.activemq.broker.jmx.AnnotatedMBean; 034import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 035import org.apache.activemq.broker.jmx.NetworkBridgeView; 036import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean; 037import org.apache.activemq.command.ActiveMQDestination; 038import org.apache.activemq.command.ConsumerId; 039import org.apache.activemq.transport.Transport; 040import org.apache.activemq.transport.TransportFactory; 041import org.apache.activemq.util.ServiceStopper; 042import org.apache.activemq.util.ServiceSupport; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * Connector class for bridging broker networks. 048 */ 049public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service { 050 051 private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class); 052 protected URI localURI; 053 protected ConnectionFilter connectionFilter; 054 protected ConcurrentMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>(); 055 056 protected ServiceSupport serviceSupport = new ServiceSupport() { 057 058 @Override 059 protected void doStart() throws Exception { 060 handleStart(); 061 } 062 063 @Override 064 protected void doStop(ServiceStopper stopper) throws Exception { 065 handleStop(stopper); 066 } 067 }; 068 069 private Set<ActiveMQDestination> durableDestinations; 070 071 private BrokerService brokerService; 072 private ObjectName objectName; 073 074 public NetworkConnector() { 075 } 076 077 public NetworkConnector(URI localURI) { 078 this.localURI = localURI; 079 } 080 081 public URI getLocalUri() throws URISyntaxException { 082 return localURI; 083 } 084 085 public void setLocalUri(URI localURI) { 086 this.localURI = localURI; 087 } 088 089 /** 090 * @return Returns the durableDestinations. 091 */ 092 public Set<ActiveMQDestination> getDurableDestinations() { 093 return durableDestinations; 094 } 095 096 /** 097 * @param durableDestinations The durableDestinations to set. 098 */ 099 public void setDurableDestinations(Set<ActiveMQDestination> durableDestinations) { 100 this.durableDestinations = durableDestinations; 101 } 102 103 104 public void addExcludedDestination(ActiveMQDestination destiantion) { 105 this.excludedDestinations.add(destiantion); 106 } 107 108 109 public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) { 110 this.staticallyIncludedDestinations.add(destiantion); 111 } 112 113 114 public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) { 115 this.dynamicallyIncludedDestinations.add(destiantion); 116 } 117 118 public ConnectionFilter getConnectionFilter() { 119 return connectionFilter; 120 } 121 122 public void setConnectionFilter(ConnectionFilter connectionFilter) { 123 this.connectionFilter = connectionFilter; 124 } 125 126 // Implementation methods 127 // ------------------------------------------------------------------------- 128 protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) { 129 List<ActiveMQDestination> destsList = getDynamicallyIncludedDestinations(); 130 ActiveMQDestination dests[] = destsList.toArray(new ActiveMQDestination[destsList.size()]); 131 result.setDynamicallyIncludedDestinations(dests); 132 destsList = getExcludedDestinations(); 133 dests = destsList.toArray(new ActiveMQDestination[destsList.size()]); 134 result.setExcludedDestinations(dests); 135 destsList = getStaticallyIncludedDestinations(); 136 dests = destsList.toArray(new ActiveMQDestination[destsList.size()]); 137 result.setStaticallyIncludedDestinations(dests); 138 if (durableDestinations != null) { 139 140 HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>(); 141 for (ActiveMQDestination d : durableDestinations) { 142 if( d.isTopic() ) { 143 topics.add(d); 144 } 145 } 146 147 ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()]; 148 dest = topics.toArray(dest); 149 result.setDurableDestinations(dest); 150 } 151 return result; 152 } 153 154 protected Transport createLocalTransport() throws Exception { 155 return TransportFactory.connect(localURI); 156 } 157 158 @Override 159 public void start() throws Exception { 160 serviceSupport.start(); 161 } 162 163 @Override 164 public void stop() throws Exception { 165 serviceSupport.stop(); 166 } 167 168 protected void handleStart() throws Exception { 169 if (localURI == null) { 170 throw new IllegalStateException("You must configure the 'localURI' property"); 171 } 172 LOG.info("Network Connector {} started", this); 173 } 174 175 protected void handleStop(ServiceStopper stopper) throws Exception { 176 LOG.info("Network Connector {} stopped", this); 177 } 178 179 public boolean isStarted() { 180 return serviceSupport.isStarted(); 181 } 182 183 public boolean isStopped() { 184 return serviceSupport.isStopped(); 185 } 186 187 public boolean isStopping() { 188 return serviceSupport.isStopping(); 189 } 190 191 public ObjectName getObjectName() { 192 return objectName; 193 } 194 195 public void setObjectName(ObjectName objectName) { 196 this.objectName = objectName; 197 } 198 199 public BrokerService getBrokerService() { 200 return brokerService; 201 } 202 203 public void setBrokerService(BrokerService brokerService) { 204 this.brokerService = brokerService; 205 } 206 207 protected void registerNetworkBridgeMBean(NetworkBridge bridge) { 208 if (!getBrokerService().isUseJmx()) { 209 return; 210 } 211 NetworkBridgeViewMBean view = new NetworkBridgeView(bridge); 212 try { 213 ObjectName objectName = createNetworkBridgeObjectName(bridge); 214 AnnotatedMBean.registerMBean(getBrokerService().getManagementContext(), view, objectName); 215 } catch (Throwable e) { 216 LOG.debug("Network bridge could not be registered in JMX: {}", e.getMessage(), e); 217 } 218 } 219 220 protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) { 221 if (!getBrokerService().isUseJmx()) { 222 return; 223 } 224 try { 225 ObjectName objectName = createNetworkBridgeObjectName(bridge); 226 getBrokerService().getManagementContext().unregisterMBean(objectName); 227 } catch (Throwable e) { 228 LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); 229 } 230 } 231 232 protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException { 233 return BrokerMBeanSupport.createNetworkBridgeObjectName(getObjectName(), bridge.getRemoteAddress()); 234 } 235 236 // ask all the bridges as we can't know to which this consumer is tied 237 public boolean removeDemandSubscription(ConsumerId consumerId) { 238 boolean removeSucceeded = false; 239 for (NetworkBridge bridge : bridges.values()) { 240 if (bridge instanceof DemandForwardingBridgeSupport) { 241 DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge; 242 if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) { 243 removeSucceeded = true; 244 break; 245 } 246 } 247 } 248 return removeSucceeded; 249 } 250 251 public Collection<NetworkBridge> activeBridges() { 252 return bridges.values(); 253 } 254}