diff --git a/Foundation Web Core/src/com/foundation/web/server/AbstractSocketContext.java b/Foundation Web Core/src/com/foundation/web/server/AbstractSocketContext.java
new file mode 100644
index 0000000..2508283
--- /dev/null
+++ b/Foundation Web Core/src/com/foundation/web/server/AbstractSocketContext.java
@@ -0,0 +1,129 @@
+package com.foundation.web.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+public abstract class AbstractSocketContext implements IChannelContext {
+ /** The size of the buffers used per connection. */
+ protected static final int BUFFER_SIZE = 100000;
+ protected static final int SEND_BUFFER_SIZE = 20480; //2048
+ protected static final int RECEIVE_BUFFER_SIZE = 20480; //2048
+ protected static final byte[] DOUBLE_ENTER = new byte[] {0x0D, 0x0A, 0x0D, 0x0A};
+ /** The character set used by the HTTP messages. */
+ protected static final Charset charset = Charset.forName("us-ascii");
+ /** The decoder used to decode the HTTP messages. */
+ protected static final CharsetDecoder decoder = charset.newDecoder();
+ /** The format used for dates in the HTTP header. */
+ protected static final ThreadLocal httpDateFormat = new ThreadLocal();
+ protected static final String httpDateFormatString = "EEE, d MMM yyyy HH:mm:ss z";
+
+ /** The next available socket context id. */
+ private static int nextSocketContextId = 1;
+ /** The network listener that created this socket context. */
+ private final NetworkListener networkListener;
+ /** The debug ID for the socket. */
+ public final int id;
+ /** The web server that created the socket context. */
+ protected WebServer webServer = null;
+ /** The key that represents the connection between the channel (socket) and the selector used to multiplex the listener. The code must synchronize on this attribute when accessing the isUsed functionality, or when interacting with the key's interestOps. */
+ protected SelectionKey key = null;
+ /** Whether the socket is currently being used by a thread designated by the network listener thread to read or write to the socket. Currently the socket type we use in Java only allows one thread to read and write at a time. Note: Always synchronize on key before using this attribute. */
+ private boolean isUsed = false;
+ /** A socket context related to this one (when two are tied together such that data from one immediately is sent to the other). */
+ protected AbstractSocketContext relatedSocketContext = null;
+/**
+ * Gets the next available socket id.
+ * @return The ID useful for debugging.
+ */
+protected static int getNextSocketContextId() {return nextSocketContextId++;}
+/**
+ * Gets the threads date format for processing HTTP header dates.
+ *
This uses a thread local because the date format class is not thread safe :(.
+ * @return The date format for handling http header dates. + */ +protected static SimpleDateFormat getHttpDateFormat() { + SimpleDateFormat result = (SimpleDateFormat) httpDateFormat.get(); + + if(result == null) { + result = new SimpleDateFormat(httpDateFormatString); + result.setTimeZone(TimeZone.getTimeZone("GMT")); + httpDateFormat.set(result); + }//if// + + return result; +}//getHttpDateFormat()// +/** + * AbstractSocketContext constructor. + */ +public AbstractSocketContext(NetworkListener networkListener) { + this.networkListener = networkListener; + + synchronized(AbstractSocketContext.class) { + this.id = getNextSocketContextId(); + }//synchronized// +}//AbstractSocketContext()// +/** Gets the network listener the socket exists within. */ +public NetworkListener getNetworkListener() {return networkListener;} +/** Gets the web server the socket exists within. */ +public WebServer getWebServer() {return networkListener.getWebServer();} +/** Gets whether the socket context is currently in use by a thread. */ +public boolean getIsUsed() {return isUsed;} +/** Sets whether the socket context is currently in use by a thread. */ +public void setIsUsed(boolean isUsed) {this.isUsed = isUsed;} +/** + * Gets the lockable (synchronizable) object for this context. For contexts with a related context, only one of the two will be returned, such that a single synchronize block covers both contexts. + * @return The object to synchronize on such that two threads don't attempt to interact with the context at the same time (AsynchronousSocketChannel required for that). + */ +protected abstract Object getLock(); +/** + * Writes the next responses/messages in the sequence. + * @throws IOException + */ +protected abstract void writeOutgoingMessages() throws IOException; +/** + * Reads the next requests/messages received via the socket. + * @throws IOException + */ +protected abstract void readIncomingMessages() throws IOException; +/** + * Passes the message through to a receiving process via a second socket. + * @param buffer The buffer containing the message. This buffer will not be retained by this method call, and can be reused by the caller. + * @return Whether the whole message was transfered. + */ +protected abstract boolean passThrough(ByteBuffer buffer); +/** + * Closes the socket context and cleans up. + */ +protected abstract void close(); +/** + * Gets the socket context related to this one (when two are tied together such that data from one immediately is sent to the other). + * @return The related socket context, or null if none exists (data not forwarded to a remote server). + */ +protected AbstractSocketContext getRelatedSocketContext() {return relatedSocketContext;} +/** + * Determines whether the socket has a pending write operation. + */ +protected abstract boolean hasPendingWrite(); +/** + * Called to notify the network listener that a pending write operation exists for this socket. + */ +protected void notifyListenerOfPendingWrite() { + synchronized(key) { + //Ignore if a thread is using this socket currently since all operation flags will be set at the end of the use of the socket.// + if(!isUsed) { + int ops = key.interestOps(); + boolean hasWrite = (ops & SelectionKey.OP_WRITE) != 0; + + if(!hasWrite) { + key.interestOps(ops | SelectionKey.OP_WRITE); + key.selector().wakeup(); + }//if// + }//if// + }//synchronized// +}//notifyListenerOfPendingWrite()// +}//AbstractSocketContext// \ No newline at end of file diff --git a/Foundation Web Core/src/com/foundation/web/server/IChannelContext.java b/Foundation Web Core/src/com/foundation/web/server/IChannelContext.java new file mode 100644 index 0000000..7df1d80 --- /dev/null +++ b/Foundation Web Core/src/com/foundation/web/server/IChannelContext.java @@ -0,0 +1,7 @@ +package com.foundation.web.server; + +/** + * Provides a place for channel oriented data. + */ +public interface IChannelContext { +}//IChannelContext// \ No newline at end of file diff --git a/Foundation Web Core/src/com/foundation/web/server/MessageBuffer.java b/Foundation Web Core/src/com/foundation/web/server/MessageBuffer.java new file mode 100644 index 0000000..941fc22 --- /dev/null +++ b/Foundation Web Core/src/com/foundation/web/server/MessageBuffer.java @@ -0,0 +1,128 @@ +package com.foundation.web.server; + +import java.nio.ByteBuffer; + +import com.foundation.web.interfaces.IContent; + +/** + * The response message buffer encapsulating the request generating the response, and the content, and chainable into a linked list. + */ +class MessageBuffer { + /** The actual underlying buffer containing the bytes to be sent. Will be null if the message buffer needs initializing or has finished. */ + private ByteBuffer buffer = null; + /** The ability to chain message buffers into a linked list. */ + private MessageBuffer next = null; + + /** The optional response the message is based upon. */ + private Response response = null; + /** The content if there is any. */ + private IContent content = null; + + /** + * MessageBuffer constructor. + */ + public MessageBuffer() { + }//MessageBuffer()// + /** + * MessageBuffer constructor. + * @param buffer The buffer to use for assembling the message bytes. + */ + public MessageBuffer(ByteBuffer buffer) { + setBuffer(buffer); + }//MessageBuffer()// + /** + * Sets the actual underlying buffer for the message buffer. + * @param buffer + */ + protected void setBuffer(ByteBuffer buffer) { + this.buffer = buffer; + + if(buffer != null && buffer.position() != 0) buffer.flip(); + }//setBuffer()// + /** + * MessageBuffer constructor. + * @param buffer The buffer to use for assembling the message bytes. + * @param response The optional response that generates this message. + * @param content The content if the response is not just a header. + */ + public MessageBuffer(ByteBuffer buffer, Response response, IContent content) { + this.buffer = buffer; + this.content = content; + + //Fill the remaining buffer space with the content.// + if(content != null) { + content.get(buffer); + }//if// + + //Flip the buffer (if not already flipped) so we can write out the bytes.// + if(buffer.position() != 0) buffer.flip(); + this.response = response; + }//MessageBuffer()// + /** + * Initializes the message buffer for use. + * @return Whether initialization succeded. Intialization should be considered a success even if none is required or has already been performed. If it fails the caller should close the socket. + */ + public boolean initialize() { + //Does nothing by default. Subclasses may implement.// + return true; + }//initialize()// + /** + * Whether the message buffer is closed and all bytes have been sent. + * @return If the bytes have all been sent. + */ + public boolean isClosed() { + return buffer == null; + }//isClosed()// + /** + * Closes the message buffer. + */ + public void close() { + this.buffer = null; + }//close()// + /** + * Gets the byte buffer containing the current portion of the message to be sent. + * @return The buffer containing the next part of the message to be sent, or null if the message end has been reached. + */ + public ByteBuffer getBuffer() {return buffer;} + /** + * Loads the next part of the message into the buffer (any remaining bytes in the buffer will be compacted). + * @return Whether the buffer could be loaded with the next part of the message. If false, then the caller should try again in the future when additional message content may be available. Will always be false if there is no content to load from. + */ + public boolean loadBuffer() { + boolean result = true; + + if(buffer != null) { + if(content != null) { + int getResult; + + buffer.compact(); + getResult = content.get(buffer); + + if(getResult == IContent.CONTENT_PENDING) { + result = false; //Should never occur currently: See StreamedContent's javadocs.// + }//if// + else if(getResult == IContent.CONTENT_END) { + buffer = null; + }//else if// + + if(buffer != null && buffer.position() != 0) buffer.flip(); + }//if// + else if(!buffer.hasRemaining()) { + //Clear the buffer pointer indicating the message buffer is done.// + buffer = null; + result = false; + }//else if// + }//if// + else { + result = false; + }//else// + + return result; + }//loadBuffer()// + /** Gets the next message buffer (only used for pass through sockets). */ + public MessageBuffer getNext() {return next;} + /** Sets the next message buffer (only used for pass through sockets). */ + public void setNext(MessageBuffer next) {this.next = next;} + /** Gets the response object that created the message. This will be null for pass through sockets. */ + public Response getResponse() {return response;} +}//MessageBuffer// \ No newline at end of file diff --git a/Foundation Web Core/src/com/foundation/web/server/NetworkListener.java b/Foundation Web Core/src/com/foundation/web/server/NetworkListener.java new file mode 100644 index 0000000..1e05e1b --- /dev/null +++ b/Foundation Web Core/src/com/foundation/web/server/NetworkListener.java @@ -0,0 +1,267 @@ +package com.foundation.web.server; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; + +import com.common.debug.Debug; +import com.common.thread.ThreadService; +import com.common.util.LiteList; +import com.foundation.web.server.WebServer.TlsFailureException; + +/** + * Encapsulates the code that threads incomming socket requests and messages over sockets. + *Note that the HTTP protocol requires that the reponses be sent in order of the received requests.
+ */ +class NetworkListener implements Runnable { + private final WebServer webServer; + Selector selector = null; + private Iterator selectedKeys = null; + private volatile boolean stop = true; + private volatile boolean hasRunnables = false; + private LiteList runnables = new LiteList(10, 20); +public NetworkListener(WebServer webServer, Selector selector) { + this.webServer = webServer; + this.selector = selector; +}//NetworkListener()// +/** Gets the web server that created this network listener. */ +public WebServer getWebServer() {return webServer;} +/** + * Stops the network listener. + * Note that this may take a short time to complete. + */ +public void stop() { + if(!stop) { + stop = true; + selector.wakeup(); + }//if// +}//stop()// +/** + * Starts the network listener. + */ +public void start() { + if(stop) { + stop = false; + ThreadService.run(this); +// Thread t = new Thread(this); +// t.setName("Network Listener"); +// t.start(); + }//if// +}//start()// +/** + * Cleans up after the client channel. + * @param context The context associated with the client connection. + * @param channel The client connection that is now closed. + */ +private void cleanupClientChannel(SocketContext context, SocketChannel channel) { + if(this.webServer.debug) { + Debug.log("Connection closed to " + channel.socket().getInetAddress() + ":" + channel.socket().getPort()); + }//if// +}//cleanupClientChannel()// +/** + * Adds a runnable to the list of runnables to be run next time the loop is woken. + * @param runnable The runnable to be run by the thread that is listening for socket events. + */ +public synchronized void queue(Runnable runnable) { + runnables.add(runnable); + hasRunnables = true; +}//queue()// +/** + * Checks for runnables and runs them if there are any. + */ +private void checkForRunnables() { + if(hasRunnables) { + synchronized(this) { + while(runnables.getSize() > 0) { + ((Runnable) runnables.remove(0)).run(); + }//while// + + hasRunnables = false; + }//synchronized// + }//if// +}//checkForRunnables()// +/* (non-Javadoc) + * @see java.lang.Runnable#run() + */ +public void run() { + //Looping only occurs when we are at the maximum allowed number of threads handling messages.// + while(!stop) { + SelectionKey key = null; + + try { + //If we don't have an iterator over the active channels then get one and block if necessary.// + if(selectedKeys == null) { + int keyCount = 0; + + //Block until we have keys or were awakened by another thread.// + keyCount = selector.select(); + //Check for any pending runnables that need executing on this thread.// + checkForRunnables(); + + //If we have active keys then retrieve them.// + if(keyCount > 0) { + selectedKeys = selector.selectedKeys().iterator(); + }//if// + }//if// + + //If we have an iterator over the active channels then get and remove the next one (clean up the iterator if empty).// + if(selectedKeys != null) { + key = (SelectionKey) selectedKeys.next(); + selectedKeys.remove(); + + //Weed out invalid (cancelled) keys.// + if(!key.isValid()) { + key = null; + }//if// + + if(!selectedKeys.hasNext()) { + selectedKeys = null; + }//if// + }//if// + }//try// + catch(Throwable e) { + //TODO: Can we recover? + Debug.log(e); + }//catch// + + try { + if(key != null) { + final boolean isWrite = key.isWritable(); + final IChannelContext context = (IChannelContext) key.attachment(); + final SelectableChannel channel = key.channel(); + final SelectionKey selectionKey = key; + + if(channel instanceof ServerSocketChannel) { + try { + ServerSocketChannel serverSocketChannel = (ServerSocketChannel) channel; + SocketChannel socketChannel = serverSocketChannel.accept(); + ServerSocketContext serverSocketContext = (ServerSocketContext) context; + SocketContext socketContext = new SocketContext(serverSocketContext, this); + + socketChannel.configureBlocking(false); + socketChannel.socket().setSendBufferSize(AbstractSocketContext.SEND_BUFFER_SIZE); + socketChannel.socket().setReceiveBufferSize(AbstractSocketContext.RECEIVE_BUFFER_SIZE); + socketContext.key = socketChannel.register(selector, SelectionKey.OP_READ, socketContext); + socketContext.serverSocketContext = serverSocketContext; + + //Debug.log("Connection opened to " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort()); + + if(serverSocketContext.serviceListener.type != IServiceListener.TYPE_SSL) { + socketContext.socketReadBuffer = ByteBuffer.allocate(AbstractSocketContext.BUFFER_SIZE); + }//if// + + if(this.webServer.debug) { + Debug.log("Connection opened to " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort()); + }//if// + }//try// + catch(Throwable e) { + //TODO: Can we recover? + Debug.log(e); + }//catch// + }//if// + else if(channel instanceof SocketChannel) { +// boolean socketClosed = false; + + //Toggle the write or read flag.// + synchronized(key) { +// //Save the ops that will be set when the processing is complete.// +// ((AbstractSocketContext) context).setFlags(key.interestOps()); + + //Notes: Java (pre-jdk7) does not have the ability to read and write to a socket at the same time (two threads, one socket). Post jdk7 there is AsynchronousSocketChannel and AsynchronousServerSocketChannel which could be used to send/receive at the same time. + //Truely enabling Speedy would require a thread to read which when finished would flag read again BEFORE processing the message and BEFORE sending a response. + //For now (so we don't have to require jdk7 yet) we will simply allow Speedy to queue up messages, but only read, process, and then write them one at a time. Most of the speed loss is in the waiting for the WRITE to finish before handling the next request (due to it being broken into packets and the mechanics of TCP), and that is generally minimal (speed lose) since usually the bottleneck in speed is the browser's connection to the internet (most of us haven't got Gigabit Ethernet at home). Anyone with enough home juice to have this be a problem would only notice the difference for really porky websites (which is a problem in and of its self). + + //Not allowing either reads or writes to continue until all processing of this message is done.// + ((AbstractSocketContext) context).setIsUsed(true); + key.interestOps(0); + + //The problem with this is that we'd have to use AsynchronousSocketChannel which would appear to require a complete rewrite of everything since it operates completely differently.// +// key.interestOps(key.interestOps() ^ (isWrite ? SelectionKey.OP_WRITE : SelectionKey.OP_READ)); + }//synchronized// + + if(((SocketChannel) channel).isOpen()) { + ThreadService.run(new Runnable() { + public void run() { + boolean socketClosed = false; + + try { + if(isWrite) { + //Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).// + synchronized(((AbstractSocketContext) context).getLock()) { + //Process the pending write to the socket as much as is possible, then return.// + ((AbstractSocketContext) context).writeOutgoingMessages(); + }//synchronized// + }//if// + else { + //Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).// + synchronized(((AbstractSocketContext) context).getLock()) { + //Process the incoming request and send the response (a partial response may be sent in which case the socket will be set to wait for a write opportunity and not a read opportunity).// + ((AbstractSocketContext) context).readIncomingMessages(); + }//synchronized// + }//else// + }//try// + catch(TlsFailureException e) { + //Allow the failure to be ignored. This occurs when the client fails to use TLS or fails to send the host name as part of the TLS handshake.// + try {((SocketChannel) channel).close();}catch(Throwable e2) {} //Release the socket so the message doesn't continue to be processed.// + }//catch// + catch(Throwable e) { + if(NetworkListener.this.webServer.debug) Debug.log(e); + + //Force the socket to be closed (for sure).// + try {((SocketChannel) channel).close();} catch(Throwable e2) {} + //Debug.log(e); + socketClosed = true; + }//catch// + finally { + if(channel != null && !socketClosed && channel.isOpen() && context != null) { + try { + //Set the new ops for the selection key and notify the selector that ops have changed.// + synchronized(selectionKey) { + if(selectionKey.isValid()) { + //Always flag the socket for reading, only flag the socket for writing if a pending write operation exists.// + selectionKey.interestOps(SelectionKey.OP_READ | (((AbstractSocketContext) context).hasPendingWrite() ? SelectionKey.OP_WRITE : 0)); + }//if// + else { + Debug.log(new RuntimeException("Woops! Somehow the selection key isn't valid, but the socket isn't closed either!")); + try {((SocketChannel) channel).close();} catch(Throwable e2) {} + cleanupClientChannel((SocketContext) context, (SocketChannel) channel); + }//else// + + ((AbstractSocketContext) context).setIsUsed(false); + }//synchronized// + + selector.wakeup(); + }//try// + catch(Throwable e) { + Debug.log(e); + }//catch// + }//if// + else if(channel != null && (!channel.isOpen() || socketClosed) && channel instanceof SocketChannel && context instanceof SocketContext) { + cleanupClientChannel((SocketContext) context, (SocketChannel) channel); + }//else if// + else { + //This shouldn't be called I don't think.// + Debug.log(new RuntimeException("Woops! Somehow we aren't closed and we didn't setup the interestOps for the HTTP socket!")); + }//else// + }//finally// + }//run()// + }); + }//if// + }//else if// + }//if// + }//try// + catch(java.nio.channels.CancelledKeyException e) { + //Occurs if the socket is closed while we are handling the key.// + Debug.log(e); //TODO: Does anything need doing here? Should it be ignored? + }//catch// + catch(Throwable e) { + Debug.log(e); + //TODO: There needs to be more specfic error handling if we got here. + }//catch// + }//while// +}//run()// +}//NetworkListener// \ No newline at end of file diff --git a/Foundation Web Core/src/com/foundation/web/server/PassThroughSocketContext.java b/Foundation Web Core/src/com/foundation/web/server/PassThroughSocketContext.java new file mode 100644 index 0000000..ed41e8f --- /dev/null +++ b/Foundation Web Core/src/com/foundation/web/server/PassThroughSocketContext.java @@ -0,0 +1,168 @@ +package com.foundation.web.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import com.foundation.web.server.WebServer.RegisterKeyRunnable; + +/** + * Used by the SocketContext to create a connection to a remote process that will receive all client data once decrypted, and whose output will be encrypted and sent directly to the client. + * Allows the web server to act as an SSL front to another web server or service. + */ +public class PassThroughSocketContext extends AbstractSocketContext { + private MessageBuffer pendingMessageBuffer = null; + private MessageBuffer lastAddedMessageBuffer = null; + /** The byte buffer used to read data from the socket. */ + public ByteBuffer socketReadBuffer = ByteBuffer.allocate(BUFFER_SIZE); +/** + * PassThroughSocketContext constructor. + * @param linkedClientContext + * @param address + * @param port + * @throws IOException + */ +public PassThroughSocketContext(SocketContext linkedClientContext, String address, int port) throws IOException { + super(linkedClientContext.getNetworkListener()); + this.relatedSocketContext = linkedClientContext; + SocketChannel channel = SocketChannel.open(); + RegisterKeyRunnable runnable; + + //Connect while still blocking - will wait until the connection finishes or times out.// + channel.connect(new InetSocketAddress(address, port)); + //Setup the channel for non-blocking from here on out.// + channel.configureBlocking(false); + channel.socket().setSendBufferSize(SEND_BUFFER_SIZE); + channel.socket().setReceiveBufferSize(RECEIVE_BUFFER_SIZE); + channel.socket().setTcpNoDelay(false); + getNetworkListener().queue(runnable = new RegisterKeyRunnable(this, channel, linkedClientContext.key.selector())); + linkedClientContext.key.selector().wakeup(); + runnable.waitForRun(); + key = runnable.getKey(); + + //Set the initial interest ops to read.// + synchronized(key) { + key.interestOps(SelectionKey.OP_READ); + }//synchronized// + + key.selector().wakeup(); +}//PassThroughSocketContext()// +/* (non-Javadoc) + * @see com.foundation.web.server.WebServer.AbstractSocketContext#getLock() + */ +protected Object getLock() { + return getRelatedSocketContext(); +}//getLock()// +/* (non-Javadoc) + * @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses() + */ +protected synchronized void writeOutgoingMessages() throws IOException { + //Actually this is called when a request is being sent via the pass through socket (sending the request to the remote server).// + //Synchronized to avoid accessing the pendingMessageBuffer and lastAddedMessageBuffer at the same time as a thread that is calling passThrough(ByteBuffer) which also accesses these variables.// + boolean result = true; + + if(result && pendingMessageBuffer != null) { + //Check to see if the outbound message is prepared to send more content.// + if(!pendingMessageBuffer.getBuffer().hasRemaining()) { + //Load the next pending outbound message in the chain.// + if(pendingMessageBuffer.getNext() != null) { + pendingMessageBuffer = pendingMessageBuffer.getNext(); + }//if// + else { + //Wait until additional message bytes are available.// + result = false; + pendingMessageBuffer = null; + lastAddedMessageBuffer = null; + }//else// + }//if// + + //Keep sending encrypted frames until the output buffer is full, or we run out of message to send.// + while(result && (pendingMessageBuffer != null) && pendingMessageBuffer.getBuffer().hasRemaining()) { + //Write the bytes to the stream.// + ((SocketChannel) key.channel()).write(pendingMessageBuffer.getBuffer()); + + //If not all the bytes could be written then we will need to wait until we can write more.// + if(pendingMessageBuffer.getBuffer().hasRemaining()) { + result = false; + }//if// + else { + //Load the next pending outbound message in the chain.// + if(pendingMessageBuffer.getNext() != null) { + pendingMessageBuffer = pendingMessageBuffer.getNext(); + }//if// + else { + //Wait until additional message bytes are available.// + result = false; + pendingMessageBuffer = null; + lastAddedMessageBuffer = null; + }//else// + }//else// + }//while// + }//if// +}//processResponses()// +/* (non-Javadoc) + * @see com.foundation.web.server.WebServer.AbstractSocketContext#processRequest() + */ +protected void readIncomingMessages() throws IOException { + //Actually this is called when a response is being processed via the pass through socket (remote process that received the request).// + int count = 1; + int loopCount = 0; + boolean result = true; + SocketChannel channel = (SocketChannel) key.channel(); + + //While we have a count greater than zero, indicating that some data is comming through, keep reading and processing the data.// + //Note: We are throddling this for active connections to prevent a single connection from hogging all the resources.// + while(loopCount < 10 && result && count > 0) { + loopCount++; + count = channel.read(socketReadBuffer); + socketReadBuffer.flip(); + + if(count == -1) { + //The socket has been closed by the client.// + try {relatedSocketContext.close();} catch(Throwable e) {} + }//if// + else if(socketReadBuffer.hasRemaining()) { + result = relatedSocketContext.passThrough(socketReadBuffer); + socketReadBuffer.compact(); + }//else// + else { + socketReadBuffer.compact(); + break; + }//else// + }//while// +}//processRequest()// +/* (non-Javadoc) + * @see com.foundation.web.server.WebServer.AbstractSocketContext#passThrough(java.nio.ByteBuffer) + */ +protected synchronized boolean passThrough(ByteBuffer buffer) { + ByteBuffer messageBytes = ByteBuffer.allocate(buffer.remaining()); + MessageBuffer message; + + //Create a new buffer to hold the data so we don't modify the passed buffer (other than to update its position).// + messageBytes = ByteBuffer.allocate(buffer.remaining()); + messageBytes.put(buffer); + message = new MessageBuffer(messageBytes); + + //Chain the message into the linked list. + if(lastAddedMessageBuffer == null) { + pendingMessageBuffer = lastAddedMessageBuffer = message; + }//if// + else { + lastAddedMessageBuffer.setNext(message); + lastAddedMessageBuffer = message; + }//else// + + return true; +}//passThrough()// +protected synchronized void close() { + try {if(key != null && key.channel() != null) key.channel().close();} catch(Throwable e) {} + try {if(key != null) key.cancel();} catch(Throwable e) {} +}//close()// +/* (non-Javadoc) + * @see com.foundation.web.server.WebServer.AbstractSocketContext#hasPendingWrite() + */ +protected boolean hasPendingWrite() { + return pendingMessageBuffer != null; +}//hasPendingWrite()// +}//PassThroughSocketContext// \ No newline at end of file diff --git a/Foundation Web Core/src/com/foundation/web/server/Request.java b/Foundation Web Core/src/com/foundation/web/server/Request.java index a952278..c3f860a 100644 --- a/Foundation Web Core/src/com/foundation/web/server/Request.java +++ b/Foundation Web Core/src/com/foundation/web/server/Request.java @@ -475,7 +475,7 @@ public class Request implements IRequest { else if(fieldName.equals("if-modified-since")) { if(fieldValue.trim().length() > 0) { //Mozilla sometimes sends an empty value. Bad Mozilla.// try { - cacheDate = WebServer.getHttpDateFormat().parse(fieldValue.trim()); + cacheDate = SocketContext.getHttpDateFormat().parse(fieldValue.trim()); }//try// catch(Throwable e) { Debug.log("Unexpected date format sent by the web browser when using an If-Modified-Since request header: '" + fieldValue + "'. The server can recover from this error.", e); @@ -514,7 +514,7 @@ public class Request implements IRequest { else if(fieldName.equals("if-unmodified-since")) { if(fieldValue.trim().length() > 0) { //Shouldn't ever happen.// try { - unmodifiedSince = WebServer.getHttpDateFormat().parse(fieldValue.trim()); + unmodifiedSince = SocketContext.getHttpDateFormat().parse(fieldValue.trim()); }//try// catch(Throwable e) { Debug.log("Unexpected date format sent by the web browser when using an Unmodified-Since request header: '" + fieldValue + "'. The server can recover from this error.", e); diff --git a/Foundation Web Core/src/com/foundation/web/server/ServerSocketContext.java b/Foundation Web Core/src/com/foundation/web/server/ServerSocketContext.java new file mode 100644 index 0000000..f9ba543 --- /dev/null +++ b/Foundation Web Core/src/com/foundation/web/server/ServerSocketContext.java @@ -0,0 +1,15 @@ +package com.foundation.web.server; + +import com.foundation.web.server.WebServer.ServiceListener; + +/** + * Provides a place connection oriented data. + */ +public class ServerSocketContext implements IChannelContext { + public ServiceListener serviceListener = null; + + public ServerSocketContext(ServiceListener serviceListener) { + super(); + this.serviceListener = serviceListener; + }//ServerSocketContext()// +}//ServerSocketContext// \ No newline at end of file diff --git a/Foundation Web Core/src/com/foundation/web/server/SocketContext.java b/Foundation Web Core/src/com/foundation/web/server/SocketContext.java new file mode 100644 index 0000000..e043a0d --- /dev/null +++ b/Foundation Web Core/src/com/foundation/web/server/SocketContext.java @@ -0,0 +1,2634 @@ +package com.foundation.web.server; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.text.SimpleDateFormat; +import java.util.Date; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLEngineResult.Status; + +import com.common.comparison.Comparator; +import com.common.debug.Debug; +import com.common.io.StreamSupport; +import com.common.util.IIterator; +import com.common.util.LiteHashMap; +import com.common.util.LiteList; +import com.common.util.Queue; +import com.common.util.StreamBuffer; +import com.foundation.web.interfaces.IConnectionContext; +import com.foundation.web.interfaces.IContent; +import com.foundation.web.interfaces.IMimeType; +import com.foundation.web.interfaces.IPassThroughDomain; +import com.foundation.web.interfaces.IRequest; +import com.foundation.web.interfaces.IResponse; +import com.foundation.web.interfaces.ISession; +import com.foundation.web.interfaces.ISessionLifecycleAware; +import com.foundation.web.interfaces.IStreamedWebsocketMessage; +import com.foundation.web.interfaces.IWebApplication; +import com.foundation.web.interfaces.WebsocketHandler; +import com.foundation.web.server.Request.ContentPart; +import com.foundation.web.server.WebServer.IWebApplicationContainerProvider; +import com.foundation.web.server.WebServer.IgnoredIOException; +import com.foundation.web.server.WebServer.TlsFailureException; + +/** + * Provides a place for connection oriented data. + *Note that a client server session can have multiple socket contexts (one for each socket) and that the socket context may be used to access multiple applications hosted on this server.
+ */ +public class SocketContext extends AbstractSocketContext implements IWebApplicationContainerProvider, IConnectionContext { + /** The server socket reference that created the socket. This will be null if the socket was not created with a server socket (shouldn't happen in a web server). */ + public ServerSocketContext serverSocketContext = null; + /** The web application's container for the application associated with this connection to a client. A connection is only allowed to access a single application. */ + public WebApplicationContainer webApplicationContainer = null; + /** A temporary storage location for part of an HTTP message header. Warning: This is used by the reading thread ONLY - never by the processing/writing threads. */ + public StringBuffer messageHeaderFragment = null; + /** A reference to the request object currently being processed. Warning: This is used by the reading thread ONLY - never by the processing/writing threads. */ + public Request request = null; + /** The count of stored content bytes for the request. This is valid for any type of request. */ + public int requestContentPosition = 0; + /** The multi-part message characters remaining to be processed from the last message fragment received. */ + public byte[] remainingPartBytes = null; + /** The multi-part message count of characters/bytes read thus far. This ensures the entire message is read properly. */ + public int partReadCount = 0; + /** The mutli-part message count of characters/bytes written to the buffer as the message from the client is parsed. Used to index into the buffer by the various ContentPart instances generated. */ + public int partWriteCount = 0; + /** The part that is currently being read and spans more than one message fragment. */ + public ContentPart currentPart = null; + /** Whether the final part boundary has already been found. If this is true then the message bytes should keep getting read until the partReadCount == contentLength. */ + public boolean endPartBoundaryFound = false; + /** The bytes containing the unencrypted outbound message that is waiting for the socket to allow a write. */ + public MessageBuffer/*ByteBuffer*/ currentOutboundMessage = null; + /** The last message buffer added to the pending outbound message chain (linked list). Used only for pass through contexts currently since locally handled messages link the reponses together into a list. */ + private MessageBuffer lastOutboundMessage = null; + /** The byte buffer used to read data from the socket. This must be null if a SSL engine is provided. */ + public ByteBuffer socketReadBuffer = null; + /** The buffer used to store the initial data in a SSL/TLS connection. The buffering is necessary to allow us to pre-read the client's hello message to gather the domain the client is connecting to - allowing for the correct SSL engine to be used. */ + public StreamBuffer initialBuffer = null; + /** Whether the TLS domain extension was used. If false then the default SSL domain should be used to attempt an anonymous connection with the client such that an error page can be displayed prompting the user to upgrade their browser to something modern. */ + public boolean tlsFailure = false; + /** The domain name the client is connecting to. This is provided by the initial TLS hello message, and is used to select the correct SSL context for the desired web application. */ + public String domain = null; + /** Non-null if the socket is using SSL to secure communications. */ + public SSLEngine sslEngine = null; + /** Whether the ssl engine needs to send handshake data to the client and has not yet generated it. */ + public boolean sslNeedsWrap = false; + /** The reusable buffer containing encrypted data from the client. */ + public ByteBuffer encryptedReadBuffer = null; + /** The reusable buffer containing unencrypted data from the client. */ + public ByteBuffer unencryptedReadBuffer = null; + /** The reusable buffer containing encrypted data being sent to the client. */ + public ByteBuffer encryptedWriteBuffer = null; + /** The last used request number which identifies the sequence for the requests. */ + private int lastRequestNumber = 0; + /** The response we are currently processing. */ + private Response currentResponse = null; + /** The response we are will process last. */ + private Response lastResponse = null; + /** Tracks the number of bytes sent from the current response. This is only used when debugging. */ + private int sentBytes = 0; + /** Tracks the getWebServer().debug output for the current request/response cycle. This is only used when debugging. */ + public StringBuffer debugBuffer = null; //getWebServer().debug ? new StringBuffer(1000) : null; + /** The optional application specific data indexed by an application spectific key. Elements (values, not keys) which implement ISessionLifecycleAware will be released when the socket context is released. */ + private LiteHashMap applicationDataMap; + /** Used to identify the first unencrypted message (ignored if ssl is being used) so that forwarding to a remote server can be accomplished. */ + private boolean isFirstUnencryptedMessage = true; + /** Whether this is a websocket connection. */ + private boolean isWebsocket = false; + /** The protocol passed when this connection was upgraded to a websocket. */ + private String websocketProtocol = null; + /** The maximum number of bytes in an allowed websocket message (may be composed of multiple frames, does not include the frame header sizes). While this is a long, we really can't read longer than Integer.MAX_VALUE sized frames. So the message might be within size limits, but it might still be rejected if the frame exceeds the server's capacity to read a frame. */ + private long websocketMaxMessageLength = 0; + /** The reusable frame header buffer. */ + private byte[] websocketFrameHeader = null; + /** The index into the frame header of the last read byte. */ + private int websocketFrameHeaderIndex = 0; + /** The next partial message (single messages may be broken into frames by the client). */ + private StreamBuffer websocketPartialReceivedMessage = null; + /** The remaining bytes to be read on the current message frame (the frame header and part of the frame was read previously). */ + private int websocketFrameRemainder = 0; + /** Whether the frame currently being read in the current message being received is the last frame in the message (when the frame is fully read it will trigger the message to be processed). */ + private boolean websocketIsLastFrameInMessage = false; + /** The op code for the currently reading message, or zero if the last message was completed. */ + private int websocketMessageOpCode = 0; + /** The currently reading frame's mask key used to decode the frame data. */ + private byte[] websocketMessageMaskKey = null; + /** The queue used to hold outgoing messages before they are sent. Will contain only String, byte[], or IConnectionContext.IStreamedWebsocketMessage instances. */ + private Queue websocketPendingMessages = null; + /** The streambuffer for the currently sending message. This will be for the current part of the streaming message if websocketStreamingMessage is non-null. */ + private ByteBuffer websocketSendingMessage = null; + /** The streaming message handler which will be set only if the currently sending message is streaming. */ + private IStreamedWebsocketMessage websocketStreamingMessage = null; + /** The application specified handler called when websocket events occur (messages received, socket closed, etc). */ + private WebsocketHandler websocketHandler = null; +/** + * SocketContext constructor. + * @param serverSocketContext The context for the server socket that accepted this socket. + * @param networkListener The network listener that this socket exists within. + */ +public SocketContext(ServerSocketContext serverSocketContext, NetworkListener networkListener) { + super(networkListener); +}//SocketContext()// +/** + * Gets the pass through socket context associated with this socket context, or null if none exists. + * @return The socket context for the pass through socket used to handle all incoming requests from the client on this socket. + */ +protected PassThroughSocketContext getPassThroughSocketContext() { + return (PassThroughSocketContext) getRelatedSocketContext(); +}//getPassThroughSocketContext()// +/* (non-Javadoc) + * @see com.foundation.web.server.WebServer.AbstractSocketContext#getLock() + */ +protected Object getLock() { + return this; +}//getLock()// +protected synchronized void close() { + try { + if(websocketHandler != null) { + websocketHandler.connectionClosed(); + websocketHandler = null; + }//if// + }//try// + catch(Throwable e) { + Debug.log(e); + }//catch// + + try { + if(applicationDataMap != null) { + for(IIterator iterator = applicationDataMap.valueIterator(); iterator.hasNext(); ) { + Object next = iterator.next(); + + //Ensure the code keeps going even if there is a problem cleaning up.// + try { + if(next instanceof ISessionLifecycleAware) ((ISessionLifecycleAware) next).release(); + }//try// + catch(Throwable e) { + Debug.log(e); + }//catch// + }//for// + }//if// + }//try// + catch(Throwable e) { + Debug.log(e); + }//catch// + + try {if(key != null && key.channel() != null) key.channel().close();} catch(Throwable e) {} + try {if(key != null) key.cancel();} catch(Throwable e) {} + //Clean up after the response and request.// + //try {while(currentOutboundMessage != null) {currentOutboundMessage.close(); currentOutboundMessage = currentOutboundMessage.getNext();}} catch(Throwable e2) {} + try {if(currentResponse != null) currentResponse.close();} catch(Throwable e2) {} + + if(getPassThroughSocketContext() != null) { + getPassThroughSocketContext().close(); + }//if// +}//close()// +/* (non-Javadoc) + * @see com.foundation.web.interfaces.IConnectionContext#getApplicationData(java.lang.String) + */ +public Object getApplicationData(String key) { + return applicationDataMap == null ? null : applicationDataMap.get(key); +}//getApplicationData()// +/* (non-Javadoc) + * @see com.foundation.web.interfaces.IConnectionContext#setApplicationData(java.lang.String, java.lang.Object) + */ +public void setApplicationData(String key, Object applicationData) { + if(applicationDataMap == null) { + applicationDataMap = new LiteHashMap(10); + }//if// + + applicationDataMap.put(key, applicationData); +}//setApplicationData()// +/* (non-Javadoc) + * @see com.foundation.web.server.WebServer.IWebApplicationContainerProvider#getWebApplicationContainer() + */ +public WebApplicationContainer getWebApplicationContainer() { + return webApplicationContainer; +}//getWebApplicationContainer()// +/** + * Whether the socket is an SSL/TLS socket. + * @return Whether data streamed over the socket is encrypted using SSL/TLS. + */ +public boolean isSsl() { + return socketReadBuffer == null; +}//isSsl()// +/** + * Queues an outbound client message by adding it to the linked list of message buffers. Handles synchronizing on this SocketContext to prevent multiple threads from adding messages at once. Also handles notifying the ServiceListener that it should update it's write flags for the socket. + * @param messageBuffer The buffer to be added to the end. + */ +private void queueOutboundClientMessage(MessageBuffer messageBuffer) { + boolean notify = false; + + synchronized(this) { + if(currentOutboundMessage == null) { + lastOutboundMessage = currentOutboundMessage = messageBuffer; + notify = true; + }//if// + else { + lastOutboundMessage.setNext(messageBuffer); + lastOutboundMessage = messageBuffer; + }//else// + }//synchronized()// + + if(notify) { + notifyListenerOfPendingWrite(); + }//if// +}//queueOutboundClientMessage()// +private void writeSessionCookies(PrintStream pout) { + Response response = currentResponse; + ISession session = response.getSession(); + + if(session != null) { + //Write the session id only if it has changed.// + if(!Comparator.equals(response.getRequest().getSessionId(), session.getSessionId())) { + pout.print("Set-Cookie: sessionId=" + session.getSessionId() + ";path=/;\r\n"); + }//if// + + //Write the secure session id only if it has changed.// + if(!Comparator.equals(response.getRequest().getSecureSessionId(), session.getSecureSessionId())) { + pout.print("Set-Cookie: secureSessionId=" + (session.getSecureSessionId() == null ? "" : session.getSecureSessionId()) + ";path=/;secure\r\n"); + }//if// + + if(response.getRequest().isLoggedIn() != session.getIsLoggedIn()) { + pout.print("Set-Cookie: isLoggedIn=" + session.getIsLoggedIn() + ";path=/;\r\n"); + }//if// + }//if// +}//writeSessionCookies()// +/** + * Processes the next response in the sequence. + *Note: The caller must synchronize on this context to prevent multiple threads from accessing the context at the same time.
+ * @result Whether request is in a receive state. Will be false if the request generated a response that could not be completely transmitted. + */ +private void prepareResponse() { + Response response = currentResponse; + Request request = (Request) response.getRequest(); + byte[] headerBytes = null; + IContent content = null; + ByteBuffer buffer = null; + + try { + //Wrap the response in http cloths. The HeaderFieldNames will be set if the response was provided with a completely custom HTTP header to be used.// + if(response.getHeaderFieldNames() != null) { + ByteArrayOutputStream bout = new ByteArrayOutputStream(1000); + PrintStream pout = new PrintStream(bout, true, "ASCII"); + LiteList headerFieldNames = response.getHeaderFieldNames(); + LiteHashMap headerFieldMap = response.getHeaderFieldMap(); + + //Write the response line which is mapped to the null field name.// + pout.print(headerFieldMap.get(null)); + pout.print("\r\n"); + + //Write the rest of the response header lines in order.// + for(int index = 0; index < headerFieldNames.getSize(); index++) { + String headerFieldName = (String) headerFieldNames.get(index); + String headerFieldValue = (String) headerFieldMap.get(headerFieldName); + + if(headerFieldName.equals("Server")) { + pout.print("Server: DE/1.0"); + }//if// + else { + pout.print(headerFieldName); + pout.print(": "); + pout.print(headerFieldValue); + }//else// + + pout.print("\r\n"); + }//for// + + //Write out any cookies necessary to retain our session.// + writeSessionCookies(pout); + + //End the header.// + pout.print("\r\n"); + pout.close(); + headerBytes = bout.toByteArray(); + + //Prepare the content for delivery.// + content = response.getContent(); + }//if// + else if(response.getForwardUri() != null) { + ByteArrayOutputStream bout = new ByteArrayOutputStream(1000); + PrintStream pout = new PrintStream(bout, true, "ASCII"); + //String today = format.format(new Date()); + + //The 303 code may not be fully supported by browsers.// + //if(request.getHttpVersion().equalsIgnoreCase("HTTP/1.1") || request.getHttpVersion().equalsIgnoreCase("HTTP/1.2")) { + // pout.print("HTTP/1.1 303 Forwarded\r\n"); + //}//if// + //else { + pout.print("HTTP/1.1 302 Moved Temporarily\r\n"); + //}//else// + + writeSessionCookies(pout); + + pout.print("Location: " + response.getForwardUri() + "\r\n"); + + //Note: Encoded URL's will have their parameters in the content, not in the URL.// + if(response.getContent() == null) { + pout.print("Content-Length: 0\r\n"); + }//if// + else { + pout.print("Content-Length: " + response.getContent().getSize() + "\r\n"); + pout.print("Content-Type: application/x-www-form-urlencoded\r\n"); + }//else// + + pout.print("\r\n"); + pout.close(); + headerBytes = bout.toByteArray(); + }//else if// + else if((content = response.getContent()) != null) { //Convert the result into a stream of bytes.// + ByteArrayOutputStream bout = new ByteArrayOutputStream(1000); + PrintStream pout = new PrintStream(bout, true, "ASCII"); + Date lastModifiedDate = content.getLastModifiedDate(); + String cacheDirective = null; + IMimeType mimeType = content.getMimeType(response.getApplication() != null ? response.getApplication().getMimeTypeProvider() : null); + boolean isDownloaded = content != null && (content.getIsDownloaded() == null ? mimeType != null && mimeType.isDownloaded() : content.getIsDownloaded().booleanValue()); + boolean compress = response.getCompress().booleanValue() && (mimeType == null || mimeType.isCompressable()); + int compressionType = 0; //0: none, 1: gzip, ..// + + if(compress) { + //Check for an encoding allowed by the client that we know how to use.// + if(request.getAllowedEncodings() == null || request.getAllowedEncodings().length < 1) { + compress = false; + }//if// + else { + compress = false; + + //Ensure we have an allowed encoding we know how to use.// + for(int index = 0; !compress && index < request.getAllowedEncodings().length; index++) { + String encoding = request.getAllowedEncodings()[index]; + + if(encoding.equalsIgnoreCase("gzip")) { + compress = true; + compressionType = 1; + }//if// + }//for// + }//else// + }//if// + + if(response.isError()) { + if(response.getHeader() != null) { + pout.print(response.getHeader()); + }//if// + else { + pout.print("HTTP/1.1 404 Resource Not Found\r\n"); + }//else// + }//if// + else if(response.getCustomHeader() != null) { + pout.print(response.getCustomHeader()); + }//else if// + else if(isDownloaded && request.getRange() != null) { + pout.print("HTTP/1.1 206 Partial Content\r\n"); + }//else if// + else { + pout.print("HTTP/1.1 200 OK\r\n"); + }//else// + + pout.print("Content-Length: " + (content != null ? content.getSize() : 0) + "\r\n"); + + if(compress) { + //TODO: Add others? + if(compressionType == 1) { + content = new GzipContent(content); + pout.print("Content-Encoding: gzip\r\n"); + }//if// + }//if// + + if(content != null) { + //Note: The character set gives IE indigestion for some reason.// + pout.print("Content-Type: " + (mimeType != null ? mimeType.getMimeName() : "text/html") + "; charset=" + (response.getCharacterSet() == null ? "UTF-8" : response.getCharacterSet()) + "\r\n"); + cacheDirective = content.getCacheDirective(); + + if(isDownloaded) { + pout.print("Content-Disposition: attachment; filename=\"" + content.getDownloadName() + "\";\r\n"); + pout.print("Accept-Ranges: bytes\r\n"); + + if(request.getRange() != null) { +// Debug.log("Sending a ranged response: " + request.getRange() + " content range: (" + content.getStart() + " - " + content.getEnd() + "/" + content.getSize() + ")."); + pout.print("Range: " + request.getRange() + "\r\n"); + pout.print("Content-Range: bytes " + content.getStart() + "-" + content.getEnd() + "/" + content.getSize() + "\r\n"); + }//if// + }//if// + }//if// + + writeSessionCookies(pout); + + pout.print("Server: DE/1.0\r\n"); + //TODO: IE has a problem with caching and forwarding/redirecting. A page that redirects to another page that was previously cached does not result in IE sending a request for the forwarded content.// + //private / no-cache + + if(content.getExpiresDirective() != null) { + pout.print("Expires: " + getHttpDateFormat().format(content.getExpiresDirective())); + }//if// + + if(cacheDirective != null) { + pout.print("Cache-Control: " + cacheDirective + "\r\n"); + }//if// + else { + int cacheLength = content.getCacheLength() != null ? content.getCacheLength().intValue() : mimeType != null ? mimeType.getDefaultCacheLength() : IMimeType.CACHE_LENGTH_NEVER_CACHE; + + if(cacheLength > 0) { + pout.print("Cache-Control: public, max-age=" + cacheLength + "\r\n"); + }//if// + else if(cacheLength == IMimeType.CACHE_LENGTH_ALWAYS_TEST) { + pout.print("Cache-Control: public, pre-check=0, post-check=120\r\n"); + }//else if// + else if(cacheLength == IMimeType.CACHE_LENGTH_NEVER_CACHE) { + pout.print("Cache-Control: no-cache\r\n"); + }//else if// + else { + pout.print("Cache-Control: no-store\r\n"); + }//else// + }//else// + + //TODO: Determine if we need to use age. + //pout.print("Age: 0\r\n"); + //TODO: Determine if we need to use ETags + + if(lastModifiedDate != null) { + SimpleDateFormat format = getHttpDateFormat(); + + pout.print("Last-Modified: " + format.format(lastModifiedDate) + "\r\n"); + pout.print("Date: " + format.format(new Date()) + "\r\n"); + }//if// + + pout.print("\r\n"); + headerBytes = bout.toByteArray(); + }//else if// + else { + ByteArrayOutputStream bout = new ByteArrayOutputStream(1000); + PrintStream pout = new PrintStream(bout, true, "ASCII"); + + if(response.isError()) { + if(response.getHeader() != null) { + pout.print(response.getHeader()); + }//if// + else { + pout.print("HTTP/1.1 404 Resource Not Found\r\n"); + }//else// + }//if// + else if(response.getCustomHeader() != null) { + pout.print(response.getCustomHeader()); + }//else if// + else { + Debug.log(new RuntimeException("The response to: " + response.getRequest().getHeaderText() + " had no response content!")); + pout.print("HTTP/1.1 200 OK\r\n"); + }//else// + + writeSessionCookies(pout); + pout.print("Content-Length: 0\r\n"); + pout.print("Server: DE/1.0\r\n"); + pout.print("\r\n"); + pout.close(); + headerBytes = bout.toByteArray(); + }//else// + + buffer = ByteBuffer.allocate(headerBytes.length > 2000 ? headerBytes.length : 2000); + buffer.put(headerBytes); + + if(getWebServer().debug) { + //Test code... + ByteBuffer buffer2 = ByteBuffer.allocate(headerBytes.length); + buffer2.put(headerBytes); + buffer2.flip(); + CharBuffer ch = decoder.decode(buffer2); +// debugBuffer.append("Sending message:\n"); +// debugBuffer.append(ch.toString()); +// debugBuffer.append("\nResponse Size: " + (headerBytes.length + (content != null ? content.getSize() : 0)) + "\n"); + Debug.log(ch.toString()); + }//if// + + //Ignore the content if we are only accessing the header.// +// if(content != null && request.getRequestType() != Request.TYPE_HEAD) { +// content.get(buffer); +// }//if// + + //Save the buffer as the current pending outbound message for this socket context.// + currentOutboundMessage = new MessageBuffer(buffer, response, content != null && request.getRequestType() != Request.TYPE_HEAD ? content : null); + }//try// + catch(Throwable e) { + Debug.log("Fatal Error: Failed to build and send the response message due to an exception.", e); + //Force the channel to close.// + try {key.channel().close();} catch(Throwable e2) {} + //Clean up after the request and response.// + try {response.close();} catch(Throwable e2) {} + }//catch// +}//prepareResponse()// +/** + * Adds a HTTP response to the socket context. + *Note: We must synchronize since a socket could be used to access multiple applications and thus mutliple sessions.
+ * @param response The response to be added. + * @result Whether request is in a receive state. Will be false if the request generated a response that could not be completely transmitted. + */ +public synchronized boolean sendHttpResponse(Response response) { + if(currentResponse != null) { + lastResponse.setNextResponse(response); + lastResponse = response; + }//if// + else { + lastResponse = currentResponse = response; + sentBytes = 0; + prepareResponse(); + + //Note: Not going to process the response on this thread. Allow the flag to be set for writing to the socket, and have the next thread in the network listener handle the write. This allows for cleaner code and pipelining without all the synchronizing. +// result = internalProcessResponses(); + }//else// + + request = null; + + return false; +}//sendHttpResponse()// +/* (non-Javadoc) + * @see com.foundation.web.server.WebServer.AbstractSocketContext#passThrough(java.nio.ByteBuffer) + */ +protected synchronized boolean passThrough(ByteBuffer buffer) { + ByteBuffer messageBytes = ByteBuffer.allocate(buffer.remaining()); + MessageBuffer message; + + //Create a new buffer to hold the data so we don't modify the passed buffer (other than to update its position).// + messageBytes = ByteBuffer.allocate(buffer.remaining()); + messageBytes.put(buffer); + message = new MessageBuffer(messageBytes); + + //Chain the message into the linked list. + if(lastOutboundMessage == null || currentOutboundMessage == null) { + currentOutboundMessage = lastOutboundMessage = message; + }//if// + else { + lastOutboundMessage.setNext(message); + lastOutboundMessage = message; + }//else// + + return true; +}//passThrough()// +/* (non-Javadoc) + * @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses() + */ +protected void writeOutgoingMessages() throws IOException { + if(getPassThroughSocketContext() != null) { + //Synchronized to avoid multiple threads accessing the pendingOutboundMessage chain at one time and updating the write flag out of order (could happen if we enabled request chaining over a single socket).// + synchronized(this) { + writeClientBoundMessage(); + }//synchronized// + }//if// + else if(isWebsocket) { + //Right after upgrading the socket we have one last HTTP response to process.// + if(currentResponse != null) { + internalProcessResponses(); + }//if// + + internalProcessWebsocketMessages(); + }//else if// + else { + //Go directly to writing the client response if we are just passing everything through to another process.// + internalProcessResponses(); + }//else// +}//processCurrentResponse()// +/** + * Loads the next outbound websocket message and attempts to write it to the socket until all outbound messages have been sent, or the socket's buffers are full and a wait is required. + * If a message could only be partially sent then the next call will attempt to finish sending it. + */ +private void internalProcessWebsocketMessages() { + if(websocketSendingMessage == null) { + loadNextWebsocketMessage(); + }//if// + + while(websocketSendingMessage != null) { + //If the socket is open then send the next buffer of data.// + if(key.channel().isOpen()) { + if(currentOutboundMessage != null) { + //Put the sending message in a MessageBuffer (pendingOutboundMessage).// + currentOutboundMessage = new MessageBuffer(websocketSendingMessage); + }//if// + + //Write the pendingOutboundMessage to the socket.// + if(writeClientBoundMessage()) { + websocketSendingMessage = null; + currentOutboundMessage = null; + }//if// + }//if// + + //If we finished sending the message then load the next one.// + if(websocketSendingMessage == null) { + loadNextWebsocketMessage(); + }//if// + }//while// +}//internalProcessWebsocketMessages()// +/** + * Loads and prepares the next websocket message from the queue of pending messages. + * Clears the pending message attributes if there isn't a pending message to be processed. + * The caller can check websocketSendingMessage == null to see if there is a ready message. + */ +private void loadNextWebsocketMessage() { + Object next = null; + boolean isLastPart = true; + + if(websocketStreamingMessage != null && websocketStreamingMessage.hasNextPart()) { + next = websocketStreamingMessage.getNextPart(); + isLastPart = !websocketStreamingMessage.hasNextPart(); + + //Ensure that we got a string or byte array.// + if(!(next instanceof String || next instanceof byte[])) { + throw new RuntimeException("Invalid streaming message part type."); + }//if// + }//if// + else { + synchronized(websocketPendingMessages) { + if(websocketPendingMessages.getSize() > 0) { + next = websocketPendingMessages.dequeue(); + }//if// + }//synchronized// + }//else// + + if(next != null) { + byte[] bytes = null; + int opCode = 0; + int length = 0; + + if(next instanceof String) { + try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);} + opCode = websocketStreamingMessage == null ? 0x01 : 0; + length = bytes.length; + }//if// + else if(next instanceof byte[]) { + bytes = (byte[]) next; + opCode = websocketStreamingMessage == null ? 0x02 : 0; + length = bytes.length; + }//else if// + else if(next instanceof Byte) { //Control Message// + opCode = ((Byte) next).byteValue(); + }//else if// + else if(next instanceof IStreamedWebsocketMessage) { + websocketStreamingMessage = (IStreamedWebsocketMessage) next; + next = websocketStreamingMessage.getNextPart(); + isLastPart = !websocketStreamingMessage.hasNextPart(); + + if(next instanceof String) { + try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);} + opCode = 0x01; //Text// + length = bytes.length; + }//if// + else if(next instanceof byte[]) { + bytes = (byte[]) next; + opCode = 0x02; //Binary// + length = bytes.length; + }//else if// + else { + throw new RuntimeException("Invalid streaming message part type."); + }//if// + }//else if// + + websocketSendingMessage = ByteBuffer.allocate(14 + length); + websocketSendingMessage.put((byte) (isLastPart ? 0x8 : 0)); + websocketSendingMessage.put((byte) opCode); + + //Write the length differently based on how long the content is.// + if(length < 126) { + websocketSendingMessage.put((byte) length); +// websocketSendingMessage.putLong(0, StreamBuffer.NUMBER_MSF); + }//if// + else if(length < 65535) { + websocketSendingMessage.put((byte) 126); + websocketSendingMessage.putShort((short) (length & 0xFFFF)); +// websocketSendingMessage.putShort((short) 0); + websocketSendingMessage.putInt(0); + }//else if// + else { + websocketSendingMessage.put((byte) 127); + websocketSendingMessage.putLong((long) length); + }//else// + + //The server doesn't use a mask key.// +// websocketSendingMessage.putInt(0); + //Put the content at the end of the message.// + websocketSendingMessage.put(bytes); + }//if// + else { + websocketSendingMessage = null; + websocketStreamingMessage = null; + }//else// +}//loadNextWebsocketMessage()// +/** + * @return + */ +private synchronized void internalProcessResponses() { + boolean finishedSending = true; + + //Keep sending responses while the buffers are not full and there is another response to send.// + while(finishedSending && currentResponse != null) { + //If the socket is open then send the next buffer of data.// + if(key.channel().isOpen()) { + //Send the pending response object's prepared buffer of data.// + finishedSending = writeClientBoundMessage(); + }//if// + + //Close the response if successfully sent, or if the socket is closed.// + if(finishedSending || !key.channel().isOpen()) { + try {currentResponse.close();} catch(Throwable e) {} + }//if// + + //If we finished sending the current response then load the next one.// + if(finishedSending) { + currentResponse = currentResponse.getNextResponse(); + + if(currentResponse == null) { + lastResponse = null; + }//if// + else if(key.channel().isOpen()) { + //Prep the next response object for sending.// + prepareResponse(); + }//else// + else { + //Clean up after all the left over responses.// + while(currentResponse != null) { + currentResponse.close(); + currentResponse = currentResponse.getNextResponse(); + }//while// + + currentResponse = null; + lastResponse = null; + }//else// + }//if// + }//while// +}//processCurrentResponse()// +/** + * Sends a response to the client. + * @return Whether the response could be fully sent. This will be false if there is still more data to be written when the call returns. + */ +private boolean writeClientBoundMessage() { + boolean sendMore = true; + +// if(getWebServer().debug) { +// debugBuffer.append("Starting a write cycle.\n"); +// }//if// + + try { + //Process SSL output first.// + if(sslEngine != null) { + //If we have part of an SSL frame then try to send it first.// + if(encryptedWriteBuffer.hasRemaining()) { + int remaining = encryptedWriteBuffer.remaining(); + + //Write the bytes to the stream.// + ((SocketChannel) key.channel()).write(encryptedWriteBuffer); + +// if(getWebServer().debug) { +// debugBuffer.append("Wrote " + (remaining - encryptedWriteBuffer.remaining()) + " encrypted bytes to the stream. " + encryptedWriteBuffer.remaining() + " remain.\n"); +// }//if// + + //Check to see if we failed to send the whole frame.// + if(encryptedWriteBuffer.hasRemaining()) { + sendMore = false; + }//if// + }//if// + + while(sendMore && sslNeedsWrap) { + SSLEngineResult handshakeResult; + + //Reset the encrypted write buffer - note that since we will never read while waiting to write data, this should always be empty.// + encryptedWriteBuffer.position(0); + encryptedWriteBuffer.limit(encryptedWriteBuffer.capacity()); + //Generate the handshake message.// + handshakeResult = sslEngine.wrap(ByteBuffer.allocate(0), encryptedWriteBuffer); + encryptedWriteBuffer.flip(); + + if(handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) { + //Should never happen.// + Debug.log(new RuntimeException("Unexpected ssl engine buffer overflow.")); + }//if// + else if(handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) { + //Should never happen.// + Debug.log(new RuntimeException("Unexpected ssl engine buffer underflow.")); + }//else if// + else if(handshakeResult.getStatus() == Status.CLOSED) { + //Should never happen.// + Debug.log(new RuntimeException("Unexpected ssl engine closed.")); + //TODO: Handle this closure without an infinate loop... + //Close the socket.// + try {key.channel().close();}catch(Throwable e2) {} + }//else if// + else if(handshakeResult.getStatus() == Status.OK) { + if(handshakeResult.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { + //Should never happen.// + Debug.log(new RuntimeException("Unexpected ssl engine task.")); + }//if// + else if(encryptedWriteBuffer.hasRemaining()) { + int remaining = encryptedWriteBuffer.remaining(); + + //Write the bytes to the stream.// + ((SocketChannel) key.channel()).write(encryptedWriteBuffer); + +// if(getWebServer().debug) { +// debugBuffer.append("Sent " + (remaining - encryptedWriteBuffer.remaining()) + " encrypted bytes.\n"); +// }//if// + + //If not all the bytes could be written then we will need to wait until we can write more.// + if(encryptedWriteBuffer.hasRemaining()) { +// if(getWebServer().debug) { +// debugBuffer.append("Pausing due to a partially sent packet (while ssl handshaking). Bytes actually sent: " + encryptedWriteBuffer.position() + ". Bytes remaining: " + encryptedWriteBuffer.remaining() + ".\n"); +// }//if// + + //Leave the data in the encrypted write buffer for the writing operation to send it.// + sendMore = false; + }//if// + + //Update the SSL needs wrap flag.// + if(handshakeResult.getHandshakeStatus() != HandshakeStatus.NEED_WRAP) { + sslNeedsWrap = false; + }//if// + }//else if// + else { + sslNeedsWrap = false; + }//else// + }//else if// + else { + //Should never happen.// + Debug.log(new RuntimeException("Unexpected ssl engine status code.")); + }//else// + }//while// + +// if(getWebServer().debug) { +// debugBuffer.append("End Handshaking SSL\n"); +// }//if// + }//if// + + if(sendMore && currentOutboundMessage != null) { + //Check to see if the outbound message is prepared to send more content. For chunked transfers the outbound message may be waiting for additional content from another stream and we should return later.// + if(!currentOutboundMessage.getBuffer().hasRemaining()) { + if(!currentOutboundMessage.loadBuffer()) { + if(currentOutboundMessage.getBuffer() == null && currentOutboundMessage.getNext() != null) { + currentOutboundMessage = currentOutboundMessage.getNext(); + }//if// + else { + sendMore = false; + }//else// + }//if// + + if(currentOutboundMessage.getBuffer() == null) { + currentOutboundMessage = null; + lastOutboundMessage = null; + }//if// + }//if// + + //If we have an application response pending then send it now.// + if(sendMore && currentOutboundMessage.getBuffer().hasRemaining()) { + if(sslEngine != null) { + //Keep sending encrypted frames until the output buffer is full, or we run out of message to send.// + while(key.channel().isOpen() && sendMore && (currentOutboundMessage != null) && currentOutboundMessage.getBuffer().hasRemaining()) { + SSLEngineResult encryptResult; +// int offset = pendingOutboundMessage.getBuffer().position(); +//TODO: Comment me. +//int rem = pendingOutboundMessage.getBuffer().remaining(); + //Reset the encrypted write buffer.// + encryptedWriteBuffer.compact(); + //Encrypt the next message frame.// + encryptResult = sslEngine.wrap(currentOutboundMessage.getBuffer(), encryptedWriteBuffer); + encryptedWriteBuffer.flip(); +//TODO: Comment me. +//Debug.log("Encrypting/Sending to client from Git " + (rem - pendingOutboundMessage.getBuffer().remaining()) + " bytes."); + +// if(getWebServer().debug) { +// sentBytes += (pendingOutboundMessage.position() - offset); +// debugBuffer.append("Encrypted: " + (pendingOutboundMessage.position() - offset) + ". Total Encrypted: " + sentBytes + ". Encrypted size: " + encryptedWriteBuffer.limit() + ".\n"); +// }//if// + + if(encryptResult.getStatus() == Status.BUFFER_OVERFLOW) { + //Should never happen.// + Debug.log(new RuntimeException("Unexpected ssl engine buffer overflow.")); + }//if// + else if(encryptResult.getStatus() == Status.BUFFER_UNDERFLOW) { + //Should never happen.// + Debug.log(new RuntimeException("Unexpected ssl engine buffer underflow.")); + }//else if// + else if(encryptResult.getStatus() == Status.CLOSED) { + //Should never happen.// +// Debug.log(new RuntimeException("Unexpected ssl engine closed.")); + //TODO: Handle this closure without an infinate loop... + //Close the socket.// + try {key.channel().close();} catch(Throwable e2) {} + }//else if// + else if(encryptResult.getStatus() == Status.OK) { + //Write the bytes to the stream.// + try { + int remaining = encryptedWriteBuffer.remaining(); + + ((SocketChannel) key.channel()).write(encryptedWriteBuffer); + +// if(getWebServer().debug) { +// debugBuffer.append("Sent " + (remaining - encryptedWriteBuffer.remaining()) + " encrypted bytes.\n"); +// }//if// + }//try// + catch(IOException e) { + //Caught if the channel is forcably closed by the client. We will ignore it.// + }//catch// + + //If not all the bytes could be written then we will need to wait until we can write more.// + if(encryptedWriteBuffer.hasRemaining()) { + //Leave the data in the encrypted write buffer for the writing operation to send it.// + sendMore = false; + +// if(getWebServer().debug) { +// debugBuffer.append("Pausing due to a partially sent packet. Bytes actually sent: " + encryptedWriteBuffer.position() + ". Bytes remaining: " + encryptedWriteBuffer.remaining() + ".\n"); +// }//if// + }//if// + }//else if// + else { + //Should never happen.// + Debug.log(new RuntimeException("Unexpected ssl engine status code.")); + }//else// + + //Add more content to the buffer.// + //Note: Do this even if the last encrypted write buffer could not be fully sent - so that when it is sent there will be outbound message content.// + if(key.channel().isOpen() && currentOutboundMessage != null) { + if(!currentOutboundMessage.loadBuffer()) { + //Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.// + if(currentOutboundMessage.getBuffer() == null && currentOutboundMessage.getNext() != null) { + currentOutboundMessage = currentOutboundMessage.getNext(); + }//if// + else { + //Wait until additional message bytes are available.// + sendMore = false; + }//else// + }//if// + + //If the message end has been reached then the buffer will be null.// + if(currentOutboundMessage.getBuffer() == null) { + currentOutboundMessage = null; + lastOutboundMessage = null; + }//if// + }//if// + }//while// + }//if// + else { + //Keep sending encrypted frames until the output buffer is full, or we run out of message to send.// + while(sendMore && (currentOutboundMessage != null) && currentOutboundMessage.getBuffer().hasRemaining()) { + //Write the bytes to the stream.// + ((SocketChannel) key.channel()).write(currentOutboundMessage.getBuffer()); + +// if(getWebServer().debug) { +// sentBytes += pendingOutboundMessage.position(); +// debugBuffer.append("Wrote " + pendingOutboundMessage.position() + " bytes to the client. Total sent: " + sentBytes + "\n"); +// }//if// + + //If not all the bytes could be written then we will need to wait until we can write more.// + if(currentOutboundMessage.getBuffer().hasRemaining()) { + sendMore = false; + }//if// + else { + if(!currentOutboundMessage.loadBuffer()) { + //Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.// + if(currentOutboundMessage.getBuffer() == null && currentOutboundMessage.getNext() != null) { + currentOutboundMessage = currentOutboundMessage.getNext(); + }//if// + else { + //Wait until additional message bytes are available.// + sendMore = false; + }//else// + }//if// + + //If the message end has been reached then the buffer will be null.// + if(currentOutboundMessage.getBuffer() == null) { + currentOutboundMessage = null; + lastOutboundMessage = null; + }//if// + }//else// + }//while// + }//else// + }//if// + }//if// + }//try// + catch(ClosedChannelException e) { + close(); + }//catch// + catch(SSLException e) { + if(getWebServer().debug) { + Debug.log(e); + }//if// + + close(); + }//catch// + catch(IOException e) { + if(getWebServer().debug) { + Debug.log(e); + }//if// + + close(); + }//catch// + + return sendMore; +}//writeClientBoundMessage()// +/* (non-Javadoc) + * @see com.foundation.web.server.WebServer.AbstractSocketContext#processRequest() + */ +protected void readIncomingMessages() throws IOException { + boolean requiresRead = true; + SocketChannel channel = (SocketChannel) key.channel(); + + if(isSsl()) { + int loopCount = 0; + boolean keepReading = true; + + if(sslEngine == null) { + SSLSession session = null; + + if(parseFirstTlsMessage(this, channel)) { + SSLContext sslContext = null; + + //Error checking.// + if(domain == null || domain.length() == 0) { + //We seem to have a choice here. We can either throw an error which ends up forcably closing the connection which for most browsers allows the client to retry with different settings (not older IE6, 7, 8 implementations), or return a pretty error which prevents users of well behaved browsers from getting the correct credentials.// + throw new TlsFailureException("Connection did not provide a domain name in the TLS hello message."); +// context.tlsFailure = true; + }//if// + + //Get the web application for the given domain.// + //Synchronize to prevent another thread from altering the service's web applications while we are accessing it.// + synchronized(getWebServer()) { + if(tlsFailure) { + domain = serverSocketContext.serviceListener.getDefaultSslDomain(); + }//if// + + webApplicationContainer = serverSocketContext.serviceListener.getWebApplicationContainer(domain); + }//synchronized// + + //Error checking.// + if(webApplicationContainer == null) { + throw new IOException("Cannot find a web application for the domain " + domain + "."); + }//if// + + //Lock on the container so it doesn't change the object it contains while we access it.// + synchronized(webApplicationContainer) { + //Get the SSLContext from the web application.// + sslContext = webApplicationContainer.getWebApplication().getSslContext(domain); + + if(sslContext == null) { + throw new IOException("Cannot get an SSLContext for the domain " + domain + " for the domain associated web application."); + }//if// + + sslEngine = sslContext.createSSLEngine(); + + //Attempt an anonymous connection to the client so we can report that they have an old browser version that is not using TLS + domain extension.// +// if(context.tlsFailure) { +// context.sslEngine.setEnabledCipherSuites(new String[] { +// "SSL_DH_anon_WITH_3DES_EDE_CBC_SHA", +// "SSL_DH_anon_WITH_DES_CBC_SHA", +// "SSL_DH_anon_WITH_RC4_128_MD5"}); +// //"SSL_DH_anon_EXPORT_WITH_RC4_40_MD5", +// //"SSL_DH_anon_EXPORT_WITH_DES40_CBC_SHA", +// //"TLS_DH_anon_EXPORT_WITH_RC4_40_MD5", +// //"TLS_DH_anon_WITH_RC4_128_MD5", +// //"TLS_DH_anon_EXPORT_WITH_DES40_CBC_SHA", +// //"TLS_DH_anon_WITH_DES_CBC_SHA", +// //"TLS_DH_anon_WITH_3DES_EDE_CBC_SHA"}); +// }//if// + + //context.sslEngine.getEnabledProtocols(); + //context.sslEngine.getEnabledCipherSuites(); + sslEngine.setUseClientMode(false); + sslEngine.beginHandshake(); + session = sslEngine.getSession(); + encryptedReadBuffer = ByteBuffer.allocate(session.getPacketBufferSize()); + encryptedReadBuffer.position(encryptedReadBuffer.limit()); + unencryptedReadBuffer = ByteBuffer.allocate(session.getApplicationBufferSize()); + unencryptedReadBuffer.position(unencryptedReadBuffer.limit()); + encryptedWriteBuffer = ByteBuffer.allocate(session.getPacketBufferSize()); + encryptedWriteBuffer.position(encryptedWriteBuffer.limit()); + }//synchronized// + + //Create a pass through socket and context and attach it to this context if the application is setup as a pass through to another process.// + IWebApplication application = webApplicationContainer.getWebApplication(); + + if(application instanceof IPassThroughDomain) { + IPassThroughDomain passThroughDomain = ((IPassThroughDomain) application); + + //Setup the pass through socket context (and socket channel). All data will be sent to this context to be sent to the remote process.// + relatedSocketContext = new PassThroughSocketContext(this, passThroughDomain.getAddress(), passThroughDomain.getPort()); + }//if// + }//if// + }//if// + + if(sslEngine != null) { + //While we have a count greater than zero, indicating that some data is comming through, keep reading and processing the data.// + //Note: We are throddling this for active connections to prevent a single connection from hogging all the resources.// + while(channel.isOpen() && loopCount < 10 && requiresRead && keepReading) { + int readCount = -1; + + //Track how many loops we run so that we don't hog all the server's resources with one connection.// + loopCount++; + //Compact the buffer, placing remaining bytes at the start of the buffer and preping for reading more bytes from the stream.// + encryptedReadBuffer.compact(); + + //If there is an initial buffer of bytes then use those first. The initial buffer is created when parsing the initial TLS message to gather the domain the client is connecting to.// + if(initialBuffer != null) { + readCount = initialBuffer.readBytes(encryptedReadBuffer); + + if(initialBuffer.getSize() == 0) { + initialBuffer = null; + }//if// + }//if// + + //If there isn't an initial buffer then fill (or finish filling) the read buffer.// + if(readCount == -1 && initialBuffer == null) { + //Read a frame of encrypted data (or handshake data).// + try { + readCount = channel.read(encryptedReadBuffer); + }//try// + catch(IOException e) { + //Ignore the forcable close error here - the channel will be closed a few lines lower in the code. TODO: Not sure if all IOExceptions should be ignored here?// + if(!e.getMessage().equalsIgnoreCase("An existing connection was forcibly closed by the remote host")) { + throw e; + }//if// + }//catch// + }//else// + + //Make the encrypted frame buffer readable.// + encryptedReadBuffer.flip(); + + //If the socket was closed then handle it, if we received enough data to process a frame then do so and continue, otherwise we must wait for the client to send more data.// + if(readCount == -1) { + //The socket has been closed by the client.// + close(); + keepReading = false; + }//if// + else if(encryptedReadBuffer.remaining() != 0) { + SSLEngineResult sslResult; + + //Sometimes the SSL Engine requires a partial read of the encrypted bytes, then a task, then reading the rest of the encrypted bytes.// + while(key.channel().isOpen() && encryptedReadBuffer.hasRemaining()) { + //Reset the unencrypted read buffer prior to decrypting the next frame.// + unencryptedReadBuffer.position(0); + unencryptedReadBuffer.limit(unencryptedReadBuffer.capacity()); + //Decrypt the message and process any SSL messages.// + sslResult = sslEngine.unwrap(encryptedReadBuffer, unencryptedReadBuffer); + unencryptedReadBuffer.flip(); + + //Check the requiresRead of the SSL processing.// + if(sslResult.getStatus() == Status.BUFFER_UNDERFLOW) { + //Buffer underflow indicates we haven't enough bytes to finish processing the next frame of data.// + break; + }//if// + else if(sslResult.getStatus() == Status.BUFFER_OVERFLOW) { + //Should never happen.// +// if(getWebServer().debug) Debug.log(new RuntimeException("Unexpected ssl engine buffer overflow.")); + close(); + }//else if// + else if(sslResult.getStatus() == Status.CLOSED) { + //A normal close I think...// + //Debug.log(new RuntimeException("Unexpected ssl engine closed.")); + }//else if// + else if(sslResult.getStatus() == Status.OK) { + //Run any long running tasks now.// + if(sslResult.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { + Runnable task = null; + + while((task = sslEngine.getDelegatedTask()) != null) { + task.run(); + }//while// + }//if// + + //If the engine requires handshake data to be wrapped and sent then do so now.// + if(sslResult.getHandshakeStatus() == HandshakeStatus.NEED_WRAP || sslResult.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { + sslNeedsWrap = true; + + //Need to synchronize if this is a pass through socket so that multiple threads don't access pendingOutboundMessage or lastAddedMessageBuffer (via a call to passThrough(ByteBuffer) on another thread).// + if(getPassThroughSocketContext() == null) { + requiresRead = writeClientBoundMessage(); + }//if// + else { + synchronized(this) { + requiresRead = writeClientBoundMessage(); + }//synchronized// + }//else// + }//if// + + //If bytes were produced then process them.// + if(sslResult.bytesProduced() > 0) { + //If we are not passing all content to another process then handle it by calling processClientRequest, otherwise pass it through.// + if(getPassThroughSocketContext() == null) { + if(isWebsocket) { + requiresRead = processWebsocketFrame(unencryptedReadBuffer, key); + }//if// + else { + requiresRead = processClientRequest(unencryptedReadBuffer, key); + }//else// + }//if// + else { +//TODO: Comment me. +//Debug.log("Receiving message (" + unencryptedReadBuffer.remaining() + " bytes) from client for git."); + //Queue the data for sending to the remote process via the pass through socket context.// + getPassThroughSocketContext().passThrough(unencryptedReadBuffer); + requiresRead = true; + }//else// + }//if// + }//else if// + else { + //Should never happen.// + Debug.log(new RuntimeException("Unexpected ssl engine status code.")); + close(); + }//else// + }//while// + }//else if// + else { + //Wait for the client to send more data.// + keepReading = false; + }//else// + }//while// + }//if// + }//if// + else { + int count = 1; + ByteBuffer socketReadBuffer = this.socketReadBuffer; + int loopCount = 0; + + //While we have a count greater than zero, indicating that some data is comming through, keep reading and processing the data.// + //Note: We are throddling this for active connections to prevent a single connection from hogging all the resources.// + while(loopCount < 10 && requiresRead && count > 0) { + loopCount++; + //Don't allow data to be left on the socket read buffer.// + socketReadBuffer.position(0); + socketReadBuffer.limit(socketReadBuffer.capacity()); + count = channel.read(socketReadBuffer); + socketReadBuffer.flip(); + + //Setup the pass through socket if the application is an instance of IPassThroughDomain.// + if(count != -1 && isFirstUnencryptedMessage) { + //Read enough of the header to identify the application.// + if(processRequestedHost(socketReadBuffer, key)) { + //Create a pass through socket and context and attach it to this context if the application is setup as a pass through to another process.// + IWebApplication application = webApplicationContainer.getWebApplication(); + + if(application instanceof IPassThroughDomain) { + IPassThroughDomain passThroughDomain = ((IPassThroughDomain) application); + + //Setup the pass through socket context (and socket channel). All data will be sent to this context to be sent to the remote process.// + relatedSocketContext = new PassThroughSocketContext(this, passThroughDomain.getAddress(), passThroughDomain.getPort()); + }//if// + + isFirstUnencryptedMessage = false; + }//if// + else { + //We couldn't even read the host from the first bytes sent by the client - very unusual (it should be in the first couple hundred bytes - less than a single packet).// + //TODO: We could cycle and wait for the next packet. For now just close the socket since this should never really happen in the first place. + close(); + }//else// + }//if// + + if(count == -1) { + //The socket has been closed by the client.// + close(); + }//if// + else if(relatedSocketContext != null) { + relatedSocketContext.passThrough(socketReadBuffer); + + if(socketReadBuffer.remaining() > 0) { + Debug.log(new RuntimeException("Remaining bytes found on the read buffer!")); + }//if// + }//else if// + else if(isWebsocket) { + requiresRead = processWebsocketFrame(socketReadBuffer, key); + + if(socketReadBuffer.remaining() > 0) { + Debug.log(new RuntimeException("Remaining bytes found on the read buffer!")); + }//if// + }//else if// + else if(socketReadBuffer.hasRemaining()) { + requiresRead = processClientRequest(socketReadBuffer, key); + + if(socketReadBuffer.remaining() > 0) { + Debug.log(new RuntimeException("Remaining bytes found on the read buffer!")); + }//if// + }//else// + else { + break; + }//else// + }//while// + }//else// +}//processRequest()// +/** + * Processes a single websocket frame if there is enough data in the fragment. + *key before using this attribute. */
- private boolean isUsed = false;
- /** A socket context related to this one (when two are tied together such that data from one immediately is sent to the other). */
- protected AbstractSocketContext relatedSocketContext = null;
-
- /**
- * Gets the lockable (synchronizable) object for this context. For contexts with a related context, only one of the two will be returned, such that a single synchronize block covers both contexts.
- * @return The object to synchronize on such that two threads don't attempt to interact with the context at the same time (AsynchronousSocketChannel required for that).
- */
- protected abstract Object getLock();
- /**
- * Writes the next responses/messages in the sequence.
- * @throws IOException
- */
- protected abstract void writeOutgoingMessages() throws IOException;
- /**
- * Reads the next requests/messages received via the socket.
- * @throws IOException
- */
- protected abstract void readIncomingMessages() throws IOException;
- /**
- * Passes the message through to a receiving process via a second socket.
- * @param buffer The buffer containing the message. This buffer will not be retained by this method call, and can be reused by the caller.
- * @return Whether the whole message was transfered.
- */
- protected abstract boolean passThrough(ByteBuffer buffer);
- /**
- * Closes the socket context and cleans up.
- */
- protected abstract void close();
- /**
- * Gets the socket context related to this one (when two are tied together such that data from one immediately is sent to the other).
- * @return The related socket context, or null if none exists (data not forwarded to a remote server).
- */
- protected AbstractSocketContext getRelatedSocketContext() {return relatedSocketContext;}
- /**
- * Determines whether the socket has a pending write operation.
- */
- protected abstract boolean hasPendingWrite();
- /**
- * Called to notify the network listener that a pending write operation exists for this socket.
- */
- protected void notifyListenerOfPendingWrite() {
- synchronized(key) {
- //Ignore if a thread is using this socket currently since all operation flags will be set at the end of the use of the socket.//
- if(!isUsed) {
- int ops = key.interestOps();
- boolean hasWrite = (ops & SelectionKey.OP_WRITE) != 0;
-
- if(!hasWrite) {
- key.interestOps(ops | SelectionKey.OP_WRITE);
- key.selector().wakeup();
- }//if//
- }//if//
- }//synchronized//
- }//notifyListenerOfPendingWrite()//
- }//AbstractSocketContext//
-
- private static class RegisterKeyRunnable implements Runnable {
+ public static class RegisterKeyRunnable implements Runnable {
private boolean isRun = false;
private SocketChannel channel;
private Selector selector;
private SelectionKey key = null;
- private ChannelContext context;
+ private IChannelContext context;
- public RegisterKeyRunnable(ChannelContext context, SocketChannel channel, Selector selector) {
+ public RegisterKeyRunnable(IChannelContext context, SocketChannel channel, Selector selector) {
this.context = context;
this.channel = channel;
this.selector = selector;
@@ -915,1866 +198,10 @@ public class WebServer {
public SelectionKey getKey() {return key;}
}//RegisterKeyRunnable//
- /**
- * Used by the SocketContext to create a connection to a remote process that will receive all client data once decrypted, and whose output will be encrypted and sent directly to the client.
- * Allows the web server to act as an SSL front to another web server or service.
- */
- private class PassThroughSocketContext extends AbstractSocketContext {
- private MessageBuffer pendingMessageBuffer = null;
- private MessageBuffer lastAddedMessageBuffer = null;
- /** The byte buffer used to read data from the socket. */
- public ByteBuffer socketReadBuffer = ByteBuffer.allocate(BUFFER_SIZE);
-
- public PassThroughSocketContext(SocketContext linkedClientContext, String address, int port) throws IOException {
- this.relatedSocketContext = linkedClientContext;
- SocketChannel channel = SocketChannel.open();
- RegisterKeyRunnable runnable;
-
- //Connect while still blocking - will wait until the connection finishes or times out.//
- channel.connect(new InetSocketAddress(address, port));
- //Setup the channel for non-blocking from here on out.//
- channel.configureBlocking(false);
- channel.socket().setSendBufferSize(SEND_BUFFER_SIZE);
- channel.socket().setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
- channel.socket().setTcpNoDelay(false);
- networkListener.queue(runnable = new RegisterKeyRunnable(this, channel, linkedClientContext.key.selector()));
- linkedClientContext.key.selector().wakeup();
- runnable.waitForRun();
- key = runnable.getKey();
-
- //Set the initial interest ops to read.//
- synchronized(key) {
- key.interestOps(SelectionKey.OP_READ);
- }//synchronized//
-
- key.selector().wakeup();
- }//PassThroughSocketContext()//
- /* (non-Javadoc)
- * @see com.foundation.web.server.WebServer.AbstractSocketContext#getLock()
- */
- protected Object getLock() {
- return getRelatedSocketContext();
- }//getLock()//
- /* (non-Javadoc)
- * @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses()
- */
- protected synchronized void writeOutgoingMessages() throws IOException {
- //Actually this is called when a request is being sent via the pass through socket (sending the request to the remote server).//
- //Synchronized to avoid accessing the pendingMessageBuffer and lastAddedMessageBuffer at the same time as a thread that is calling passThrough(ByteBuffer) which also accesses these variables.//
- boolean result = true;
-
- if(result && pendingMessageBuffer != null) {
- //Check to see if the outbound message is prepared to send more content.//
- if(!pendingMessageBuffer.getBuffer().hasRemaining()) {
- //Load the next pending outbound message in the chain.//
- if(pendingMessageBuffer.getNext() != null) {
- pendingMessageBuffer = pendingMessageBuffer.getNext();
- }//if//
- else {
- //Wait until additional message bytes are available.//
- result = false;
- pendingMessageBuffer = null;
- lastAddedMessageBuffer = null;
- }//else//
- }//if//
-
- //Keep sending encrypted frames until the output buffer is full, or we run out of message to send.//
- while(result && (pendingMessageBuffer != null) && pendingMessageBuffer.getBuffer().hasRemaining()) {
- //Write the bytes to the stream.//
- ((SocketChannel) key.channel()).write(pendingMessageBuffer.getBuffer());
-
- //If not all the bytes could be written then we will need to wait until we can write more.//
- if(pendingMessageBuffer.getBuffer().hasRemaining()) {
- result = false;
- }//if//
- else {
- //Load the next pending outbound message in the chain.//
- if(pendingMessageBuffer.getNext() != null) {
- pendingMessageBuffer = pendingMessageBuffer.getNext();
- }//if//
- else {
- //Wait until additional message bytes are available.//
- result = false;
- pendingMessageBuffer = null;
- lastAddedMessageBuffer = null;
- }//else//
- }//else//
- }//while//
- }//if//
- }//processResponses()//
- /* (non-Javadoc)
- * @see com.foundation.web.server.WebServer.AbstractSocketContext#processRequest()
- */
- protected void readIncomingMessages() throws IOException {
- //Actually this is called when a response is being processed via the pass through socket (remote process that received the request).//
- int count = 1;
- int loopCount = 0;
- boolean result = true;
- SocketChannel channel = (SocketChannel) key.channel();
-
- //While we have a count greater than zero, indicating that some data is comming through, keep reading and processing the data.//
- //Note: We are throddling this for active connections to prevent a single connection from hogging all the resources.//
- while(loopCount < 10 && result && count > 0) {
- loopCount++;
- count = channel.read(socketReadBuffer);
- socketReadBuffer.flip();
-
- if(count == -1) {
- //The socket has been closed by the client.//
- try {relatedSocketContext.close();} catch(Throwable e) {}
- }//if//
- else if(socketReadBuffer.hasRemaining()) {
- result = relatedSocketContext.passThrough(socketReadBuffer);
- socketReadBuffer.compact();
- }//else//
- else {
- socketReadBuffer.compact();
- break;
- }//else//
- }//while//
- }//processRequest()//
- /* (non-Javadoc)
- * @see com.foundation.web.server.WebServer.AbstractSocketContext#passThrough(java.nio.ByteBuffer)
- */
- protected synchronized boolean passThrough(ByteBuffer buffer) {
- ByteBuffer messageBytes = ByteBuffer.allocate(buffer.remaining());
- MessageBuffer message;
-
- //Create a new buffer to hold the data so we don't modify the passed buffer (other than to update its position).//
- messageBytes = ByteBuffer.allocate(buffer.remaining());
- messageBytes.put(buffer);
- message = new MessageBuffer(messageBytes);
-
- //Chain the message into the linked list.
- if(lastAddedMessageBuffer == null) {
- pendingMessageBuffer = lastAddedMessageBuffer = message;
- }//if//
- else {
- lastAddedMessageBuffer.setNext(message);
- lastAddedMessageBuffer = message;
- }//else//
-
- return true;
- }//passThrough()//
- protected synchronized void close() {
- try {if(key != null && key.channel() != null) key.channel().close();} catch(Throwable e) {}
- try {if(key != null) key.cancel();} catch(Throwable e) {}
- }//close()//
- /* (non-Javadoc)
- * @see com.foundation.web.server.WebServer.AbstractSocketContext#hasPendingWrite()
- */
- protected boolean hasPendingWrite() {
- return pendingMessageBuffer != null;
- }//hasPendingWrite()//
- }//PassThroughSocketContext//
-
- /**
- * Provides a place for connection oriented data.
- * Note that a client server session can have multiple socket contexts (one for each socket) and that the socket context may be used to access multiple applications hosted on this server.
- */ - private class SocketContext extends AbstractSocketContext implements IWebApplicationContainerProvider, IConnectionContext { - public final int id; - /** The server socket reference that created the socket. This will be null if the socket was not created with a server socket (shouldn't happen in a web server). */ - public ServerSocketContext serverSocketContext = null; - /** The web application's container for the application associated with this connection to a client. A connection is only allowed to access a single application. */ - public WebApplicationContainer webApplicationContainer = null; - /** A temporary storage location for part of an HTTP message header. Warning: This is used by the reading thread ONLY - never by the processing/writing threads. */ - public StringBuffer messageHeaderFragment = null; - /** A reference to the request object currently being processed. Warning: This is used by the reading thread ONLY - never by the processing/writing threads. */ - public Request request = null; - /** The count of stored content bytes for the request. This is valid for any type of request. */ - public int requestContentPosition = 0; - /** The multi-part message characters remaining to be processed from the last message fragment received. */ - public byte[] remainingPartBytes = null; - /** The multi-part message count of characters/bytes read thus far. This ensures the entire message is read properly. */ - public int partReadCount = 0; - /** The mutli-part message count of characters/bytes written to the buffer as the message from the client is parsed. Used to index into the buffer by the various ContentPart instances generated. */ - public int partWriteCount = 0; - /** The part that is currently being read and spans more than one message fragment. */ - public ContentPart currentPart = null; - /** Whether the final part boundary has already been found. If this is true then the message bytes should keep getting read until the partReadCount == contentLength. */ - public boolean endPartBoundaryFound = false; - /** The bytes containing the unencrypted outbound message that is waiting for the socket to allow a write. */ - public MessageBuffer/*ByteBuffer*/ currentOutboundMessage = null; - /** The last message buffer added to the pending outbound message chain (linked list). Used only for pass through contexts currently since locally handled messages link the reponses together into a list. */ - private MessageBuffer lastOutboundMessage = null; - /** The byte buffer used to read data from the socket. This must be null if a SSL engine is provided. */ - public ByteBuffer socketReadBuffer = null; - /** The buffer used to store the initial data in a SSL/TLS connection. The buffering is necessary to allow us to pre-read the client's hello message to gather the domain the client is connecting to - allowing for the correct SSL engine to be used. */ - public StreamBuffer initialBuffer = null; - /** Whether the TLS domain extension was used. If false then the default SSL domain should be used to attempt an anonymous connection with the client such that an error page can be displayed prompting the user to upgrade their browser to something modern. */ - public boolean tlsFailure = false; - /** The domain name the client is connecting to. This is provided by the initial TLS hello message, and is used to select the correct SSL context for the desired web application. */ - public String domain = null; - /** Non-null if the socket is using SSL to secure communications. */ - public SSLEngine sslEngine = null; - /** Whether the ssl engine needs to send handshake data to the client and has not yet generated it. */ - public boolean sslNeedsWrap = false; - /** The reusable buffer containing encrypted data from the client. */ - public ByteBuffer encryptedReadBuffer = null; - /** The reusable buffer containing unencrypted data from the client. */ - public ByteBuffer unencryptedReadBuffer = null; - /** The reusable buffer containing encrypted data being sent to the client. */ - public ByteBuffer encryptedWriteBuffer = null; - /** The last used request number which identifies the sequence for the requests. */ - private int lastRequestNumber = 0; - /** The response we are currently processing. */ - private Response currentResponse = null; - /** The response we are will process last. */ - private Response lastResponse = null; - /** Tracks the number of bytes sent from the current response. This is only used when debugging. */ - private int sentBytes = 0; - /** Tracks the debug output for the current request/response cycle. This is only used when debugging. */ - public StringBuffer debugBuffer = null; //debug ? new StringBuffer(1000) : null; - /** The optional application specific data indexed by an application spectific key. Elements (values, not keys) which implement ISessionLifecycleAware will be released when the socket context is released. */ - private LiteHashMap applicationDataMap; - /** Used to identify the first unencrypted message (ignored if ssl is being used) so that forwarding to a remote server can be accomplished. */ - private boolean isFirstUnencryptedMessage = true; - /** Whether this is a websocket connection. */ - private boolean isWebsocket = false; - /** The protocol passed when this connection was upgraded to a websocket. */ - private String websocketProtocol = null; - /** The maximum number of bytes in an allowed websocket message (may be composed of multiple frames, does not include the frame header sizes). While this is a long, we really can't read longer than Integer.MAX_VALUE sized frames. So the message might be within size limits, but it might still be rejected if the frame exceeds the server's capacity to read a frame. */ - private long websocketMaxMessageLength = 0; - /** The reusable frame header buffer. */ - private byte[] websocketFrameHeader = null; - /** The index into the frame header of the last read byte. */ - private int websocketFrameHeaderIndex = 0; - /** The next partial message (single messages may be broken into frames by the client). */ - private StreamBuffer websocketPartialReceivedMessage = null; - /** The remaining bytes to be read on the current message frame (the frame header and part of the frame was read previously). */ - private int websocketFrameRemainder = 0; - /** Whether the frame currently being read in the current message being received is the last frame in the message (when the frame is fully read it will trigger the message to be processed). */ - private boolean websocketIsLastFrameInMessage = false; - /** The op code for the currently reading message, or zero if the last message was completed. */ - private int websocketMessageOpCode = 0; - /** The currently reading frame's mask key used to decode the frame data. */ - private byte[] websocketMessageMaskKey = null; - /** The queue used to hold outgoing messages before they are sent. Will contain only String, byte[], or IConnectionContext.IStreamedWebsocketMessage instances. */ - private Queue websocketPendingMessages = null; - /** The streambuffer for the currently sending message. This will be for the current part of the streaming message if websocketStreamingMessage is non-null. */ - private ByteBuffer websocketSendingMessage = null; - /** The streaming message handler which will be set only if the currently sending message is streaming. */ - private IStreamedWebsocketMessage websocketStreamingMessage = null; - /** The application specified handler called when websocket events occur (messages received, socket closed, etc). */ - private WebsocketHandler websocketHandler = null; - - public SocketContext(ServerSocketContext serverSocketContext) { - super(); - synchronized(SocketContext.class) { - this.id = nextSocketContextId++; - }//synchronized// - }//SocketContext()// - /** - * Gets the pass through socket context associated with this socket context, or null if none exists. - * @return The socket context for the pass through socket used to handle all incoming requests from the client on this socket. - */ - protected PassThroughSocketContext getPassThroughSocketContext() { - return (PassThroughSocketContext) getRelatedSocketContext(); - }//getPassThroughSocketContext()// - /* (non-Javadoc) - * @see com.foundation.web.server.WebServer.AbstractSocketContext#getLock() - */ - protected Object getLock() { - return this; - }//getLock()// - protected synchronized void close() { - try { - if(websocketHandler != null) { - websocketHandler.connectionClosed(); - websocketHandler = null; - }//if// - }//try// - catch(Throwable e) { - Debug.log(e); - }//catch// - - try { - if(applicationDataMap != null) { - for(IIterator iterator = applicationDataMap.valueIterator(); iterator.hasNext(); ) { - Object next = iterator.next(); - - //Ensure the code keeps going even if there is a problem cleaning up.// - try { - if(next instanceof ISessionLifecycleAware) ((ISessionLifecycleAware) next).release(); - }//try// - catch(Throwable e) { - Debug.log(e); - }//catch// - }//for// - }//if// - }//try// - catch(Throwable e) { - Debug.log(e); - }//catch// - - try {if(key != null && key.channel() != null) key.channel().close();} catch(Throwable e) {} - try {if(key != null) key.cancel();} catch(Throwable e) {} - //Clean up after the response and request.// - //try {while(currentOutboundMessage != null) {currentOutboundMessage.close(); currentOutboundMessage = currentOutboundMessage.getNext();}} catch(Throwable e2) {} - try {if(currentResponse != null) currentResponse.close();} catch(Throwable e2) {} - - if(getPassThroughSocketContext() != null) { - getPassThroughSocketContext().close(); - }//if// - }//close()// - /* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#getApplicationData(java.lang.String) - */ - public Object getApplicationData(String key) { - return applicationDataMap == null ? null : applicationDataMap.get(key); - }//getApplicationData()// - /* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#setApplicationData(java.lang.String, java.lang.Object) - */ - public void setApplicationData(String key, Object applicationData) { - if(applicationDataMap == null) { - applicationDataMap = new LiteHashMap(10); - }//if// - - applicationDataMap.put(key, applicationData); - }//setApplicationData()// - /* (non-Javadoc) - * @see com.foundation.web.server.WebServer.IWebApplicationContainerProvider#getWebApplicationContainer() - */ - public WebApplicationContainer getWebApplicationContainer() { - return webApplicationContainer; - }//getWebApplicationContainer()// - /** - * Whether the socket is an SSL/TLS socket. - * @return Whether data streamed over the socket is encrypted using SSL/TLS. - */ - public boolean isSsl() { - return socketReadBuffer == null; - }//isSsl()// - /** - * Queues an outbound client message by adding it to the linked list of message buffers. Handles synchronizing on this SocketContext to prevent multiple threads from adding messages at once. Also handles notifying the ServiceListener that it should update it's write flags for the socket. - * @param messageBuffer The buffer to be added to the end. - */ - private void queueOutboundClientMessage(MessageBuffer messageBuffer) { - boolean notify = false; - - synchronized(this) { - if(currentOutboundMessage == null) { - lastOutboundMessage = currentOutboundMessage = messageBuffer; - notify = true; - }//if// - else { - lastOutboundMessage.setNext(messageBuffer); - lastOutboundMessage = messageBuffer; - }//else// - }//synchronized()// - - if(notify) { - notifyListenerOfPendingWrite(); - }//if// - }//queueOutboundClientMessage()// - private void writeSessionCookies(PrintStream pout) { - Response response = currentResponse; - ISession session = response.getSession(); - - if(session != null) { - //Write the session id only if it has changed.// - if(!Comparator.equals(response.getRequest().getSessionId(), session.getSessionId())) { - pout.print("Set-Cookie: sessionId=" + session.getSessionId() + ";path=/;\r\n"); - }//if// - - //Write the secure session id only if it has changed.// - if(!Comparator.equals(response.getRequest().getSecureSessionId(), session.getSecureSessionId())) { - pout.print("Set-Cookie: secureSessionId=" + (session.getSecureSessionId() == null ? "" : session.getSecureSessionId()) + ";path=/;secure\r\n"); - }//if// - - if(response.getRequest().isLoggedIn() != session.getIsLoggedIn()) { - pout.print("Set-Cookie: isLoggedIn=" + session.getIsLoggedIn() + ";path=/;\r\n"); - }//if// - }//if// - }//writeSessionCookies()// - /** - * Processes the next response in the sequence. - *Note: The caller must synchronize on this context to prevent multiple threads from accessing the context at the same time.
- * @result Whether request is in a receive state. Will be false if the request generated a response that could not be completely transmitted. - */ - private void prepareResponse() { - Response response = currentResponse; - Request request = (Request) response.getRequest(); - byte[] headerBytes = null; - IContent content = null; - ByteBuffer buffer = null; - - try { - //Wrap the response in http cloths. The HeaderFieldNames will be set if the response was provided with a completely custom HTTP header to be used.// - if(response.getHeaderFieldNames() != null) { - ByteArrayOutputStream bout = new ByteArrayOutputStream(1000); - PrintStream pout = new PrintStream(bout, true, "ASCII"); - LiteList headerFieldNames = response.getHeaderFieldNames(); - LiteHashMap headerFieldMap = response.getHeaderFieldMap(); - - //Write the response line which is mapped to the null field name.// - pout.print(headerFieldMap.get(null)); - pout.print("\r\n"); - - //Write the rest of the response header lines in order.// - for(int index = 0; index < headerFieldNames.getSize(); index++) { - String headerFieldName = (String) headerFieldNames.get(index); - String headerFieldValue = (String) headerFieldMap.get(headerFieldName); - - if(headerFieldName.equals("Server")) { - pout.print("Server: DE/1.0"); - }//if// - else { - pout.print(headerFieldName); - pout.print(": "); - pout.print(headerFieldValue); - }//else// - - pout.print("\r\n"); - }//for// - - //Write out any cookies necessary to retain our session.// - writeSessionCookies(pout); - - //End the header.// - pout.print("\r\n"); - pout.close(); - headerBytes = bout.toByteArray(); - - //Prepare the content for delivery.// - content = response.getContent(); - }//if// - else if(response.getForwardUri() != null) { - ByteArrayOutputStream bout = new ByteArrayOutputStream(1000); - PrintStream pout = new PrintStream(bout, true, "ASCII"); - //String today = format.format(new Date()); - - //The 303 code may not be fully supported by browsers.// - //if(request.getHttpVersion().equalsIgnoreCase("HTTP/1.1") || request.getHttpVersion().equalsIgnoreCase("HTTP/1.2")) { - // pout.print("HTTP/1.1 303 Forwarded\r\n"); - //}//if// - //else { - pout.print("HTTP/1.1 302 Moved Temporarily\r\n"); - //}//else// - - writeSessionCookies(pout); - - pout.print("Location: " + response.getForwardUri() + "\r\n"); - - //Note: Encoded URL's will have their parameters in the content, not in the URL.// - if(response.getContent() == null) { - pout.print("Content-Length: 0\r\n"); - }//if// - else { - pout.print("Content-Length: " + response.getContent().getSize() + "\r\n"); - pout.print("Content-Type: application/x-www-form-urlencoded\r\n"); - }//else// - - pout.print("\r\n"); - pout.close(); - headerBytes = bout.toByteArray(); - }//else if// - else if((content = response.getContent()) != null) { //Convert the result into a stream of bytes.// - ByteArrayOutputStream bout = new ByteArrayOutputStream(1000); - PrintStream pout = new PrintStream(bout, true, "ASCII"); - Date lastModifiedDate = content.getLastModifiedDate(); - String cacheDirective = null; - IMimeType mimeType = content.getMimeType(response.getApplication() != null ? response.getApplication().getMimeTypeProvider() : null); - boolean isDownloaded = content != null && (content.getIsDownloaded() == null ? mimeType != null && mimeType.isDownloaded() : content.getIsDownloaded().booleanValue()); - boolean compress = response.getCompress().booleanValue() && (mimeType == null || mimeType.isCompressable()); - int compressionType = 0; //0: none, 1: gzip, ..// - - if(compress) { - //Check for an encoding allowed by the client that we know how to use.// - if(request.getAllowedEncodings() == null || request.getAllowedEncodings().length < 1) { - compress = false; - }//if// - else { - compress = false; - - //Ensure we have an allowed encoding we know how to use.// - for(int index = 0; !compress && index < request.getAllowedEncodings().length; index++) { - String encoding = request.getAllowedEncodings()[index]; - - if(encoding.equalsIgnoreCase("gzip")) { - compress = true; - compressionType = 1; - }//if// - }//for// - }//else// - }//if// - - if(response.isError()) { - if(response.getHeader() != null) { - pout.print(response.getHeader()); - }//if// - else { - pout.print("HTTP/1.1 404 Resource Not Found\r\n"); - }//else// - }//if// - else if(response.getCustomHeader() != null) { - pout.print(response.getCustomHeader()); - }//else if// - else if(isDownloaded && request.getRange() != null) { - pout.print("HTTP/1.1 206 Partial Content\r\n"); - }//else if// - else { - pout.print("HTTP/1.1 200 OK\r\n"); - }//else// - - pout.print("Content-Length: " + (content != null ? content.getSize() : 0) + "\r\n"); - - if(compress) { - //TODO: Add others? - if(compressionType == 1) { - content = new GzipContent(content); - pout.print("Content-Encoding: gzip\r\n"); - }//if// - }//if// - - if(content != null) { - //Note: The character set gives IE indigestion for some reason.// - pout.print("Content-Type: " + (mimeType != null ? mimeType.getMimeName() : "text/html") + "; charset=" + (response.getCharacterSet() == null ? "UTF-8" : response.getCharacterSet()) + "\r\n"); - cacheDirective = content.getCacheDirective(); - - if(isDownloaded) { - pout.print("Content-Disposition: attachment; filename=\"" + content.getDownloadName() + "\";\r\n"); - pout.print("Accept-Ranges: bytes\r\n"); - - if(request.getRange() != null) { -// Debug.log("Sending a ranged response: " + request.getRange() + " content range: (" + content.getStart() + " - " + content.getEnd() + "/" + content.getSize() + ")."); - pout.print("Range: " + request.getRange() + "\r\n"); - pout.print("Content-Range: bytes " + content.getStart() + "-" + content.getEnd() + "/" + content.getSize() + "\r\n"); - }//if// - }//if// - }//if// - - writeSessionCookies(pout); - - pout.print("Server: DE/1.0\r\n"); - //TODO: IE has a problem with caching and forwarding/redirecting. A page that redirects to another page that was previously cached does not result in IE sending a request for the forwarded content.// - //private / no-cache - - if(content.getExpiresDirective() != null) { - pout.print("Expires: " + getHttpDateFormat().format(content.getExpiresDirective())); - }//if// - - if(cacheDirective != null) { - pout.print("Cache-Control: " + cacheDirective + "\r\n"); - }//if// - else { - int cacheLength = content.getCacheLength() != null ? content.getCacheLength().intValue() : mimeType != null ? mimeType.getDefaultCacheLength() : IMimeType.CACHE_LENGTH_NEVER_CACHE; - - if(cacheLength > 0) { - pout.print("Cache-Control: public, max-age=" + cacheLength + "\r\n"); - }//if// - else if(cacheLength == IMimeType.CACHE_LENGTH_ALWAYS_TEST) { - pout.print("Cache-Control: public, pre-check=0, post-check=120\r\n"); - }//else if// - else if(cacheLength == IMimeType.CACHE_LENGTH_NEVER_CACHE) { - pout.print("Cache-Control: no-cache\r\n"); - }//else if// - else { - pout.print("Cache-Control: no-store\r\n"); - }//else// - }//else// - - //TODO: Determine if we need to use age. - //pout.print("Age: 0\r\n"); - //TODO: Determine if we need to use ETags - - if(lastModifiedDate != null) { - SimpleDateFormat format = getHttpDateFormat(); - - pout.print("Last-Modified: " + format.format(lastModifiedDate) + "\r\n"); - pout.print("Date: " + format.format(new Date()) + "\r\n"); - }//if// - - pout.print("\r\n"); - headerBytes = bout.toByteArray(); - }//else if// - else { - ByteArrayOutputStream bout = new ByteArrayOutputStream(1000); - PrintStream pout = new PrintStream(bout, true, "ASCII"); - - if(response.isError()) { - if(response.getHeader() != null) { - pout.print(response.getHeader()); - }//if// - else { - pout.print("HTTP/1.1 404 Resource Not Found\r\n"); - }//else// - }//if// - else if(response.getCustomHeader() != null) { - pout.print(response.getCustomHeader()); - }//else if// - else { - Debug.log(new RuntimeException("The response to: " + response.getRequest().getHeaderText() + " had no response content!")); - pout.print("HTTP/1.1 200 OK\r\n"); - }//else// - - writeSessionCookies(pout); - pout.print("Content-Length: 0\r\n"); - pout.print("Server: DE/1.0\r\n"); - pout.print("\r\n"); - pout.close(); - headerBytes = bout.toByteArray(); - }//else// - - buffer = ByteBuffer.allocate(headerBytes.length > 2000 ? headerBytes.length : 2000); - buffer.put(headerBytes); - - if(debug) { - //Test code... - ByteBuffer buffer2 = ByteBuffer.allocate(headerBytes.length); - buffer2.put(headerBytes); - buffer2.flip(); - CharBuffer ch = decoder.decode(buffer2); -// debugBuffer.append("Sending message:\n"); -// debugBuffer.append(ch.toString()); -// debugBuffer.append("\nResponse Size: " + (headerBytes.length + (content != null ? content.getSize() : 0)) + "\n"); - Debug.log(ch.toString()); - }//if// - - //Ignore the content if we are only accessing the header.// -// if(content != null && request.getRequestType() != Request.TYPE_HEAD) { -// content.get(buffer); -// }//if// - - //Save the buffer as the current pending outbound message for this socket context.// - currentOutboundMessage = new MessageBuffer(buffer, response, content != null && request.getRequestType() != Request.TYPE_HEAD ? content : null); - }//try// - catch(Throwable e) { - Debug.log("Fatal Error: Failed to build and send the response message due to an exception.", e); - //Force the channel to close.// - try {key.channel().close();} catch(Throwable e2) {} - //Clean up after the request and response.// - try {response.close();} catch(Throwable e2) {} - }//catch// - }//prepareResponse()// - /** - * Adds a HTTP response to the socket context. - *Note: We must synchronize since a socket could be used to access multiple applications and thus mutliple sessions.
- * @param response The response to be added. - * @result Whether request is in a receive state. Will be false if the request generated a response that could not be completely transmitted. - */ - public synchronized boolean sendHttpResponse(Response response) { - if(currentResponse != null) { - lastResponse.setNextResponse(response); - lastResponse = response; - }//if// - else { - lastResponse = currentResponse = response; - sentBytes = 0; - prepareResponse(); - - //Note: Not going to process the response on this thread. Allow the flag to be set for writing to the socket, and have the next thread in the network listener handle the write. This allows for cleaner code and pipelining without all the synchronizing. -// result = internalProcessResponses(); - }//else// - - request = null; - - return false; - }//sendHttpResponse()// - /* (non-Javadoc) - * @see com.foundation.web.server.WebServer.AbstractSocketContext#passThrough(java.nio.ByteBuffer) - */ - protected synchronized boolean passThrough(ByteBuffer buffer) { - ByteBuffer messageBytes = ByteBuffer.allocate(buffer.remaining()); - MessageBuffer message; - - //Create a new buffer to hold the data so we don't modify the passed buffer (other than to update its position).// - messageBytes = ByteBuffer.allocate(buffer.remaining()); - messageBytes.put(buffer); - message = new MessageBuffer(messageBytes); - - //Chain the message into the linked list. - if(lastOutboundMessage == null || currentOutboundMessage == null) { - currentOutboundMessage = lastOutboundMessage = message; - }//if// - else { - lastOutboundMessage.setNext(message); - lastOutboundMessage = message; - }//else// - - return true; - }//passThrough()// - /* (non-Javadoc) - * @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses() - */ - protected void writeOutgoingMessages() throws IOException { - if(getPassThroughSocketContext() != null) { - //Synchronized to avoid multiple threads accessing the pendingOutboundMessage chain at one time and updating the write flag out of order (could happen if we enabled request chaining over a single socket).// - synchronized(this) { - writeClientBoundMessage(); - }//synchronized// - }//if// - else if(isWebsocket) { - //Right after upgrading the socket we have one last HTTP response to process.// - if(currentResponse != null) { - internalProcessResponses(); - }//if// - - internalProcessWebsocketMessages(); - }//else if// - else { - //Go directly to writing the client response if we are just passing everything through to another process.// - internalProcessResponses(); - }//else// - }//processCurrentResponse()// - /** - * Loads the next outbound websocket message and attempts to write it to the socket until all outbound messages have been sent, or the socket's buffers are full and a wait is required. - * If a message could only be partially sent then the next call will attempt to finish sending it. - */ - private void internalProcessWebsocketMessages() { - if(websocketSendingMessage == null) { - loadNextWebsocketMessage(); - }//if// - - while(websocketSendingMessage != null) { - //If the socket is open then send the next buffer of data.// - if(key.channel().isOpen()) { - if(currentOutboundMessage != null) { - //Put the sending message in a MessageBuffer (pendingOutboundMessage).// - currentOutboundMessage = new MessageBuffer(websocketSendingMessage); - }//if// - - //Write the pendingOutboundMessage to the socket.// - if(writeClientBoundMessage()) { - websocketSendingMessage = null; - currentOutboundMessage = null; - }//if// - }//if// - - //If we finished sending the message then load the next one.// - if(websocketSendingMessage == null) { - loadNextWebsocketMessage(); - }//if// - }//while// - }//internalProcessWebsocketMessages()// - /** - * Loads and prepares the next websocket message from the queue of pending messages. - * Clears the pending message attributes if there isn't a pending message to be processed. - * The caller can check websocketSendingMessage == null to see if there is a ready message. - */ - private void loadNextWebsocketMessage() { - Object next = null; - boolean isLastPart = true; - - if(websocketStreamingMessage != null && websocketStreamingMessage.hasNextPart()) { - next = websocketStreamingMessage.getNextPart(); - isLastPart = !websocketStreamingMessage.hasNextPart(); - - //Ensure that we got a string or byte array.// - if(!(next instanceof String || next instanceof byte[])) { - throw new RuntimeException("Invalid streaming message part type."); - }//if// - }//if// - else { - synchronized(websocketPendingMessages) { - if(websocketPendingMessages.getSize() > 0) { - next = websocketPendingMessages.dequeue(); - }//if// - }//synchronized// - }//else// - - if(next != null) { - byte[] bytes = null; - int opCode = 0; - int length = 0; - - if(next instanceof String) { - try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);} - opCode = websocketStreamingMessage == null ? 0x01 : 0; - length = bytes.length; - }//if// - else if(next instanceof byte[]) { - bytes = (byte[]) next; - opCode = websocketStreamingMessage == null ? 0x02 : 0; - length = bytes.length; - }//else if// - else if(next instanceof Byte) { //Control Message// - opCode = ((Byte) next).byteValue(); - }//else if// - else if(next instanceof IStreamedWebsocketMessage) { - websocketStreamingMessage = (IStreamedWebsocketMessage) next; - next = websocketStreamingMessage.getNextPart(); - isLastPart = !websocketStreamingMessage.hasNextPart(); - - if(next instanceof String) { - try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);} - opCode = 0x01; //Text// - length = bytes.length; - }//if// - else if(next instanceof byte[]) { - bytes = (byte[]) next; - opCode = 0x02; //Binary// - length = bytes.length; - }//else if// - else { - throw new RuntimeException("Invalid streaming message part type."); - }//if// - }//else if// - - websocketSendingMessage = ByteBuffer.allocate(14 + length); - websocketSendingMessage.put((byte) (isLastPart ? 0x8 : 0)); - websocketSendingMessage.put((byte) opCode); - - //Write the length differently based on how long the content is.// - if(length < 126) { - websocketSendingMessage.put((byte) length); -// websocketSendingMessage.putLong(0, StreamBuffer.NUMBER_MSF); - }//if// - else if(length < 65535) { - websocketSendingMessage.put((byte) 126); - websocketSendingMessage.putShort((short) (length & 0xFFFF)); -// websocketSendingMessage.putShort((short) 0); - websocketSendingMessage.putInt(0); - }//else if// - else { - websocketSendingMessage.put((byte) 127); - websocketSendingMessage.putLong((long) length); - }//else// - - //The server doesn't use a mask key.// -// websocketSendingMessage.putInt(0); - //Put the content at the end of the message.// - websocketSendingMessage.put(bytes); - }//if// - else { - websocketSendingMessage = null; - websocketStreamingMessage = null; - }//else// - }//loadNextWebsocketMessage()// - /** - * @return - */ - private synchronized void internalProcessResponses() { - boolean finishedSending = true; - - //Keep sending responses while the buffers are not full and there is another response to send.// - while(finishedSending && currentResponse != null) { - //If the socket is open then send the next buffer of data.// - if(key.channel().isOpen()) { - //Send the pending response object's prepared buffer of data.// - finishedSending = writeClientBoundMessage(); - }//if// - - //Close the response if successfully sent, or if the socket is closed.// - if(finishedSending || !key.channel().isOpen()) { - try {currentResponse.close();} catch(Throwable e) {} - }//if// - - //If we finished sending the current response then load the next one.// - if(finishedSending) { - currentResponse = currentResponse.getNextResponse(); - - if(currentResponse == null) { - lastResponse = null; - }//if// - else if(key.channel().isOpen()) { - //Prep the next response object for sending.// - prepareResponse(); - }//else// - else { - //Clean up after all the left over responses.// - while(currentResponse != null) { - currentResponse.close(); - currentResponse = currentResponse.getNextResponse(); - }//while// - - currentResponse = null; - lastResponse = null; - }//else// - }//if// - }//while// - }//processCurrentResponse()// - /** - * Sends a response to the client. - * @return Whether the response could be fully sent. This will be false if there is still more data to be written when the call returns. - */ - private boolean writeClientBoundMessage() { - boolean sendMore = true; - -// if(debug) { -// debugBuffer.append("Starting a write cycle.\n"); -// }//if// - - try { - //Process SSL output first.// - if(sslEngine != null) { - //If we have part of an SSL frame then try to send it first.// - if(encryptedWriteBuffer.hasRemaining()) { - int remaining = encryptedWriteBuffer.remaining(); - - //Write the bytes to the stream.// - ((SocketChannel) key.channel()).write(encryptedWriteBuffer); - -// if(debug) { -// debugBuffer.append("Wrote " + (remaining - encryptedWriteBuffer.remaining()) + " encrypted bytes to the stream. " + encryptedWriteBuffer.remaining() + " remain.\n"); -// }//if// - - //Check to see if we failed to send the whole frame.// - if(encryptedWriteBuffer.hasRemaining()) { - sendMore = false; - }//if// - }//if// - - while(sendMore && sslNeedsWrap) { - SSLEngineResult handshakeResult; - - //Reset the encrypted write buffer - note that since we will never read while waiting to write data, this should always be empty.// - encryptedWriteBuffer.position(0); - encryptedWriteBuffer.limit(encryptedWriteBuffer.capacity()); - //Generate the handshake message.// - handshakeResult = sslEngine.wrap(ByteBuffer.allocate(0), encryptedWriteBuffer); - encryptedWriteBuffer.flip(); - - if(handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) { - //Should never happen.// - Debug.log(new RuntimeException("Unexpected ssl engine buffer overflow.")); - }//if// - else if(handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) { - //Should never happen.// - Debug.log(new RuntimeException("Unexpected ssl engine buffer underflow.")); - }//else if// - else if(handshakeResult.getStatus() == Status.CLOSED) { - //Should never happen.// - Debug.log(new RuntimeException("Unexpected ssl engine closed.")); - //TODO: Handle this closure without an infinate loop... - //Close the socket.// - try {key.channel().close();}catch(Throwable e2) {} - }//else if// - else if(handshakeResult.getStatus() == Status.OK) { - if(handshakeResult.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { - //Should never happen.// - Debug.log(new RuntimeException("Unexpected ssl engine task.")); - }//if// - else if(encryptedWriteBuffer.hasRemaining()) { - int remaining = encryptedWriteBuffer.remaining(); - - //Write the bytes to the stream.// - ((SocketChannel) key.channel()).write(encryptedWriteBuffer); - -// if(debug) { -// debugBuffer.append("Sent " + (remaining - encryptedWriteBuffer.remaining()) + " encrypted bytes.\n"); -// }//if// - - //If not all the bytes could be written then we will need to wait until we can write more.// - if(encryptedWriteBuffer.hasRemaining()) { -// if(debug) { -// debugBuffer.append("Pausing due to a partially sent packet (while ssl handshaking). Bytes actually sent: " + encryptedWriteBuffer.position() + ". Bytes remaining: " + encryptedWriteBuffer.remaining() + ".\n"); -// }//if// - - //Leave the data in the encrypted write buffer for the writing operation to send it.// - sendMore = false; - }//if// - - //Update the SSL needs wrap flag.// - if(handshakeResult.getHandshakeStatus() != HandshakeStatus.NEED_WRAP) { - sslNeedsWrap = false; - }//if// - }//else if// - else { - sslNeedsWrap = false; - }//else// - }//else if// - else { - //Should never happen.// - Debug.log(new RuntimeException("Unexpected ssl engine status code.")); - }//else// - }//while// - -// if(debug) { -// debugBuffer.append("End Handshaking SSL\n"); -// }//if// - }//if// - - if(sendMore && currentOutboundMessage != null) { - //Check to see if the outbound message is prepared to send more content. For chunked transfers the outbound message may be waiting for additional content from another stream and we should return later.// - if(!currentOutboundMessage.getBuffer().hasRemaining()) { - if(!currentOutboundMessage.loadBuffer()) { - if(currentOutboundMessage.getBuffer() == null && currentOutboundMessage.getNext() != null) { - currentOutboundMessage = currentOutboundMessage.getNext(); - }//if// - else { - sendMore = false; - }//else// - }//if// - - if(currentOutboundMessage.getBuffer() == null) { - currentOutboundMessage = null; - lastOutboundMessage = null; - }//if// - }//if// - - //If we have an application response pending then send it now.// - if(sendMore && currentOutboundMessage.getBuffer().hasRemaining()) { - if(sslEngine != null) { - //Keep sending encrypted frames until the output buffer is full, or we run out of message to send.// - while(key.channel().isOpen() && sendMore && (currentOutboundMessage != null) && currentOutboundMessage.getBuffer().hasRemaining()) { - SSLEngineResult encryptResult; -// int offset = pendingOutboundMessage.getBuffer().position(); -//TODO: Comment me. -//int rem = pendingOutboundMessage.getBuffer().remaining(); - //Reset the encrypted write buffer.// - encryptedWriteBuffer.compact(); - //Encrypt the next message frame.// - encryptResult = sslEngine.wrap(currentOutboundMessage.getBuffer(), encryptedWriteBuffer); - encryptedWriteBuffer.flip(); -//TODO: Comment me. -//Debug.log("Encrypting/Sending to client from Git " + (rem - pendingOutboundMessage.getBuffer().remaining()) + " bytes."); - -// if(debug) { -// sentBytes += (pendingOutboundMessage.position() - offset); -// debugBuffer.append("Encrypted: " + (pendingOutboundMessage.position() - offset) + ". Total Encrypted: " + sentBytes + ". Encrypted size: " + encryptedWriteBuffer.limit() + ".\n"); -// }//if// - - if(encryptResult.getStatus() == Status.BUFFER_OVERFLOW) { - //Should never happen.// - Debug.log(new RuntimeException("Unexpected ssl engine buffer overflow.")); - }//if// - else if(encryptResult.getStatus() == Status.BUFFER_UNDERFLOW) { - //Should never happen.// - Debug.log(new RuntimeException("Unexpected ssl engine buffer underflow.")); - }//else if// - else if(encryptResult.getStatus() == Status.CLOSED) { - //Should never happen.// -// Debug.log(new RuntimeException("Unexpected ssl engine closed.")); - //TODO: Handle this closure without an infinate loop... - //Close the socket.// - try {key.channel().close();} catch(Throwable e2) {} - }//else if// - else if(encryptResult.getStatus() == Status.OK) { - //Write the bytes to the stream.// - try { - int remaining = encryptedWriteBuffer.remaining(); - - ((SocketChannel) key.channel()).write(encryptedWriteBuffer); - -// if(debug) { -// debugBuffer.append("Sent " + (remaining - encryptedWriteBuffer.remaining()) + " encrypted bytes.\n"); -// }//if// - }//try// - catch(IOException e) { - //Caught if the channel is forcably closed by the client. We will ignore it.// - }//catch// - - //If not all the bytes could be written then we will need to wait until we can write more.// - if(encryptedWriteBuffer.hasRemaining()) { - //Leave the data in the encrypted write buffer for the writing operation to send it.// - sendMore = false; - -// if(debug) { -// debugBuffer.append("Pausing due to a partially sent packet. Bytes actually sent: " + encryptedWriteBuffer.position() + ". Bytes remaining: " + encryptedWriteBuffer.remaining() + ".\n"); -// }//if// - }//if// - }//else if// - else { - //Should never happen.// - Debug.log(new RuntimeException("Unexpected ssl engine status code.")); - }//else// - - //Add more content to the buffer.// - //Note: Do this even if the last encrypted write buffer could not be fully sent - so that when it is sent there will be outbound message content.// - if(key.channel().isOpen() && currentOutboundMessage != null) { - if(!currentOutboundMessage.loadBuffer()) { - //Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.// - if(currentOutboundMessage.getBuffer() == null && currentOutboundMessage.getNext() != null) { - currentOutboundMessage = currentOutboundMessage.getNext(); - }//if// - else { - //Wait until additional message bytes are available.// - sendMore = false; - }//else// - }//if// - - //If the message end has been reached then the buffer will be null.// - if(currentOutboundMessage.getBuffer() == null) { - currentOutboundMessage = null; - lastOutboundMessage = null; - }//if// - }//if// - }//while// - }//if// - else { - //Keep sending encrypted frames until the output buffer is full, or we run out of message to send.// - while(sendMore && (currentOutboundMessage != null) && currentOutboundMessage.getBuffer().hasRemaining()) { - //Write the bytes to the stream.// - ((SocketChannel) key.channel()).write(currentOutboundMessage.getBuffer()); - -// if(debug) { -// sentBytes += pendingOutboundMessage.position(); -// debugBuffer.append("Wrote " + pendingOutboundMessage.position() + " bytes to the client. Total sent: " + sentBytes + "\n"); -// }//if// - - //If not all the bytes could be written then we will need to wait until we can write more.// - if(currentOutboundMessage.getBuffer().hasRemaining()) { - sendMore = false; - }//if// - else { - if(!currentOutboundMessage.loadBuffer()) { - //Load the next pending outbound message in the chain. This is currently only used for content being passed through to another process via a second socket.// - if(currentOutboundMessage.getBuffer() == null && currentOutboundMessage.getNext() != null) { - currentOutboundMessage = currentOutboundMessage.getNext(); - }//if// - else { - //Wait until additional message bytes are available.// - sendMore = false; - }//else// - }//if// - - //If the message end has been reached then the buffer will be null.// - if(currentOutboundMessage.getBuffer() == null) { - currentOutboundMessage = null; - lastOutboundMessage = null; - }//if// - }//else// - }//while// - }//else// - }//if// - }//if// - }//try// - catch(ClosedChannelException e) { - close(); - }//catch// - catch(SSLException e) { - if(debug) { - Debug.log(e); - }//if// - - close(); - }//catch// - catch(IOException e) { - if(debug) { - Debug.log(e); - }//if// - - close(); - }//catch// - - return sendMore; - }//writeClientBoundMessage()// - /* (non-Javadoc) - * @see com.foundation.web.server.WebServer.AbstractSocketContext#processRequest() - */ - protected void readIncomingMessages() throws IOException { - boolean requiresRead = true; - SocketChannel channel = (SocketChannel) key.channel(); - - if(isSsl()) { - int loopCount = 0; - boolean keepReading = true; - - if(sslEngine == null) { - SSLSession session = null; - - if(parseFirstTlsMessage(this, channel)) { - SSLContext sslContext = null; - - //Error checking.// - if(domain == null || domain.length() == 0) { - //We seem to have a choice here. We can either throw an error which ends up forcably closing the connection which for most browsers allows the client to retry with different settings (not older IE6, 7, 8 implementations), or return a pretty error which prevents users of well behaved browsers from getting the correct credentials.// - throw new TlsFailureException("Connection did not provide a domain name in the TLS hello message."); -// context.tlsFailure = true; - }//if// - - //Get the web application for the given domain.// - //Synchronize to prevent another thread from altering the service's web applications while we are accessing it.// - synchronized(WebServer.this) { - if(tlsFailure) { - domain = serverSocketContext.serviceListener.getDefaultSslDomain(); - }//if// - - webApplicationContainer = serverSocketContext.serviceListener.getWebApplicationContainer(domain); - }//synchronized// - - //Error checking.// - if(webApplicationContainer == null) { - throw new IOException("Cannot find a web application for the domain " + domain + "."); - }//if// - - //Lock on the container so it doesn't change the object it contains while we access it.// - synchronized(webApplicationContainer) { - //Get the SSLContext from the web application.// - sslContext = webApplicationContainer.getWebApplication().getSslContext(domain); - - if(sslContext == null) { - throw new IOException("Cannot get an SSLContext for the domain " + domain + " for the domain associated web application."); - }//if// - - sslEngine = sslContext.createSSLEngine(); - - //Attempt an anonymous connection to the client so we can report that they have an old browser version that is not using TLS + domain extension.// -// if(context.tlsFailure) { -// context.sslEngine.setEnabledCipherSuites(new String[] { -// "SSL_DH_anon_WITH_3DES_EDE_CBC_SHA", -// "SSL_DH_anon_WITH_DES_CBC_SHA", -// "SSL_DH_anon_WITH_RC4_128_MD5"}); -// //"SSL_DH_anon_EXPORT_WITH_RC4_40_MD5", -// //"SSL_DH_anon_EXPORT_WITH_DES40_CBC_SHA", -// //"TLS_DH_anon_EXPORT_WITH_RC4_40_MD5", -// //"TLS_DH_anon_WITH_RC4_128_MD5", -// //"TLS_DH_anon_EXPORT_WITH_DES40_CBC_SHA", -// //"TLS_DH_anon_WITH_DES_CBC_SHA", -// //"TLS_DH_anon_WITH_3DES_EDE_CBC_SHA"}); -// }//if// - - //context.sslEngine.getEnabledProtocols(); - //context.sslEngine.getEnabledCipherSuites(); - sslEngine.setUseClientMode(false); - sslEngine.beginHandshake(); - session = sslEngine.getSession(); - encryptedReadBuffer = ByteBuffer.allocate(session.getPacketBufferSize()); - encryptedReadBuffer.position(encryptedReadBuffer.limit()); - unencryptedReadBuffer = ByteBuffer.allocate(session.getApplicationBufferSize()); - unencryptedReadBuffer.position(unencryptedReadBuffer.limit()); - encryptedWriteBuffer = ByteBuffer.allocate(session.getPacketBufferSize()); - encryptedWriteBuffer.position(encryptedWriteBuffer.limit()); - }//synchronized// - - //Create a pass through socket and context and attach it to this context if the application is setup as a pass through to another process.// - IWebApplication application = webApplicationContainer.getWebApplication(); - - if(application instanceof IPassThroughDomain) { - IPassThroughDomain passThroughDomain = ((IPassThroughDomain) application); - - //Setup the pass through socket context (and socket channel). All data will be sent to this context to be sent to the remote process.// - relatedSocketContext = new PassThroughSocketContext(this, passThroughDomain.getAddress(), passThroughDomain.getPort()); - }//if// - }//if// - }//if// - - if(sslEngine != null) { - //While we have a count greater than zero, indicating that some data is comming through, keep reading and processing the data.// - //Note: We are throddling this for active connections to prevent a single connection from hogging all the resources.// - while(channel.isOpen() && loopCount < 10 && requiresRead && keepReading) { - int readCount = -1; - - //Track how many loops we run so that we don't hog all the server's resources with one connection.// - loopCount++; - //Compact the buffer, placing remaining bytes at the start of the buffer and preping for reading more bytes from the stream.// - encryptedReadBuffer.compact(); - - //If there is an initial buffer of bytes then use those first. The initial buffer is created when parsing the initial TLS message to gather the domain the client is connecting to.// - if(initialBuffer != null) { - readCount = initialBuffer.readBytes(encryptedReadBuffer); - - if(initialBuffer.getSize() == 0) { - initialBuffer = null; - }//if// - }//if// - - //If there isn't an initial buffer then fill (or finish filling) the read buffer.// - if(readCount == -1 && initialBuffer == null) { - //Read a frame of encrypted data (or handshake data).// - try { - readCount = channel.read(encryptedReadBuffer); - }//try// - catch(IOException e) { - //Ignore the forcable close error here - the channel will be closed a few lines lower in the code. TODO: Not sure if all IOExceptions should be ignored here?// - if(!e.getMessage().equalsIgnoreCase("An existing connection was forcibly closed by the remote host")) { - throw e; - }//if// - }//catch// - }//else// - - //Make the encrypted frame buffer readable.// - encryptedReadBuffer.flip(); - - //If the socket was closed then handle it, if we received enough data to process a frame then do so and continue, otherwise we must wait for the client to send more data.// - if(readCount == -1) { - //The socket has been closed by the client.// - close(); - keepReading = false; - }//if// - else if(encryptedReadBuffer.remaining() != 0) { - SSLEngineResult sslResult; - - //Sometimes the SSL Engine requires a partial read of the encrypted bytes, then a task, then reading the rest of the encrypted bytes.// - while(key.channel().isOpen() && encryptedReadBuffer.hasRemaining()) { - //Reset the unencrypted read buffer prior to decrypting the next frame.// - unencryptedReadBuffer.position(0); - unencryptedReadBuffer.limit(unencryptedReadBuffer.capacity()); - //Decrypt the message and process any SSL messages.// - sslResult = sslEngine.unwrap(encryptedReadBuffer, unencryptedReadBuffer); - unencryptedReadBuffer.flip(); - - //Check the requiresRead of the SSL processing.// - if(sslResult.getStatus() == Status.BUFFER_UNDERFLOW) { - //Buffer underflow indicates we haven't enough bytes to finish processing the next frame of data.// - break; - }//if// - else if(sslResult.getStatus() == Status.BUFFER_OVERFLOW) { - //Should never happen.// -// if(debug) Debug.log(new RuntimeException("Unexpected ssl engine buffer overflow.")); - close(); - }//else if// - else if(sslResult.getStatus() == Status.CLOSED) { - //A normal close I think...// - //Debug.log(new RuntimeException("Unexpected ssl engine closed.")); - }//else if// - else if(sslResult.getStatus() == Status.OK) { - //Run any long running tasks now.// - if(sslResult.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { - Runnable task = null; - - while((task = sslEngine.getDelegatedTask()) != null) { - task.run(); - }//while// - }//if// - - //If the engine requires handshake data to be wrapped and sent then do so now.// - if(sslResult.getHandshakeStatus() == HandshakeStatus.NEED_WRAP || sslResult.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { - sslNeedsWrap = true; - - //Need to synchronize if this is a pass through socket so that multiple threads don't access pendingOutboundMessage or lastAddedMessageBuffer (via a call to passThrough(ByteBuffer) on another thread).// - if(getPassThroughSocketContext() == null) { - requiresRead = writeClientBoundMessage(); - }//if// - else { - synchronized(this) { - requiresRead = writeClientBoundMessage(); - }//synchronized// - }//else// - }//if// - - //If bytes were produced then process them.// - if(sslResult.bytesProduced() > 0) { - //If we are not passing all content to another process then handle it by calling processClientRequest, otherwise pass it through.// - if(getPassThroughSocketContext() == null) { - if(isWebsocket) { - requiresRead = WebServer.this.processWebsocketFrame((SocketContext) this, unencryptedReadBuffer, key); - }//if// - else { - requiresRead = WebServer.this.processClientRequest((SocketContext) this, unencryptedReadBuffer, key); - }//else// - }//if// - else { -//TODO: Comment me. -//Debug.log("Receiving message (" + unencryptedReadBuffer.remaining() + " bytes) from client for git."); - //Queue the data for sending to the remote process via the pass through socket context.// - getPassThroughSocketContext().passThrough(unencryptedReadBuffer); - requiresRead = true; - }//else// - }//if// - }//else if// - else { - //Should never happen.// - Debug.log(new RuntimeException("Unexpected ssl engine status code.")); - close(); - }//else// - }//while// - }//else if// - else { - //Wait for the client to send more data.// - keepReading = false; - }//else// - }//while// - }//if// - }//if// - else { - int count = 1; - ByteBuffer socketReadBuffer = this.socketReadBuffer; - int loopCount = 0; - - //While we have a count greater than zero, indicating that some data is comming through, keep reading and processing the data.// - //Note: We are throddling this for active connections to prevent a single connection from hogging all the resources.// - while(loopCount < 10 && requiresRead && count > 0) { - loopCount++; - //Don't allow data to be left on the socket read buffer.// - socketReadBuffer.position(0); - socketReadBuffer.limit(socketReadBuffer.capacity()); - count = channel.read(socketReadBuffer); - socketReadBuffer.flip(); - - //Setup the pass through socket if the application is an instance of IPassThroughDomain.// - if(count != -1 && isFirstUnencryptedMessage) { - //Read enough of the header to identify the application.// - if(WebServer.this.processRequestedHost((SocketContext) this, socketReadBuffer, key)) { - //Create a pass through socket and context and attach it to this context if the application is setup as a pass through to another process.// - IWebApplication application = webApplicationContainer.getWebApplication(); - - if(application instanceof IPassThroughDomain) { - IPassThroughDomain passThroughDomain = ((IPassThroughDomain) application); - - //Setup the pass through socket context (and socket channel). All data will be sent to this context to be sent to the remote process.// - relatedSocketContext = new PassThroughSocketContext(this, passThroughDomain.getAddress(), passThroughDomain.getPort()); - }//if// - - isFirstUnencryptedMessage = false; - }//if// - else { - //We couldn't even read the host from the first bytes sent by the client - very unusual (it should be in the first couple hundred bytes - less than a single packet).// - //TODO: We could cycle and wait for the next packet. For now just close the socket since this should never really happen in the first place. - close(); - }//else// - }//if// - - if(count == -1) { - //The socket has been closed by the client.// - close(); - }//if// - else if(relatedSocketContext != null) { - relatedSocketContext.passThrough(socketReadBuffer); - - if(socketReadBuffer.remaining() > 0) { - Debug.log(new RuntimeException("Remaining bytes found on the read buffer!")); - }//if// - }//else if// - else if(isWebsocket) { - requiresRead = WebServer.this.processWebsocketFrame((SocketContext) this, socketReadBuffer, key); - - if(socketReadBuffer.remaining() > 0) { - Debug.log(new RuntimeException("Remaining bytes found on the read buffer!")); - }//if// - }//else if// - else if(socketReadBuffer.hasRemaining()) { - requiresRead = WebServer.this.processClientRequest((SocketContext) this, socketReadBuffer, key); - - if(socketReadBuffer.remaining() > 0) { - Debug.log(new RuntimeException("Remaining bytes found on the read buffer!")); - }//if// - }//else// - else { - break; - }//else// - }//while// - }//else// - }//processRequest()// - /* (non-Javadoc) - * @see com.foundation.web.server.WebServer.AbstractSocketContext#hasPendingWrite() - */ - protected boolean hasPendingWrite() { - return currentOutboundMessage != null || (encryptedWriteBuffer != null && encryptedWriteBuffer.hasRemaining()); - }//hasPendingWrite()// - /* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#upgradeToWebsocket(java.lang.String, long, com.foundation.web.interfaces.WebsocketHandler) - */ - public void upgradeToWebsocket(String protocol, long maxMessageLength, WebsocketHandler websocketHandler) { - if(!isWebsocket) { - this.isWebsocket = true; - this.websocketProtocol = protocol; - this.websocketMaxMessageLength = maxMessageLength; - this.websocketFrameHeader = new byte[14]; - this.websocketMessageMaskKey = new byte[4]; - this.websocketPendingMessages = new Queue(20); - this.websocketHandler = websocketHandler; - }//if// - }//upgradeToWebsocket()// - /* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#getWebsocketProtocol() - */ - public String getWebsocketProtocol() { - return websocketProtocol; - }//getWebsocketProtocol()// - /* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#isWebsocket() - */ - public boolean isWebsocket() { - return isWebsocket; - }//isWebsocket()// - /* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(byte[]) - */ - public void sendWebsocketMessage(byte[] message) { - if(isWebsocket) { - synchronized(websocketPendingMessages) { - websocketPendingMessages.enqueue(message); - }//synchronized// - - notifyListenerOfPendingWrite(); - }//if// - }//sendWebsocketMessage()// - /* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(java.lang.String) - */ - public void sendWebsocketMessage(String message) { - if(isWebsocket) { - synchronized(websocketPendingMessages) { - websocketPendingMessages.enqueue(message); - }//synchronized// - - notifyListenerOfPendingWrite(); - }//if// - }//sendWebsocketMessage()// - /* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketPing() - */ - public void sendWebsocketPing() { - if(isWebsocket) { - synchronized(websocketPendingMessages) { - websocketPendingMessages.enqueue(new Byte((byte) 0x9)); - }//synchronized// - - notifyListenerOfPendingWrite(); - }//if// - }//sendWebsocketPing()// - /** - * Sends a PONG response to the client's ping. - */ - public void sendWebsocketPong() { - if(isWebsocket) { - synchronized(websocketPendingMessages) { - websocketPendingMessages.enqueue(new Byte((byte) 0xA)); - }//synchronized// - - notifyListenerOfPendingWrite(); - }//if// - }//sendWebsocketPong()// - /* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(com.foundation.web.interfaces.IConnectionContext.IStreamedWebsocketMessage) - */ - public void sendWebsocketMessage(IStreamedWebsocketMessage message) { - if(isWebsocket) { - synchronized(websocketPendingMessages) { - websocketPendingMessages.enqueue(message); - }//synchronized// - - notifyListenerOfPendingWrite(); - }//if// - }//sendWebsocketMessage()// - }//SocketContext// - - /** - * Encapsulates the code that threads incomming socket requests and messages over sockets. - *Note that the HTTP protocol requires that the reponses be sent in order of the received requests.
- */ - private class NetworkListener implements Runnable { - private Selector selector = null; - private Iterator selectedKeys = null; -// private boolean hasListener = false; - private volatile boolean stop = true; - private volatile boolean hasRunnables = false; - private LiteList runnables = new LiteList(10, 20); - private int maxThreadCount = 0; - private int activeThreadCount = 0; - private int threadId = 1; - - public NetworkListener(Selector selector, int maxThreadCount) { - this.selector = selector; - this.maxThreadCount = maxThreadCount; - }//NetworkListener()// - /** - * Stops the network listener. - * Note that this may take a short time to complete. - */ - public void stop() { - if(!stop) { - stop = true; - selector.wakeup(); - }//if// - }//stop()// - /** - * Starts the network listener. - */ - public void start() { - if(stop) { - stop = false; - ThreadService.run(this); -// Thread t = new Thread(this); -// t.setName("Network Listener 1"); -// t.start(); - }//if// - }//start()// - /** - * Cleans up after the client channel. - * @param context The context associated with the client connection. - * @param channel The client connection that is now closed. - */ - private void cleanupClientChannel(SocketContext context, SocketChannel channel) { - if(debug) { - Debug.log("Connection closed to " + channel.socket().getInetAddress() + ":" + channel.socket().getPort()); - }//if// - }//cleanupClientChannel()// - /** - * Adds a runnable to the list of runnables to be run next time the loop is woken. - * @param runnable The runnable to be run by the thread that is listening for socket events. - */ - public synchronized void queue(Runnable runnable) { - runnables.add(runnable); - hasRunnables = true; - }//queue()// - /** - * Checks for runnables and runs them if there are any. - */ - private void checkForRunnables() { - if(hasRunnables) { - synchronized(this) { - while(runnables.getSize() > 0) { - ((Runnable) runnables.remove(0)).run(); - }//while// - - hasRunnables = false; - }//synchronized// - }//if// - }//checkForRunnables()// - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - public void run() { -// boolean loop = true; - - //Looping only occurs when we are at the maximum allowed number of threads handling messages.// - while(!stop /*&& loop*/) { - SelectionKey key = null; - - try { - //Synchronize so that we ensure thread safe access to the activeThreadCount variable.// -// synchronized(this) { //TODO: Remove me when we are sure that the threading works correctly. -// if(hasListener) { -// Debug.log(new RuntimeException("Failed to properly thread the NetworkListener.")); -// }//if// -// -// hasListener = true; -// }//synchronized// - - //If we don't have an iterator over the active channels then get one and block if necessary.// - if(selectedKeys == null) { - int keyCount = 0; - - //Block until we have keys or were awakened by another thread.// - keyCount = selector.select(); - //Check for any pending runnables that need executing on this thread.// - checkForRunnables(); - - //If we have active keys then retrieve them.// - if(keyCount > 0) { - selectedKeys = selector.selectedKeys().iterator(); - }//if// - }//if// - - //If we have an iterator over the active channels then get and remove the next one (clean up the iterator if empty).// - if(selectedKeys != null) { - key = (SelectionKey) selectedKeys.next(); - selectedKeys.remove(); - - //Weed out invalid (cancelled) keys.// - if(!key.isValid()) { - key = null; - }//if// - - if(!selectedKeys.hasNext()) { - selectedKeys = null; - }//if// - }//if// - -// synchronized(this) { //TODO: Remove me when we are sure that the threading works correctly. -// hasListener = false; -// }//synchronized// - }//try// - catch(Throwable e) { - //TODO: Can we recover? - Debug.log(e); - }//catch// - - try { - if(key != null) { - final boolean isWrite = key.isWritable(); - final ChannelContext context = (ChannelContext) key.attachment(); - final SelectableChannel channel = key.channel(); - final SelectionKey selectionKey = key; - - if(channel instanceof ServerSocketChannel) { - try { - ServerSocketChannel serverSocketChannel = (ServerSocketChannel) channel; - SocketChannel socketChannel = serverSocketChannel.accept(); - ServerSocketContext serverSocketContext = (ServerSocketContext) context; - SocketContext socketContext = new SocketContext(serverSocketContext); - - socketChannel.configureBlocking(false); - socketChannel.socket().setSendBufferSize(SEND_BUFFER_SIZE); - socketChannel.socket().setReceiveBufferSize(RECEIVE_BUFFER_SIZE); - socketContext.key = socketChannel.register(selector, SelectionKey.OP_READ, socketContext); - socketContext.serverSocketContext = serverSocketContext; - - //Debug.log("Connection opened to " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort()); - - if(serverSocketContext.serviceListener.type != IServiceListener.TYPE_SSL) { - socketContext.socketReadBuffer = ByteBuffer.allocate(BUFFER_SIZE); - }//if// - - if(debug) { - Debug.log("Connection opened to " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort()); - }//if// - }//try// - catch(Throwable e) { - //TODO: Can we recover? - Debug.log(e); - }//catch// - }//if// - else if(channel instanceof SocketChannel) { - // boolean socketClosed = false; - - //Toggle the write or read flag.// - synchronized(key) { - // //Save the ops that will be set when the processing is complete.// - // ((AbstractSocketContext) context).setFlags(key.interestOps()); - - //Notes: Java (pre-jdk7) does not have the ability to read and write to a socket at the same time (two threads, one socket). Post jdk7 there is AsynchronousSocketChannel and AsynchronousServerSocketChannel which could be used to send/receive at the same time. - //Truely enabling Speedy would require a thread to read which when finished would flag read again BEFORE processing the message and BEFORE sending a response. - //For now (so we don't have to require jdk7 yet) we will simply allow Speedy to queue up messages, but only read, process, and then write them one at a time. Most of the speed loss is in the waiting for the WRITE to finish before handling the next request (due to it being broken into packets and the mechanics of TCP), and that is generally minimal (speed lose) since usually the bottleneck in speed is the browser's connection to the internet (most of us haven't got Gigabit Ethernet at home). Anyone with enough home juice to have this be a problem would only notice the difference for really porky websites (which is a problem in and of its self). - - //Not allowing either reads or writes to continue until all processing of this message is done.// - ((AbstractSocketContext) context).isUsed = true; - key.interestOps(0); - - //The problem with this is that we'd have to use AsynchronousSocketChannel which would appear to require a complete rewrite of everything since it operates completely differently.// -// key.interestOps(key.interestOps() ^ (isWrite ? SelectionKey.OP_WRITE : SelectionKey.OP_READ)); - }//synchronized// - - if(((SocketChannel) channel).isOpen()) { - ThreadService.run(new Runnable() { - public void run() { - boolean socketClosed = false; - - try { - if(isWrite) { - //Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).// - synchronized(((AbstractSocketContext) context).getLock()) { - //Process the pending write to the socket as much as is possible, then return.// - ((AbstractSocketContext) context).writeOutgoingMessages(); - }//synchronized// - }//if// - else { - //Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).// - synchronized(((AbstractSocketContext) context).getLock()) { - //Process the incoming request and send the response (a partial response may be sent in which case the socket will be set to wait for a write opportunity and not a read opportunity).// - ((AbstractSocketContext) context).readIncomingMessages(); - }//synchronized// - }//else// - }//try// - catch(TlsFailureException e) { - //Allow the failure to be ignored. This occurs when the client fails to use TLS or fails to send the host name as part of the TLS handshake.// - try {((SocketChannel) channel).close();}catch(Throwable e2) {} //Release the socket so the message doesn't continue to be processed.// - }//catch// - catch(Throwable e) { - if(debug) Debug.log(e); - - //Force the socket to be closed (for sure).// - try {((SocketChannel) channel).close();} catch(Throwable e2) {} - //Debug.log(e); - socketClosed = true; - }//catch// - finally { - if(channel != null && !socketClosed && channel.isOpen() && context != null) { - try { - //Set the new ops for the selection key and notify the selector that ops have changed.// - synchronized(selectionKey) { - if(selectionKey.isValid()) { - //Always flag the socket for reading, only flag the socket for writing if a pending write operation exists.// - selectionKey.interestOps(SelectionKey.OP_READ | (((AbstractSocketContext) context).hasPendingWrite() ? SelectionKey.OP_WRITE : 0)); - }//if// - else { - Debug.log(new RuntimeException("Woops! Somehow the selection key isn't valid, but the socket isn't closed either!")); - try {((SocketChannel) channel).close();} catch(Throwable e2) {} - cleanupClientChannel((SocketContext) context, (SocketChannel) channel); - }//else// - - ((AbstractSocketContext) context).isUsed = false; - }//synchronized// - - selector.wakeup(); - }//try// - catch(Throwable e) { - Debug.log(e); - }//catch// - }//if// - else if(channel != null && (!channel.isOpen() || socketClosed) && channel instanceof SocketChannel && context instanceof SocketContext) { - cleanupClientChannel((SocketContext) context, (SocketChannel) channel); - }//else if// - else { - //This shouldn't be called I don't think.// - Debug.log(new RuntimeException("Woops! Somehow we aren't closed and we didn't setup the interestOps for the HTTP socket!")); - }//else// - }//finally// - }//run()// - }); - /* - try { - synchronized(this) { - // if(++activeThreadCount != maxThreadCount) { - //Start another thread to take this thread's place.// - ThreadService.run(this); - // }//if// - }//synchronized// - - if(isWrite) { - // if(debug) { - // ((SocketContext) context).debugBuffer.append("Socket is now write available.\n"); - // Debug.log("Socket is write available: " + ((SocketChannel) channel).socket().getInetAddress() + ":" + ((SocketChannel) channel).socket().getPort()); - // }//if// - - //Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).// - synchronized(((AbstractSocketContext) context).getLock()) { - //Process the pending write to the socket as much as is possible, then return.// - ((AbstractSocketContext) context).processResponses(); - }//synchronized// - }//if// - else { - // if(debug) { - // ((SocketContext) context).debugBuffer.append("Socket is now read available.\n"); - // Debug.log("Socket is read available: " + ((SocketChannel) channel).socket().getInetAddress() + ":" + ((SocketChannel) channel).socket().getPort()); - // }//if// - - //Prevent another thread from reading/writing on the same socket at the same time (safety). This would have to be removed if SPEEDY (or similar pipelining) were allowed, and AsynchronousSocketChannel/AsynchronousServerSocketChannel would have to be used (requiring jdk7).// - synchronized(((AbstractSocketContext) context).getLock()) { - //Process the incoming request and send the response (a partial response may be sent in which case the socket will be set to wait for a write opportunity and not a read opportunity).// - ((AbstractSocketContext) context).processRequest(); - }//synchronized// - }//else// - }//try// - catch(TlsFailureException e) { - //Allow the failure to be ignored. This occurs when the client fails to use TLS or fails to send the host name as part of the TLS handshake.// - try {((SocketChannel) channel).close();}catch(Throwable e2) {} //Release the socket so the message doesn't continue to be processed.// - }//catch// - catch(Throwable e) { - if(debug) Debug.log(e); - - //Force the socket to be closed (for sure).// - try {((SocketChannel) channel).close();}catch(Throwable e2) {} - //Debug.log(e); - socketClosed = true; - }//catch// - finally { - boolean requiresWakeup = false; - - if(channel != null && !socketClosed && channel.isOpen() && key != null && context != null) { - requiresWakeup = true; - }//if// - else if(channel != null && (!channel.isOpen() || socketClosed) && channel instanceof SocketChannel && context instanceof SocketContext) { - cleanupClientChannel((SocketContext) context, (SocketChannel) channel); - }//else if// - - //Loop if the last thread to wait for a message couldn't start another thread due to the max number of threads allowed.// - synchronized(this) { - // if(activeThreadCount-- != maxThreadCount) { - loop = false; - - if(requiresWakeup) { - selector.wakeup(); - }//if// - // }//if// - }//synchronized// - }//finally// - */ - }//if// - }//else if// - }//if// - }//try// - catch(java.nio.channels.CancelledKeyException e) { - //Occurs if the socket is closed while we are handling the key.// - Debug.log(e); //TODO: Does anything need doing here? Should it be ignored? - }//catch// - catch(Throwable e) { - Debug.log(e); - //TODO: There needs to be more specfic error handling if we got here. - }//catch// - }//while// - }//run()// - }//NetworkListener// - /** * A simple extension of the IOException to allow us to ignore it. */ - private static class IgnoredIOException extends IOException { + public static class IgnoredIOException extends IOException { public IgnoredIOException(String s) { super(s); }//IgnoredIOException()// @@ -2783,27 +210,11 @@ public class WebServer { /** * A simple extension of the IOException to allow us to ignore it. */ - private static class TlsFailureException extends IOException { + public static class TlsFailureException extends IOException { public TlsFailureException(String s) { super(s); }//IgnoredIOException()// }//TlsFailureException// -/** - * Gets the threads date format for processing HTTP header dates. - *This uses a thread local because the date format class is not thread safe :(.
- * @return The date format for handling http header dates. - */ -public static SimpleDateFormat getHttpDateFormat() { - SimpleDateFormat result = (SimpleDateFormat) httpDateFormat.get(); - - if(result == null) { - result = new SimpleDateFormat(httpDateFormatString); - result.setTimeZone(TimeZone.getTimeZone("GMT")); - httpDateFormat.set(result); - }//if// - - return result; -}//getHttpDateFormat()// /** * WebServer constructor. */ @@ -3054,18 +465,7 @@ public synchronized boolean start() throws IOException { }//else// if(success) { - String threadCount = System.getProperty("webserver.listener.threads"); - int count = 10; - - if(threadCount != null) { - try {count = Integer.parseInt(threadCount);} catch(Throwable e) {Debug.log(e);} - - if(count < 1) { - count = 10; - }//if// - }//if// - - networkListener = new NetworkListener(selector, count); + networkListener = new NetworkListener(this, selector); networkListener.start(); }//if// else { @@ -3099,1230 +499,4 @@ public synchronized void stop() { networkListener = null; }//if// }//stop()// -/** - * Simplification of the error throwing code. - */ -private void handleBrokenStream() throws IOException { - throw new IOException("Invalid TLS stream."); -}//handleBrokenStream()// -/** - * Parses the initial client hello message sent in the TLS protocol to identify which SSL context to use for the connection. - * @param context The connection context. - * @param channel The socket channel to read from. - * @return Whether the message could be parsed. If false, the method will be called again until true as new bytes arrive on the channel. - */ -private boolean parseFirstTlsMessage(SocketContext context, SocketChannel channel) throws IOException { - boolean result = false; - ByteBuffer temp = ByteBuffer.allocate(500); - StreamBuffer input; - int originalInputSize; - - //Create the stream buffer for this operation if not already created.// - if(context.initialBuffer == null) { - context.initialBuffer = new StreamBuffer(); - }//if// - - //Simplify the code a bit with a local variable.// - input = context.initialBuffer; - originalInputSize = input.getSize(); - - //Exit if the initial buffer size is rediculously large or if the channel is closed or has no additional bytes.// - while(channel.read(temp) > 0 && context.initialBuffer.getSize() < 10000) { - temp.flip(); - context.initialBuffer.writeBytes(temp); - }//while// - - //Ensure we have enough data to read the header.// - if(input.getSize() > 4) { - int contentType = input.getByte(0) & 0xFF; - int highVersion = input.getByte(1) & 0xFF; - int lowVersion = input.getByte(2) & 0xFF; - int length = input.getShort(3, StreamBuffer.NUMBER_MSF) & 0xFFFF; - -// context.initialBuffer.getAsText(); - - //Check the version.// - if(highVersion != 3 || lowVersion > 3 || lowVersion < 1) { - //handleBrokenStream(); - return true; - }//if// - - //Validate we have enough bytes for the whole message.// - if(input.getSize() > 4 + length) { - StreamBuffer tempBuffer = new StreamBuffer(input.getBufferPool()); - - //Indicate we have parsed the initial message (or at least attempted to).// - result = true; - //Clone the input buffer so we don't consume any bytes needed by the SSLEngine later.// - //Copy the bytes, don't consume them.// - tempBuffer.writeBytes(input, 0, length + 5); - input = tempBuffer; - - //A full record will be read.// - result = true; - //Skip the record header bytes.// - input.skipBytes(5); - - //Read each message in this record.// - switch(contentType) { -// case CONTENT_TYPE_CHANGE_CIPHER_SPEC: { -// processChangeCipherSpec(input); -// break; -// }//case// -// case CONTENT_TYPE_ALERT: { -// //Only one alert may exist in the message.// -// processAlert(input); -// break; -// }//case// - case 0x16: { //CONTENT_TYPE_HANDSHAKE - //Multiple handshake messages may be contained in the single record.// - while(input.getSize() > 0) { - int messageType = input.readByte() & 0xFF; - int messageSize = input.readMediumInt(StreamBuffer.NUMBER_MSF, false); - int postMessageStreamSize = input.getSize() - messageSize; - - switch(messageType) { - case 0x01: { //HANDSHAKE_TYPE_CLIENT_HELLO - //long time; - byte[] randomClientBytes = new byte[28]; - int sessionIdSize; - byte[] sessionId = null; - int cipherSuiteCount; - int[] cipherSuites; - int compressionMethodCount; - int[] compressionMethods; - /*int protocolVersionHigh = */input.readByte()/* & 0xFF*/; //Note: This should be the same as when it was read as part of the record earlier - this is redundant.// - /*int protocolVersionLow = */input.readByte()/* & 0xFF*/; - - /*time = */input.readInt(StreamBuffer.NUMBER_MSF)/* & 0x00000000FFFFFFFFL*/; - input.readBytes(randomClientBytes); - sessionIdSize = input.readByte() & 0xFF; - - if(sessionIdSize > 0) { - sessionId = new byte[sessionIdSize]; - input.readBytes(sessionId); - }//if// - - cipherSuiteCount = (input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF) / 2; - cipherSuites = new int[cipherSuiteCount]; - - for(int index = 0; index < cipherSuites.length; index++) { - cipherSuites[index] = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; - }//for// - - compressionMethodCount = input.readByte() & 0xFF; - compressionMethods = new int[compressionMethodCount]; - - for(int index = 0; index < compressionMethods.length; index++) { - compressionMethods[index] = input.readByte() & 0xFF; - }//for// - - //Read extensions until the end of the stream is reached.// - if(input.getSize() != postMessageStreamSize) { - /*int extensionArraySize =*/ input.readShort(StreamBuffer.NUMBER_MSF)/* & 0xFFFF*/; - - //Read each extension.// - while(input.getSize() != postMessageStreamSize) { - int extensionType = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; - int extensionDataSize = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; - int expectedStreamSize = input.getSize() - extensionDataSize; - - //Read the extension data if it is known, otherwise skip the data.// - switch(extensionType) { - case 0x0000: { //EXT_SERVER_NAME - //The ServerName extension always starts with a ServerNameList structure.// - int serverNameListByteCount = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; - int postServerNamesExpectedStreamSize = input.getSize() - serverNameListByteCount; - - //Read each of the server names (no idea how many there will be -