diff --git a/Foundation Web Core/src/com/foundation/web/server/AbstractSocketContext.java b/Foundation Web Core/src/com/foundation/web/server/AbstractSocketContext.java index 7c2421f..06409fb 100644 --- a/Foundation Web Core/src/com/foundation/web/server/AbstractSocketContext.java +++ b/Foundation Web Core/src/com/foundation/web/server/AbstractSocketContext.java @@ -28,8 +28,6 @@ public abstract class AbstractSocketContext implements IChannelContext { private final NetworkListener networkListener; /** The debug ID for the socket. */ private final int id; - /** The web server that created the socket context. */ - protected WebServer webServer = null; /** The key that represents the connection between the channel (socket) and the selector used to multiplex the listener. The code must synchronize on this attribute when accessing the isUsed functionality, or when interacting with the key's interestOps. */ protected SelectionKey key = null; /** Whether the socket is currently being used by a thread designated by the network listener thread to read or write to the socket. Currently the socket type we use in Java only allows one thread to read and write at a time. Note: Always synchronize on key before using this attribute. */ @@ -75,13 +73,8 @@ public WebServer getWebServer() {return networkListener.getWebServer();} public boolean getIsUsed() {return isUsed;} /** Sets whether the socket context is currently in use by a thread. */ public void setIsUsed(boolean isUsed) {this.isUsed = isUsed;} -/** Gets the debug id for the socket. */ -public int getId() {return id;} -/** - * Gets the lockable (synchronizable) object for this context. For contexts with a related context, only one of the two will be returned, such that a single synchronize block covers both contexts. - * @return The object to synchronize on such that two threads don't attempt to interact with the context at the same time (AsynchronousSocketChannel required for that). - */ -protected abstract Object getLock(); +/** Gets the selection key used for this socket context by the Listener to identify when bytes are incoming or available for outgoing on the socket. */ +public SelectionKey getKey() {return key;} /** * Writes the next responses/messages in the sequence. * @throws IOException @@ -97,7 +90,7 @@ protected abstract void readIncomingMessages() throws IOException; * @param buffer The buffer containing the message. This buffer will not be retained by this method call, and can be reused by the caller. * @return Whether the whole message was transfered. */ -protected abstract boolean passThrough(ByteBuffer buffer); +protected abstract void passThrough(ByteBuffer buffer); /** * Closes the socket context and cleans up. */ @@ -128,4 +121,21 @@ protected void notifyListenerOfPendingWrite() { }//if// }//synchronized// }//notifyListenerOfPendingWrite()// +/** + * Gets the lockable (synchronizable) object for this context. For contexts with a related context, only one of the two will be returned, such that a single synchronize block covers both contexts. + * @return The object to synchronize on such that two threads don't attempt to interact with the context at the same time (AsynchronousSocketChannel required for that). + */ +protected Object getLock() { + return key; +}//getLock()// +/** + * Gets the related socket's selection key. This will be the key not returned by a call to getLock(), or will be null if there isn't a related socket context. + * Allows for PassThroughSocketContext which links to a SocketContext to pass all data through to a remote process. This will always return the PassThroughSocketContext's selection key. + * @return The selection key for the PassThroughSocketContext if one exists for this SocketContext (or if this is a PassThroughSocketContext). + */ +public SelectionKey getRelatedSocketContextKey() { + return null; +}//getRelatedSocketContextKey()// +/** Gets the debug id for the socket. */ +public int getId() {return id;} }//AbstractSocketContext// \ No newline at end of file diff --git a/Foundation Web Core/src/com/foundation/web/server/NetworkListener.java b/Foundation Web Core/src/com/foundation/web/server/NetworkListener.java index 74d5b7d..bebf097 100644 --- a/Foundation Web Core/src/com/foundation/web/server/NetworkListener.java +++ b/Foundation Web Core/src/com/foundation/web/server/NetworkListener.java @@ -146,14 +146,7 @@ public void run() { socketChannel.socket().setSendBufferSize(AbstractSocketContext.SEND_BUFFER_SIZE); socketChannel.socket().setReceiveBufferSize(AbstractSocketContext.RECEIVE_BUFFER_SIZE); socketContext.key = socketChannel.register(selector, SelectionKey.OP_READ, socketContext); - socketContext.serverSocketContext = serverSocketContext; - //Debug.log("Connection opened to " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort()); - - if(serverSocketContext.serviceListener.type != IServiceListener.TYPE_SSL) { - socketContext.socketReadBuffer = ByteBuffer.allocate(AbstractSocketContext.BUFFER_SIZE); - }//if// - if(getWebServer().debug()) { Debug.log("Connection opened to " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort()); }//if// @@ -164,12 +157,12 @@ public void run() { }//catch// }//if// else if(channel instanceof SocketChannel) { -// boolean socketClosed = false; +// boolean socketClosed = false; //Toggle the write or read flag.// synchronized(key) { -// //Save the ops that will be set when the processing is complete.// -// ((AbstractSocketContext) context).setFlags(key.interestOps()); +// //Save the ops that will be set when the processing is complete.// +// ((AbstractSocketContext) context).setFlags(key.interestOps()); //Notes: Java (pre-jdk7) does not have the ability to read and write to a socket at the same time (two threads, one socket). Post jdk7 there is AsynchronousSocketChannel and AsynchronousServerSocketChannel which could be used to send/receive at the same time. //Truely enabling Speedy would require a thread to read which when finished would flag read again BEFORE processing the message and BEFORE sending a response. @@ -180,7 +173,7 @@ public void run() { key.interestOps(0); //The problem with this is that we'd have to use AsynchronousSocketChannel which would appear to require a complete rewrite of everything since it operates completely differently.// -// key.interestOps(key.interestOps() ^ (isWrite ? SelectionKey.OP_WRITE : SelectionKey.OP_READ)); +// key.interestOps(key.interestOps() ^ (isWrite ? SelectionKey.OP_WRITE : SelectionKey.OP_READ)); }//synchronized// if(((SocketChannel) channel).isOpen()) { diff --git a/Foundation Web Core/src/com/foundation/web/server/PassThroughSocketContext.java b/Foundation Web Core/src/com/foundation/web/server/PassThroughSocketContext.java index c49eeec..fec88a6 100644 --- a/Foundation Web Core/src/com/foundation/web/server/PassThroughSocketContext.java +++ b/Foundation Web Core/src/com/foundation/web/server/PassThroughSocketContext.java @@ -3,8 +3,13 @@ package com.foundation.web.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; + +import javax.net.ssl.SSLException; + +import com.common.debug.Debug; import com.foundation.web.server.WebServer.RegisterKeyRunnable; /** @@ -12,8 +17,8 @@ import com.foundation.web.server.WebServer.RegisterKeyRunnable; * Allows the web server to act as an SSL front to another web server or service. */ public class PassThroughSocketContext extends AbstractSocketContext { - private MessageBuffer pendingMessageBuffer = null; - private MessageBuffer lastAddedMessageBuffer = null; + private MessageBuffer currentOutboundMessage = null; + private MessageBuffer lastOutboundMessage = null; /** The byte buffer used to read data from the socket. */ public ByteBuffer socketReadBuffer = ByteBuffer.allocate(BUFFER_SIZE); /** @@ -52,117 +57,187 @@ public PassThroughSocketContext(SocketContext linkedClientContext, String addres * @see com.foundation.web.server.WebServer.AbstractSocketContext#getLock() */ protected Object getLock() { - return getRelatedSocketContext(); + return getRelatedSocketContext().getLock(); }//getLock()// /* (non-Javadoc) - * @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses() + * @see com.foundation.web.server.WebServer.AbstractSocketContext#getRelatedSocketContextKey() */ -protected synchronized void writeOutgoingMessages() throws IOException { - //Actually this is called when a request is being sent via the pass through socket (sending the request to the remote server).// - //Synchronized to avoid accessing the pendingMessageBuffer and lastAddedMessageBuffer at the same time as a thread that is calling passThrough(ByteBuffer) which also accesses these variables.// - boolean result = true; - - if(result && pendingMessageBuffer != null) { - //Check to see if the outbound message is prepared to send more content.// - if(!pendingMessageBuffer.getBuffer().hasRemaining()) { - //Load the next pending outbound message in the chain.// - if(pendingMessageBuffer.getNext() != null) { - pendingMessageBuffer = pendingMessageBuffer.getNext(); - }//if// - else { - //Wait until additional message bytes are available.// - result = false; - pendingMessageBuffer = null; - lastAddedMessageBuffer = null; - }//else// - }//if// - - //Keep sending encrypted frames until the output buffer is full, or we run out of message to send.// - while(result && (pendingMessageBuffer != null) && pendingMessageBuffer.getBuffer().hasRemaining()) { - //Write the bytes to the stream.// - ((SocketChannel) key.channel()).write(pendingMessageBuffer.getBuffer()); - - //If not all the bytes could be written then we will need to wait until we can write more.// - if(pendingMessageBuffer.getBuffer().hasRemaining()) { - result = false; - }//if// - else { - //Load the next pending outbound message in the chain.// - if(pendingMessageBuffer.getNext() != null) { - pendingMessageBuffer = pendingMessageBuffer.getNext(); - }//if// - else { - //Wait until additional message bytes are available.// - result = false; - pendingMessageBuffer = null; - lastAddedMessageBuffer = null; - }//else// - }//else// - }//while// - }//if// -}//processResponses()// +public SelectionKey getRelatedSocketContextKey() { + return key; +}//getRelatedSocketContextKey()// +/** Gets whether the socket context is in use (reading or writing) such that the network listener doesn't attempt to read and write at the same time (not supported by Java's NIO sockets). Should only ever be used by the NetworkListener. */ +public boolean isUsed() {return getRelatedSocketContext().getIsUsed();} +/** Sets whether the socket context is in use (reading or writing) such that the network listener doesn't attempt to read and write at the same time (not supported by Java's NIO sockets). Should only ever be used by the NetworkListener. */ +public void isUsed(boolean isUsed) {getRelatedSocketContext().setIsUsed(isUsed);} /* (non-Javadoc) - * @see com.foundation.web.server.WebServer.AbstractSocketContext#processRequest() + * @see com.foundation.web.server.WebServer.AbstractSocketContext#writeOutgoingMessages() + */ +protected void writeOutgoingMessages() throws IOException { + boolean keepSending; + MessageBuffer outboundMessage; + SocketChannel channel; + + //Synchronized to avoid multiple threads accessing the currentOutboundMessage chain at one time.// + synchronized(getLock()) { + outboundMessage = currentOutboundMessage; + channel = (SocketChannel) key.channel(); + keepSending = hasPendingWrite() && channel.isOpen(); + }//synchronized// + + //Keep sending responses while the buffers are not full and there is another response to send.// + while(keepSending) { + //Send the pending response object's prepared buffer of data.// + writeClientBoundMessage(channel, outboundMessage); + + //Synchronized to avoid multiple threads accessing the currentOutboundMessage chain at one time.// + synchronized(getLock()) { + //If we finished the message then load the next message, otherwise flag that we need to stop sending (buffers full - flag a write on the socket's key and wait).// + if(currentOutboundMessage != null && currentOutboundMessage.isClosed()) { + currentOutboundMessage = currentOutboundMessage.getNext(); + keepSending = hasPendingWrite() && key.channel().isOpen(); + outboundMessage = currentOutboundMessage; + }//if// + else { + keepSending = false; + }//else// + }//synchronized// + }//while// +}//writeOutgoingMessages()// +/* (non-Javadoc) + * @see com.foundation.web.server.AbstractSocketContext#readIncomingMessages() */ protected void readIncomingMessages() throws IOException { //Actually this is called when a response is being processed via the pass through socket (remote process that received the request).// - int count = 1; - int loopCount = 0; - boolean result = true; - SocketChannel channel = (SocketChannel) key.channel(); - - //While we have a count greater than zero, indicating that some data is comming through, keep reading and processing the data.// - //Note: We are throddling this for active connections to prevent a single connection from hogging all the resources.// - while(loopCount < 10 && result && count > 0) { - loopCount++; - count = channel.read(socketReadBuffer); - socketReadBuffer.flip(); + int count = 1; + int loopCount = 0; + SocketChannel channel = (SocketChannel) key.channel(); - if(count == -1) { - //The socket has been closed by the client.// - try {relatedSocketContext.close();} catch(Throwable e) {} + //While we have a count greater than zero, indicating that some data is comming through, keep reading and processing the data.// + //Note: We are throddling this for active connections to prevent a single connection from hogging all the resources.// + while(loopCount < 10 && count > 0) { + loopCount++; + count = channel.read(socketReadBuffer); + socketReadBuffer.flip(); + + if(count == -1) { + //The socket has been closed by the client.// + try {relatedSocketContext.close();} catch(Throwable e) {} + }//if// + else if(socketReadBuffer.hasRemaining()) { + relatedSocketContext.passThrough(socketReadBuffer); + socketReadBuffer.compact(); + }//else// + else { + socketReadBuffer.compact(); + break; + }//else// + }//while// +}//readIncomingMessages()// +/** + * Sends a message/response to the client. + * @param channel The channel to use for the sending. + * @param currentOutboundMessage The message to be sending. This may be null if using SSL and sending handshake data (in which case the encryptedWriteBuffer should have data on it). + * @return Whether the response could be fully sent. This will be false if there is still more data to be written for the current message OR in the encrypted SSL write buffer (for SSL handshake messages) when the call returns. + */ +private boolean writeClientBoundMessage(SocketChannel channel, MessageBuffer currentOutboundMessage) { + boolean sendMore = true; + +// if(debug) { +// debugBuffer.append("Starting a write cycle.\n"); +// }//if// + + try { + //If we can send more data and we have an outbound message then initialize and send it! The currentOutboundMessage might be null if performing an SSL handshake.// + if(sendMore && currentOutboundMessage != null && !currentOutboundMessage.isClosed()) { + //Initialize the outbound message.// + if(!currentOutboundMessage.initialize()) { + getRelatedSocketContext().close(); + }//if// + else { + currentOutboundMessage.loadBuffer(); + + //If we have an application response pending then send it now.// + if(sendMore && !currentOutboundMessage.isClosed()) { + //Keep sending encrypted frames until the output buffer is full, or we run out of message to send.// + while(sendMore && !currentOutboundMessage.isClosed()) { + //Write the bytes to the stream.// + channel.write(currentOutboundMessage.getBuffer()); + +// if(debug) { +// sentBytes += pendingOutboundMessage.position(); +// debugBuffer.append("Wrote " + pendingOutboundMessage.position() + " bytes to the client. Total sent: " + sentBytes + "\n"); +// }//if// + + //If not all the bytes could be written then we will need to wait until we can write more.// + if(currentOutboundMessage.getBuffer().hasRemaining()) { + sendMore = false; + }//if// + else { + sendMore = currentOutboundMessage.loadBuffer(); + }//else// + }//while// + }//if// + }//else// }//if// - else if(socketReadBuffer.hasRemaining()) { - result = relatedSocketContext.passThrough(socketReadBuffer); - socketReadBuffer.compact(); - }//else// - else { - socketReadBuffer.compact(); - break; - }//else// - }//while// -}//processRequest()// + }//try// + catch(ClosedChannelException e) { + getRelatedSocketContext().close(); + }//catch// + catch(SSLException e) { + if(getWebServer().debug()) { + Debug.log(e); + }//if// + + close(); + }//catch// + catch(IOException e) { + if(getWebServer().debug()) { + Debug.log(e); + }//if// + + getRelatedSocketContext().close(); + }//catch// + + //Return true if the current outbound message was fully sent and any SSL handshake messages were fully sent.// + return (currentOutboundMessage == null || currentOutboundMessage.isClosed()); +}//writeClientBoundMessages()// /* (non-Javadoc) * @see com.foundation.web.server.WebServer.AbstractSocketContext#passThrough(java.nio.ByteBuffer) */ -protected synchronized boolean passThrough(ByteBuffer buffer) { -// ByteBuffer messageBytes = ByteBuffer.allocate(buffer.remaining()); -// MessageBuffer message; -// -// //Create a new buffer to hold the data so we don't modify the passed buffer (other than to update its position).// -// messageBytes = ByteBuffer.allocate(buffer.remaining()); -// messageBytes.put(buffer); -// message = new MessageBuffer(messageBytes); -// -// //Chain the message into the linked list. -// if(lastAddedMessageBuffer == null) { -// pendingMessageBuffer = lastAddedMessageBuffer = message; -// }//if// -// else { -// lastAddedMessageBuffer.setNext(message); -// lastAddedMessageBuffer = message; -// }//else// - - return true; +protected void passThrough(ByteBuffer buffer) { + synchronized(getLock()) { + ByteBuffer messageBytes = ByteBuffer.allocate(buffer.remaining()); + MessageBuffer message; + + //Create a new buffer to hold the data so we don't modify the passed buffer (other than to update its position).// + messageBytes = ByteBuffer.allocate(buffer.remaining()); + messageBytes.put(buffer); + message = new MessageBuffer(this, messageBytes); + + //Chain the message into the linked list. + if(lastOutboundMessage == null) { + currentOutboundMessage = lastOutboundMessage = message; + }//if// + else { + lastOutboundMessage.setNext(message); + lastOutboundMessage = message; + }//else// + }//synchronized// }//passThrough()// protected synchronized void close() { - try {if(key != null && key.channel() != null) key.channel().close();} catch(Throwable e) {} - try {if(key != null) key.cancel();} catch(Throwable e) {} + synchronized(getLock()) { + if(key != null) { + synchronized(key) { + try {if(key.channel() != null) key.channel().close();} catch(Throwable e) {} + try {key.cancel();} catch(Throwable e) {} + }//synchronized// + }//if// + }//synchronized// }//close()// /* (non-Javadoc) * @see com.foundation.web.server.WebServer.AbstractSocketContext#hasPendingWrite() */ protected boolean hasPendingWrite() { - return pendingMessageBuffer != null; + return currentOutboundMessage != null; }//hasPendingWrite()// }//PassThroughSocketContext// \ No newline at end of file diff --git a/Foundation Web Core/src/com/foundation/web/server/ServerSocketContext.java b/Foundation Web Core/src/com/foundation/web/server/ServerSocketContext.java index 33ee8f4..37c6fe5 100644 --- a/Foundation Web Core/src/com/foundation/web/server/ServerSocketContext.java +++ b/Foundation Web Core/src/com/foundation/web/server/ServerSocketContext.java @@ -6,7 +6,7 @@ import com.foundation.web.server.WebServer.ServiceListener; * Provides a place connection oriented data. */ public class ServerSocketContext implements IChannelContext { - public ServiceListener serviceListener = null; + private ServiceListener serviceListener = null; public ServerSocketContext(ServiceListener serviceListener) { super(); diff --git a/Foundation Web Core/src/com/foundation/web/server/SocketContext.java b/Foundation Web Core/src/com/foundation/web/server/SocketContext.java index 130249c..7b5411b 100644 --- a/Foundation Web Core/src/com/foundation/web/server/SocketContext.java +++ b/Foundation Web Core/src/com/foundation/web/server/SocketContext.java @@ -51,53 +51,49 @@ import com.foundation.web.server.WebServer.TlsFailureException; */ public class SocketContext extends AbstractSocketContext implements IWebApplicationContainerProvider, IConnectionContext { /** The server socket reference that created the socket. This will be null if the socket was not created with a server socket (shouldn't happen in a web server). */ - public ServerSocketContext serverSocketContext = null; + private ServerSocketContext serverSocketContext = null; /** The web application's container for the application associated with this connection to a client. A connection is only allowed to access a single application. */ - public WebApplicationContainer webApplicationContainer = null; + private WebApplicationContainer webApplicationContainer = null; /** A temporary storage location for part of an HTTP message header. Warning: This is used by the reading thread ONLY - never by the processing/writing threads. */ - public StringBuffer messageHeaderFragment = null; + private StringBuffer messageHeaderFragment = null; /** A reference to the request object currently being processed. Warning: This is used by the reading thread ONLY - never by the processing/writing threads. */ - public Request request = null; + private Request request = null; /** The count of stored content bytes for the request. This is valid for any type of request. */ - public int requestContentPosition = 0; + private int requestContentPosition = 0; /** The multi-part message characters remaining to be processed from the last message fragment received. */ - public byte[] remainingPartBytes = null; + private byte[] remainingPartBytes = null; /** The multi-part message count of characters/bytes read thus far. This ensures the entire message is read properly. */ - public int partReadCount = 0; + private int partReadCount = 0; /** The mutli-part message count of characters/bytes written to the buffer as the message from the client is parsed. Used to index into the buffer by the various ContentPart instances generated. */ - public int partWriteCount = 0; + private int partWriteCount = 0; /** The part that is currently being read and spans more than one message fragment. */ - public ContentPart currentPart = null; + private ContentPart currentPart = null; /** Whether the final part boundary has already been found. If this is true then the message bytes should keep getting read until the partReadCount == contentLength. */ - public boolean endPartBoundaryFound = false; + private boolean endPartBoundaryFound = false; /** The bytes containing the unencrypted outbound message that is waiting for the socket to allow a write. */ - protected MessageBuffer currentOutboundMessage = null; + private MessageBuffer currentOutboundMessage = null; /** The last message buffer added to the pending outbound message chain (linked list). Used only for pass through contexts currently since locally handled messages link the reponses together into a list. */ - protected MessageBuffer lastOutboundMessage = null; + private MessageBuffer lastOutboundMessage = null; /** The byte buffer used to read data from the socket. This must be null if a SSL engine is provided. */ - public ByteBuffer socketReadBuffer = null; + private ByteBuffer socketReadBuffer = null; /** The buffer used to store the initial data in a SSL/TLS connection. The buffering is necessary to allow us to pre-read the client's hello message to gather the domain the client is connecting to - allowing for the correct SSL engine to be used. */ - public StreamBuffer initialBuffer = null; + private StreamBuffer initialBuffer = null; /** Whether the TLS domain extension was used. If false then the default SSL domain should be used to attempt an anonymous connection with the client such that an error page can be displayed prompting the user to upgrade their browser to something modern. */ - public boolean tlsFailure = false; + private boolean tlsFailure = false; /** The domain name the client is connecting to. This is provided by the initial TLS hello message, and is used to select the correct SSL context for the desired web application. */ - public String domain = null; + private String domain = null; /** Non-null if the socket is using SSL to secure communications. */ - public SSLEngine sslEngine = null; + private SSLEngine sslEngine = null; /** Whether the ssl engine needs to send handshake data to the client and has not yet generated it. */ - public boolean sslNeedsWrap = false; + private boolean sslNeedsWrap = false; /** The reusable buffer containing encrypted data from the client. */ - public ByteBuffer encryptedReadBuffer = null; + private ByteBuffer encryptedReadBuffer = null; /** The reusable buffer containing unencrypted data from the client. */ - public ByteBuffer unencryptedReadBuffer = null; + private ByteBuffer unencryptedReadBuffer = null; /** The reusable buffer containing encrypted data being sent to the client. */ - public ByteBuffer encryptedWriteBuffer = null; + private ByteBuffer encryptedWriteBuffer = null; /** The last used request number which identifies the sequence for the requests. */ private int lastRequestNumber = 0; - /** The response we are currently processing. */ -// private Response currentResponse = null; -// /** The response we are will process last. */ -// private Response lastResponse = null; /** Tracks the number of bytes sent from the current response. This is only used when debugging. */ private int sentBytes = 0; /** Tracks the getWebServer().debug output for the current request/response cycle. This is only used when debugging. */ @@ -112,6 +108,8 @@ public class SocketContext extends AbstractSocketContext implements IWebApplicat private String websocketProtocol = null; /** The maximum number of bytes in an allowed websocket message (may be composed of multiple frames, does not include the frame header sizes). While this is a long, we really can't read longer than Integer.MAX_VALUE sized frames. So the message might be within size limits, but it might still be rejected if the frame exceeds the server's capacity to read a frame. */ private long websocketMaxMessageLength = 0; + /** The application specified handler called when websocket events occur (messages received, socket closed, etc). */ + private WebsocketHandler websocketHandler = null; /** The reusable frame header buffer. */ private byte[] websocketFrameHeader = null; /** The index into the frame header of the last read byte. */ @@ -126,14 +124,10 @@ public class SocketContext extends AbstractSocketContext implements IWebApplicat private int websocketMessageOpCode = 0; /** The currently reading frame's mask key used to decode the frame data. */ private byte[] websocketMessageMaskKey = null; - /** The queue used to hold outgoing messages before they are sent. Will contain only String, byte[], or IConnectionContext.IStreamedWebsocketMessage instances. */ - private Queue websocketPendingMessages = null; /** The streambuffer for the currently sending message. This will be for the current part of the streaming message if websocketStreamingMessage is non-null. */ - private ByteBuffer websocketSendingMessage = null; +// private ByteBuffer websocketSendingMessage = null; /** The streaming message handler which will be set only if the currently sending message is streaming. */ - private IStreamedWebsocketMessage websocketStreamingMessage = null; - /** The application specified handler called when websocket events occur (messages received, socket closed, etc). */ - private WebsocketHandler websocketHandler = null; +// private IStreamedWebsocketMessage websocketStreamingMessage = null; /** * SocketContext constructor. * @param serverSocketContext The context for the server socket that accepted this socket. @@ -141,7 +135,17 @@ public class SocketContext extends AbstractSocketContext implements IWebApplicat */ public SocketContext(ServerSocketContext serverSocketContext, NetworkListener networkListener) { super(networkListener); + this.serverSocketContext = serverSocketContext; + + if(serverSocketContext.getServiceListener().type != IServiceListener.TYPE_SSL) { + this.socketReadBuffer = ByteBuffer.allocate(AbstractSocketContext.BUFFER_SIZE); + }//if// +//TODO: Move code here from caller. }//SocketContext()// +/** + * Gets the next request number and increments the counter. + */ +public int getNextRequestNumber() {return ++lastRequestNumber;} /** * Gets the pass through socket context associated with this socket context, or null if none exists. * @return The socket context for the pass through socket used to handle all incoming requests from the client on this socket. @@ -150,12 +154,60 @@ protected PassThroughSocketContext getPassThroughSocketContext() { return (PassThroughSocketContext) getRelatedSocketContext(); }//getPassThroughSocketContext()// /* (non-Javadoc) - * @see com.foundation.web.server.WebServer.AbstractSocketContext#getLock() + * @see com.foundation.web.server.WebServer.AbstractSocketContext#getRelatedSocketContextKey() */ -protected Object getLock() { - return this; -}//getLock()// -protected synchronized void close() { +public SelectionKey getRelatedSocketContextKey() { + return getPassThroughSocketContext() != null ? getPassThroughSocketContext().getKey() : null; +}//getRelatedSocketContextKey()// +/* (non-Javadoc) + * @see com.foundation.web.server.AbstractSocketContext#close() + */ +protected void close() { + if(getWebServer().debug()) { + Debug.log(this.getId() + "|" + System.nanoTime() + "|Closing socket."); + }//if// + + synchronized(getLock()) { + try {if(key != null && key.channel() != null) key.channel().close(); else if(getWebServer().debug()) Debug.log(this.getId() + "|" + System.nanoTime() + "|Could not close the key's channel!");} catch(Throwable e) {if(getWebServer().debug()) Debug.log(e);} + try {if(key != null) key.cancel(); else if(getWebServer().debug()) Debug.log(this.getId() + "|" + System.nanoTime() + "|Could not cancel the key!");} catch(Throwable e) {if(getWebServer().debug()) Debug.log(e);} + + if(getPassThroughSocketContext() != null) { + getPassThroughSocketContext().close(); + }//if// + + //Clean up after the response and request.// + try {while(currentOutboundMessage != null) {currentOutboundMessage.close(); currentOutboundMessage = currentOutboundMessage.getNext();}} catch(Throwable e) {if(getWebServer().debug()) Debug.log(e);} + + try { + if(websocketHandler != null) { + websocketHandler.connectionClosed(); + websocketHandler = null; + }//if// + }//try// + catch(Throwable e) { + Debug.log(e); + }//catch// + + try { + if(applicationDataMap != null) { + for(IIterator iterator = applicationDataMap.valueIterator(); iterator.hasNext(); ) { + Object next = iterator.next(); + + //Ensure the code keeps going even if there is a problem cleaning up.// + try { + if(next instanceof ISessionLifecycleAware) ((ISessionLifecycleAware) next).release(); + }//try// + catch(Throwable e) { + Debug.log(e); + }//catch// + }//for// + }//if// + }//try// + catch(Throwable e) { + Debug.log(e); + }//catch// + }//synchronized// + /* TODO: Remove me. Old code. try { if(websocketHandler != null) { websocketHandler.connectionClosed(); @@ -195,6 +247,7 @@ protected synchronized void close() { if(getPassThroughSocketContext() != null) { getPassThroughSocketContext().close(); }//if// + */ }//close()// /* (non-Javadoc) * @see com.foundation.web.interfaces.IConnectionContext#getApplicationData(java.lang.String) @@ -279,25 +332,16 @@ public void sendHttpResponse(Response response) { /* (non-Javadoc) * @see com.foundation.web.server.WebServer.AbstractSocketContext#passThrough(java.nio.ByteBuffer) */ -protected synchronized boolean passThrough(ByteBuffer buffer) { -// ByteBuffer messageBytes = ByteBuffer.allocate(buffer.remaining()); -// MessageBuffer message; -// -// //Create a new buffer to hold the data so we don't modify the passed buffer (other than to update its position).// -// messageBytes = ByteBuffer.allocate(buffer.remaining()); -// messageBytes.put(buffer); -// message = new MessageBuffer(messageBytes); -// -// //Chain the message into the linked list. -// if(lastOutboundMessage == null || currentOutboundMessage == null) { -// currentOutboundMessage = lastOutboundMessage = message; -// }//if// -// else { -// lastOutboundMessage.setNext(message); -// lastOutboundMessage = message; -// }//else// +protected void passThrough(ByteBuffer buffer) { + ByteBuffer messageBytes = ByteBuffer.allocate(buffer.remaining()); + MessageBuffer message; - return true; + //Create a new buffer to hold the data so we don't modify the passed buffer (other than to update its position).// + messageBytes = ByteBuffer.allocate(buffer.remaining()); + messageBytes.put(buffer); + message = new MessageBuffer(this, messageBytes); + + queueOutboundClientMessage(message); }//passThrough()// /* (non-Javadoc) * @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses() @@ -342,6 +386,7 @@ protected void writeOutgoingMessages() throws IOException { //Close the message if possible.// try {currentOutboundMessage.close();} catch(Throwable e) {} //Load the next available message.// +//TODO: Swap these lines. // currentOutboundMessage = currentOutboundMessage.getNext(); currentOutboundMessage = null; if(currentOutboundMessage == null) lastOutboundMessage = null; @@ -362,184 +407,10 @@ protected void writeOutgoingMessages() throws IOException { }//synchronized// }//while// - /* From internalProcessResponses: - boolean doneSending = false; - - //Keep sending responses while the buffers are not full and there is another response to send.// - while(!doneSending) { - boolean messageSent = true; - - //If the socket is open then send the next buffer of data.// - if(key.channel().isOpen()) { - //Send the pending response object's prepared buffer of data.// - messageSent = writeClientBoundMessage(); - }//if// - - //Close the response if successfully sent, or if the socket is closed.// - if((messageSent || !key.channel().isOpen()) && currentOutboundMessage != null) { - try {currentOutboundMessage.close();} catch(Throwable e) {} - }//if// - - //If we finished sending the current response then load the next one.// - if(messageSent || (currentOutboundMessage != null && currentOutboundMessage.isClosed())) { - //TODO: Queue up the next outbound message. - currentOutboundMessage = null; - lastOutboundMessage = null; - }//if// - - if(currentOutboundMessage == null) { - doneSending = true; - }//if// - }//while// - */ - - /* Original code for writeOutgoingMessages: - if(getPassThroughSocketContext() != null) { -// //Synchronized to avoid multiple threads accessing the pendingOutboundMessage chain at one time and updating the write flag out of order (could happen if we enabled request chaining over a single socket).// -// synchronized(this) { -// writeClientBoundMessage(); -// }//synchronized// + if(getWebServer().debug()) { + Debug.log(this.getId() + "|" + System.nanoTime() + "|Finished sending messages for now."); }//if// - else if(isWebsocket) { - //Right after upgrading the socket we have one last HTTP response to process.// -// if(currentResponse != null) { -// internalProcessResponses(); -// }//if// -// -// internalProcessWebsocketMessages(); - }//else if// - else { - //Go directly to writing the client response if we are just passing everything through to another process.// - internalProcessResponses(); - }//else// - */ -}//processCurrentResponse()// -/** - * Loads the next outbound websocket message and attempts to write it to the socket until all outbound messages have been sent, or the socket's buffers are full and a wait is required. - * If a message could only be partially sent then the next call will attempt to finish sending it. - */ -private void internalProcessWebsocketMessages() { - /* - if(websocketSendingMessage == null) { - loadNextWebsocketMessage(); - }//if// - - while(websocketSendingMessage != null) { - //If the socket is open then send the next buffer of data.// - if(key.channel().isOpen()) { - if(currentOutboundMessage != null) { - //Put the sending message in a MessageBuffer (pendingOutboundMessage).// - currentOutboundMessage = new MessageBuffer(websocketSendingMessage); - }//if// - - //Write the pendingOutboundMessage to the socket.// - if(writeClientBoundMessage()) { - websocketSendingMessage = null; - currentOutboundMessage = null; - }//if// - }//if// - - //If we finished sending the message then load the next one.// - if(websocketSendingMessage == null) { - loadNextWebsocketMessage(); - }//if// - }//while// - */ -}//internalProcessWebsocketMessages()// -/** - * Loads and prepares the next websocket message from the queue of pending messages. - * Clears the pending message attributes if there isn't a pending message to be processed. - * The caller can check websocketSendingMessage == null to see if there is a ready message. - */ -private void loadNextWebsocketMessage() { - Object next = null; - boolean isLastPart = true; - - if(websocketStreamingMessage != null && websocketStreamingMessage.hasNextPart()) { - next = websocketStreamingMessage.getNextPart(); - isLastPart = !websocketStreamingMessage.hasNextPart(); - - //Ensure that we got a string or byte array.// - if(!(next instanceof String || next instanceof byte[])) { - throw new RuntimeException("Invalid streaming message part type."); - }//if// - }//if// - else { - synchronized(websocketPendingMessages) { - if(websocketPendingMessages.getSize() > 0) { - next = websocketPendingMessages.dequeue(); - }//if// - }//synchronized// - }//else// - - if(next != null) { - byte[] bytes = null; - int opCode = 0; - int length = 0; - - if(next instanceof String) { - try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);} - opCode = websocketStreamingMessage == null ? 0x01 : 0; - length = bytes.length; - }//if// - else if(next instanceof byte[]) { - bytes = (byte[]) next; - opCode = websocketStreamingMessage == null ? 0x02 : 0; - length = bytes.length; - }//else if// - else if(next instanceof Byte) { //Control Message// - opCode = ((Byte) next).byteValue(); - }//else if// - else if(next instanceof IStreamedWebsocketMessage) { - websocketStreamingMessage = (IStreamedWebsocketMessage) next; - next = websocketStreamingMessage.getNextPart(); - isLastPart = !websocketStreamingMessage.hasNextPart(); - - if(next instanceof String) { - try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);} - opCode = 0x01; //Text// - length = bytes.length; - }//if// - else if(next instanceof byte[]) { - bytes = (byte[]) next; - opCode = 0x02; //Binary// - length = bytes.length; - }//else if// - else { - throw new RuntimeException("Invalid streaming message part type."); - }//if// - }//else if// - - websocketSendingMessage = ByteBuffer.allocate(14 + length); - websocketSendingMessage.put((byte) (isLastPart ? 0x8 : 0)); - websocketSendingMessage.put((byte) opCode); - - //Write the length differently based on how long the content is.// - if(length < 126) { - websocketSendingMessage.put((byte) length); -// websocketSendingMessage.putLong(0, StreamBuffer.NUMBER_MSF); - }//if// - else if(length < 65535) { - websocketSendingMessage.put((byte) 126); - websocketSendingMessage.putShort((short) (length & 0xFFFF)); -// websocketSendingMessage.putShort((short) 0); - websocketSendingMessage.putInt(0); - }//else if// - else { - websocketSendingMessage.put((byte) 127); - websocketSendingMessage.putLong((long) length); - }//else// - - //The server doesn't use a mask key.// -// websocketSendingMessage.putInt(0); - //Put the content at the end of the message.// - websocketSendingMessage.put(bytes); - }//if// - else { - websocketSendingMessage = null; - websocketStreamingMessage = null; - }//else// -}//loadNextWebsocketMessage()// +}//writeOutgoingMessages()// /** * Sends a response to the client. * @return Whether the response could be fully sent. This will be false if there is still more data to be written when the call returns. @@ -1132,6 +1003,84 @@ protected void readIncomingMessages() throws IOException { }//while// }//else// }//readIncomingMessages()// +/* (non-Javadoc) + * @see com.foundation.web.server.WebServer.AbstractSocketContext#hasPendingWrite() + */ +protected boolean hasPendingWrite() { + //Will have a pending write if either the current outbound messages is non-null (finished messages should no longer be referenced), or if SSL is used and there is a handshake or maintenance message queued in the encrypted write buffer.// + return currentOutboundMessage != null || (encryptedWriteBuffer != null && encryptedWriteBuffer.hasRemaining()); +}//hasPendingWrite()// +/* (non-Javadoc) + * @see com.foundation.web.interfaces.IConnectionContext#upgradeToWebsocket(java.lang.String, long, com.foundation.web.interfaces.WebsocketHandler) + */ +public void upgradeToWebsocket(String protocol, long maxMessageLength, WebsocketHandler websocketHandler) { + if(!isWebsocket) { + this.isWebsocket = true; + this.websocketProtocol = protocol; + this.websocketMaxMessageLength = maxMessageLength; + this.websocketFrameHeader = new byte[14]; + this.websocketMessageMaskKey = new byte[4]; + this.websocketHandler = websocketHandler; + }//if// +}//upgradeToWebsocket()// +/* (non-Javadoc) + * @see com.foundation.web.interfaces.IConnectionContext#getWebsocketProtocol() + */ +public String getWebsocketProtocol() { + return websocketProtocol; +}//getWebsocketProtocol()// +/* (non-Javadoc) + * @see com.foundation.web.interfaces.IConnectionContext#isWebsocket() + */ +public boolean isWebsocket() { + return isWebsocket; +}//isWebsocket()// +/* (non-Javadoc) + * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(byte[]) + */ +public void sendWebsocketMessage(byte[] message) { + if(isWebsocket) { + queueOutboundClientMessage(new WebsocketMessageBuffer(this, message)); + }//if// +}//sendWebsocketMessage()// +/* (non-Javadoc) + * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(java.lang.String) + */ +public void sendWebsocketMessage(String message) { + if(isWebsocket) { + queueOutboundClientMessage(new WebsocketMessageBuffer(this, message)); + }//if// +}//sendWebsocketMessage()// +/* (non-Javadoc) + * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketPing() + */ +public void sendWebsocketPing() { + if(isWebsocket) { + queueOutboundClientMessage(new WebsocketMessageBuffer(this, new Byte((byte) 0x9))); + }//if// +}//sendWebsocketPing()// +/** + * Sends a PONG response to the client's ping. + */ +public void sendWebsocketPong() { + if(isWebsocket) { + queueOutboundClientMessage(new WebsocketMessageBuffer(this, new Byte((byte) 0xA))); + }//if// +}//sendWebsocketPong()// +/* (non-Javadoc) + * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(com.foundation.web.interfaces.IConnectionContext.IStreamedWebsocketMessage) + */ +public void sendWebsocketMessage(IStreamedWebsocketMessage message) { + if(isWebsocket) { + queueOutboundClientMessage(new WebsocketMessageBuffer(this, message)); + }//if// +}//sendWebsocketMessage()// +/** + * Simplification of the error throwing code. + */ +private void handleBrokenStream() throws IOException { + throw new IOException("Invalid TLS stream."); +}//handleBrokenStream()// /** * Processes a single websocket frame if there is enough data in the fragment. *
Will not return until all data is read from the frame or the socket is closed. @@ -1344,14 +1293,6 @@ private boolean processWebsocketFrame(ByteBuffer fragment) throws IOException { return !isFrameFullyRead; }//processWebsocketFrame()// -/** - * Determines whether we have a complete HTML header in the string buffer. - * @param buffer The buffer containing the HTML ASCII header characters. - * @return Whether the header is complete. - */ -private boolean isCompleteHeader(StringBuffer buffer) { - return (buffer.length() > 4) && (buffer.charAt(buffer.length() - 4) == '\r') && (buffer.charAt(buffer.length() - 3) == '\n') && (buffer.charAt(buffer.length() - 2) == '\r') && (buffer.charAt(buffer.length() - 1) == '\n'); -}//isCompleteHeader()// /** * Processes enough of the header of this first request to identify the application and set it for the socket. Used to forward unencrypted message to a remote server. * @param fragment @@ -1411,7 +1352,7 @@ private boolean processRequestedHost(ByteBuffer fragment) throws IOException { //Get the web application for the given domain.// //Synchronize to prevent another thread from altering the service's web applications while we are accessing it.// synchronized(getWebServer()) { - webApplicationContainer = serverSocketContext.serviceListener.getWebApplicationContainer(domain); + webApplicationContainer = serverSocketContext.getServiceListener().getWebApplicationContainer(domain); }//synchronized// result = true; @@ -1428,6 +1369,214 @@ private boolean processRequestedHost(ByteBuffer fragment) throws IOException { return result; }//processRequestedHost()// +/** + * Parses the initial client hello message sent in the TLS protocol to identify which SSL context to use for the connection. + * @param channel The socket channel to read from. + * @return Whether the message could be parsed. If false, the method will be called again until true as new bytes arrive on the channel. + */ +private boolean parseFirstTlsMessage(SocketChannel channel) throws IOException { + boolean result = false; + ByteBuffer temp = ByteBuffer.allocate(500); + StreamBuffer input; + int originalInputSize; + + //Create the stream buffer for this operation if not already created.// + if(initialBuffer == null) { + initialBuffer = new StreamBuffer(); + }//if// + + //Simplify the code a bit with a local variable.// + input = initialBuffer; + originalInputSize = input.getSize(); + + //Exit if the initial buffer size is rediculously large or if the channel is closed or has no additional bytes.// + while(channel.read(temp) > 0 && initialBuffer.getSize() < 10000) { + temp.flip(); + initialBuffer.writeBytes(temp); + }//while// + + //Ensure we have enough data to read the header.// + if(input.getSize() > 4) { + int contentType = input.getByte(0) & 0xFF; + int highVersion = input.getByte(1) & 0xFF; + int lowVersion = input.getByte(2) & 0xFF; + int length = input.getShort(3, StreamBuffer.NUMBER_MSF) & 0xFFFF; + +// initialBuffer.getAsText(); + + //Check the version.// + if(highVersion != 3 || lowVersion > 3 || lowVersion < 1) { + //handleBrokenStream(); + return true; + }//if// + + //Validate we have enough bytes for the whole message.// + if(input.getSize() > 4 + length) { + StreamBuffer tempBuffer = new StreamBuffer(input.getBufferPool()); + + //Indicate we have parsed the initial message (or at least attempted to).// + result = true; + //Clone the input buffer so we don't consume any bytes needed by the SSLEngine later.// + //Copy the bytes, don't consume them.// + tempBuffer.writeBytes(input, 0, length + 5); + input = tempBuffer; + + //A full record will be read.// + result = true; + //Skip the record header bytes.// + input.skipBytes(5); + + //Read each message in this record.// + switch(contentType) { +// case CONTENT_TYPE_CHANGE_CIPHER_SPEC: { +// processChangeCipherSpec(input); +// break; +// }//case// +// case CONTENT_TYPE_ALERT: { +// //Only one alert may exist in the message.// +// processAlert(input); +// break; +// }//case// + case 0x16: { //CONTENT_TYPE_HANDSHAKE + //Multiple handshake messages may be contained in the single record.// + while(input.getSize() > 0) { + int messageType = input.readByte() & 0xFF; + int messageSize = input.readMediumInt(StreamBuffer.NUMBER_MSF, false); + int postMessageStreamSize = input.getSize() - messageSize; + + switch(messageType) { + case 0x01: { //HANDSHAKE_TYPE_CLIENT_HELLO + //long time; + byte[] randomClientBytes = new byte[28]; + int sessionIdSize; + byte[] sessionId = null; + int cipherSuiteCount; + int[] cipherSuites; + int compressionMethodCount; + int[] compressionMethods; + /*int protocolVersionHigh = */input.readByte()/* & 0xFF*/; //Note: This should be the same as when it was read as part of the record earlier - this is redundant.// + /*int protocolVersionLow = */input.readByte()/* & 0xFF*/; + + /*time = */input.readInt(StreamBuffer.NUMBER_MSF)/* & 0x00000000FFFFFFFFL*/; + input.readBytes(randomClientBytes); + sessionIdSize = input.readByte() & 0xFF; + + if(sessionIdSize > 0) { + sessionId = new byte[sessionIdSize]; + input.readBytes(sessionId); + }//if// + + cipherSuiteCount = (input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF) / 2; + cipherSuites = new int[cipherSuiteCount]; + + for(int index = 0; index < cipherSuites.length; index++) { + cipherSuites[index] = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; + }//for// + + compressionMethodCount = input.readByte() & 0xFF; + compressionMethods = new int[compressionMethodCount]; + + for(int index = 0; index < compressionMethods.length; index++) { + compressionMethods[index] = input.readByte() & 0xFF; + }//for// + + //Read extensions until the end of the stream is reached.// + if(input.getSize() != postMessageStreamSize) { + /*int extensionArraySize =*/ input.readShort(StreamBuffer.NUMBER_MSF)/* & 0xFFFF*/; + + //Read each extension.// + while(input.getSize() != postMessageStreamSize) { + int extensionType = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; + int extensionDataSize = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; + int expectedStreamSize = input.getSize() - extensionDataSize; + + //Read the extension data if it is known, otherwise skip the data.// + switch(extensionType) { + case 0x0000: { //EXT_SERVER_NAME + //The ServerName extension always starts with a ServerNameList structure.// + int serverNameListByteCount = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; + int postServerNamesExpectedStreamSize = input.getSize() - serverNameListByteCount; + + //Read each of the server names (no idea how many there will be - ).// + while(input.getSize() != postServerNamesExpectedStreamSize) { + int nameType = input.readByte() & 0xFF; + + switch(nameType) { + case 0x00: { //NAME_TYPE_HOST_NAME + int nameByteCount = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; + String name; + + try { + name = input.readText(nameByteCount, "UTF8").toLowerCase(); +// Debug.log(name); + + if(domain == null) { + domain = name; + }//if// + }//try// + catch(UnsupportedEncodingException e) { + Debug.log(e); + }//catch// + break; + }//case// + default: { + break; + }//default// + }//switch// + }//while// + + break; + }//case// + default: { + input.skipBytes(extensionDataSize); + break; + }//default// + }//switch// + + //Ignore any unused bytes.// + if(input.getSize() != expectedStreamSize) { + input.skipBytes(input.getSize() - expectedStreamSize); + }//if// + }//for// + }//if// + break; + }//case// + default: { + handleBrokenStream(); + break; + }//default// + }//switch// + + //Verify we read exactly the number of bytes in the message.// + if(input.getSize() != postMessageStreamSize) { + handleBrokenStream(); + }//if// + }//while// + break; + }//case// +// case CONTENT_TYPE_APPLICATION: { +// //TODO: Decrypt / Validate the message. +// //TODO: Place the decrypted bytes in the result buffer. +// break; +// }//case// + default: { + handleBrokenStream(); + }//default// + }//switch// + + //Ignore any padding or mac bytes. There will not be any for handshake messages.// + if(input.getSize() > 0) { + input.skipBytes(input.getSize()); + }//if// + }//if// + }//if// + else if(originalInputSize == input.getSize()) { + //TODO: Why would the channel be flagged for reading when there is nothing to read? + throw new TlsFailureException("Connection is being flagged as having input, but has none."); + }//else if// + + return result; +}//parseFirstTlsMessage()// /** * Processes the client request given the latest fragment of a message. * @param messageFragment The message fragment. @@ -1508,7 +1657,7 @@ private boolean processClientRequest(ByteBuffer fragment) throws IOException { //Get the web application for the given domain.// //Synchronize to prevent another thread from altering the service's web applications while we are accessing it.// synchronized(getWebServer()) { - webApplicationContainer = serverSocketContext.serviceListener.getWebApplicationContainer(domain); + webApplicationContainer = serverSocketContext.getServiceListener().getWebApplicationContainer(domain); }//synchronized// if(webApplicationContainer != null) { @@ -2115,219 +2264,13 @@ private boolean internalProcessClientRequest(SelectionKey key, final IWebApplica return result; }//internalProcessClientRequest()// /** - * Parses the initial client hello message sent in the TLS protocol to identify which SSL context to use for the connection. - * @param channel The socket channel to read from. - * @return Whether the message could be parsed. If false, the method will be called again until true as new bytes arrive on the channel. + * Determines whether we have a complete HTML header in the string buffer. + * @param buffer The buffer containing the HTML ASCII header characters. + * @return Whether the header is complete. */ -private boolean parseFirstTlsMessage(SocketChannel channel) throws IOException { - boolean result = false; - ByteBuffer temp = ByteBuffer.allocate(500); - StreamBuffer input; - int originalInputSize; - - //Create the stream buffer for this operation if not already created.// - if(initialBuffer == null) { - initialBuffer = new StreamBuffer(); - }//if// - - //Simplify the code a bit with a local variable.// - input = initialBuffer; - originalInputSize = input.getSize(); - - //Exit if the initial buffer size is rediculously large or if the channel is closed or has no additional bytes.// - while(channel.read(temp) > 0 && initialBuffer.getSize() < 10000) { - temp.flip(); - initialBuffer.writeBytes(temp); - }//while// - - //Ensure we have enough data to read the header.// - if(input.getSize() > 4) { - int contentType = input.getByte(0) & 0xFF; - int highVersion = input.getByte(1) & 0xFF; - int lowVersion = input.getByte(2) & 0xFF; - int length = input.getShort(3, StreamBuffer.NUMBER_MSF) & 0xFFFF; - -// initialBuffer.getAsText(); - - //Check the version.// - if(highVersion != 3 || lowVersion > 3 || lowVersion < 1) { - //handleBrokenStream(); - return true; - }//if// - - //Validate we have enough bytes for the whole message.// - if(input.getSize() > 4 + length) { - StreamBuffer tempBuffer = new StreamBuffer(input.getBufferPool()); - - //Indicate we have parsed the initial message (or at least attempted to).// - result = true; - //Clone the input buffer so we don't consume any bytes needed by the SSLEngine later.// - //Copy the bytes, don't consume them.// - tempBuffer.writeBytes(input, 0, length + 5); - input = tempBuffer; - - //A full record will be read.// - result = true; - //Skip the record header bytes.// - input.skipBytes(5); - - //Read each message in this record.// - switch(contentType) { -// case CONTENT_TYPE_CHANGE_CIPHER_SPEC: { -// processChangeCipherSpec(input); -// break; -// }//case// -// case CONTENT_TYPE_ALERT: { -// //Only one alert may exist in the message.// -// processAlert(input); -// break; -// }//case// - case 0x16: { //CONTENT_TYPE_HANDSHAKE - //Multiple handshake messages may be contained in the single record.// - while(input.getSize() > 0) { - int messageType = input.readByte() & 0xFF; - int messageSize = input.readMediumInt(StreamBuffer.NUMBER_MSF, false); - int postMessageStreamSize = input.getSize() - messageSize; - - switch(messageType) { - case 0x01: { //HANDSHAKE_TYPE_CLIENT_HELLO - //long time; - byte[] randomClientBytes = new byte[28]; - int sessionIdSize; - byte[] sessionId = null; - int cipherSuiteCount; - int[] cipherSuites; - int compressionMethodCount; - int[] compressionMethods; - /*int protocolVersionHigh = */input.readByte()/* & 0xFF*/; //Note: This should be the same as when it was read as part of the record earlier - this is redundant.// - /*int protocolVersionLow = */input.readByte()/* & 0xFF*/; - - /*time = */input.readInt(StreamBuffer.NUMBER_MSF)/* & 0x00000000FFFFFFFFL*/; - input.readBytes(randomClientBytes); - sessionIdSize = input.readByte() & 0xFF; - - if(sessionIdSize > 0) { - sessionId = new byte[sessionIdSize]; - input.readBytes(sessionId); - }//if// - - cipherSuiteCount = (input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF) / 2; - cipherSuites = new int[cipherSuiteCount]; - - for(int index = 0; index < cipherSuites.length; index++) { - cipherSuites[index] = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; - }//for// - - compressionMethodCount = input.readByte() & 0xFF; - compressionMethods = new int[compressionMethodCount]; - - for(int index = 0; index < compressionMethods.length; index++) { - compressionMethods[index] = input.readByte() & 0xFF; - }//for// - - //Read extensions until the end of the stream is reached.// - if(input.getSize() != postMessageStreamSize) { - /*int extensionArraySize =*/ input.readShort(StreamBuffer.NUMBER_MSF)/* & 0xFFFF*/; - - //Read each extension.// - while(input.getSize() != postMessageStreamSize) { - int extensionType = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; - int extensionDataSize = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; - int expectedStreamSize = input.getSize() - extensionDataSize; - - //Read the extension data if it is known, otherwise skip the data.// - switch(extensionType) { - case 0x0000: { //EXT_SERVER_NAME - //The ServerName extension always starts with a ServerNameList structure.// - int serverNameListByteCount = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; - int postServerNamesExpectedStreamSize = input.getSize() - serverNameListByteCount; - - //Read each of the server names (no idea how many there will be - ).// - while(input.getSize() != postServerNamesExpectedStreamSize) { - int nameType = input.readByte() & 0xFF; - - switch(nameType) { - case 0x00: { //NAME_TYPE_HOST_NAME - int nameByteCount = input.readShort(StreamBuffer.NUMBER_MSF) & 0xFFFF; - String name; - - try { - name = input.readText(nameByteCount, "UTF8").toLowerCase(); -// Debug.log(name); - - if(domain == null) { - domain = name; - }//if// - }//try// - catch(UnsupportedEncodingException e) { - Debug.log(e); - }//catch// - break; - }//case// - default: { - break; - }//default// - }//switch// - }//while// - - break; - }//case// - default: { - input.skipBytes(extensionDataSize); - break; - }//default// - }//switch// - - //Ignore any unused bytes.// - if(input.getSize() != expectedStreamSize) { - input.skipBytes(input.getSize() - expectedStreamSize); - }//if// - }//for// - }//if// - break; - }//case// - default: { - handleBrokenStream(); - break; - }//default// - }//switch// - - //Verify we read exactly the number of bytes in the message.// - if(input.getSize() != postMessageStreamSize) { - handleBrokenStream(); - }//if// - }//while// - break; - }//case// -// case CONTENT_TYPE_APPLICATION: { -// //TODO: Decrypt / Validate the message. -// //TODO: Place the decrypted bytes in the result buffer. -// break; -// }//case// - default: { - handleBrokenStream(); - }//default// - }//switch// - - //Ignore any padding or mac bytes. There will not be any for handshake messages.// - if(input.getSize() > 0) { - input.skipBytes(input.getSize()); - }//if// - }//if// - }//if// - else if(originalInputSize == input.getSize()) { - //TODO: Why would the channel be flagged for reading when there is nothing to read? - throw new TlsFailureException("Connection is being flagged as having input, but has none."); - }//else if// - - return result; -}//parseFirstTlsMessage()// -/** - * Simplification of the error throwing code. - */ -private void handleBrokenStream() throws IOException { - throw new IOException("Invalid TLS stream."); -}//handleBrokenStream()// +private boolean isCompleteHeader(StringBuffer buffer) { + return (buffer.length() > 4) && (buffer.charAt(buffer.length() - 4) == '\r') && (buffer.charAt(buffer.length() - 3) == '\n') && (buffer.charAt(buffer.length() - 2) == '\r') && (buffer.charAt(buffer.length() - 1) == '\n'); +}//isCompleteHeader()// /** * Locates the first pattern match in the source from the given offset. * @param source The source to be searched. @@ -2352,96 +2295,4 @@ private int indexOf(byte[] source, byte[] pattern, int fromOffset) { return result; }//indexOf()// -/* (non-Javadoc) - * @see com.foundation.web.server.WebServer.AbstractSocketContext#hasPendingWrite() - */ -protected boolean hasPendingWrite() { - return currentOutboundMessage != null || (encryptedWriteBuffer != null && encryptedWriteBuffer.hasRemaining()); -}//hasPendingWrite()// -/* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#upgradeToWebsocket(java.lang.String, long, com.foundation.web.interfaces.WebsocketHandler) - */ -public void upgradeToWebsocket(String protocol, long maxMessageLength, WebsocketHandler websocketHandler) { - if(!isWebsocket) { - this.isWebsocket = true; - this.websocketProtocol = protocol; - this.websocketMaxMessageLength = maxMessageLength; - this.websocketFrameHeader = new byte[14]; - this.websocketMessageMaskKey = new byte[4]; - this.websocketPendingMessages = new Queue(20); - this.websocketHandler = websocketHandler; - }//if// -}//upgradeToWebsocket()// -/* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#getWebsocketProtocol() - */ -public String getWebsocketProtocol() { - return websocketProtocol; -}//getWebsocketProtocol()// -/* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#isWebsocket() - */ -public boolean isWebsocket() { - return isWebsocket; -}//isWebsocket()// -/* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(byte[]) - */ -public void sendWebsocketMessage(byte[] message) { - if(isWebsocket) { - synchronized(websocketPendingMessages) { - websocketPendingMessages.enqueue(message); - }//synchronized// - - notifyListenerOfPendingWrite(); - }//if// -}//sendWebsocketMessage()// -/* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(java.lang.String) - */ -public void sendWebsocketMessage(String message) { - if(isWebsocket) { - synchronized(websocketPendingMessages) { - websocketPendingMessages.enqueue(message); - }//synchronized// - - notifyListenerOfPendingWrite(); - }//if// -}//sendWebsocketMessage()// -/* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketPing() - */ -public void sendWebsocketPing() { - if(isWebsocket) { - synchronized(websocketPendingMessages) { - websocketPendingMessages.enqueue(new Byte((byte) 0x9)); - }//synchronized// - - notifyListenerOfPendingWrite(); - }//if// -}//sendWebsocketPing()// -/** - * Sends a PONG response to the client's ping. - */ -public void sendWebsocketPong() { - if(isWebsocket) { - synchronized(websocketPendingMessages) { - websocketPendingMessages.enqueue(new Byte((byte) 0xA)); - }//synchronized// - - notifyListenerOfPendingWrite(); - }//if// -}//sendWebsocketPong()// -/* (non-Javadoc) - * @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(com.foundation.web.interfaces.IConnectionContext.IStreamedWebsocketMessage) - */ -public void sendWebsocketMessage(IStreamedWebsocketMessage message) { - if(isWebsocket) { - synchronized(websocketPendingMessages) { - websocketPendingMessages.enqueue(message); - }//synchronized// - - notifyListenerOfPendingWrite(); - }//if// -}//sendWebsocketMessage()// }//SocketContext// \ No newline at end of file diff --git a/Foundation Web Core/src/com/foundation/web/server/WebsocketMessageBuffer.java b/Foundation Web Core/src/com/foundation/web/server/WebsocketMessageBuffer.java index 1fc816e..238e0d5 100644 --- a/Foundation Web Core/src/com/foundation/web/server/WebsocketMessageBuffer.java +++ b/Foundation Web Core/src/com/foundation/web/server/WebsocketMessageBuffer.java @@ -12,140 +12,154 @@ class WebsocketMessageBuffer extends MessageBuffer { private ByteBuffer messagePart = null; /** The message to be sent. Will be null after the message buffer has initialized. */ private Object message = null; +/** + * WebsocketMessageBuffer constructor. + * @param socketContext The socket context associated with this message buffer. + * @param message The message to be sent. + */ +public WebsocketMessageBuffer(AbstractSocketContext socketContext, Object message) { + super(socketContext); + this.message = message; +}//WebsocketMessageBuffer()// +/* (non-Javadoc) + * @see com.foundation.web.server.MessageBuffer#initialize() + */ +public boolean initialize() { + if(getIsInitialized()) { + super.initialize(); + messagePart = stream(message, true); + message = null; + }//if// - public WebsocketMessageBuffer(AbstractSocketContext socketContext, Object message) { - super(socketContext); - this.message = message; - }//WebsocketMessageBuffer()// - public boolean initialize() { - if(message != null) { - messagePart = stream(message, true); - message = null; - }//if// - - return super.initialize(); - }//initialize()// - public boolean isClosed() { - return super.isClosed() && messagePart == null && streamingMessage == null; - }//isClosed()// - public void close() { - super.close(); - messagePart = null; - streamingMessage = null; - }//close()// - private ByteBuffer stream(Object next, boolean isLast) { - ByteBuffer result = null; - byte[] bytes = null; - int opCode = 0; - int length = 0; + return true; +}//initialize()// +/* (non-Javadoc) + * @see com.foundation.web.server.MessageBuffer#isClosed() + */ +public boolean isClosed() { + return super.isClosed() && messagePart == null && streamingMessage == null; +}//isClosed()// +/* (non-Javadoc) + * @see com.foundation.web.server.MessageBuffer#close() + */ +public void close() { + super.close(); + messagePart = null; + streamingMessage = null; +}//close()// +private ByteBuffer stream(Object next, boolean isLast) { + ByteBuffer result = null; + byte[] bytes = null; + int opCode = 0; + int length = 0; + + if(next instanceof String) { + try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);} + opCode = streamingMessage == null ? 0x01 : 0; + length = bytes.length; + }//if// + else if(next instanceof byte[]) { + bytes = (byte[]) next; + opCode = streamingMessage == null ? 0x02 : 0; + length = bytes.length; + }//else if// + else if(next instanceof Byte) { //Control Message// + opCode = ((Byte) next).byteValue(); + }//else if// + else if(next instanceof IStreamedWebsocketMessage) { + //TODO: Ensure that this is not recursive! + streamingMessage = (IStreamedWebsocketMessage) next; + next = streamingMessage.getNextPart(); + isLast = !streamingMessage.hasNextPart(); if(next instanceof String) { try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);} - opCode = streamingMessage == null ? 0x01 : 0; + opCode = 0x01; //Text// length = bytes.length; }//if// else if(next instanceof byte[]) { bytes = (byte[]) next; - opCode = streamingMessage == null ? 0x02 : 0; + opCode = 0x02; //Binary// length = bytes.length; }//else if// - else if(next instanceof Byte) { //Control Message// - opCode = ((Byte) next).byteValue(); - }//else if// - else if(next instanceof IStreamedWebsocketMessage) { - //TODO: Ensure that this is not recursive! - streamingMessage = (IStreamedWebsocketMessage) next; - next = streamingMessage.getNextPart(); - isLast = !streamingMessage.hasNextPart(); - - if(next instanceof String) { - try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);} - opCode = 0x01; //Text// - length = bytes.length; - }//if// - else if(next instanceof byte[]) { - bytes = (byte[]) next; - opCode = 0x02; //Binary// - length = bytes.length; - }//else if// - else { - throw new RuntimeException("Invalid streaming message part type."); - }//if// - }//else if// - - result = ByteBuffer.allocate(14 + length); - result.put((byte) (isLast ? 0x8 : 0)); - result.put((byte) opCode); - - //Write the length differently based on how long the content is.// - if(length < 126) { - result.put((byte) length); - }//if// - else if(length < 65535) { - result.put((byte) 126); - result.putShort((short) (length & 0xFFFF)); - result.putInt(0); - }//else if// else { - result.put((byte) 127); - result.putLong((long) length); - }//else// + throw new RuntimeException("Invalid streaming message part type."); + }//if// + }//else if// + + result = ByteBuffer.allocate(14 + length); + result.put((byte) (isLast ? 0x8 : 0)); + result.put((byte) opCode); + + //Write the length differently based on how long the content is.// + if(length < 126) { + result.put((byte) length); + }//if// + else if(length < 65535) { + result.put((byte) 126); + result.putShort((short) (length & 0xFFFF)); + result.putInt(0); + }//else if// + else { + result.put((byte) 127); + result.putLong((long) length); + }//else// + + //Put the content at the end of the message.// + result.put(bytes); + + return result; +}//stream()// +public boolean loadBuffer() { + boolean result = true; + + getBuffer().compact(); + + //Copy remaining bytes from MessagePart to the buffer.// + if(messagePart != null && messagePart.remaining() > 0) { + int length = Math.min(getBuffer().remaining(), messagePart.remaining()); + getBuffer().put(messagePart.array(), messagePart.position(), length); + messagePart.position(messagePart.position() + length); - //Put the content at the end of the message.// - result.put(bytes); + if(messagePart.remaining() == 0) { + messagePart = null; + }//if// + }//if// + + //Load a new message part if streaming and copy as many bytes from the MessagePart into the buffer as possible.// + if(messagePart == null && streamingMessage != null) { + Object next = null; + boolean isLastPart = true; - return result; - }//stream()// - public boolean loadBuffer() { - boolean result = true; + next = streamingMessage.getNextPart(); + isLastPart = !streamingMessage.hasNextPart(); - getBuffer().compact(); - - //Copy remaining bytes from MessagePart to the buffer.// - if(messagePart != null && messagePart.remaining() > 0) { - int length = Math.min(getBuffer().remaining(), messagePart.remaining()); - getBuffer().put(messagePart.array(), messagePart.position(), length); - messagePart.position(messagePart.position() + length); - - if(messagePart.remaining() == 0) { - messagePart = null; - }//if// + //Ensure that we got a string or byte array.// + if(!(next instanceof String || next instanceof byte[])) { + throw new RuntimeException("Invalid streaming message part type."); }//if// - //Load a new message part if streaming and copy as many bytes from the MessagePart into the buffer as possible.// - if(messagePart == null && streamingMessage != null) { - Object next = null; - boolean isLastPart = true; - - next = streamingMessage.getNextPart(); - isLastPart = !streamingMessage.hasNextPart(); - - //Ensure that we got a string or byte array.// - if(!(next instanceof String || next instanceof byte[])) { - throw new RuntimeException("Invalid streaming message part type."); - }//if// - - messagePart = stream(next, isLastPart); - - if(isLastPart) { - streamingMessage = null; - }//if// - - int length = Math.min(getBuffer().remaining(), messagePart.remaining()); - getBuffer().put(messagePart.array(), messagePart.position(), length); - messagePart.position(messagePart.position() + length); - - if(messagePart.remaining() == 0) { - messagePart = null; - }//if// + messagePart = stream(next, isLastPart); + + if(isLastPart) { + streamingMessage = null; }//if// - //Close the message buffer if we were unable to add any bytes and there is nothing left to send.// - if(getBuffer().remaining() == 0) { - close(); - result = false; - }//if// + int length = Math.min(getBuffer().remaining(), messagePart.remaining()); + getBuffer().put(messagePart.array(), messagePart.position(), length); + messagePart.position(messagePart.position() + length); - return result; - }//loadBuffer()// + if(messagePart.remaining() == 0) { + messagePart = null; + }//if// + }//if// + + //Close the message buffer if we were unable to add any bytes and there is nothing left to send.// + if(getBuffer().remaining() == 0) { + close(); + result = false; + }//if// + + return result; +}//loadBuffer()// }//WebsocketMessageBuffer// \ No newline at end of file