// Files
// Brainstorm/Orb/src/com/de22/orb/Orb.java
//
// 2622 lines
// 98 KiB
// Java

/*
* Copyright (c) 1999,2009 Declarative Engineering LLC.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Declarative Engineering LLC
* version 1 which accompanies this distribution, and is available at
* http://declarativeengineering.com/legal/DE_Developer_License_v1.txt
*/
package com.de22.orb;
import java.io.Externalizable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import com.common.io.*;
import com.common.util.*;
import com.common.debug.*;
import com.common.event.VoidHandler1;
import com.common.event.VoidHandler2;
import com.common.exception.ThreadException;
import com.common.thread.*;
import com.common.comparison.*;
import com.common.util.optimized.*;
import com.de22.orb.AbstractConnection.SocketData;
import com.de22.orb.development.*;
import com.de22.orb.exception.*;
import com.de22.orb.io.*;
/**
* <p>Restrictions:
* A proxied object should not have a changeable hash code value. If the proxied object's hash value changes then it will not be properly comparable with its proxy.
* </p>
* <p>TODO:
* 1) Determine whether server socket accepted sockets should be cached so that the server threads can use them, or does the server thread have to create a socket to the client if it tries to create a socket to that address?
* </p>
*/
public class Orb {
public static final int PROTOCOL_VERSION = 1;
//Message type codes. RemoteRunnable writes/reads one of these as the first byte of every message and switches on it when run.//
static final byte MESSAGE_EXECUTE = 1;
static final byte MESSAGE_RETURN = 2;
static final byte MESSAGE_LOCATE = 3;
static final byte MESSAGE_AGENT = 4;
static final byte MESSAGE_DECREMENT_PROXY_COUNT = 5;
//Option values. NOTE(review): the values look like bit flags that can be OR'd together - confirm against the code that consumes them.//
public static final long OPTIONS_NONE = 0x00;
public static final long OPTIONS_USE_SOCKET_GROUPS = 0x01;
public static final long OPTIONS_USE_TCP_NODELAY = 0x02;
private IClassTracker classTracker = null; //Used to maintain track of which classes are serialized.//
private IInstanceFactory instanceFactory = null; //Used to enable deserialization of custom application classes in an acceptable manner.//
private ClassLoader classLoader = null; //Used to create proxy classes as needed within the application.//
/** Abstracts finding a proxy class for a given object or interface. */
private IProxyInterfaceLoader proxyInterfaceLoader = null;
private static final boolean DEBUG = true; //An internal only debug flag to turn on extra checking during testing.//
private boolean debug = false; //Instance level debug flag; when set, extra log output is produced (e.g. when closing a server socket).//
/** The reconnection listener which is called when a reconnection attempt is made. */
private IReconnectListener reconnectListener = null;
/** The optional engine to use by default for handling NIO threading of socket comms. If not provided then NIO will only be used for sockets that pass an NIO engine when opened. */
private NioEngine nioEngine = null;
private LiteHashMap serverSocketsByName = new LiteHashMap(5);
private LiteHashMap serverSocketsByAddressPort = new LiteHashMap(20);
private LiteHashMap socketsByName = new LiteHashMap(20);
private LiteHashMap socketsByAddressPort = new LiteHashMap(20);
private boolean allowDuplicateSockets = false; //Whether to allow more than one socket to a specific address and port.//
private LiteHashMap proxySingletons = new LiteHashMap(10);
private LiteHashMap boundValueMap = new LiteHashMap(10); //A map of bound values by resource.//
private LongObjectHashMap luidToRefMap = new LongObjectHashMap(20); //Maps a locally unique identifier to a Reference object containing a local or remote reference to an object.//
private ObjectLongHashMap remoteRefKeyToLuidMap = new ObjectLongHashMap(10); //NOTE(review): presumably maps a RemoteRefKey (socket + remote luid) to the local luid of its Ref - confirm against usage.//
private ObjectLongHashMap objectToLuidMap = new ObjectLongHashMap(10, Comparator.getIdentityComparator()); //Keyed using the identity comparator. NOTE(review): presumably maps a locally held object to its luid - confirm against usage.//
private ObjectIntHashMap proxyTypeToTypeIdMap = new ObjectIntHashMap(20); //A map of proxy class to numeric proxy id.//
private IntObjectHashMap proxyTypeIdToProxyMap = new IntObjectHashMap(20); //A mapping of proxy class id to a proxy that can be used to invoke methods on an object that is being proxied.//
private int nextProxyLuid = 0; //The last used locally unique socket id for identifying proxy classes.//
private long nextObjectLuid = 0; //The last used locally unique object id for the orb.//
//Monitor objects. monitorSocketChange is synchronized on while server sockets are closed and removed from the socket maps; NOTE(review): the scope of the other two monitors is inferred from their names only - confirm.//
private Object monitorReferenceCounter = new Object();
private Object monitorSocketChange = new Object();
private Object monitorObjectLuidCreator = new Object();
/** Whether the orb has been shutdown. Once shutdown the orb does not restart ever. */
private volatile boolean isShutDown = false;
//Shared handler instances: the first forwards a socket to prepareSocket(), the second forwards a socket and its input stream to processMessage().//
private final SocketInitCompletionHandler socketInitCompletionHandler = new SocketInitCompletionHandler();
private final SocketProcessMessageHandler socketProcessMessageHandler = new SocketProcessMessageHandler();
/**
 * The instance factory the orb installs when the application does not supply one of its own.
 * <p>Applications are normally expected to provide their own implementation.</p>
 */
public static class DefaultInstanceFactory implements IInstanceFactory {
	public DefaultInstanceFactory() {
	}//DefaultInstanceFactory()//
	/**
	 * Instantiates the given class through its no-argument constructor.
	 * @param type The class to instantiate.
	 * @return The new instance, or null if the instance could not be created for any reason.
	 */
	public Object createInstance(Class type) {
		Object instance = null;

		try {
			instance = type.newInstance();
		}//try//
		catch(Throwable e) {
			//Every failure is reported to the caller as a null instance.//
			instance = null;
		}//catch//

		return instance;
	}//createInstance()//
}//DefaultInstanceFactory//
/**
 * Provides customized deserialization of streamed proxy objects.
 * <p><b>WARNING: Circular reference to this serialized object may not occur within its deserialization.</b>
 * <p>The deserialize method receives the object's class and the stream. Proxy types are instantiated and read here; every other type is passed on to the super class, which is constructed with the optional encapsulated handler so that handlers can be chained.</p>
 * <p>TODO: Allow for circular references in the deserialized object.
 */
public static class ProxyDeserializationHandler extends AbstractObjectInputStream.CustomDeserializationHandler {
	public ProxyDeserializationHandler() {
		super();
	}//ProxyDeserializationHandler()//
	/**
	 * ProxyDeserializationHandler constructor.
	 * @param encapsulatedHandler The next handler in the chain, passed through to the super class.
	 */
	public ProxyDeserializationHandler(AbstractObjectInputStream.CustomDeserializationHandler encapsulatedHandler) {
		super(encapsulatedHandler);
	}//ProxyDeserializationHandler()//
	/**
	 * Deserializes the object if its class is Proxy or a subclass of Proxy, otherwise defers to the super class.
	 * <p><b>WARNING: Circular reference to this serialized object may not occur within its deserialization.</b>
	 * @param type The class for the object needing deserialization.
	 * @param in The input stream containing the serialized object's bytes.
	 * @return The proxy that was read, or whatever the super class returns for non-proxy types.
	 * @throws RuntimeException If the proxy could not be instantiated or read. The original failure is attached as the cause.
	 */
	public Object deserialize(Class type, com.common.io.ObjectInputStream in) {
		//The test must be made from Proxy's side: type.isAssignableFrom(Proxy.class) would accept super types of Proxy (such as Object) and reject the proxy subclasses.//
		if(Proxy.class.isAssignableFrom(type)) {
			try {
				Proxy proxy = (Proxy) type.newInstance();

				proxy.readExternal(in);

				return proxy;
			}//try//
			catch(Throwable e) {
				//Keep the cause so the real failure is not lost.//
				throw new RuntimeException("Failed to read a proxy from the input stream. See OrbObjectInputStream.ProxyDeserializationHandler.", e);
			}//catch//
		}//if//
		else {
			return super.deserialize(type, in);
		}//else//
	}//deserialize()//
}//ProxyDeserializationHandler//
/**
 * Encapsulates a message in a runnable class so that once received it may simply be run.
 * <p>writeExternal/readExternal stream a one byte message type followed by the header fields that belong to that type. The remaining fields (ref, data, socket, dataStream, holderObject, returnObject) are not streamed by this class; run() uses them as it finds them, so they must be assigned before the runnable is run.</p>
 */
private class RemoteRunnable implements Runnable, java.io.Externalizable {
	private byte type = 0; //The message type which determines how it will be handled.//
	/** Whether the message data can be forwarded without destreaming. <p>Currently not used because almost all messages may contain connection specific references, such as type name to number mappings, which cannot easily be transfered without deserializing and re-serializing the stream. */
	private boolean isForwardable = false;
	private long callNumber = 0L; //The call id which is used to send a return message in the context of the initial message. This should be zero for one way messages.//
	private long objectLuid = 0L; //References the remote object to interact with.//
	private int proxyType = 0; //Identifies the proxy class for execute messages. This is also used by the agent to specify whether the objectLuid is used to follow a proxy's path (1=use luid; 0=don't).//
	private Object objectId = null; //The objectId is used to locate a remote object.//
	private Ref ref = null; //Only used for an execute or agent commands.//
	private Object data = null; //Dependant on the type of the runnable.//
	private AbstractConnection socket = null;
	private int decrementSize = 0; //The number of times to decrement the count on the remote process. This is only used by the decrement message.//
	private long timeout = 0L; //The minimum amount of time that the orb should wait for a response.//
	/** The stream buffer containing the portion of the message to be forwarded as is to the next hop to the destination. The forward may be part of an execute or return call. <p>WARNING: Must ensure that any stream specific stuff like class name to number mappings don't exist in the data or are somehow transformed when switching streams (sockets). */
	private StreamBuffer dataStream = null;
	/** The holder attached to a return message. The holder will either be a ReturnValueHolder or an IResultCallback. */
	private Object holderObject = null;
	/** The return object value, or null if the result is null, or if the data stream should be used (is not null). */
	private Object returnObject = null;
	public RemoteRunnable() {
	}//RemoteRunnable()//
	/**
	 * Handles the message according to its type.
	 */
	public void run() {
		switch(type) {
			case MESSAGE_EXECUTE:
				if(ref == null) {
					//The target reference could not be resolved, so report the failure to the caller.//
					localReturn(socket, callNumber, new InvalidProxyException("Unable to traverse the network to reach the proxied object."));
				}//if//
				else if(ref.object == null) {
					//The reference is remote: forward the message to the remote process.//
					localForward(socket, this, dataStream);
				}//else if//
				else {
					//The reference is local: execute the method on the proper object.//
					remoteExecute(socket, callNumber, objectLuid, proxyType, ((ExecuteData) data).method, ((ExecuteData) data).parameters, ref, ((ExecuteData) data).autoProxyClass);
				}//else//
				break;
			case MESSAGE_RETURN:
				if(holderObject instanceof ReturnValueHolder) {
					ReturnValueHolder holder = (ReturnValueHolder) holderObject;

					if(holder.socket != null) {
						//The holder carries a socket, so the return value is passed along on it.//
						localReturn(holder.socket, holder.callNumber, returnObject);
					}//if//
					else {
						holder.setValue(returnObject);
					}//else//
				}//if//
				else if(holderObject instanceof IResultCallback) {
					((IResultCallback) holderObject).run(returnObject);
				}//else if//
				break;
			case MESSAGE_LOCATE:
				//Locate the requested object and return a proxy to it.//
				localReturn(socket, callNumber, lookupInternal(objectId, socket.getPermissions()));
				break;
			case MESSAGE_AGENT:
				//The unresolved reference check must come first. It used to follow the (ref == null) test below, which made it unreachable and caused an agent that was supposed to follow a proxy to be executed here instead.//
				if((proxyType == 1) && (ref == null)) {
					//Error: Cannot find the specified reference! The agent cannot be sent to the next destination.//
					if(callNumber != 0L) {
						localReturn(socket, callNumber, new InvalidProxyException("Unable to traverse the network to reach the proxied object."));
					}//if//
				}//if//
				else if((ref == null) || (ref.object != null)) {
					//The agent has reached its destination: execute it (the result is returned if the call is not one way).//
					remoteExecuteAgent(socket, ((AgentData) data).agent, callNumber);
				}//else if//
				else if((proxyType == 1) && (ref.object == null)) {
					//The reference is remote: forward the agent to the next hop.//
					localForward(socket, this, dataStream);
				}//else if//
				break;
			case MESSAGE_DECREMENT_PROXY_COUNT:
				remoteDecrementProxyCount(socket, objectLuid, decrementSize);
				break;
			default:
				Debug.halt();
				break;
		}//switch//
	}//run()//
	/**
	 * Reads the message type and the header fields that belong to that type.
	 * @param in The stream to read the header from.
	 */
	public void readExternal(java.io.ObjectInput in) throws ClassNotFoundException, IOException {
		type = (byte) in.readByte();

		switch(type) {
			case MESSAGE_EXECUTE:
				callNumber = in.readLong();
				isForwardable = in.readBoolean();
				objectLuid = in.readLong();
				proxyType = in.readInt();
				timeout = in.readLong();
				break;
			case MESSAGE_RETURN:
				callNumber = in.readLong();
				isForwardable = in.readBoolean();
				break;
			case MESSAGE_LOCATE:
				callNumber = in.readLong();
				objectId = in.readObject();
				timeout = in.readLong();
				break;
			case MESSAGE_AGENT:
				callNumber = in.readLong();
				isForwardable = in.readBoolean();
				objectLuid = in.readLong();
				timeout = in.readLong();
				//The agent streams the follow-luid flag as a boolean; it is held in proxyType as 1 or 0.//
				proxyType = in.readBoolean() ? 1 : 0;
				break;
			case MESSAGE_DECREMENT_PROXY_COUNT:
				objectLuid = in.readLong();
				decrementSize = in.readInt();
				break;
			default:
				Debug.halt();
				break;
		}//switch//
	}//readExternal()//
	/**
	 * Writes the message type and the header fields that belong to that type, in the order readExternal expects them.
	 * @param out The stream to write the header to.
	 */
	public void writeExternal(java.io.ObjectOutput out) throws IOException {
		out.writeByte(type);

		switch(type) {
			case MESSAGE_EXECUTE:
				out.writeLong(callNumber);
				out.writeBoolean(isForwardable);
				out.writeLong(objectLuid);
				out.writeInt(proxyType);
				out.writeLong(timeout);
				break;
			case MESSAGE_RETURN:
				out.writeLong(callNumber);
				out.writeBoolean(isForwardable);
				break;
			case MESSAGE_LOCATE:
				out.writeLong(callNumber);
				out.writeObject(objectId);
				out.writeLong(timeout);
				break;
			case MESSAGE_AGENT:
				out.writeLong(callNumber);
				out.writeBoolean(isForwardable);
				out.writeLong(objectLuid);
				out.writeLong(timeout);
				out.writeBoolean(proxyType == 1);
				break;
			case MESSAGE_DECREMENT_PROXY_COUNT:
				out.writeLong(objectLuid);
				out.writeInt(decrementSize);
				break;
			default:
				Debug.halt();
				break;
		}//switch//
	}//writeExternal()//
}//RemoteRunnable//
/**
 * Carries what is needed to execute a method on a remote process: the method number, the parameters and the optional auto proxy class.
 * <p>Stream layout: method (long), parameter count (short), auto proxy class (object), then each parameter (object).</p>
 */
private static class ExecuteData implements Externalizable {
	private long method = 0L;
	private Object[] parameters = null;
	private Class autoProxyClass = null;
	public ExecuteData() {
	}//ExecuteData()//
	/**
	 * Reads the execute data from the stream. After reading, the parameter array is never null (it is empty when no parameters were written).
	 * @param in The stream to read from.
	 */
	public void readExternal(java.io.ObjectInput in) throws ClassNotFoundException, IOException {
		method = in.readLong();

		int parameterCount = in.readShort();
		Object[] values = new Object[parameterCount];

		autoProxyClass = (Class) in.readObject();
		parameters = values;

		int position = 0;

		while(position < values.length) {
			values[position] = in.readObject();
			position++;
		}//while//
	}//readExternal()//
	/**
	 * Writes the execute data to the stream. A null parameter array is written as a count of zero.
	 * @param out The stream to write to.
	 */
	public void writeExternal(java.io.ObjectOutput out) throws IOException {
		int parameterCount = 0;

		if(parameters != null) {
			parameterCount = parameters.length;
		}//if//

		out.writeLong(method);
		out.writeShort((short) parameterCount);
		out.writeObject(autoProxyClass);

		for(int position = 0; position < parameterCount; position++) {
			out.writeObject(parameters[position]);
		}//for//
	}//writeExternal()//
}//ExecuteData//
/**
 * Carries the payload of an agent message: the agent object and the follow proxy flag.
 * <p>Note that this class declares readExternal/writeExternal but does not itself implement Externalizable, and that its constructor is private.</p>
 */
private static class AgentData {
	private Object agent = null;
	private boolean followProxy = false;
	private AgentData() {
	}//AgentData()//
	/**
	 * Reads the agent followed by the follow proxy flag.
	 * @param in The stream to read from.
	 */
	public void readExternal(java.io.ObjectInput in) throws ClassNotFoundException, IOException {
		this.agent = in.readObject();
		this.followProxy = in.readBoolean();
	}//readExternal()//
	/**
	 * Writes the agent followed by the follow proxy flag.
	 * @param out The stream to write to.
	 */
	public void writeExternal(java.io.ObjectOutput out) throws IOException {
		out.writeObject(this.agent);
		out.writeBoolean(this.followProxy);
	}//writeExternal()//
}//AgentData//
/**
 * Reference counting record for one proxied object.
 * <p>The inherited map holds one counter per socket (other than the return socket) that holds references; references that are local, or that arrived over the return socket, are counted in localCount. totalCount is the sum of all of them.</p>
 */
private class Ref extends ObjectIntHashMap implements ISocketCloseListener {
	private int localCount = 0;
	private int totalCount = 0;
	/** The luid for the locally referenced object, or the luid for the remotely referenced object (if the ref is remote). */
	private long luid = -1;
	/** The socket that proxy messages travel across to reach the proxied object. This is only non-null in the event the ref is remote. */
	private AbstractConnection returnSocket = null;
	/** The referenced object, or null if it is remotely held. */
	private Object object = null;
	/** Maintains the total number of local increments on a non-local reference. This number should be sent to the remote process when the decrement occurs. */
	private int remoteLocalCountTotal = 0;
	/** An optional reference to a remote reference key that allows lookup of this reference. This will be non-null if this reference is not local. */
	private RemoteRefKey remoteRefKey = null;
	/**
	 * Creates a reference to a locally held object.
	 * @param object The locally held object.
	 * @param luid The locally unique id of the object.
	 */
	private Ref(Object object, long luid) {
		super(5);
		this.object = object;
		this.luid = luid;
	}//Ref()//
	/**
	 * Creates a reference to a remotely held object. The return socket and luid are taken from the proxy.
	 * @param proxy The proxy to the remote object.
	 * @param remoteRefKey The key under which this reference can be looked up.
	 */
	private Ref(Proxy proxy, RemoteRefKey remoteRefKey) {
		super(5);
		this.returnSocket = proxy.getSocket__Proxy();
		this.luid = proxy.getLuid__Proxy();
		this.remoteRefKey = remoteRefKey;
	}//Ref()//
	/**
	 * Adds one reference to the count kept for the given socket, or to the local count.
	 * @param socket The socket the reference travelled over, or null if the proxy was created locally (cloned or instantiated). The first time a socket other than the return socket is counted, this ref registers itself as a close listener on it.
	 */
	private synchronized void increment(AbstractConnection socket) {
		boolean countedPerSocket = (socket != null) && (socket != returnSocket);

		if(countedPerSocket) {
			Entry existing = getHashEntry(socket);

			if(existing == null) {
				put(socket, 1);
				socket.addCloseListener(this);
			}//if//
			else {
				existing.value++;
			}//else//
		}//if//
		else {
			localCount++;

			if(socket != null) {
				//The reference arrived over the return socket. Keep the running total that is to be sent to the remote process when the decrement occurs.//
				remoteLocalCountTotal++;
			}//if//
		}//else//

		totalCount++;
	}//increment()//
	/**
	 * Removes one reference from the count kept for the given socket, or from the local count.
	 * <p>A null socket and the return socket both decrement the local count; an error is logged if the local count is already zero.</p>
	 * @param socket The socket to decrement the count on, or if null the local count will be decremented.
	 */
	private synchronized void decrement(AbstractConnection socket) {
		if((socket == null) || (socket == returnSocket)) {
			if(localCount <= 0) {
				Debug.log("Error: Unable to decrement the local count for the proxy because the local count is already zero.");
			}//if//
			else {
				localCount--;
				totalCount--;
			}//else//
		}//if//
		else {
			Entry existing = getHashEntry(socket);

			if(existing == null) {
				Debug.halt();
			}//if//
			else {
				existing.value--;
				totalCount--;

				//Once the last reference held through the socket is gone, stop tracking the socket.//
				if(existing.value == 0) {
					socket.removeCloseListener(this);
					remove(socket);
				}//if//
			}//else//
		}//else//
	}//decrement()//
	/**
	 * Finds the map entry for the given socket so that its counter can be modified in place.
	 * @param key The socket whose entry is wanted.
	 * @return The entry, or null if the socket has no entry.
	 */
	private Entry getHashEntry(AbstractConnection key) {
		IBasicEntry[] buckets = getEntries();
		int hash = getKeyComparator().hash(key);
		int bucket = (hash & 0x7FFFFFFF) % buckets.length;
		Entry candidate = (Entry) buckets[bucket];

		while(candidate != null) {
			if((candidate.hash == hash) && (Comparator.isEqual(getKeyComparator().compare(candidate.key, key)))) {
				return candidate;
			}//if//

			candidate = (Entry) candidate.getNext();
		}//while//

		return null;
	}//getHashEntry()//
	/**
	 * Called when a socket associated with this reference closes.
	 * @param socket The socket that closed. If it is the return socket then the return socket and the remote local total are cleared; otherwise every reference counted for the socket is dropped.
	 */
	public synchronized void onSocketClosed(AbstractConnection socket) {
		if(socket == returnSocket) {
			remoteLocalCountTotal = 0;
			returnSocket = null;
			return;
		}//if//

		Entry existing = getHashEntry(socket);

		if(existing == null) {
			Debug.halt();
			return;
		}//if//

		totalCount -= existing.value;
		existing.value = 0;
		remove(socket);
	}//onSocketClosed()//
	/**
	 * Describes the counters held by this reference.
	 */
	public String toString() {
		boolean hasNoReturnSocket = (returnSocket == null);
		StringBuffer text = new StringBuffer("Ref[total=");

		text.append(totalCount);
		text.append("; local=");
		text.append(localCount);
		text.append("; isLocal=");
		text.append(hasNoReturnSocket);

		if(!hasNoReturnSocket) {
			text.append("; decrementTotal=");
			text.append(remoteLocalCountTotal);
		}//if//

		text.append("]");

		return text.toString();
	}//toString()//
}//Ref//
/**
 * A lookup key made of a socket and a luid. Two keys are equal when they carry the same luid and the very same socket instance.
 */
private static class RemoteRefKey {
	private long luid = 0L;
	private AbstractConnection socket = null;
	private RemoteRefKey(AbstractConnection socket, long luid) {
		this.socket = socket;
		this.luid = luid;
	}//RemoteRefKey()//
	public int hashCode() {
		int luidBits = (int) luid;

		return luidBits ^ socket.hashCode();
	}//hashCode()//
	public boolean equals(Object object) {
		if(!(object instanceof RemoteRefKey)) {
			return false;
		}//if//

		RemoteRefKey other = (RemoteRefKey) object;

		//The socket is compared by identity.//
		return (other.luid == luid) && (other.socket == socket);
	}//equals()//
}//RemoteRefKey//
/**
 * A scheduler task that closes a socket when it is run. The close is attempted at most once per task.
 */
private class CloseSocketTask extends Scheduler.Task {
	private AbstractConnection socket;
	private String name;
	private boolean hasRun = false;
	private CloseSocketTask(AbstractConnection socket, String name) {
		this.name = name;
		this.socket = socket;
	}//CloseSocketTask()//
	/**
	 * Removes this task from the scheduler and closes the socket. An IOException raised by the close is logged.
	 */
	public synchronized void run() {
		if(hasRun) {
			return;
		}//if//

		Scheduler.removeTask(this);

		try {
			closeSocket(name, 0, socket);
		}//try//
		catch(IOException e) {
			Debug.log(e);
		}//catch//

		hasRun = true;
	}//run()//
}//CloseSocketTask//
/**
 * Handler that passes a socket on to prepareSocket() when it is evaluated.
 */
class SocketInitCompletionHandler extends VoidHandler1 {
	/**
	 * @param socket The socket, which must be an AbstractConnection.
	 */
	public void evaluate(Object socket) {
		AbstractConnection connection = (AbstractConnection) socket;

		prepareSocket(connection);
	}//evaluate()//
}//SocketInitCompletionHandler//
/**
 * Handler that passes a socket and the stream holding a received message on to processMessage() when it is evaluated.
 */
class SocketProcessMessageHandler extends VoidHandler2 {
	/**
	 * Invoked when a message is received by the socket (after the message has been read to a byte stream).
	 * @param socket The socket that received the message, which must be an AbstractConnection.
	 * @param inputStream The stream to read the incoming message from, which must be an ObjectInputStream.
	 */
	public void evaluate(Object socket, Object inputStream) {
		AbstractConnection connection = (AbstractConnection) socket;
		ObjectInputStream messageStream = (ObjectInputStream) inputStream;

		processMessage(connection, messageStream);
	}//evaluate//
}//SocketProcessMessageHandler//
/**
 * Used to hide the socket implementation from the user code. Two identifiers are equal when they wrap the very same socket instance.
 */
static class NetworkConnectionIdentifier {
	AbstractConnection socket = null;
	/**
	 * NetworkConnectionIdentifier constructor.
	 * @param socket The socket being identified.
	 */
	NetworkConnectionIdentifier(AbstractConnection socket) {
		this.socket = socket;
	}//NetworkConnectionIdentifier()//
	public boolean equals(Object object) {
		if(object instanceof NetworkConnectionIdentifier) {
			NetworkConnectionIdentifier other = (NetworkConnectionIdentifier) object;

			return other.socket == socket;
		}//if//

		return false;
	}//equals()//
	public int hashCode() {
		return this.socket.hashCode();
	}//hashCode()//
	private AbstractConnection getSocket() {
		return this.socket;
	}//getSocket()//
}//NetworkConnectionIdentifier//
/**
 * Used to hide the server socket implementation from the user code. Two identifiers are equal when they wrap the very same server socket instance; the name does not take part in the comparison.
 */
private static class ServerSocketIdentifier {
	private AbstractConnectionServer serverSocket = null;
	private String name = null;
	public ServerSocketIdentifier(AbstractConnectionServer serverSocket, String name) {
		this.name = name;
		this.serverSocket = serverSocket;
	}//ServerSocketIdentifier()//
	public boolean equals(Object object) {
		if(object instanceof ServerSocketIdentifier) {
			ServerSocketIdentifier other = (ServerSocketIdentifier) object;

			return other.serverSocket == serverSocket;
		}//if//

		return false;
	}//equals()//
	public int hashCode() {
		return this.serverSocket.hashCode();
	}//hashCode()//
}//ServerSocketIdentifier//
/**
 * Used to hide the socket implementation from the user code. Adds a name to the identified socket.
 * <p>The hash code is inherited and is based on the socket only, which remains consistent with equals since equal identifiers always share the same socket.</p>
 */
private static class SocketIdentifier extends NetworkConnectionIdentifier {
	String name = null;
	/**
	 * SocketIdentifier constructor.
	 * @param socket The socket being identified.
	 * @param name The name of the socket. May be null.
	 */
	public SocketIdentifier(AbstractConnection socket, String name) {
		super(socket);
		this.name = name;
	}//SocketIdentifier()//
	/**
	 * Determines whether the object is a SocketIdentifier for the same socket and with the same name.
	 * <p>The name comparison is null safe: two null names are equal, and a null name never equals a non-null name. Previously a null name on the other identifier raised a NullPointerException.</p>
	 * @param object The object to compare with.
	 * @return Whether the object identifies the same socket under the same name.
	 */
	public boolean equals(Object object) {
		if(!(object instanceof SocketIdentifier) || !super.equals(object)) {
			return false;
		}//if//

		String otherName = ((SocketIdentifier) object).name;

		return (name == null) ? (otherName == null) : name.equals(otherName);
	}//equals()//
}//SocketIdentifier//
/**
 * Pairs a bound object with the permission set it was bound with.
 */
private static class BoundValue {
	/** The permission set supplied when the object was bound. */
	private LiteHashSet permissions = null;
	/** The object that was bound. */
	private Object object = null;
	/**
	 * BoundValue constructor.
	 * @param object The bound object.
	 * @param permissions The permission set for the bound object.
	 */
	public BoundValue(Object object, LiteHashSet permissions) {
		this.permissions = permissions;
		this.object = object;
	}//BoundValue()//
}//BoundValue//
/**
 * Holds the return value of a call so that one thread can wait for it while another thread delivers it.
 * <p>A holder created with a socket and call number is used to forward the return value instead, in which case the value is never assigned here.</p>
 * <p>hasValue and value are volatile because both are read outside of the monitor (the fast path checks in setValue/waitForValue and the unsynchronized getValue). value is always written before hasValue, so a thread that sees hasValue as true also sees the value.</p>
 */
private class ReturnValueHolder {
	private volatile Object value = null; //The value assigned. This will never be modified if the return value requires forwarding.//
	private volatile boolean hasValue = false; //Whether a value has been assigned. This will never be true if the return value requires forwarding.//
	private AbstractConnection socket = null; //Will only be non-null if the return value requires forwarding.//
	private long callNumber = 0L; //Will only be set if the return value requires forwarding.//
	/**
	 * ReturnValueHolder constructor.
	 */
	ReturnValueHolder() {
		super();
	}//ReturnValueHolder()//
	/**
	 * ReturnValueHolder constructor.
	 * @param socket The socket that the return value should be forwarded across.
	 * @param callNumber The call number that the forwarded return value should use.
	 */
	ReturnValueHolder(AbstractConnection socket, long callNumber) {
		super();
		this.socket = socket;
		this.callNumber = callNumber;
	}//ReturnValueHolder()//
	/**
	 * Gets the value being held.
	 * @return The value being held, or null if no value has been set yet.
	 */
	public Object getValue() {
		return value;
	}//getValue()//
	/**
	 * Sets the value being held and wakes every thread waiting for it. Only the first call has any effect.
	 * @param value The value to be held.
	 */
	public void setValue(Object value) {
		if(!hasValue) {
			synchronized(this) {
				if(!hasValue) {
					//Write the value before the flag so that readers of the flag always find the value.//
					this.value = value;
					hasValue = true;
					notifyAll();
				}//if//
				else {
					Debug.halt();
				}//else//
			}//synchronized//
		}//if//
	}//setValue()//
	/**
	 * Waits for the value to be set by another thread until the value is set, the timeout elapses, or the socket closes.
	 * <p>The wait is performed in slices of at most ten seconds. After each slice that ends without a value a ping is sent on the socket if it is still open.</p>
	 * @param socket The socket that the return value will be arriving on. This must not be null. It will be used to ensure the socket is still open.
	 * @param timeout The number of milliseconds to wait for the value to be set. A value of zero will cause the thread to wait until the value has been set or the socket closes.
	 * @return Whether the value has been set. False means the wait ended (timeout or closed socket) without a value.
	 */
	public boolean waitForValue(AbstractConnection socket, long timeout) {
		if(!hasValue) {
			synchronized(this) {
				if(timeout == 0) {
					while((!hasValue) && (socket.isOpen())) {
						try {
							wait(10000);
						}//try//
						catch(InterruptedException e) {
							Debug.handle(e);
						}//catch//

						pingIfStillWaiting(socket);
					}//while//
				}//if//
				else {
					//Stop as soon as the value arrives rather than running through the remaining slices without waiting.//
					while((!hasValue) && (timeout > 0) && (socket.isOpen())) {
						//Don't block more than 10 seconds at a time without checking the socket status.//
						long slice = timeout > 10000 ? 10000 : timeout;
						long remainingTimeout = timeout - slice;

						try {
							wait(slice);
						}//try//
						catch(InterruptedException e) {
							Debug.handle(e);
						}//catch//

						pingIfStillWaiting(socket);
						timeout = remainingTimeout;
					}//while//
				}//else//
			}//synchronized//
		}//if//

		return hasValue;
	}//waitForValue()//
	/**
	 * Sends a ping on the socket if no value has arrived yet and the socket is still open.
	 * @param socket The socket the return value is expected on.
	 */
	private void pingIfStillWaiting(AbstractConnection socket) {
		if((!hasValue) && (socket.isOpen())) {
			try {
				socket.sendPing(true);
			}//try//
			catch(Throwable e) {
				//Ignored. The wait loops check socket.isOpen() again before blocking.//
			}//catch//
		}//if//
	}//pingIfStillWaiting()//
}//ReturnValueHolder//
/**
* Orb constructor.
*/
protected Orb() {
super();
}//Orb()//
/**
 * Orb constructor.
 * @param classLoader The optional customized class loader to be used for loading classes. A null value will have the orb use the system or thread default loader. To have proxies created on during runtime an instance of the OrbClassLoader should be provided.
 * @param instanceFactory The optional factory used by the streams for instantiating objects when reading from the stream. If null, a DefaultInstanceFactory is used.
 * @param classTracker The optional class tracker will maintain a collection of all classes serialized and deserialized within the orb.
 * @param nioEngine The optional engine to use by default for handling NIO threading of socket comms. If not provided then NIO will only be used for sockets that pass an NIO engine when opened.
 * @param proxyInterfaceLoader The optional object used to load proxy classes from the proxy'd interface or object. If null, a DefaultProxyInterfaceLoader is used.
 * @param reconnectListener The optional listener which is called when a reconnection attempt is made.
 * @see com.common.system.SystemManager
 * @see OrbClassLoader
 */
public Orb(ClassLoader classLoader, IInstanceFactory instanceFactory, IClassTracker classTracker, NioEngine nioEngine, IProxyInterfaceLoader proxyInterfaceLoader, IReconnectListener reconnectListener) {
	super();

	//Collaborators that are stored exactly as given (null is allowed).//
	this.reconnectListener = reconnectListener;
	this.classLoader = classLoader;
	this.classTracker = classTracker;
	this.nioEngine = nioEngine;

	//Collaborators that fall back to a default implementation when not given.//
	if(instanceFactory != null) {
		this.instanceFactory = instanceFactory;
	}//if//
	else {
		this.instanceFactory = new DefaultInstanceFactory();
	}//else//

	if(proxyInterfaceLoader != null) {
		this.proxyInterfaceLoader = proxyInterfaceLoader;
	}//if//
	else {
		this.proxyInterfaceLoader = new DefaultProxyInterfaceLoader();
	}//else//
}//Orb()//
/**
 * Provides the listener that is called when a reconnection attempt is made.
 * @return The reconnection listener handed to the constructor, or null if none was provided.
 */
public IReconnectListener getReconnectListener() {
	return this.reconnectListener;
}//getReconnectListener()//
/**
 * Provides the loader that abstracts finding a proxy class for a given object or interface.
 * @return The proxy interface loader held by this orb.
 */
public IProxyInterfaceLoader getProxyInterfaceLoader() {
	return this.proxyInterfaceLoader;
}//getProxyInterfaceLoader()//
/**
 * Provides the optional engine used by default for handling NIO threading of socket comms. If there is none then NIO will only be used for sockets that pass an NIO engine when opened.
 * @return The nio engine instance, or null if none was provided.
 */
protected NioEngine getNioEngine() {
	return this.nioEngine;
}//getNioEngine()//
/**
 * Binds an object to a resource together with a permission set, or removes the binding for the resource.
 * @param object The object to bind to the resource. If this is null then any previous binding for the resource will be removed.
 * @param resource The application defined non-null resource to bind the object to. This resource is how other processes can identify the bound object. The resource should be capable of logical comparison and serialization.
 * @param permissions The application defined permission set. If this value is null then all clients or peers will have permissions to the bound object.
 * @throws NullPointerException If the resource is null.
 */
public void bind(Object object, Object resource, LiteHashSet permissions) {
	if(resource == null) {
		throw new NullPointerException("A valid resource must be provided.");
	}//if//

	if(object != null) {
		BoundValue binding = new BoundValue(object, permissions);

		boundValueMap.put(resource, binding);
	}//if//
	else {
		//A null object clears the binding.//
		boundValueMap.remove(resource);
	}//else//
}//bind()//
/**
* Closes the specified socket. An exception will be raised if the socket does not exist.
* @param serverSocketId The identifier for the server socket (as returned by the openSocket method), or the name of the server socket.
* @param delayTime The number of milliseconds to delay the closing of the socket.
*/
public void closeServerSocket(Object serverSocketId) throws IOException {
	final String name;
	//Resolve the identifier to the name the server socket was registered under.//
	if(serverSocketId instanceof String) {
		name = (String) serverSocketId;
	}//if//
	else if(serverSocketId instanceof ServerSocketIdentifier) {
		name = ((ServerSocketIdentifier) serverSocketId).name;
	}//else if//
	else {
		//Reached for a null identifier as well as for any unsupported identifier type.//
		throw new RuntimeException("Invalid server socket identifier.");
	}//else//
	//The lookup may yield null; the package level overload reports that as an IOException.//
	closeServerSocket(name, (AbstractConnectionServer) serverSocketsByName.get(name));
}//closeServerSocket()//
/**
* Closes the server socket and removes it from the collections if the reference count reaches zero.
* <p>TODO: Determine whether all connected sockets also close.</p>
* @param name The name of the server socket as it was given in the openServerSocket method.
* @param serverSocket The server socket to close.
*/
void closeServerSocket(String name, AbstractConnectionServer serverSocket) throws IOException {
	if(serverSocket == null) {
		throw new IOException("Server socket not found.");
	}//if//
	//TODO: Determine whether the server socket stuff should have its own monitor.//
	synchronized(monitorSocketChange) {
		try {
			if(name == null) {
				//No name given: unregister every name of the server socket and close it immediately.//
				try {
					IList registeredNames = serverSocket.getNames();
					if(registeredNames != null) {
						for(IIterator names = registeredNames.iterator(); names.hasNext(); ) {
							serverSocketsByName.remove((String) names.next());
						}//for//
					}//if//
					serverSocketsByAddressPort.remove(serverSocket.getNameAndPort());
				}//try//
				catch(Throwable e) {
					//Cleanup is best effort; the socket is still closed below.//
					Debug.log(e, "Failed to properly cleanup after a socket was closed. (2)");
				}//catch//
				serverSocket.close();
			}//if//
			else {
				//Release the reference held under this name.//
				int remainingForName = serverSocket.decrementReferenceCount(name);
				if(remainingForName == 0) {
					serverSocketsByName.remove(name);
				}//if//
				//Only close once nothing references the server socket under any name.//
				if(serverSocket.getReferenceCount() == 0) {
					if(debug) {
						Debug.log("Closing server socket: " + name);
					}//if//
					try {
						serverSocketsByAddressPort.remove(serverSocket.getNameAndPort());
					}//try//
					catch(Throwable e) {
						//Cleanup is best effort; the socket is still closed below.//
						Debug.log(e, "Failed to properly cleanup after a socket was closed. (1)");
					}//catch//
					serverSocket.close();
				}//if//
			}//else//
		}//try//
		catch(IOException e) {
			//Failures while closing are logged rather than propagated to the caller.//
			Debug.log(e);
		}//catch//
	}//synchronized//
}//closeServerSocket()//
/**
* Closes the specified socket. An exception will be raised if the socket does not exist.
* @param socketId The identifier for the socket (as returned by the openSocket method), or the name of the server socket.
*/
public void closeSocket(Object socketId) throws IOException {
	//A delay of zero means the close is performed right away instead of being scheduled.//
	this.closeSocket(socketId, 0L);
}//closeSocket()//
/**
* Closes the specified socket. An exception will be raised if the socket does not exist.
* @param socketId The identifier for the socket (as returned by the openSocket method), or the name of the server socket.
* @param delayTime The number of milliseconds to delay the closing of the socket.
*/
public void closeSocket(Object socketId, long delayTime) throws IOException {
	final String name;
	//Resolve the identifier to the name the socket was registered under.//
	if(socketId instanceof String) {
		name = (String) socketId;
	}//if//
	else if(socketId instanceof SocketIdentifier) {
		name = ((SocketIdentifier) socketId).name;
	}//else if//
	else {
		//Reached for a null identifier as well as for any unsupported identifier type.//
		throw new RuntimeException("Invalid socket identifier.");
	}//else//
	//The lookup may yield null; the package level overload reports that as an IOException.//
	closeSocket(name, delayTime, (AbstractConnection) socketsByName.get(name));
}//closeSocket()//
/**
* Closes the specified socket. An exception will be raised if the socket does not exist.
* @param name The name of the socket to be closed.
*/
public void closeSocket(String name) throws IOException {
	//A delay of zero means the close is performed right away instead of being scheduled.//
	this.closeSocket(name, 0L);
}//closeSocket()//
/**
* Closes the specified socket. An exception will be raised if the socket does not exist.
* @param name The name of the socket to be closed.
* @param delayTime The number of milliseconds to delay the closing of the socket.
*/
public void closeSocket(String name, long delayTime) throws IOException {
	if(name == null) {
		throw new RuntimeException("Invalid socket name.");
	}//if//
	//The lookup may yield null; the package level overload reports that as an IOException.//
	closeSocket(name, delayTime, (AbstractConnection) socketsByName.get(name));
}//closeSocket()//
/**
* Closes the socket and removes it from the collections if the reference count reaches zero.
* @param name The name of the socket as it was given in the openSocket method.
* @param delay The number of milliseconds to wait until the socket is closed. This is useful if one thread is done with the socket, but another thread may soon open it back up.
* @param socket The socket to close.
*/
void closeSocket(String name, long delay, AbstractConnection socket) throws IOException {
	if(socket == null) {
		throw new IOException("Socket not found.");
	}//if//
	//Delays above 500ms are handed to the scheduler; anything shorter is closed right here.//
	if(delay > 500) {
		Scheduler.addTask(delay, new CloseSocketTask(socket, name), false);
		return;
	}//if//
	synchronized(monitorSocketChange) {
		if(name == null) {
			//No name given: unregister every name of the socket and close it.//
			try {
				for(IIterator names = socket.getNames().iterator(); names.hasNext(); ) {
					socketsByName.remove((String) names.next());
				}//for//
				socket.getNames().removeAll();
				socketsByAddressPort.remove(socket.getNameAndPort());
			}//try//
			catch(Throwable e) {
				//Cleanup is best effort; the socket is still closed below.//
				Debug.log(e, "Failed to properly cleanup after a socket was closed. (2)");
			}//catch//
			socket.close(false);
		}//if//
		else {
			//Release the reference held under this name.//
			int remainingForName = socket.decrementReferenceCount(name);
			//The map maintains a reference counter internally so we can just remove.//
			socket.getNames().remove(name);
			if(remainingForName == 0) {
				socketsByName.remove(name);
			}//if//
			//Only close once nothing references the socket under any name.//
			if(socket.getReferenceCount() == 0) {
				if(debug) {
					Debug.log("Closing socket: " + name);
				}//if//
				try {
					socketsByAddressPort.remove(socket.getNameAndPort());
				}//try//
				catch(Throwable e) {
					//Cleanup is best effort; the socket is still closed below.//
					Debug.log(e, "Failed to properly cleanup after a socket was closed. (1)");
				}//catch//
				socket.close(false);
			}//if//
		}//else//
	}//synchronized//
}//closeSocket()//
/**
* Cleans up after a failed socket by removing all references to the socket.
* @param socket The socket to clean up after.
*/
void cleanupSocket(AbstractConnection socket) throws IOException {
	if(socket == null) {
		throw new IOException("Socket not found.");
	}//if//
	synchronized(monitorSocketChange) {
		//Unregister the socket under every name and under its address/port key. The socket itself is not closed here.//
		try {
			for(IIterator names = socket.getNames().iterator(); names.hasNext(); ) {
				socketsByName.remove((String) names.next());
			}//for//
			socket.getNames().removeAll();
			socketsByAddressPort.remove(socket.getNameAndPort());
		}//try//
		catch(Throwable e) {
			//Best effort: a failure is logged and otherwise ignored.//
			Debug.log(e, "Failed to properly cleanup after a socket was closed. (2)");
		}//catch//
	}//synchronized//
}//cleanupSocket()//
/**
* Decrements the count for an object reference.
* <p>This is necessary to ensure that we have a reference to the proxied object just long enough for all the proxies to be used and discarded.</p>
* @param socket The socket to decrement the count on. This will be null if the decrement occurs locally.
* @param luid The locally unique id for the proxy.
*/
void decrementProxyCount(AbstractConnection socket, long luid, int decrementSize) {
	synchronized(monitorReferenceCounter) {
		Ref reference = (Ref) luidToRefMap.get(luid);
		if(reference == null) {
			Debug.log("Error: Null reference found for luid: " + luid);
			return;
		}//if//
		//Apply the decrement one step at a time against the given socket (null for a local decrement).//
		int pending = decrementSize;
		while(pending > 0) {
			reference.decrement(socket);
			pending--;
		}//while//
		//Once nothing references the object any more, drop every mapping that keeps the Ref alive.//
		if(reference.totalCount < 1) {
			luidToRefMap.remove(luid);
			if(reference.remoteRefKey == null) {
				objectToLuidMap.remove(reference.object);
			}//if//
			else {
				remoteRefKeyToLuidMap.remove(reference.remoteRefKey);
			}//else//
			//A Ref with a return socket stands for a remote object: tell the process it came from to release its counts.//
			if(reference.returnSocket != null) {
				localDecrementProxyCount(reference.returnSocket, reference.remoteRefKey.luid, reference.remoteLocalCountTotal);
			}//if//
			reference.object = null;
			reference.returnSocket = null;
		}//if//
	}//synchronized//
}//decrementProxyCount()//
/**
* Fixes the proxy after it has serialized across the same stream it had been communicating across.
* <p>It requires either looking up the local object if it is now local, or looking up the new socket it should communicate across.</p>
* @param proxy The proxy to fix.
*/
void fixProxy(Proxy proxy) {
	long localLuid = proxy.getLuid__Proxy();
	//The reference tables and counters are guarded by monitorReferenceCounter everywhere else (see incrementProxyCount(..) and decrementProxyCount(..)).//
	//Taking the same monitor here keeps the lookup and the increment from racing with a concurrent decrement that removes the Ref.//
	synchronized(monitorReferenceCounter) {
		Ref ref = (Ref) luidToRefMap.get(localLuid);
		if(ref == null) {
			//Fail with a message that names the luid instead of an anonymous NullPointerException on the next line.//
			throw new NullPointerException("No reference found while fixing the proxy with luid: " + localLuid);
		}//if//
		if(ref.object != null) {
			//The proxied object lives in this process: point the proxy straight at it.//
			proxy.setLocalObject__Proxy(ref.object);
			proxy.setRemoteLuid__Proxy(-1);
		}//if//
		else {
			//The proxied object is remote: route the proxy over the socket recorded in the reference.//
			proxy.setLuid__Proxy(localLuid);
			proxy.setSocket__Proxy(ref.returnSocket);
			proxy.setRemoteLuid__Proxy(ref.luid);
		}//else//
		//Either way this proxy instance now counts as one more local reference.//
		ref.increment(null);
	}//synchronized//
}//fixProxy()//
/**
* Gets the class tracker used by socket streams for tracking which application classes are sent or received.
* @return The class tracker that will be used by the socket streams to keep track of which classes are (de)serialized.
*/
IClassTracker getClassTracker() {
	//Plain accessor: hands back the tracker field as-is.//
	return this.classTracker;
}//getClassTracker()//
/**
* Gets the instance factory all socket streams must use when instantiating application classes.
* @return The instance factory used to provide instance of application classes. This may be a null value if the default behavior is desired.
*/
IInstanceFactory getInstanceFactory() {
	//Plain accessor: hands back the factory field as-is.//
	return this.instanceFactory;
}//getInstanceFactory()//
/**
* Gets the protocol version used by the ORB. This will increment when the orb changes in such a way that previous versions are not compatible.
* @return The current orb's protocol version.
*/
public int getProtocolVersion() {
	//The version is a compile time constant of the class.//
	return Orb.PROTOCOL_VERSION;
}//getProtocolVersion()//
/**
* Gets the next available locally unique object identifier.
* @return The locally unique number that can be used to identify an object.
*/
private long getNextObjectLuid() {
	long luid;
	//Read and advance the counter as one step under its own monitor so no two callers see the same value.//
	synchronized(monitorObjectLuidCreator) {
		luid = nextObjectLuid++;
	}//synchronized//
	return luid;
}//getNextObjectLuid()//
/**
* Gets the locally unique identifier for a given object.
* @param object The object whose proxy related luid is desired.
*/
protected long getObjectLuid(Object object) {
	//Straight lookup in the object-to-luid table; no reference count is touched.//
	long luid = objectToLuidMap.get(object);
	return luid;
}//getObjectLuid()//
/**
* Gets a proxy for the given object and interface class.
* @param object The non-null object that will be proxied. The default interface for this object will be "[package].I[object.getClass().getName()]".
* @param interfaceClass The interface class that the proxy must implement. The return value will be castable to this interface. If this value is null then the 'default' interface class will be composed by putting an 'I' infront of the object's class name.
*/
public Proxy getProxy(Object object, Class interfaceClass) throws IllegalArgumentException {
	//A null object yields null and an existing proxy is handed back unchanged.//
	if(object == null || object instanceof Proxy) {
		return (Proxy) object;
	}//if//
	Proxy proxy = null;
	//Null-safe comparison: a null interface class is legal (it requests the default interface) and must not blow up here.//
	boolean genericProxy = Proxy.class.equals(interfaceClass);
	//If no interface class was specified then get the 'default' interface for the object.//
	if(!genericProxy && (interfaceClass == null)) {
		try {
			interfaceClass = proxyInterfaceLoader.loadDefaultProxyInterface(object, classLoader);
		}//try//
		catch(Throwable e) {
			//Deliberately ignored: a failed lookup leaves interfaceClass null, which is rejected just below.//
		}//catch//
	}//if//
	//Accept either the generic Proxy class or an interface that the object really implements.//
	if((interfaceClass == null) || !(((interfaceClass.isInterface()) && (interfaceClass.isAssignableFrom(object.getClass()))) || (genericProxy))) {
		throw new IllegalArgumentException("Unable to create a proxy without a valid interface class object for the proxied object.");
	}//if//
	try {
		Class proxyClass = null;
		int typeId = 0;
		if(genericProxy) {
			proxyClass = Proxy.class;
		}//if//
		else {
			proxyClass = proxyInterfaceLoader.loadProxyClass(interfaceClass, classLoader);
		}//else//
		//The type id lookup is always done under the map's monitor; an unsynchronized fast path was considered unsafe.//
		synchronized(proxyTypeToTypeIdMap) {
			if(proxyTypeToTypeIdMap.containsKey(proxyClass)) {
				typeId = proxyTypeToTypeIdMap.get(proxyClass);
			}//if//
			else {
				//First use of this proxy class: register a type id together with a prototype instance.//
				//The prototype is never handed to callers since it will never be GC'd.//
				Proxy prototype = (Proxy) proxyClass.newInstance();
				typeId = nextProxyLuid++;
				proxyTypeToTypeIdMap.put(proxyClass, typeId);
				proxyTypeIdToProxyMap.put(typeId, prototype);
			}//else//
		}//synchronized//
		//Create the proxy that is actually returned and register one reference to the proxied object.//
		proxy = (Proxy) proxyClass.newInstance();
		proxy.setOrb__Proxy(this);
		proxy.setLocalObject__Proxy(object);
		proxy.setLuid__Proxy(incrementProxyCount(object));
		proxy.setTypeId__Proxy(typeId);
	}//try//
	catch(InstantiationException e) {
		//Instantiation problems are logged and reported to the caller as a null proxy.//
		proxy = null;
		Debug.log(e, "Problem instantiating a proxy instance.");
	}//catch//
	catch(ClassNotFoundException e) {
		Debug.log(e);
		throw new IllegalArgumentException("Unable to create a proxy due to problems loading required classes.");
	}//catch//
	catch(IllegalAccessException e) {
		Debug.log(e);
		throw new IllegalArgumentException("Unable to create a proxy due to problems instantiating the proxy object. This should never occur.");
	}//catch//
	return proxy;
}//getProxy()//
/**
* Gets the socket with the given name.
* @param name The name of the socket as defined by the user.
* @return The socket registered under the given name, or null if none was found.
*/
private AbstractConnection getSocketByName(String name) {
	//Straight lookup in the name table.//
	Object connection = socketsByName.get(name);
	return (AbstractConnection) connection;
}//getSocketByName()//
/**
* Increments the count for an object reference.
* <p>This is necessary to ensure that we have a reference to the proxied object just long enough for all the proxies to be used and discarded.</p>
* @param socket The socket to increment the count on. This must be null if the proxy is local and was created locally. This must be the socket the proxy travelled over if the proxy is in the process of deserializing. If the proxy is deserializing and is backtracking (travelling back over the socket it originated from) then it will not call this method, but instead will call fixProxy(..).
* @param proxy The proxy that is causing the count to be incremented.
* @return The locally unique identifier for the object. This will be the localLuid for the proxy if it is a remote proxy.
*/
long incrementProxyCount(AbstractConnection socket, Proxy proxy) {
	//Starts out as the proxy's own luid; replaced by the local luid when the proxy is remote.//
	long luid = proxy.getLuid__Proxy();
	synchronized(monitorReferenceCounter) {
		if(proxy.getIsProxyLocal__Proxy()) {
			//The proxy is local and will always have an existing reference object.//
			Ref localReference = (Ref) luidToRefMap.get(luid);
			//Increment the local count (if socket==null) or the count on the given socket.//
			localReference.increment(socket);
		}//if//
		else {
			//Remote proxies are keyed by the socket they arrived on plus the luid used at the remote end.//
			RemoteRefKey remoteKey = new RemoteRefKey(proxy.getSocket__Proxy(), proxy.getRemoteLuid__Proxy());
			if(remoteRefKeyToLuidMap.containsKey(remoteKey)) {
				//Already known: reuse the local luid and bump the existing reference.//
				luid = remoteRefKeyToLuidMap.get(remoteKey);
				Ref knownReference = (Ref) luidToRefMap.get(luid);
				knownReference.increment(socket);
			}//if//
			else {
				//First sighting: create the reference and hand out a fresh local luid.//
				Ref createdReference = new Ref(proxy, remoteKey);
				luid = getNextObjectLuid();
				createdReference.increment(socket);
				//Setup the mappings. These mappings will be released in the decrementProxyCount method.//
				luidToRefMap.put(luid, createdReference);
				remoteRefKeyToLuidMap.put(remoteKey, luid);
			}//else//
		}//else//
	}//synchronized//
	return luid;
}//incrementProxyCount()//
/**
* Increments the count for an object reference.
* <p>This is necessary to ensure that we have a reference to the proxied object just long enough for all the proxies to be used and discarded.</p>
* @param object The object whose reference count should be incremented.
* @return The locally unique identifier for the object.
*/
long incrementProxyCount(Object object) {
	synchronized(monitorReferenceCounter) {
		if(objectToLuidMap.containsKey(object)) {
			long knownLuid = objectToLuidMap.get(object);
			Ref knownReference = (Ref) luidToRefMap.get(knownLuid);
			if(knownReference != null) {
				//The object is already tracked: one more local reference.//
				knownReference.increment(null);
				return knownLuid;
			}//if//
			//The luid table and the reference table disagree: report it and fall through to re-register the object.//
			Debug.log("Error: Found an object LUID without a corresponding Ref object. See incrementProxyCount(Object).");
		}//if//
		//Not tracked (or its reference was missing): register the object under a fresh luid.//
		long freshLuid = getNextObjectLuid();
		Ref createdReference = new Ref(object, freshLuid);
		createdReference.increment(null);
		luidToRefMap.put(freshLuid, createdReference);
		objectToLuidMap.put(object, freshLuid);
		return freshLuid;
	}//synchronized//
}//incrementProxyCount()//
/**
* Decrements a proxy reference count on a remote process.
* <p>TODO: Cleanup the output stream release code. Also put this code in other remote call methods so we cleanup properly.</p>
* <p>Note: Since it is possible that this is called at the same time a socket is not valid we will perform extra checks to verify the socket is valid. Even then there is a very small chance that the socket will be invalidated while sending.</p>
* @param socket The socket to communicate across.
* @param luid The identifier for the proxy whose count is decrementing.
* @param decrementSize The number of times to decrement. The size may not be the total size for the socket on the remote process if another thread simultaneously sends another copy of the proxy.
*/
void localDecrementProxyCount(AbstractConnection socket, long luid, int decrementSize) {
	//Without a socket there is nobody to notify.//
	if(socket == null) {
		return;
	}//if//
	//Kept outside the try so the error path can still release the stream.//
	OrbObjectOutputStream stream = null;
	try {
		RemoteRunnable request = new RemoteRunnable();
		SocketData channelData = null;
		byte[] sentBytes = null;
		try {
			//Blocks until this thread can 'lock' the socket for writing, and returns the socket data used to interact with the streams.//
			channelData = socket.lockForWrite();
			if(channelData != null) {
				stream = socket.getOutputStream();
				//Describe the decrement.//
				request.type = MESSAGE_DECREMENT_PROXY_COUNT;
				request.objectLuid = luid;
				request.decrementSize = decrementSize;
				//Serialize and send it.//
				request.writeExternal(stream);
				sentBytes = socket.sendMessage(stream, channelData); //TODO: Need to wrapper with a try block and catch the socket exception - when the socket is closed and needs to reconnect?//
				//Make the stream ready for the next message: reset when supported, otherwise close.//
				if(stream.canReset()) {
					stream.reset();
				}//if//
				else {
					stream.close();
				}//else//
			}//if//
		}//try//
		finally {
			//Always paired with the lockForWrite() call above, whether or not anything was sent.//
			socket.unlockForWrite(sentBytes, socket.getCurrentMessageNumber(), true);
		}//finally//
	}//try//
	catch(Throwable e) {
		Debug.log(e, "Caught while decrementing a proxy count remotely (sending the decrement message to the remote process whence the proxy came.");
		Debug.halt();
		//Cleanup the output stream.//
		if(stream != null) {
			try {
				if(stream.canReset()) {
					stream.reset();
				}//if//
				else {
					stream.close();
				}//else//
			}//try//
			catch(Throwable e2) {
				//Best effort cleanup: the original failure was already logged above.//
			}//catch//
		}//if//
	}//catch//
}//localDecrementProxyCount()//
/**
* Executes a method on a remote object.
* @param socket The socket to communicate across.
* @param objectLuid The locally unique identifier of the object to execute on the remote process. This should be the luid for the next hop on the path to the remote process if more than one hop is required.
* @param proxyTypeId The identifier of the proxy class that is used to invoke the method.
* @param method Identifies which method to call.
* @param parameters The collection of parameter values in order that the method accepts them.
* @param isOneWay Whether a return value will be expected. If this is one way then this method will return null.
* @param autoProxyClass An optional interface class that will be used to proxy the resulting value. If null then no auto proxying will occur.
* @param timeout The amount of time to stay blocked waiting for a response. If this time is exceeded then a timeout exception will be raised.
* @param callback The optional callback object that will receive the result of the execution. If this is non-null and the method call is one-way, we will simply ignore the callback.
* @return The value resulting from the execution of the remote method.
*/
ValueHolder localExecute(AbstractConnection socket, long objectLuid, int proxyTypeId, long method, Object[] parameters, boolean isOneWay, Class autoProxyClass, long timeout, IResultCallback callback) {
//Sends a MESSAGE_EXECUTE over the socket and, for a two-way call without a callback, waits for the reply.//
//The outcome is reported through the status field of the returned holder; no exception leaves this method.//
ValueHolder valueHolder = new ValueHolder();
if(socket != null) {
try {
//Only created for a two-way call without a callback; it is what this thread later waits on.//
ReturnValueHolder holder = null;
RemoteRunnable runnable = new RemoteRunnable();
ExecuteData data = new ExecuteData();
//Set once the message has been handed to the socket.//
boolean isSent = false;
SocketData socketData = null;
byte[] message = null;
try {
//Blocks until this thread can 'lock' the socket for writing, and returns the socket data used to interact with the streams.//
socketData = socket.lockForWrite();
//NOTE(review): when socketData is null nothing is sent and valueHolder.status is never assigned in this method - confirm the default status of ValueHolder.//
if(socketData != null) {
OrbObjectOutputStream out = socket.getOutputStream();
//A call number of zero is what gets written for a one-way call.//
long callNumber = 0L;
if(!isOneWay) {
callNumber = socketData.getNextCallNumber();
//Register whoever receives the reply before the message is sent.//
if(callback != null) {
//The callback is registered in place of a holder, so this thread will not wait below.//
socket.addReturnValueHolder(callNumber, callback);
}//if//
else {
holder = new ReturnValueHolder();
socket.addReturnValueHolder(callNumber, holder);
}//else//
}//if//
//Setup the runnable.//
runnable.type = MESSAGE_EXECUTE;
runnable.callNumber = callNumber;
runnable.objectLuid = objectLuid;
runnable.proxyType = proxyTypeId;
runnable.timeout = timeout;
data.method = method;
data.parameters = parameters;
data.autoProxyClass = autoProxyClass;
//Send the message to the remote process.//
//The header is written first, followed by the execute data.//
runnable.writeExternal(out);
data.writeExternal(out);
message = socket.sendMessage(out, socketData); //TODO: Need to wrapper with a try block and catch the socket exception - when the socket is closed and needs to reconnect?//
//Reset the output stream.//
//A stream that cannot be reset is closed instead.//
if(out.canReset()) {
out.reset();
}//if//
else {
out.close();
}//else//
isSent = true;
}//if//
}//try//
finally {
//Runs whether or not a message was sent; message is still null when nothing was sent.//
socket.unlockForWrite(message, socket.getCurrentMessageNumber(), true);
}//finally//
if(isSent) {
if(holder != null) {
//Two-way call without a callback: wait for the reply outside of the write lock.//
if(holder.waitForValue(socket, timeout)) {
valueHolder.value = holder.getValue();
valueHolder.status = ValueHolder.STATUS_OK;
}//if//
else {
valueHolder.status = ValueHolder.STATUS_TIMEOUT;
}//else//
}//if//
else {
//One-way call, or a callback will receive the result: sending alone counts as success.//
valueHolder.status = ValueHolder.STATUS_OK;
}//else//
}//if//
}//try//
catch(Throwable e) {
//Any failure while locking, serializing, sending or waiting is reported as a socket error.//
Debug.log(e, "Caught while executing an automated message via a proxy.");
Debug.halt();
valueHolder.status = ValueHolder.STATUS_SOCKET_ERROR;
}//catch//
}//if//
else {
//No socket to send on.//
valueHolder.status = ValueHolder.STATUS_SOCKET_ERROR;
}//else//
return valueHolder;
}//localExecute()//
/**
* Forwards an execute or agent message to another process.
* @param receivingSocket The socket on which the message was received and any response should be sent.
* @param runnable The message header that can be modified and re-serialized.
* @param dataStream The data stream containing the rest of the message.
*/
private void localForward(AbstractConnection receivingSocket, RemoteRunnable runnable, StreamBuffer dataStream) {
//Re-sends a received message over the return socket recorded in the message's reference.//
//Failures are logged and swallowed; nothing is reported back to the caller.//
Ref returnReference = runnable.ref;
try {
ReturnValueHolder holder = null;
//The next hop is the return socket of the reference carried by the message header.//
AbstractConnection socket = returnReference.returnSocket;
SocketData socketData = null;
byte[] message = null;
try {
//Blocks until this thread can 'lock' the socket for writing, and returns the socket data used to interact with the streams.//
socketData = socket.lockForWrite();
//NOTE(review): unlike localExecute(..), localLocateObject(..) and localReturn(..), socketData is not checked for null before it is used below - confirm whether lockForWrite() can return null here.//
OrbObjectOutputStream out = socket.getOutputStream();
long callNumber = 0L;
//A non-zero incoming call number means a reply is expected by the original sender.//
if(runnable.callNumber != 0L) {
callNumber = socketData.getNextCallNumber();
//The holder remembers the receiving socket and the original call number.//
holder = new ReturnValueHolder(receivingSocket, runnable.callNumber);
//Add the return value holder to the socket to catch the return value and automatically forward it back to the original sender.//
socket.addReturnValueHolder(callNumber, holder);
}//if//
//Setup the runnable object.//
//The header is rewritten with the call number that is valid on the outgoing socket.//
runnable.callNumber = callNumber;
//Write the message to the stream.//
runnable.writeExternal(out);
//Write the execute data to the output stream.//
if(dataStream != null) {
//Transfer the execute data bytes to the stream.//
dataStream.readBytes(out);
dataStream.release();
}//if//
else if(runnable.data instanceof ExecuteData) { //Handle this special since we don't want to write out the class name since it won't be expected.//
((ExecuteData) runnable.data).writeExternal(out);
}//else if//
else {
//Write the execute data object to the stream.//
((java.io.Externalizable) runnable.data).writeExternal(out);
}//else//
//Send the message to the remote process.//
message = socket.sendMessage(out, socketData); //TODO: Need to wrapper with a try block and catch the socket exception - when the socket is closed and needs to reconnect?//
//Reset the stream.//
//A stream that cannot be reset is closed instead.//
if(out.canReset()) {
out.reset();
}//if//
else {
out.close();
}//else//
}//try//
finally {
//Runs whether or not a message was sent; message is still null when nothing was sent.//
socket.unlockForWrite(message, socket.getCurrentMessageNumber(), true);
}//finally//
}//try//
catch(Throwable e) {
//NOTE(review): a failed forward is only logged; the original sender is not sent any error reply from here.//
Debug.log(e, "Caught while forwarding an automated execute message.");
Debug.halt();
}//catch//
}//localForward()//
/**
* Sends a locate request for an object to a remote process and waits for the reply.
* @param socket The socket to communicate across.
* @param objectId The reference to the object that is being located.
* @return A reference to the located object, or null if no object was found.
*/
private Object localLocateObject(AbstractConnection socket, Object objectId) {
	//Stays null when nothing was sent, nothing was found, or an error occurred.//
	Object result = null;
	try {
		ReturnValueHolder holder = new ReturnValueHolder();
		RemoteRunnable runnable = new RemoteRunnable();
		//Set once the locate message has been handed to the socket.//
		boolean success = false;
		SocketData socketData = null;
		byte[] message = null;
		try {
			//Blocks until this thread can 'lock' the socket for writing, and returns the socket data used to interact with the streams.//
			socketData = socket.lockForWrite();
			if(socketData != null) {
				OrbObjectOutputStream out = socket.getOutputStream();
				long callNumber = socketData.getNextCallNumber();
				//Add the return value holder to the socket to catch the return value.//
				socket.addReturnValueHolder(callNumber, holder);
				//Setup the runnable object.//
				runnable.type = MESSAGE_LOCATE;
				runnable.callNumber = callNumber;
				runnable.objectId = objectId;
				//Write the message to the stream.//
				runnable.writeExternal(out);
				message = socket.sendMessage(out, socketData); //TODO: Need to wrapper with a try block and catch the socket exception - when the socket is closed and needs to reconnect?//
				//Make the stream ready for the next message: reset when supported, otherwise close.//
				if(out.canReset()) {
					out.reset();
				}//if//
				else {
					out.close();
				}//else//
				success = true;
			}//if//
		}//try//
		finally {
			//Runs whether or not a message was sent; message is still null when nothing was sent.//
			socket.unlockForWrite(message, socket.getCurrentMessageNumber(), true);
		}//finally//
		if(success) {
			//Wait for the reply outside of the write lock, then pick up whatever value arrived.//
			holder.waitForValue(socket, 0);
			result = holder.getValue();
		}//if//
	}//try//
	catch(Throwable e) {
		//Logged with the same (Throwable, String) argument order as the other senders in this class, and with a message that names this operation.//
		Debug.log(e, "Caught while locating a remote object.");
		Debug.halt();
	}//catch//
	return result;
}//localLocateObject()//
/**
* Sends a return value from a remote call or forwarded call.
* <p>Warning: This method may block until the socket output stream is available.</p>
* @param socket The socket to communicate across.
* @param callNumber The id number for the call that this is a return for.
* @param value The return value to serialize and send.
* @return Always null; the method produces no result of its own.
*/
private Object localReturn(AbstractConnection socket, long callNumber, Object value) {
	try {
		RemoteRunnable reply = new RemoteRunnable();
		SocketData channelData = null;
		byte[] sentBytes = null;
		try {
			//Blocks until this thread can 'lock' the socket for writing, and returns the socket data used to interact with the streams.//
			channelData = socket.lockForWrite();
			if(channelData != null) {
				OrbObjectOutputStream stream = socket.getOutputStream();
				//Describe which call this value answers.//
				reply.type = MESSAGE_RETURN;
				reply.callNumber = callNumber;
				//The header goes first, followed by the value itself.//
				reply.writeExternal(stream);
				stream.writeObject(value);
				sentBytes = socket.sendMessage(stream, channelData); //TODO: Need to wrapper with a try block and catch the socket exception - when the socket is closed and needs to reconnect?//
				//Make the stream ready for the next message: reset when supported, otherwise close.//
				if(stream.canReset()) {
					stream.reset();
				}//if//
				else {
					stream.close();
				}//else//
			}//if//
		}//try//
		finally {
			//Always paired with the lockForWrite() call above, whether or not anything was sent.//
			socket.unlockForWrite(sentBytes, socket.getCurrentMessageNumber(), true);
		}//finally//
	}//try//
	catch(Throwable e) {
		Debug.log(e, "Caught while executing an automated message via a proxy.");
		Debug.halt();
		//Close the socket since any errors here are not recoverable.//
		try {
			socket.close(true);
		}//try//
		catch(Throwable e2) {
			Debug.handle(e2);
		}//catch//
	}//catch//
	//Nothing is ever produced here; the return type only exists for the callers' signature.//
	return null;
}//localReturn()//
/**
* Sends an agent to a remote process.
* @param socket The socket to communicate across.
* @param agent The agent that will be executed after arriving at its destination.
* @param followProxy Whether the agent is following a proxy's path back to its proxied value before executing. If false, then the objectLuid can be ignored.
* @param objectLuid The locally unique identifier of the object whose process should execute the agent. This should be the luid for the next hop on the path to the remote process if more than one hop is required.
* @param isOneWay Whether a return value will be expected. If this is one way then this method will return null.
* @param timeout The amount of time to stay blocked waiting for a response. If this time is exceeded then a timeout exception will be raised.
* @param callback The optional callback object that will receive the result of the execution. If this is non-null and the method call is one-way, we will simply ignore the callback.
* @return The value resulting from the execution of the remote method.
*/
private ValueHolder localSendAgent(AbstractConnection socket, Object agent, boolean followProxy, long objectLuid, boolean isOneWay, long timeout, IResultCallback callback) {
//Sends a MESSAGE_AGENT over the socket and, for a two-way call without a callback, waits for the reply.//
//The outcome is reported through the status field of the returned holder; no exception leaves this method.//
ValueHolder valueHolder = new ValueHolder();
if(socket != null) {
try {
//Only created for a two-way call without a callback; it is what this thread later waits on.//
ReturnValueHolder holder = null;
RemoteRunnable runnable = new RemoteRunnable();
AgentData data = new AgentData();
//Set once the message has been handed to the socket.//
boolean isSent = false;
SocketData socketData = null;
byte[] message = null;
try {
//Blocks until this thread can 'lock' the socket for writing, and returns the socket data used to interact with the streams.//
socketData = socket.lockForWrite();
//NOTE(review): when socketData is null nothing is sent and valueHolder.status is never assigned in this method - confirm the default status of ValueHolder.//
if(socketData != null) {
OrbObjectOutputStream out = socket.getOutputStream();
//A call number of zero is what gets written for a one-way call.//
long callNumber = 0L;
if(!isOneWay) {
callNumber = socketData.getNextCallNumber();
//Register whoever receives the reply before the message is sent.//
if(callback != null) {
//The callback is registered in place of a holder, so this thread will not wait below.//
socket.addReturnValueHolder(callNumber, callback);
}//if//
else {
holder = new ReturnValueHolder();
socket.addReturnValueHolder(callNumber, holder);
}//else//
}//if//
//Setup the runnable.//
runnable.type = MESSAGE_AGENT;
runnable.callNumber = callNumber;
runnable.objectLuid = objectLuid;
runnable.proxyType = followProxy ? 1 : 0; //Flag whether the object luid should be used to follow a proxy's path before executing the agent.//
runnable.timeout = timeout;
data.agent = agent;
//Send the message to the remote process.//
//The header is written first, followed by the agent data.//
runnable.writeExternal(out);
data.writeExternal(out);
message = socket.sendMessage(out, socketData); //TODO: Need to wrapper with a try block and catch the socket exception - when the socket is closed and needs to reconnect?//
//Reset the output stream.//
//A stream that cannot be reset is closed instead.//
if(out.canReset()) {
out.reset();
}//if//
else {
out.close();
}//else//
isSent = true;
}//if//
}//try//
finally {
//Runs whether or not a message was sent; message is still null when nothing was sent.//
socket.unlockForWrite(message, socket.getCurrentMessageNumber(), true);
}//finally//
if(isSent) {
//The holder is only ever created for a two-way call, so the isOneWay test is implied by the null check.//
if(!isOneWay && holder != null) {
//Wait for the reply outside of the write lock.//
if(holder.waitForValue(socket, timeout)) {
valueHolder.value = holder.getValue();
valueHolder.status = ValueHolder.STATUS_OK;
}//if//
else {
valueHolder.status = ValueHolder.STATUS_TIMEOUT;
}//else//
}//if//
else {
//One-way call, or a callback will receive the result: sending alone counts as success.//
valueHolder.status = ValueHolder.STATUS_OK;
}//else//
}//if//
}//try//
catch(Throwable e) {
//Any failure while locking, serializing, sending or waiting is reported as a socket error.//
Debug.log(e, "Caught while sending an agent.");
Debug.halt();
valueHolder.status = ValueHolder.STATUS_SOCKET_ERROR;
}//catch//
}//if//
else {
//No socket to send on.//
valueHolder.status = ValueHolder.STATUS_SOCKET_ERROR;
}//else//
return valueHolder;
}//localSendAgent()//
/**
* Looks up an object previously bound to an application defined resource.
* @param resource The application defined resource the requested object is bound to.
* @return The object bound to the resource, or null if the resource does not exist.
*/
public Object lookup(Object resource) {
	//A lookup made directly on this orb carries no permission, which is expressed by passing null.//
	Object bound = lookupInternal(resource, null);

	return bound;
}//lookup()//
/**
* Looks up an object previously bound to an application defined resource.
* @param resource The application defined resource the requested object is bound to. This must be serializable (or externalizable), and it is usually a string.
* @param socketId The object that identifies the socket (returned when the socket was created), or the name of the socket.
*/
public Object lookup(Object resource, Object socketId) {
	//Without a socket identifier this is simply a lookup against this orb's own bindings.//
	if(socketId == null) {
		return lookup(resource);
	}//if//

	//Resolve the identifier to a connection: a String is treated as a socket name, a SocketIdentifier carries the connection itself.//
	AbstractConnection connection = null;

	if(socketId instanceof String) {
		connection = getSocketByName((String) socketId);
	}//if//
	else if(socketId instanceof SocketIdentifier) {
		connection = ((SocketIdentifier) socketId).socket;
	}//else if//

	//Any other identifier type, an unknown name, or an identifier without a connection is rejected.//
	if(connection == null) {
		throw new RuntimeException("Invalid socketId.");
	}//if//

	//Delegate to localLocateObject to resolve the resource through that connection.//
	return localLocateObject(connection, resource);
}//lookup()//
/**
* Looks up an object previously bound to an application defined resource.
* @param resource The application defined resource the requested object is bound to.
* @param permission The permission used to lookup the object. This permission is associated with the socket that the request traveled across. This will be null for local requests.
* @return The object bound to the resource, or null if the resource does not exist or the caller does not have the correct permissions.
*/
private Object lookupInternal(Object resource, Object permission) {
	BoundValue binding = (BoundValue) boundValueMap.get(resource);

	//Nothing is bound to the resource.//
	if(binding == null) {
		return null;
	}//if//

	//Access is granted when no permission is supplied, when the binding carries no permission collection, or when the supplied permission is contained in the binding's permissions.//
	boolean isAllowed = (permission == null) || (binding.permissions == null) || (binding.permissions.containsValue(permission));

	if(isAllowed) {
		return binding.object;
	}//if//

	return null;
}//lookupInternal()//
/**
* Opens a server socket and returns a server socket identifier that can be used to identify the server socket in future calls.
* @param name The name of the server socket. If this matches with an existing server socket then the existing server socket will be used instead of creating a new one.
* @param options The options for the server socket.
* @return The socket identifier. The application should not rely on this being of any particular type.
*/
public Object openServerSocket(String name, IServerSocketOptions options) throws IOException {
	AbstractConnectionServer serverSocket = null;

	if(options == null) {
		throw new RuntimeException("A valid server socket options object must be provided.");
	}//if//

	if((options.getAddress() == null) || (options.getAddress().getPort() < 0)) {
		throw new RuntimeException("An address containing at least a valid port must be specified.");
	}//if//

	synchronized(monitorSocketChange) { //TODO: have a monitor specially for server sockets?
		//A server socket already registered under this name is reused; only its reference count changes.//
		serverSocket = (AbstractConnectionServer) serverSocketsByName.get(name);

		if(serverSocket == null) {
			Address address = options.getAddress();
			InetAddress inetAddress = null;
			String addressPort = null;

			//Get the inet address.//
			if(address.getName() != null) {
				try {
					inetAddress = InetAddress.getByName(address.getName());
				}//try//
				catch(java.net.UnknownHostException e) {
					IOException failure = new IOException("Unable to open a server socket on an address that is unknown.");

					Debug.log(e);
					//Keep the resolution failure attached as the cause so the caller can see which lookup failed.//
					failure.initCause(e);
					throw failure;
				}//catch//
			}//if//

			//The key is the host address (empty when no host name was given) followed by the port.//
			addressPort = (inetAddress == null ? "" : inetAddress.getHostAddress()) + ':' + address.getPort();
			serverSocket = (AbstractConnectionServer) serverSocketsByAddressPort.get(addressPort);

			//If an existing server socket was not found, then we will try to create one.//
			if(serverSocket == null) {
				NioEngine nioEngine = options.getNioEngine() != null ? options.getNioEngine() : getNioEngine();
				boolean isInitialized = false;

				//Without an NIO engine fall back to the StServerSocket implementation.//
				if(nioEngine == null) {
					serverSocket = new StServerSocket();
				}//if//
				else {
					serverSocket = new NioConnectionServer(nioEngine);
				}//else//

				//Increment the reference count on the server socket name.//
				serverSocket.incrementReferenceCount(name);
				//Add the server socket to the mapping by the address and port.//
				serverSocketsByAddressPort.put(addressPort, serverSocket);
				//Add the server socket to the mapping by the name.//
				serverSocketsByName.put(name, serverSocket);

				try {
					serverSocket.initialize(this, options, inetAddress, classLoader, socketProcessMessageHandler, socketInitCompletionHandler);
					isInitialized = true;
				}//try//
				finally {
					//Undo the registrations on ANY failure (checked or unchecked) so a server socket that never initialized cannot be handed out by a later call; the exception itself still propagates.//
					if(!isInitialized) {
						serverSocketsByAddressPort.remove(addressPort);
						serverSocketsByName.remove(name);
					}//if//
				}//finally//

				if(debug) {
					Debug.log("Server Socket on " + serverSocket.getExternalAddress() + " opened.");
				}//if//
			}//if//
			else {
				//A server socket already exists on this address and port: register the additional name for it.//
				serverSocketsByName.put(name, serverSocket);
				//Increment the reference count on the socket name.//
				serverSocket.incrementReferenceCount(name);
			}//else//
		}//if//
		else {
			serverSocket.incrementReferenceCount(name);
		}//else//
	}//synchronized//

	return serverSocket != null ? new ServerSocketIdentifier(serverSocket, name) : null;
}//openServerSocket()//
/**
* Opens a socket and returns a socket identifier that can be used to identify the socket in future calls.
* @param name The name of the socket. If this matches with an existing socket then the existing socket will be used instead of creating a new one.
* @param options The options for the socket.
* @return The socket identifier. The application should not rely on this being of any particular type.
*/
public Object openSocket(String name, ISocketOptions options) throws IOException {
	AbstractConnection socket = null;

	if(options == null) {
		throw new RuntimeException("A valid socket options object must be provided.");
	}//if//

	if(options.getAddresses() == null || options.getAddresses().length == 0) {
		throw new RuntimeException("At least one address must be specified.");
	}//if//

	synchronized(monitorSocketChange) {
		//A socket already registered under this name is reused; only its reference count changes.//
		socket = getSocketByName(name);

		if(socket == null) {
			//Entries stay null for null addresses and for host names that could not be resolved.//
			InetAddress[] inetAddresses = new InetAddress[options.getAddresses().length];

			//Search for an existing socket that can be used.//
			for(int index = 0; socket == null && index < options.getAddresses().length; index++) {
				Address address = options.getAddresses()[index];

				if(address != null) {
					try {
						inetAddresses[index] = InetAddress.getByName(address.getName());
					}//try//
					catch(java.net.UnknownHostException e) {
						//An unresolvable address is logged and skipped; the remaining addresses are still tried.//
						Debug.log(e);
					}//catch//

					if(inetAddresses[index] != null) {
						//Check to see if a socket exists to the address and port.//
						socket = (AbstractConnection) socketsByAddressPort.get(inetAddresses[index].getHostAddress() + ':' + address.getPort());
					}//if//
				}//if//
			}//for//

			//If an existing socket was not found, then we must try to create one.//
			if(socket == null) {
				NioEngine nioEngine = options.getNioEngine() != null ? options.getNioEngine() : getNioEngine();

				if(nioEngine == null) {
					socket = new StConnection();

					//Attempt to connect a socket to each address provided in order until one connects or we run out of addresses to use.//
					for(int index = 0; socket.getSocketData() == null && index < inetAddresses.length; index++) {
						//Skip entries that were null or unresolved above: the address object may be null (its port cannot be read) and a null InetAddress must not be handed to initialize().//
						if(inetAddresses[index] != null) {
							((StConnection) socket).initialize(this, inetAddresses[index], options.getAddresses()[index].getPort(), options, classLoader, socketProcessMessageHandler, socketInitCompletionHandler);
						}//if//
					}//for//
				}//if//
				else {
					socket = new NioConnection();

					//Attempt to connect a socket to each address provided in order until one connects or we run out of addresses to use.//
					for(int index = 0; socket.getSocketData() == null && index < inetAddresses.length; index++) {
						//Skip entries that were null or unresolved above (see the matching loop for the non-NIO connection).//
						if(inetAddresses[index] != null) {
							((NioConnection) socket).initialize(this, inetAddresses[index], options.getAddresses()[index].getPort(), options, classLoader, socketProcessMessageHandler, socketInitCompletionHandler, nioEngine);
						}//if//
					}//for//
				}//else//

				//TODO: Setup the reuse handler for reusing the session information.//
				//TODO: Try to connect on the reuse address.//

				//Note: No need to synchronize on the socket to get the socket data since only this thread has access to the socket currently.//
				if(socket.getSocketData() != null) {
					//Only report the socket as opened once a connection has actually been established.//
					if(debug) {
						Debug.log("Socket to " + socket.getNameAndPort() + " was opened.");
					}//if//

					socket.getNames().add(name);
					//Increment the reference count on the socket name.//
					socket.incrementReferenceCount(name);
					//Add the socket name to the mapping.//
					socketsByName.put(name, socket);
				}//if//
				else {
					//The socket failed to connect on any address.//
					socket = null;
				}//else//
			}//if//
			else {
				//A socket to one of the addresses already exists: register the additional name for it.//
				socket.getNames().add(name);
				//Add the socket name to the mapping.//
				socketsByName.put(name, socket);
				//Increment the reference count on the socket name.//
				socket.incrementReferenceCount(name);
			}//else//
		}//if//
		else {
			//Increment the reference count on the socket name.//
			socket.incrementReferenceCount(name);
		}//else//
	}//synchronized//

	//Null is returned when no connection could be established.//
	return socket != null ? new SocketIdentifier(socket, name) : null;
}//openSocket()//
/**
* Prepares the socket for use by the orb.
* <p>TODO: Determine whether this is really necessary. This code could be moved into where the socket is created since there is no caching of ServerSocket accepted sockets.</p>
* @param socket The socket to prepare.
*/
private void prepareSocket(AbstractConnection socket) {
	//Missing connections and server-side connections are not indexed.//
	if((socket == null) || (socket.isServerSide())) {
		return;
	}//if//

	//Index the connection by its name and port.//
	socketsByAddressPort.put(socket.getNameAndPort(), socket);
}//prepareSocket()//
/**
* Processes a message read from a socket and threads the processing if there is any chance that the message will result in long running code or sending a message via the orb.
* <p>This method will not block.</p>
* @param socket The socket that received the message.
* @param inputStream The message bytes wrapped by an object input stream.
*/
private void processMessage(AbstractConnection socket, ObjectInputStream inputStream) {
//Reads the message header into a RemoteRunnable, reads whatever payload the message type requires from the stream, and then hands the runnable to the ThreadService.//
//Setting the runnable to null below means "do not dispatch this message".//
RemoteRunnable runnable = new RemoteRunnable();
try {
//Read the message as a runnable so that it is encapsulated and easy to thread.//
runnable.readExternal(inputStream);
runnable.socket = socket;
//Determine what to do with the runnable based on what type of runnable it is.//
switch(runnable.type) {
case MESSAGE_EXECUTE: {
runnable.ref = (Ref) luidToRefMap.get(runnable.objectLuid);
//The reference holds the target object itself: read the call data from the stream.//
if(runnable.ref != null && runnable.ref.object != null) {
ExecuteData data = new ExecuteData();
data.readExternal(inputStream);
runnable.data = data;
}//if//
//The reference exists but holds no object: the call data is still fully read here, and the luid is swapped for the one stored in the reference's remote key.//
else if(runnable.ref != null && (runnable.ref.object == null)) {
ExecuteData data = new ExecuteData();
data.readExternal(inputStream);
runnable.data = data;
//Can't just forward the bytes on the stream to the remote system due to connection specific references to classes.//
// runnable.dataStream = new StreamBuffer();
// runnable.dataStream.writeBytes(inputStream);
//Set the object luid (for forwarding to the next remote system in the chain) to the remote system's luid for the object.//
runnable.objectLuid = runnable.ref.remoteRefKey.luid;
}//if//
//No reference was found and the call number is zero: log the problem and drop the message.//
//NOTE(review): With a non-zero call number and no reference the runnable is still dispatched without data - presumably so the failure can be reported to the caller; confirm in RemoteRunnable.//
else if(runnable.callNumber == 0L) {
//Error: Cannot find the specified reference! The message cannot be executed or sent to the next destination.//
Debug.log("Unhandled exception caught by ORB: ", new InvalidProxyException("Unable to traverse the network to reach the proxied object."));
runnable = null;
}//else if//
break;
}//case//
case MESSAGE_AGENT: {
//If we are following a proxy's path then lookup the proxy reference.//
if(runnable.proxyType == 1) {
runnable.ref = (Ref) luidToRefMap.get(runnable.objectLuid);
}//if//
//Forward or execute the agent.//
if((runnable.proxyType == 1) && (runnable.ref == null)) {
//Error: Cannot find the specified reference! The agent cannot be sent to the next destination.//
//Only a message with call number zero is dropped here; otherwise the runnable is still dispatched without agent data.//
if(runnable.callNumber == 0L) {
Debug.log("Unhandled exception caught by ORB: ", new InvalidProxyException("Unable to traverse the network to reach the proxied object."));
runnable = null;
}//if//
}//if//
//The agent payload is read unless the message follows a proxy whose reference holds no object; in that case the payload is left unread on the stream.//
else if(!((runnable.proxyType == 1) && (runnable.ref.object == null))) {
AgentData data = new AgentData();
data.readExternal(inputStream);
runnable.data = data;
}//else if//
// //Forward or execute the agent.//
// if((runnable.proxyType == 1) && (runnable.ref == null)) {
// //Error: Cannot find the specified reference! The agent cannot be sent to the next destination.//
// if(runnable.callNumber != 0L) {
// localReturn(socket, runnable.callNumber, new InvalidProxyException("Unable to traverse the network to reach the proxied object."), null);
// }//if//
// else {
// Debug.log(new InvalidProxyException("Unable to traverse the network to reach the proxied object."), "Unhandled exception caught by ORB: ");
// }//else//
// }//if//
// else if((runnable.proxyType == 1) && (runnable.isForwardable) && (runnable.ref.object == null)) {
// localForward(socket, runnable, inputStream);
// runnable = null;
// }//if//
// else {
// AgentData data = new AgentData();
//
// data.readExternal(inputStream);
// runnable.data = data;
// }//else//
break;
}//case//
case MESSAGE_RETURN: {
//Take the object registered for this call number off the connection; it is either a ReturnValueHolder or an IResultCallback.//
runnable.holderObject = socket.removeReturnValueHolder(runnable.callNumber);
if(runnable.holderObject instanceof ReturnValueHolder) {
ReturnValueHolder holder = (ReturnValueHolder) runnable.holderObject;
//Both branches currently read the returned object in the same way.//
if(holder.socket != null) {
//Can't just forward the rest of the stream since it may contain class references that are mapped to a number to compress the stream. Those mappings are specific to the connection upon which the message is sent.//
// runnable.dataStream = new StreamBuffer();
// runnable.dataStream.writeBytes(inputStream);
runnable.returnObject = inputStream.readObject();
}//if//
else {
runnable.returnObject = inputStream.readObject();
}//else//
}//if//
else if(runnable.holderObject instanceof IResultCallback) {
runnable.returnObject = inputStream.readObject();
}//else if//
//When nothing (or some other type) was registered for the call number the return value is not read; the runnable is still dispatched.//
break;
}//case//
case MESSAGE_LOCATE:
//Performed in the thread such that the return message doesn't tie up the listener thread.//
break;
case MESSAGE_DECREMENT_PROXY_COUNT:
//Performed in the thread such that chained decrement messages don't tie up the listener thread.//
break;
default:
//Unknown message type: the message is dropped.//
Debug.halt();
runnable = null;
break;
}//switch//
//Run the runnable if this is not a special case.//
if(runnable != null) {
//If the thread service is not running the message is silently discarded.//
if(ThreadService.isRunning()) {
try {
ThreadService.run(runnable);
}//try//
catch(ThreadException e) {
//Ignore - only occurs while shutting down.//
}//catch//
}//if//
}//if//
}//try//
//Failures while reading the message are only logged; the message is not dispatched.//
//NOTE(review): Nothing is sent back to the sender here, so a two-way caller presumably only notices through its timeout - confirm.//
catch(ClassNotFoundException e) {
Debug.log(e);
Debug.halt();
}//catch//
catch(IOException e) {
Debug.log(e);
Debug.halt();
}//catch//
}//processMessage()//
/**
* Decrements the count on a proxy after receiving a message to do so.
* @param socket The socket that we received the message from.
* @param luid The identifier for the proxy whose count is decrementing.
*/
private void remoteDecrementProxyCount(AbstractConnection socket, long luid, int decrementSize) {
	try {
		decrementProxyCount(socket, luid, decrementSize);
	}//try//
	catch(Throwable failure) {
		//Any failure is logged here instead of being propagated to the caller.//
		Debug.log(failure, "Caught while decrementing a proxy count.");
		Debug.halt();
	}//catch//
}//remoteDecrementProxyCount()//
/**
* Executes a method on a remote object.
* @param socket The socket that requested the method invocation.
* @param callNumber The number identifying the call to invoke the method.
* @param objectId Identifies the object to invoke the method on.
* @param proxyTypeId The identifier of the proxy class that is used to invoke the method.
* @param method Identifies which method to call.
* @param parameters The collection of parameter values in order that the method accepts them.
* @param ref An optional reference to the object's Ref object. This is an optimization to avoid double map lookups.
* @param autoProxyClass An optional interface class that must be used to proxy the resulting value.
*/
private void remoteExecute(AbstractConnection socket, long callNumber, long objectId, int proxyTypeId, long method, Object[] parameters, Ref ref, Class autoProxyClass) {
	//A call number of zero means no reply is sent back for this call.//
	final boolean expectsReply = callNumber != 0L;
	Object outcome = null;

	try {
		//Fall back to a map lookup when the caller did not supply the reference.//
		Ref target = (ref != null) ? ref : (Ref) luidToRefMap.get(objectId);

		if(target.object == null) {
			//Error: Should not ever get here.//
			Debug.halt();
		}//if//
		else {
			Proxy invoker = (Proxy) proxyTypeIdToProxyMap.get(proxyTypeId);

			if((DEBUG) && (invoker == null)) {
				Debug.log("Error: Failed to lookup proxy by id.");
			}//if//

			//Execute the method call.//
			outcome = invoker.remoteExecute__Proxy(target.object, method, parameters);
		}//else//
	}//try//
	catch(Throwable failure) {
		Debug.handle(failure);

		//When a reply is expected the failure itself becomes the reply; otherwise it can only be logged.//
		if(expectsReply) {
			outcome = failure;
		}//if//
		else {
			Debug.log(failure, "Unhandled exception caught by ORB: ");
		}//else//
	}//catch//

	if(expectsReply) {
		//Replace the result with a proxy when an auto proxy class was requested and the result is an instance of it.//
		if((outcome != null) && (autoProxyClass != null) && (autoProxyClass.isAssignableFrom(outcome.getClass()))) {
			outcome = getProxy(outcome, autoProxyClass);
		}//if//

		localReturn(socket, callNumber, outcome);
	}//if//
}//remoteExecute()//
/**
* Executes an agent.
* @param socket The socket that requested the method invocation.
* @param agent The agent to be executed.
* @param callNumber The number identifying the call to invoke the method.
*/
private void remoteExecuteAgent(AbstractConnection socket, Object agent, long callNumber) {
	//A call number of zero means no reply is sent back for this call.//
	final boolean expectsReply = callNumber != 0L;
	Object outcome = null;

	try {
		//A Runnable produces no value; an IRunnable's return value becomes the reply. Runnable is checked first, so it wins when an agent implements both.//
		if(agent instanceof Runnable) {
			((Runnable) agent).run();
		}//if//
		else if(agent instanceof IRunnable) {
			outcome = ((IRunnable) agent).run();
		}//else if//
	}//try//
	catch(Throwable failure) {
		Debug.log(failure);
		Debug.handle(failure);

		//When a reply is expected the failure itself becomes the reply; otherwise it can only be logged.//
		if(expectsReply) {
			outcome = failure;
		}//if//
		else {
			Debug.log("Unhandled exception caught by ORB: ", failure);
		}//else//
	}//catch//

	if(expectsReply) {
		localReturn(socket, callNumber, outcome);
	}//if//
}//remoteExecuteAgent()//
/**
* Reports on the status of the orb.
* <p>This is intended for debug usage only and will be removed in the future.</p>
* @return The status of the orb at this time.
*/
public String reportStatus() {
	//Builds a plain text dump: the scalar settings first, then each internal map as a size line followed by one tab-indented "key -> value" line per entry.//
	StringBuffer report = new StringBuffer(1000);
	String eol = "\r\n";
	Object key = null;
	long longKey = 0L;
	int intKey = 0;
	IIterator keys = null;
	ILongIterator longKeys = null;
	IIntIterator intKeys = null;

	//Scalar settings and counters.//
	report.append("debug: ").append(debug).append(eol);
	report.append("allowDuplicateSockets: ").append(allowDuplicateSockets).append(eol);
	report.append("nextProxyLuid: ").append(nextProxyLuid).append(eol);
	report.append("nextObjectLuid: ").append(nextObjectLuid).append(eol);

	//Server sockets by name.//
	keys = serverSocketsByName.keyIterator();
	report.append("serverSocketsByName.getSize(): ").append(serverSocketsByName.getSize()).append(eol);

	while(keys.hasNext()) {
		key = keys.next();
		report.append('\t').append("Server Socket Entry: ").append(key).append(" -> ").append(serverSocketsByName.get(key)).append(eol);
	}//while//

	//Server sockets by address and port.//
	keys = serverSocketsByAddressPort.keyIterator();
	report.append("serverSocketsByAddressPort.getSize(): ").append(serverSocketsByAddressPort.getSize()).append(eol);

	while(keys.hasNext()) {
		key = keys.next();
		report.append('\t').append("Server Socket Entry: ").append(key).append(" -> ").append(serverSocketsByAddressPort.get(key)).append(eol);
	}//while//

	//Sockets by name.//
	keys = socketsByName.keyIterator();
	report.append("socketsByName.getSize(): ").append(socketsByName.getSize()).append(eol);

	while(keys.hasNext()) {
		key = keys.next();
		report.append('\t').append("Socket Entry: ").append(key).append(" -> ").append(socketsByName.get(key)).append(eol);
	}//while//

	//Sockets by address and port.//
	keys = socketsByAddressPort.keyIterator();
	report.append("socketsByAddressPort.getSize(): ").append(socketsByAddressPort.getSize()).append(eol);

	while(keys.hasNext()) {
		key = keys.next();
		report.append('\t').append("Socket Entry: ").append(key).append(" -> ").append(socketsByAddressPort.get(key)).append(eol);
	}//while//

	//Proxy singletons.//
	keys = proxySingletons.keyIterator();
	report.append("proxySingletons.getSize(): ").append(proxySingletons.getSize()).append(eol);

	while(keys.hasNext()) {
		key = keys.next();
		report.append('\t').append("Proxy Singleton Entry: ").append(key).append(" -> ").append(proxySingletons.get(key)).append(eol);
	}//while//

	//Bound values by resource.//
	keys = boundValueMap.keyIterator();
	report.append("boundValueMap.getSize(): ").append(boundValueMap.getSize()).append(eol);

	while(keys.hasNext()) {
		key = keys.next();
		report.append('\t').append("Bound Value Entry: ").append(key).append(" -> ").append(boundValueMap.get(key)).append(eol);
	}//while//

	//References by luid (long keys).//
	longKeys = luidToRefMap.keyIterator();
	report.append("luidToRefMap.getSize(): ").append(luidToRefMap.getSize()).append(eol);

	while(longKeys.hasNext()) {
		longKey = longKeys.next();
		report.append('\t').append("Luid/Ref Entry: ").append(longKey).append(" -> ").append(luidToRefMap.get(longKey)).append(eol);
	}//while//

	//Luids by remote reference key.//
	keys = remoteRefKeyToLuidMap.keyIterator();
	report.append("remoteRefKeyToLuidMap.getSize(): ").append(remoteRefKeyToLuidMap.getSize()).append(eol);

	while(keys.hasNext()) {
		key = keys.next();
		report.append('\t').append("RefKey/Luid Entry: ").append(key).append(" -> ").append(remoteRefKeyToLuidMap.get(key)).append(eol);
	}//while//

	//Luids by object.//
	keys = objectToLuidMap.keyIterator();
	report.append("objectToLuidMap.getSize(): ").append(objectToLuidMap.getSize()).append(eol);

	while(keys.hasNext()) {
		key = keys.next();
		report.append('\t').append("Object/Luid Entry: ").append(key).append(" -> ").append(objectToLuidMap.get(key)).append(eol);
	}//while//

	//Proxy type ids by proxy type.//
	keys = proxyTypeToTypeIdMap.keyIterator();
	report.append("proxyTypeToTypeIdMap.getSize(): ").append(proxyTypeToTypeIdMap.getSize()).append(eol);

	while(keys.hasNext()) {
		key = keys.next();
		report.append('\t').append("Proxy.class/ProxyLuid Entry: ").append(key).append(" -> ").append(proxyTypeToTypeIdMap.get(key)).append(eol);
	}//while//

	//Proxies by proxy type id (int keys).//
	intKeys = proxyTypeIdToProxyMap.keyIterator();
	report.append("proxyTypeIdToProxyMap.getSize(): ").append(proxyTypeIdToProxyMap.getSize()).append(eol);

	while(intKeys.hasNext()) {
		intKey = intKeys.next();
		report.append('\t').append("ProxyLuid/Proxy Entry: ").append(intKey).append(" -> ").append(proxyTypeIdToProxyMap.get(intKey)).append(eol);
	}//while//

	return report.toString();
}//reportStatus()//
/**
* Sends an agent to be executed on a remote process.
* @param agent The agent to be sent. The agent must implement IExternalizable or Externalizable, and it must implement IRunnable or Runnable.
* @param connectionIdentifier The network connection to send the agent to, or a proxy whose path will be used to send the agent.
* @param isOneWay Whether a return value will be expected. If this is one way then this method will return null.
* @param timeout The amount of time to stay blocked waiting for a response. If this time is exceeded then a timeout exception will be raised. A value of zero indicates no timeout.
* @param callback The optional callback object that will receive the result of the execution. If this is non-null and the method call is one-way, we will simply ignore the callback.
* @return The agent's result, or null if the call is one way or a callback was provided.
*/
public Object sendAgent(Object agent, Object connectionIdentifier, boolean isOneWay, long timeout, IResultCallback callback) {
	Object result = null;
	ValueHolder reply = null;

	//The agent must be runnable through one of the two supported interfaces.//
	if(!((agent instanceof Runnable) || (agent instanceof com.common.thread.IRunnable))) {
		throw new IllegalArgumentException("Invalid agent type. The agent must be serializable and must either implement java.lang.Runnable or com.common.thread.IRunnable.");
	}//if//

	if(connectionIdentifier instanceof NetworkConnectionIdentifier) {
		NetworkConnectionIdentifier connection = (NetworkConnectionIdentifier) connectionIdentifier;

		//Execute the agent on the process at the other end of the socket.//
		reply = localSendAgent(connection.getSocket(), agent, false, 0L, isOneWay, timeout, callback);
	}//if//
	else if(connectionIdentifier instanceof Proxy) {
		Proxy proxy = (Proxy) connectionIdentifier;

		if(!proxy.getIsProxyLocal__Proxy()) {
			//Execute the agent on the same process that the proxy's proxied object exists on.//
			reply = localSendAgent(proxy.getSocket__Proxy(), agent, true, proxy.getLuid__Proxy(), isOneWay, timeout, callback);
		}//if//
		else {
			//The proxied object lives in this process, so the agent runs right here on the calling thread.//
			if(agent instanceof Runnable) {
				((Runnable) agent).run();
			}//if//
			else if(agent instanceof IRunnable) {
				result = ((IRunnable) agent).run();
			}//else if//

			//A two-way call with a callback delivers the result to the callback through the thread service.//
			if((callback != null) && (!isOneWay)) {
				final Object deliveredResult = result;
				final IResultCallback receiver = callback;

				if(ThreadService.isRunning()) {
					try {
						ThreadService.run(new Runnable() {
							public void run() {
								receiver.run(deliveredResult);
							}//run()//
						});
					}//try//
					catch(ThreadException e) {
						//Ignore - only occurs while shutting down.//
					}//catch//
				}//if//
			}//if//
		}//else//
	}//else if//
	else {
		throw new IllegalArgumentException("Invalid connection identifier. This value must either be a proxy or network connection identifier.");
	}//else//

	//Nothing was sent over a connection (local execution), so there is no status to decode.//
	if(reply == null) {
		return result;
	}//if//

	if(reply.status == ValueHolder.STATUS_OK) {
		//Only a two-way call without a callback hands the received value back to the caller.//
		if((callback == null) && (!isOneWay)) {
			//TODO: May wish to identify the exceptions that should be rethrown just in case one is returned as a legitimate return value (not thrown).//
			if(reply.value instanceof Throwable) {
				com.de22.orb.exception.ExceptionSupport.rethrow((Throwable) reply.value);
			}//if//

			result = reply.value;
		}//if//

		return result;
	}//if//

	//Every other status is translated into an exception.//
	if(reply.status == ValueHolder.STATUS_SOCKET_NOT_FOUND) {
		throw new IllegalArgumentException("Socket not found: bad connection identifier.");
	}//if//

	if(reply.status == ValueHolder.STATUS_SOCKET_ERROR) {
		throw new SocketFailureException();
	}//if//

	if(reply.status == ValueHolder.STATUS_TIMEOUT) {
		throw new TimeoutException();
	}//if//

	throw new RuntimeException("Unexpected error in making a remote method call.");
}//sendAgent()//
/**
* Determines whether the orb has been shutdown.
* @return Whether the orb is shutdown.
*/
public boolean isShutDown() {
	//Reports the flag that shutdown() sets.//
	return this.isShutDown;
}//isShutDown()//
/**
* Shuts the orb down, closing all server sockets and sockets.
*/
public void shutdown() {
	//Iterate over a copy of the server socket names (presumably because closing a socket modifies the map).//
	IIterator names = new LiteList(serverSocketsByName.keyIterator(), serverSocketsByName.getSize(), 10, false).iterator();

	isShutDown = true;

	//Close every server socket name once per outstanding reference.//
	while(names.hasNext()) {
		try {
			String serverName = (String) names.next();
			AbstractConnectionServer server = (AbstractConnectionServer) serverSocketsByName.get(serverName);

			while(server.getReferenceCount(serverName) != 0) {
				closeServerSocket(serverName);
			}//while//
		}//try//
		catch(Throwable failure) {
			//A failure on one server socket is logged and must not prevent the remaining ones from closing.//
			Debug.log(failure);
		}//catch//
	}//while//

	//Then do the same for the sockets, again working from a copy of the names.//
	names = new LiteList(socketsByName.keyIterator(), socketsByName.getSize(), 10, false).iterator();

	while(names.hasNext()) {
		try {
			String socketName = (String) names.next();
			AbstractConnection connection = (AbstractConnection) socketsByName.get(socketName);

			while(connection.getReferenceCount(socketName) != 0) {
				closeSocket(socketName);
			}//while//
		}//try//
		catch(Throwable failure) {
			//A failure on one socket is logged and must not prevent the remaining ones from closing.//
			Debug.log(failure);
		}//catch//
	}//while//

	//Stop the engine if we are using NIO.//
	if(nioEngine != null) {
		nioEngine.stop();
	}//if//
}//shutdown()//
}//Orb//