/* * Copyright (c) 2006,2008 Declarative Engineering LLC. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Declarative Engineering LLC * verson 1 which accompanies this distribution, and is available at * http://declarativeengineering.com/legal/DE_Developer_License_v1.txt */ package com.foundation.util; import com.common.thread.Monitor; import com.common.thread.ThreadService; import com.common.util.ICollection; import com.common.util.LiteHashMap; import com.common.util.LiteList; import com.foundation.util.Updater.UpdaterRunnable; /** * This base class provides functionality for aggregators of zero or more observable collections. */ public abstract class ObservableAggregator { /** The number of times the startChanges() method has been called. This allows multiple threads to notify this aggregator of changes at the same time. */ protected int changeStartCounter = 0; /** The set of listeners indexed by the collection they are listening to. */ private LiteHashMap listenersByCollectionMap = new LiteHashMap(10); /** * An instance of Listener should be created for each collection being listened to - each collection that makes up the aggregate collection. */ private class Listener implements IInlineCollectionObserver { /** The collection being listened to. */ private IInlineCollectionObservable collection = null; /** A collection used to temporarily collect items being added to the aggregate collection. */ protected LiteList addedItems = null; /** A collection used to temporarily collect items being removed from the aggregate collection. */ protected LiteList removedItems = null; /** * Listener constructor. * @param collection The collection being listened to. */ public Listener(IInlineCollectionObservable collection) { this.collection = collection; }//Listener()// /* (non-Javadoc) * @see com.foundation.util.IInlineCollectionObserver#removingAll() */ public void removingAll() { removedItems.addAll(collection.getCollection()); }//removingAll()// /* (non-Javadoc) * @see com.foundation.util.IInlineCollectionObserver#valueAdded(java.lang.Object) */ public void valueAdded(Object value) { if(addedItems != null) { addedItems.add(value); }//if// else { final Object added = value; applyChange(new Runnable() { public void run() { applyAddChange(added); }//run()// }); }//else// }//valueAdded()// /* (non-Javadoc) * @see com.foundation.util.IInlineCollectionObserver#valueRemoved(java.lang.Object) */ public void valueRemoved(Object value) { if(removedItems != null) { removedItems.add(value); }//if// else { final Object removed = value; applyChange(new Runnable() { public void run() { applyRemoveChange(removed); }//run()// }); }//else// }//valueRemoved()// /* (non-Javadoc) * @see com.foundation.util.IInlineCollectionObserver#startChanges(int) */ public void startChanges(int changeCount) { changeStartCounter++; addedItems = new LiteList(changeCount); removedItems = new LiteList(changeCount); }//startChanges()// /* (non-Javadoc) * @see com.foundation.util.IInlineCollectionObserver#stopChanges() */ public void stopChanges() { if(--changeStartCounter == 0) { final LiteList addedItems = this.addedItems; final LiteList removedItems = this.removedItems; this.addedItems = null; this.removedItems = null; //Apply the changes.// applyChange(new Runnable() { public void run() { applyChanges(addedItems, removedItems); }//run()// }); }//if// }//stopChanges()// }//Listener// /** * ObservableAggregator constructor. */ public ObservableAggregator() { super(); }//ObservableAggregator()// /** * Releases the aggregator and stops all listeners. *

Warning: Failure to call this method may result in memory leaks and wasted cycles.

*/ public void release() { IInlineCollectionObservable[] collections = new IInlineCollectionObservable[listenersByCollectionMap.getSize()]; listenersByCollectionMap.toKeyArray(collections); for(int index = 0; index < collections.length; index++) { removeCollection(collections[index]); }//for// }//release()// /** * Adds the collection to this aggregator. * @param collection The collection to be added. */ public void addCollection(IInlineCollectionObservable collection) { Monitor monitor = collection.getMonitor(); Monitor.lock(monitor); try { if(listenersByCollectionMap.containsKey(collection)) { //Cannot add a collection twice.// throw new IllegalArgumentException(); }//if// Listener listener = new Listener(collection); collection.addCollectionObserver(listener); listenersByCollectionMap.put(collection, listener); }//try// finally { Monitor.unlock(monitor); }//finally// }//addCollection()// /** * Removes the collection from this aggregator. * @param collection The collection to be removed. */ public void removeCollection(IInlineCollectionObservable collection) { Monitor monitor = collection.getMonitor(); Monitor.lock(monitor); try { Listener listener = (Listener) listenersByCollectionMap.get(collection); if(listener != null) { collection.removeCollectionObserver(listener); }//if// else { //Cannot remove a collection not previously added.// throw new IllegalArgumentException(); }//else// }//try// finally { Monitor.unlock(monitor); }//finally// }//removeCollection()// /** * Applies the changes represented by the runnable that will enact the changes. * @param runnable The object run to apply the changes. */ private void applyChange(final Runnable runnable) { if(getAggregateMonitor() != null && !ThreadService.getIsSingleThreadedContext()) { Updater.getSingleton().run(new UpdaterRunnable() { public void run() { runnable.run(); }//run()// public Monitor getMonitor() { return getAggregateMonitor(); }//getMonitor()// }); }//if// else { runnable.run(); }//else// }//applyChange()// /** * Gets the monitor for the aggregate. * @return The monitor used to lock the aggregate collection as changes get applied. */ protected abstract Monitor getAggregateMonitor(); /** * Requests that the added and removed items are applied to the aggregate collection. * The caller will have already locked on the aggregate's monitor. * @param addedItems The items to be added. * @param removedItems The items to be removed. */ protected abstract void applyChanges(ICollection addedItems, ICollection removedItems); /** * Requests that the item be added to the aggregate collection. * The caller will have already locked on the aggregate's monitor. * @param item The item to be added. */ protected abstract void applyAddChange(Object item); /** * Requests that the item be removed from the aggregate collection. * The caller will have already locked on the aggregate's monitor. * @param item The item to be removed. */ protected abstract void applyRemoveChange(Object item); }//ObservableAggregator//