Modified the web server to use a dedicated thread to handle socket management. It now will setup new sockets, and pass request and response handling for each socket needing it to a runnable run through the thread service. This may impact performance (not sure which direction) and will ensure that socket listening is never dropped in favor of message processing.

This commit is contained in:
wcrisman
2014-08-30 17:28:58 -07:00
parent fa572dae03
commit 1a8fd62dd8

View File

@@ -1796,12 +1796,11 @@ public class WebServer {
* @see java.lang.Runnable#run() * @see java.lang.Runnable#run()
*/ */
public void run() { public void run() {
boolean loop = true; // boolean loop = true;
//Looping only occurs when we are at the maximum allowed number of threads handling messages.// //Looping only occurs when we are at the maximum allowed number of threads handling messages.//
while(!stop && loop) { while(!stop /*&& loop*/) {
SelectionKey key = null; SelectionKey key = null;
boolean isWrite = false;
try { try {
//Synchronize so that we ensure thread safe access to the activeThreadCount variable.// //Synchronize so that we ensure thread safe access to the activeThreadCount variable.//
@@ -1834,12 +1833,9 @@ public class WebServer {
selectedKeys.remove(); selectedKeys.remove();
//Weed out invalid (cancelled) keys.// //Weed out invalid (cancelled) keys.//
if(key.isValid()) { if(!key.isValid()) {
isWrite = key.isWritable();
}//if//
else {
key = null; key = null;
}//else// }//if//
if(!selectedKeys.hasNext()) { if(!selectedKeys.hasNext()) {
selectedKeys = null; selectedKeys = null;
@@ -1856,8 +1852,9 @@ public class WebServer {
}//catch// }//catch//
if(key != null) { if(key != null) {
ChannelContext context = (ChannelContext) key.attachment(); final boolean isWrite = key.isWritable();
SelectableChannel channel = key.channel(); final ChannelContext context = (ChannelContext) key.attachment();
final SelectableChannel channel = key.channel();
if(channel instanceof ServerSocketChannel) { if(channel instanceof ServerSocketChannel) {
try { try {
@@ -1888,7 +1885,7 @@ public class WebServer {
}//catch// }//catch//
}//if// }//if//
else if(channel instanceof SocketChannel) { else if(channel instanceof SocketChannel) {
boolean socketClosed = false; // boolean socketClosed = false;
//Toggle the write or read flag.// //Toggle the write or read flag.//
synchronized(key) { synchronized(key) {
@@ -1903,9 +1900,52 @@ public class WebServer {
//Not allowing either reads or writes to continue until all processing of this message is done.// //Not allowing either reads or writes to continue until all processing of this message is done.//
// key.interestOps(0); // key.interestOps(0);
}//synchronized// }//synchronized//
try { if(((SocketChannel) channel).isOpen()) {
if(((SocketChannel) channel).isOpen()) { ThreadService.run(new Runnable() {
public void run() {
boolean socketClosed = false;
try {
if(isWrite) {
//Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).//
synchronized(((AbstractSocketContext) context).getLock()) {
//Process the pending write to the socket as much as is possible, then return.//
((AbstractSocketContext) context).processResponses();
}//synchronized//
}//if//
else {
//Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).//
synchronized(((AbstractSocketContext) context).getLock()) {
//Process the incoming request and send the response (a partial response may be sent in which case the socket will be set to wait for a write opportunity and not a read opportunity).//
((AbstractSocketContext) context).processRequest();
}//synchronized//
}//else//
}//try//
catch(TlsFailureException e) {
//Allow the failure to be ignored. This occurs when the client fails to use TLS or fails to send the host name as part of the TLS handshake.//
try {((SocketChannel) channel).close();}catch(Throwable e2) {} //Release the socket so the message doesn't continue to be processed.//
}//catch//
catch(Throwable e) {
if(debug) Debug.log(e);
//Force the socket to be closed (for sure).//
try {((SocketChannel) channel).close();} catch(Throwable e2) {}
//Debug.log(e);
socketClosed = true;
}//catch//
finally {
if(channel != null && !socketClosed && channel.isOpen() && context != null) {
selector.wakeup();
}//if//
else if(channel != null && (!channel.isOpen() || socketClosed) && channel instanceof SocketChannel && context instanceof SocketContext) {
cleanupClientChannel((SocketContext) context, (SocketChannel) channel);
}//else if//
}//finally//
}//run()//
});
/*
try {
synchronized(this) { synchronized(this) {
// if(++activeThreadCount != maxThreadCount) { // if(++activeThreadCount != maxThreadCount) {
//Start another thread to take this thread's place.// //Start another thread to take this thread's place.//
@@ -1937,42 +1977,42 @@ public class WebServer {
((AbstractSocketContext) context).processRequest(); ((AbstractSocketContext) context).processRequest();
}//synchronized// }//synchronized//
}//else// }//else//
}//if// }//try//
}//try// catch(TlsFailureException e) {
catch(TlsFailureException e) { //Allow the failure to be ignored. This occurs when the client fails to use TLS or fails to send the host name as part of the TLS handshake.//
//Allow the failure to be ignored. This occurs when the client fails to use TLS or fails to send the host name as part of the TLS handshake.// try {((SocketChannel) channel).close();}catch(Throwable e2) {} //Release the socket so the message doesn't continue to be processed.//
try {((SocketChannel) channel).close();}catch(Throwable e2) {} //Release the socket so the message doesn't continue to be processed.// }//catch//
}//catch// catch(Throwable e) {
catch(Throwable e) { if(debug) Debug.log(e);
if(debug) Debug.log(e);
//Force the socket to be closed (for sure).//
try {((SocketChannel) channel).close();}catch(Throwable e2) {}
//Debug.log(e);
socketClosed = true;
}//catch//
finally {
boolean requiresWakeup = false;
if(channel != null && !socketClosed && channel.isOpen() && key != null && context != null) {
requiresWakeup = true;
}//if// //Force the socket to be closed (for sure).//
else if(channel != null && (!channel.isOpen() || socketClosed) && channel instanceof SocketChannel && context instanceof SocketContext) { try {((SocketChannel) channel).close();}catch(Throwable e2) {}
cleanupClientChannel((SocketContext) context, (SocketChannel) channel); //Debug.log(e);
}//else if// socketClosed = true;
}//catch//
//Loop if the last thread to wait for a message couldn't start another thread due to the max number of threads allowed.// finally {
synchronized(this) { boolean requiresWakeup = false;
// if(activeThreadCount-- != maxThreadCount) {
loop = false; if(channel != null && !socketClosed && channel.isOpen() && key != null && context != null) {
requiresWakeup = true;
if(requiresWakeup) { }//if//
selector.wakeup(); else if(channel != null && (!channel.isOpen() || socketClosed) && channel instanceof SocketChannel && context instanceof SocketContext) {
}//if// cleanupClientChannel((SocketContext) context, (SocketChannel) channel);
// }//if// }//else if//
}//synchronized//
}//finally// //Loop if the last thread to wait for a message couldn't start another thread due to the max number of threads allowed.//
synchronized(this) {
// if(activeThreadCount-- != maxThreadCount) {
loop = false;
if(requiresWakeup) {
selector.wakeup();
}//if//
// }//if//
}//synchronized//
}//finally//
*/
}//if//
}//else if// }//else if//
}//if// }//if//
}//while// }//while//