/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
*/ package org.apache.tomcat.util.net;
/**
 * Use System.inheritableChannel to obtain channel from stdin/stdout.
 */
private boolean useInheritedChannel = false;

public void setUseInheritedChannel(boolean useInheritedChannel) {
    this.useInheritedChannel = useInheritedChannel;
}

public boolean getUseInheritedChannel() {
    return useInheritedChannel;
}
/** * Path for the Unix domain socket, used to create the socket address.
*/ private String unixDomainSocketPath = null; public String getUnixDomainSocketPath() { returnthis.unixDomainSocketPath; } publicvoid setUnixDomainSocketPath(String unixDomainSocketPath) { this.unixDomainSocketPath = unixDomainSocketPath;
}
/** * Permissions which will be set on the Unix domain socket if it is created.
*/ private String unixDomainSocketPathPermissions = null; public String getUnixDomainSocketPathPermissions() { returnthis.unixDomainSocketPathPermissions; } publicvoid setUnixDomainSocketPathPermissions(String unixDomainSocketPathPermissions) { this.unixDomainSocketPathPermissions = unixDomainSocketPathPermissions;
}
// --------------------------------------------------------- Public Methods
/**
 * Number of keep-alive sockets.
 *
 * @return The number of sockets currently in the keep-alive state waiting
 *         for the next request to be received on the socket
 */
public int getKeepAliveCount() {
    // Before start() / after stop() there is no poller, so report zero
    // rather than risk an NPE.
    if (poller == null) {
        return 0;
    } else {
        return poller.getKeyCount();
    }
}
@Override protectedvoid unlockAccept() { if (getUnixDomainSocketPath() == null) { super.unlockAccept();
} else { // Only try to unlock the acceptor if it is necessary if (acceptor == null || acceptor.getState() != AcceptorState.RUNNING) { return;
} try {
SocketAddress sa = JreCompat.getInstance().getUnixDomainSocketAddress(getUnixDomainSocketPath()); try (SocketChannel socket = JreCompat.getInstance().openUnixDomainSocketChannel()) { // With a UDS, expect no delay connecting and no defer accept
socket.connect(sa);
} // Wait for up to 1000ms acceptor threads to unlock long waitLeft = 1000; while (waitLeft > 0 &&
acceptor.getState() == AcceptorState.RUNNING) { Thread.sleep(5);
waitLeft -= 5;
}
} catch(Throwable t) {
ExceptionUtils.handleThrowable(t); if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString( "endpoint.debug.unlock.fail", String.valueOf(getPortWithOffset())), t);
}
}
}
}
/** * Process the specified connection. * @param socket The socket channel * @return <code>true</code> if the socket was correctly configured * and processing may continue, <code>false</code> if the socket needs to be * close immediately
*/
@Override protectedboolean setSocketOptions(SocketChannel socket) {
NioSocketWrapper socketWrapper = null; try { // Allocate channel and wrapper
NioChannel channel = null; if (nioChannels != null) {
channel = nioChannels.pop();
} if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer()); if (isSSLEnabled()) {
channel = new SecureNioChannel(bufhandler, this);
} else {
channel = new NioChannel(bufhandler);
}
}
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
channel.reset(socket, newWrapper);
connections.put(socket, newWrapper);
socketWrapper = newWrapper;
// Set socket properties // Disable blocking, polling will be used
socket.configureBlocking(false); if (getUnixDomainSocketPath() == null) {
socketProperties.setProperties(socket.socket());
}
socketWrapper.setReadTimeout(getConnectionTimeout());
socketWrapper.setWriteTimeout(getConnectionTimeout());
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
poller.register(socketWrapper); returntrue;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t); try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
} if (socketWrapper == null) {
destroySocket(socket);
}
} // Tell to close the socket if needed returnfalse;
}
// Signals pending work to the selector loop; run() uses getAndSet(-1) to
// choose between a non-blocking selectNow() and a blocking select.
private AtomicLong wakeupCounter = new AtomicLong(0);

// Number of keys returned by the most recent select; volatile because it
// is read by other threads via getKeyCount().
private volatile int keyCount = 0;
/**
 * Create the poller and open its {@link Selector}.
 *
 * @throws IOException If the selector cannot be opened
 */
public Poller() throws IOException {
    this.selector = Selector.open();
}
// Number of keys selected by the last select; see keyCount field.
public int getKeyCount() {
    return keyCount;
}
// The Selector owned by this Poller.
public Selector getSelector() {
    return selector;
}
/**
 * Destroy the poller.
 */
protected void destroy() {
    // Wait for polltime before doing anything, so that the poller threads
    // exit, otherwise parallel closure of sockets which are still
    // in the poller can cause problems
    close = true;
    // Wake the selector so run() observes close and shuts down
    selector.wakeup();
}
/**
 * Obtain a PollerEvent, reusing one from the event cache when available.
 *
 * @param socketWrapper the socket the event refers to
 * @param interestOps   the interest ops to set on the event
 * @return a reset or newly created PollerEvent
 */
private PollerEvent createPollerEvent(NioSocketWrapper socketWrapper, int interestOps) {
    PollerEvent r = null;
    if (eventCache != null) {
        r = eventCache.pop();
    }
    if (r == null) {
        r = new PollerEvent(socketWrapper, interestOps);
    } else {
        r.reset(socketWrapper, interestOps);
    }
    return r;
}
/**
 * Add specified socket and associated pool to the poller. The socket will
 * be added to a temporary array, and polled first after a maximum amount
 * of time equal to pollTime (in most cases, latency will be much lower,
 * however).
 *
 * @param socketWrapper to add to the poller
 * @param interestOps   Operations for which to register this socket with
 *                      the Poller
 */
public void add(NioSocketWrapper socketWrapper, int interestOps) {
    PollerEvent pollerEvent = createPollerEvent(socketWrapper, interestOps);
    addEvent(pollerEvent);
    // If the poller is shutting down, make sure the socket is stopped
    // rather than left registered with a closing selector.
    if (close) {
        processSocket(socketWrapper, SocketEvent.STOP, false);
    }
}
/** * Processes events in the event queue of the Poller. * * @return <code>true</code> if some events were processed, * <code>false</code> if queue was empty
*/ publicboolean events() { boolean result = false;
PollerEvent pe = null; for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
NioSocketWrapper socketWrapper = pe.getSocketWrapper();
SocketChannel sc = socketWrapper.getSocket().getIOChannel(); int interestOps = pe.getInterestOps(); if (sc == null) {
log.warn(sm.getString("endpoint.nio.nullSocketChannel"));
socketWrapper.close();
} elseif (interestOps == OP_REGISTER) { try {
sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else { final SelectionKey key = sc.keyFor(getSelector()); if (key == null) { // The key was cancelled (e.g. due to socket closure) // and removed from the selector while it was being // processed. Count down the connections at this point // since it won't have been counted down when the socket // closed.
socketWrapper.close();
} else { final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment(); if (attachment != null) { // We are registering the key to start with, reset the fairness counter. try { int ops = key.interestOps() | interestOps;
attachment.interestOps(ops);
key.interestOps(ops);
} catch (CancelledKeyException ckx) {
socketWrapper.close();
}
} else {
socketWrapper.close();
}
}
} if (running && eventCache != null) {
pe.reset();
eventCache.push(pe);
}
}
return result;
}
/**
 * Registers a newly created socket with the poller.
 *
 * @param socketWrapper The socket wrapper
 */
public void register(final NioSocketWrapper socketWrapper) {
    socketWrapper.interestOps(SelectionKey.OP_READ); // this is what OP_REGISTER turns into.
    PollerEvent pollerEvent = createPollerEvent(socketWrapper, OP_REGISTER);
    addEvent(pollerEvent);
}
/** * The background thread that adds sockets to the Poller, checks the * poller for triggered events and hands the associated socket off to an * appropriate processor as events occur.
*/
@Override publicvoid run() { // Loop until destroy() is called while (true) {
boolean hasEvents = false;
try { if (!close) {
hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { // If we are here, means we have other stuff to do // Do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
} if (close) {
events();
timeout(0, false); try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
} break;
} // Either we timed out or we woke up, process events first if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x); continue;
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
iterator.remove();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (socketWrapper != null) {
processKey(sk, socketWrapper);
}
}
// Process timeouts
timeout(keyCount,hasEvents);
}
if (sd.fchannel == null) { // Setup the file channel
File f = new File(sd.fileName);
@SuppressWarnings("resource") // Closed when channel is closed
FileInputStream fis = new FileInputStream(f);
sd.fchannel = fis.getChannel();
}
// We still have data in the buffer if (sc.getOutboundRemaining() > 0) { if (sc.flushOutbound()) {
socketWrapper.updateLastWrite();
}
} else { long written = sd.fchannel.transferTo(sd.pos, sd.length, wc); if (written > 0) {
sd.pos += written;
sd.length -= written;
socketWrapper.updateLastWrite();
} else { // Unusual not to be able to transfer any bytes // Check the length was set correctly if (sd.fchannel.size() <= sd.pos) { thrownew IOException(sm.getString("endpoint.sendfile.tooMuchData"));
}
}
} if (sd.length <= 0 && sc.getOutboundRemaining()<=0) { if (log.isDebugEnabled()) {
log.debug("Send file complete for: " + sd.fileName);
}
socketWrapper.setSendfileData(null); try {
sd.fchannel.close();
} catch (Exception ignore) {
} // For calls from outside the Poller, the caller is // responsible for registering the socket for the // appropriate event(s) if sendfile completes. if (!calledByProcessor) { switch (sd.keepAliveState) { case NONE: { if (log.isDebugEnabled()) {
log.debug("Send file connection is being closed");
}
socketWrapper.close(); break;
} case PIPELINED: { if (log.isDebugEnabled()) {
log.debug("Connection is keep alive, processing pipe-lined data");
} if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
socketWrapper.close();
} break;
} case OPEN: { if (log.isDebugEnabled()) {
log.debug("Connection is keep alive, registering back for OP_READ");
}
reg(sk, socketWrapper, SelectionKey.OP_READ); break;
}
}
} return SendfileState.DONE;
} else { if (log.isDebugEnabled()) {
log.debug("OP_WRITE for sendfile: " + sd.fileName);
} if (calledByProcessor) {
add(socketWrapper, SelectionKey.OP_WRITE);
} else {
reg(sk, socketWrapper, SelectionKey.OP_WRITE);
} return SendfileState.PENDING;
}
} catch (IOException e) { if (log.isDebugEnabled()) {
log.debug("Unable to complete sendfile request:", e);
} if (!calledByProcessor && sc != null) {
socketWrapper.close();
} return SendfileState.ERROR;
} catch (Throwable t) {
log.error(sm.getString("endpoint.sendfile.error"), t); if (!calledByProcessor && sc != null) {
socketWrapper.close();
} return SendfileState.ERROR;
}
}
/**
 * Remove the given ready ops from the key's interest set.
 *
 * @param sk            the selection key to update
 * @param socketWrapper the wrapper associated with the key
 * @param readyOps      the ops that just fired and must be deregistered
 */
protected void unreg(SelectionKey sk, NioSocketWrapper socketWrapper, int readyOps) {
    // This is a must, so that we don't have multiple threads messing with the socket
    reg(sk, socketWrapper, sk.interestOps() & (~readyOps));
}
protectedvoid timeout(int keyCount, boolean hasEvents) { long now = System.currentTimeMillis(); // This method is called on every loop of the Poller. Don't process // timeouts on every loop of the Poller since that would create too // much load and timeouts can afford to wait a few seconds. // However, do process timeouts if any of the following are true: // - the selector simply timed out (suggests there isn't much load) // - the nextExpiration time has passed // - the server socket is being closed if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) { return;
} int keycount = 0; try { for (SelectionKey key : selector.keys()) {
keycount++;
NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment(); try { if (socketWrapper == null) { // We don't support any keys without attachments if (key.isValid()) {
key.cancel();
}
} elseif (close) {
key.interestOps(0); // Avoid duplicate stop calls
socketWrapper.interestOps(0);
socketWrapper.close();
} elseif (socketWrapper.interestOpsHas(SelectionKey.OP_READ) ||
socketWrapper.interestOpsHas(SelectionKey.OP_WRITE)) { boolean readTimeout = false; boolean writeTimeout = false; // Check for read timeout if (socketWrapper.interestOpsHas(SelectionKey.OP_READ)) { long delta = now - socketWrapper.getLastRead(); long timeout = socketWrapper.getReadTimeout(); if (timeout > 0 && delta > timeout) {
readTimeout = true;
}
} // Check for write timeout if (!readTimeout && socketWrapper.interestOpsHas(SelectionKey.OP_WRITE)) { long delta = now - socketWrapper.getLastWrite(); long timeout = socketWrapper.getWriteTimeout(); if (timeout > 0 && delta > timeout) {
writeTimeout = true;
}
} if (readTimeout || writeTimeout) {
key.interestOps(0); // Avoid duplicate timeout calls
socketWrapper.interestOps(0);
socketWrapper.setError(new SocketTimeoutException()); if (readTimeout && socketWrapper.readOperation != null) { if (!socketWrapper.readOperation.process()) {
socketWrapper.close();
}
} elseif (writeTimeout && socketWrapper.writeOperation != null) { if (!socketWrapper.writeOperation.process()) {
socketWrapper.close();
}
} elseif (!processSocket(socketWrapper, SocketEvent.ERROR, true)) {
socketWrapper.close();
}
}
}
} catch (CancelledKeyException ckx) { if (socketWrapper != null) {
socketWrapper.close();
}
}
}
} catch (ConcurrentModificationException cme) { // See https://bz.apache.org/bugzilla/show_bug.cgi?id=57943
log.warn(sm.getString("endpoint.nio.timeoutCme"), cme);
} // For logging purposes only long prevExp = nextExpiration;
nextExpiration = System.currentTimeMillis() +
socketProperties.getTimeoutInterval(); if (log.isTraceEnabled()) {
log.trace("timeout completed: keys processed=" + keycount + "; now=" + now + "; nextExpiration=" + prevExp + "; keyCount=" + keyCount + "; hasEvents=" + hasEvents + "; eval=" + ((now < prevExp) && (keyCount>0 || hasEvents) && (!close) ));
}
}
}
// --------------------------------------------------- Socket Wrapper Class
@Override publicint read(boolean block, byte[] b, int off, int len) throws IOException { int nRead = populateReadBuffer(b, off, len); if (nRead > 0) { return nRead; /* * Since more bytes may have arrived since the buffer was last * filled, it is an option at this point to perform a * non-blocking read. However correctly handling the case if * that read returns end of stream adds complexity. Therefore, * at the moment, the preference is for simplicity.
*/
}
// Fill the read buffer as best we can.
nRead = fillReadBuffer(block);
updateLastRead();
// Fill as much of the remaining byte array as possible with the // data that was just read if (nRead > 0) {
socketBufferHandler.configureReadBufferForRead();
nRead = Math.min(nRead, len);
socketBufferHandler.getReadBuffer().get(b, off, nRead);
} return nRead;
}
@Override publicint read(boolean block, ByteBuffer to) throws IOException { int nRead = populateReadBuffer(to); if (nRead > 0) { return nRead; /* * Since more bytes may have arrived since the buffer was last * filled, it is an option at this point to perform a * non-blocking read. However correctly handling the case if * that read returns end of stream adds complexity. Therefore, * at the moment, the preference is for simplicity.
*/
}
// The socket read buffer capacity is socket.appReadBufSize int limit = socketBufferHandler.getReadBuffer().capacity(); if (to.remaining() >= limit) {
to.limit(to.position() + limit);
nRead = fillReadBuffer(block, to); if (log.isDebugEnabled()) {
log.debug("Socket: [" + this + "], Read direct from socket: [" + nRead + "]");
}
updateLastRead();
} else { // Fill the read buffer as best we can.
nRead = fillReadBuffer(block); if (log.isDebugEnabled()) {
log.debug("Socket: [" + this + "], Read into buffer: [" + nRead + "]");
}
updateLastRead();
// Fill as much of the remaining byte array as possible with the // data that was just read if (nRead > 0) {
nRead = populateReadBuffer(to);
}
} return nRead;
}
/*
 * https://bz.apache.org/bugzilla/show_bug.cgi?id=66076
 *
 * When using TLS an additional buffer is used for the encrypted data
 * before it is written to the network. It is possible for this network
 * output buffer to contain data while the socket write buffer is empty.
 *
 * For NIO with non-blocking I/O, this case is handling by ensuring that
 * flush only returns false (i.e. no data left to flush) if all buffers
 * are empty.
 */
private boolean socketOrNetworkBufferHasDataLeft() {
    return !socketBufferHandler.isWriteBufferEmpty() || getSocket().getOutboundRemaining() > 0;
}
@Override protectedvoid doWrite(boolean block, ByteBuffer buffer) throws IOException { int n = 0; if (getSocket() == NioChannel.CLOSED_NIO_CHANNEL) { thrownew ClosedChannelException();
} if (block) { if (previousIOException != null) { /* * Socket has previously timed out. * * Blocking writes assume that buffer is always fully * written so there is no code checking for incomplete * writes, retaining the unwritten data and attempting to * write it as part of a subsequent write call. * * Because of the above, when a timeout is triggered we need * to skip subsequent attempts to write as otherwise it will * appear to the client as if some data was dropped just * before the connection is lost. It is better if the client * just sees the dropped connection.
*/ thrownew IOException(previousIOException);
} long timeout = getWriteTimeout(); long startNanos = 0; do { if (startNanos > 0) { long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); if (elapsedMillis == 0) {
elapsedMillis = 1;
}
timeout -= elapsedMillis; if (timeout <= 0) {
previousIOException = new SocketTimeoutException(); throw previousIOException;
}
} synchronized (writeLock) {
n = getSocket().write(buffer); // n == 0 could be an incomplete write but it could also // indicate that a previous incomplete write of the // outbound buffer (for TLS) has now completed. Only // block if there is still data to write. if (n == 0 && (buffer.hasRemaining() || getSocket().getOutboundRemaining() > 0)) { // Ensure a spurious wake-up doesn't trigger a duplicate registration if (!writeBlocking) {
writeBlocking = true;
registerWriteInterest();
} try { if (timeout > 0) {
startNanos = System.nanoTime();
writeLock.wait(timeout);
} else {
writeLock.wait();
}
} catch (InterruptedException e) { // Continue
}
} elseif (startNanos > 0) { // If something was written, reset timeout
timeout = getWriteTimeout();
startNanos = 0;
}
}
} while (buffer.hasRemaining() || getSocket().getOutboundRemaining() > 0);
} else { do {
n = getSocket().write(buffer);
} while (n > 0 && buffer.hasRemaining()); // If there is data left in the buffer the socket will be registered for // write further up the stack. This is to ensure the socket is only // registered for write once as both container and user code can trigger // write registration.
}
updateLastWrite();
}
@Override public SendfileDataBase createSendfileData(String filename, long pos, long length) { returnnew SendfileData(filename, pos, length);
}
/**
 * Start (and attempt to complete) a sendfile operation for this socket.
 *
 * @param sendfileData the file region to send
 * @return the resulting {@code SendfileState}; {@code ERROR} if the socket
 *         is not registered with the poller's selector
 */
@Override
public SendfileState processSendfile(SendfileDataBase sendfileData) {
    setSendfileData((SendfileData) sendfileData);
    SelectionKey key = getSocket().getIOChannel().keyFor(getPoller().getSelector());
    if (key == null) {
        return SendfileState.ERROR;
    } else {
        // Might as well do the first write on this thread
        return getPoller().processSendfile(key, this, true);
    }
}
@Override publicvoid run() { // Perform the IO operation // Called from the poller to continue the IO operation long nBytes = 0; if (getError() == null) { try { synchronized (this) { if (!completionDone) { // This filters out same notification until processing // of the current one is done if (log.isDebugEnabled()) {
log.debug("Skip concurrent " + (read ? "read" : "write") + " notification");
} return;
} if (read) { // Read from main buffer first if (!socketBufferHandler.isReadBufferEmpty()) { // There is still data inside the main read buffer, it needs to be read first
socketBufferHandler.configureReadBufferForRead(); for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]);
}
} if (nBytes == 0) {
nBytes = getSocket().read(buffers, offset, length);
updateLastRead();
}
} else { boolean doWrite = true; // Write from main buffer first if (socketOrNetworkBufferHasDataLeft()) { // There is still data inside the main write buffer, it needs to be written first
socketBufferHandler.configureWriteBufferForRead(); do {
nBytes = getSocket().write(socketBufferHandler.getWriteBuffer());
} while (socketOrNetworkBufferHasDataLeft() && nBytes > 0); if (socketOrNetworkBufferHasDataLeft()) {
doWrite = false;
} // Preserve a negative value since it is an error
--> --------------------
--> maximum size reached
--> --------------------
¤ Dauer der Verarbeitung: 0.45 Sekunden
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.