Merged many of the changes from the first attempt at an HTML5 Websocket implementation. Focused on the ones that didn't change the behavior of the web server.

This commit is contained in:
wcrisman
2014-12-07 16:12:29 -08:00
parent 46df41d433
commit 2adcfad863
13 changed files with 525 additions and 4 deletions

View File

@@ -61,6 +61,7 @@ import com.common.util.IIterator;
import com.common.util.LiteHashMap;
import com.common.util.LiteHashSet;
import com.common.util.LiteList;
import com.common.util.Queue;
import com.common.util.StreamBuffer;
import com.common.util.StringSupport;
import com.common.util.optimized.IntObjectHashMap;
@@ -285,6 +286,8 @@ public class WebServer {
private abstract class AbstractSocketContext extends ChannelContext {
/** The key that represents the connection between the channel (socket) and the selector used to multiplex the listener. */
public SelectionKey key = null;
/** Whether the socket is currently being used by a thread designated by the network listener thread to read or write to the socket. Currently the socket type we use in Java only allows one thread to read and write at a time. Note: Always synchronize on <code>key</code> before using this attribute. */
private boolean isUsed = false;
/** The SelectionKey flags that will be used when the NetworkListener thread finishes processing the input/output of a message. The thread can safely change these flags without synchronizing. */
private int flags = 0;
/** A socket context related to this one (when two are tied together such that data from one immediately is sent to the other). */
@@ -443,6 +446,23 @@ public class WebServer {
flags = SelectionKey.OP_READ;
}//synchronized//
}//flagReadOnly()//
/**
* Called to notify the network listener that a pending write operation exists for this socket.
*/
protected void notifyListenerOfPendingWrite() {
synchronized(key) {
//Ignore if a thread is using this socket currently since all operation flags will be set at the end of the use of the socket.//
if(!isUsed) {
int ops = key.interestOps();
boolean hasWrite = (ops & SelectionKey.OP_WRITE) != 0;
if(!hasWrite) {
key.interestOps(ops | SelectionKey.OP_WRITE);
key.selector().wakeup();
}//if//
}//if//
}//synchronized//
}//notifyListenerOfPendingWrite()//
}//AbstractSocketContext//
private static class RegisterKeyRunnable implements Runnable {
@@ -711,6 +731,34 @@ public class WebServer {
private LiteHashMap applicationDataMap;
/** Used to identify the first unencrypted message (ignored if ssl is being used) so that forwarding to a remote server can be accomplished. */
private boolean isFirstUnencryptedMessage = true;
/** Whether this is a websocket connection. */
private boolean isWebsocket = false;
/** The protocol passed when this connection was upgraded to a websocket. */
private String websocketProtocol = null;
/** The maximum number of bytes in an allowed websocket message (may be composed of multiple frames, does not include the frame header sizes). While this is a long, we really can't read longer than Integer.MAX_VALUE sized frames. So the message might be within size limits, but it might still be rejected if the frame exceeds the server's capacity to read a frame. */
private long websocketMaxMessageLength = 0;
/** The reusable frame header buffer. */
private byte[] websocketFrameHeader = null;
/** The index into the frame header of the last read byte. */
private int websocketFrameHeaderIndex = 0;
/** The next partial message (single messages may be broken into frames by the client). */
private StreamBuffer websocketPartialReceivedMessage = null;
/** The remaining bytes to be read on the current message frame (the frame header and part of the frame was read previously). */
private int websocketFrameRemainder = 0;
/** Whether the frame currently being read in the current message being received is the last frame in the message (when the frame is fully read it will trigger the message to be processed). */
private boolean websocketIsLastFrameInMessage = false;
/** The op code for the currently reading message, or zero if the last message was completed. */
private int websocketMessageOpCode = 0;
/** The currently reading frame's mask key used to decode the frame data. */
private byte[] websocketMessageMaskKey = null;
/** The queue used to hold outgoing messages before they are sent. Will contain only String, byte[], or IConnectionContext.IStreamedWebsocketMessage instances. */
private Queue websocketPendingMessages = null;
/** The streambuffer for the currently sending message. This will be for the current part of the streaming message if websocketStreamingMessage is non-null. */
private ByteBuffer websocketSendingMessage = null;
/** The streaming message handler which will be set only if the currently sending message is streaming. */
private IStreamedWebsocketMessage websocketStreamingMessage = null;
/** The application specified handler called when websocket events occur (messages received, socket closed, etc). */
private WebsocketHandler websocketHandler = null;
public SocketContext(ServerSocketContext serverSocketContext) {
super();
@@ -732,6 +780,16 @@ public class WebServer {
return this;
}//getLock()//
protected synchronized void close() {
try {
if(websocketHandler != null) {
websocketHandler.connectionClosed();
websocketHandler = null;
}//if//
}//try//
catch(Throwable e) {
Debug.log(e);
}//catch//
try {
if(applicationDataMap != null) {
for(IIterator iterator = applicationDataMap.valueIterator(); iterator.hasNext(); ) {
@@ -1134,6 +1192,14 @@ public class WebServer {
// flagWrite(pendingOutboundMessage != null);
}//synchronized//
}//if//
else if(isWebsocket) {
//Right after upgrading the socket we have one last HTTP response to process.//
if(currentResponse != null) {
internalProcessResponses();
}//if//
internalProcessWebsocketMessages();
}//else if//
else {
//Go directly to writing the client response if we are just passing everything through to another process.//
boolean receive = internalProcessResponses();
@@ -1147,6 +1213,130 @@ public class WebServer {
}//else//
}//else//
}//processCurrentResponse()//
/**
* Loads the next outbound websocket message and attempts to write it to the socket until all outbound messages have been sent, or the socket's buffers are full and a wait is required.
* If a message could only be partially sent then the next call will attempt to finish sending it.
*/
private void internalProcessWebsocketMessages() {
if(websocketSendingMessage == null) {
loadNextWebsocketMessage();
}//if//
while(websocketSendingMessage != null) {
//If the socket is open then send the next buffer of data.//
if(key.channel().isOpen()) {
if(pendingOutboundMessage != null) {
//Put the sending message in a MessageBuffer (pendingOutboundMessage).//
pendingOutboundMessage = new MessageBuffer(websocketSendingMessage);
}//if//
//Write the pendingOutboundMessage to the socket.//
if(writeClientResponse()) {
websocketSendingMessage = null;
pendingOutboundMessage = null;
}//if//
}//if//
//If we finished sending the message then load the next one.//
if(websocketSendingMessage == null) {
loadNextWebsocketMessage();
}//if//
}//while//
}//internalProcessWebsocketMessages()//
/**
* Loads and prepares the next websocket message from the queue of pending messages.
* Clears the pending message attributes if there isn't a pending message to be processed.
* The caller can check websocketSendingMessage == null to see if there is a ready message.
*/
private void loadNextWebsocketMessage() {
Object next = null;
boolean isLastPart = true;
if(websocketStreamingMessage != null && websocketStreamingMessage.hasNextPart()) {
next = websocketStreamingMessage.getNextPart();
isLastPart = !websocketStreamingMessage.hasNextPart();
//Ensure that we got a string or byte array.//
if(!(next instanceof String || next instanceof byte[])) {
throw new RuntimeException("Invalid streaming message part type.");
}//if//
}//if//
else {
synchronized(websocketPendingMessages) {
if(websocketPendingMessages.getSize() > 0) {
next = websocketPendingMessages.dequeue();
}//if//
}//synchronized//
}//else//
if(next != null) {
byte[] bytes = null;
int opCode = 0;
int length = 0;
if(next instanceof String) {
try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);}
opCode = websocketStreamingMessage == null ? 0x01 : 0;
length = bytes.length;
}//if//
else if(next instanceof byte[]) {
bytes = (byte[]) next;
opCode = websocketStreamingMessage == null ? 0x02 : 0;
length = bytes.length;
}//else if//
else if(next instanceof Byte) { //Control Message//
opCode = ((Byte) next).byteValue();
}//else if//
else if(next instanceof IStreamedWebsocketMessage) {
websocketStreamingMessage = (IStreamedWebsocketMessage) next;
next = websocketStreamingMessage.getNextPart();
isLastPart = !websocketStreamingMessage.hasNextPart();
if(next instanceof String) {
try {bytes = ((String) next).getBytes("UTF-8");} catch(Throwable e) {Debug.log(e);}
opCode = 0x01; //Text//
length = bytes.length;
}//if//
else if(next instanceof byte[]) {
bytes = (byte[]) next;
opCode = 0x02; //Binary//
length = bytes.length;
}//else if//
else {
throw new RuntimeException("Invalid streaming message part type.");
}//if//
}//else if//
websocketSendingMessage = ByteBuffer.allocate(14 + length);
websocketSendingMessage.put((byte) (isLastPart ? 0x8 : 0));
websocketSendingMessage.put((byte) opCode);
//Write the length differently based on how long the content is.//
if(length < 126) {
websocketSendingMessage.put((byte) length);
// websocketSendingMessage.putLong(0, StreamBuffer.NUMBER_MSF);
}//if//
else if(length < 65535) {
websocketSendingMessage.put((byte) 126);
websocketSendingMessage.putShort((short) (length & 0xFFFF));
// websocketSendingMessage.putShort((short) 0);
websocketSendingMessage.putInt(0);
}//else if//
else {
websocketSendingMessage.put((byte) 127);
websocketSendingMessage.putLong((long) length);
}//else//
//The server doesn't use a mask key.//
// websocketSendingMessage.putInt(0);
//Put the content at the end of the message.//
websocketSendingMessage.put(bytes);
}//if//
else {
websocketSendingMessage = null;
websocketStreamingMessage = null;
}//else//
}//loadNextWebsocketMessage()//
/**
* @return
*/
@@ -1746,6 +1936,98 @@ public class WebServer {
// flagWriteOnly();
}//else//
}//processRequest()//
/* (non-Javadoc)
* @see com.foundation.web.server.WebServer.AbstractSocketContext#hasPendingWrite()
*/
protected boolean hasPendingWrite() {
return pendingOutboundMessage != null;
}//hasPendingWrite()//
/* (non-Javadoc)
* @see com.foundation.web.interfaces.IConnectionContext#upgradeToWebsocket(java.lang.String, long, com.foundation.web.interfaces.WebsocketHandler)
*/
public void upgradeToWebsocket(String protocol, long maxMessageLength, WebsocketHandler websocketHandler) {
if(!isWebsocket) {
this.isWebsocket = true;
this.websocketProtocol = protocol;
this.websocketMaxMessageLength = maxMessageLength;
this.websocketFrameHeader = new byte[14];
this.websocketMessageMaskKey = new byte[4];
this.websocketPendingMessages = new Queue(20);
this.websocketHandler = websocketHandler;
}//if//
}//upgradeToWebsocket()//
/* (non-Javadoc)
* @see com.foundation.web.interfaces.IConnectionContext#getWebsocketProtocol()
*/
public String getWebsocketProtocol() {
return websocketProtocol;
}//getWebsocketProtocol()//
/* (non-Javadoc)
* @see com.foundation.web.interfaces.IConnectionContext#isWebsocket()
*/
public boolean isWebsocket() {
return isWebsocket;
}//isWebsocket()//
/* (non-Javadoc)
* @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(byte[])
*/
public void sendWebsocketMessage(byte[] message) {
if(isWebsocket) {
synchronized(websocketPendingMessages) {
websocketPendingMessages.enqueue(message);
}//synchronized//
notifyListenerOfPendingWrite();
}//if//
}//sendWebsocketMessage()//
/* (non-Javadoc)
* @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(java.lang.String)
*/
public void sendWebsocketMessage(String message) {
if(isWebsocket) {
synchronized(websocketPendingMessages) {
websocketPendingMessages.enqueue(message);
}//synchronized//
notifyListenerOfPendingWrite();
}//if//
}//sendWebsocketMessage()//
/* (non-Javadoc)
* @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketPing()
*/
public void sendWebsocketPing() {
if(isWebsocket) {
synchronized(websocketPendingMessages) {
websocketPendingMessages.enqueue(new Byte((byte) 0x9));
}//synchronized//
notifyListenerOfPendingWrite();
}//if//
}//sendWebsocketPing()//
/**
* Sends a PONG response to the client's ping.
*/
public void sendWebsocketPong() {
if(isWebsocket) {
synchronized(websocketPendingMessages) {
websocketPendingMessages.enqueue(new Byte((byte) 0xA));
}//synchronized//
notifyListenerOfPendingWrite();
}//if//
}//sendWebsocketPong()//
/* (non-Javadoc)
* @see com.foundation.web.interfaces.IConnectionContext#sendWebsocketMessage(com.foundation.web.interfaces.IConnectionContext.IStreamedWebsocketMessage)
*/
public void sendWebsocketMessage(IStreamedWebsocketMessage message) {
if(isWebsocket) {
synchronized(websocketPendingMessages) {
websocketPendingMessages.enqueue(message);
}//synchronized//
notifyListenerOfPendingWrite();
}//if//
}//sendWebsocketMessage()//
}//SocketContext//
/**