/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
*/ package org.apache.coyote.http2;
/**
 * This represents an HTTP/2 connection from a client to Tomcat. It is designed on the basis that there will never be
 * more than one thread performing I/O at a time. <br>
 * For reading, this implementation is blocking within frames and non-blocking between frames. <br>
 * Note:
 * <ul>
 * <li>You will need to nest an {@code <UpgradeProtocol className="org.apache.coyote.http2.Http2Protocol" />} element
 * inside a TLS enabled Connector element in server.xml to enable HTTP/2 support.</li>
 * </ul>
*/ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeHandler, Input, Output {
protectedstaticfinal Log log = LogFactory.getLog(Http2UpgradeHandler.class); protectedstaticfinal StringManager sm = StringManager.getManager(Http2UpgradeHandler.class);
// Simple state machine (sequence of states) private AtomicReference<ConnectionState> connectionState = new AtomicReference<>(ConnectionState.NEW); privatevolatilelong pausedNanoTime = Long.MAX_VALUE;
/** * Remote settings are settings defined by the client and sent to Tomcat that Tomcat must use when communicating * with the client.
*/ privatefinal ConnectionSettingsRemote remoteSettings; /** * Local settings are settings defined by Tomcat and sent to the client that the client must use when communicating * with Tomcat.
*/ protectedfinal ConnectionSettingsLocal localSettings;
privatefinal ConcurrentNavigableMap<Integer,AbstractNonZeroStream> streams = new ConcurrentSkipListMap<>(); protectedfinal AtomicInteger activeRemoteStreamCount = new AtomicInteger(0); // Start at -1 so the 'add 2' logic in closeIdleStreams() works privatevolatileint maxActiveRemoteStreamId = -1; privatevolatileint maxProcessedStreamId; privatefinal AtomicInteger nextLocalStreamId = new AtomicInteger(2); privatefinal PingManager pingManager = getPingManager(); privatevolatileint newStreamsSinceLastPrune = 0; privatefinal Set<Stream> backLogStreams = new HashSet<>(); privatelong backLogSize = 0; // The time at which the connection will timeout unless data arrives before // then. -1 means no timeout. privatevolatilelong connectionTimeout = -1;
// Defaults to -10 * the count factor. // i.e. when the connection opens, 10 'overhead' frames in a row will // cause the connection to be closed. // Over time the count should be a slowly decreasing negative number. // Therefore, the longer a connection is 'well-behaved', the greater // tolerance it will have for a period of 'bad' behaviour.
overheadCount = new AtomicLong(-10 * protocol.getOverheadCountFactor());
if (!connectionState.compareAndSet(ConnectionState.NEW, ConnectionState.CONNECTED)) { return;
}
// Init concurrency control if needed if (protocol.getMaxConcurrentStreamExecution() < localSettings.getMaxConcurrentStreams()) {
streamConcurrency = new AtomicInteger(0);
queuedRunnable = new ConcurrentLinkedQueue<>();
}
if (webConnection != null) {
    // HTTP/2 started via HTTP upgrade.
    // The initial HTTP/1.1 request is available as Stream 1.
    try {
        // Process the initial settings frame
        stream = getStream(1, true);
        String base64Settings = stream.getCoyoteRequest().getHeader(HTTP2_SETTINGS_HEADER);
        byte[] settings = Base64.decodeBase64URLSafe(base64Settings);

        // Settings are only valid on stream 0
        FrameType.SETTINGS.check(0, settings.length);

        // Each setting occupies 6 bytes (2 byte identifier followed by a
        // 4 byte value) so the number of settings is length / 6.
        for (int i = 0; i < settings.length / 6; i++) {
            int id = ByteUtil.getTwoBytes(settings, i * 6);
            long value = ByteUtil.getFourBytes(settings, (i * 6) + 2);
            Setting key = Setting.valueOf(id);
            if (key == Setting.UNKNOWN) {
                log.warn(sm.getString("connectionSettings.unknown", connectionId, Integer.toString(id),
                        Long.toString(value)));
            }
            remoteSettings.set(key, value);
        }
    } catch (Http2Exception e) {
        // Keep the original failure as the cause so it is not lost
        throw new ProtocolException(sm.getString("upgradeHandler.upgrade.fail", connectionId), e);
    }
}
// Send the initial settings frame
writeSettings();
// Make sure the client has sent a valid connection preface before we // send the response to the original request over HTTP/2. try {
parser.readConnectionPreface(webConnection, stream);
} catch (Http2Exception e) {
String msg = sm.getString("upgradeHandler.invalidPreface", connectionId); if (log.isDebugEnabled()) {
log.debug(msg, e);
} thrownew ProtocolException(msg);
} if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.prefaceReceived", connectionId));
}
// Allow streams and connection to determine timeouts
socketWrapper.setReadTimeout(-1);
socketWrapper.setWriteTimeout(-1);
processConnection(webConnection, stream);
}
protectedvoid processConnection(WebConnection webConnection, Stream stream) { // Send a ping to get an idea of round trip time as early as possible try {
pingManager.sendPing(true);
} catch (IOException ioe) { thrownew ProtocolException(sm.getString("upgradeHandler.pingFailed", connectionId), ioe);
}
if (webConnection != null) {
processStreamOnContainerThread(stream);
}
}
@Override public SocketState upgradeDispatch(SocketEvent status) { if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.upgradeDispatch.entry", connectionId, status));
}
// WebConnection is not used so passing null here is fine // Might not be necessary. init() will handle that.
init(null);
SocketState result = SocketState.CLOSED;
try { switch (status) { case OPEN_READ:
socketWrapper.getLock().lock(); try { if (socketWrapper.canWrite()) { // Only send a ping if there is no other data waiting to be sent. // Ping manager will ensure they aren't sent too frequently.
pingManager.sendPing(false);
}
} finally {
socketWrapper.getLock().unlock();
} try { // Disable the connection timeout while frames are processed
setConnectionTimeout(-1); while (true) { try { if (!parser.readFrame()) { break;
}
} catch (StreamException se) { // Log the Stream error but not necessarily all of // them
UserDataHelper.Mode logMode = userDataHelper.getNextMode(); if (logMode != null) {
String message = sm.getString("upgradeHandler.stream.error", connectionId,
Integer.toString(se.getStreamId())); switch (logMode) { case INFO_THEN_DEBUG:
message += sm.getString("upgradeHandler.fallToDebug"); //$FALL-THROUGH$ case INFO:
log.info(message, se); break; case DEBUG:
log.debug(message, se);
}
} // Stream errors are not fatal to the connection so // continue reading frames
Stream stream = getStream(se.getStreamId(), false); if (stream == null) {
sendStreamReset(null, se);
} else {
stream.close(se);
}
} finally { if (isOverheadLimitExceeded()) { thrownew ConnectionException(
sm.getString("upgradeHandler.tooMuchOverhead", connectionId),
Http2Error.ENHANCE_YOUR_CALM);
}
}
}
// Need to know the correct timeout before starting the read // but that may not be known at this time if one or more // requests are currently being processed so don't set a // timeout for the socket...
socketWrapper.setReadTimeout(-1);
// ...set a timeout on the connection
setConnectionTimeoutForStreamCount(activeRemoteStreamCount.get());
if (connectionState.get() != ConnectionState.CLOSED) { if (socketWrapper.hasAsyncIO()) {
result = SocketState.ASYNC_IO;
} else {
result = SocketState.UPGRADED;
}
} break;
case OPEN_WRITE:
processWrites(); if (socketWrapper.hasAsyncIO()) {
result = SocketState.ASYNC_IO;
} else {
result = SocketState.UPGRADED;
} break;
case TIMEOUT:
closeConnection(null); break;
case DISCONNECT: case ERROR: case STOP: case CONNECT_FAIL:
close(); break;
}
} catch (IOException ioe) { if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.ioerror", connectionId), ioe);
}
close();
}
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.upgradeDispatch.exit", connectionId, result));
} return result;
}
/* * Sets the connection timeout based on the current number of active streams.
*/ protectedvoid setConnectionTimeoutForStreamCount(int streamCount) { if (streamCount == 0) { // No streams currently active. Use the keep-alive // timeout for the connection. long keepAliveTimeout = protocol.getKeepAliveTimeout(); if (keepAliveTimeout == -1) {
setConnectionTimeout(-1);
} else {
setConnectionTimeout(System.currentTimeMillis() + keepAliveTimeout);
}
} else { // Streams currently active. Individual streams have // timeouts so keep the connection open.
setConnectionTimeout(-1);
}
}
/**
 * Dispatches a {@code TIMEOUT} event for this connection when {@code now} is -1 or when a connection timeout is set
 * (greater than -1) and {@code now} is later than it.
 *
 * @param now The time to compare with the connection timeout, or -1 to dispatch the timeout unconditionally
 */
@Override
public void timeoutAsync(long now) {
    // Read the volatile field once so both comparisons use the same value
    long connectionTimeout = this.connectionTimeout;
    if (now == -1 || (connectionTimeout > -1 && now > connectionTimeout)) {
        // Have to dispatch as this will be executed from a non-container
        // thread.
        socketWrapper.processSocket(SocketEvent.TIMEOUT, true);
    }
}
/**
 * Moves the connection from {@code CONNECTED} to {@code PAUSING}, records the time of the transition and sends a
 * GOAWAY frame with the maximum possible stream identifier and {@code NO_ERROR}. Does nothing beyond debug
 * logging if the connection is not in the {@code CONNECTED} state.
 */
@Override
public void pause() {
    if (log.isDebugEnabled()) {
        log.debug(sm.getString("upgradeHandler.pause.entry", connectionId));
    }

    if (connectionState.compareAndSet(ConnectionState.CONNECTED, ConnectionState.PAUSING)) {
        pausedNanoTime = System.nanoTime();

        try {
            // 2^31 - 1: the largest 31-bit stream identifier
            writeGoAwayFrame(Integer.MAX_VALUE, Http2Error.NO_ERROR.getCode(), null);
        } catch (IOException ioe) {
            // This is fatal for the connection. Ignore it here. There will be
            // further attempts at I/O in upgradeDispatch() and it can better
            // handle the IO errors.
        }
    }
}
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.rst.debug", connectionId, Integer.toString(se.getStreamId()),
se.getError(), se.getMessage()));
}
// Write a RST frame byte[] rstFrame = newbyte[13]; // Length
ByteUtil.setThreeBytes(rstFrame, 0, 4); // Type
rstFrame[3] = FrameType.RST.getIdByte(); // No flags // Stream ID
ByteUtil.set31Bits(rstFrame, 5, se.getStreamId()); // Payload
ByteUtil.setFourBytes(rstFrame, 9, se.getError().getCode());
// Need to update state atomically with the sending of the RST // frame else other threads currently working with this stream // may see the state change and send a RST frame before the RST // frame triggered by this thread. If that happens the client // may see out of order RST frames which may hard to follow if // the client is unaware the RST frames may be received out of // order.
socketWrapper.getLock().lock(); try { if (state != null) { boolean active = state.isActive();
state.sendReset(); if (active) {
decrementActiveRemoteStreamCount();
}
}
socketWrapper.write(true, rstFrame, 0, rstFrame.length);
socketWrapper.flush(true);
} finally {
socketWrapper.getLock().unlock();
}
}
/**
 * Sends a GOAWAY frame (best effort) and then closes the connection.
 *
 * @param ce The error that triggered the close, or {@code null} to report {@code NO_ERROR} with no debug message
 */
void closeConnection(Http2Exception ce) {
    long code;
    byte[] msg;
    if (ce == null) {
        code = Http2Error.NO_ERROR.getCode();
        msg = null;
    } else {
        code = ce.getError().getCode();
        // An exception is not guaranteed to carry a message. Send no debug
        // data in that case rather than failing before GOAWAY is written.
        String text = ce.getMessage();
        msg = (text == null) ? null : text.getBytes(StandardCharsets.UTF_8);
    }
    try {
        writeGoAwayFrame(maxProcessedStreamId, code, msg);
    } catch (IOException ioe) {
        // Ignore. GOAWAY is sent on a best efforts basis and the original
        // error has already been logged.
    }
    close();
}
/** * Write the initial settings frame and any necessary supporting frames. If the initial settings increase the * initial window size, it will also be necessary to send a WINDOW_UPDATE frame to increase the size of the flow * control window for the connection (stream 0).
*/ protectedvoid writeSettings() { // Send the initial settings frame try { byte[] settings = localSettings.getSettingsFrameForPending();
socketWrapper.write(true, settings, 0, settings.length); byte[] windowUpdateFrame = createWindowUpdateForSettings(); if (windowUpdateFrame.length > 0) {
socketWrapper.write(true, windowUpdateFrame, 0, windowUpdateFrame.length);
}
socketWrapper.flush(true);
} catch (IOException ioe) {
String msg = sm.getString("upgradeHandler.sendPrefaceFail", connectionId); if (log.isDebugEnabled()) {
log.debug(msg);
} thrownew ProtocolException(msg, ioe);
}
}
/** * @return The WINDOW_UPDATE frame if one is required or an empty array if no WINDOW_UPDATE is required.
*/ protectedbyte[] createWindowUpdateForSettings() { // Build a WINDOW_UPDATE frame if one is required. If not, create an // empty byte array. byte[] windowUpdateFrame; int increment = protocol.getInitialWindowSize() - ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE; if (increment > 0) { // Build window update frame for stream 0
windowUpdateFrame = newbyte[13];
ByteUtil.setThreeBytes(windowUpdateFrame, 0, 4);
windowUpdateFrame[3] = FrameType.WINDOW_UPDATE.getIdByte();
ByteUtil.set31Bits(windowUpdateFrame, 9, increment);
} else {
windowUpdateFrame = newbyte[0];
}
return windowUpdateFrame;
}
protectedvoid writeGoAwayFrame(int maxStreamId, long errorCode, byte[] debugMsg) throwsIOException { byte[] fixedPayload = newbyte[8];
ByteUtil.set31Bits(fixedPayload, 0, maxStreamId);
ByteUtil.setFourBytes(fixedPayload, 4, errorCode); int len = 8; if (debugMsg != null) {
len += debugMsg.length;
} byte[] payloadLength = newbyte[3];
ByteUtil.setThreeBytes(payloadLength, 0, len);
/**
 * Writes the headers for a stream while holding the socket wrapper lock and then records on the stream that the
 * headers (and, if applicable, the end of stream) have been sent.
 *
 * @param stream         The stream the headers belong to
 * @param pushedStreamId Passed through to {@code doWriteHeaders}
 * @param mimeHeaders    The headers to write
 * @param endOfStream    {@code true} if no further frames will follow the headers for this stream
 * @param payloadSize    Passed through to {@code doWriteHeaders}
 *
 * @throws IOException if {@code doWriteHeaders} fails
 */
void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, boolean endOfStream,
        int payloadSize) throws IOException {
    // Holding the socket lock ensures the Stream processing thread has
    // control of the socket for the duration of the write.
    final Lock socketLock = socketWrapper.getLock();
    socketLock.lock();
    try {
        doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream, payloadSize);
    } finally {
        socketLock.unlock();
    }

    // State updates happen after the lock has been released
    stream.sentHeaders();
    if (!endOfStream) {
        return;
    }
    sentEndOfStream(stream);
}
/* * Separate method to allow Http2AsyncUpgradeHandler to call this code without synchronizing on socketWrapper since * it doesn't need to.
*/ protected HeaderFrameBuffers doWriteHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, boolean endOfStream, int payloadSize) throws IOException {
/*
 * Handles an I/O error on the socket underlying the HTTP/2 connection when it is triggered by application code
 * (usually reading the request or writing the response). Such I/O errors are fatal so the connection is closed. The
 * exception is re-thrown to make the client code aware of the problem.
 *
 * Note: We can not rely on this exception reaching the socket processor since the application code may swallow it.
 */
protected void handleAppInitiatedIOException(IOException ioe) throws IOException {
    // Close first: the caller may never let the exception propagate
    close();
    throw ioe;
}
/* * Needs to know if this was application initiated since that affects the error handling.
*/ void writeWindowUpdate(AbstractNonZeroStream stream, int increment, boolean applicationInitiated) throws IOException { if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.windowUpdateConnection", getConnectionId(),
Integer.valueOf(increment)));
}
socketWrapper.getLock().lock(); try { // Build window update frame for stream 0 byte[] frame = newbyte[13];
ByteUtil.setThreeBytes(frame, 0, 4);
frame[3] = FrameType.WINDOW_UPDATE.getIdByte();
ByteUtil.set31Bits(frame, 9, increment);
socketWrapper.write(true, frame, 0, frame.length); boolean needFlush = true; // No need to send update from closed stream if (stream instanceof Stream && ((Stream) stream).canWrite()) { int streamIncrement = ((Stream) stream).getWindowUpdateSizeToWrite(increment); if (streamIncrement > 0) { if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.windowUpdateStream", getConnectionId(), getIdAsString(),
Integer.valueOf(streamIncrement)));
} // Re-use buffer as connection update has already been written
ByteUtil.set31Bits(frame, 5, stream.getIdAsInt());
ByteUtil.set31Bits(frame, 9, streamIncrement); try {
socketWrapper.write(true, frame, 0, frame.length);
socketWrapper.flush(true);
needFlush = false;
} catch (IOException ioe) { if (applicationInitiated) {
handleAppInitiatedIOException(ioe);
} else { throw ioe;
}
}
}
} if (needFlush) {
socketWrapper.flush(true);
}
} finally {
socketWrapper.getLock().unlock();
}
}
/**
 * Attempts a non-blocking flush of the socket while holding the socket wrapper lock. If the flush reports
 * {@code true}, write interest is registered; otherwise a ping may be sent.
 *
 * @throws IOException if the flush or the ping fails
 */
protected void processWrites() throws IOException {
    Lock lock = socketWrapper.getLock();
    lock.lock();
    try {
        if (socketWrapper.flush(false)) {
            socketWrapper.registerWriteInterest();
        } else {
            // Only send a ping if there is no other data waiting to be sent.
            // Ping manager will ensure they aren't sent too frequently.
            pingManager.sendPing(false);
        }
    } finally {
        lock.unlock();
    }
}
int reserveWindowSize(Stream stream, int reservation, boolean block) throws IOException { // Need to be holding the stream lock so releaseBacklog() can't notify // this thread until after this thread enters wait() int allocation = 0;
stream.windowAllocationLock.lock(); try {
windowAllocationLock.lock(); try { if (!stream.canWrite()) {
stream.doStreamCancel(
sm.getString("upgradeHandler.stream.notWritable", stream.getConnectionId(),
stream.getIdAsString(), stream.state.getCurrentStateName()),
Http2Error.STREAM_CLOSED);
} long windowSize = getWindowSize(); if (stream.getConnectionAllocationMade() > 0) {
allocation = stream.getConnectionAllocationMade();
stream.setConnectionAllocationMade(0);
} elseif (windowSize < 1) { // Has this stream been granted an allocation if (stream.getConnectionAllocationMade() == 0) {
stream.setConnectionAllocationRequested(reservation);
backLogSize += reservation;
backLogStreams.add(stream);
}
} elseif (windowSize < reservation) {
allocation = (int) windowSize;
decrementWindowSize(allocation);
} else {
allocation = reservation;
decrementWindowSize(allocation);
}
} finally {
windowAllocationLock.unlock();
} if (allocation == 0) { if (block) { try { // Connection level window is empty. Although this // request is for a stream, use the connection // timeout long writeTimeout = protocol.getWriteTimeout();
stream.waitForConnectionAllocation(writeTimeout); // Has this stream been granted an allocation if (stream.getConnectionAllocationMade() == 0) {
String msg;
Http2Error error; if (stream.isActive()) { if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.noAllocation", connectionId,
stream.getIdAsString()));
} // No allocation // Close the connection. Do this first since // closing the stream will raise an exception.
close();
msg = sm.getString("stream.writeTimeout");
error = Http2Error.ENHANCE_YOUR_CALM;
} else {
msg = sm.getString("stream.clientCancel");
error = Http2Error.STREAM_CLOSED;
} // Close the stream // This thread is in application code so need // to signal to the application that the // stream is closing
stream.doStreamCancel(msg, error);
} else {
allocation = stream.getConnectionAllocationMade();
stream.setConnectionAllocationMade(0);
}
} catch (InterruptedException e) { thrownew IOException(sm.getString("upgradeHandler.windowSizeReservationInterrupted",
connectionId, stream.getIdAsString(), Integer.toString(reservation)), e);
}
} else {
stream.waitForConnectionAllocationNonBlocking(); return 0;
}
}
} finally {
stream.windowAllocationLock.unlock();
} return allocation;
}
windowAllocationLock.lock(); try { long windowSize = getWindowSize(); if (windowSize < 1 && windowSize + increment > 0) { // Connection window is exhausted. Assume there will be streams // to notify. The overhead is minimal if there are none.
streamsToNotify = releaseBackLog((int) (windowSize + increment));
} else { super.incrementWindowSize(increment);
}
} finally {
windowAllocationLock.unlock();
}
if (streamsToNotify != null) { for (AbstractStream stream : streamsToNotify) { if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.releaseBacklog", connectionId, stream.getIdAsString()));
} // There is never any O/P on stream zero but it is included in // the backlog as it simplifies the code. Skip it if it appears // here. if (this == stream) { continue;
}
((Stream) stream).notifyConnection();
}
}
}
/**
 * Performs send file processing (where supported) for a stream. Callers are expected to have set the appropriate
 * request attributes before invoking this method.
 * <p>
 * This implementation does no work and always reports {@code DONE}.
 *
 * @param sendfileData The stream and associated data to process
 *
 * @return The result of the send file processing
 */
protected SendfileState processSendfile(SendfileData sendfileData) {
    final SendfileState outcome = SendfileState.DONE;
    return outcome;
}
private Set<AbstractStream> releaseBackLog(int increment) throws Http2Exception {
windowAllocationLock.lock(); try {
Set<AbstractStream> result = new HashSet<>(); if (backLogSize < increment) { // Can clear the whole backlog for (AbstractStream stream : backLogStreams) { if (stream.getConnectionAllocationRequested() > 0) {
stream.setConnectionAllocationMade(stream.getConnectionAllocationRequested());
stream.setConnectionAllocationRequested(0);
result.add(stream);
}
} // Cast is safe due to test above int remaining = increment - (int) backLogSize;
backLogSize = 0; super.incrementWindowSize(remaining);
backLogStreams.clear();
} else { // Can't clear the whole backlog. // Need streams in priority order
Set<Stream> orderedStreams = new ConcurrentSkipListSet<>(Comparator.comparingInt(Stream::getUrgency)
.thenComparing(Stream::getIncremental).thenComparing(Stream::getIdAsInt));
orderedStreams.addAll(backLogStreams);
// Iteration 1. Need to work out how much we can clear. long urgencyWhereAllocationIsExhausted = 0; long requestedAllocationForIncrementalStreams = 0; int remaining = increment;
Iterator<Stream> orderedStreamsIterator = orderedStreams.iterator(); while (orderedStreamsIterator.hasNext()) {
Stream s = orderedStreamsIterator.next(); if (urgencyWhereAllocationIsExhausted < s.getUrgency()) { if (remaining < 1) { break;
}
requestedAllocationForIncrementalStreams = 0;
}
urgencyWhereAllocationIsExhausted = s.getUrgency(); if (s.getIncremental()) {
requestedAllocationForIncrementalStreams += s.getConnectionAllocationRequested();
remaining -= s.getConnectionAllocationRequested();
} else {
remaining -= s.getConnectionAllocationRequested(); if (remaining < 1) { break;
}
}
}
// Iteration 2. Allocate. // Reset for second iteration
remaining = increment;
orderedStreamsIterator = orderedStreams.iterator(); while (orderedStreamsIterator.hasNext()) {
Stream s = orderedStreamsIterator.next(); if (s.getUrgency() < urgencyWhereAllocationIsExhausted) { // Can fully allocate
remaining = allocate(s, remaining);
result.add(s);
orderedStreamsIterator.remove();
backLogStreams.remove(s);
} elseif (requestedAllocationForIncrementalStreams == 0) { // Allocation ran out in non-incremental streams so fully // allocate in iterator order until allocation is exhausted
remaining = allocate(s, remaining);
result.add(s); if (s.getConnectionAllocationRequested() == 0) { // Fully allocated
orderedStreamsIterator.remove();
backLogStreams.remove(s);
} if (remaining < 1) { break;
}
} else { // Allocation ran out in incremental streams. Distribute // remaining allocation between the incremental streams at // this urgency level. if (s.getUrgency() != urgencyWhereAllocationIsExhausted) { break;
}
int share = (int) (s.getConnectionAllocationRequested() * remaining /
requestedAllocationForIncrementalStreams); if (share == 0) {
share = 1;
}
allocate(s, share);
result.add(s); if (s.getConnectionAllocationRequested() == 0) { // Fully allocated (unlikely but possible due to // rounding if only a few bytes required).
orderedStreamsIterator.remove();
backLogStreams.remove(s);
}
}
}
} return result;
} finally {
windowAllocationLock.unlock();
}
}
privateint allocate(AbstractStream stream, int allocation) {
windowAllocationLock.lock(); try { if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(), stream.getIdAsString(),
Integer.toString(allocation)));
}
for (AbstractNonZeroStream stream : streams.values()) { if (stream instanceof Stream) { // The connection is closing. Close the associated streams as no // longer required (also notifies any threads waiting for allocations).
((Stream) stream).receiveReset(Http2Error.CANCEL.getCode());
}
} try {
socketWrapper.close();
} catch (Exception e) {
log.debug(sm.getString("upgradeHandler.socketCloseFailed"), e);
}
}
privatevoid pruneClosedStreams(int streamId) { // Only prune every 10 new streams if (newStreamsSinceLastPrune < 9) { // Not atomic. Increments may be lost. Not a problem.
newStreamsSinceLastPrune++; return;
} // Reset counter
newStreamsSinceLastPrune = 0;
// RFC 7540, 5.3.4 endpoints should maintain state for at least the // maximum number of concurrent streams. long max = localSettings.getMaxConcurrentStreams();
// Ideally need to retain information for a "significant" amount of time // after sending END_STREAM (RFC 7540, page 20) so we detect potential // connection error. 5x seems reasonable. The client will have had // plenty of opportunity to process the END_STREAM if another 5x max // concurrent streams have been processed.
max = max * 5; if (max > Integer.MAX_VALUE) {
max = Integer.MAX_VALUE;
}
// Need to try and prune some streams. Prune streams starting with the // oldest. Pruning stops as soon as enough streams have been pruned. // Iterator is in key order. for (AbstractNonZeroStream stream : streams.values()) { if (toClose < 1) { return;
} if (stream instanceof Stream && ((Stream) stream).isActive()) { continue;
}
streams.remove(stream.getIdentifier());
toClose--; if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.pruned", connectionId, stream.getIdAsString()));
}
void push(Request request, Stream associatedStream) throws IOException { if (localSettings.getMaxConcurrentStreams() < activeRemoteStreamCount.incrementAndGet()) { // If there are too many open streams, simply ignore the push // request.
setConnectionTimeoutForStreamCount(activeRemoteStreamCount.decrementAndGet()); return;
}
Stream pushStream;
/* * Uses SocketWrapper lock since PUSH_PROMISE frames have to be sent in order. Once the stream has been created * we need to ensure that the PUSH_PROMISE is sent before the next stream is created for a PUSH_PROMISE.
*/
Lock lock = socketWrapper.getLock();
lock.lock(); try {
pushStream = createLocalStream(request);
writeHeaders(associatedStream, pushStream.getIdAsInt(), request.getMimeHeaders(), false,
Constants.DEFAULT_HEADERS_FRAME_SIZE);
} finally {
lock.unlock();
}
/*
 * Applies the adjustment for a non-overhead frame to the overhead count.
 *
 * A non-overhead frame reduces the overhead count by Http2Protocol.DEFAULT_OVERHEAD_REDUCTION_FACTOR. A simple
 * browser request is likely to have one non-overhead frame (HEADERS) and one overhead frame (REPRIORITISE). With
 * the default settings the overhead count will reduce by 10 for each simple request. Requests and responses with
 * bodies will create additional non-overhead frames, further reducing the overhead count.
 */
void reduceOverheadCount(FrameType frameType) {
    final int adjustment = Http2Protocol.DEFAULT_OVERHEAD_REDUCTION_FACTOR;
    updateOverheadCount(frameType, adjustment);
}
/**
 * Applies the adjustment for an overhead frame, using the overhead count factor configured on the protocol.
 *
 * @param frameType The type of the frame that triggered the change (used for debug logging)
 */
@Override
public void increaseOverheadCount(FrameType frameType) {
    // An overhead frame increases the overhead count by
    // overheadCountFactor. By default, this means an overhead frame
    // increases the overhead count by 10. A simple browser request is
    // likely to have one non-overhead frame (HEADERS) and one overhead
    // frame (REPRIORITISE). With the default settings the overhead count
    // will reduce by 10 for each simple request.
    updateOverheadCount(frameType, getProtocol().getOverheadCountFactor());
}
/*
 * Applies a caller-calculated increase to the overhead count.
 */
private void increaseOverheadCount(FrameType frameType, int increment) {
    // Overhead frames that indicate inefficient (and potentially malicious)
    // use of small frames trigger an increase that is inversely
    // proportional to size. The default threshold for all three potential
    // areas for abuse (HEADERS, DATA, WINDOW_UPDATE) is 1024 bytes. Frames
    // with sizes smaller than this will trigger an increase of
    // threshold/size.
    // DATA and WINDOW_UPDATE take an average over the last two non-final
    // frames to allow for client buffering schemes that can result in some
    // small DATA payloads.
    updateOverheadCount(frameType, increment);
}
/*
 * Adds the increment (which may be negative) to the overhead count and, when debug logging is enabled, logs the
 * new value together with the frame type that caused the change.
 */
private void updateOverheadCount(FrameType frameType, int increment) {
    long newOverheadCount = overheadCount.addAndGet(increment);
    if (log.isDebugEnabled()) {
        log.debug(sm.getString("upgradeHandler.overheadChange", connectionId, getIdAsString(), frameType.name(),
                Long.valueOf(newOverheadCount)));
    }
}
@Override publicboolean fill(boolean block, byte[] data, int offset, int length) throws IOException { int len = length; int pos = offset; boolean nextReadBlock = block; int thisRead = 0;
while (len > 0) { // Blocking reads use the protocol level read timeout. Non-blocking // reads do not timeout. The intention is that once a frame has // started to be read, the read timeout applies until it is // completely read. if (nextReadBlock) {
socketWrapper.setReadTimeout(protocol.getReadTimeout());
} else {
socketWrapper.setReadTimeout(-1);
}
thisRead = socketWrapper.read(nextReadBlock, data, pos, len); if (thisRead == 0) { if (nextReadBlock) { // Should never happen thrownew IllegalStateException();
} else { returnfalse;
}
} elseif (thisRead == -1) { if (connectionState.get().isNewStreamAllowed()) { thrownew EOFException();
} else { returnfalse;
}
} else {
pos += thisRead;
len -= thisRead;
nextReadBlock = true;
}
}
/**
 * Provides the HPACK decoder for this connection, creating it on first use with the header table size taken from
 * the local settings.
 *
 * @return The HPACK decoder for this connection
 */
@Override
public HpackDecoder getHpackDecoder() {
    if (hpackDecoder != null) {
        return hpackDecoder;
    }
    // First request for the decoder: create and retain it
    hpackDecoder = new HpackDecoder(localSettings.getHeaderTableSize());
    return hpackDecoder;
}
@Override public ByteBuffer startRequestBodyFrame(int streamId, int payloadSize, boolean endOfStream) throws Http2Exception { // DATA frames reduce the overhead count ...
reduceOverheadCount(FrameType.DATA);
// .. but lots of small payloads are inefficient so that will increase // the overhead count unless it is the final DATA frame where small // payloads are expected.
// See also https://bz.apache.org/bugzilla/show_bug.cgi?id=63690 // The buffering behaviour of some clients means that small data frames // are much more frequent (roughly 1 in 20) than expected. Use an // average over two frames to avoid false positives. if (!endOfStream) { int overheadThreshold = protocol.getOverheadDataThreshold(); int average = (lastNonFinalDataPayload >> 1) + (payloadSize >> 1);
lastNonFinalDataPayload = payloadSize; // Avoid division by zero if (average == 0) {
average = 1;
} if (average < overheadThreshold) {
increaseOverheadCount(FrameType.DATA, overheadThreshold / average);
}
}
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.startRequestBodyFrame.result", getConnectionId(),
abstractNonZeroStream.getIdAsString(), result));
}
return result;
}
@Override publicvoid endRequestBodyFrame(int streamId, int dataLength) throws Http2Exception, IOException {
AbstractNonZeroStream abstractNonZeroStream = getAbstractNonZeroStream(streamId, true); if (abstractNonZeroStream instanceof Stream) {
((Stream) abstractNonZeroStream).getInputBuffer().onDataAvailable();
} else { // The Stream was recycled between the call in Http2Parser to // startRequestBodyFrame() and the synchronized block that contains // the call to this method. This means the bytes read will have been // written to the original stream and, effectively, swallowed. // Therefore, need to notify that those bytes were swallowed here. if (dataLength>0) {
onSwallowedDataFramePayload(streamId, dataLength);
}
}
}
// Check the pause state before processing headers since the pause state // determines if a new stream is created or if this stream is ignored.
checkPauseState();
if (connectionState.get().isNewStreamAllowed()) {
Stream stream = getStream(streamId, false); if (stream == null) {
stream = createRemoteStream(streamId);
} if (streamId < maxActiveRemoteStreamId) { thrownew ConnectionException(sm.getString("upgradeHandler.stream.old", Integer.valueOf(streamId),
Integer.valueOf(maxActiveRemoteStreamId)), Http2Error.PROTOCOL_ERROR);
}
stream.checkState(FrameType.HEADERS);
stream.receivedStartOfHeaders(headersEndStream);
closeIdleStreams(streamId); return stream;
} else { if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.noNewStreams", connectionId, Integer.toString(streamId)));
}
reduceOverheadCount(FrameType.HEADERS); // Stateless so a static can be used to save on GC return HEADER_SINK;
}
}
/**
 * Unused - NO-OP.
 *
 * @param streamId       Unused
 * @param parentStreamId Unused
 * @param exclusive      Unused
 * @param weight         Unused
 *
 * @throws Http2Exception Never thrown
 *
 * @deprecated Unused. Will be removed in Tomcat 11 onwards.
 */
@Deprecated
public void reprioritise(int streamId, int parentStreamId, boolean exclusive, int weight) throws Http2Exception {
    // NO-OP
}
@Override publicvoid headersContinue(int payloadSize, boolean endOfHeaders) { // Generally, continuation frames don't impact the overhead count but if // they are small and the frame isn't the final header frame then that
--> --------------------
--> maximum size reached
--> --------------------
¤ Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.0.35Bemerkung:
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.