Initial commit from SVN.
This commit is contained in:
431
Orb/src/com/de22/orb/NioConnection.java
Normal file
431
Orb/src/com/de22/orb/NioConnection.java
Normal file
@@ -0,0 +1,431 @@
|
||||
/*
|
||||
* Copyright (c) 2008,2009 Declarative Engineering LLC.
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Declarative Engineering LLC
|
||||
* verson 1 which accompanies this distribution, and is available at
|
||||
* http://declarativeengineering.com/legal/DE_Developer_License_v1.txt
|
||||
*/
|
||||
package com.de22.orb;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
import com.common.debug.Debug;
|
||||
import com.common.event.VoidHandler1;
|
||||
import com.common.event.VoidHandler2;
|
||||
import com.common.io.ObjectInputStream;
|
||||
import com.common.io.StreamSupport;
|
||||
import com.common.thread.ThreadService;
|
||||
import com.common.util.StreamBuffer;
|
||||
import com.de22.orb.AbstractConnection.SocketData;
|
||||
import com.de22.orb.exception.NewVersionFoundException;
|
||||
import com.de22.orb.io.OrbObjectOutputStream;
|
||||
|
||||
/**
|
||||
* A non-blocking socket implementation.
|
||||
*/
|
||||
public class NioConnection extends AbstractConnection {
|
||||
/** The NIO engine used by the socket. */
|
||||
private NioEngine nioEngine = null;
|
||||
|
||||
/**
|
||||
* Encapsulates the underlying actual connection to the remote machine and the connection's state, such that if the connection fails, this data object can be swapped out for a new connection (upon a successful reconnect).
|
||||
*/
|
||||
public class NioSocketData extends SocketData implements NioEngine.ISocketHandler {
|
||||
/** The actual socket channel that will be used for transmission and reception of messages. */
|
||||
private SocketChannel socketChannel = null;
|
||||
/** The selection key is used to link the socket to the selector which allows multiple sockets to be serviced by a single or pool of threads. */
|
||||
private SelectionKey key = null;
|
||||
/** The incomming raw bytes from the socket. */
|
||||
private StreamBuffer rawInputBuffer = null;
|
||||
/** The outgoing raw bytes from the orb. */
|
||||
private StreamBuffer rawOutputBuffer = null;
|
||||
|
||||
/**
|
||||
* StSocketData constructor.
|
||||
* @param nioEngine The engine used to handle NIO listening.
|
||||
* @param socketChannel The actual socket channel that will be used for transmission and reception of messages.
|
||||
* @param key The selection key is used to link the socket to the selector which allows multiple sockets to be serviced by a single or pool of threads.
|
||||
*/
|
||||
public NioSocketData(SocketChannel socketChannel, SelectionKey key) throws IOException {
|
||||
super();
|
||||
this.socketChannel = socketChannel;
|
||||
this.key = key;
|
||||
nioEngine.start();
|
||||
}//NioSocketData()//
|
||||
/* (non-Javadoc)
|
||||
* @see com.de22.orb.AbstractSocket.SocketData#close()
|
||||
*/
|
||||
protected void close() {
|
||||
if(key != null) {
|
||||
try {
|
||||
key.interestOps(0);
|
||||
}//try//
|
||||
catch(Throwable e) {
|
||||
//Ignored.//
|
||||
}//catch//
|
||||
}//if//
|
||||
|
||||
if(socketChannel != null) {
|
||||
try {
|
||||
socketChannel.close();
|
||||
}//try//
|
||||
catch(Throwable e) {
|
||||
//Ignored.//
|
||||
}//catch//
|
||||
}//if//
|
||||
|
||||
nioEngine.stop();
|
||||
}//close()//
|
||||
/**
|
||||
* Gets the actual socket channel that will be used for transmission and reception of messages.
|
||||
* @return The socket channel.
|
||||
*/
|
||||
public SocketChannel getSocketChannel() {
|
||||
return socketChannel;
|
||||
}//getSocketChannel()//
|
||||
/**
|
||||
* Gets the selection key is used to link the socket to the selector which allows multiple sockets to be serviced by a single or pool of threads.
|
||||
* @return The selection key.
|
||||
*/
|
||||
public SelectionKey getKey() {
|
||||
return key;
|
||||
}//getKey()//
|
||||
/**
|
||||
* Gets the NIO engine used by the socket.
|
||||
* @return The socket's NIO engine.
|
||||
*/
|
||||
public NioEngine getNioEngine() {
|
||||
return nioEngine;
|
||||
}//getNioEngine()//
|
||||
/**
|
||||
* Called by the NIO Engine when the socket has available bytes for reading.
|
||||
* @param buffer The reusable buffer provided to the thread for reading data off the socket.
|
||||
*/
|
||||
public void receivePendingMessages(ByteBuffer buffer) {
|
||||
try {
|
||||
if(rawInputBuffer == null) {
|
||||
rawInputBuffer = new StreamBuffer();
|
||||
}//if//
|
||||
|
||||
//Read until the socket is out of available bytes.//
|
||||
do {
|
||||
//Prepare the buffer for writting.//
|
||||
buffer.clear();
|
||||
//Read data from the socket to the buffer. If the buffer cannot be filled then the buffer's eventual limit will not equal the buffer's capacity.//
|
||||
socketChannel.read(buffer);
|
||||
//Prepare the buffer for reading.//
|
||||
buffer.flip();
|
||||
//Buffer the available bytes to the raw input buffer.//
|
||||
rawInputBuffer.writeBytes(buffer);
|
||||
} while(buffer.limit() == buffer.capacity());
|
||||
|
||||
//Process as much of the raw buffer as possible.//
|
||||
receiveMessage(rawInputBuffer);
|
||||
}//try//
|
||||
catch(IOException e) {
|
||||
//Note: This will happen when the client forcably closes the connection.//
|
||||
NioConnection.this.close(true, false);
|
||||
}//catch//
|
||||
catch(Throwable e) {
|
||||
Debug.log(e);
|
||||
}//catch//
|
||||
}//receivePendingMessages()//
|
||||
/**
|
||||
* Called by the NIO Engine when the socket has available capacity for writing.
|
||||
* <p>Note: We are using the NioSocketData's monitor for ensuring that another thread doesn't add content to be written while we are sending content to be written.</p>
|
||||
* @param buffer The reusable buffer provided to the thread for reading data off the socket.
|
||||
*/
|
||||
public synchronized void sendPendingMessages(ByteBuffer buffer) {
|
||||
try {
|
||||
if(rawOutputBuffer != null) {
|
||||
//Ensure that the buffer will report no bytes available to be written.//
|
||||
buffer.position(buffer.limit());
|
||||
|
||||
//Keep sending as long as all bytes were sent via the socket and there are more bytes in the raw output buffer.//
|
||||
while(!buffer.hasRemaining() && rawOutputBuffer.getSize() > 0) {
|
||||
//Prepare the buffer to be written to.//
|
||||
buffer.clear();
|
||||
//Gets as many bytes as possible placed in the buffer.//
|
||||
rawOutputBuffer.getBytes(0, buffer);
|
||||
//Prepare the buffer to be read from.//
|
||||
buffer.flip();
|
||||
//Write as much as possible to the socket.//
|
||||
socketChannel.write(buffer);
|
||||
//Remove the written bytes from the raw output buffer.//
|
||||
rawOutputBuffer.skipBytes(buffer.position());
|
||||
}//while//
|
||||
|
||||
if(rawOutputBuffer.getSize() > 0) {
|
||||
synchronized(key) {
|
||||
//Turn on the write flag.//
|
||||
key.interestOps(key.interestOps() | key.OP_WRITE);
|
||||
}//synchronized//
|
||||
|
||||
key.selector().wakeup();
|
||||
}//if//
|
||||
else {
|
||||
rawOutputBuffer = null;
|
||||
}//else//
|
||||
}//if//
|
||||
}//try//
|
||||
catch(Throwable e) {
|
||||
Debug.log(e);
|
||||
}//catch//
|
||||
}//sendPendingMessages()//
|
||||
/* (non-Javadoc)
|
||||
* @see com.de22.orb.AbstractSocket.SocketData#writeMessage(com.de22.orb.StreamBuffer)
|
||||
*/
|
||||
protected synchronized void writeMessage(StreamBuffer buffer) {
|
||||
if(rawOutputBuffer != null) {
|
||||
//Add the output to the end of the already pending output. The key will already be flagged for writing.//
|
||||
rawOutputBuffer.writeBytes(buffer);
|
||||
}//if//
|
||||
else {
|
||||
ByteBuffer b = nioEngine.getBuffer();
|
||||
|
||||
rawOutputBuffer = buffer;
|
||||
sendPendingMessages(b);
|
||||
nioEngine.returnBuffer(b);
|
||||
}//else//
|
||||
}//writeMessage()//
|
||||
public void filterInitializationComplete() {
|
||||
super.filterInitializationComplete();
|
||||
|
||||
try {
|
||||
//TODO: Should this be on or off?
|
||||
socketChannel.socket().setKeepAlive(true);
|
||||
}//try//
|
||||
catch(SocketException e) {
|
||||
Debug.log(e);
|
||||
}//catch//
|
||||
}//filterInitializationComplete()//
|
||||
}//NioSocketData//
|
||||
/**
|
||||
* NioSocket constructor.
|
||||
*/
|
||||
public NioConnection() {
|
||||
super();
|
||||
}//NioSocket()//
|
||||
/**
|
||||
* NioSocket constructor.
|
||||
* @param sessionId The session identifier issued by the server socket.
|
||||
* @param autoReconnectTimeLimit The amount of time the server should wait for the client to auto reconnect.
|
||||
*/
|
||||
public NioConnection(long sessionId, long autoReconnectTimeLimit) {
|
||||
super(sessionId, autoReconnectTimeLimit);
|
||||
}//NioSocket()//
|
||||
/* (non-Javadoc)
|
||||
* @see com.de22.orb.AbstractSocket#reconnect(java.net.InetAddress, int)
|
||||
*/
|
||||
protected void reconnect(InetAddress address, int port) {
|
||||
Socket socket = null;
|
||||
|
||||
try {
|
||||
socket = new Socket(address, port);
|
||||
//Code that uses the non-blocking connect. Commented because it is more work than it is worth.//
|
||||
//socketChannel = listener.getSelector().provider().openSocketChannel();
|
||||
|
||||
if(socket != null) {
|
||||
SelectionKey key = null;
|
||||
|
||||
//Code that uses the non-blocking connect. Commented because it is more work than it is worth.//
|
||||
// socketChannel.configureBlocking(false);
|
||||
// key = socketChannel.register(listener.getSelector(), SelectionKey.OP_CONNECT);
|
||||
//
|
||||
// //Will return true if the connection was made immediately.//
|
||||
// if(socketChannel.connect(socketAddress)) {
|
||||
// key.interestOps(0);
|
||||
// socketChannel.finishConnect();
|
||||
// }//if//
|
||||
|
||||
NioSocketData socketData;
|
||||
SocketChannel socketChannel = socket.getChannel();
|
||||
|
||||
//Register for read operations.//
|
||||
key = socket.getChannel().register(nioEngine.getSelector(), SelectionKey.OP_READ);
|
||||
//Setup the socket data (the data regarding the socket which may be altered when a reconnect occurs).//
|
||||
socketData = new NioSocketData(socketChannel, key);
|
||||
//Attach the SocketData instance to the key - what the NioEngine will call when the socket is available to be read/written from/to.//
|
||||
key.attach(socketData);
|
||||
//Save the socket data, representing the socket this NioSocket is currently using for the underlying communications.//
|
||||
setSocketData(socketData);
|
||||
//Initialize the socket.//
|
||||
socket.setSoLinger(false, 0);
|
||||
socket.setTcpNoDelay(getSocketOptions().useTcpNoDelay());
|
||||
socket.setSoTimeout(0);
|
||||
socket.setReuseAddress(true);
|
||||
socket.setReceiveBufferSize(internalSocketBufferSize); //TODO: Place this in the options.
|
||||
socket.setSendBufferSize(internalSocketBufferSize); //TODO: Place this in the options.
|
||||
//Start the handshake process.//
|
||||
socketData.startHandshake();
|
||||
}//if//
|
||||
}//try//
|
||||
catch(java.net.ConnectException e) {
|
||||
// Debug.log(e);
|
||||
|
||||
if(socket != null) {
|
||||
try {
|
||||
socket.close();
|
||||
}//try//
|
||||
catch(Throwable e2) {}
|
||||
}//if//
|
||||
}//catch//
|
||||
catch(Throwable e) {
|
||||
Debug.log(e);
|
||||
|
||||
if(socket != null) {
|
||||
try {
|
||||
socket.close();
|
||||
}//try//
|
||||
catch(Throwable e2) {}
|
||||
}//if//
|
||||
}//catch//
|
||||
}//reconnect()//
|
||||
/**
|
||||
* Initializes the client side socket.
|
||||
* @param orb The orb instance that created this socket.
|
||||
* @param listener The selector listener the socket will be using.
|
||||
* @param address The address to connect to.
|
||||
* @param port The port to connect on.
|
||||
* @param options The socket options that will be used to open the socket and to reconnect the socket.
|
||||
* @param classLoader The loader that will be used to load classes.
|
||||
* @param messageHandler The handler called to process a message after the bytes have been read from the stream.
|
||||
* @param initCompleteHandler The handler called to finish the initialization process (after all initialization of the socket has finished).
|
||||
*/
|
||||
public void initialize(Orb orb, InetAddress address, int port, ISocketOptions options, ClassLoader classLoader, VoidHandler2 messageHandler, VoidHandler1 initCompleteHandler, NioEngine nioEngine) throws IOException {
|
||||
Socket s = new Socket();
|
||||
int timeout = options.getSocketConnectionTimeout();
|
||||
|
||||
try {
|
||||
s.connect(new InetSocketAddress(address, port), timeout < 500 ? 500 : timeout);
|
||||
}//try//
|
||||
catch(SocketTimeoutException e) {
|
||||
s = null;
|
||||
}//catch//
|
||||
|
||||
try {
|
||||
final Socket socket = s;
|
||||
|
||||
this.nioEngine = nioEngine;
|
||||
|
||||
//Code that uses the non-blocking connect. Commented because it is more work than it is worth.//
|
||||
//socketChannel = listener.getSelector().provider().openSocketChannel();
|
||||
|
||||
if(socket != null) {
|
||||
|
||||
//Code that uses the non-blocking connect. Commented because it is more work than it is worth.//
|
||||
// socketChannel.configureBlocking(false);
|
||||
// key = socketChannel.register(listener.getSelector(), SelectionKey.OP_CONNECT);
|
||||
//
|
||||
// //Will return true if the connection was made immediately.//
|
||||
// if(socketChannel.connect(socketAddress)) {
|
||||
// key.interestOps(0);
|
||||
// socketChannel.finishConnect();
|
||||
// }//if//
|
||||
|
||||
//Complete the initialization of the socket.//
|
||||
super.initialize(orb, options, classLoader, messageHandler, initCompleteHandler, socket.getInetAddress().getHostAddress() + ":" + socket.getPort());
|
||||
|
||||
//Initialize the socket.//
|
||||
socket.setSoLinger(false, 0);
|
||||
socket.setTcpNoDelay(options.useTcpNoDelay());
|
||||
socket.setSoTimeout(0);
|
||||
socket.setReuseAddress(true);
|
||||
socket.setReceiveBufferSize(internalSocketBufferSize); //TODO: Place this in the options.
|
||||
socket.setSendBufferSize(internalSocketBufferSize); //TODO: Place this in the options.
|
||||
|
||||
nioEngine.process(new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
SelectionKey key = null;
|
||||
NioSocketData socketData;
|
||||
|
||||
//Setup the socket data (the data regarding the socket which may be altered when a reconnect occurs).//
|
||||
socketData = new NioSocketData(socket.getChannel(), key);
|
||||
//Save the socket data, representing the socket this NioSocket is currently using for the underlying communications.//
|
||||
setSocketData(socketData);
|
||||
|
||||
//Register for read operations.//
|
||||
key = socket.getChannel().register(NioConnection.this.nioEngine.getSelector(), SelectionKey.OP_READ);
|
||||
//Attach the SocketData instance to the key - what the NioEngine will call when the socket is available to be read/written from/to.//
|
||||
key.attach(socketData);
|
||||
//Start the handshake process.//
|
||||
socketData.startHandshake();
|
||||
}//try//
|
||||
catch(Throwable e) {
|
||||
Debug.log(e);
|
||||
close(true, false);
|
||||
}//catch//
|
||||
}//run()//
|
||||
});
|
||||
}//if//
|
||||
}//try//
|
||||
catch(java.net.ConnectException e) {
|
||||
// Debug.log(e);
|
||||
|
||||
if(s != null) {
|
||||
try {
|
||||
s.close();
|
||||
}//try//
|
||||
catch(Throwable e2) {}
|
||||
}//if//
|
||||
}//catch//
|
||||
catch(Throwable e) {
|
||||
Debug.log(e);
|
||||
|
||||
if(s != null) {
|
||||
try {
|
||||
s.close();
|
||||
}//try//
|
||||
catch(Throwable e2) {}
|
||||
}//if//
|
||||
}//catch//
|
||||
}//initialize()//
|
||||
/**
|
||||
* Initializes a server side socket accepted through a server socket.
|
||||
* @param orb The orb instance that created this socket.
|
||||
* @param socketChannel The socket channel created by the server socket.
|
||||
* @param serverSocket The server socket that accepted this socket.
|
||||
* @param messageHandler The handler called to process a message after the bytes have been read from the stream.
|
||||
* @param initCompleteHandler The handler called to finish the initialization process (after all initialization of the socket has finished).
|
||||
*/
|
||||
public void initialize(Orb orb, SocketChannel socketChannel, NioConnectionServer serverSocket, VoidHandler2 messageHandler, VoidHandler1 initCompleteHandler, NioEngine nioEngine) throws IOException {
|
||||
super.initialize(orb, serverSocket, messageHandler, initCompleteHandler, socketChannel.socket().getInetAddress().getHostAddress() + ':' + socketChannel.socket().getPort());
|
||||
Socket socket = socketChannel.socket();
|
||||
NioSocketData socketData;
|
||||
SelectionKey key;
|
||||
|
||||
//Flag the channel as non-blocking.//
|
||||
socketChannel.configureBlocking(false);
|
||||
//Setup the channel key.//
|
||||
key = socketChannel.register(nioEngine.getSelector(), SelectionKey.OP_READ, this);
|
||||
//Save the NIO engine reference.//
|
||||
this.nioEngine = nioEngine;
|
||||
//Make sure that the socket is setup properly (tcpNoDelay, SoLinger, etc).//
|
||||
socket.setSoLinger(false, 0);
|
||||
socket.setTcpNoDelay(serverSocket.getSocketOptions().useTcpNoDelay());
|
||||
socket.setSoTimeout(0);
|
||||
socket.setReuseAddress(true);
|
||||
socket.setKeepAlive(true);
|
||||
socket.setReceiveBufferSize(internalSocketBufferSize); //TODO: Place this in the options.
|
||||
socket.setSendBufferSize(internalSocketBufferSize); //TODO: Place this in the options.
|
||||
//Setup the socket data (the data that might change if the socket is reconnected in the future).//
|
||||
socketData = new NioSocketData(socketChannel, key);
|
||||
socketData.setActivityTime();
|
||||
//Attach the SocketData instance to the key - what the NioEngine will call when the socket is available to be read/written from/to.//
|
||||
key.attach(socketData);
|
||||
//Save the socket data, representing the socket this NioSocket is currently using for the underlying communications.//
|
||||
setSocketData(socketData);
|
||||
}//initialize()//
|
||||
}//NioSocket//
|
||||
Reference in New Issue
Block a user