/* * Copyright (c) 2008,2009 Declarative Engineering LLC. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Declarative Engineering LLC * verson 1 which accompanies this distribution, and is available at * http://declarativeengineering.com/legal/DE_Developer_License_v1.txt */ package com.de22.orb; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import com.common.debug.Debug; import com.common.event.VoidHandler1; import com.common.event.VoidHandler2; import com.common.io.ObjectInputStream; import com.common.io.StreamSupport; import com.common.thread.ThreadService; import com.common.util.StreamBuffer; import com.de22.orb.AbstractConnection.SocketData; import com.de22.orb.exception.NewVersionFoundException; import com.de22.orb.io.OrbObjectOutputStream; /** * A non-blocking socket implementation. */ public class NioConnection extends AbstractConnection { /** The NIO engine used by the socket. */ private NioEngine nioEngine = null; /** * Encapsulates the underlying actual connection to the remote machine and the connection's state, such that if the connection fails, this data object can be swapped out for a new connection (upon a successful reconnect). */ public class NioSocketData extends SocketData implements NioEngine.ISocketHandler { /** The actual socket channel that will be used for transmission and reception of messages. */ private SocketChannel socketChannel = null; /** The selection key is used to link the socket to the selector which allows multiple sockets to be serviced by a single or pool of threads. */ private SelectionKey key = null; /** The incomming raw bytes from the socket. */ private StreamBuffer rawInputBuffer = null; /** The outgoing raw bytes from the orb. */ private StreamBuffer rawOutputBuffer = null; /** * StSocketData constructor. * @param nioEngine The engine used to handle NIO listening. * @param socketChannel The actual socket channel that will be used for transmission and reception of messages. * @param key The selection key is used to link the socket to the selector which allows multiple sockets to be serviced by a single or pool of threads. */ public NioSocketData(SocketChannel socketChannel, SelectionKey key) throws IOException { super(); this.socketChannel = socketChannel; this.key = key; nioEngine.start(); }//NioSocketData()// /* (non-Javadoc) * @see com.de22.orb.AbstractSocket.SocketData#close() */ protected void close() { if(key != null) { try { key.interestOps(0); }//try// catch(Throwable e) { //Ignored.// }//catch// }//if// if(socketChannel != null) { try { socketChannel.close(); }//try// catch(Throwable e) { //Ignored.// }//catch// }//if// nioEngine.stop(); }//close()// /** * Gets the actual socket channel that will be used for transmission and reception of messages. * @return The socket channel. */ public SocketChannel getSocketChannel() { return socketChannel; }//getSocketChannel()// /** * Gets the selection key is used to link the socket to the selector which allows multiple sockets to be serviced by a single or pool of threads. * @return The selection key. */ public SelectionKey getKey() { return key; }//getKey()// /** * Gets the NIO engine used by the socket. * @return The socket's NIO engine. */ public NioEngine getNioEngine() { return nioEngine; }//getNioEngine()// /** * Called by the NIO Engine when the socket has available bytes for reading. * @param buffer The reusable buffer provided to the thread for reading data off the socket. */ public void receivePendingMessages(ByteBuffer buffer) { try { if(rawInputBuffer == null) { rawInputBuffer = new StreamBuffer(); }//if// //Read until the socket is out of available bytes.// do { //Prepare the buffer for writting.// buffer.clear(); //Read data from the socket to the buffer. If the buffer cannot be filled then the buffer's eventual limit will not equal the buffer's capacity.// socketChannel.read(buffer); //Prepare the buffer for reading.// buffer.flip(); //Buffer the available bytes to the raw input buffer.// rawInputBuffer.writeBytes(buffer); } while(buffer.limit() == buffer.capacity()); //Process as much of the raw buffer as possible.// receiveMessage(rawInputBuffer); }//try// catch(IOException e) { //Note: This will happen when the client forcably closes the connection.// NioConnection.this.close(true, false); }//catch// catch(Throwable e) { Debug.log(e); }//catch// }//receivePendingMessages()// /** * Called by the NIO Engine when the socket has available capacity for writing. *

Note: We are using the NioSocketData's monitor for ensuring that another thread doesn't add content to be written while we are sending content to be written.

* @param buffer The reusable buffer provided to the thread for reading data off the socket. */ public synchronized void sendPendingMessages(ByteBuffer buffer) { try { if(rawOutputBuffer != null) { //Ensure that the buffer will report no bytes available to be written.// buffer.position(buffer.limit()); //Keep sending as long as all bytes were sent via the socket and there are more bytes in the raw output buffer.// while(!buffer.hasRemaining() && rawOutputBuffer.getSize() > 0) { //Prepare the buffer to be written to.// buffer.clear(); //Gets as many bytes as possible placed in the buffer.// rawOutputBuffer.getBytes(0, buffer); //Prepare the buffer to be read from.// buffer.flip(); //Write as much as possible to the socket.// socketChannel.write(buffer); //Remove the written bytes from the raw output buffer.// rawOutputBuffer.skipBytes(buffer.position()); }//while// if(rawOutputBuffer.getSize() > 0) { synchronized(key) { //Turn on the write flag.// key.interestOps(key.interestOps() | key.OP_WRITE); }//synchronized// key.selector().wakeup(); }//if// else { rawOutputBuffer = null; }//else// }//if// }//try// catch(Throwable e) { Debug.log(e); }//catch// }//sendPendingMessages()// /* (non-Javadoc) * @see com.de22.orb.AbstractSocket.SocketData#writeMessage(com.de22.orb.StreamBuffer) */ protected synchronized void writeMessage(StreamBuffer buffer) { if(rawOutputBuffer != null) { //Add the output to the end of the already pending output. The key will already be flagged for writing.// rawOutputBuffer.writeBytes(buffer); }//if// else { ByteBuffer b = nioEngine.getBuffer(); rawOutputBuffer = buffer; sendPendingMessages(b); nioEngine.returnBuffer(b); }//else// }//writeMessage()// public void filterInitializationComplete() { super.filterInitializationComplete(); try { //TODO: Should this be on or off? socketChannel.socket().setKeepAlive(true); }//try// catch(SocketException e) { Debug.log(e); }//catch// }//filterInitializationComplete()// }//NioSocketData// /** * NioSocket constructor. */ public NioConnection() { super(); }//NioSocket()// /** * NioSocket constructor. * @param sessionId The session identifier issued by the server socket. * @param autoReconnectTimeLimit The amount of time the server should wait for the client to auto reconnect. */ public NioConnection(long sessionId, long autoReconnectTimeLimit) { super(sessionId, autoReconnectTimeLimit); }//NioSocket()// /* (non-Javadoc) * @see com.de22.orb.AbstractSocket#reconnect(java.net.InetAddress, int) */ protected void reconnect(InetAddress address, int port) { Socket socket = null; try { socket = new Socket(address, port); //Code that uses the non-blocking connect. Commented because it is more work than it is worth.// //socketChannel = listener.getSelector().provider().openSocketChannel(); if(socket != null) { SelectionKey key = null; //Code that uses the non-blocking connect. Commented because it is more work than it is worth.// // socketChannel.configureBlocking(false); // key = socketChannel.register(listener.getSelector(), SelectionKey.OP_CONNECT); // // //Will return true if the connection was made immediately.// // if(socketChannel.connect(socketAddress)) { // key.interestOps(0); // socketChannel.finishConnect(); // }//if// NioSocketData socketData; SocketChannel socketChannel = socket.getChannel(); //Register for read operations.// key = socket.getChannel().register(nioEngine.getSelector(), SelectionKey.OP_READ); //Setup the socket data (the data regarding the socket which may be altered when a reconnect occurs).// socketData = new NioSocketData(socketChannel, key); //Attach the SocketData instance to the key - what the NioEngine will call when the socket is available to be read/written from/to.// key.attach(socketData); //Save the socket data, representing the socket this NioSocket is currently using for the underlying communications.// setSocketData(socketData); //Initialize the socket.// socket.setSoLinger(false, 0); socket.setTcpNoDelay(getSocketOptions().useTcpNoDelay()); socket.setSoTimeout(0); socket.setReuseAddress(true); socket.setReceiveBufferSize(internalSocketBufferSize); //TODO: Place this in the options. socket.setSendBufferSize(internalSocketBufferSize); //TODO: Place this in the options. //Start the handshake process.// socketData.startHandshake(); }//if// }//try// catch(java.net.ConnectException e) { // Debug.log(e); if(socket != null) { try { socket.close(); }//try// catch(Throwable e2) {} }//if// }//catch// catch(Throwable e) { Debug.log(e); if(socket != null) { try { socket.close(); }//try// catch(Throwable e2) {} }//if// }//catch// }//reconnect()// /** * Initializes the client side socket. * @param orb The orb instance that created this socket. * @param listener The selector listener the socket will be using. * @param address The address to connect to. * @param port The port to connect on. * @param options The socket options that will be used to open the socket and to reconnect the socket. * @param classLoader The loader that will be used to load classes. * @param messageHandler The handler called to process a message after the bytes have been read from the stream. * @param initCompleteHandler The handler called to finish the initialization process (after all initialization of the socket has finished). */ public void initialize(Orb orb, InetAddress address, int port, ISocketOptions options, ClassLoader classLoader, VoidHandler2 messageHandler, VoidHandler1 initCompleteHandler, NioEngine nioEngine) throws IOException { Socket s = new Socket(); int timeout = options.getSocketConnectionTimeout(); try { s.connect(new InetSocketAddress(address, port), timeout < 500 ? 500 : timeout); }//try// catch(SocketTimeoutException e) { s = null; }//catch// try { final Socket socket = s; this.nioEngine = nioEngine; //Code that uses the non-blocking connect. Commented because it is more work than it is worth.// //socketChannel = listener.getSelector().provider().openSocketChannel(); if(socket != null) { //Code that uses the non-blocking connect. Commented because it is more work than it is worth.// // socketChannel.configureBlocking(false); // key = socketChannel.register(listener.getSelector(), SelectionKey.OP_CONNECT); // // //Will return true if the connection was made immediately.// // if(socketChannel.connect(socketAddress)) { // key.interestOps(0); // socketChannel.finishConnect(); // }//if// //Complete the initialization of the socket.// super.initialize(orb, options, classLoader, messageHandler, initCompleteHandler, socket.getInetAddress().getHostAddress() + ":" + socket.getPort()); //Initialize the socket.// socket.setSoLinger(false, 0); socket.setTcpNoDelay(options.useTcpNoDelay()); socket.setSoTimeout(0); socket.setReuseAddress(true); socket.setReceiveBufferSize(internalSocketBufferSize); //TODO: Place this in the options. socket.setSendBufferSize(internalSocketBufferSize); //TODO: Place this in the options. nioEngine.process(new Runnable() { public void run() { try { SelectionKey key = null; NioSocketData socketData; //Setup the socket data (the data regarding the socket which may be altered when a reconnect occurs).// socketData = new NioSocketData(socket.getChannel(), key); //Save the socket data, representing the socket this NioSocket is currently using for the underlying communications.// setSocketData(socketData); //Register for read operations.// key = socket.getChannel().register(NioConnection.this.nioEngine.getSelector(), SelectionKey.OP_READ); //Attach the SocketData instance to the key - what the NioEngine will call when the socket is available to be read/written from/to.// key.attach(socketData); //Start the handshake process.// socketData.startHandshake(); }//try// catch(Throwable e) { Debug.log(e); close(true, false); }//catch// }//run()// }); }//if// }//try// catch(java.net.ConnectException e) { // Debug.log(e); if(s != null) { try { s.close(); }//try// catch(Throwable e2) {} }//if// }//catch// catch(Throwable e) { Debug.log(e); if(s != null) { try { s.close(); }//try// catch(Throwable e2) {} }//if// }//catch// }//initialize()// /** * Initializes a server side socket accepted through a server socket. * @param orb The orb instance that created this socket. * @param socketChannel The socket channel created by the server socket. * @param serverSocket The server socket that accepted this socket. * @param messageHandler The handler called to process a message after the bytes have been read from the stream. * @param initCompleteHandler The handler called to finish the initialization process (after all initialization of the socket has finished). */ public void initialize(Orb orb, SocketChannel socketChannel, NioConnectionServer serverSocket, VoidHandler2 messageHandler, VoidHandler1 initCompleteHandler, NioEngine nioEngine) throws IOException { super.initialize(orb, serverSocket, messageHandler, initCompleteHandler, socketChannel.socket().getInetAddress().getHostAddress() + ':' + socketChannel.socket().getPort()); Socket socket = socketChannel.socket(); NioSocketData socketData; SelectionKey key; //Flag the channel as non-blocking.// socketChannel.configureBlocking(false); //Setup the channel key.// key = socketChannel.register(nioEngine.getSelector(), SelectionKey.OP_READ, this); //Save the NIO engine reference.// this.nioEngine = nioEngine; //Make sure that the socket is setup properly (tcpNoDelay, SoLinger, etc).// socket.setSoLinger(false, 0); socket.setTcpNoDelay(serverSocket.getSocketOptions().useTcpNoDelay()); socket.setSoTimeout(0); socket.setReuseAddress(true); socket.setKeepAlive(true); socket.setReceiveBufferSize(internalSocketBufferSize); //TODO: Place this in the options. socket.setSendBufferSize(internalSocketBufferSize); //TODO: Place this in the options. //Setup the socket data (the data that might change if the socket is reconnected in the future).// socketData = new NioSocketData(socketChannel, key); socketData.setActivityTime(); //Attach the SocketData instance to the key - what the NioEngine will call when the socket is available to be read/written from/to.// key.attach(socketData); //Save the socket data, representing the socket this NioSocket is currently using for the underlying communications.// setSocketData(socketData); }//initialize()// }//NioSocket//