Applied some of the changes to the structure of WebServer that were least controversial. Some of the code added is not yet used.

This commit is contained in:
wcrisman
2014-12-08 17:23:59 -08:00
parent acbae41318
commit d00638873b

View File

@@ -202,14 +202,37 @@ public class WebServer {
* The response message buffer encapsulating the request generating the response, and the content, and chainable into a linked list. * The response message buffer encapsulating the request generating the response, and the content, and chainable into a linked list.
*/ */
private static class MessageBuffer { private static class MessageBuffer {
/** The actual underlying buffer containing the bytes to be sent. Will be null if the message buffer needs initializing or has finished. */
private ByteBuffer buffer = null; private ByteBuffer buffer = null;
/** The ability to chain message buffers into a linked list. */
private MessageBuffer next = null;
/** The optional response the message is based upon. */ /** The optional response the message is based upon. */
private Response response = null; private Response response = null;
/** The content if there is any. */ /** The content if there is any. */
private IContent content = null; private IContent content = null;
/** The ability to chain message buffers into a linked list. */
private MessageBuffer next = null;
/**
* MessageBuffer constructor.
*/
public MessageBuffer() {
}//MessageBuffer()//
/**
* MessageBuffer constructor.
* @param buffer The buffer to use for assembling the message bytes.
*/
public MessageBuffer(ByteBuffer buffer) {
setBuffer(buffer);
}//MessageBuffer()//
/**
* Sets the actual underlying buffer for the message buffer.
* @param buffer
*/
protected void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
if(buffer != null && buffer.position() != 0) buffer.flip();
}//setBuffer()//
/** /**
* MessageBuffer constructor. * MessageBuffer constructor.
* @param buffer The buffer to use for assembling the message bytes. * @param buffer The buffer to use for assembling the message bytes.
@@ -230,15 +253,26 @@ public class WebServer {
this.response = response; this.response = response;
}//MessageBuffer()// }//MessageBuffer()//
/** /**
* MessageBuffer constructor. * Initializes the message buffer for use.
* @param buffer The buffer to use for assembling the message bytes. * @return Whether initialization succeded. Intialization should be considered a success even if none is required or has already been performed. If it fails the caller should close the socket.
*/ */
public MessageBuffer(ByteBuffer buffer) { public boolean initialize() {
this.buffer = buffer; //Does nothing by default. Subclasses may implement.//
return true;
//Flip the buffer (if not already flipped) so we can write out the bytes.// }//initialize()//
if(buffer.position() != 0) buffer.flip(); /**
}//MessageBuffer()// * Whether the message buffer is closed and all bytes have been sent.
* @return If the bytes have all been sent.
*/
public boolean isClosed() {
return buffer == null;
}//isClosed()//
/**
* Closes the message buffer.
*/
public void close() {
this.buffer = null;
}//close()//
/** /**
* Gets the byte buffer containing the current portion of the message to be sent. * Gets the byte buffer containing the current portion of the message to be sent.
* @return The buffer containing the next part of the message to be sent, or null if the message end has been reached. * @return The buffer containing the next part of the message to be sent, or null if the message end has been reached.
@@ -286,6 +320,501 @@ public class WebServer {
/** Gets the response object that created the message. This will be null for pass through sockets. */ /** Gets the response object that created the message. This will be null for pass through sockets. */
public Response getResponse() {return response;} public Response getResponse() {return response;}
}//MessageBuffer// }//MessageBuffer//
private class WebsocketMessageBuffer extends MessageBuffer {
/** The streaming message handler which will be set only if the currently sending message is streaming. */
private IStreamedWebsocketMessage streamingMessage = null;
/** The buffer containing the next part of the streamed message, or the bytes of the whole message (if streamingMessage == null), or null if the buffer is closed or not yet initialized. */
private ByteBuffer messagePart = null;
/** The message to be sent. Will be null after the message buffer has initialized. */
private Object message = null;
public WebsocketMessageBuffer(Object message) {
this.message = message;
}//WebsocketMessageBuffer()//
public boolean initialize() {
if(message != null) {
messagePart = stream(message, true);
message = null;
}//if//
return super.initialize();
}//initialize()//
public boolean isClosed() {
return super.isClosed() && messagePart == null && streamingMessage == null;
}//isClosed()//
public void close() {
super.close();
messagePart = null;
streamingMessage = null;
}//close()//
private ByteBuffer stream(Object next, boolean isLast) {
ByteBuffer result = null;
byte[] bytes = null;
int opCode = 0;
int length = 0;
if(next instanceof String) {
try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);}
opCode = streamingMessage == null ? 0x01 : 0;
length = bytes.length;
}//if//
else if(next instanceof byte[]) {
bytes = (byte[]) next;
opCode = streamingMessage == null ? 0x02 : 0;
length = bytes.length;
}//else if//
else if(next instanceof Byte) { //Control Message//
opCode = ((Byte) next).byteValue();
}//else if//
else if(next instanceof IStreamedWebsocketMessage) {
//TODO: Ensure that this is not recursive!
streamingMessage = (IStreamedWebsocketMessage) next;
next = streamingMessage.getNextPart();
isLast = !streamingMessage.hasNextPart();
if(next instanceof String) {
try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);}
opCode = 0x01; //Text//
length = bytes.length;
}//if//
else if(next instanceof byte[]) {
bytes = (byte[]) next;
opCode = 0x02; //Binary//
length = bytes.length;
}//else if//
else {
throw new RuntimeException("Invalid streaming message part type.");
}//if//
}//else if//
result = ByteBuffer.allocate(14 + length);
result.put((byte) (isLast ? 0x8 : 0));
result.put((byte) opCode);
//Write the length differently based on how long the content is.//
if(length < 126) {
result.put((byte) length);
}//if//
else if(length < 65535) {
result.put((byte) 126);
result.putShort((short) (length & 0xFFFF));
result.putInt(0);
}//else if//
else {
result.put((byte) 127);
result.putLong((long) length);
}//else//
//Put the content at the end of the message.//
result.put(bytes);
return result;
}//stream()//
public boolean loadBuffer() {
boolean result = true;
getBuffer().compact();
//Copy remaining bytes from MessagePart to the buffer.//
if(messagePart != null && messagePart.remaining() > 0) {
int length = Math.min(getBuffer().remaining(), messagePart.remaining());
getBuffer().put(messagePart.array(), messagePart.position(), length);
messagePart.position(messagePart.position() + length);
if(messagePart.remaining() == 0) {
messagePart = null;
}//if//
}//if//
//Load a new message part if streaming and copy as many bytes from the MessagePart into the buffer as possible.//
if(messagePart == null && streamingMessage != null) {
Object next = null;
boolean isLastPart = true;
next = streamingMessage.getNextPart();
isLastPart = !streamingMessage.hasNextPart();
//Ensure that we got a string or byte array.//
if(!(next instanceof String || next instanceof byte[])) {
throw new RuntimeException("Invalid streaming message part type.");
}//if//
messagePart = stream(next, isLastPart);
if(isLastPart) {
streamingMessage = null;
}//if//
int length = Math.min(getBuffer().remaining(), messagePart.remaining());
getBuffer().put(messagePart.array(), messagePart.position(), length);
messagePart.position(messagePart.position() + length);
if(messagePart.remaining() == 0) {
messagePart = null;
}//if//
}//if//
//Close the message buffer if we were unable to add any bytes and there is nothing left to send.//
if(getBuffer().remaining() == 0) {
close();
result = false;
}//if//
return result;
}//loadBuffer()//
}//WebsocketMessageBuffer//
private class HttpMessageBuffer extends MessageBuffer {
/** The response the message is based upon. */
private Response response = null;
/** The content if there is any. */
private IContent content = null;
public HttpMessageBuffer(Response response) {
this.response = response;
}//HttpMessageBuffer()//
/** Gets the response object that created the message. This will be null for pass through sockets. */
public Response getResponse() {return response;}
/* (non-Javadoc)
* @see com.foundation.web.server.WebServer.MessageBuffer#initialize()
*/
public boolean initialize() {
boolean result = true;
if(response != null && getBuffer() == null) {
Request request = (Request) response.getRequest();
byte[] headerBytes = null;
IContent content = null;
ByteBuffer buffer = null;
try {
//Wrap the response in http cloths. The HeaderFieldNames will be set if the response was provided with a completely custom HTTP header to be used.//
if(response.getHeaderFieldNames() != null) {
ByteArrayOutputStream bout = new ByteArrayOutputStream(1000);
PrintStream pout = new PrintStream(bout, true, "ASCII");
LiteList headerFieldNames = response.getHeaderFieldNames();
LiteHashMap headerFieldMap = response.getHeaderFieldMap();
//Write the response line which is mapped to the null field name.//
pout.print(headerFieldMap.get(null));
pout.print("\r\n");
//Write the rest of the response header lines in order.//
for(int index = 0; index < headerFieldNames.getSize(); index++) {
String headerFieldName = (String) headerFieldNames.get(index);
String headerFieldValue = (String) headerFieldMap.get(headerFieldName);
if(headerFieldName.equals("Server")) {
pout.print("Server: DE/1.0");
}//if//
else {
pout.print(headerFieldName);
pout.print(": ");
pout.print(headerFieldValue);
}//else//
pout.print("\r\n");
}//for//
//Write out any cookies necessary to retain our session.//
writeSessionCookies(pout);
//End the header.//
pout.print("\r\n");
pout.close();
headerBytes = bout.toByteArray();
//Prepare the content for delivery.//
content = response.getContent();
}//if//
else if(response.getForwardUri() != null) {
ByteArrayOutputStream bout = new ByteArrayOutputStream(1000);
PrintStream pout = new PrintStream(bout, true, "ASCII");
//String today = format.format(new Date());
//The 303 code may not be fully supported by browsers.//
//if(request.getHttpVersion().equalsIgnoreCase("HTTP/1.1") || request.getHttpVersion().equalsIgnoreCase("HTTP/1.2")) {
// pout.print("HTTP/1.1 303 Forwarded\r\n");
//}//if//
//else {
pout.print("HTTP/1.1 302 Moved Temporarily\r\n");
//}//else//
writeSessionCookies(pout);
pout.print("Location: " + response.getForwardUri() + "\r\n");
//Note: Encoded URL's will have their parameters in the content, not in the URL.//
if(response.getContent() == null) {
pout.print("Content-Length: 0\r\n");
}//if//
else {
pout.print("Content-Length: " + response.getContent().getSize() + "\r\n");
pout.print("Content-Type: application/x-www-form-urlencoded\r\n");
}//else//
pout.print("\r\n");
pout.close();
headerBytes = bout.toByteArray();
}//else if//
else if((content = response.getContent()) != null) { //Convert the result into a stream of bytes.//
ByteArrayOutputStream bout = new ByteArrayOutputStream(1000);
PrintStream pout = new PrintStream(bout, true, "ASCII");
Date lastModifiedDate = content.getLastModifiedDate();
String cacheDirective = null;
IMimeType mimeType = content.getMimeType(response.getApplication() != null ? response.getApplication().getMimeTypeProvider() : null);
boolean isDownloaded = content != null && (content.getIsDownloaded() == null ? mimeType != null && mimeType.isDownloaded() : content.getIsDownloaded().booleanValue());
boolean compress = response.getCompress().booleanValue() && (mimeType == null || mimeType.isCompressable());
int compressionType = 0; //0: none, 1: gzip, ..//
if(compress) {
//Check for an encoding allowed by the client that we know how to use.//
if(request.getAllowedEncodings() == null || request.getAllowedEncodings().length < 1) {
compress = false;
}//if//
else {
compress = false;
//Ensure we have an allowed encoding we know how to use.//
for(int index = 0; !compress && index < request.getAllowedEncodings().length; index++) {
String encoding = request.getAllowedEncodings()[index];
if(encoding.equalsIgnoreCase("gzip")) {
compress = true;
compressionType = 1;
}//if//
}//for//
}//else//
}//if//
if(response.isError()) {
if(response.getHeader() != null) {
pout.print(response.getHeader());
}//if//
else {
pout.print("HTTP/1.1 404 Resource Not Found\r\n");
}//else//
}//if//
else if(response.getCustomHeader() != null) {
pout.print(response.getCustomHeader());
}//else if//
else if(isDownloaded && request.getRange() != null) {
pout.print("HTTP/1.1 206 Partial Content\r\n");
}//else if//
else {
pout.print("HTTP/1.1 200 OK\r\n");
}//else//
pout.print("Content-Length: " + (content != null ? content.getSize() : 0) + "\r\n");
if(compress) {
//TODO: Add others?
if(compressionType == 1) {
content = new GzipContent(content);
pout.print("Content-Encoding: gzip\r\n");
}//if//
}//if//
if(content != null) {
//Note: The character set gives IE indigestion for some reason.//
pout.print("Content-Type: " + (mimeType != null ? mimeType.getMimeName() : "text/html") + "; charset=" + (response.getCharacterSet() == null ? "UTF-8" : response.getCharacterSet()) + "\r\n");
cacheDirective = content.getCacheDirective();
if(isDownloaded) {
pout.print("Content-Disposition: attachment; filename=\"" + content.getDownloadName() + "\";\r\n");
pout.print("Accept-Ranges: bytes\r\n");
if(request.getRange() != null) {
// Debug.log("Sending a ranged response: " + request.getRange() + " content range: (" + content.getStart() + " - " + content.getEnd() + "/" + content.getSize() + ").");
pout.print("Range: " + request.getRange() + "\r\n");
pout.print("Content-Range: bytes " + content.getStart() + "-" + content.getEnd() + "/" + content.getSize() + "\r\n");
}//if//
}//if//
}//if//
writeSessionCookies(pout);
pout.print("Server: DE/1.0\r\n");
//TODO: IE has a problem with caching and forwarding/redirecting. A page that redirects to another page that was previously cached does not result in IE sending a request for the forwarded content.//
//private / no-cache
if(content.getExpiresDirective() != null) {
pout.print("Expires: " + getHttpDateFormat().format(content.getExpiresDirective()));
}//if//
if(cacheDirective != null) {
pout.print("Cache-Control: " + cacheDirective + "\r\n");
}//if//
else {
int cacheLength = content.getCacheLength() != null ? content.getCacheLength().intValue() : mimeType != null ? mimeType.getDefaultCacheLength() : IMimeType.CACHE_LENGTH_NEVER_CACHE;
if(cacheLength > 0) {
pout.print("Cache-Control: public, max-age=" + cacheLength + "\r\n");
}//if//
else if(cacheLength == IMimeType.CACHE_LENGTH_ALWAYS_TEST) {
pout.print("Cache-Control: public, pre-check=0, post-check=120\r\n");
}//else if//
else if(cacheLength == IMimeType.CACHE_LENGTH_NEVER_CACHE) {
pout.print("Cache-Control: no-cache\r\n");
}//else if//
else {
pout.print("Cache-Control: no-store\r\n");
}//else//
}//else//
//TODO: Determine if we need to use age.
//pout.print("Age: 0\r\n");
//TODO: Determine if we need to use ETags
if(lastModifiedDate != null) {
SimpleDateFormat format = getHttpDateFormat();
pout.print("Last-Modified: " + format.format(lastModifiedDate) + "\r\n");
pout.print("Date: " + format.format(new Date()) + "\r\n");
}//if//
pout.print("\r\n");
headerBytes = bout.toByteArray();
}//else if//
else {
ByteArrayOutputStream bout = new ByteArrayOutputStream(1000);
PrintStream pout = new PrintStream(bout, true, "ASCII");
if(response.isError()) {
if(response.getHeader() != null) {
pout.print(response.getHeader());
}//if//
else {
pout.print("HTTP/1.1 404 Resource Not Found\r\n");
}//else//
}//if//
else if(response.getCustomHeader() != null) {
pout.print(response.getCustomHeader());
}//else if//
else {
Debug.log(new RuntimeException("The response to: " + response.getRequest().getHeaderText() + " had no response content!"));
pout.print("HTTP/1.1 200 OK\r\n");
}//else//
writeSessionCookies(pout);
pout.print("Content-Length: 0\r\n");
pout.print("Server: DE/1.0\r\n");
pout.print("\r\n");
pout.close();
headerBytes = bout.toByteArray();
}//else//
buffer = ByteBuffer.allocate(headerBytes.length > 2000 ? headerBytes.length : 2000);
buffer.put(headerBytes);
if(debug) {
//Test code...
ByteBuffer buffer2 = ByteBuffer.allocate(headerBytes.length);
buffer2.put(headerBytes);
buffer2.flip();
CharBuffer ch = decoder.decode(buffer2);
// debugBuffer.append("Sending message:\n");
// debugBuffer.append(ch.toString());
// debugBuffer.append("\nResponse Size: " + (headerBytes.length + (content != null ? content.getSize() : 0)) + "\n");
Debug.log(ch.toString());
}//if//
//Ignore the content if we are only accessing the header.//
// if(content != null && request.getRequestType() != Request.TYPE_HEAD) {
// content.get(buffer);
// }//if//
//Save the content as the current outbound message's content.//
this.content = content != null && request.getRequestType() != Request.TYPE_HEAD ? content : null;
//Fill the remaining buffer space with the content.//
if(this.content != null) {
this.content.get(buffer);
}//if//
//Save the buffer as the current outbound message's buffer.//
setBuffer(buffer);
}//try//
catch(Throwable e) {
Debug.log("Fatal Error: Failed to build and send the response message due to an exception.", e);
//Clean up after the request and response.//
try {response.close();} catch(Throwable e2) {}
//Fail the initialization.//
result = false;
}//catch//
}//if//
return result;
}//initialize()//
private void writeSessionCookies(PrintStream pout) {
ISession session = response.getSession();
if(session != null) {
//Write the session id only if it has changed.//
if(!Comparator.equals(response.getRequest().getSessionId(), session.getSessionId())) {
pout.print("Set-Cookie: sessionId=" + session.getSessionId() + ";path=/;\r\n");
}//if//
//Write the secure session id only if it has changed.//
if(!Comparator.equals(response.getRequest().getSecureSessionId(), session.getSecureSessionId())) {
pout.print("Set-Cookie: secureSessionId=" + (session.getSecureSessionId() == null ? "" : session.getSecureSessionId()) + ";path=/;secure\r\n");
}//if//
if(response.getRequest().isLoggedIn() != session.getIsLoggedIn()) {
pout.print("Set-Cookie: isLoggedIn=" + session.getIsLoggedIn() + ";path=/;\r\n");
}//if//
}//if//
}//writeSessionCookies()//
public boolean isClosed() {
return super.isClosed() && response == null;
}//isClosed()//
/* (non-Javadoc)
* @see com.foundation.web.server.WebServer.MessageBuffer#close()
*/
public void close() {
super.close();
try {if(response != null) response.close();} catch(Throwable e) {}
response = null;
content = null;
}//close()//
/* (non-Javadoc)
* @see com.foundation.web.server.WebServer.MessageBuffer#loadBuffer()
*/
public boolean loadBuffer() {
boolean result = true;
ByteBuffer buffer = getBuffer();
if(buffer != null) {
if(content != null) {
int getResult;
buffer.compact();
getResult = content.get(buffer);
if(getResult == IContent.CONTENT_PENDING) {
result = false; //Should never occur currently: See StreamedContent's javadocs.//
}//if//
else if(getResult == IContent.CONTENT_END) {
content = null;
}//else if//
if(buffer.position() != 0) buffer.flip();
}//if//
else {
if(!buffer.hasRemaining()) {
//Clear the buffer pointer indicating the message buffer is done.//
close();
}//if//
result = false;
}//else//
}//if//
else {
result = false;
}//else//
return result;
}//loadBuffer()//
}//HttpMessageBuffer//
private abstract class AbstractSocketContext extends ChannelContext { private abstract class AbstractSocketContext extends ChannelContext {
/** The key that represents the connection between the channel (socket) and the selector used to multiplex the listener. The code must synchronize on this attribute when accessing the isUsed functionality, or when interacting with the key's interestOps. */ /** The key that represents the connection between the channel (socket) and the selector used to multiplex the listener. The code must synchronize on this attribute when accessing the isUsed functionality, or when interacting with the key's interestOps. */
@@ -301,15 +830,15 @@ public class WebServer {
*/ */
protected abstract Object getLock(); protected abstract Object getLock();
/** /**
* Processes the next response in the sequence. * Writes the next responses/messages in the sequence.
* @throws IOException * @throws IOException
*/ */
protected abstract void processResponses() throws IOException; protected abstract void writeOutgoingMessages() throws IOException;
/** /**
* Processes the next request received via the socket. * Reads the next requests/messages received via the socket.
* @throws IOException * @throws IOException
*/ */
protected abstract void processRequest() throws IOException; protected abstract void readIncomingMessages() throws IOException;
/** /**
* Passes the message through to a receiving process via a second socket. * Passes the message through to a receiving process via a second socket.
* @param buffer The buffer containing the message. This buffer will not be retained by this method call, and can be reused by the caller. * @param buffer The buffer containing the message. This buffer will not be retained by this method call, and can be reused by the caller.
@@ -429,7 +958,7 @@ public class WebServer {
/* (non-Javadoc) /* (non-Javadoc)
* @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses() * @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses()
*/ */
protected synchronized void processResponses() throws IOException { protected synchronized void writeOutgoingMessages() throws IOException {
//Actually this is called when a request is being sent via the pass through socket (sending the request to the remote server).// //Actually this is called when a request is being sent via the pass through socket (sending the request to the remote server).//
//Synchronized to avoid accessing the pendingMessageBuffer and lastAddedMessageBuffer at the same time as a thread that is calling passThrough(ByteBuffer) which also accesses these variables.// //Synchronized to avoid accessing the pendingMessageBuffer and lastAddedMessageBuffer at the same time as a thread that is calling passThrough(ByteBuffer) which also accesses these variables.//
boolean result = true; boolean result = true;
@@ -476,7 +1005,7 @@ public class WebServer {
/* (non-Javadoc) /* (non-Javadoc)
* @see com.foundation.web.server.WebServer.AbstractSocketContext#processRequest() * @see com.foundation.web.server.WebServer.AbstractSocketContext#processRequest()
*/ */
protected void processRequest() throws IOException { protected void readIncomingMessages() throws IOException {
//Actually this is called when a response is being processed via the pass through socket (remote process that received the request).// //Actually this is called when a response is being processed via the pass through socket (remote process that received the request).//
int count = 1; int count = 1;
int loopCount = 0; int loopCount = 0;
@@ -566,9 +1095,9 @@ public class WebServer {
/** Whether the final part boundary has already been found. If this is true then the message bytes should keep getting read until the partReadCount == contentLength. */ /** Whether the final part boundary has already been found. If this is true then the message bytes should keep getting read until the partReadCount == contentLength. */
public boolean endPartBoundaryFound = false; public boolean endPartBoundaryFound = false;
/** The bytes containing the unencrypted outbound message that is waiting for the socket to allow a write. */ /** The bytes containing the unencrypted outbound message that is waiting for the socket to allow a write. */
public MessageBuffer/*ByteBuffer*/ pendingOutboundMessage = null; public MessageBuffer/*ByteBuffer*/ currentOutboundMessage = null;
/** The last message buffer added to the pending outbound message chain (linked list). Used only for pass through contexts currently since locally handled messages link the reponses together into a list. */ /** The last message buffer added to the pending outbound message chain (linked list). Used only for pass through contexts currently since locally handled messages link the reponses together into a list. */
private MessageBuffer lastAddedMessageBuffer = null; private MessageBuffer lastOutboundMessage = null;
/** The byte buffer used to read data from the socket. This must be null if a SSL engine is provided. */ /** The byte buffer used to read data from the socket. This must be null if a SSL engine is provided. */
public ByteBuffer socketReadBuffer = null; public ByteBuffer socketReadBuffer = null;
/** The buffer used to store the initial data in a SSL/TLS connection. The buffering is necessary to allow us to pre-read the client's hello message to gather the domain the client is connecting to - allowing for the correct SSL engine to be used. */ /** The buffer used to store the initial data in a SSL/TLS connection. The buffering is necessary to allow us to pre-read the client's hello message to gather the domain the client is connecting to - allowing for the correct SSL engine to be used. */
@@ -682,6 +1211,7 @@ public class WebServer {
try {if(key != null && key.channel() != null) key.channel().close();} catch(Throwable e) {} try {if(key != null && key.channel() != null) key.channel().close();} catch(Throwable e) {}
try {if(key != null) key.cancel();} catch(Throwable e) {} try {if(key != null) key.cancel();} catch(Throwable e) {}
//Clean up after the response and request.// //Clean up after the response and request.//
//try {while(currentOutboundMessage != null) {currentOutboundMessage.close(); currentOutboundMessage = currentOutboundMessage.getNext();}} catch(Throwable e2) {}
try {if(currentResponse != null) currentResponse.close();} catch(Throwable e2) {} try {if(currentResponse != null) currentResponse.close();} catch(Throwable e2) {}
if(getPassThroughSocketContext() != null) { if(getPassThroughSocketContext() != null) {
@@ -717,6 +1247,28 @@ public class WebServer {
public boolean isSsl() { public boolean isSsl() {
return socketReadBuffer == null; return socketReadBuffer == null;
}//isSsl()// }//isSsl()//
/**
* Queues an outbound client message by adding it to the linked list of message buffers. Handles synchronizing on this SocketContext to prevent multiple threads from adding messages at once. Also handles notifying the ServiceListener that it should update it's write flags for the socket.
* @param messageBuffer The buffer to be added to the end.
*/
private void queueOutboundClientMessage(MessageBuffer messageBuffer) {
boolean notify = false;
synchronized(this) {
if(currentOutboundMessage == null) {
lastOutboundMessage = currentOutboundMessage = messageBuffer;
notify = true;
}//if//
else {
lastOutboundMessage.setNext(messageBuffer);
lastOutboundMessage = messageBuffer;
}//else//
}//synchronized()//
if(notify) {
notifyListenerOfPendingWrite();
}//if//
}//queueOutboundClientMessage()//
private void writeSessionCookies(PrintStream pout) { private void writeSessionCookies(PrintStream pout) {
Response response = currentResponse; Response response = currentResponse;
ISession session = response.getSession(); ISession session = response.getSession();
@@ -987,7 +1539,7 @@ public class WebServer {
// }//if// // }//if//
//Save the buffer as the current pending outbound message for this socket context.// //Save the buffer as the current pending outbound message for this socket context.//
pendingOutboundMessage = new MessageBuffer(buffer, response, content != null && request.getRequestType() != Request.TYPE_HEAD ? content : null); currentOutboundMessage = new MessageBuffer(buffer, response, content != null && request.getRequestType() != Request.TYPE_HEAD ? content : null);
}//try// }//try//
catch(Throwable e) { catch(Throwable e) {
Debug.log("Fatal Error: Failed to build and send the response message due to an exception.", e); Debug.log("Fatal Error: Failed to build and send the response message due to an exception.", e);
@@ -998,12 +1550,12 @@ public class WebServer {
}//catch// }//catch//
}//prepareResponse()// }//prepareResponse()//
/** /**
* Adds a response to the socket context. * Adds a HTTP response to the socket context.
* <p>Note: We must synchronize since a socket could be used to access multiple applications and thus mutliple sessions.</p> * <p>Note: We must synchronize since a socket could be used to access multiple applications and thus mutliple sessions.</p>
* @param response The response to be added. * @param response The response to be added.
* @result Whether request is in a receive state. Will be false if the request generated a response that could not be completely transmitted. * @result Whether request is in a receive state. Will be false if the request generated a response that could not be completely transmitted.
*/ */
public synchronized boolean processResponse(Response response) { public synchronized boolean sendHttpResponse(Response response) {
if(currentResponse != null) { if(currentResponse != null) {
lastResponse.setNextResponse(response); lastResponse.setNextResponse(response);
lastResponse = response; lastResponse = response;
@@ -1020,7 +1572,7 @@ public class WebServer {
request = null; request = null;
return false; return false;
}//processResponse()// }//sendHttpResponse()//
/* (non-Javadoc) /* (non-Javadoc)
* @see com.foundation.web.server.WebServer.AbstractSocketContext#passThrough(java.nio.ByteBuffer) * @see com.foundation.web.server.WebServer.AbstractSocketContext#passThrough(java.nio.ByteBuffer)
*/ */
@@ -1034,12 +1586,12 @@ public class WebServer {
message = new MessageBuffer(messageBytes); message = new MessageBuffer(messageBytes);
//Chain the message into the linked list. //Chain the message into the linked list.
if(lastAddedMessageBuffer == null || pendingOutboundMessage == null) { if(lastOutboundMessage == null || currentOutboundMessage == null) {
pendingOutboundMessage = lastAddedMessageBuffer = message; currentOutboundMessage = lastOutboundMessage = message;
}//if// }//if//
else { else {
lastAddedMessageBuffer.setNext(message); lastOutboundMessage.setNext(message);
lastAddedMessageBuffer = message; lastOutboundMessage = message;
}//else// }//else//
return true; return true;
@@ -1047,11 +1599,11 @@ public class WebServer {
/* (non-Javadoc) /* (non-Javadoc)
* @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses() * @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses()
*/ */
protected void processResponses() throws IOException { protected void writeOutgoingMessages() throws IOException {
if(getPassThroughSocketContext() != null) { if(getPassThroughSocketContext() != null) {
//Synchronized to avoid multiple threads accessing the pendingOutboundMessage chain at one time and updating the write flag out of order (could happen if we enabled request chaining over a single socket).// //Synchronized to avoid multiple threads accessing the pendingOutboundMessage chain at one time and updating the write flag out of order (could happen if we enabled request chaining over a single socket).//
synchronized(this) { synchronized(this) {
writeClientResponse(); writeClientBoundMessage();
}//synchronized// }//synchronized//
}//if// }//if//
else if(isWebsocket) { else if(isWebsocket) {
@@ -1079,15 +1631,15 @@ public class WebServer {
while(websocketSendingMessage != null) { while(websocketSendingMessage != null) {
//If the socket is open then send the next buffer of data.// //If the socket is open then send the next buffer of data.//
if(key.channel().isOpen()) { if(key.channel().isOpen()) {
if(pendingOutboundMessage != null) { if(currentOutboundMessage != null) {
//Put the sending message in a MessageBuffer (pendingOutboundMessage).// //Put the sending message in a MessageBuffer (pendingOutboundMessage).//
pendingOutboundMessage = new MessageBuffer(websocketSendingMessage); currentOutboundMessage = new MessageBuffer(websocketSendingMessage);
}//if// }//if//
//Write the pendingOutboundMessage to the socket.// //Write the pendingOutboundMessage to the socket.//
if(writeClientResponse()) { if(writeClientBoundMessage()) {
websocketSendingMessage = null; websocketSendingMessage = null;
pendingOutboundMessage = null; currentOutboundMessage = null;
}//if// }//if//
}//if// }//if//
@@ -1202,7 +1754,7 @@ public class WebServer {
//If the socket is open then send the next buffer of data.// //If the socket is open then send the next buffer of data.//
if(key.channel().isOpen()) { if(key.channel().isOpen()) {
//Send the pending response object's prepared buffer of data.// //Send the pending response object's prepared buffer of data.//
finishedSending = writeClientResponse(); finishedSending = writeClientBoundMessage();
}//if// }//if//
//Close the response if successfully sent, or if the socket is closed.// //Close the response if successfully sent, or if the socket is closed.//
@@ -1238,7 +1790,7 @@ public class WebServer {
* Sends a response to the client. * Sends a response to the client.
* @return Whether the response could be fully sent. This will be false if there is still more data to be written when the call returns. * @return Whether the response could be fully sent. This will be false if there is still more data to be written when the call returns.
*/ */
private boolean writeClientResponse() { private boolean writeClientBoundMessage() {
boolean sendMore = true; boolean sendMore = true;
// if(debug) { // if(debug) {
@@ -1335,29 +1887,29 @@ public class WebServer {
// }//if// // }//if//
}//if// }//if//
if(sendMore && pendingOutboundMessage != null) { if(sendMore && currentOutboundMessage != null) {
//Check to see if the outbound message is prepared to send more content. For chunked transfers the outbound message may be waiting for additional content from another stream and we should return later.// //Check to see if the outbound message is prepared to send more content. For chunked transfers the outbound message may be waiting for additional content from another stream and we should return later.//
if(!pendingOutboundMessage.getBuffer().hasRemaining()) { if(!currentOutboundMessage.getBuffer().hasRemaining()) {
if(!pendingOutboundMessage.loadBuffer()) { if(!currentOutboundMessage.loadBuffer()) {
if(pendingOutboundMessage.getBuffer() == null && pendingOutboundMessage.getNext() != null) { if(currentOutboundMessage.getBuffer() == null && currentOutboundMessage.getNext() != null) {
pendingOutboundMessage = pendingOutboundMessage.getNext(); currentOutboundMessage = currentOutboundMessage.getNext();
}//if// }//if//
else { else {
sendMore = false; sendMore = false;
}//else// }//else//
}//if// }//if//
if(pendingOutboundMessage.getBuffer() == null) { if(currentOutboundMessage.getBuffer() == null) {
pendingOutboundMessage = null; currentOutboundMessage = null;
lastAddedMessageBuffer = null; lastOutboundMessage = null;
}//if// }//if//
}//if// }//if//
//If we have an application response pending then send it now.// //If we have an application response pending then send it now.//
if(sendMore && pendingOutboundMessage.getBuffer().hasRemaining()) { if(sendMore && currentOutboundMessage.getBuffer().hasRemaining()) {
if(sslEngine != null) { if(sslEngine != null) {
//Keep sending encrypted frames until the output buffer is full, or we run out of message to send.// //Keep sending encrypted frames until the output buffer is full, or we run out of message to send.//
while(key.channel().isOpen() && sendMore && (pendingOutboundMessage != null) && pendingOutboundMessage.getBuffer().hasRemaining()) { while(key.channel().isOpen() && sendMore && (currentOutboundMessage != null) && currentOutboundMessage.getBuffer().hasRemaining()) {
SSLEngineResult encryptResult; SSLEngineResult encryptResult;
// int offset = pendingOutboundMessage.getBuffer().position(); // int offset = pendingOutboundMessage.getBuffer().position();
//TODO: Comment me. //TODO: Comment me.
@@ -1365,7 +1917,7 @@ public class WebServer {
//Reset the encrypted write buffer.// //Reset the encrypted write buffer.//
encryptedWriteBuffer.compact(); encryptedWriteBuffer.compact();
//Encrypt the next message frame.// //Encrypt the next message frame.//
encryptResult = sslEngine.wrap(pendingOutboundMessage.getBuffer(), encryptedWriteBuffer); encryptResult = sslEngine.wrap(currentOutboundMessage.getBuffer(), encryptedWriteBuffer);
encryptedWriteBuffer.flip(); encryptedWriteBuffer.flip();
//TODO: Comment me. //TODO: Comment me.
//Debug.log("Encrypting/Sending to client from Git " + (rem - pendingOutboundMessage.getBuffer().remaining()) + " bytes."); //Debug.log("Encrypting/Sending to client from Git " + (rem - pendingOutboundMessage.getBuffer().remaining()) + " bytes.");
@@ -1422,11 +1974,11 @@ public class WebServer {
//Add more content to the buffer.// //Add more content to the buffer.//
//Note: Do this even if the last encrypted write buffer could not be fully sent - so that when it is sent there will be outbound message content.// //Note: Do this even if the last encrypted write buffer could not be fully sent - so that when it is sent there will be outbound message content.//
if(key.channel().isOpen() && pendingOutboundMessage != null) { if(key.channel().isOpen() && currentOutboundMessage != null) {
if(!pendingOutboundMessage.loadBuffer()) { if(!currentOutboundMessage.loadBuffer()) {
//Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.// //Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.//
if(pendingOutboundMessage.getBuffer() == null && pendingOutboundMessage.getNext() != null) { if(currentOutboundMessage.getBuffer() == null && currentOutboundMessage.getNext() != null) {
pendingOutboundMessage = pendingOutboundMessage.getNext(); currentOutboundMessage = currentOutboundMessage.getNext();
}//if// }//if//
else { else {
//Wait until additional message bytes are available.// //Wait until additional message bytes are available.//
@@ -1435,18 +1987,18 @@ public class WebServer {
}//if// }//if//
//If the message end has been reached then the buffer will be null.// //If the message end has been reached then the buffer will be null.//
if(pendingOutboundMessage.getBuffer() == null) { if(currentOutboundMessage.getBuffer() == null) {
pendingOutboundMessage = null; currentOutboundMessage = null;
lastAddedMessageBuffer = null; lastOutboundMessage = null;
}//if// }//if//
}//if// }//if//
}//while// }//while//
}//if// }//if//
else { else {
//Keep sending encrypted frames until the output buffer is full, or we run out of message to send.// //Keep sending encrypted frames until the output buffer is full, or we run out of message to send.//
while(sendMore && (pendingOutboundMessage != null) && pendingOutboundMessage.getBuffer().hasRemaining()) { while(sendMore && (currentOutboundMessage != null) && currentOutboundMessage.getBuffer().hasRemaining()) {
//Write the bytes to the stream.// //Write the bytes to the stream.//
((SocketChannel) key.channel()).write(pendingOutboundMessage.getBuffer()); ((SocketChannel) key.channel()).write(currentOutboundMessage.getBuffer());
// if(debug) { // if(debug) {
// sentBytes += pendingOutboundMessage.position(); // sentBytes += pendingOutboundMessage.position();
@@ -1454,14 +2006,14 @@ public class WebServer {
// }//if// // }//if//
//If not all the bytes could be written then we will need to wait until we can write more.// //If not all the bytes could be written then we will need to wait until we can write more.//
if(pendingOutboundMessage.getBuffer().hasRemaining()) { if(currentOutboundMessage.getBuffer().hasRemaining()) {
sendMore = false; sendMore = false;
}//if// }//if//
else { else {
if(!pendingOutboundMessage.loadBuffer()) { if(!currentOutboundMessage.loadBuffer()) {
//Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.// //Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.//
if(pendingOutboundMessage.getBuffer() == null && pendingOutboundMessage.getNext() != null) { if(currentOutboundMessage.getBuffer() == null && currentOutboundMessage.getNext() != null) {
pendingOutboundMessage = pendingOutboundMessage.getNext(); currentOutboundMessage = currentOutboundMessage.getNext();
}//if// }//if//
else { else {
//Wait until additional message bytes are available.// //Wait until additional message bytes are available.//
@@ -1470,9 +2022,9 @@ public class WebServer {
}//if// }//if//
//If the message end has been reached then the buffer will be null.// //If the message end has been reached then the buffer will be null.//
if(pendingOutboundMessage.getBuffer() == null) { if(currentOutboundMessage.getBuffer() == null) {
pendingOutboundMessage = null; currentOutboundMessage = null;
lastAddedMessageBuffer = null; lastOutboundMessage = null;
}//if// }//if//
}//else// }//else//
}//while// }//while//
@@ -1499,11 +2051,11 @@ public class WebServer {
}//catch// }//catch//
return sendMore; return sendMore;
}//writeClientResponse()// }//writeClientBoundMessage()//
/* (non-Javadoc) /* (non-Javadoc)
* @see com.foundation.web.server.WebServer.AbstractSocketContext#processRequest() * @see com.foundation.web.server.WebServer.AbstractSocketContext#processRequest()
*/ */
protected void processRequest() throws IOException { protected void readIncomingMessages() throws IOException {
boolean requiresRead = true; boolean requiresRead = true;
SocketChannel channel = (SocketChannel) key.channel(); SocketChannel channel = (SocketChannel) key.channel();
@@ -1675,11 +2227,11 @@ public class WebServer {
//Need to synchronize if this is a pass through socket so that multiple threads don't access pendingOutboundMessage or lastAddedMessageBuffer (via a call to passThrough(ByteBuffer) on another thread).// //Need to synchronize if this is a pass through socket so that multiple threads don't access pendingOutboundMessage or lastAddedMessageBuffer (via a call to passThrough(ByteBuffer) on another thread).//
if(getPassThroughSocketContext() == null) { if(getPassThroughSocketContext() == null) {
requiresRead = writeClientResponse(); requiresRead = writeClientBoundMessage();
}//if// }//if//
else { else {
synchronized(this) { synchronized(this) {
requiresRead = writeClientResponse(); requiresRead = writeClientBoundMessage();
}//synchronized// }//synchronized//
}//else// }//else//
}//if// }//if//
@@ -1791,7 +2343,7 @@ public class WebServer {
* @see com.foundation.web.server.WebServer.AbstractSocketContext#hasPendingWrite() * @see com.foundation.web.server.WebServer.AbstractSocketContext#hasPendingWrite()
*/ */
protected boolean hasPendingWrite() { protected boolean hasPendingWrite() {
return pendingOutboundMessage != null || (encryptedWriteBuffer != null && encryptedWriteBuffer.hasRemaining()); return currentOutboundMessage != null || (encryptedWriteBuffer != null && encryptedWriteBuffer.hasRemaining());
}//hasPendingWrite()// }//hasPendingWrite()//
/* (non-Javadoc) /* (non-Javadoc)
* @see com.foundation.web.interfaces.IConnectionContext#upgradeToWebsocket(java.lang.String, long, com.foundation.web.interfaces.WebsocketHandler) * @see com.foundation.web.interfaces.IConnectionContext#upgradeToWebsocket(java.lang.String, long, com.foundation.web.interfaces.WebsocketHandler)
@@ -2078,14 +2630,14 @@ public class WebServer {
//Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).// //Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).//
synchronized(((AbstractSocketContext) context).getLock()) { synchronized(((AbstractSocketContext) context).getLock()) {
//Process the pending write to the socket as much as is possible, then return.// //Process the pending write to the socket as much as is possible, then return.//
((AbstractSocketContext) context).processResponses(); ((AbstractSocketContext) context).writeOutgoingMessages();
}//synchronized// }//synchronized//
}//if// }//if//
else { else {
//Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).// //Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).//
synchronized(((AbstractSocketContext) context).getLock()) { synchronized(((AbstractSocketContext) context).getLock()) {
//Process the incoming request and send the response (a partial response may be sent in which case the socket will be set to wait for a write opportunity and not a read opportunity).// //Process the incoming request and send the response (a partial response may be sent in which case the socket will be set to wait for a write opportunity and not a read opportunity).//
((AbstractSocketContext) context).processRequest(); ((AbstractSocketContext) context).readIncomingMessages();
}//synchronized// }//synchronized//
}//else// }//else//
}//try// }//try//
@@ -2112,6 +2664,8 @@ public class WebServer {
}//if// }//if//
else { else {
Debug.log(new RuntimeException("Woops! Somehow the selection key isn't valid, but the socket isn't closed either!")); Debug.log(new RuntimeException("Woops! Somehow the selection key isn't valid, but the socket isn't closed either!"));
try {((SocketChannel) channel).close();} catch(Throwable e2) {}
cleanupClientChannel((SocketContext) context, (SocketChannel) channel);
}//else// }//else//
((AbstractSocketContext) context).isUsed = false; ((AbstractSocketContext) context).isUsed = false;
@@ -3706,7 +4260,7 @@ private boolean processClientRequest(SocketContext context, final Request reques
else { else {
response = new Response(request, null, null); response = new Response(request, null, null);
response.setContent(content); response.setContent(content);
result = context.processResponse(response); result = context.sendHttpResponse(response);
}//else// }//else//
}//if// }//if//
}//try// }//try//
@@ -3759,7 +4313,7 @@ private boolean internalProcessClientRequest(final SocketContext context, Select
}//else if// }//else if//
//Convert the response into a byte stream and send it via the socket.// //Convert the response into a byte stream and send it via the socket.//
result = context.processResponse(response); result = context.sendHttpResponse(response);
}//try// }//try//
catch(Throwable e) { catch(Throwable e) {
Debug.log(e); Debug.log(e);