/* * Copyright (c) 1999,2009 Declarative Engineering LLC. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Declarative Engineering LLC * verson 1 which accompanies this distribution, and is available at * http://declarativeengineering.com/legal/DE_Developer_License_v1.txt */ package com.de22.orb; import java.io.IOException; import java.io.Externalizable; import java.io.OutputStream; import java.net.InetAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.util.Date; import com.common.security.Random; import com.common.thread.Monitor; import com.common.thread.Scheduler; import com.common.thread.ThreadService; import com.common.util.*; import com.common.comparison.Comparator; import com.common.debug.*; import com.common.event.VoidHandler1; import com.common.event.VoidHandler2; import com.common.io.IClassReplacementHandler; import com.common.io.StreamSupport; import com.common.util.optimized.*; import com.de22.orb.StConnection.StSocketData; import com.de22.orb.exception.NewVersionFoundException; import com.de22.orb.io.*; import com.de22.orb.security.AbstractSecuritySystem; /** * The base class for a remote process connection. */ public abstract class AbstractConnection { protected static final byte CLOSED = -1; protected static final byte PING = -2; protected static final byte MESSAGE = -3; protected static final byte CLOSE = -4; protected static final byte RECEIVED = -5; /** TODO: This should be set by the user via the socket options and server socket options. */ protected static final int internalSocketBufferSize = 10000; /** The associated orb instance. */ private Orb orb = null; /** The socket's session identifier which is issued by the server socket that accepted the connection, and can be used to reconnect due to a network failure. */ private long sessionId = 0; /** The password required by the server to validate the user's session when reconnecting. 
This is not unique, and it must be negotiated between the client and server after the security layer has initialized. */ private String sessionPassword = null; /** The amount of time the server should wait for the client to auto reconnect. */ private long autoReconnectTimeLimit = 0; /** A string containing the address and port that the socket is connected to in the form: a.b.c.d:port. */ private String nameAndPort = null; /** The address the socket is connected to. This will be non null if this is a client side socket. */ private Address connectedAddress = null; /** A set of listeners to the socket close event. */ private LiteHashSet closeListeners = new LiteHashSet(10); /** The socket options passed when creating the socket, or given to the server socket. */ private ISocketOptions socketOptions = null; /** A reference to the abstract server socket that created this socket. */ private AbstractConnectionServer serverSocket = null; /** Processes the message after it has been read from the stream. */ private VoidHandler2 messageHandler = null; /** The handler called after the socket initialization has completed. */ private VoidHandler1 initCompleteHandler = null; /** The class loader the socket will use for loading classes. */ private ClassLoader classLoader = null; /** The task to ping the server every N minutes. */ private Object pingTask = null; /** The security system's class name, used to re-create the security system on the client. */ private String securitySystemClassName = null; /** The security system's data, used to re-create the security system on the client. */ private byte[] securitySystemData = null; /** The interval between keep alive messages in milliseconds. This should be on the range of 500-600,000 and it should be roughly half the reconnect time limit. */ private long keepAliveInterval = 0; /** The names the socket is created with. Since the socket can be created with multiple names, we collect all of them here. 
*/ private LiteHashSet names = new LiteHashSet(10, LiteHashSet.DEFAULT_LOAD_FACTOR, Comparator.getLogicalComparator(), LiteHashSet.STYLE_COUNT_DUPLICATES); //Orb Stuff & Stream Stuff// /** Maintains reference counts for the socket by socket name. */ private ObjectIntHashMap referenceMap = new ObjectIntHashMap(4); /** Maintains the total reference count for the socket. */ private int totalReferenceCount = 0; /** A map of return value holders for messages that have been sent and are expecting a return value back. */ private LongObjectHashMap returnValueMap = new LongObjectHashMap(4); /** A mapping of classes by their identifiers. This allows for added stream compression. Used by the output stream only. */ private LiteHashMap classMap = new LiteHashMap(10); /** An ordered list of classes used to look up a class by the index. This allows for added stream compression. Used by the input stream only. */ private IList classList = new LiteList(10, 20); //TODO: Figure this stuff out: Can we somehow remove it? When does it get synchronized? This needs real testing. /** The listener thread's last received message number. */ int lastVerifiedMessageNumberListenerOnly = -1; /** The last received message's number from the perspective of the listener thread. This is for the listener thread only and doesn't require synchronizing to access it. */ int lastReceivedMessageNumberListenerOnly = 0; /** The last received message's number from the perspective of the sender thread. This is for the sender thread only and doesn't require synchronizing to access it.*/ int lastReceivedMessageNumberSenderOnly = 0; //Threading Sensitive Attributes// //The following attributes require synchronizing on this instance.// /** The shared last received message's number. Access to this attribute requires synchronizing on this socket instance. */ private int lastReceivedMessageNumber = 0; /** The last message number sent. Access to this attribute requires synchronizing on this socket instance. 
*/
	private int lastSentMessageNumber = 0;
	/** The map of cached messages indexed by the message number. */
	private IntObjectHashMap cachedMessageMap = new IntObjectHashMap(10);
	/** The last message number confirmed by the remote process. The user must synchronize on this before using this attribute. */
	private int lastConfirmedMessageNumber = -1;
	/** The thread currently writing via the socket (null when nobody holds the write lock). The user must synchronize on this before accessing this attribute. */
	protected Thread writingThread = null;
	/** The number of times the writing thread has locked for writing (allows for the close message to be sent without worrying about a double lock). The user must synchronize on this before accessing this attribute. */
	protected int writingLockCount = 0;
	/** Whether the current writing thread should send a confirmation message when it finishes its current sending. The user must synchronize on this before accessing this attribute. */
	protected boolean sendReceivedMessageNumber = false;
	/** The current socket's data. This encapsulates the data that might change when the socket fails and reconnects. */
	private SocketData socketData = null;

	//The following attributes require a write lock (call lockForWrite() and unlockForWrite()).//
	/** The message number to be used with the message that is currently being sent. This attribute is sender thread only. */
	private int currentMessageNumber = 0;

	/**
	 * Abstracts the next input handler in the filter chain.
	 * <p>Implemented by every MessageFilter and by the handler that terminates the chain.</p>
	 */
	public interface IMessageFilterInputHandler {
		/**
		 * Processes the input bytes and passes any result to the next input filter.
		 * @param input The input bytes.
		 */
		public void startIncommingMessage(StreamBuffer input);
		/**
		 * Initializes this filter.
		 * When this filter has finished initialization, it will notify the next input filter to begin initialization.
		 */
		public void initializeFilter();
		/**
		 * Closes the connection to the remote process.
		 * @param force Whether the close is due to a stream error or socket closure, versus a normal close operation.
		 */
		public void closeConnection(boolean force);
	}//IMessageFilterInputHandler//

	/**
	 * Abstracts the next output handler in the filter chain.
	 */
	public interface IMessageFilterOutputHandler {
		/**
		 * Processes the output bytes and passes any result to the next output filter.
		 * @param input The bytes to be processed on their way out (the input to this handler).
		 */
		public void startOutgoingMessage(StreamBuffer input);
	}//IMessageFilterOutputHandler//

	/**
	 * The base class for a message filter.
	 * <p>A filter sits in a chain: incoming bytes flow through the chain via the next input filter, outgoing bytes flow the opposite way via the next output filter.</p>
	 */
	public static abstract class MessageFilter implements IMessageFilterOutputHandler, IMessageFilterInputHandler {
		/** Indicates that the filter processed the data properly and that the output should be used if there is any. Anything left in the input stream buffer will be saved for future use (when additional bytes arrive). */
		public static final int RESULT_OK = 0;
		/** Indicates that the filter did not perform any operation on the data. The input data should be provided as is to the next filter in the chain. The output stream buffer will be ignored. */
		public static final int RESULT_NO_OPERATION = 2;

		/** The remaining output bytes that could not be processed in the last set of message bytes. */
		private StreamBuffer outputBuffer = null;
		/** The remaining input bytes that could not be processed in the last set of message bytes. */
		private StreamBuffer inputBuffer = null;
		/** The next output handler in the chain. Used to pass processed output to the next filter. */
		private IMessageFilterOutputHandler nextOutputFilter = null;
		/** The next input handler in the chain. Used to pass processed input to the next filter. */
		private IMessageFilterInputHandler nextInputFilter = null;

		/**
		 * MessageFilter constructor.
		 * <p>The chain links are left null here; they are assigned later (see setHandlers and SocketData.setFilters).</p>
		 */
		public MessageFilter() {
		}//MessageFilter()//
		//TODO: Remove this once tested.
public void setHandlers(IMessageFilterInputHandler input, IMessageFilterOutputHandler output) { this.nextInputFilter = input; this.nextOutputFilter = output; } /** * Called by the system when the filter has an oportunity to handle or alter an incomming message before handled by the orb. * @param input The incomming stream of content. * @param output The outgoing stream of content which will be processed by the next filter. * @return The result of the processing. */ public int processIncommingMessage(StreamBuffer input, StreamBuffer output) throws IOException { return RESULT_NO_OPERATION; }//processIncommingMessage()// /** * Called by the system when the filter has an oportunity to handle or alter an outgoing message before sent over the socket. * @param input The incomming stream of content. * @param output The outgoing stream of content which will be processed by the next filter. * @return The result of the processing. */ public int processOutgoingMessage(StreamBuffer input, StreamBuffer output) throws IOException { return RESULT_NO_OPERATION; }//processOutgoingMessage()// /** * Gives the filter an opportunity to start an initialization sequence of messages. * @return Whether the filter sent an initialization message, otherwise false indicating that no (long running) initialization is required, or the fitler is already initialized. 
*/ public boolean beginInitialization() throws IOException { return false; }//beginInitialization()// /* (non-Javadoc) * @see com.de22.orb.AbstractSocket.IMessageFilterInputHandler#closeConnection(boolean) */ public void closeConnection(boolean force) { nextInputFilter.closeConnection(force); }//closeConnection()// /* (non-Javadoc) * @see com.de22.orb.AbstractSocket.IMessageFilterInputHandler#initializeFilter() */ public final void initializeFilter() { try { //If there isn't a long running initialization then simply forward to the next input filter.// if(!beginInitialization()) { nextInputFilter.initializeFilter(); }//if// }//try// catch(IOException e) { Debug.log(e); nextInputFilter.closeConnection(true); }//catch// }//initializeFilter()// /** * Called after a filter completes its initialization, giving the next filter in the chain a chance to initialize. */ public void endInitialization() { //Forward the initialization to the next input filter.// nextInputFilter.initializeFilter(); }//endInitialization()// /** * Handles an incomming message either from the Socket or a previous filter in the chain. *

Warning: The input buffer will be released if it is used up and not retained for later use. The caller should not re-use the buffer for any other purpose.

* @param input The bytes that are the input to this filter. * @return The content to pass to the next filter. This will be null if the message ends with this filter. */ public final void startIncommingMessage(StreamBuffer input) { try { StreamBuffer buffer = input; StreamBuffer output = new StreamBuffer(); if(inputBuffer != null) { inputBuffer.writeBytes(input); buffer = inputBuffer; inputBuffer = null; input.release(); }//if// //Keep processsing incomming messages until there are no more.// while(buffer != null) { StreamBuffer result = null; switch(processIncommingMessage(buffer, output)) { case RESULT_OK: { if(output.getSize() > 0) { result = output; }//if// //Clear the buffer reference so we stop the message processing loop.// if(result == null || buffer.getSize() == 0) { //Save any remaining input for use when additional bytes come into the filter.// if(buffer.getSize() > 0) { inputBuffer = buffer; }//if// buffer = null; }//if// break; }//case// case RESULT_NO_OPERATION: { if(buffer.getSize() > 0) { result = buffer; }//if// inputBuffer = null; buffer = null; break; }//case// default: { Debug.log(new RuntimeException("Invalid result type.")); break; }//default// }//switch// if(result != null && result.getSize() > 0) { nextInputFilter.startIncommingMessage(result); }//if// }//while// }//try// catch(IOException e) { Debug.log(e); closeConnection(true); }//catch// }//startIncommingMessage()// /** * Handles an outgoing message either from the ORB or a previous filter in the chain. *

Warning: The input buffer will be released if it is used up and not retained for later use. The caller should not re-use the buffer for any other purpose.

* @param input The bytes that are the input to this filter. * @return The buffer to pass to the next filter, or null if the next filter should not be called. */ public final void startOutgoingMessage(StreamBuffer input) { try { StreamBuffer buffer = input; StreamBuffer output = new StreamBuffer(); if(outputBuffer != null) { outputBuffer.writeBytes(input); buffer = outputBuffer; outputBuffer = null; input.release(); }//if// //Keep processsing outgoing messages until there are no more.// while(buffer != null) { StreamBuffer result = null; switch(processOutgoingMessage(buffer, output)) { case RESULT_OK: { if(output.getSize() > 0) { result = output; }//if// //Clear the buffer reference so we stop the message processing loop.// if(result == null || buffer.getSize() == 0) { //Save any remaining output for use when additional bytes come into the filter.// if(buffer.getSize() > 0) { outputBuffer = buffer; }//if// buffer = null; }//if// break; }//case// case RESULT_NO_OPERATION: { if(buffer.getSize() > 0) { result = buffer; }//if// outputBuffer = null; buffer = null; break; }//case// default: { Debug.log(new RuntimeException("Invalid result type.")); break; }//default// }//switch// if(result != null && result.getSize() > 0) { nextOutputFilter.startOutgoingMessage(result); }//if// }//while// }//try// catch(IOException e) { Debug.log(e); closeConnection(true); }//catch// }//startOutgoingMessage()// /** * Called when processing an incomming message that requires an immediate response that should not go through higher level filters. * @param output The contents of the message being sent. */ protected final void sendMessage(StreamBuffer output) throws IOException { nextOutputFilter.startOutgoingMessage(output); }//sendMessage()// }//MessageFilter// /** * A standard replacment handler class that is used to replace one class name with another to facilitate communications between two processes where one or more may be mangled. 
*/ protected class ClassReplacementHandler implements IClassReplacementHandler { public String getReplacementClassName(String className) { String result = className; if(socketOptions.getStreamClassNameManipulator() != null) { result = socketOptions.getStreamClassNameManipulator().send(className); //Debug.log("Replacing class name: " + className + " with: " + result, true); if(result == null) { Debug.log(new RuntimeException("Missing class name mapping for the name: " + className + ". This may cause a stream error on the remote process.")); }//if// }//if// return result; }//getReplacementClassName()// }//ClassReplacementHandler// /** * Encapsulates the data about a specific socket connection. * This is used to isolate one connection from another that may take its place if the first one fails. */ protected abstract class SocketData { //The server (or client) side socket states.// public static final int SOCKET_STATE_HANDSHAKING = 1; public static final int SOCKET_STATE_ESTABLISHED = 2; public static final int SOCKET_STATE_RECONNECTING = 3; /** The list of filters from the first on called when processing incomming messages, to the first called when processing outgoing messages (the last in the list). */ private LiteList filters = new LiteList(10, 20); /** The filter handler used to direct messages from the filters. */ private SocketDataFilterHandler filterHandler = new SocketDataFilterHandler(); //Threading Sensitive Attributes// //The following attributes require synchronizing on this instance.// /** The time stamp of the last activity on this socket. */ private long activityTime = 0L; //The following attributes require a write lock (call lockForWrite() and unlockForWrite()).// /** Tracks the last used call number for the socket. */ private int lastUsedCallNumber = 1; /** The current state of the socket. */ private volatile int state = SOCKET_STATE_HANDSHAKING; /** The messages that are queued for sending when the socket finishes connecting. 
*/ private LiteList queuedMessages = null; /** * Encapsuplates the methods used to interact with the filters, keeping the SocketData code easy to understand. */ private class SocketDataFilterHandler implements IMessageFilterOutputHandler, IMessageFilterInputHandler { public SocketDataFilterHandler() { }//SocketDataFilterHandler()// /* (non-Javadoc) * @see com.de22.orb.AbstractSocket.IMessageFilterInputHandler#startIncommingMessage(com.de22.orb.StreamBuffer) */ public void startIncommingMessage(StreamBuffer input) { try { OrbByteArrayInputStream bin = new OrbByteArrayInputStream(input); IOrbInputStream stream = bin; final OrbObjectInputStream in; if(socketOptions.getStreamInjector() != null) { stream = socketOptions.getStreamInjector().inject(bin); }//if// in = new OrbObjectInputStream(stream, bin, classLoader, new IClassReplacementHandler() { public String getReplacementClassName(String className) { return socketOptions.getStreamClassNameManipulator() != null ? socketOptions.getStreamClassNameManipulator().receive(className) : className; }//getReplacementClassName()// }, classList, orb.getInstanceFactory(), orb.getClassTracker(), AbstractConnection.this); //Pass the message stream to the socket for processing.// //Note: The handler will thread the processing if there is any chance of the message causing blocking, calling long running code, or sending a message via the orb.// getMessageHandler().evaluate(AbstractConnection.this, in); }//try// catch(IOException e) { Debug.log(e); closeConnection(true); }//catch// }//startIncommingMessage()// /* (non-Javadoc) * @see com.de22.orb.AbstractSocket.IMessageFilterOutputHandler#startOutgoingMessage(com.de22.orb.StreamBuffer) */ public void startOutgoingMessage(StreamBuffer input) { //Perform the physical send of the bytes on the socket.// writeMessage(input); }//startOutgoingMessage()// /* (non-Javadoc) * @see com.de22.orb.AbstractSocket.IMessageFilterInputHandler#initializeFilter() */ public void initializeFilter() { //Send 
all queued messages that were waiting for the filters to finish initializing and update the connection state.// sendPendingMessages(); }//initializeFilter()// /* (non-Javadoc) * @see com.de22.orb.AbstractSocket.IMessageFilterInputHandler#closeConnection(boolean) */ public void closeConnection(boolean force) { AbstractConnection.this.close(SocketData.this, force); }//closeConnection()// }//SocketDataFilterHandler// /** * SocketData constructor. */ public SocketData() throws IOException { SecurityMessageFilter securityFilter = new SecurityMessageFilter(null); boolean isReconnection = false; int lastReceivedMessageNumber = 0; //Determine the socket state: Ensure we lock correctly.// try { lockForWrite(); isReconnection = isSocketReconnecting(); }//try// finally { unlockForWrite(null, 0, false); }//finally// if(isReconnection) { synchronized(this) { lastReceivedMessageNumber = AbstractConnection.this.lastReceivedMessageNumber; }//synchronized// }//if// setFilters(new MessageFilter[] {new PreSecurityHandshakeFilter(isServerSide(), AbstractConnection.this, securityFilter), securityFilter, new PostSecurityHandshakeFilter(isServerSide(), AbstractConnection.this, isReconnection, lastReceivedMessageNumber), new OrbMessageFilter(AbstractConnection.this)}); }//SocketData()// /** * Gets the connection associated with the connection data. * @return The connection data's connection. */ public AbstractConnection getConnection() { return AbstractConnection.this; }//getConnection()// /** * Closes and releases any resources. */ protected void close() { //Should be overridden to provide actual socket closure.// }//close()// /** * Writes the message to the socket. * @param buffer The buffer containing the message(s) to be written to the socket. The buffer may be retained for writing at a future time. */ protected abstract void writeMessage(StreamBuffer buffer); /** * Sets the filters used for the socket. *

Filters are chained such that the first filter in the chain is the first one called when processing incomming messages, and the last is the first one called when sending messages.

* @param filters The filters to be added in order. */ protected final void setFilters(MessageFilter[] filters) { this.filters.removeAll(); this.filters.addAll(filters); //Create links between the filters.// for(int index = 0; index < filters.length; index++) { filters[index].nextOutputFilter = index == 0 ? (IMessageFilterOutputHandler) filterHandler : filters[index - 1]; filters[index].nextInputFilter = index == filters.length - 1 ? (IMessageFilterInputHandler) filterHandler : filters[index + 1]; }//for// }//setFilters()// /** * Starts the handshake process. */ protected final void startHandshake() { ((MessageFilter) filters.getFirst()).initializeFilter(); }//startHandshake()// /** * Called after all filters have initialized (all handshaking is completed). */ public void filterInitializationComplete() { //Run the initialization complete handler.// getInitCompleteHandler().evaluate(this); //Mark the last time we have had activity.// socketData.setActivityTime(); //Start a ping task.// setupPing(); }//filterInitializationComplete()// /** * Sends an outgoing message. This should be called for every message sent to the remote process. * @param out The output stream containing the full message. */ public final void sendMessage(OrbObjectOutputStream out) throws IOException { sendMessage(out.getOutputStream().getBuffer()); }//sendMessage()// /** * Sends an outgoing message. This should be called for every message sent to the remote process. * @param buffer The output buffer containing the full message. 
*/ public final void sendMessage(StreamBuffer buffer) throws IOException { if(isEstablished()) { ((MessageFilter) filters.getLast()).startOutgoingMessage(buffer); }//if// else { if(queuedMessages == null) { queuedMessages = new LiteList(10, 20); }//if// //Transfer the buffer contents to a new buffer so that the old buffer can be released.// queuedMessages.add(new StreamBuffer(buffer)); }//else// }//sendMessage()// /** * Sends any messages that could not be sent due to socket initialization. */ private final void sendPendingMessages() { ThreadService.run(new Runnable() { public void run() { try { lockForWrite(); state = SOCKET_STATE_ESTABLISHED; if(queuedMessages != null) { for(int index = 0; index < queuedMessages.getSize(); index++) { StreamBuffer buffer = (StreamBuffer) queuedMessages.get(index); recordSentMessage(buffer.toByteArray(), getCurrentMessageNumber()); ((MessageFilter) filters.getLast()).startOutgoingMessage(buffer); }//for// //Clear the queued messages.// queuedMessages = null; }//if// }//try// finally { unlockForWrite(null, 0, true); }//finally// }//run()// }); }//sendPendingMessages()// /** * Sends a close message to the remote process. * @throws IOException */ private final void sendClose() { ((OrbMessageFilter) filters.getLast()).sendClose(); }//sendClose()// /** * Sends a ping message to the remote process. * @throws IOException */ private final void sendPing() { ((OrbMessageFilter) filters.getLast()).sendPing(); }//sendPing()// /** * Sends a receipt confirmation message to the remote process. * @param confirmationMessageNumber The message number we have received. */ private void sendConfirmation(int confirmationMessageNumber) { ((OrbMessageFilter) filters.getLast()).sendConfirmation(confirmationMessageNumber); }//sendConfirmation()// /** * Receives an incomming message. This should be called for every message received from the remote process. * @param in The incomming message from the socket. 
*/ public final void receiveMessage(StreamBuffer in) { ((MessageFilter) filters.getFirst()).startIncommingMessage(in); }//receiveMessage()// /** * Gets the next non-zero call number in the socket's sequence. Used by the sending threads to track which messages are received by the remote process. *

A zero call number indicates to the orb that the call is one way.

* @return The next call number that should be used when sending data across the socket. */ public long getNextCallNumber() { return lastUsedCallNumber++; }//getNextCallNumber()// /** * Gets the time stamp of the last activity on this socket. *

Callers must lock on the AbstractConnection instance prior to accessing this information.

* @return The timestamp of the last message sent or received. */ protected long getActivityTime() { return activityTime; }//getActivityTime()// /** * Sets the time stamp of the last activity on this socket to the current time. *

Callers must lock on the AbstractConnection instance prior to accessing this information.

*/ protected void setActivityTime() { this.activityTime = System.currentTimeMillis(); }//setActivityTime()// /** * Determines whether the socket is in the process of reconnecting. *

Callers must lock on the AbstractConnection instance prior to accessing this information.

* @return Whether the server side of the socket is flaged as requiring a reconnect. */ public boolean isSocketReconnecting() { return state == SOCKET_STATE_RECONNECTING; }//isSocketReconnecting()// /** * Determines whether the socket is in the process of handshaking (if the socket initialization messages are being exchanged, or if the security is negotiating). *

Callers must lock on the AbstractConnection instance prior to accessing this information.

* @return Whether the socket is exchanging handshake messages that are not related to the application. */ public boolean isHandshaking() { return state == SOCKET_STATE_HANDSHAKING; }//isHandshaking()// /** * Determines whether the socket is connected and all handshaking is completed. *

Callers must lock on the AbstractConnection instance prior to accessing this information.

* @return Whether the socket handshake is completed and the socket is not reconnecting. */ public boolean isEstablished() { return state == SOCKET_STATE_ESTABLISHED; }//isEstablished()// /** * Sets the current state of the socket. *

Callers must lock on the AbstractConnection instance prior to calling this method.

* @param state The connection's current state, useful when a dedicated thread is not being used. */ public void setState(int state) { this.state = state; }//setState()// }//SocketData// /** * Thrown by the reconnect code when the remote machine could be connected to, but it had no knowledge of the previous connection (the process was reset). */ protected static class ReconnectFailedException extends RuntimeException { public ReconnectFailedException() {} }//ReconnectFailedException// /** * AbstractSocket constructor. */ public AbstractConnection() { super(); generateSessionPassword(); }//AbstractSocket()// /** * AbstractSocket constructor. * @param sessionId The session identifier issued by the server socket. * @param autoReconnectTimeLimit The amount of time the server should wait for the client to auto reconnect. */ public AbstractConnection(long sessionId, long autoReconnectTimeLimit) { super(); this.sessionId = sessionId; this.autoReconnectTimeLimit = autoReconnectTimeLimit; }//AbstractSocket()// /** * Gets the names used when the socket is created. * @return All the socket names. */ public LiteHashSet getNames() { return names; }//getNames()// ///** // * Gets the collection of names for this socket. // *

TODO: Does this need to change?

Warning: This method is not thread safe.

// * @return The names that have been assigned to this socket. // */ //public IList getNames() { // IList result = new LiteList(referenceMap.getSize(), 10); // IIterator keyIterator = referenceMap.keyIterator(); // // while(keyIterator.hasNext()) { // result.add(keyIterator.next()); // }//while// // // return result; //}//getNames()// /** * Determines whether the socket is currently open and usable. *

Warning: This method synchronizes to check to see if socket data is null. Most callers don't need to do this as they already have to get the socket data.

* @return Whether this socket is not closed. */ public synchronized boolean isOpen() { return getSocketData() != null; }//isOpen()// /** * Updates any sender specific attributes while a lock is being held on the socket instance. */ protected void updateSenderAttributes() { //Refresh the sender's last received message number attribute so it can be included in the message.// lastReceivedMessageNumberSenderOnly = getLastReceivedMessageNumber(); //Increment the last sent message number by one - we don't care if the number is not used.// setCurrentMessageNumber(getLastSentMessageNumber() + 1); setLastSentMessageNumber(getCurrentMessageNumber()); }//updateSenderAttributes()// /** * Locks the socket so the calling thread can write a message. * @return The socket data to be used for the writing of the message. */ public synchronized SocketData lockForWrite() { Thread currentThread = Thread.currentThread(); if(writingThread == currentThread) { writingLockCount++; }//if// else { if(writingThread != null) { try { while(writingThread != null) { wait(); }//while// }//try// catch(Throwable e) { Debug.log(e); }//catch// }//if// writingThread = currentThread; writingLockCount = 1; //Refresh any sender specific attributes while we are synchronized.// updateSenderAttributes(); }//else// return socketData; }//lockForWrite()// /** * Records a message sent to the remote process. * @param message The bytes for the message. * @param messageNumber The message number to index the message by. 
*/ public void recordSentMessage(byte[] message, int messageNumber) { boolean sendConfirmation = false; int confirmationMessageNumber = 0; SocketData confirmationSocketData = null; synchronized(this) { if(message != null) { if(sendReceivedMessageNumber) { sendConfirmation = true; confirmationMessageNumber = getLastReceivedMessageNumber(); confirmationSocketData = getSocketData(); }//if// else { cachedMessageMap.put(messageNumber, message); }//else// }//if// }//synchronized// if(sendConfirmation) { confirmationSocketData.sendConfirmation(confirmationMessageNumber); synchronized(this) { if(--writingLockCount == 0) { writingThread = null; notifyAll(); }//if// if(message != null) { cachedMessageMap.put(messageNumber, message); }//if// }//synchronized// }//if// }//recordSentMessage()// /** * Unlocks the socket so other threads can write messages. * @param message The bytes for the message. Non-null if the message should be saved until the remote process confirms receipt of the message. * @param messageNumber The message number to index the message by. This is ignored if the message parameter is null. * @param setActivityTime Whether the activity time should be updated with the current timestamp. 
*/ public void unlockForWrite(byte[] message, int messageNumber, boolean setActivityTime) { boolean sendConfirmation = false; int confirmationMessageNumber = 0; SocketData confirmationSocketData = null; synchronized(this) { if(sendReceivedMessageNumber) { sendConfirmation = true; confirmationMessageNumber = getLastReceivedMessageNumber(); confirmationSocketData = getSocketData(); }//if// else { if(--writingLockCount == 0) { writingThread = null; notifyAll(); }//if// if(setActivityTime && socketData != null) { socketData.setActivityTime(); }//if// if(message != null) { cachedMessageMap.put(messageNumber, message); }//if// }//else// }//synchronized// if(sendConfirmation) { confirmationSocketData.sendConfirmation(confirmationMessageNumber); synchronized(this) { if(--writingLockCount == 0) { writingThread = null; notifyAll(); }//if// if(setActivityTime && socketData != null) { socketData.setActivityTime(); }//if// if(message != null) { cachedMessageMap.put(messageNumber, message); }//if// }//synchronized// }//if// }//unlockForWrite()// /** * Clears the cached messages up to and including the given message number. * The caller must synchronize on this socket instance. * @param lastVerifiedMessageNumber The last known to be successful message number. */ protected synchronized void clearCachedMessages(int lastVerifiedMessageNumber) { if(lastVerifiedMessageNumber < lastConfirmedMessageNumber) { //This will only occur if the server was restarted and knows nothing of the old connection.// //Debug.log(new RuntimeException("Broken")); throw new ReconnectFailedException(); }//if// while(lastConfirmedMessageNumber != lastVerifiedMessageNumber) { cachedMessageMap.remove(++lastConfirmedMessageNumber); }//while// }//clearCachedMessages()// /** * Re-sends the messages the client/server report to have not received. * This is always called after reconnecting. * @param socketData The socket data to use to send the missing messages. 
* @param lastReceivedMessageNumber The last message number received by the remote process. */ protected void sendMissingMessages(StSocketData socketData, int lastReceivedMessageNumber) throws IOException { int sentCount = 0; //Clear any stale cached messages.// clearCachedMessages(lastReceivedMessageNumber); //If there are buffered messages that the client/server report as not being received, resend them now.// while(cachedMessageMap.getSize() > sentCount) { byte[] message = (byte[]) cachedMessageMap.get(++lastReceivedMessageNumber); if(message != null) { sentCount++; if(!sendMessage(message, socketData, lastReceivedMessageNumber)) { //Exit the loop since there was a problem sending messages and another reconnection may have occured (sending the rest of the messages may cause duplicate messages).// sentCount = cachedMessageMap.getSize(); }//if// }//if// }//while// }//sendMissingMessages()// /** * Adds the listener to the collection of objects that will be notified in the even that the socket closes. * These listeners will not be notified if the socket physically closed, but logically still open (if the underlying socket layer closes, but this abstract layer is still open and will reconnect). * @param listener The listener for the socket closed event. */ public synchronized void addCloseListener(ISocketCloseListener listener) { if(isOpen()) { closeListeners.add(listener); }//if// else { listener.onSocketClosed(this); }//else// }//addCloseListener()// /** * Adds a return value holder to the socket's collection of holders waiting values. * @param callNumber Identifies the remote call so that the return value bearing the same call number can be matched up with the holder. * @param holder The holder of the return value which will accept the return value of the call. 
*/ public void addReturnValueHolder(long callNumber, Object holder) { returnValueMap.put(callNumber, holder); }//addReturnValueHolder()// /** * Closes the socket either via the ORB in which case isForced is false, or due to an unrecoverable error in the socket in which case isForced is true. * @param isForced Whether the close must be forced and is due to the socket failing, versus the orb naturally closing the socket. */ protected final void close(boolean isForced) { close(null, isForced, true); }//close()// /** * Closes the socket either via the ORB in which case isForced is false, or due to an unrecoverable error in the socket in which case isForced is true. * @param isForced Whether the close must be forced and is due to the socket failing, versus the orb naturally closing the socket. * @param sendCloseMessage Whether to send the close message to the remote process. This is ignored if isForced is true. */ protected final void close(boolean isForced, boolean sendCloseMessage) { close(null, isForced, sendCloseMessage); }//close()// /** * Closes the socket either via the ORB in which case isForced is false, or due to an unrecoverable error in the socket in which case isForced is true. * @param isForced Whether the close must be forced and is due to the socket failing, versus the orb naturally closing the socket. */ protected final void close(SocketData socketData, boolean isForced) { close(socketData, isForced, true); }//close()// /** * Closes the socket either via the ORB in which case isForced is false, or due to an unrecoverable error in the socket in which case isForced is true. * @param isForced Whether the close must be forced and is due to the socket failing, versus the orb naturally closing the socket. * @param sendCloseMessage Whether to send the close message to the remote process. This is ignored if isForced is true. 
*/ protected final void close(SocketData socketData, boolean isForced, boolean sendCloseMessage) { synchronized(this) { socketData = socketData == null ? getSocketData() : socketData; if(socketData == getSocketData()) { closeInternal(socketData, isForced, sendCloseMessage); }//if// }//synchronized// }//close()// /** * Closes the socket either via the ORB in which case isForced is false, or due to an unrecoverable error in the socket in which case isForced is true. *

The caller must be synchronized on the this instance.

* @param socketData The socket data for the socket being closed. * @param isForced Whether the close must be forced and is due to the socket failing, versus the orb naturally closing the socket. * @param sendCloseMessage Whether to send the close message to the remote process. This is ignored if isForced is true. */ private void closeInternal(SocketData socketData, boolean isForced, boolean sendCloseMessage) { //NOTE: I turned this bit off because it was interfering with proper closing of a non-reconnectable socket. Even if the socket is reconnectable, closing it while it is initializing due to an application version problem shouldn't trigger a reconnect! // //Reconnect if possible.// // if(isForced && !getOrb().isShutDown()) { // reconnect((StSocketData) socketData); // }//if// // else { Object pingTask; if(socketData != null) { //Send the close message to the remote process.// if(!isForced && sendCloseMessage) { lockForWrite(); try { //Notify the other process if possible.// socketData.sendClose(); }//try// finally { unlockForWrite(null, 0, false); }//finally// }//if// //Close the actual socket.// try { ((SocketData) socketData).close(); }//try// catch(Throwable e) { //Ignored.// }//catch// }//if// //Clear the socket data so all callers know there the socket is closed.// setSocketData(null); //Clean up the ping task.// pingTask = getPingTask(); if(pingTask != null) { Scheduler.removeTask(pingTask); }//if// //Clean up the cached messages.// cachedMessageMap.removeAll(); //Tell the orb to clean up after the socket if the closure didn't go through normal channels.// if(isForced) { try { getOrb().cleanupSocket(this); }//try// catch(Throwable e) { Debug.log(e); }//catch// }//if// //Run the post close handler.// postClose(); // }//else// }//closeInternal()// /** * Cleanup and notify listeners after closing. */ protected void postClose() { //Notify the listeners. 
Note: We don't have to worry about this collection changing since the socket reference is null.// IIterator iterator = closeListeners.iterator(); if(getServerSocket() != null) { getServerSocket().unregisterConnection(this); }//if// while(iterator.hasNext()) { ((ISocketCloseListener) iterator.next()).onSocketClosed(this); }//while// //Clear the collection of listeners so we don't accidentally notify twice.// closeListeners.removeAll(); }//postClose()// /** * Initializes the reconnection to the remote process. * @param address The address of the previous connection. * @param port The port used for the previous connection. */ protected abstract void reconnect(InetAddress address, int port); /** * Attempts to reconnect to the remote process. The server will wait for the reconnection while the client will actively attempt to reconnect. * @param socketData The socket data the thread was using when the error was detected which lead to the request to reconnect. */ protected void reconnect(SocketData socketData) { //TODO: Flag the socket as reconnecting regardless of whether it is client or server.// //TODO: Synchronize such that no other thread can setup to reconnect. //TODO: Ensure another thread hasn't beat us to the punch. Must not be set to the state==reconnect & the socket data must be the same as the thread's that recognized a need to reconnect. //TODO: Ensure that when a reconnection is made that we are first sending any missing messages, then queued messages, then new messages. //TODO: Ensure that the old socket & socket data is properly cleaned up. // if(getAutoReconnectTimeLimit() != 0) { // long failureTimestamp = System.currentTimeMillis() + getAutoReconnectTimeLimit(); // // //TODO: We need to flag the socket as reconnecting and not stick around. No reason to make the thread wait for ever. 
// synchronized(this) { // if(socketData == getSocketData()) { // Debug.log("Socket failed."); // // //Ensure the socket is actually closed.// // try { // socketData.getSocket().close(); // }//try// // catch(Throwable e) { // //Ignored.// // }//catch// // // if(isServerSide()) { ////TODO: Must wait for the client to reconnect, but don't force the thread to stick around? // try { // //Wait for the client to reconnect. If this is a writing thread then it will hold the write lock so all other writing threads will block on this thread.// // while(!getOrb().isShutDown() && socketData == getSocketData() && failureTimestamp > System.currentTimeMillis()) { // wait(5000); // }//while// // }//try// // catch(Throwable e) { // Debug.log(e); // }//catch// // // //If we were forced out of the wait cycle then close the socket immediately so that the caller of this method fails.// // if(socketData == getSocketData()) { // setSocketData(null); // closeInternal(socketData, true, false); // }//if// // }//if// // else { // InetAddress address = null; // int port = getConnectedAddress().getPort(); // boolean failed = false; // Object reconnectId = getOrb().getReconnectListener() != null ? getOrb().getReconnectListener().startingReconnect(new LiteList(getNames())) : null; // // //Clear the socket data variable so we know when we have successfully reconnected.// // socketData = null; // // while(!failed && (socketData == null) && (System.currentTimeMillis() < failureTimestamp) && (reconnectId == null || getOrb().getReconnectListener().keepReconnecting(reconnectId))) { // if(reconnectId != null) { // getOrb().getReconnectListener().reconnectRemainingTime(reconnectId, failureTimestamp - System.currentTimeMillis()); // }//if// // // try { // if(address == null) { // try { // address = InetAddress.getByName(getConnectedAddress().getName()); // }//try// // catch(Throwable e) { // //Ignore it.// // //TODO: Should we warn the user that the reason we cannot reconnect is that DNS is down? 
// }//catch// // }//if// // // if(address != null) { // reconnect(address, port); // }//if// // }//try// // catch(Throwable e) { // //Ignore the exception - keep trying to reconnect.// // socketData = null; // }//catch// // // if(socketData == null) { // try { // //Sleep so we don't release our lock on the socket object.// // Thread.sleep(10); // }//try// // catch(Throwable e) { // //Exit the while loop so we fail.// // break; // }//catch// // }//if// // }//while// // // //Set the new socket data, or null if we failed.// // if(socketData != null) { // if(reconnectId != null) { // getOrb().getReconnectListener().reconnectSucceeded(reconnectId); // }//if// // // setSocketData(socketData); // }//if// // else { // if(reconnectId != null) { // getOrb().getReconnectListener().reconnectFailed(reconnectId); // }//if// // // setSocketData(null); // closeInternal(socketData, true); // }//else// // }//else// // }//if// // }//synchronized// // }//if// // else { // setSocketData(null); // closeInternal(socketData, true); // }//else// }//reconnect()// /** * Reconnects a broken socket on the server. This method is called from the server socket or socket that was created by the server socket in response to the client reconnecting. * @param newConnection The new connection that received the reconnect request. This will be closed and the connection data will be transfered to this connection. * @param lastReceivedMessage The last message number received by the client. 
*/ protected void reconnect(AbstractConnection newConnection, int lastReceivedMessage) { // try { // ServerInitializationResponse response = null; // OrbObjectOutputStream out; // int lastReceivedMessageNumberOnClient = lastReceivedMessage; // StSocketData socketData = new StSocketData(socket); // String sessionPassword; // // response = new ServerInitializationResponse(getSocketOptions(), getLastReceivedMessageNumber()); // // //If we are using a security system then pass the data.// // if(getServerSocket().getSecurityProvider() != null) { // response.securitySystemClassName = getServerSocket().getSecurityProvider().getSecuritySystemClientClassName(); // response.securitySystemData = getServerSocket().getSecurityProvider().getSecuritySystemClientMetadata(); // socketData.setSecuritySystem(getServerSocket().getSecurityProvider().createSecuritySystem()); // }//if// // // //Send the response to the client.// // out = new OrbObjectOutputStream(null, new OrbByteArrayOutputStream(5), null, this, new ClassReplacementHandler()); // response.writeExternal(out); // out.getOutputStream().getBuffer()[0] = MESSAGE; // StreamSupport.writeInt(out.getSize() - 5, out.getOutputStream().getBuffer(), 1); // socket.getOutputStream().write(out.getOutputStream().getBuffer(), 0, out.getSize()); // // if(socketData.getSecuritySystem() != null) { // while(socketData.getSecuritySystem().isInitializing()) { // processSecuritySystemHandshake(socketData); // }//while// // }//if// // // sessionPassword = receiveSessionPassword(socketData); // // if(!sessionPassword.equals(getSessionPassword())) { // throw new SecurityException("Invalid session password - someone is hacking."); // }//if// // // //Send the messages that never made it now that we have reconnected to the client.// // sendMissingMessages(socketData, lastReceivedMessageNumberOnClient); // //Flag the socket as back in a normal state.// // socketData.setState(SocketData.SOCKET_STATE_ESTABLISHED); // // //TODO: Notify the waiting 
threads sitting at the reconnect(StSocketData) method. // synchronized(this) { // //Ensure the old socket dies.// // try { // ((StSocketData) getSocketData()).getSocket().close(); // }//try// // catch(Throwable e) { // //Ignored.// // }//catch// // // setSocketData(socketData); // notifyAll(); // }//synchronized// // }//try// // catch(IOException e) { // Debug.log(e); // //TODO: Reset to allow for another reconnect attempt. // }//catch// // catch(SecurityException e) { // Debug.log(e); // //TODO: Someone is trying to hack into another user's connection - we should probably block their IP for a short time. // //TODO: Close the connection immediately and don't allow reconnect for now. // }//catch// // catch(Throwable e) { // //TODO: Had a problem creating a security system.. shouldn't ever get here. // Debug.log(e); // }//catch// }//reconnect()// /** * Gets a message output stream for the socket. * @return The output stream used to serialize objects for transmission to the remote process. */ public OrbObjectOutputStream getOutputStream() throws IOException { OrbByteArrayOutputStream bout = new OrbByteArrayOutputStream(0); IOrbOutputStream stream = bout; if(socketOptions.getStreamInjector() != null) { stream = socketOptions.getStreamInjector().inject(bout); }//if// return new OrbObjectOutputStream(stream, bout, classMap, AbstractConnection.this, new ClassReplacementHandler()); }//getOutputStream()// /** * Gets the address that the socket is connected to. *

Warning: This is only valid for client side sockets.

* @return The remote process address that this socket connects to. */ public Address getConnectedAddress() { return connectedAddress; }//getConnectedAddress()// /** * Gets the name and port that the socket is connected to on the other end (remote process). * @return The name and port in the [name]:[port] formatted string. */ public String getNameAndPort() { return nameAndPort; }//getNameAndPort()// /** * Gets the ORB associated with this socket. * @return The ORB instance that created this socket. */ public Orb getOrb() { return orb; }//getOrb()// /** * Gets the permissions object associated with this socket. * @return The permissions that callers over this socket have. */ public Object getPermissions() { return socketOptions.getPermissions(); }//getPermissions()// /** * Gets the number of times the socket is being referenced. When this value reaches zero the orb will close the socket. * @return The number of times the socket is referenced. */ public int getReferenceCount() { return totalReferenceCount; }//getReferenceCount()// /** * Gets the number of times the socket is referenced with a given name. * @param name The name of the socket. * @return The count of references to the socket by the name. */ public int getReferenceCount(String name) { int result = 0; if(referenceMap.containsKey(name)) { result = referenceMap.get(name); }//if// return result; }//getReferenceCount()// /** * Increments the reference count which tracks the number of times the socket is being referenced. * @param name The name on which to increment the reference count for the socket. */ public void incrementReferenceCount(String name) { if(referenceMap.containsKey(name)) { referenceMap.put(name, referenceMap.get(name) + 1); }//if// else { referenceMap.put(name, 1); }//else// totalReferenceCount++; }//incrementReferenceCount()// /** * Decrements the reference count which tracks the number of times the socket is being referenced. 
* @param name The name on which to decrement the reference count for the socket. * @return The number of references by the name after the decrement. */ public int decrementReferenceCount(String name) { if(referenceMap.containsKey(name)) { int count = referenceMap.get(name) - 1; if(count == 0) { referenceMap.remove(name); }//if// else { referenceMap.put(name, count); }//else// totalReferenceCount--; return count; }//if// else { throw new RuntimeException("Unable to decrement the socket count on \"" + name + "\" because the name is not associated with this socket."); }//else// }//decrementReferenceCount()// /** * Initializes a server side socket accepted through a server socket. * @param orb The orb instance that created this socket. * @param serverSocket The server socket that accepted this socket. * @param messageHandler The handler called to process a message after the bytes have been read from the stream. * @param initCompleteHandler The handler called to finish the initialization process (after all initialization of the socket has finished). * @param nameAndPort A string containing the address and port that the socket is connected to in the form: a.b.c.d:port. */ protected void initialize(Orb orb, AbstractConnectionServer serverSocket, VoidHandler2 messageHandler, VoidHandler1 initCompleteHandler, String nameAndPort) throws IOException { this.orb = orb; this.serverSocket = serverSocket; this.classLoader = serverSocket.getClassLoader(); this.messageHandler = messageHandler; this.initCompleteHandler = initCompleteHandler; this.socketOptions = serverSocket.getSocketOptions(); this.nameAndPort = nameAndPort; this.connectedAddress = null; }//initialize()// /** * Initializes a client side socket. * @param orb The orb instance that created this socket. * @param socketOptions The options used to initialize the socket. * @param classLoader The loader used to load classes for the orb. 
* @param messageHandler The handler called to process a message after the bytes have been read from the stream. * @param initCompleteHandler The handler called to finish the initialization process (after all initialization of the socket has finished). * @param nameAndPort The name and port string that identifies this socket. */ protected void initialize(Orb orb, ISocketOptions socketOptions, ClassLoader classLoader, VoidHandler2 messageHandler, VoidHandler1 initCompleteHandler, String nameAndPort) throws IOException { this.orb = orb; this.serverSocket = null; this.classLoader = classLoader; this.messageHandler = messageHandler; this.initCompleteHandler = initCompleteHandler; this.socketOptions = socketOptions; this.nameAndPort = nameAndPort; this.connectedAddress = null; }//initialize()// /** * Gets the abstract server socket that created this abstract socket. * @return The server socket associated with this socket. This is null if the socket was not accepted by a server socket. */ public AbstractConnectionServer getServerSocket() { return serverSocket; }//getServerSocket()// /** * Determines whether the socket is on the server side of the connection. * @return Whether this socket was accepted by a server socket. */ public boolean isServerSide() { return serverSocket != null; }//isServerSide()// /** * Removes the listener from the collection of objects that will be notified in the even that the socket closes. * @param listener The listener that will no longer be notified upon the socket closed event. */ public synchronized void removeCloseListener(ISocketCloseListener listener) { if(isOpen()) { closeListeners.remove(listener); }//if// }//removeCloseListener()// /** * Removes a return value holder from the socket's collection of holders waiting values. * @param callNumber Identifies the remote call so that the return value bearing the same call number can be matched up with the holder. 
* @return The holder of the return value which will accept the return value of the call. */ public Object removeReturnValueHolder(long callNumber) { return returnValueMap.remove(callNumber); }//removeReturnValueHolder()// /** * Sends a message across the socket to the remote process. * The callers to this method must all call lockForWrite() and unlockForWrite() before and after the call to ensure only one thread writes at a time. * If the socket fails and could reconnect then the thread should reconnect if on the client, or wait if on the server. * The message will be stored if the socket is reconnectable until the remote process notifies this process that the message was received. * @param message The message to be sent. * @param socketData The data used to interact with the remote process. * @return The message bytes which are to be indexed by the message number until the remote process verifies that the message was received. */ public final byte[] sendMessage(OrbObjectOutputStream message, SocketData socketData) throws IOException { byte[] result = message.getOutputStream().getBuffer().toByteArray(); socketData.sendMessage(message); message.close(); return result; }//sendMessage()// /** * Sends a message across the socket to the remote process. * The callers to this method must all call lockForWrite() and unlockForWrite() before and after the call to ensure only one thread writes at a time. * If the socket fails and could reconnect then the thread should reconnect if on the client, or wait if on the server. * The message will be stored if the socket is reconnectable until the remote process notifies this process that the message was received. * @param message The message to be sent. * @param socketData The data used to interact with the remote process. * @param messageNumber The number of the message being sent. * @return Whether the message was resent without error. If an error occured and another reconnection attempt was made then this result will be false. 
*/ public final boolean sendMessage(byte[] message, SocketData socketData, int messageNumber) throws IOException { StreamBuffer buffer = new StreamBuffer(); boolean result = true; buffer.writeBytes(message); try { socketData.sendMessage(buffer); }//try// catch(IOException e) { Debug.log(e); close(socketData, true); result = false; }//catch// return result; }//sendMessage()// /** * Sets up the ping task. */ protected void setupPing() { if(keepAliveInterval >= 500 && keepAliveInterval <= 600000) { setPingTask(Scheduler.addTask(keepAliveInterval, new Scheduler.Task() { public void evaluate() { try { sendPing(false); }//try// catch(IOException e) { Debug.log("Ping Failed in the Ping Task."); }//catch// catch(Throwable e) { //Silently die: The ping task wasn't removed.// Scheduler.removeTask(this); }//catch// }//evaluate()// }, false)); }//if// }//setupPing()// /** * Sends a ping message to the remote process to verify it is still responding. */ public final void sendPing() throws IOException { sendPing(true); }//sendPing()// /** * Sends a ping message to the remote process to verify it is still responding. * @param required Whether the ping is absolutly necessary, otherwise it will only occur if there has not been any activity in a while. 
*/ protected final void sendPing(boolean required) throws IOException { try { SocketData socketData; boolean send = true; Thread currentThread = Thread.currentThread(); synchronized(this) { if(writingThread == currentThread) { writingLockCount++; }//if// else { if(writingThread != null) { try { while(writingThread != null) { wait(); }//while// }//try// catch(Throwable e) { Debug.log(e); }//catch// }//if// writingThread = currentThread; writingLockCount = 1; }//else// socketData = getSocketData(); send = socketData.isEstablished() && ((required) || (socketData.activityTime == 0) || (System.currentTimeMillis() - socketData.activityTime > keepAliveInterval)); }//synchronized// if(send) { socketData.sendPing(); }//if// }//try// finally { unlockForWrite(null, 0, true); }//finally// }//sendPing()// /** * Gets the ping task. * @return The task used to ping via the socket to ensure the socket stays open. */ protected Object getPingTask() { return pingTask; }//getPingTask()// /** * Sets the ping task. * @param pingTask The task used to ping via the socket to ensure the socket stays open. */ protected void setPingTask(Object pingTask) { this.pingTask = pingTask; }//setPingTask()// /** * Gets the handler that processes the message after it has been read from the stream. * @return The message handler. */ protected VoidHandler2 getMessageHandler() { return messageHandler; }//getMessageHandler()// /** * Gets the amount of time the server should wait for the client to auto reconnect. * @return The number of milliseconds before the client cannot reconnect automatically without the application finding out. A zero value indicates no auto-reconnect is allowed. */ protected long getAutoReconnectTimeLimit() { return autoReconnectTimeLimit; }//getAutoReconnectTimeLimit()// /** * Gets the generated password the client must send to the server when reconnecting. * @return An orb generated and non-unique password that ensures someone can't attack a connection by reconnecting to it. 
*/ protected String getSessionPassword() { return sessionPassword; }//getSessionPassword()// /** * Sets the generated password the client must send to the server when reconnecting. * @param sessionPassword An orb generated and non-unique password that ensures someone can't attack a connection by reconnecting to it. */ protected void setSessionPassword(String sessionPassword) { if(isServerSide()) { this.sessionPassword = sessionPassword; }//if// }//getSessionPassword()// /** * Gets the session identifier provided by the server. * @return A server connection generated session identifier used to reconnect a session at a later time. */ protected long getSessionId() { return sessionId; }//getSessionId()// /** * Sets the session identifier provided by the server. * @param sessionId A server connection generated session identifier used to reconnect a session at a later time. */ protected void setSessionId(long sessionId) { this.sessionId = sessionId; }//setSessionId()// /** * Generates a new session password. This should only be called by the constructor, or when completing a reconnection to the server (called on the client to reset the password since will be exposed while reconnecting). */ protected void generateSessionPassword() { byte[] randomBytes = new byte[10]; //Generate a random password.// new Random().nextBytes(randomBytes); this.sessionPassword = StringSupport.toHexString(randomBytes); }//generateSessionPassword()// /** * Gets the handler called after the socket initialization has completed. * @return The handler the socket should call just after completing the handshake. */ protected VoidHandler1 getInitCompleteHandler() { return initCompleteHandler; }//getInitCompleteHandler()// /** * Gets the handler called when the client is told by the server that it should upgrade to a newer version. * @return A handler notified when the server reports a newer version available. 
*/ protected IVersionChangeHandler getNewVersionHandler() { return socketOptions.getVersionChangeHandler(); }//getNewVersionHandler()// /** * Processes the server's initial initialization response message. * @param autoReconnectTimeLimit * @param connectedAddress * @param keepAliveInterval * @param securitySystemClassName * @param securitySystemData * @throws IOException */ protected void processServerInitializationResponse(long autoReconnectTimeLimit, Address connectedAddress, long keepAliveInterval) { this.autoReconnectTimeLimit = autoReconnectTimeLimit; this.connectedAddress = connectedAddress; this.keepAliveInterval = keepAliveInterval; }//processServerInitializationResponse()// /** * Processes the server's secondary initialize response message. * @param sessionId */ protected void processServerSecondaryResponse(long sessionId) { this.sessionId = sessionId; }//processServerSecondaryResponse()// /** * Gets the options the socket was originally created with. * @return The socket's creation options. */ protected ISocketOptions getSocketOptions() { return socketOptions; }//getSocketOptions()// /** * Gets the last message number received. * @return The last message number received over this socket. */ protected int getLastReceivedMessageNumber() { return lastReceivedMessageNumber; }//getLastReceivedMessageNumber()// /** * Sets the last message number received. * @param lastReceivedMessageNumber The last message number received over this socket. */ protected void setLastReceivedMessageNumber(int lastReceivedMessageNumber) { this.lastReceivedMessageNumber = lastReceivedMessageNumber; }//setLastReceivedMessageNumber()// /** * Gets the last message number sent. * @return The number of the last message sent via the socket. */ protected int getLastSentMessageNumber() { return lastSentMessageNumber; }//getLastSentMessageNumber()// /** * Sets the last message number sent. * @param lastSentMessageNumber The number of the last message sent via the socket. 
*/ protected void setLastSentMessageNumber(int lastSentMessageNumber) { this.lastSentMessageNumber = lastSentMessageNumber; }//setLastSentMessageNumber()// /** * Gets the current socket's data. *

This method should only be called within a synchronize(socket) block, or during initialization when no other threads could be accessing the socket.

* @return The container that encapsulates the data that might change when the socket fails and reconnects. This will be null if the socket is closed. */ protected SocketData getSocketData() { return socketData; }//setSocketData()// /** * Sets the current socket's data. *

 * This method should only be called within a synchronized(socket) block, or during initialization when no other threads could be accessing the socket.

* @param socketData The container that encapsulates the data that might change when the socket fails and reconnects.
	 */
	protected void setSocketData(SocketData socketData) {
		this.socketData = socketData;
	}//setSocketData()//
	/**
	 * Gets the security system's class name, used to re-create the security system on the client.
	 * @return The qualified class name of the security system - only set on the client side of a connection.
	 */
	protected String getSecuritySystemClassName() {
		return this.securitySystemClassName;
	}//getSecuritySystemClassName()//
	/**
	 * Gets the security system's data, used to re-create the security system on the client.
	 * The stored array itself is returned (no copy is made).
	 * @return The security system data used to initialize a new security system - only set on the client side of a connection.
	 */
	protected Object getSecuritySystemData() {
		return this.securitySystemData;
	}//getSecuritySystemData()//
	/**
	 * Gets the message number to be used with the currently being sent message. This attribute is sender thread only.
	 * @return The current message number, used by the current sending thread.
	 */
	protected int getCurrentMessageNumber() {
		return this.currentMessageNumber;
	}//getCurrentMessageNumber()//
	/**
	 * Sets the message number to be used with the currently being sent message. This attribute is sender thread only.
	 * @param currentMessageNumber The current message number, used by the current sending thread.
	 */
	protected void setCurrentMessageNumber(int currentMessageNumber) {
		this.currentMessageNumber = currentMessageNumber;
	}//setCurrentMessageNumber()//
}//AbstractConnection//