/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
*/ package org.apache.coyote.http2;
/*
 * Creates a new stream on the given connection. coyoteRequest is null for a
 * standard HTTP/2 request; non-null for a pushed request or an HTTP/1.1
 * upgrade, in which case the headers (and any request body) have already
 * been read and the stream state is advanced accordingly.
 */
@SuppressWarnings("deprecation")
Stream(Integer identifier, Http2UpgradeHandler handler, Request coyoteRequest) {
    super(handler.getConnectionId(), identifier);
    this.handler = handler;
    // The flow control window starts at the peer's initial window size
    setWindowSize(handler.getRemoteSettings().getInitialWindowSize());
    if (coyoteRequest == null) {
        // HTTP/2 new request
        this.coyoteRequest = new Request();
        this.inputBuffer = new StandardStreamInputBuffer();
        this.coyoteRequest.setInputBuffer(inputBuffer);
    } else {
        // HTTP/2 Push or HTTP/1.1 upgrade
        this.coyoteRequest = coyoteRequest;
        this.inputBuffer =
                new SavedRequestStreamInputBuffer((SavedRequestInputFilter) coyoteRequest.getInputBuffer());
        // Headers have been read by this point
        state.receivedStartOfHeaders();
        if (HTTP_UPGRADE_STREAM.equals(identifier)) {
            // Populate coyoteRequest from headers (HTTP/1.1 only)
            try {
                prepareRequest();
            } catch (IllegalArgumentException iae) {
                // Something in the headers is invalid
                // Set correct return status
                coyoteResponse.setStatus(400);
                // Set error flag. This triggers error processing rather than
                // the normal mapping
                coyoteResponse.setError();
            }
        }
        // Request body, if any, has been read and buffered
        state.receivedEndOfStream();
    }
    this.coyoteRequest.setSendfile(handler.hasAsyncIO() && handler.getProtocol().getUseSendfile());
    this.coyoteResponse.setOutputBuffer(http2OutputBuffer);
    this.coyoteRequest.setResponse(coyoteResponse);
    this.coyoteRequest.protocol().setString("HTTP/2.0");
    // Only set the start time if not already set (presumably carried over
    // from an upgraded HTTP/1.1 request - TODO confirm against caller)
    if (this.coyoteRequest.getStartTimeNanos() < 0) {
        this.coyoteRequest.setStartTimeNanos(System.nanoTime());
    }
}
privatevoid prepareRequest() { if (coyoteRequest.scheme().isNull()) { if (handler.getProtocol().getHttp11Protocol().isSSLEnabled()) {
coyoteRequest.scheme().setString("https");
} else {
coyoteRequest.scheme().setString("http");
}
}
MessageBytes hostValueMB = coyoteRequest.getMimeHeaders().getUniqueValue("host"); if (hostValueMB == null) { thrownew IllegalArgumentException();
} // This processing expects bytes. Server push will have used a String // so trigger a conversion if required.
hostValueMB.toBytes();
ByteChunk valueBC = hostValueMB.getByteChunk(); byte[] valueB = valueBC.getBytes(); int valueL = valueBC.getLength(); int valueS = valueBC.getStart();
int colonPos = Host.parse(hostValueMB); if (colonPos != -1) { int port = 0; for (int i = colonPos + 1; i < valueL; i++) { char c = (char) valueB[i + valueS]; if (c < '0' || c > '9') { thrownew IllegalArgumentException();
}
port = port * 10 + c - '0';
}
coyoteRequest.setServerPort(port);
// Only need to copy the host name up to the :
valueL = colonPos;
}
// Extract the host name char[] hostNameC = newchar[valueL]; for (int i = 0; i < valueL; i++) {
hostNameC[i] = (char) valueB[i + valueS];
}
coyoteRequest.serverName().setChars(hostNameC, 0, valueL);
}
finalvoid receiveReset(long errorCode) { if (log.isDebugEnabled()) {
log.debug(
sm.getString("stream.reset.receive", getConnectionId(), getIdAsString(), Long.toString(errorCode)));
} // Set the new state first since read and write both check this
state.receivedReset(); // Reads wait internally so need to call a method to break the wait() if (inputBuffer != null) {
inputBuffer.receiveReset();
}
cancelAllocationRequests();
}
@Override finalvoid incrementWindowSize(int windowSizeIncrement) throws Http2Exception {
windowAllocationLock.lock(); try { // If this is zero then any thread that has been trying to write for // this stream will be waiting. Notify that thread it can continue. Use // notify all even though only one thread is waiting to be on the safe // side. boolean notify = getWindowSize() < 1; super.incrementWindowSize(windowSizeIncrement); if (notify && getWindowSize() > 0) {
allocationManager.notifyStream();
}
} finally {
windowAllocationLock.unlock();
}
}
/*
 * Obtains a flow-control allocation from this stream's window, waiting
 * (when block is true) until at least one byte is available. Returns the
 * number of bytes granted: min(reservation, current window), or 0 for a
 * non-blocking call that could not be satisfied (read interest is
 * registered instead). Throws CloseNowException if the stream is no longer
 * writable or the write times out; an interrupted wait is surfaced as an
 * IOException so the caller treats the stream as unusable.
 */
final int reserveWindowSize(int reservation, boolean block) throws IOException {
    windowAllocationLock.lock();
    try {
        long windowSize = getWindowSize();
        while (windowSize < 1) {
            if (!canWrite()) {
                throw new CloseNowException(sm.getString("stream.notWritable", getConnectionId(), getIdAsString()));
            }
            if (block) {
                try {
                    long writeTimeout = handler.getProtocol().getStreamWriteTimeout();
                    allocationManager.waitForStream(writeTimeout);
                    windowSize = getWindowSize();
                    if (windowSize == 0) {
                        // Still no window after the wait - treat as a write
                        // timeout and cancel the stream
                        doStreamCancel(sm.getString("stream.writeTimeout"), Http2Error.ENHANCE_YOUR_CALM);
                    }
                } catch (InterruptedException e) {
                    // Possible shutdown / rst or similar. Use an IOException to
                    // signal to the client that further I/O isn't possible for this
                    // Stream.
                    throw new IOException(e);
                }
            } else {
                allocationManager.waitForStreamNonBlocking();
                return 0;
            }
        }
        int allocation;
        if (windowSize < reservation) {
            allocation = (int) windowSize;
        } else {
            allocation = reservation;
        }
        decrementWindowSize(allocation);
        return allocation;
    } finally {
        windowAllocationLock.unlock();
    }
}
@SuppressWarnings("deprecation") void doStreamCancel(String msg, Http2Error error) throws CloseNowException {
StreamException se = new StreamException(msg, error, getIdAsInt()); // Prevent the application making further writes
streamOutputBuffer.closed = true; // Prevent Tomcat's error handling trying to write
coyoteResponse.setError();
coyoteResponse.setErrorReported(); // Trigger a reset once control returns to Tomcat
streamOutputBuffer.reset = se; thrownew CloseNowException(msg, se);
}
// NOTE(review): fragment - the enclosing method signature (HPACK header
// emit/validation; 'name', 'value' and 'headerException' are defined in the
// missing part) is not visible in this chunk.
// Header names must be lower case
if (!name.toLowerCase(Locale.US).equals(name)) {
    throw new HpackException(sm.getString("stream.header.case", getConnectionId(), getIdAsString(), name));
}
// Connection-specific headers are forbidden in HTTP/2
if (HTTP_CONNECTION_SPECIFIC_HEADERS.contains(name)) {
    throw new HpackException(
            sm.getString("stream.header.connection", getConnectionId(), getIdAsString(), name));
}
// The only permitted value for the te header is "trailers"
if ("te".equals(name)) {
    if (!"trailers".equals(value)) {
        throw new HpackException(sm.getString("stream.header.te", getConnectionId(), getIdAsString(), value));
    }
}
if (headerException != null) {
    // Don't bother processing the header since the stream is going to
    // be reset anyway
    return;
}
if (name.length() == 0) {
    throw new HpackException(sm.getString("stream.header.empty", getConnectionId(), getIdAsString()));
}
boolean pseudoHeader = name.charAt(0) == ':';
// Pseudo headers may only appear before regular headers
if (pseudoHeader && headerState != HEADER_STATE_PSEUDO) {
    headerException = new StreamException(
            sm.getString("stream.header.unexpectedPseudoHeader", getConnectionId(), getIdAsString(), name),
            Http2Error.PROTOCOL_ERROR, getIdAsInt());
    // No need for further processing. The stream will be reset.
    return;
}
finalboolean receivedEndOfHeaders() throws ConnectionException { if (coyoteRequest.method().isNull() || coyoteRequest.scheme().isNull() ||
!coyoteRequest.method().equals("CONNECT") && coyoteRequest.requestURI().isNull()) { thrownew ConnectionException(sm.getString("stream.header.required", getConnectionId(), getIdAsString()),
Http2Error.PROTOCOL_ERROR);
} // Cookie headers need to be concatenated into a single header // See RFC 7540 8.1.2.5 // Can only do this once the headers are fully received if (cookieHeader != null) {
coyoteRequest.getMimeHeaders().addValue("cookie").setString(cookieHeader.toString());
} return headerState == HEADER_STATE_REGULAR || headerState == HEADER_STATE_PSEUDO;
}
/*
 * Writes the response trailer fields supplied by the application, reusing
 * the response MimeHeaders as the carrier.
 */
final void writeTrailers() throws IOException {
    Supplier<Map<String,String>> supplier = coyoteResponse.getTrailerFields();
    if (supplier == null) {
        // No supplier was set, end of stream will already have been sent
        return;
    }
    // We can re-use the MimeHeaders from the response since they have
    // already been processed by the encoder at this point
    MimeHeaders mimeHeaders = coyoteResponse.getMimeHeaders();
    mimeHeaders.recycle();
    // NOTE(review): 'headerMap' is not declared in the visible fragment -
    // the lines obtaining it from the supplier (and the remainder of this
    // method, including its closing brace) appear to have been lost in
    // extraction. Restore from the upstream source.
    // Copy the contents of the Map to the MimeHeaders
    // TODO: Is there benefit in refactoring this? Is MimeHeaders too
    // heavyweight? Can we reduce the copy/conversions?
    for (Map.Entry<String,String> headerEntry : headerMap.entrySet()) {
        MessageBytes mb = mimeHeaders.addValue(headerEntry.getKey());
        mb.setString(headerEntry.getValue());
    }
// Delegates to the handler so every stream reports the connection's id.
@Override
final String getConnectionId() {
    return handler.getConnectionId();
}
// Accessor for the coyote request associated with this stream.
final Request getCoyoteRequest() {
    return coyoteRequest;
}
// Accessor for the coyote response associated with this stream.
final Response getCoyoteResponse() {
    return coyoteResponse;
}
/*
 * Returns the buffer into which incoming DATA frame payloads are copied,
 * or a zero length buffer when no request body is expected.
 */
@Override
final ByteBuffer getInputByteBuffer() {
    if (inputBuffer != null) {
        return inputBuffer.getInBuffer();
    }
    // This must either be a push or an HTTP upgrade. Either way there
    // should not be a request body so return a zero length ByteBuffer
    // to trigger a flow control error.
    return ZERO_LENGTH_BYTEBUFFER;
}
finalvoid receivedStartOfHeaders(boolean headersEndStream) throws Http2Exception { if (headerState == HEADER_STATE_START) {
headerState = HEADER_STATE_PSEUDO;
handler.getHpackDecoder().setMaxHeaderCount(handler.getProtocol().getMaxHeaderCount());
handler.getHpackDecoder().setMaxHeaderSize(handler.getProtocol().getMaxHeaderSize());
} elseif (headerState == HEADER_STATE_PSEUDO || headerState == HEADER_STATE_REGULAR) { // Trailer headers MUST include the end of stream flag if (headersEndStream) {
headerState = HEADER_STATE_TRAILER;
handler.getHpackDecoder().setMaxHeaderCount(handler.getProtocol().getMaxTrailerCount());
handler.getHpackDecoder().setMaxHeaderSize(handler.getProtocol().getMaxTrailerSize());
} else { thrownew ConnectionException(
sm.getString("stream.trailerHeader.noEndOfStream", getConnectionId(), getIdAsString()),
Http2Error.PROTOCOL_ERROR);
}
} // Parser will catch attempt to send a headers frame after the stream // has closed.
state.receivedStartOfHeaders();
}
/*
 * Closes the stream in response to an HTTP/2 exception. A StreamException
 * results in an RST_STREAM frame for this stream.
 */
final void close(Http2Exception http2Exception) {
    if (http2Exception instanceof StreamException) {
        try {
            StreamException se = (StreamException) http2Exception;
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("stream.reset.send", getConnectionId(), getIdAsString(), se.getError()));
            }
            // Need to update state atomically with the sending of the RST
            // frame else other threads currently working with this stream
            // may see the state change and send a RST frame before the RST
            // frame triggered by this thread. If that happens the client
            // may see out of order RST frames which may hard to follow if
            // the client is unaware the RST frames may be received out of
            // order.
            handler.sendStreamReset(state, se);
            // NOTE(review): truncated - the catch block and the remainder of
            // this method are missing from the visible chunk. Restore from
            // the upstream source.
/*
 * This method is called recycle for consistency with the rest of the Tomcat code base. Currently, it calls the
 * handler to replace this stream with an implementation that uses less memory. It does not fully recycle the
 * Stream ready for re-use since Stream objects are not re-used. This is useful because Stream instances are
 * retained for a period after the Stream closes.
 */
final void recycle() {
    if (log.isDebugEnabled()) {
        log.debug(sm.getString("stream.recycle", getConnectionId(), getIdAsString()));
    }
    // May be null if stream was closed before any DATA frames were processed.
    ByteBuffer inputByteBuffer = getInputByteBuffer();
    int remaining = (inputByteBuffer == null) ? 0 : inputByteBuffer.remaining();
    handler.replaceStream(this, new RecycledStream(getConnectionId(), getIdentifier(), state, remaining));
}
finalvoid push(Request request) throws IOException { // Can only push when supported and from a peer initiated stream if (!isPushSupported() || getIdAsInt() % 2 == 0) { return;
} // Set the special HTTP/2 headers
request.getMimeHeaders().addValue(":method").duplicate(request.method());
request.getMimeHeaders().addValue(":scheme").duplicate(request.scheme());
StringBuilder path = new StringBuilder(request.requestURI().toString()); if (!request.queryString().isNull()) {
path.append('?');
path.append(request.queryString().toString());
}
request.getMimeHeaders().addValue(":path").setString(path.toString());
// Authority needs to include the port only if a non-standard port is // being used. if (!(request.scheme().equals("http") && request.getServerPort() == 80) &&
!(request.scheme().equals("https") && request.getServerPort() == 443)) {
request.getMimeHeaders().addValue(":authority")
.setString(request.serverName().getString() + ":" + request.getServerPort());
} else {
request.getMimeHeaders().addValue(":authority").duplicate(request.serverName());
}
push(handler, request, this);
}
// Trailer fields are only available once the request body is complete.
boolean isTrailerFieldsReady() {
    // Once EndOfStream has been received, canRead will be false
    return !state.canRead();
}
/*
 * Output buffer for the stream: stages response body bytes in an 8K buffer
 * and writes them out under HTTP/2 flow control, overflowing into a
 * WriteBuffer when a non-blocking write cannot complete.
 */
class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink {

    private final Lock writeLock = new ReentrantLock();
    private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
    private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024);
    // Flag that indicates that data was left over on a previous
    // non-blocking write. Once set, this flag stays set until all the data
    // has been written.
    private boolean dataLeft;
    private volatile long written = 0;
    private int streamReservation = 0;
    private volatile boolean closed = false;
    private volatile StreamException reset = null;
    private volatile boolean endOfStreamSent = false;

    /*
     * The write methods share a common lock to ensure that only one thread at a time is able to access the
     * buffer. Without this protection, a client that performed concurrent writes could corrupt the buffer.
     */
/*
 * Copies the chunk into the stream's output buffer, flushing when the
 * buffer fills (blocking if no WriteListener is set). If a non-blocking
 * flush cannot complete, the remainder of the chunk is parked in
 * writeBuffer. From the caller's perspective the chunk is always fully
 * consumed; returns the number of bytes accepted. Throws IOException if
 * the stream output has been closed.
 */
@Override
public final int doWrite(ByteBuffer chunk) throws IOException {
    writeLock.lock();
    try {
        if (closed) {
            throw new IOException(sm.getString("stream.closed", getConnectionId(), getIdAsString()));
        }
        // chunk is always fully written
        int result = chunk.remaining();
        if (writeBuffer.isEmpty()) {
            int chunkLimit = chunk.limit();
            while (chunk.remaining() > 0) {
                // Copy at most what fits into the staging buffer by
                // temporarily lowering the chunk's limit
                int thisTime = Math.min(buffer.remaining(), chunk.remaining());
                chunk.limit(chunk.position() + thisTime);
                buffer.put(chunk);
                chunk.limit(chunkLimit);
                if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
                    // Only flush if we have more data to write and the buffer
                    // is full
                    if (flush(true, coyoteResponse.getWriteListener() == null)) {
                        // Non-blocking flush could not complete - park the
                        // rest of the chunk
                        writeBuffer.add(chunk);
                        dataLeft = true;
                        break;
                    }
                }
            }
        } else {
            // Earlier data is still queued - preserve ordering
            writeBuffer.add(chunk);
        }
        written += result;
        return result;
    } finally {
        writeLock.unlock();
    }
}
/*
 * Flushes buffered data, ensuring end-of-stream handling happens exactly
 * once even when there is nothing left to write.
 */
final boolean flush(boolean block) throws IOException {
    writeLock.lock();
    try {
        /*
         * Need to ensure that there is exactly one call to flush even when there is no data to write. Too few
         * calls (i.e. zero) and the end of stream message is not sent for a completed asynchronous write. Too
         * many calls and the end of stream message is sent too soon and trailer headers are not sent.
         */
        boolean dataInBuffer = buffer.position() > 0;
        boolean flushed = false;
        // NOTE(review): truncated - the remainder of this method (and its
        // closing braces) is missing from the visible chunk. Restore from
        // the upstream source.
/*
 * Writes the contents of the internal buffer to the network, obtaining
 * stream-level and connection-level flow control allocations as required.
 * Returns true if data remains buffered because a non-blocking write could
 * not obtain an allocation, false once the buffer has been emptied (or was
 * already empty).
 */
private boolean flush(boolean writeInProgress, boolean block) throws IOException {
    writeLock.lock();
    try {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("stream.outputBuffer.flush.debug", getConnectionId(), getIdAsString(),
                    Integer.toString(buffer.position()), Boolean.toString(writeInProgress),
                    Boolean.toString(closed)));
        }
        if (buffer.position() == 0) {
            if (closed && !endOfStreamSent) {
                // Handling this special case here is simpler than trying
                // to modify the following code to handle it.
                handler.writeBody(Stream.this, buffer, 0, coyoteResponse.getTrailerFields() == null);
            }
            // Buffer is empty. Nothing to do.
            return false;
        }
        buffer.flip();
        int left = buffer.remaining();
        while (left > 0) {
            if (streamReservation == 0) {
                streamReservation = reserveWindowSize(left, block);
                if (streamReservation == 0) {
                    // Must be non-blocking.
                    // Note: Can't add to the writeBuffer here as the write
                    // may originate from the writeBuffer.
                    buffer.compact();
                    return true;
                }
            }
            while (streamReservation > 0) {
                int connectionReservation = handler.reserveWindowSize(Stream.this, streamReservation, block);
                if (connectionReservation == 0) {
                    // Must be non-blocking.
                    // Note: Can't add to the writeBuffer here as the write
                    // may originate from the writeBuffer.
                    buffer.compact();
                    return true;
                }
                // Do the write. Only flag end of stream when this write
                // drains the buffer, the stream is closed, no trailers are
                // pending and no further write is in progress.
                handler.writeBody(Stream.this, buffer, connectionReservation, !writeInProgress && closed &&
                        left == connectionReservation && coyoteResponse.getTrailerFields() == null);
                streamReservation -= connectionReservation;
                left -= connectionReservation;
            }
        }
        buffer.clear();
        return false;
    } finally {
        writeLock.unlock();
    }
}
finalboolean isReady() {
writeLock.lock(); try { // Bug 63682 // Only want to return false if the window size is zero AND we are // already waiting for an allocation. if (getWindowSize() > 0 && allocationManager.isWaitingForStream() ||
handler.getWindowSize() > 0 && allocationManager.isWaitingForConnection() || dataLeft) { returnfalse;
} else { returntrue;
}
} finally {
writeLock.unlock();
}
}
/**
 * @return <code>true</code> if it is certain that the associated response has no body.
 */
final boolean hasNoBody() {
    return closed && written == 0;
}
@Override
public void flush() throws IOException {
    /*
     * This method should only be called during blocking I/O. All the Servlet API calls that end up here are
     * illegal during non-blocking I/O (Servlet 5.4). However, the Servlet specification states that the
     * behaviour is undefined, so we do the best we can, which is to perform the flush using blocking or
     * non-blocking I/O depending on which is currently in use.
     */
    flush(getCoyoteResponse().getWriteListener() == null);
}
/*
 * Input buffer for a standard HTTP/2 request body: incoming DATA frame
 * payloads are staged in inBuffer by the connection thread and copied to
 * outBuffer when the application reads.
 */
class StandardStreamInputBuffer extends StreamInputBuffer {

    private final Lock readStateLock = new ReentrantLock();

    /*
     * Two buffers are required to avoid various multi-threading issues. These issues arise from the fact that
     * the Stream (or the Request/Response) used by the application is processed in one thread but the
     * connection is processed in another. Therefore it is possible that a request body frame could be received
     * before the application is ready to read it. If it isn't buffered, processing of the connection (and hence
     * all streams) would block until the application read the data. Hence the incoming data has to be buffered.
     * If only one buffer was used then it could become corrupted if the connection thread is trying to add to
     * it at the same time as the application is read it. While it should be possible to avoid this corruption
     * by careful use of the buffer it would still require the same copies as using two buffers and the
     * behaviour would be less clear.
     *
     * The buffers are created lazily because they quickly add up to a lot of memory and most requests do not
     * have bodies.
     */
    // This buffer is used to populate the ByteChunk passed in to the read
    // method
    private byte[] outBuffer;
    // This buffer is the destination for incoming data. It is normally in
    // 'write mode'.
    private volatile ByteBuffer inBuffer;
    private volatile boolean readInterest;
    private volatile boolean closed;
    private boolean resetReceived;
// NOTE(review): fragment - the enclosing read method's signature and the
// declaration of the local 'written' are not visible in this chunk.
// It is still possible that the stream has been closed and inBuffer
// set to null between the call to ensureBuffersExist() above and
// the sync below. The checks just before and just inside the sync
// ensure we don't get any NPEs reported.
ByteBuffer tmpInBuffer = inBuffer;
if (tmpInBuffer == null) {
    return -1;
}
// Ensure that only one thread accesses inBuffer at a time
synchronized (tmpInBuffer) {
    if (inBuffer == null) {
        return -1;
    }
    boolean canRead = false;
    while (inBuffer.position() == 0 && (canRead = isActive() && !isInputFinished())) {
        // Need to block until some data is written
        try {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("stream.inputBuffer.empty"));
            }
            long readTimeout = handler.getProtocol().getStreamReadTimeout();
            if (readTimeout < 0) {
                // Negative timeout - wait without limit
                inBuffer.wait();
            } else {
                inBuffer.wait(readTimeout);
            }
            if (resetReceived) {
                throw new IOException(sm.getString("stream.inputBuffer.reset"));
            }
            if (inBuffer.position() == 0 && isActive() && !isInputFinished()) {
                // Woke with no data while the stream is still readable -
                // treat as a read timeout
                String msg = sm.getString("stream.inputBuffer.readTimeout");
                StreamException se = new StreamException(msg, Http2Error.ENHANCE_YOUR_CALM, getIdAsInt());
                // Trigger a reset once control returns to Tomcat
                coyoteResponse.setError();
                streamOutputBuffer.reset = se;
                throw new CloseNowException(msg, se);
            }
        } catch (InterruptedException e) {
            // Possible shutdown / rst or similar. Use an
            // IOException to signal to the client that further I/O
            // isn't possible for this Stream.
            throw new IOException(e);
        }
    }
    if (inBuffer.position() > 0) {
        // Data is available in the inBuffer. Copy it to the
        // outBuffer.
        inBuffer.flip();
        written = inBuffer.remaining();
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("stream.inputBuffer.copy", Integer.toString(written)));
        }
        inBuffer.get(outBuffer, 0, written);
        inBuffer.clear();
    } else if (!canRead) {
        return -1;
    } else {
        // Should never happen
        throw new IllegalStateException();
    }
}
/*
 * Called after placing some data in the inBuffer. If the stream is closed
 * the data is swallowed; if the application registered read interest a
 * read is dispatched; otherwise any reader blocked in wait() on inBuffer
 * is woken.
 */
@Override
final void onDataAvailable() throws IOException {
    readStateLock.lock();
    try {
        if (closed) {
            // Stream closed - discard and account for the buffered data
            swallowUnread();
        } else if (readInterest) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("stream.inputBuffer.dispatch"));
            }
            readInterest = false;
            coyoteRequest.action(ActionCode.DISPATCH_READ, null);
            // Always need to dispatch since this thread is processing
            // the incoming connection and streams are processed on their
            // own.
            coyoteRequest.action(ActionCode.DISPATCH_EXECUTE, null);
        } else {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("stream.inputBuffer.signal"));
            }
            // Wake any reader waiting on inBuffer
            synchronized (inBuffer) {
                inBuffer.notifyAll();
            }
        }
    } finally {
        readStateLock.unlock();
    }
}
// Returns the (lazily created) buffer that receives incoming DATA frame
// payloads.
@Override
final ByteBuffer getInBuffer() {
    ensureBuffersExist();
    return inBuffer;
}
/*
 * Lazily creates the input buffers, sized to the local initial window
 * size. Double-checked: the unlocked test avoids taking the lock on the
 * common path and inBuffer is volatile, so the pattern is safe.
 */
private void ensureBuffersExist() {
    if (inBuffer == null && !closed) {
        // The client must obey Tomcat's window size when sending so
        // this is the initial window size set by Tomcat that the client
        // uses (i.e. the local setting is required here).
        int size = handler.getLocalSettings().getInitialWindowSize();
        readStateLock.lock();
        try {
            if (inBuffer == null && !closed) {
                inBuffer = ByteBuffer.allocate(size);
                outBuffer = new byte[size];
            }
        } finally {
            readStateLock.unlock();
        }
    }
}
/*
 * Marks the buffer closed and discards any buffered, unread request body
 * bytes, notifying the handler of the swallowed DATA frame payload so
 * connection-level flow control can be updated.
 */
@Override
final void swallowUnread() throws IOException {
    readStateLock.lock();
    try {
        closed = true;
    } finally {
        readStateLock.unlock();
    }
    if (inBuffer != null) {
        int unreadByteCount = 0;
        synchronized (inBuffer) {
            unreadByteCount = inBuffer.position();
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("stream.inputBuffer.swallowUnread", Integer.valueOf(unreadByteCount)));
            }
            if (unreadByteCount > 0) {
                // Logically discard the unread bytes
                inBuffer.position(0);
                inBuffer.limit(inBuffer.limit() - unreadByteCount);
            }
        }
        // Do this outside of the sync because:
        // - it doesn't need to be inside the sync
        // - if inside the sync it can trigger a deadlock
        // https://markmail.org/message/vbglzkvj6wxlhh3p
        if (unreadByteCount > 0) {
            handler.onSwallowedDataFramePayload(getIdAsInt(), unreadByteCount);
        }
    }
}
}
class SavedRequestStreamInputBuffer extends StreamInputBuffer {
/*
 * NOTE(review): extraction residue - the following unrelated German website
 * disclaimer was appended to this file and should be removed. Translation:
 * "The information on this website was compiled carefully and to the best
 * of our knowledge. However, neither completeness, nor correctness, nor
 * quality of the provided information is guaranteed. Note: the coloured
 * syntax display is still experimental."
 */