001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.IOException;
020import java.net.InetAddress;
021import java.net.InetSocketAddress;
022import java.net.ServerSocket;
023import java.net.Socket;
024import java.net.SocketException;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029import java.nio.channels.SelectionKey;
030import java.nio.channels.ServerSocketChannel;
031import java.nio.channels.SocketChannel;
032import java.util.HashMap;
033import java.util.concurrent.BlockingQueue;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicInteger;
037
038import javax.net.ServerSocketFactory;
039import javax.net.ssl.SSLParameters;
040import javax.net.ssl.SSLServerSocket;
041
042import org.apache.activemq.Service;
043import org.apache.activemq.ThreadPriorities;
044import org.apache.activemq.TransportLoggerSupport;
045import org.apache.activemq.command.BrokerInfo;
046import org.apache.activemq.openwire.OpenWireFormatFactory;
047import org.apache.activemq.transport.Transport;
048import org.apache.activemq.transport.TransportFactory;
049import org.apache.activemq.transport.TransportServer;
050import org.apache.activemq.transport.TransportServerThreadSupport;
051import org.apache.activemq.transport.nio.SelectorManager;
052import org.apache.activemq.transport.nio.SelectorSelection;
053import org.apache.activemq.util.IOExceptionSupport;
054import org.apache.activemq.util.InetAddressUtil;
055import org.apache.activemq.util.IntrospectionSupport;
056import org.apache.activemq.util.ServiceListener;
057import org.apache.activemq.util.ServiceStopper;
058import org.apache.activemq.util.ServiceSupport;
059import org.apache.activemq.wireformat.WireFormat;
060import org.apache.activemq.wireformat.WireFormatFactory;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064/**
065 * A TCP based implementation of {@link TransportServer}
066 */
067public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
068
069    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
070    protected ServerSocket serverSocket;
071    protected SelectorSelection selector;
072    protected int backlog = 5000;
073    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
074    protected final TcpTransportFactory transportFactory;
075    protected long maxInactivityDuration = 30000;
076    protected long maxInactivityDurationInitalDelay = 10000;
077    protected int minmumWireFormatVersion;
078    protected boolean useQueueForAccept = true;
079    protected boolean allowLinkStealing;
080    protected boolean verifyHostName = false;
081
082    /**
083     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
084     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
085     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
086     * TransportConnector URIs.
087     */
088    protected boolean trace = false;
089
090    protected int soTimeout = 0;
091    protected int socketBufferSize = 64 * 1024;
092    protected int connectionTimeout = 30000;
093
094    /**
095     * Name of the LogWriter implementation to use. Names are mapped to classes in the
096     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
097     * set in Connection or TransportConnector URIs.
098     */
099    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
100
101    /**
102     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
103     * TransportLogger which is manageable, a TransportLoggerControl MBean will me created.
104     */
105    protected boolean dynamicManagement = false;
106
107    /**
108     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
109     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
110     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
111     * TransportConnector URIs.
112     */
113    protected boolean startLogging = true;
114    protected final ServerSocketFactory serverSocketFactory;
115    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
116    protected Thread socketHandlerThread;
117
118    /**
119     * The maximum number of sockets allowed for this server
120     */
121    protected int maximumConnections = Integer.MAX_VALUE;
122    protected AtomicInteger currentTransportCount = new AtomicInteger();
123
124    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
125        URISyntaxException {
126        super(location);
127        this.transportFactory = transportFactory;
128        this.serverSocketFactory = serverSocketFactory;
129    }
130
131    public void bind() throws IOException {
132        URI bind = getBindLocation();
133
134        String host = bind.getHost();
135        host = (host == null || host.length() == 0) ? "localhost" : host;
136        InetAddress addr = InetAddress.getByName(host);
137
138        try {
139            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
140            configureServerSocket(this.serverSocket);
141        } catch (IOException e) {
142            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
143        }
144        try {
145            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
146                bind.getQuery(), bind.getFragment()));
147        } catch (URISyntaxException e) {
148
149            // it could be that the host name contains invalid characters such
150            // as _ on unix platforms so lets try use the IP address instead
151            try {
152                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
153                    bind.getQuery(), bind.getFragment()));
154            } catch (URISyntaxException e2) {
155                throw IOExceptionSupport.create(e2);
156            }
157        }
158    }
159
160    private void configureServerSocket(ServerSocket socket) throws SocketException {
161        socket.setSoTimeout(2000);
162        if (transportOptions != null) {
163
164            // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
165            // to SSLServerSocket to configure it has a side effect on the socket rendering it
166            // useless as all suites are enabled many of which are considered as insecure.  We
167            // instead trap that option here and throw an exception.  We should really consider
168            // all invalid options as breaking and not start the transport but the current design
169            // doesn't really allow for this.
170            //
171            //  see: https://issues.apache.org/jira/browse/AMQ-4582
172            //
173            if (socket instanceof SSLServerSocket) {
174                if (transportOptions.containsKey("verifyHostName")) {
175                    verifyHostName = Boolean.parseBoolean(transportOptions.get("verifyHostName").toString());
176                } else {
177                    transportOptions.put("verifyHostName", verifyHostName);
178                }
179
180                if (verifyHostName) {
181                    SSLParameters sslParams = new SSLParameters();
182                    sslParams.setEndpointIdentificationAlgorithm("HTTPS");
183                    ((SSLServerSocket)this.serverSocket).setSSLParameters(sslParams);
184                }
185
186                if (transportOptions.containsKey("enabledCipherSuites")) {
187                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");
188
189                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
190                        throw new SocketException(String.format(
191                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
192                    }
193                }
194
195            }
196
197            IntrospectionSupport.setProperties(socket, transportOptions);
198        }
199    }
200
201    /**
202     * @return Returns the wireFormatFactory.
203     */
204    public WireFormatFactory getWireFormatFactory() {
205        return wireFormatFactory;
206    }
207
208    /**
209     * @param wireFormatFactory
210     *            The wireFormatFactory to set.
211     */
212    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
213        this.wireFormatFactory = wireFormatFactory;
214    }
215
216    /**
217     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
218     * broker.
219     *
220     * @param brokerInfo
221     */
222    @Override
223    public void setBrokerInfo(BrokerInfo brokerInfo) {
224    }
225
226    public long getMaxInactivityDuration() {
227        return maxInactivityDuration;
228    }
229
230    public void setMaxInactivityDuration(long maxInactivityDuration) {
231        this.maxInactivityDuration = maxInactivityDuration;
232    }
233
234    public long getMaxInactivityDurationInitalDelay() {
235        return this.maxInactivityDurationInitalDelay;
236    }
237
238    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
239        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
240    }
241
242    public int getMinmumWireFormatVersion() {
243        return minmumWireFormatVersion;
244    }
245
246    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
247        this.minmumWireFormatVersion = minmumWireFormatVersion;
248    }
249
250    public boolean isTrace() {
251        return trace;
252    }
253
254    public void setTrace(boolean trace) {
255        this.trace = trace;
256    }
257
258    public String getLogWriterName() {
259        return logWriterName;
260    }
261
262    public void setLogWriterName(String logFormat) {
263        this.logWriterName = logFormat;
264    }
265
266    public boolean isDynamicManagement() {
267        return dynamicManagement;
268    }
269
270    public void setDynamicManagement(boolean useJmx) {
271        this.dynamicManagement = useJmx;
272    }
273
274    public boolean isStartLogging() {
275        return startLogging;
276    }
277
278    public void setStartLogging(boolean startLogging) {
279        this.startLogging = startLogging;
280    }
281
282    /**
283     * @return the backlog
284     */
285    public int getBacklog() {
286        return backlog;
287    }
288
289    /**
290     * @param backlog
291     *            the backlog to set
292     */
293    public void setBacklog(int backlog) {
294        this.backlog = backlog;
295    }
296
297    /**
298     * @return the useQueueForAccept
299     */
300    public boolean isUseQueueForAccept() {
301        return useQueueForAccept;
302    }
303
304    /**
305     * @param useQueueForAccept
306     *            the useQueueForAccept to set
307     */
308    public void setUseQueueForAccept(boolean useQueueForAccept) {
309        this.useQueueForAccept = useQueueForAccept;
310    }
311
312    /**
313     * pull Sockets from the ServerSocket
314     */
315    @Override
316    public void run() {
317        final ServerSocketChannel chan = serverSocket.getChannel();
318        if (chan != null) {
319            try {
320                chan.configureBlocking(false);
321                selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener() {
322                    @Override
323                    public void onSelect(SelectorSelection sel) {
324                        try {
325                            SocketChannel sc = chan.accept();
326                            if (sc != null) {
327                                if (isStopped() || getAcceptListener() == null) {
328                                    sc.close();
329                                } else {
330                                    if (useQueueForAccept) {
331                                        socketQueue.put(sc.socket());
332                                    } else {
333                                        handleSocket(sc.socket());
334                                    }
335                                }
336                            }
337                        } catch (Exception e) {
338                            onError(sel, e);
339                        }
340                    }
341                    @Override
342                    public void onError(SelectorSelection sel, Throwable error) {
343                        Exception e = null;
344                        if (error instanceof Exception) {
345                            e = (Exception)error;
346                        } else {
347                            e = new Exception(error);
348                        }
349                        if (!isStopping()) {
350                            onAcceptError(e);
351                        } else if (!isStopped()) {
352                            LOG.warn("run()", e);
353                            onAcceptError(e);
354                        }
355                    }
356                });
357                selector.setInterestOps(SelectionKey.OP_ACCEPT);
358                selector.enable();
359            } catch (IOException ex) {
360                selector = null;
361            }
362        } else {
363            while (!isStopped()) {
364                Socket socket = null;
365                try {
366                    socket = serverSocket.accept();
367                    if (socket != null) {
368                        if (isStopped() || getAcceptListener() == null) {
369                            socket.close();
370                        } else {
371                            if (useQueueForAccept) {
372                                socketQueue.put(socket);
373                            } else {
374                                handleSocket(socket);
375                            }
376                        }
377                    }
378                } catch (SocketTimeoutException ste) {
379                    // expect this to happen
380                } catch (Exception e) {
381                    if (!isStopping()) {
382                        onAcceptError(e);
383                    } else if (!isStopped()) {
384                        LOG.warn("run()", e);
385                        onAcceptError(e);
386                    }
387                }
388            }
389        }
390    }
391
392    /**
393     * Allow derived classes to override the Transport implementation that this transport server creates.
394     *
395     * @param socket
396     * @param format
397     *
398     * @return a new Transport instance.
399     *
400     * @throws IOException
401     */
402    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
403        return new TcpTransport(format, socket);
404    }
405
406    /**
407     * @return pretty print of this
408     */
409    @Override
410    public String toString() {
411        return "" + getBindLocation();
412    }
413
414    /**
415     * @param socket
416     * @param bindAddress
417     * @return real hostName
418     * @throws UnknownHostException
419     */
420    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
421        String result = null;
422        if (socket.isBound()) {
423            if (socket.getInetAddress().isAnyLocalAddress()) {
424                // make it more human readable and useful, an alternative to 0.0.0.0
425                result = InetAddressUtil.getLocalHostName();
426            } else {
427                result = socket.getInetAddress().getCanonicalHostName();
428            }
429        } else {
430            result = bindAddress.getCanonicalHostName();
431        }
432        return result;
433    }
434
435    @Override
436    protected void doStart() throws Exception {
437        if (useQueueForAccept) {
438            Runnable run = new Runnable() {
439                @Override
440                public void run() {
441                    try {
442                        while (!isStopped() && !isStopping()) {
443                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
444                            if (sock != null) {
445                                try {
446                                    handleSocket(sock);
447                                } catch (Throwable thrown) {
448                                    if (!isStopping()) {
449                                        onAcceptError(new Exception(thrown));
450                                    } else if (!isStopped()) {
451                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
452                                        onAcceptError(new Exception(thrown));
453                                    }
454                                }
455                            }
456                        }
457
458                    } catch (InterruptedException e) {
459                        if (!isStopped() || !isStopping()) {
460                            LOG.info("socketQueue interrupted - stopping");
461                            onAcceptError(e);
462                        }
463                    }
464                }
465            };
466            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
467            socketHandlerThread.setDaemon(true);
468            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
469            socketHandlerThread.start();
470        }
471        super.doStart();
472    }
473
474    @Override
475    protected void doStop(ServiceStopper stopper) throws Exception {
476        if (selector != null) {
477            selector.disable();
478            selector.close();
479            selector = null;
480        }
481        if (serverSocket != null) {
482            serverSocket.close();
483            serverSocket = null;
484        }
485        if (socketHandlerThread != null) {
486            socketHandlerThread.interrupt();
487            socketHandlerThread = null;
488        }
489        super.doStop(stopper);
490    }
491
492    @Override
493    public InetSocketAddress getSocketAddress() {
494        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
495    }
496
497    protected void handleSocket(Socket socket) {
498        doHandleSocket(socket);
499    }
500
501    final protected void doHandleSocket(Socket socket) {
502        boolean closeSocket = true;
503        boolean countIncremented = false;
504        try {
505            int currentCount;
506            do {
507                currentCount = currentTransportCount.get();
508                if (currentCount >= this.maximumConnections) {
509                     throw new ExceededMaximumConnectionsException(
510                         "Exceeded the maximum number of allowed client connections. See the '" +
511                         "maximumConnections' property on the TCP transport configuration URI " +
512                         "in the ActiveMQ configuration file (e.g., activemq.xml)");
513                 }
514
515            //Increment this value before configuring the transport
516            //This is necessary because some of the transport servers must read from the
517            //socket during configureTransport() so we want to make sure this value is
518            //accurate as the transport server could pause here waiting for data to be sent from a client
519            } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1));
520            countIncremented = true;
521
522            HashMap<String, Object> options = new HashMap<String, Object>();
523            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
524            options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
525            options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
526            options.put("trace", Boolean.valueOf(trace));
527            options.put("soTimeout", Integer.valueOf(soTimeout));
528            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
529            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
530            options.put("logWriterName", logWriterName);
531            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
532            options.put("startLogging", Boolean.valueOf(startLogging));
533            options.putAll(transportOptions);
534
535            TransportInfo transportInfo = configureTransport(this, socket);
536            closeSocket = false;
537
538            if (transportInfo.transport instanceof ServiceSupport) {
539                ((ServiceSupport) transportInfo.transport).addServiceListener(this);
540            }
541
542            Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
543                    transportInfo.transport, transportInfo.format, options);
544
545            getAcceptListener().onAccept(configuredTransport);
546
547        } catch (SocketTimeoutException ste) {
548            // expect this to happen
549        } catch (Exception e) {
550            if (closeSocket) {
551                try {
552                    //if closing the socket, only decrement the count it was actually incremented
553                    //where it was incremented
554                    if (countIncremented) {
555                        currentTransportCount.decrementAndGet();
556                    }
557                    socket.close();
558                } catch (Exception ignore) {
559                }
560            }
561
562            if (!isStopping()) {
563                onAcceptError(e);
564            } else if (!isStopped()) {
565                LOG.warn("run()", e);
566                onAcceptError(e);
567            }
568        }
569    }
570
571    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
572        WireFormat format = wireFormatFactory.createWireFormat();
573        Transport transport = createTransport(socket, format);
574        return new TransportInfo(format, transport, transportFactory);
575    }
576
577    protected class TransportInfo {
578        final WireFormat format;
579        final Transport transport;
580        final TransportFactory transportFactory;
581
582        public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
583            this.format = format;
584            this.transport = transport;
585            this.transportFactory = transportFactory;
586        }
587    }
588
589    public int getSoTimeout() {
590        return soTimeout;
591    }
592
593    public void setSoTimeout(int soTimeout) {
594        this.soTimeout = soTimeout;
595    }
596
597    public int getSocketBufferSize() {
598        return socketBufferSize;
599    }
600
601    public void setSocketBufferSize(int socketBufferSize) {
602        this.socketBufferSize = socketBufferSize;
603    }
604
605    public int getConnectionTimeout() {
606        return connectionTimeout;
607    }
608
609    public void setConnectionTimeout(int connectionTimeout) {
610        this.connectionTimeout = connectionTimeout;
611    }
612
613    /**
614     * @return the maximumConnections
615     */
616    public int getMaximumConnections() {
617        return maximumConnections;
618    }
619
620    /**
621     * @param maximumConnections
622     *            the maximumConnections to set
623     */
624    public void setMaximumConnections(int maximumConnections) {
625        this.maximumConnections = maximumConnections;
626    }
627
628    public AtomicInteger getCurrentTransportCount() {
629        return currentTransportCount;
630    }
631
632    @Override
633    public void started(Service service) {
634    }
635
636    @Override
637    public void stopped(Service service) {
638        this.currentTransportCount.decrementAndGet();
639    }
640
641    @Override
642    public boolean isSslServer() {
643        return false;
644    }
645
646    @Override
647    public boolean isAllowLinkStealing() {
648        return allowLinkStealing;
649    }
650
651    @Override
652    public void setAllowLinkStealing(boolean allowLinkStealing) {
653        this.allowLinkStealing = allowLinkStealing;
654    }
655}