001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import java.util.concurrent.atomic.AtomicBoolean;
020
021import javax.jms.Connection;
022import javax.jms.Destination;
023import javax.jms.JMSException;
024import javax.jms.Message;
025import javax.jms.MessageConsumer;
026import javax.jms.MessageListener;
027import javax.jms.MessageProducer;
028
029import org.apache.activemq.Service;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * A Destination bridge is used to bridge between to different JMS systems
035 */
036public abstract class DestinationBridge implements Service, MessageListener {
037
038    private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);
039
040    protected MessageConsumer consumer;
041    protected AtomicBoolean started = new AtomicBoolean(false);
042    protected JmsMesageConvertor jmsMessageConvertor;
043    protected boolean doHandleReplyTo = true;
044    protected JmsConnector jmsConnector;
045
046    /**
047     * @return Returns the consumer.
048     */
049    public MessageConsumer getConsumer() {
050        return consumer;
051    }
052
053    /**
054     * @param consumer The consumer to set.
055     */
056    public void setConsumer(MessageConsumer consumer) {
057        this.consumer = consumer;
058    }
059
060    /**
061     * @param connector
062     */
063    public void setJmsConnector(JmsConnector connector) {
064        this.jmsConnector = connector;
065    }
066
067    /**
068     * @return Returns the inboundMessageConvertor.
069     */
070    public JmsMesageConvertor getJmsMessageConvertor() {
071        return jmsMessageConvertor;
072    }
073
074    /**
075     * @param jmsMessageConvertor
076     */
077    public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
078        this.jmsMessageConvertor = jmsMessageConvertor;
079    }
080
081    protected Destination processReplyToDestination(Destination destination) {
082        return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
083    }
084
085    @Override
086    public void start() throws Exception {
087        if (started.compareAndSet(false, true)) {
088            createConsumer();
089            createProducer();
090        }
091    }
092
093    @Override
094    public void stop() throws Exception {
095        started.set(false);
096    }
097
098    @Override
099    public void onMessage(Message message) {
100
101        int attempt = 0;
102        final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries();
103
104        while (started.get() && message != null && (maxRetries == ReconnectionPolicy.INFINITE || attempt <= maxRetries)) {
105
106            try {
107
108                if (attempt++ > 0) {
109                    try {
110                        Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt));
111                    } catch(InterruptedException e) {
112                        break;
113                    }
114                }
115
116                Message converted;
117                if (jmsMessageConvertor != null) {
118                    if (doHandleReplyTo) {
119                        Destination replyTo = message.getJMSReplyTo();
120                        if (replyTo != null) {
121                            converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
122                        } else {
123                            converted = jmsMessageConvertor.convert(message);
124                        }
125                    } else {
126                        message.setJMSReplyTo(null);
127                        converted = jmsMessageConvertor.convert(message);
128                    }
129                } else {
130                    // The Producer side is not up or not yet configured, retry.
131                    continue;
132                }
133
134                try {
135                    sendMessage(converted);
136                } catch(Exception e) {
137                    jmsConnector.handleConnectionFailure(getConnectionForProducer());
138                    continue;
139                }
140
141                try {
142                    message.acknowledge();
143                } catch(Exception e) {
144                    jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
145                    continue;
146                }
147
148                // if we got here then it made it out and was ack'd
149                return;
150
151            } catch (Exception e) {
152                LOG.info("failed to forward message on attempt: {} reason: {} message: {}", new Object[]{ attempt, e, message }, e);
153            }
154        }
155    }
156
157    /**
158     * @return Returns the doHandleReplyTo.
159     */
160    public boolean isDoHandleReplyTo() {
161        return doHandleReplyTo;
162    }
163
164    /**
165     * @param doHandleReplyTo The doHandleReplyTo to set.
166     */
167    public void setDoHandleReplyTo(boolean doHandleReplyTo) {
168        this.doHandleReplyTo = doHandleReplyTo;
169    }
170
171    protected abstract MessageConsumer createConsumer() throws JMSException;
172
173    protected abstract MessageProducer createProducer() throws JMSException;
174
175    protected abstract void sendMessage(Message message) throws JMSException;
176
177    protected abstract Connection getConnnectionForConsumer();
178
179    protected abstract Connection getConnectionForProducer();
180
181}