001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.ra; 018 019import java.io.PrintWriter; 020import java.util.List; 021import java.util.concurrent.CopyOnWriteArrayList; 022import javax.jms.Connection; 023import javax.jms.ExceptionListener; 024import javax.jms.JMSException; 025import javax.resource.ResourceException; 026import javax.resource.spi.ConnectionEvent; 027import javax.resource.spi.ConnectionEventListener; 028import javax.resource.spi.ConnectionRequestInfo; 029import javax.resource.spi.LocalTransaction; 030import javax.resource.spi.ManagedConnection; 031import javax.resource.spi.ManagedConnectionMetaData; 032import javax.security.auth.Subject; 033import javax.transaction.xa.XAResource; 034import org.apache.activemq.ActiveMQConnection; 035import org.apache.activemq.LocalTransactionEventListener; 036import org.apache.activemq.TransactionContext; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * ActiveMQManagedConnection maps to real physical connection to the server. 042 * Since a ManagedConnection has to provide a transaction managment interface to 043 * the physical connection, and sessions are the objects implement transaction 044 * managment interfaces in the JMS API, this object also maps to a singe 045 * physical JMS session. <p/> The side-effect is that JMS connection the 046 * application gets will allways create the same session object. This is good if 047 * running in an app server since the sessions are elisted in the context 048 * transaction. This is bad if used outside of an app server since the user may 049 * be trying to create 2 different sessions to coordinate 2 different uow. 050 * 051 * 052 */ 053public class ActiveMQManagedConnection implements ManagedConnection, ExceptionListener { // TODO: 054 // , 055 // DissociatableManagedConnection 056 // { 057 058 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQManagedConnection.class); 059 060 private PrintWriter logWriter; 061 062 private final ActiveMQConnection physicalConnection; 063 private final TransactionContext transactionContext; 064 private final List<ManagedConnectionProxy> proxyConnections = new CopyOnWriteArrayList<ManagedConnectionProxy>(); 065 private final List<ConnectionEventListener> listeners = new CopyOnWriteArrayList<ConnectionEventListener>(); 066 private final LocalAndXATransaction localAndXATransaction; 067 068 private Subject subject; 069 private ActiveMQConnectionRequestInfo info; 070 private boolean destroyed; 071 072 public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException { 073 try { 074 this.subject = subject; 075 this.info = info; 076 this.physicalConnection = physicalConnection; 077 this.transactionContext = new TransactionContext(physicalConnection); 078 079 this.localAndXATransaction = new LocalAndXATransaction(transactionContext) { 080 public void setInManagedTx(boolean inManagedTx) throws JMSException { 081 super.setInManagedTx(inManagedTx); 082 for (ManagedConnectionProxy proxy:proxyConnections) { 083 proxy.setUseSharedTxContext(inManagedTx); 084 } 085 } 086 }; 087 088 this.transactionContext.setLocalTransactionEventListener(new LocalTransactionEventListener() { 089 public void beginEvent() { 090 fireBeginEvent(); 091 } 092 093 public void commitEvent() { 094 fireCommitEvent(); 095 } 096 097 public void rollbackEvent() { 098 fireRollbackEvent(); 099 } 100 }); 101 102 physicalConnection.setExceptionListener(this); 103 } catch (JMSException e) { 104 throw new ResourceException("Could not create a new connection: " + e.getMessage(), e); 105 } 106 } 107 108 public boolean isInManagedTx() { 109 return localAndXATransaction.isInManagedTx(); 110 } 111 112 public static boolean matches(Object x, Object y) { 113 if (x == null ^ y == null) { 114 return false; 115 } 116 if (x != null && !x.equals(y)) { 117 return false; 118 } 119 return true; 120 } 121 122 public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException { 123 124 // Do we need to change the associated userid/password 125 if (!matches(info.getUserName(), this.info.getUserName()) || !matches(info.getPassword(), this.info.getPassword())) { 126 physicalConnection.changeUserInfo(info.getUserName(), info.getPassword()); 127 } 128 129 // Do we need to set the clientId? 130 if (info.getClientid() != null && info.getClientid().length() > 0) { 131 physicalConnection.setClientID(info.getClientid()); 132 } 133 134 this.subject = subject; 135 this.info = info; 136 } 137 138 public Connection getPhysicalConnection() { 139 return physicalConnection; 140 } 141 142 private void fireBeginEvent() { 143 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_STARTED); 144 for(ConnectionEventListener l:listeners) { 145 l.localTransactionStarted(event); 146 } 147 } 148 149 private void fireCommitEvent() { 150 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED); 151 for(ConnectionEventListener l:listeners) { 152 l.localTransactionCommitted(event); 153 } 154 } 155 156 private void fireRollbackEvent() { 157 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK); 158 for(ConnectionEventListener l:listeners) { 159 l.localTransactionRolledback(event); 160 } 161 } 162 163 private void fireCloseEvent(ManagedConnectionProxy proxy) { 164 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.CONNECTION_CLOSED); 165 event.setConnectionHandle(proxy); 166 167 for(ConnectionEventListener l:listeners) { 168 l.connectionClosed(event); 169 } 170 } 171 172 private void fireErrorOccurredEvent(Exception error) { 173 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, error); 174 for(ConnectionEventListener l:listeners) { 175 l.connectionErrorOccurred(event); 176 } 177 } 178 179 /** 180 * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject, 181 * javax.resource.spi.ConnectionRequestInfo) 182 */ 183 public Object getConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException { 184 ManagedConnectionProxy proxy = new ManagedConnectionProxy(this, info); 185 proxyConnections.add(proxy); 186 return proxy; 187 } 188 189 private boolean isDestroyed() { 190 return destroyed; 191 } 192 193 /** 194 * Close down the physical connection to the server. 195 * 196 * @see javax.resource.spi.ManagedConnection#destroy() 197 */ 198 public void destroy() throws ResourceException { 199 // Have we already been destroyed?? 200 if (isDestroyed()) { 201 return; 202 } 203 204 try { 205 cleanup(); 206 } finally { 207 try { 208 physicalConnection.close(); 209 destroyed = true; 210 } catch (JMSException e) { 211 LOG.trace("Error occurred during close of a JMS connection.", e); 212 } 213 } 214 } 215 216 /** 217 * Cleans up all proxy handles attached to this physical connection so that 218 * they cannot be used anymore. 219 * 220 * @see javax.resource.spi.ManagedConnection#cleanup() 221 */ 222 public void cleanup() throws ResourceException { 223 224 // Have we already been destroyed?? 225 if (isDestroyed()) { 226 return; 227 } 228 229 for (ManagedConnectionProxy proxy:proxyConnections) { 230 proxy.cleanup(); 231 } 232 proxyConnections.clear(); 233 234 try { 235 physicalConnection.cleanup(); 236 } catch (JMSException e) { 237 throw new ResourceException("Could not cleanup the ActiveMQ connection: " + e, e); 238 } finally { 239 // defer transaction cleanup till after close so that close is aware of the current tx 240 localAndXATransaction.cleanup(); 241 } 242 } 243 244 /** 245 * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object) 246 */ 247 public void associateConnection(Object connection) throws ResourceException { 248 if (connection instanceof ManagedConnectionProxy) { 249 ManagedConnectionProxy proxy = (ManagedConnectionProxy)connection; 250 proxyConnections.add(proxy); 251 } else { 252 throw new ResourceException("Not supported : associating connection instance of " + connection.getClass().getName()); 253 } 254 } 255 256 /** 257 * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener) 258 */ 259 public void addConnectionEventListener(ConnectionEventListener listener) { 260 listeners.add(listener); 261 } 262 263 /** 264 * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener) 265 */ 266 public void removeConnectionEventListener(ConnectionEventListener listener) { 267 listeners.remove(listener); 268 } 269 270 /** 271 * @see javax.resource.spi.ManagedConnection#getXAResource() 272 */ 273 public XAResource getXAResource() throws ResourceException { 274 return localAndXATransaction; 275 } 276 277 /** 278 * @see javax.resource.spi.ManagedConnection#getLocalTransaction() 279 */ 280 public LocalTransaction getLocalTransaction() throws ResourceException { 281 return localAndXATransaction; 282 } 283 284 /** 285 * @see javax.resource.spi.ManagedConnection#getMetaData() 286 */ 287 public ManagedConnectionMetaData getMetaData() throws ResourceException { 288 return new ManagedConnectionMetaData() { 289 290 public String getEISProductName() throws ResourceException { 291 if (physicalConnection == null) { 292 throw new ResourceException("Not connected."); 293 } 294 try { 295 return physicalConnection.getMetaData().getJMSProviderName(); 296 } catch (JMSException e) { 297 throw new ResourceException("Error accessing provider.", e); 298 } 299 } 300 301 public String getEISProductVersion() throws ResourceException { 302 if (physicalConnection == null) { 303 throw new ResourceException("Not connected."); 304 } 305 try { 306 return physicalConnection.getMetaData().getProviderVersion(); 307 } catch (JMSException e) { 308 throw new ResourceException("Error accessing provider.", e); 309 } 310 } 311 312 public int getMaxConnections() throws ResourceException { 313 if (physicalConnection == null) { 314 throw new ResourceException("Not connected."); 315 } 316 return Integer.MAX_VALUE; 317 } 318 319 public String getUserName() throws ResourceException { 320 if (physicalConnection == null) { 321 throw new ResourceException("Not connected."); 322 } 323 try { 324 return physicalConnection.getClientID(); 325 } catch (JMSException e) { 326 throw new ResourceException("Error accessing provider.", e); 327 } 328 } 329 }; 330 } 331 332 /** 333 * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter) 334 */ 335 public void setLogWriter(PrintWriter logWriter) throws ResourceException { 336 this.logWriter = logWriter; 337 } 338 339 /** 340 * @see javax.resource.spi.ManagedConnection#getLogWriter() 341 */ 342 public PrintWriter getLogWriter() throws ResourceException { 343 return logWriter; 344 } 345 346 /** 347 * @param subject subject to match 348 * @param info cri to match 349 * @return whether the subject and cri match sufficiently to allow using this connection under the new circumstances 350 */ 351 public boolean matches(Subject subject, ConnectionRequestInfo info) { 352 // Check to see if it is our info class 353 if (info == null) { 354 return false; 355 } 356 if (info.getClass() != ActiveMQConnectionRequestInfo.class) { 357 return false; 358 } 359 360 // Do the subjects match? 361 if (subject == null ^ this.subject == null) { 362 return false; 363 } 364 if (subject != null && !subject.equals(this.subject)) { 365 return false; 366 } 367 368 // Does the info match? 369 return info.equals(this.info); 370 } 371 372 /** 373 * When a proxy is closed this cleans up the proxy and notifies the 374 * ConnectionEventListeners that a connection closed. 375 * 376 * @param proxy 377 */ 378 public void proxyClosedEvent(ManagedConnectionProxy proxy) { 379 proxyConnections.remove(proxy); 380 proxy.cleanup(); 381 fireCloseEvent(proxy); 382 } 383 384 public void onException(JMSException e) { 385 LOG.warn("Connection failed: " + e); 386 LOG.debug("Cause: ", e); 387 388 for (ManagedConnectionProxy proxy:proxyConnections) { 389 proxy.onException(e); 390 } 391 // Let the container know that the error occurred. 392 fireErrorOccurredEvent(e); 393 } 394 395 /** 396 * @return Returns the transactionContext. 397 */ 398 public TransactionContext getTransactionContext() { 399 return transactionContext; 400 } 401 402 @Override 403 public String toString() { 404 return "[" + super.toString() + "," + physicalConnection +"]"; 405 } 406 407}