Fixed bugs in web server's handling of streamed content (logic was incorrectly handling non-blocking IO in the stream - interpreting it as a stream closed). Modified StreamedContent to use blocking IO instead of non-blocking since it was wasting CPU cycles (non-blocking IO done properly would require removing the write flag from the client socket and listening to the StreamedContent for the availablity of content before re-flagging the socket for writing). Using non-blocking IO here really isn't that useful since the content source should be trusted and timely, and multiple threads service clients (a few threads blocking for a few milliseconds is no big deal), and implementing it properly would significantly increase code complexity.

This commit is contained in:
wcrisman
2014-07-13 12:19:30 -07:00
parent 837fdced57
commit 11b75d567f
2 changed files with 135 additions and 134 deletions

View File

@@ -11,10 +11,15 @@ import com.foundation.web.interfaces.IContent;
import com.foundation.web.interfaces.IMimeType; import com.foundation.web.interfaces.IMimeType;
import com.foundation.web.interfaces.IMimeTypeProvider; import com.foundation.web.interfaces.IMimeTypeProvider;
/**
* Allows the request handler (Webapp code) to specify content that will be streamed (retrieved) as it is sent to the client, versus loaded into memory or on disk (file). This allows dynamic content to be processed as it is being sent to the client, and it allows for wrappering a stream of data being sent by an external process.
* <p>Note: Since the stream is considered a trusted source, non-blocking IO is currently prevented. That is to say that the thread trying to send the next packet of data to the client will wait for the content to become available rather than cycle back into a write waiting state. Write waiting is waiting upon the socket to the client to be available for writing, which it already is (or it wouldn't be asking for content to send), therefor it would just waste a lot of CPU cycles checking constantly for more content. Either we must be able to release the thread until content becomes available (we'd need to clear the socket listener flags until content is available, then set the write flag again), or we must block until content is available. Blocking shouldn't be a problem because the content source is trusted (won't keep us blocked an unreasonable amount of time) and because the WebServer allows for a fair number of threads servicing requests and responses at once (a couple blocking threads won't kill us).</p>
*/
public class StreamedContent implements IContent { public class StreamedContent implements IContent {
private SocketChannel channel = null; private SocketChannel channel = null;
private Runnable releaseChannelHandler = null; private Runnable releaseChannelHandler = null;
private ByteBuffer buffer = null; private ByteBuffer buffer = null;
/** The size of the content if known. Will be -1 if the size is not known (0 size indicates no remaining bytes to be read). */
private int size = 0; private int size = 0;
private int chunkSize = 0; private int chunkSize = 0;
/** /**
@@ -47,7 +52,8 @@ public StreamedContent(SocketChannel channel, ByteBuffer buffer, Runnable releas
this.buffer = buffer; this.buffer = buffer;
this.releaseChannelHandler = releaseChannelHandler; this.releaseChannelHandler = releaseChannelHandler;
this.size = -1; this.size = -1;
channel.configureBlocking(false); //TODO: This could be false, but we'd have to find a way to un-flag the socket for writing, and re-flag it once content is available. For now (due to the trusted and timely nature of streamed content) this isn't worth all the code complexity that would result.
channel.configureBlocking(true);
}//StreamedContent()// }//StreamedContent()//
public long getStart() { public long getStart() {
return 0; return 0;
@@ -85,8 +91,10 @@ public int get(ByteBuffer buffer) {
int result = CONTENT_END; int result = CONTENT_END;
try { try {
//Handle known size streams differently than streams with an unknown size.//
if(size != -1) { if(size != -1) {
if(size != 0) { if(size != 0) {
//Use already buffered data first, then read from the stream.//
if(this.buffer != null && this.buffer.hasRemaining()) { if(this.buffer != null && this.buffer.hasRemaining()) {
//Copy the data from our source buffer to the destination buffer.// //Copy the data from our source buffer to the destination buffer.//
result = put(buffer, this.buffer); result = put(buffer, this.buffer);
@@ -97,7 +105,7 @@ public int get(ByteBuffer buffer) {
int count = channel.read(buffer); int count = channel.read(buffer);
if(count == -1) result = CONTENT_END; if(count == -1) result = CONTENT_END;
else if(count == 0) result = CONTENT_PENDING; else if(count == 0) result = CONTENT_PENDING; //Note: Shouldn't occur since the stream is set to blocking.//
else { else {
result = count; result = count;
size -= count; size -= count;

View File

@@ -716,7 +716,7 @@ public class WebServer {
ByteBuffer buffer = null; ByteBuffer buffer = null;
try { try {
//Wrap the response in http cloths.// //Wrap the response in http cloths. The HeaderFieldNames will be set if the response was provided with a completely custom HTTP header to be used.//
if(response.getHeaderFieldNames() != null) { if(response.getHeaderFieldNames() != null) {
ByteArrayOutputStream bout = new ByteArrayOutputStream(1000); ByteArrayOutputStream bout = new ByteArrayOutputStream(1000);
PrintStream pout = new PrintStream(bout, true, "ASCII"); PrintStream pout = new PrintStream(bout, true, "ASCII");
@@ -815,104 +815,93 @@ public class WebServer {
}//else// }//else//
}//if// }//if//
if(!response.isError() && response.getHeader() != null) { if(response.isError()) {
//Include all but the last end of line.// if(response.getHeader() != null) {
pout.print(response.getHeader().substring(0, response.getHeader().length() - 2)); pout.print(response.getHeader());
writeSessionCookies(pout); }//if//
//Add a final terminating end of line.// else {
pout.print("\r\n"); pout.print("HTTP/1.1 404 Resource Not Found\r\n");
}//else//
}//if// }//if//
else if(response.getCustomHeader() != null) {
pout.print(response.getCustomHeader());
}//else if//
else if(isDownloaded && request.getRange() != null) {
pout.print("HTTP/1.1 206 Partial Content\r\n");
}//else if//
else { else {
if(response.isError()) { pout.print("HTTP/1.1 200 OK\r\n");
if(response.getHeader() != null) {
pout.print(response.getHeader());
}//if//
else {
pout.print("HTTP/1.1 404 Resource Not Found\r\n");
}//else//
}//if//
else if(response.getCustomHeader() != null) {
pout.print(response.getCustomHeader());
}//else if//
else if(isDownloaded && request.getRange() != null) {
pout.print("HTTP/1.1 206 Partial Content\r\n");
}//else if//
else {
pout.print("HTTP/1.1 200 OK\r\n");
}//else//
pout.print("Content-Length: " + (content != null ? content.getSize() : 0) + "\r\n");
if(compress) {
//TODO: Add others?
if(compressionType == 1) {
content = new GzipContent(content);
pout.print("Content-Encoding: gzip\r\n");
}//if//
}//if//
if(content != null) {
//Note: The character set gives IE indigestion for some reason.//
pout.print("Content-Type: " + (mimeType != null ? mimeType.getMimeName() : "text/html") + "; charset=" + (response.getCharacterSet() == null ? "UTF-8" : response.getCharacterSet()) + "\r\n");
cacheDirective = content.getCacheDirective();
if(isDownloaded) {
pout.print("Content-Disposition: attachment; filename=\"" + content.getDownloadName() + "\";\r\n");
pout.print("Accept-Ranges: bytes\r\n");
if(request.getRange() != null) {
// Debug.log("Sending a ranged response: " + request.getRange() + " content range: (" + content.getStart() + " - " + content.getEnd() + "/" + content.getSize() + ").");
pout.print("Range: " + request.getRange() + "\r\n");
pout.print("Content-Range: bytes " + content.getStart() + "-" + content.getEnd() + "/" + content.getSize() + "\r\n");
}//if//
}//if//
}//if//
writeSessionCookies(pout);
pout.print("Server: DE/1.0\r\n");
//TODO: IE has a problem with caching and forwarding/redirecting. A page that redirects to another page that was previously cached does not result in IE sending a request for the forwarded content.//
//private / no-cache
if(content.getExpiresDirective() != null) {
pout.print("Expires: " + getHttpDateFormat().format(content.getExpiresDirective()));
}//if//
if(cacheDirective != null) {
pout.print("Cache-Control: " + cacheDirective + "\r\n");
}//if//
else {
int cacheLength = content.getCacheLength() != null ? content.getCacheLength().intValue() : mimeType != null ? mimeType.getDefaultCacheLength() : IMimeType.CACHE_LENGTH_NEVER_CACHE;
if(cacheLength > 0) {
pout.print("Cache-Control: public, max-age=" + cacheLength + "\r\n");
}//if//
else if(cacheLength == IMimeType.CACHE_LENGTH_ALWAYS_TEST) {
pout.print("Cache-Control: public, pre-check=0, post-check=120\r\n");
}//else if//
else if(cacheLength == IMimeType.CACHE_LENGTH_NEVER_CACHE) {
pout.print("Cache-Control: no-cache\r\n");
}//else if//
else {
pout.print("Cache-Control: no-store\r\n");
}//else//
}//else//
//TODO: Determine if we need to use age.
//pout.print("Age: 0\r\n");
//TODO: Determine if we need to use ETags
if(lastModifiedDate != null) {
SimpleDateFormat format = getHttpDateFormat();
pout.print("Last-Modified: " + format.format(lastModifiedDate) + "\r\n");
pout.print("Date: " + format.format(new Date()) + "\r\n");
}//if//
pout.print("\r\n");
}//else// }//else//
pout.print("Content-Length: " + (content != null ? content.getSize() : 0) + "\r\n");
if(compress) {
//TODO: Add others?
if(compressionType == 1) {
content = new GzipContent(content);
pout.print("Content-Encoding: gzip\r\n");
}//if//
}//if//
if(content != null) {
//Note: The character set gives IE indigestion for some reason.//
pout.print("Content-Type: " + (mimeType != null ? mimeType.getMimeName() : "text/html") + "; charset=" + (response.getCharacterSet() == null ? "UTF-8" : response.getCharacterSet()) + "\r\n");
cacheDirective = content.getCacheDirective();
if(isDownloaded) {
pout.print("Content-Disposition: attachment; filename=\"" + content.getDownloadName() + "\";\r\n");
pout.print("Accept-Ranges: bytes\r\n");
if(request.getRange() != null) {
// Debug.log("Sending a ranged response: " + request.getRange() + " content range: (" + content.getStart() + " - " + content.getEnd() + "/" + content.getSize() + ").");
pout.print("Range: " + request.getRange() + "\r\n");
pout.print("Content-Range: bytes " + content.getStart() + "-" + content.getEnd() + "/" + content.getSize() + "\r\n");
}//if//
}//if//
}//if//
writeSessionCookies(pout);
pout.print("Server: DE/1.0\r\n");
//TODO: IE has a problem with caching and forwarding/redirecting. A page that redirects to another page that was previously cached does not result in IE sending a request for the forwarded content.//
//private / no-cache
if(content.getExpiresDirective() != null) {
pout.print("Expires: " + getHttpDateFormat().format(content.getExpiresDirective()));
}//if//
if(cacheDirective != null) {
pout.print("Cache-Control: " + cacheDirective + "\r\n");
}//if//
else {
int cacheLength = content.getCacheLength() != null ? content.getCacheLength().intValue() : mimeType != null ? mimeType.getDefaultCacheLength() : IMimeType.CACHE_LENGTH_NEVER_CACHE;
if(cacheLength > 0) {
pout.print("Cache-Control: public, max-age=" + cacheLength + "\r\n");
}//if//
else if(cacheLength == IMimeType.CACHE_LENGTH_ALWAYS_TEST) {
pout.print("Cache-Control: public, pre-check=0, post-check=120\r\n");
}//else if//
else if(cacheLength == IMimeType.CACHE_LENGTH_NEVER_CACHE) {
pout.print("Cache-Control: no-cache\r\n");
}//else if//
else {
pout.print("Cache-Control: no-store\r\n");
}//else//
}//else//
//TODO: Determine if we need to use age.
//pout.print("Age: 0\r\n");
//TODO: Determine if we need to use ETags
if(lastModifiedDate != null) {
SimpleDateFormat format = getHttpDateFormat();
pout.print("Last-Modified: " + format.format(lastModifiedDate) + "\r\n");
pout.print("Date: " + format.format(new Date()) + "\r\n");
}//if//
pout.print("\r\n");
headerBytes = bout.toByteArray(); headerBytes = bout.toByteArray();
}//else if// }//else if//
else { else {
@@ -1215,26 +1204,26 @@ public class WebServer {
//Check to see if the outbound message is prepared to send more content. For chunked transfers the outbound message may be waiting for additional content from another stream and we should return later.// //Check to see if the outbound message is prepared to send more content. For chunked transfers the outbound message may be waiting for additional content from another stream and we should return later.//
if(result && !pendingOutboundMessage.getBuffer().hasRemaining()) { if(result && !pendingOutboundMessage.getBuffer().hasRemaining()) {
//Attempt to load additional message bytes into the buffer.// //Attempt to load additional message bytes into the buffer.//
if(!pendingOutboundMessage.loadBuffer()) { boolean couldLoadAdditionalBytes = pendingOutboundMessage.loadBuffer();
//The buffer will be set to null if there are no more bytes to be read from the stream (ever).//
if(pendingOutboundMessage.getBuffer() == null) {
//Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.// //Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.//
if(pendingOutboundMessage.getNext() != null) { if(pendingOutboundMessage.getNext() != null) {
pendingOutboundMessage = pendingOutboundMessage.getNext(); pendingOutboundMessage = pendingOutboundMessage.getNext();
//TODO: Comment me.
//Debug.log("Getting next pending outbound message in linked list to send to the client from git.");
}//if// }//if//
//Tell the caller that all messages have been sent.//
else { else {
//Wait until additional message bytes are available.// pendingOutboundMessage = null;
result = false; lastAddedMessageBuffer = null;
result = true;
}//else// }//else//
}//if// }//if//
//Note: Currently this will never happen since it is a waste of CPU cycles. See comments in StreamedContent's class javadocs. We'd need to remove the write flag from the socket and listen for a read flag on the stream before re-setting the write flag on the socket.//
//If the message end has been reached then the buffer will be null.// else if(!couldLoadAdditionalBytes) {
if(pendingOutboundMessage.getBuffer() == null) { //Wait until additional message bytes are available.//
//TODO: Comment me. result = false;
//Debug.log("Last pending outbound message sent to client from git."); }//else if//
pendingOutboundMessage = null;
lastAddedMessageBuffer = null;
}//if//
}//if// }//if//
//If we have an application response pending then send it now.// //If we have an application response pending then send it now.//
@@ -1307,28 +1296,27 @@ public class WebServer {
//Add more content to the buffer.// //Add more content to the buffer.//
//Note: Do this even if the last encrypted write buffer could not be fully sent - so that when it is sent there will be outbound message content.// //Note: Do this even if the last encrypted write buffer could not be fully sent - so that when it is sent there will be outbound message content.//
if(key.channel().isOpen() && pendingOutboundMessage != null) { if(key.channel().isOpen() && pendingOutboundMessage != null) {
if(!pendingOutboundMessage.loadBuffer()) { //Attempt to load additional message bytes into the buffer.//
boolean couldLoadAdditionalBytes = pendingOutboundMessage.loadBuffer();
//The buffer will be set to null if there are no more bytes to be read from the stream (ever).//
if(pendingOutboundMessage.getBuffer() == null) {
//Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.// //Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.//
if(pendingOutboundMessage.getNext() != null) { if(pendingOutboundMessage.getNext() != null) {
pendingOutboundMessage = pendingOutboundMessage.getNext(); pendingOutboundMessage = pendingOutboundMessage.getNext();
//TODO: Comment me.
//Debug.log("Getting next pending outbound message in linked list to send to the client from git.");
}//if// }//if//
//Tell the caller that all messages have been sent.//
else { else {
//TODO: Comment me. pendingOutboundMessage = null;
//Debug.log("No next pending outbound message in linked list to send to the client from git."); lastAddedMessageBuffer = null;
//Wait until additional message bytes are available.// result = true;
result = false;
}//else// }//else//
}//if// }//if//
//Note: Currently this will never happen since it is a waste of CPU cycles. See comments in StreamedContent's class javadocs. We'd need to remove the write flag from the socket and listen for a read flag on the stream before re-setting the write flag on the socket.//
//If the message end has been reached then the buffer will be null.// else if(!couldLoadAdditionalBytes) {
if(pendingOutboundMessage.getBuffer() == null) { //Wait until additional message bytes are available.//
//TODO: Comment me. result = false;
//Debug.log("Last pending outbound message sent to client from git."); }//else if//
pendingOutboundMessage = null;
lastAddedMessageBuffer = null;
}//if//
}//if// }//if//
}//while// }//while//
}//if// }//if//
@@ -1348,22 +1336,27 @@ public class WebServer {
result = false; result = false;
}//if// }//if//
else { else {
if(!pendingOutboundMessage.loadBuffer()) { //Attempt to load additional message bytes into the buffer.//
boolean couldLoadAdditionalBytes = pendingOutboundMessage.loadBuffer();
//The buffer will be set to null if there are no more bytes to be read from the stream (ever).//
if(pendingOutboundMessage.getBuffer() == null) {
//Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.// //Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.//
if(pendingOutboundMessage.getNext() != null) { if(pendingOutboundMessage.getNext() != null) {
pendingOutboundMessage = pendingOutboundMessage.getNext(); pendingOutboundMessage = pendingOutboundMessage.getNext();
}//if// }//if//
//Tell the caller that all messages have been sent.//
else { else {
//Wait until additional message bytes are available.// pendingOutboundMessage = null;
result = false; lastAddedMessageBuffer = null;
result = true;
}//else// }//else//
}//if// }//if//
//Note: Currently this will never happen since it is a waste of CPU cycles. See comments in StreamedContent's class javadocs. We'd need to remove the write flag from the socket and listen for a read flag on the stream before re-setting the write flag on the socket.//
//If the message end has been reached then the buffer will be null.// else if(!couldLoadAdditionalBytes) {
if(pendingOutboundMessage.getBuffer() == null) { //Wait until additional message bytes are available.//
pendingOutboundMessage = null; result = false;
lastAddedMessageBuffer = null; }//else if//
}//if//
}//else// }//else//
}//while// }//while//
}//else// }//else//