Changed the SocketContext.writeOutgoingMessages() to copy the code in the master tree. Changed the SSL reading code to call the new writeClientBoundMessage() with the proper parameters. Removed internalProcessResponses().

This commit is contained in:
wcrisman
2014-12-28 20:33:03 -08:00
parent bd3e9ac5cc
commit 840fa8dd89

View File

@@ -71,9 +71,9 @@ public class SocketContext extends AbstractSocketContext implements IWebApplicat
/** Whether the final part boundary has already been found. If this is true then the message bytes should keep getting read until the partReadCount == contentLength. */
public boolean endPartBoundaryFound = false;
/** The bytes containing the unencrypted outbound message that is waiting for the socket to allow a write. */
public MessageBuffer/*ByteBuffer*/ currentOutboundMessage = null;
protected MessageBuffer currentOutboundMessage = null;
/** The last message buffer added to the pending outbound message chain (linked list). Used only for pass through contexts currently since locally handled messages link the reponses together into a list. */
private MessageBuffer lastOutboundMessage = null;
protected MessageBuffer lastOutboundMessage = null;
/** The byte buffer used to read data from the socket. This must be null if a SSL engine is provided. */
public ByteBuffer socketReadBuffer = null;
/** The buffer used to store the initial data in a SSL/TLS connection. The buffering is necessary to allow us to pre-read the client's hello message to gather the domain the client is connecting to - allowing for the correct SSL engine to be used. */
@@ -579,6 +579,97 @@ protected synchronized boolean passThrough(ByteBuffer buffer) {
* @see com.foundation.web.server.WebServer.AbstractSocketContext#processResponses()
*/
protected void writeOutgoingMessages() throws IOException {
boolean keepSending;
MessageBuffer outboundMessage;
SocketChannel channel;
//Synchronized to avoid multiple threads accessing the currentOutboundMessage chain at one time.//
synchronized(getLock()) {
outboundMessage = currentOutboundMessage;
channel = (SocketChannel) key.channel();
keepSending = hasPendingWrite() && channel.isOpen();
}//synchronized//
while(keepSending) {
boolean messageSent;
if(getWebServer().debug()) {
Debug.log(this.getId() + "|" + System.nanoTime() + "|Write a client bound message.");
}//if//
messageSent = writeClientBoundMessage(channel, outboundMessage);
if(getWebServer().debug()) {
Debug.log(this.getId() + "|" + System.nanoTime() + "|Check on the status of the sent message.");
}//if//
//Synchronized to avoid multiple threads accessing the currentOutboundMessage chain at one time.//
synchronized(getLock()) {
if(currentOutboundMessage != outboundMessage) {
Debug.log(new RuntimeException("Cannot change the currentOutboundMessage while we are sending it!"));
}//if//
//If we finished the message then load the next message, otherwise flag that we need to stop sending (buffers full - flag a write on the socket's key and wait).//
if(messageSent || (currentOutboundMessage != null && currentOutboundMessage.isClosed())) {
if(getWebServer().debug()) {
Debug.log(this.getId() + "|" + System.nanoTime() + "|The sent message was fully sent.");
}//if//
//Close the message if possible.//
try {currentOutboundMessage.close();} catch(Throwable e) {}
//Load the next available message.//
// currentOutboundMessage = currentOutboundMessage.getNext();
currentOutboundMessage = null;
if(currentOutboundMessage == null) lastOutboundMessage = null;
keepSending = hasPendingWrite() && key.channel().isOpen();
outboundMessage = currentOutboundMessage;
if(getWebServer().debug()) {
Debug.log(this.getId() + "|" + System.nanoTime() + "| Channel is open? " + key.channel().isOpen() + "; More message available? " + hasPendingWrite() + "; EncryptedWriteBuffer.remaining? " + encryptedWriteBuffer.remaining() + ".");
}//if//
}//if//
else {
if(getWebServer().debug()) {
Debug.log(this.getId() + "|" + System.nanoTime() + "|The sent message was only partially sent, stop sending for now.");
}//if//
keepSending = false;
}//else//
}//synchronized//
}//while//
/* From internalProcessResponses:
boolean doneSending = false;
//Keep sending responses while the buffers are not full and there is another response to send.//
while(!doneSending) {
boolean messageSent = true;
//If the socket is open then send the next buffer of data.//
if(key.channel().isOpen()) {
//Send the pending response object's prepared buffer of data.//
messageSent = writeClientBoundMessage();
}//if//
//Close the response if successfully sent, or if the socket is closed.//
if((messageSent || !key.channel().isOpen()) && currentOutboundMessage != null) {
try {currentOutboundMessage.close();} catch(Throwable e) {}
}//if//
//If we finished sending the current response then load the next one.//
if(messageSent || (currentOutboundMessage != null && currentOutboundMessage.isClosed())) {
//TODO: Queue up the next outbound message.
currentOutboundMessage = null;
lastOutboundMessage = null;
}//if//
if(currentOutboundMessage == null) {
doneSending = true;
}//if//
}//while//
*/
/* Original code for writeOutgoingMessages:
if(getPassThroughSocketContext() != null) {
// //Synchronized to avoid multiple threads accessing the pendingOutboundMessage chain at one time and updating the write flag out of order (could happen if we enabled request chaining over a single socket).//
// synchronized(this) {
@@ -597,12 +688,14 @@ protected void writeOutgoingMessages() throws IOException {
//Go directly to writing the client response if we are just passing everything through to another process.//
internalProcessResponses();
}//else//
*/
}//processCurrentResponse()//
/**
* Loads the next outbound websocket message and attempts to write it to the socket until all outbound messages have been sent, or the socket's buffers are full and a wait is required.
* If a message could only be partially sent then the next call will attempt to finish sending it.
*/
private void internalProcessWebsocketMessages() {
/*
if(websocketSendingMessage == null) {
loadNextWebsocketMessage();
}//if//
@@ -627,6 +720,7 @@ private void internalProcessWebsocketMessages() {
loadNextWebsocketMessage();
}//if//
}//while//
*/
}//internalProcessWebsocketMessages()//
/**
* Loads and prepares the next websocket message from the queue of pending messages.
@@ -722,88 +816,18 @@ private void loadNextWebsocketMessage() {
websocketStreamingMessage = null;
}//else//
}//loadNextWebsocketMessage()//
/**
* @return
*/
private synchronized void internalProcessResponses() {
boolean doneSending = false;
//Keep sending responses while the buffers are not full and there is another response to send.//
while(!doneSending) {
boolean messageSent = true;
//If the socket is open then send the next buffer of data.//
if(key.channel().isOpen()) {
//Send the pending response object's prepared buffer of data.//
messageSent = writeClientBoundMessage();
}//if//
//Close the response if successfully sent, or if the socket is closed.//
if((messageSent || !key.channel().isOpen()) && currentOutboundMessage != null) {
try {currentOutboundMessage.close();} catch(Throwable e) {}
}//if//
//If we finished sending the current response then load the next one.//
if(messageSent || (currentOutboundMessage != null && currentOutboundMessage.isClosed())) {
//TODO: Queue up the next outbound message.
currentOutboundMessage = null;
lastOutboundMessage = null;
}//if//
if(currentOutboundMessage == null) {
doneSending = true;
}//if//
}//while//
// //Keep sending responses while the buffers are not full and there is another response to send.//
// while(finishedSending && currentResponse != null) {
// //If the socket is open then send the next buffer of data.//
// if(key.channel().isOpen()) {
// //Send the pending response object's prepared buffer of data.//
// finishedSending = writeClientBoundMessage();
// }//if//
//
// //Close the response if successfully sent, or if the socket is closed.//
// if(finishedSending || !key.channel().isOpen()) {
// try {currentResponse.close();} catch(Throwable e) {}
// }//if//
//
// //If we finished sending the current response then load the next one.//
// if(finishedSending) {
// currentResponse = currentResponse.getNextResponse();
//
// if(currentResponse == null) {
// lastResponse = null;
// }//if//
// else if(key.channel().isOpen()) {
// //Prep the next response object for sending.//
// prepareResponse();
// }//else//
// else {
// //Clean up after all the left over responses.//
// while(currentResponse != null) {
// currentResponse.close();
// currentResponse = currentResponse.getNextResponse();
// }//while//
//
// currentResponse = null;
// lastResponse = null;
// }//else//
// }//if//
// }//while//
}//processCurrentResponse()//
/**
* Sends a response to the client.
* @return Whether the response could be fully sent. This will be false if there is still more data to be written when the call returns.
*/
private boolean writeClientBoundMessage() {
private boolean writeClientBoundMessage(SocketChannel channel, MessageBuffer currentOutboundMessage) {
boolean sendMore = true;
if(sslEngine != null) {
sendMore = writeClientBoundSslMessage((SocketChannel) key.channel(), currentOutboundMessage);
sendMore = writeClientBoundSslMessage(channel, currentOutboundMessage);
}//if//
else {
sendMore = writeClientBoundPlainMessage();
sendMore = writeClientBoundPlainMessage(channel, currentOutboundMessage);
}//else//
return sendMore;
@@ -1021,7 +1045,7 @@ private boolean writeClientBoundSslMessage(SocketChannel channel, MessageBuffer
* Sends a response to the client.
* @return Whether the response could be fully sent. This will be false if there is still more data to be written when the call returns.
*/
private boolean writeClientBoundPlainMessage() {
private boolean writeClientBoundPlainMessage(SocketChannel channel, MessageBuffer currentOutboundMessage) {
boolean sendMore = true;
try {
@@ -1268,7 +1292,8 @@ protected void readIncomingMessages() throws IOException {
//If the engine requires handshake data to be wrapped and sent then do so now.//
if(sslResult.getHandshakeStatus() == HandshakeStatus.NEED_WRAP || sslResult.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
sslNeedsWrap = true;
requiresRead = writeClientBoundMessage(channel, null);
/*
//Need to synchronize if this is a pass through socket so that multiple threads don't access pendingOutboundMessage or lastAddedMessageBuffer (via a call to passThrough(ByteBuffer) on another thread).//
if(getPassThroughSocketContext() == null) {
requiresRead = writeClientBoundMessage();
@@ -1278,6 +1303,7 @@ protected void readIncomingMessages() throws IOException {
requiresRead = writeClientBoundMessage();
}//synchronized//
}//else//
*/
}//if//
//If bytes were produced then process them.//