001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.tcp; 018 019import java.io.IOException; 020import java.net.InetAddress; 021import java.net.InetSocketAddress; 022import java.net.ServerSocket; 023import java.net.Socket; 024import java.net.SocketException; 025import java.net.SocketTimeoutException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.UnknownHostException; 029import java.nio.channels.SelectionKey; 030import java.nio.channels.ServerSocketChannel; 031import java.nio.channels.SocketChannel; 032import java.util.HashMap; 033import java.util.concurrent.BlockingQueue; 034import java.util.concurrent.LinkedBlockingQueue; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicInteger; 037 038import javax.net.ServerSocketFactory; 039import javax.net.ssl.SSLParameters; 040import javax.net.ssl.SSLServerSocket; 041 042import org.apache.activemq.Service; 043import org.apache.activemq.ThreadPriorities; 044import org.apache.activemq.TransportLoggerSupport; 045import org.apache.activemq.command.BrokerInfo; 046import org.apache.activemq.openwire.OpenWireFormatFactory; 047import org.apache.activemq.transport.Transport; 048import org.apache.activemq.transport.TransportFactory; 049import org.apache.activemq.transport.TransportServer; 050import org.apache.activemq.transport.TransportServerThreadSupport; 051import org.apache.activemq.transport.nio.SelectorManager; 052import org.apache.activemq.transport.nio.SelectorSelection; 053import org.apache.activemq.util.IOExceptionSupport; 054import org.apache.activemq.util.InetAddressUtil; 055import org.apache.activemq.util.IntrospectionSupport; 056import org.apache.activemq.util.ServiceListener; 057import org.apache.activemq.util.ServiceStopper; 058import org.apache.activemq.util.ServiceSupport; 059import org.apache.activemq.wireformat.WireFormat; 060import org.apache.activemq.wireformat.WireFormatFactory; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064/** 065 * A TCP based implementation of {@link TransportServer} 066 */ 067public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener { 068 069 private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); 070 protected ServerSocket serverSocket; 071 protected SelectorSelection selector; 072 protected int backlog = 5000; 073 protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); 074 protected final TcpTransportFactory transportFactory; 075 protected long maxInactivityDuration = 30000; 076 protected long maxInactivityDurationInitalDelay = 10000; 077 protected int minmumWireFormatVersion; 078 protected boolean useQueueForAccept = true; 079 protected boolean allowLinkStealing; 080 protected boolean verifyHostName = false; 081 082 /** 083 * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer 084 * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer, 085 * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or 086 * TransportConnector URIs. 087 */ 088 protected boolean trace = false; 089 090 protected int soTimeout = 0; 091 protected int socketBufferSize = 64 * 1024; 092 protected int connectionTimeout = 30000; 093 094 /** 095 * Name of the LogWriter implementation to use. Names are mapped to classes in the 096 * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably 097 * set in Connection or TransportConnector URIs. 098 */ 099 protected String logWriterName = TransportLoggerSupport.defaultLogWriterName; 100 101 /** 102 * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1 103 * TransportLogger which is manageable, a TransportLoggerControl MBean will me created. 104 */ 105 protected boolean dynamicManagement = false; 106 107 /** 108 * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log. 109 * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the 110 * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or 111 * TransportConnector URIs. 112 */ 113 protected boolean startLogging = true; 114 protected final ServerSocketFactory serverSocketFactory; 115 protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); 116 protected Thread socketHandlerThread; 117 118 /** 119 * The maximum number of sockets allowed for this server 120 */ 121 protected int maximumConnections = Integer.MAX_VALUE; 122 protected AtomicInteger currentTransportCount = new AtomicInteger(); 123 124 public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, 125 URISyntaxException { 126 super(location); 127 this.transportFactory = transportFactory; 128 this.serverSocketFactory = serverSocketFactory; 129 } 130 131 public void bind() throws IOException { 132 URI bind = getBindLocation(); 133 134 String host = bind.getHost(); 135 host = (host == null || host.length() == 0) ? "localhost" : host; 136 InetAddress addr = InetAddress.getByName(host); 137 138 try { 139 this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); 140 configureServerSocket(this.serverSocket); 141 } catch (IOException e) { 142 throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); 143 } 144 try { 145 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), 146 bind.getQuery(), bind.getFragment())); 147 } catch (URISyntaxException e) { 148 149 // it could be that the host name contains invalid characters such 150 // as _ on unix platforms so lets try use the IP address instead 151 try { 152 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), 153 bind.getQuery(), bind.getFragment())); 154 } catch (URISyntaxException e2) { 155 throw IOExceptionSupport.create(e2); 156 } 157 } 158 } 159 160 private void configureServerSocket(ServerSocket socket) throws SocketException { 161 socket.setSoTimeout(2000); 162 if (transportOptions != null) { 163 164 // If the enabledCipherSuites option is invalid we don't want to ignore it as the call 165 // to SSLServerSocket to configure it has a side effect on the socket rendering it 166 // useless as all suites are enabled many of which are considered as insecure. We 167 // instead trap that option here and throw an exception. We should really consider 168 // all invalid options as breaking and not start the transport but the current design 169 // doesn't really allow for this. 170 // 171 // see: https://issues.apache.org/jira/browse/AMQ-4582 172 // 173 if (socket instanceof SSLServerSocket) { 174 if (transportOptions.containsKey("verifyHostName")) { 175 verifyHostName = Boolean.parseBoolean(transportOptions.get("verifyHostName").toString()); 176 } else { 177 transportOptions.put("verifyHostName", verifyHostName); 178 } 179 180 if (verifyHostName) { 181 SSLParameters sslParams = new SSLParameters(); 182 sslParams.setEndpointIdentificationAlgorithm("HTTPS"); 183 ((SSLServerSocket)this.serverSocket).setSSLParameters(sslParams); 184 } 185 186 if (transportOptions.containsKey("enabledCipherSuites")) { 187 Object cipherSuites = transportOptions.remove("enabledCipherSuites"); 188 189 if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) { 190 throw new SocketException(String.format( 191 "Invalid transport options {enabledCipherSuites=%s}", cipherSuites)); 192 } 193 } 194 195 } 196 197 IntrospectionSupport.setProperties(socket, transportOptions); 198 } 199 } 200 201 /** 202 * @return Returns the wireFormatFactory. 203 */ 204 public WireFormatFactory getWireFormatFactory() { 205 return wireFormatFactory; 206 } 207 208 /** 209 * @param wireFormatFactory 210 * The wireFormatFactory to set. 211 */ 212 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { 213 this.wireFormatFactory = wireFormatFactory; 214 } 215 216 /** 217 * Associates a broker info with the transport server so that the transport can do discovery advertisements of the 218 * broker. 219 * 220 * @param brokerInfo 221 */ 222 @Override 223 public void setBrokerInfo(BrokerInfo brokerInfo) { 224 } 225 226 public long getMaxInactivityDuration() { 227 return maxInactivityDuration; 228 } 229 230 public void setMaxInactivityDuration(long maxInactivityDuration) { 231 this.maxInactivityDuration = maxInactivityDuration; 232 } 233 234 public long getMaxInactivityDurationInitalDelay() { 235 return this.maxInactivityDurationInitalDelay; 236 } 237 238 public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) { 239 this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; 240 } 241 242 public int getMinmumWireFormatVersion() { 243 return minmumWireFormatVersion; 244 } 245 246 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 247 this.minmumWireFormatVersion = minmumWireFormatVersion; 248 } 249 250 public boolean isTrace() { 251 return trace; 252 } 253 254 public void setTrace(boolean trace) { 255 this.trace = trace; 256 } 257 258 public String getLogWriterName() { 259 return logWriterName; 260 } 261 262 public void setLogWriterName(String logFormat) { 263 this.logWriterName = logFormat; 264 } 265 266 public boolean isDynamicManagement() { 267 return dynamicManagement; 268 } 269 270 public void setDynamicManagement(boolean useJmx) { 271 this.dynamicManagement = useJmx; 272 } 273 274 public boolean isStartLogging() { 275 return startLogging; 276 } 277 278 public void setStartLogging(boolean startLogging) { 279 this.startLogging = startLogging; 280 } 281 282 /** 283 * @return the backlog 284 */ 285 public int getBacklog() { 286 return backlog; 287 } 288 289 /** 290 * @param backlog 291 * the backlog to set 292 */ 293 public void setBacklog(int backlog) { 294 this.backlog = backlog; 295 } 296 297 /** 298 * @return the useQueueForAccept 299 */ 300 public boolean isUseQueueForAccept() { 301 return useQueueForAccept; 302 } 303 304 /** 305 * @param useQueueForAccept 306 * the useQueueForAccept to set 307 */ 308 public void setUseQueueForAccept(boolean useQueueForAccept) { 309 this.useQueueForAccept = useQueueForAccept; 310 } 311 312 /** 313 * pull Sockets from the ServerSocket 314 */ 315 @Override 316 public void run() { 317 final ServerSocketChannel chan = serverSocket.getChannel(); 318 if (chan != null) { 319 try { 320 chan.configureBlocking(false); 321 selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener() { 322 @Override 323 public void onSelect(SelectorSelection sel) { 324 try { 325 SocketChannel sc = chan.accept(); 326 if (sc != null) { 327 if (isStopped() || getAcceptListener() == null) { 328 sc.close(); 329 } else { 330 if (useQueueForAccept) { 331 socketQueue.put(sc.socket()); 332 } else { 333 handleSocket(sc.socket()); 334 } 335 } 336 } 337 } catch (Exception e) { 338 onError(sel, e); 339 } 340 } 341 @Override 342 public void onError(SelectorSelection sel, Throwable error) { 343 Exception e = null; 344 if (error instanceof Exception) { 345 e = (Exception)error; 346 } else { 347 e = new Exception(error); 348 } 349 if (!isStopping()) { 350 onAcceptError(e); 351 } else if (!isStopped()) { 352 LOG.warn("run()", e); 353 onAcceptError(e); 354 } 355 } 356 }); 357 selector.setInterestOps(SelectionKey.OP_ACCEPT); 358 selector.enable(); 359 } catch (IOException ex) { 360 selector = null; 361 } 362 } else { 363 while (!isStopped()) { 364 Socket socket = null; 365 try { 366 socket = serverSocket.accept(); 367 if (socket != null) { 368 if (isStopped() || getAcceptListener() == null) { 369 socket.close(); 370 } else { 371 if (useQueueForAccept) { 372 socketQueue.put(socket); 373 } else { 374 handleSocket(socket); 375 } 376 } 377 } 378 } catch (SocketTimeoutException ste) { 379 // expect this to happen 380 } catch (Exception e) { 381 if (!isStopping()) { 382 onAcceptError(e); 383 } else if (!isStopped()) { 384 LOG.warn("run()", e); 385 onAcceptError(e); 386 } 387 } 388 } 389 } 390 } 391 392 /** 393 * Allow derived classes to override the Transport implementation that this transport server creates. 394 * 395 * @param socket 396 * @param format 397 * 398 * @return a new Transport instance. 399 * 400 * @throws IOException 401 */ 402 protected Transport createTransport(Socket socket, WireFormat format) throws IOException { 403 return new TcpTransport(format, socket); 404 } 405 406 /** 407 * @return pretty print of this 408 */ 409 @Override 410 public String toString() { 411 return "" + getBindLocation(); 412 } 413 414 /** 415 * @param socket 416 * @param bindAddress 417 * @return real hostName 418 * @throws UnknownHostException 419 */ 420 protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException { 421 String result = null; 422 if (socket.isBound()) { 423 if (socket.getInetAddress().isAnyLocalAddress()) { 424 // make it more human readable and useful, an alternative to 0.0.0.0 425 result = InetAddressUtil.getLocalHostName(); 426 } else { 427 result = socket.getInetAddress().getCanonicalHostName(); 428 } 429 } else { 430 result = bindAddress.getCanonicalHostName(); 431 } 432 return result; 433 } 434 435 @Override 436 protected void doStart() throws Exception { 437 if (useQueueForAccept) { 438 Runnable run = new Runnable() { 439 @Override 440 public void run() { 441 try { 442 while (!isStopped() && !isStopping()) { 443 Socket sock = socketQueue.poll(1, TimeUnit.SECONDS); 444 if (sock != null) { 445 try { 446 handleSocket(sock); 447 } catch (Throwable thrown) { 448 if (!isStopping()) { 449 onAcceptError(new Exception(thrown)); 450 } else if (!isStopped()) { 451 LOG.warn("Unexpected error thrown during accept handling: ", thrown); 452 onAcceptError(new Exception(thrown)); 453 } 454 } 455 } 456 } 457 458 } catch (InterruptedException e) { 459 if (!isStopped() || !isStopping()) { 460 LOG.info("socketQueue interrupted - stopping"); 461 onAcceptError(e); 462 } 463 } 464 } 465 }; 466 socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize()); 467 socketHandlerThread.setDaemon(true); 468 socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1); 469 socketHandlerThread.start(); 470 } 471 super.doStart(); 472 } 473 474 @Override 475 protected void doStop(ServiceStopper stopper) throws Exception { 476 if (selector != null) { 477 selector.disable(); 478 selector.close(); 479 selector = null; 480 } 481 if (serverSocket != null) { 482 serverSocket.close(); 483 serverSocket = null; 484 } 485 if (socketHandlerThread != null) { 486 socketHandlerThread.interrupt(); 487 socketHandlerThread = null; 488 } 489 super.doStop(stopper); 490 } 491 492 @Override 493 public InetSocketAddress getSocketAddress() { 494 return (InetSocketAddress) serverSocket.getLocalSocketAddress(); 495 } 496 497 protected void handleSocket(Socket socket) { 498 doHandleSocket(socket); 499 } 500 501 final protected void doHandleSocket(Socket socket) { 502 boolean closeSocket = true; 503 boolean countIncremented = false; 504 try { 505 int currentCount; 506 do { 507 currentCount = currentTransportCount.get(); 508 if (currentCount >= this.maximumConnections) { 509 throw new ExceededMaximumConnectionsException( 510 "Exceeded the maximum number of allowed client connections. See the '" + 511 "maximumConnections' property on the TCP transport configuration URI " + 512 "in the ActiveMQ configuration file (e.g., activemq.xml)"); 513 } 514 515 //Increment this value before configuring the transport 516 //This is necessary because some of the transport servers must read from the 517 //socket during configureTransport() so we want to make sure this value is 518 //accurate as the transport server could pause here waiting for data to be sent from a client 519 } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1)); 520 countIncremented = true; 521 522 HashMap<String, Object> options = new HashMap<String, Object>(); 523 options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); 524 options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay)); 525 options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion)); 526 options.put("trace", Boolean.valueOf(trace)); 527 options.put("soTimeout", Integer.valueOf(soTimeout)); 528 options.put("socketBufferSize", Integer.valueOf(socketBufferSize)); 529 options.put("connectionTimeout", Integer.valueOf(connectionTimeout)); 530 options.put("logWriterName", logWriterName); 531 options.put("dynamicManagement", Boolean.valueOf(dynamicManagement)); 532 options.put("startLogging", Boolean.valueOf(startLogging)); 533 options.putAll(transportOptions); 534 535 TransportInfo transportInfo = configureTransport(this, socket); 536 closeSocket = false; 537 538 if (transportInfo.transport instanceof ServiceSupport) { 539 ((ServiceSupport) transportInfo.transport).addServiceListener(this); 540 } 541 542 Transport configuredTransport = transportInfo.transportFactory.serverConfigure( 543 transportInfo.transport, transportInfo.format, options); 544 545 getAcceptListener().onAccept(configuredTransport); 546 547 } catch (SocketTimeoutException ste) { 548 // expect this to happen 549 } catch (Exception e) { 550 if (closeSocket) { 551 try { 552 //if closing the socket, only decrement the count it was actually incremented 553 //where it was incremented 554 if (countIncremented) { 555 currentTransportCount.decrementAndGet(); 556 } 557 socket.close(); 558 } catch (Exception ignore) { 559 } 560 } 561 562 if (!isStopping()) { 563 onAcceptError(e); 564 } else if (!isStopped()) { 565 LOG.warn("run()", e); 566 onAcceptError(e); 567 } 568 } 569 } 570 571 protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { 572 WireFormat format = wireFormatFactory.createWireFormat(); 573 Transport transport = createTransport(socket, format); 574 return new TransportInfo(format, transport, transportFactory); 575 } 576 577 protected class TransportInfo { 578 final WireFormat format; 579 final Transport transport; 580 final TransportFactory transportFactory; 581 582 public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) { 583 this.format = format; 584 this.transport = transport; 585 this.transportFactory = transportFactory; 586 } 587 } 588 589 public int getSoTimeout() { 590 return soTimeout; 591 } 592 593 public void setSoTimeout(int soTimeout) { 594 this.soTimeout = soTimeout; 595 } 596 597 public int getSocketBufferSize() { 598 return socketBufferSize; 599 } 600 601 public void setSocketBufferSize(int socketBufferSize) { 602 this.socketBufferSize = socketBufferSize; 603 } 604 605 public int getConnectionTimeout() { 606 return connectionTimeout; 607 } 608 609 public void setConnectionTimeout(int connectionTimeout) { 610 this.connectionTimeout = connectionTimeout; 611 } 612 613 /** 614 * @return the maximumConnections 615 */ 616 public int getMaximumConnections() { 617 return maximumConnections; 618 } 619 620 /** 621 * @param maximumConnections 622 * the maximumConnections to set 623 */ 624 public void setMaximumConnections(int maximumConnections) { 625 this.maximumConnections = maximumConnections; 626 } 627 628 public AtomicInteger getCurrentTransportCount() { 629 return currentTransportCount; 630 } 631 632 @Override 633 public void started(Service service) { 634 } 635 636 @Override 637 public void stopped(Service service) { 638 this.currentTransportCount.decrementAndGet(); 639 } 640 641 @Override 642 public boolean isSslServer() { 643 return false; 644 } 645 646 @Override 647 public boolean isAllowLinkStealing() { 648 return allowLinkStealing; 649 } 650 651 @Override 652 public void setAllowLinkStealing(boolean allowLinkStealing) { 653 this.allowLinkStealing = allowLinkStealing; 654 } 655}