001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.ExceptionListener;
022import javax.jms.JMSException;
023import javax.jms.Queue;
024import javax.jms.QueueConnection;
025import javax.jms.QueueConnectionFactory;
026import javax.jms.QueueSession;
027import javax.jms.Session;
028import javax.naming.NamingException;
029
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 */
035public class SimpleJmsQueueConnector extends JmsConnector {
036    private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsQueueConnector.class);
037    private String outboundQueueConnectionFactoryName;
038    private String localConnectionFactoryName;
039    private QueueConnectionFactory outboundQueueConnectionFactory;
040    private QueueConnectionFactory localQueueConnectionFactory;
041    private InboundQueueBridge[] inboundQueueBridges;
042    private OutboundQueueBridge[] outboundQueueBridges;
043
044    /**
045     * @return Returns the inboundQueueBridges.
046     */
047    public InboundQueueBridge[] getInboundQueueBridges() {
048        return inboundQueueBridges;
049    }
050
051    /**
052     * @param inboundQueueBridges The inboundQueueBridges to set.
053     */
054    public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
055        this.inboundQueueBridges = inboundQueueBridges;
056    }
057
058    /**
059     * @return Returns the outboundQueueBridges.
060     */
061    public OutboundQueueBridge[] getOutboundQueueBridges() {
062        return outboundQueueBridges;
063    }
064
065    /**
066     * @param outboundQueueBridges The outboundQueueBridges to set.
067     */
068    public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
069        this.outboundQueueBridges = outboundQueueBridges;
070    }
071
072    /**
073     * @return Returns the localQueueConnectionFactory.
074     */
075    public QueueConnectionFactory getLocalQueueConnectionFactory() {
076        return localQueueConnectionFactory;
077    }
078
079    /**
080     * @param localConnectionFactory The localQueueConnectionFactory to
081     *                set.
082     */
083    public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
084        this.localQueueConnectionFactory = localConnectionFactory;
085    }
086
087    /**
088     * @return Returns the outboundQueueConnectionFactory.
089     */
090    public QueueConnectionFactory getOutboundQueueConnectionFactory() {
091        return outboundQueueConnectionFactory;
092    }
093
094    /**
095     * @return Returns the outboundQueueConnectionFactoryName.
096     */
097    public String getOutboundQueueConnectionFactoryName() {
098        return outboundQueueConnectionFactoryName;
099    }
100
101    /**
102     * @param foreignQueueConnectionFactoryName The
103     *                foreignQueueConnectionFactoryName to set.
104     */
105    public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
106        this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
107    }
108
109    /**
110     * @return Returns the localConnectionFactoryName.
111     */
112    public String getLocalConnectionFactoryName() {
113        return localConnectionFactoryName;
114    }
115
116    /**
117     * @param localConnectionFactoryName The localConnectionFactoryName to set.
118     */
119    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
120        this.localConnectionFactoryName = localConnectionFactoryName;
121    }
122
123    /**
124     * @return Returns the localQueueConnection.
125     */
126    public QueueConnection getLocalQueueConnection() {
127        return (QueueConnection) localConnection.get();
128    }
129
130    /**
131     * @param localQueueConnection The localQueueConnection to set.
132     */
133    public void setLocalQueueConnection(QueueConnection localQueueConnection) {
134        this.localConnection.set(localQueueConnection);
135    }
136
137    /**
138     * @return Returns the outboundQueueConnection.
139     */
140    public QueueConnection getOutboundQueueConnection() {
141        return (QueueConnection) foreignConnection.get();
142    }
143
144    /**
145     * @param foreignQueueConnection The foreignQueueConnection to set.
146     */
147    public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
148        this.foreignConnection.set(foreignQueueConnection);
149    }
150
151    /**
152     * @param foreignQueueConnectionFactory The foreignQueueConnectionFactory to set.
153     */
154    public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
155        this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
156    }
157
158    @Override
159    protected void initializeForeignConnection() throws NamingException, JMSException {
160
161        final QueueConnection newConnection;
162
163        if (foreignConnection.get() == null) {
164            // get the connection factories
165            if (outboundQueueConnectionFactory == null) {
166                // look it up from JNDI
167                if (outboundQueueConnectionFactoryName != null) {
168                    outboundQueueConnectionFactory = jndiOutboundTemplate
169                        .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
170                    if (outboundUsername != null) {
171                        newConnection = outboundQueueConnectionFactory
172                            .createQueueConnection(outboundUsername, outboundPassword);
173                    } else {
174                        newConnection = outboundQueueConnectionFactory.createQueueConnection();
175                    }
176                } else {
177                    throw new JMSException("Cannot create foreignConnection - no information");
178                }
179            } else {
180                if (outboundUsername != null) {
181                    newConnection = outboundQueueConnectionFactory
182                        .createQueueConnection(outboundUsername, outboundPassword);
183                } else {
184                    newConnection = outboundQueueConnectionFactory.createQueueConnection();
185                }
186            }
187        } else {
188            // Clear if for now in case something goes wrong during the init.
189            newConnection = (QueueConnection) foreignConnection.getAndSet(null);
190        }
191
192        if (outboundClientId != null && outboundClientId.length() > 0) {
193            newConnection.setClientID(getOutboundClientId());
194        }
195        newConnection.start();
196
197        outboundMessageConvertor.setConnection(newConnection);
198
199        // Configure the bridges with the new Outbound connection.
200        initializeInboundDestinationBridgesOutboundSide(newConnection);
201        initializeOutboundDestinationBridgesOutboundSide(newConnection);
202
203        // Register for any async error notifications now so we can reset in the
204        // case where there's not a lot of activity and a connection drops.
205        newConnection.setExceptionListener(new ExceptionListener() {
206            @Override
207            public void onException(JMSException exception) {
208                handleConnectionFailure(newConnection);
209            }
210        });
211
212        // At this point all looks good, so this our current connection now.
213        foreignConnection.set(newConnection);
214    }
215
216    @Override
217    protected void initializeLocalConnection() throws NamingException, JMSException {
218
219        final QueueConnection newConnection;
220
221        if (localConnection.get() == null) {
222            // get the connection factories
223            if (localQueueConnectionFactory == null) {
224                if (embeddedConnectionFactory == null) {
225                    // look it up from JNDI
226                    if (localConnectionFactoryName != null) {
227                        localQueueConnectionFactory = jndiLocalTemplate
228                            .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
229                        if (localUsername != null) {
230                            newConnection = localQueueConnectionFactory
231                                .createQueueConnection(localUsername, localPassword);
232                        } else {
233                            newConnection = localQueueConnectionFactory.createQueueConnection();
234                        }
235                    } else {
236                        throw new JMSException("Cannot create localConnection - no information");
237                    }
238                } else {
239                    newConnection = embeddedConnectionFactory.createQueueConnection();
240                }
241            } else {
242                if (localUsername != null) {
243                    newConnection = localQueueConnectionFactory.
244                            createQueueConnection(localUsername, localPassword);
245                } else {
246                    newConnection = localQueueConnectionFactory.createQueueConnection();
247                }
248            }
249
250        } else {
251            // Clear if for now in case something goes wrong during the init.
252            newConnection = (QueueConnection) localConnection.getAndSet(null);
253        }
254
255        if (localClientId != null && localClientId.length() > 0) {
256            newConnection.setClientID(getLocalClientId());
257        }
258        newConnection.start();
259
260        inboundMessageConvertor.setConnection(newConnection);
261
262        // Configure the bridges with the new Local connection.
263        initializeInboundDestinationBridgesLocalSide(newConnection);
264        initializeOutboundDestinationBridgesLocalSide(newConnection);
265
266        // Register for any async error notifications now so we can reset in the
267        // case where there's not a lot of activity and a connection drops.
268        newConnection.setExceptionListener(new ExceptionListener() {
269            @Override
270            public void onException(JMSException exception) {
271                handleConnectionFailure(newConnection);
272            }
273        });
274
275        // At this point all looks good, so this our current connection now.
276        localConnection.set(newConnection);
277    }
278
279    protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
280        if (inboundQueueBridges != null) {
281            QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
282
283            for (InboundQueueBridge bridge : inboundQueueBridges) {
284                String queueName = bridge.getInboundQueueName();
285                Queue foreignQueue = createForeignQueue(outboundSession, queueName);
286                bridge.setConsumer(null);
287                bridge.setConsumerQueue(foreignQueue);
288                bridge.setConsumerConnection(connection);
289                bridge.setJmsConnector(this);
290                addInboundBridge(bridge);
291            }
292            outboundSession.close();
293        }
294    }
295
296    protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
297        if (inboundQueueBridges != null) {
298            QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
299
300            for (InboundQueueBridge bridge : inboundQueueBridges) {
301                String localQueueName = bridge.getLocalQueueName();
302                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
303                bridge.setProducerQueue(activemqQueue);
304                bridge.setProducerConnection(connection);
305                if (bridge.getJmsMessageConvertor() == null) {
306                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
307                }
308                bridge.setJmsConnector(this);
309                addInboundBridge(bridge);
310            }
311            localSession.close();
312        }
313    }
314
315    protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
316        if (outboundQueueBridges != null) {
317            QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
318
319            for (OutboundQueueBridge bridge : outboundQueueBridges) {
320                String queueName = bridge.getOutboundQueueName();
321                Queue foreignQueue = createForeignQueue(outboundSession, queueName);
322                bridge.setProducerQueue(foreignQueue);
323                bridge.setProducerConnection(connection);
324                if (bridge.getJmsMessageConvertor() == null) {
325                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
326                }
327                bridge.setJmsConnector(this);
328                addOutboundBridge(bridge);
329            }
330            outboundSession.close();
331        }
332    }
333
334    protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
335        if (outboundQueueBridges != null) {
336            QueueSession localSession =
337                    connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
338
339            for (OutboundQueueBridge bridge : outboundQueueBridges) {
340                String localQueueName = bridge.getLocalQueueName();
341                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
342                bridge.setConsumer(null);
343                bridge.setConsumerQueue(activemqQueue);
344                bridge.setConsumerConnection(connection);
345                bridge.setJmsConnector(this);
346                addOutboundBridge(bridge);
347            }
348            localSession.close();
349        }
350    }
351
352    @Override
353    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
354                                              Connection replyToConsumerConnection) {
355        Queue replyToProducerQueue = (Queue)destination;
356        boolean isInbound = replyToProducerConnection.equals(localConnection.get());
357
358        if (isInbound) {
359            InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
360            if (bridge == null) {
361                bridge = new InboundQueueBridge() {
362                    @Override
363                    protected Destination processReplyToDestination(Destination destination) {
364                        return null;
365                    }
366                };
367                try {
368                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
369                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
370                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
371                    replyToConsumerSession.close();
372                    bridge.setConsumerQueue(replyToConsumerQueue);
373                    bridge.setProducerQueue(replyToProducerQueue);
374                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
375                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
376                    bridge.setDoHandleReplyTo(false);
377                    if (bridge.getJmsMessageConvertor() == null) {
378                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
379                    }
380                    bridge.setJmsConnector(this);
381                    bridge.start();
382                    LOG.info("Created replyTo bridge for {}", replyToProducerQueue);
383                } catch (Exception e) {
384                    LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e);
385                    return null;
386                }
387                replyToBridges.put(replyToProducerQueue, bridge);
388            }
389            return bridge.getConsumerQueue();
390        } else {
391            OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
392            if (bridge == null) {
393                bridge = new OutboundQueueBridge() {
394                    @Override
395                    protected Destination processReplyToDestination(Destination destination) {
396                        return null;
397                    }
398                };
399                try {
400                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
401                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
402                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
403                    replyToConsumerSession.close();
404                    bridge.setConsumerQueue(replyToConsumerQueue);
405                    bridge.setProducerQueue(replyToProducerQueue);
406                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
407                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
408                    bridge.setDoHandleReplyTo(false);
409                    if (bridge.getJmsMessageConvertor() == null) {
410                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
411                    }
412                    bridge.setJmsConnector(this);
413                    bridge.start();
414                    LOG.info("Created replyTo bridge for {}", replyToProducerQueue);
415                } catch (Exception e) {
416                    LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e);
417                    return null;
418                }
419                replyToBridges.put(replyToProducerQueue, bridge);
420            }
421            return bridge.getConsumerQueue();
422        }
423    }
424
425    protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
426        return session.createQueue(queueName);
427    }
428
429    protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
430        Queue result = null;
431
432        if (preferJndiDestinationLookup) {
433            try {
434                // look-up the Queue
435                result = jndiOutboundTemplate.lookup(queueName, Queue.class);
436            } catch (NamingException e) {
437                try {
438                    result = session.createQueue(queueName);
439                } catch (JMSException e1) {
440                    String errStr = "Failed to look-up or create Queue for name: " + queueName;
441                    LOG.error(errStr, e);
442                    JMSException jmsEx = new JMSException(errStr);
443                    jmsEx.setLinkedException(e1);
444                    throw jmsEx;
445                }
446            }
447        } else {
448            try {
449                result = session.createQueue(queueName);
450            } catch (JMSException e) {
451                // look-up the Queue
452                try {
453                    result = jndiOutboundTemplate.lookup(queueName, Queue.class);
454                } catch (NamingException e1) {
455                    String errStr = "Failed to look-up Queue for name: " + queueName;
456                    LOG.error(errStr, e);
457                    JMSException jmsEx = new JMSException(errStr);
458                    jmsEx.setLinkedException(e1);
459                    throw jmsEx;
460                }
461            }
462        }
463
464        return result;
465    }
466}