/* * Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions.
*/
// maps completion key to channel privatefinal ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock(); privatefinal Map<Integer,OverlappedChannel> keyToChannel = new HashMap<Integer,OverlappedChannel>(); privateint nextCompletionKey;
// handle to completion port privatefinallong port;
// true if port has been closed privateboolean closed;
// the set of "stale" OVERLAPPED structures. These OVERLAPPED structures // relate to I/O operations where the completion notification was not // received in a timely manner after the channel is closed. privatefinal Set<Long> staleIoSet = new HashSet<Long>();
/* * Channels implements this interface support overlapped I/O and can be * associated with a completion port.
*/ staticinterface OverlappedChannel extends Closeable { /** * Returns a reference to the pending I/O result.
*/
<V,A> PendingFuture<V,A> getByOverlapped(long overlapped);
}
@Override void closeAllChannels() { /** * On Windows the close operation will close the socket/file handle * and then wait until all outstanding I/O operations have aborted. * This is necessary as each channel's cache of OVERLAPPED structures * can only be freed once all I/O operations have completed. As I/O * completion requires a lookup of the keyToChannel then we must close * the channels when not holding the write lock.
*/ finalint MAX_BATCH_SIZE = 32;
OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE]; int count; do { // grab a batch of up to 32 channels
keyToChannelLock.writeLock().lock();
count = 0; try { for (Integer key: keyToChannel.keySet()) {
channels[count++] = keyToChannel.get(key); if (count >= MAX_BATCH_SIZE) break;
}
} finally {
keyToChannelLock.writeLock().unlock();
}
// close them for (int i=0; i<count; i++) { try {
channels[i].close();
} catch (IOException ignore) { }
}
} while (count > 0);
}
privatevoid wakeup() { try {
postQueuedCompletionStatus(port, 0);
} catch (IOException e) { // should not happen thrownew AssertionError(e);
}
}
@Override void shutdownHandlerTasks() { // shutdown all handler threads int nThreads = threadCount(); while (nThreads-- > 0) {
wakeup();
}
}
/** * Associate the given handle with this group
*/ int associate(OverlappedChannel ch, long handle) throws IOException {
keyToChannelLock.writeLock().lock();
// generate a completion key (if not shutdown) int key; try { if (isShutdown()) thrownew ShutdownChannelGroupException();
// generate unique key do {
key = nextCompletionKey++;
} while ((key == 0) || keyToChannel.containsKey(key));
// associate with I/O completion port if (handle != 0L) {
createIoCompletionPort(handle, port, key, 0);
}
/** * Invoked when a channel associated with this port is closed before * notifications for all outstanding I/O operations have been received.
*/ void makeStale(Long overlapped) { synchronized (staleIoSet) {
staleIoSet.add(overlapped);
}
}
/** * Checks if the given OVERLAPPED is stale and if so, releases it.
*/ privatevoid checkIfStale(long ov) { synchronized (staleIoSet) { boolean removed = staleIoSet.remove(ov); if (removed) {
unsafe.freeMemory(ov);
}
}
}
/** * The handler for consuming the result of an asynchronous I/O operation.
*/ staticinterface ResultHandler { /** * Invoked if the I/O operation completes successfully.
*/ publicvoid completed(int bytesTransferred, boolean canInvokeDirect);
/** * Invoked if the I/O operation fails.
*/ publicvoid failed(int error, IOException ioe);
}
// Creates IOException for the given I/O error. privatestatic IOException translateErrorToIOException(int error) {
String msg = getErrorMessage(error); if (msg == null)
msg = "Unknown error: 0x0" + Integer.toHexString(error); returnnew IOException(msg);
}
try { for (;;) { // reset invoke count if (myGroupAndInvokeCount != null)
myGroupAndInvokeCount.resetInvokeCount();
// wait for I/O completion event // An error here is fatal (thread will not be replaced)
replaceMe = false; try {
getQueuedCompletionStatus(port, ioResult);
} catch (IOException x) { // should not happen
x.printStackTrace(); return;
}
// handle wakeup to execute task or shutdown if (ioResult.completionKey() == 0 &&
ioResult.overlapped() == 0L)
{
Runnable task = pollTask(); if (task == null) { // shutdown request return;
}
// run task // (if error/exception then replace thread)
replaceMe = true;
task.run(); continue;
}
// lookup I/O request
PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped()); if (result == null) { // we get here if the OVERLAPPED structure is associated // with an I/O operation on a channel that was closed // but the I/O operation event wasn't read in a timely // manner. Alternatively, it may be related to a // tryLock operation as the OVERLAPPED structures for // these operations are not in the I/O cache.
checkIfStale(ioResult.overlapped()); continue;
}
// synchronize on result in case I/O completed immediately // and was handled by initiator synchronized (result) { if (result.isDone()) { continue;
} // not handled by initiator
}
// invoke I/O result handler int error = ioResult.error();
ResultHandler rh = (ResultHandler)result.getContext();
replaceMe = true; // (if error/exception then replace thread) if (error == 0) {
rh.completed(ioResult.bytesTransferred(), canInvokeDirect);
} else {
rh.failed(error, translateErrorToIOException(error));
}
}
} finally { // last thread to exit when shutdown releases resources int remaining = threadExit(this, replaceMe); if (remaining == 0 && isShutdown()) {
implClose();
}
}
}
}
/** * Container for data returned by GetQueuedCompletionStatus
*/ privatestaticclass CompletionStatus { privateint error; privateint bytesTransferred; privateint completionKey; privatelong overlapped;
private CompletionStatus() { } int error() { return error; } int bytesTransferred() { return bytesTransferred; } int completionKey() { return completionKey; } long overlapped() { return overlapped; }
}
// -- native methods --
privatestaticnativevoid initIDs();
privatestaticnativelong createIoCompletionPort(long handle, long existingPort, int completionKey, int concurrency) throws IOException;
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.