Reorganized SocketContext code so that methods are in the correct order to compare with the master branch. Added code back in and updated code as necessary to be as identical as possible to the master branch for the Websocket related methods and classes. Updated PassThroughSocketContext to be as identical as possible to the master branch. Updated NetworkListener to be as identical as possible to the master branch. Commented a number of TODO's in SocketContext for turning on bits of code that might cause problems, but will bring this branch up to speed with the master branch.

This commit is contained in:
wcrisman
2014-12-29 10:35:10 -08:00
parent 5aece2300e
commit b6d57986f2
6 changed files with 722 additions and 779 deletions

View File

@@ -28,8 +28,6 @@ public abstract class AbstractSocketContext implements IChannelContext {
private final NetworkListener networkListener;
/** The debug ID for the socket. */
private final int id;
/** The web server that created the socket context. */
protected WebServer webServer = null;
/** The key that represents the connection between the channel (socket) and the selector used to multiplex the listener. The code must synchronize on this attribute when accessing the isUsed functionality, or when interacting with the key's interestOps. */
protected SelectionKey key = null;
/** Whether the socket is currently being used by a thread designated by the network listener thread to read or write to the socket. Currently the socket type we use in Java only allows one thread to read and write at a time. Note: Always synchronize on <code>key</code> before using this attribute. */
@@ -75,13 +73,8 @@ public WebServer getWebServer() {return networkListener.getWebServer();}
public boolean getIsUsed() {return isUsed;}
/** Sets whether the socket context is currently in use by a thread. */
public void setIsUsed(boolean isUsed) {this.isUsed = isUsed;}
/** Gets the debug id for the socket. */
public int getId() {return id;}
/**
* Gets the lockable (synchronizable) object for this context. For contexts with a related context, only one of the two will be returned, such that a single synchronize block covers both contexts.
* @return The object to synchronize on such that two threads don't attempt to interact with the context at the same time (AsynchronousSocketChannel required for that).
*/
protected abstract Object getLock();
/** Gets the selection key used for this socket context by the Listener to identify when bytes are incoming or available for outgoing on the socket. */
public SelectionKey getKey() {return key;}
/**
* Writes the next responses/messages in the sequence.
* @throws IOException
@@ -97,7 +90,7 @@ protected abstract void readIncomingMessages() throws IOException;
* @param buffer The buffer containing the message. This buffer will not be retained by this method call, and can be reused by the caller.
* @return Whether the whole message was transfered.
*/
protected abstract boolean passThrough(ByteBuffer buffer);
protected abstract void passThrough(ByteBuffer buffer);
/**
* Closes the socket context and cleans up.
*/
@@ -128,4 +121,21 @@ protected void notifyListenerOfPendingWrite() {
}//if//
}//synchronized//
}//notifyListenerOfPendingWrite()//
/**
* Gets the lockable (synchronizable) object for this context. For contexts with a related context, only one of the two will be returned, such that a single synchronize block covers both contexts.
* @return The object to synchronize on such that two threads don't attempt to interact with the context at the same time (AsynchronousSocketChannel required for that).
*/
protected Object getLock() {
return key;
}//getLock()//
/**
* Gets the related socket's selection key. This will be the key not returned by a call to getLock(), or will be null if there isn't a related socket context.
* Allows for PassThroughSocketContext which links to a SocketContext to pass all data through to a remote process. This will always return the PassThroughSocketContext's selection key.
* @return The selection key for the PassThroughSocketContext if one exists for this SocketContext (or if this is a PassThroughSocketContext).
*/
public SelectionKey getRelatedSocketContextKey() {
return null;
}//getRelatedSocketContextKey()//
/** Gets the debug id for the socket. */
public int getId() {return id;}
}//AbstractSocketContext//

View File

@@ -146,13 +146,6 @@ public void run() {
socketChannel.socket().setSendBufferSize(AbstractSocketContext.SEND_BUFFER_SIZE);
socketChannel.socket().setReceiveBufferSize(AbstractSocketContext.RECEIVE_BUFFER_SIZE);
socketContext.key = socketChannel.register(selector, SelectionKey.OP_READ, socketContext);
socketContext.serverSocketContext = serverSocketContext;
//Debug.log("Connection opened to " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort());
if(serverSocketContext.serviceListener.type != IServiceListener.TYPE_SSL) {
socketContext.socketReadBuffer = ByteBuffer.allocate(AbstractSocketContext.BUFFER_SIZE);
}//if//
if(getWebServer().debug()) {
Debug.log("Connection opened to " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort());

View File

@@ -3,8 +3,13 @@ package com.foundation.web.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLException;
import com.common.debug.Debug;
import com.foundation.web.server.WebServer.RegisterKeyRunnable;
/**
@@ -12,8 +17,8 @@ import com.foundation.web.server.WebServer.RegisterKeyRunnable;
* Allows the web server to act as an SSL front to another web server or service.
*/
public class PassThroughSocketContext extends AbstractSocketContext {
private MessageBuffer pendingMessageBuffer = null;
private MessageBuffer lastAddedMessageBuffer = null;
private MessageBuffer currentOutboundMessage = null;
private MessageBuffer lastOutboundMessage = null;
/** The byte buffer used to read data from the socket. */
public ByteBuffer socketReadBuffer = ByteBuffer.allocate(BUFFER_SIZE);
/**
@@ -52,68 +57,64 @@ public PassThroughSocketContext(SocketContext linkedClientContext, String addres
* @see com.foundation.web.server.WebServer.AbstractSocketContext#getLock()
*/
protected Object getLock() {
return getRelatedSocketContext();
return getRelatedSocketContext().getLock();
}//getLock()//
/* (non-Javadoc)
* @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses()
* @see com.foundation.web.server.WebServer.AbstractSocketContext#getRelatedSocketContextKey()
*/
protected synchronized void writeOutgoingMessages() throws IOException {
//Actually this is called when a request is being sent via the pass through socket (sending the request to the remote server).//
//Synchronized to avoid accessing the pendingMessageBuffer and lastAddedMessageBuffer at the same time as a thread that is calling passThrough(ByteBuffer) which also accesses these variables.//
boolean result = true;
if(result && pendingMessageBuffer != null) {
//Check to see if the outbound message is prepared to send more content.//
if(!pendingMessageBuffer.getBuffer().hasRemaining()) {
//Load the next pending outbound message in the chain.//
if(pendingMessageBuffer.getNext() != null) {
pendingMessageBuffer = pendingMessageBuffer.getNext();
}//if//
else {
//Wait until additional message bytes are available.//
result = false;
pendingMessageBuffer = null;
lastAddedMessageBuffer = null;
}//else//
}//if//
//Keep sending encrypted frames until the output buffer is full, or we run out of message to send.//
while(result && (pendingMessageBuffer != null) && pendingMessageBuffer.getBuffer().hasRemaining()) {
//Write the bytes to the stream.//
((SocketChannel) key.channel()).write(pendingMessageBuffer.getBuffer());
//If not all the bytes could be written then we will need to wait until we can write more.//
if(pendingMessageBuffer.getBuffer().hasRemaining()) {
result = false;
}//if//
else {
//Load the next pending outbound message in the chain.//
if(pendingMessageBuffer.getNext() != null) {
pendingMessageBuffer = pendingMessageBuffer.getNext();
}//if//
else {
//Wait until additional message bytes are available.//
result = false;
pendingMessageBuffer = null;
lastAddedMessageBuffer = null;
}//else//
}//else//
}//while//
}//if//
}//processResponses()//
public SelectionKey getRelatedSocketContextKey() {
return key;
}//getRelatedSocketContextKey()//
/** Gets whether the socket context is in use (reading or writing) such that the network listener doesn't attempt to read and write at the same time (not supported by Java's NIO sockets). Should only ever be used by the NetworkListener. */
public boolean isUsed() {return getRelatedSocketContext().getIsUsed();}
/** Sets whether the socket context is in use (reading or writing) such that the network listener doesn't attempt to read and write at the same time (not supported by Java's NIO sockets). Should only ever be used by the NetworkListener. */
public void isUsed(boolean isUsed) {getRelatedSocketContext().setIsUsed(isUsed);}
/* (non-Javadoc)
* @see com.foundation.web.server.WebServer.AbstractSocketContext#processRequest()
* @see com.foundation.web.server.WebServer.AbstractSocketContext#writeOutgoingMessages()
*/
protected void writeOutgoingMessages() throws IOException {
boolean keepSending;
MessageBuffer outboundMessage;
SocketChannel channel;
//Synchronized to avoid multiple threads accessing the currentOutboundMessage chain at one time.//
synchronized(getLock()) {
outboundMessage = currentOutboundMessage;
channel = (SocketChannel) key.channel();
keepSending = hasPendingWrite() && channel.isOpen();
}//synchronized//
//Keep sending responses while the buffers are not full and there is another response to send.//
while(keepSending) {
//Send the pending response object's prepared buffer of data.//
writeClientBoundMessage(channel, outboundMessage);
//Synchronized to avoid multiple threads accessing the currentOutboundMessage chain at one time.//
synchronized(getLock()) {
//If we finished the message then load the next message, otherwise flag that we need to stop sending (buffers full - flag a write on the socket's key and wait).//
if(currentOutboundMessage != null && currentOutboundMessage.isClosed()) {
currentOutboundMessage = currentOutboundMessage.getNext();
keepSending = hasPendingWrite() && key.channel().isOpen();
outboundMessage = currentOutboundMessage;
}//if//
else {
keepSending = false;
}//else//
}//synchronized//
}//while//
}//writeOutgoingMessages()//
/* (non-Javadoc)
* @see com.foundation.web.server.AbstractSocketContext#readIncomingMessages()
*/
protected void readIncomingMessages() throws IOException {
//Actually this is called when a response is being processed via the pass through socket (remote process that received the request).//
int count = 1;
int loopCount = 0;
boolean result = true;
SocketChannel channel = (SocketChannel) key.channel();
//While we have a count greater than zero, indicating that some data is comming through, keep reading and processing the data.//
//Note: We are throddling this for active connections to prevent a single connection from hogging all the resources.//
while(loopCount < 10 && result && count > 0) {
while(loopCount < 10 && count > 0) {
loopCount++;
count = channel.read(socketReadBuffer);
socketReadBuffer.flip();
@@ -123,7 +124,7 @@ protected void readIncomingMessages() throws IOException {
try {relatedSocketContext.close();} catch(Throwable e) {}
}//if//
else if(socketReadBuffer.hasRemaining()) {
result = relatedSocketContext.passThrough(socketReadBuffer);
relatedSocketContext.passThrough(socketReadBuffer);
socketReadBuffer.compact();
}//else//
else {
@@ -131,38 +132,112 @@ protected void readIncomingMessages() throws IOException {
break;
}//else//
}//while//
}//processRequest()//
}//readIncomingMessages()//
/**
* Sends a message/response to the client.
* @param channel The channel to use for the sending.
* @param currentOutboundMessage The message to be sending. This may be null if using SSL and sending handshake data (in which case the encryptedWriteBuffer should have data on it).
* @return Whether the response could be fully sent. This will be false if there is still more data to be written for the current message OR in the encrypted SSL write buffer (for SSL handshake messages) when the call returns.
*/
private boolean writeClientBoundMessage(SocketChannel channel, MessageBuffer currentOutboundMessage) {
boolean sendMore = true;
// if(debug) {
// debugBuffer.append("Starting a write cycle.\n");
// }//if//
try {
//If we can send more data and we have an outbound message then initialize and send it! The currentOutboundMessage might be null if performing an SSL handshake.//
if(sendMore && currentOutboundMessage != null && !currentOutboundMessage.isClosed()) {
//Initialize the outbound message.//
if(!currentOutboundMessage.initialize()) {
getRelatedSocketContext().close();
}//if//
else {
currentOutboundMessage.loadBuffer();
//If we have an application response pending then send it now.//
if(sendMore && !currentOutboundMessage.isClosed()) {
//Keep sending encrypted frames until the output buffer is full, or we run out of message to send.//
while(sendMore && !currentOutboundMessage.isClosed()) {
//Write the bytes to the stream.//
channel.write(currentOutboundMessage.getBuffer());
// if(debug) {
// sentBytes += pendingOutboundMessage.position();
// debugBuffer.append("Wrote " + pendingOutboundMessage.position() + " bytes to the client. Total sent: " + sentBytes + "\n");
// }//if//
//If not all the bytes could be written then we will need to wait until we can write more.//
if(currentOutboundMessage.getBuffer().hasRemaining()) {
sendMore = false;
}//if//
else {
sendMore = currentOutboundMessage.loadBuffer();
}//else//
}//while//
}//if//
}//else//
}//if//
}//try//
catch(ClosedChannelException e) {
getRelatedSocketContext().close();
}//catch//
catch(SSLException e) {
if(getWebServer().debug()) {
Debug.log(e);
}//if//
close();
}//catch//
catch(IOException e) {
if(getWebServer().debug()) {
Debug.log(e);
}//if//
getRelatedSocketContext().close();
}//catch//
//Return true if the current outbound message was fully sent and any SSL handshake messages were fully sent.//
return (currentOutboundMessage == null || currentOutboundMessage.isClosed());
}//writeClientBoundMessages()//
/* (non-Javadoc)
* @see com.foundation.web.server.WebServer.AbstractSocketContext#passThrough(java.nio.ByteBuffer)
*/
protected synchronized boolean passThrough(ByteBuffer buffer) {
// ByteBuffer messageBytes = ByteBuffer.allocate(buffer.remaining());
// MessageBuffer message;
//
// //Create a new buffer to hold the data so we don't modify the passed buffer (other than to update its position).//
// messageBytes = ByteBuffer.allocate(buffer.remaining());
// messageBytes.put(buffer);
// message = new MessageBuffer(messageBytes);
//
// //Chain the message into the linked list.
// if(lastAddedMessageBuffer == null) {
// pendingMessageBuffer = lastAddedMessageBuffer = message;
// }//if//
// else {
// lastAddedMessageBuffer.setNext(message);
// lastAddedMessageBuffer = message;
// }//else//
protected void passThrough(ByteBuffer buffer) {
synchronized(getLock()) {
ByteBuffer messageBytes = ByteBuffer.allocate(buffer.remaining());
MessageBuffer message;
return true;
//Create a new buffer to hold the data so we don't modify the passed buffer (other than to update its position).//
messageBytes = ByteBuffer.allocate(buffer.remaining());
messageBytes.put(buffer);
message = new MessageBuffer(this, messageBytes);
//Chain the message into the linked list.
if(lastOutboundMessage == null) {
currentOutboundMessage = lastOutboundMessage = message;
}//if//
else {
lastOutboundMessage.setNext(message);
lastOutboundMessage = message;
}//else//
}//synchronized//
}//passThrough()//
protected synchronized void close() {
try {if(key != null && key.channel() != null) key.channel().close();} catch(Throwable e) {}
try {if(key != null) key.cancel();} catch(Throwable e) {}
synchronized(getLock()) {
if(key != null) {
synchronized(key) {
try {if(key.channel() != null) key.channel().close();} catch(Throwable e) {}
try {key.cancel();} catch(Throwable e) {}
}//synchronized//
}//if//
}//synchronized//
}//close()//
/* (non-Javadoc)
* @see com.foundation.web.server.WebServer.AbstractSocketContext#hasPendingWrite()
*/
protected boolean hasPendingWrite() {
return pendingMessageBuffer != null;
return currentOutboundMessage != null;
}//hasPendingWrite()//
}//PassThroughSocketContext//

View File

@@ -6,7 +6,7 @@ import com.foundation.web.server.WebServer.ServiceListener;
* Provides a place connection oriented data.
*/
public class ServerSocketContext implements IChannelContext {
public ServiceListener serviceListener = null;
private ServiceListener serviceListener = null;
public ServerSocketContext(ServiceListener serviceListener) {
super();

View File

@@ -12,28 +12,42 @@ class WebsocketMessageBuffer extends MessageBuffer {
private ByteBuffer messagePart = null;
/** The message to be sent. Will be null after the message buffer has initialized. */
private Object message = null;
public WebsocketMessageBuffer(AbstractSocketContext socketContext, Object message) {
/**
* WebsocketMessageBuffer constructor.
* @param socketContext The socket context associated with this message buffer.
* @param message The message to be sent.
*/
public WebsocketMessageBuffer(AbstractSocketContext socketContext, Object message) {
super(socketContext);
this.message = message;
}//WebsocketMessageBuffer()//
public boolean initialize() {
if(message != null) {
}//WebsocketMessageBuffer()//
/* (non-Javadoc)
* @see com.foundation.web.server.MessageBuffer#initialize()
*/
public boolean initialize() {
if(getIsInitialized()) {
super.initialize();
messagePart = stream(message, true);
message = null;
}//if//
return super.initialize();
}//initialize()//
public boolean isClosed() {
return true;
}//initialize()//
/* (non-Javadoc)
* @see com.foundation.web.server.MessageBuffer#isClosed()
*/
public boolean isClosed() {
return super.isClosed() && messagePart == null && streamingMessage == null;
}//isClosed()//
public void close() {
}//isClosed()//
/* (non-Javadoc)
* @see com.foundation.web.server.MessageBuffer#close()
*/
public void close() {
super.close();
messagePart = null;
streamingMessage = null;
}//close()//
private ByteBuffer stream(Object next, boolean isLast) {
}//close()//
private ByteBuffer stream(Object next, boolean isLast) {
ByteBuffer result = null;
byte[] bytes = null;
int opCode = 0;
@@ -95,8 +109,8 @@ class WebsocketMessageBuffer extends MessageBuffer {
result.put(bytes);
return result;
}//stream()//
public boolean loadBuffer() {
}//stream()//
public boolean loadBuffer() {
boolean result = true;
getBuffer().compact();
@@ -147,5 +161,5 @@ class WebsocketMessageBuffer extends MessageBuffer {
}//if//
return result;
}//loadBuffer()//
}//loadBuffer()//
}//WebsocketMessageBuffer//