Wefts::Subscription Class Reference
[Advanced Synchronization constructs]

#include <wefts_subscription.h>


Detailed Description

Subscribe/notify model abstraction.

This class implements the subscribe/notify model, with late subscription capabilities and a FIFO (fair scheduling) policy.

In this model there are two kinds of thread agents: the subscribers and the notifiers. The former are threads willing to receive "news" about something that happened or something that should be processed. The notifiers are the threads that can detect when there is something the subscribers should know, and that can optionally produce some data suitable for processing by the subscribers. By default, subscribers are served in FIFO order: the first thread that has subscribed to an object receives the first available notification. This feature can be turned off so that the first subscriber able to wake up is served with the next available notification; which subscriber wakes up first depends on OS choices. This is suitable for tasks where all the subscribers are equivalent, so that the overhead needed to select the first subscriber in line can be removed.

This model allows late subscription: a notification issued by a thread is left pending if no subscriber is waiting when the notification is sent; as soon as a thread subscribes, it immediately receives the notified message without blocking. An arbitrary number of notify() calls can be issued before an equal number of subscribe() calls is served in this way. A subscribing thread only has to wait if there are currently no pending notifications.
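The late subscription behaviour described above can be sketched with standard C++ primitives. This is only an illustrative model, not the Wefts implementation: the class name LateSubscriptionSketch and its members are assumptions, and only the notify()/subscribe() names and the timeout convention (-1 waits forever, 0 polls) are taken from this page.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for illustration; NOT the Wefts implementation.
class LateSubscriptionSketch {
public:
    // Leave one notification pending and wake one waiter, if any.
    void notify() {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_pending;
        m_cond.notify_one();
    }

    // seconds < 0: wait forever; seconds == 0: poll; otherwise wait at most
    // that long. Returns true if a notification was consumed.
    bool subscribe(double seconds = -1.0) {
        std::unique_lock<std::mutex> lock(m_mutex);
        auto ready = [this] { return m_pending > 0; };
        if (seconds < 0.0) {
            m_cond.wait(lock, ready);
        } else if (!m_cond.wait_for(lock, std::chrono::duration<double>(seconds), ready)) {
            return false;  // timed out with nothing pending
        }
        assert(m_pending > 0);
        --m_pending;
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cond;
    int m_pending = 0;  // notifications issued but not yet served
};

// Two notifications are issued before anyone subscribes; both are served
// later without blocking, and a third poll finds nothing.
inline bool lateSubscriptionDemo() {
    LateSubscriptionSketch s;
    s.notify();
    s.notify();
    const bool first = s.subscribe(0.0);
    const bool second = s.subscribe(0.0);
    const bool third = s.subscribe(0.0);
    return first && second && !third;
}
```

A plain counter is enough here because the sketch carries no data; the point is only that notify() never blocks and that pending notifications outlive the absence of subscribers.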

However, the late subscription model is optional and can be controlled both by the notifiers and by the subscribers. The subscribeNow() method discards any pending notification and puts the calling thread in wait for a new notify() that must be issued after that moment. The notifyAll() method notifies all the waiting subscribers together; if no subscriber is waiting when notifyAll() is called, the notification is not left pending, and any subscription after a notifyAll() will block (unless other notify() messages were already pending before).
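A possible way to model these two opt-outs is shown below. It is a sketch under stated assumptions, not the library code: NowAndAllSketch, the generation counter and the waitLocked() helper are invented for illustration; only the method names notify(), notifyAll(), subscribe() and subscribeNow() come from this page.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for illustration; NOT the Wefts implementation.
class NowAndAllSketch {
public:
    void notify() {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_pending;
        m_cond.notify_one();
    }

    // Releases every thread waiting right now; lost if nobody is waiting.
    void notifyAll() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_waiting == 0) return;  // nothing is left pending
        ++m_generation;
        m_cond.notify_all();
    }

    bool subscribe(double seconds = -1.0) {
        std::unique_lock<std::mutex> lock(m_mutex);
        return waitLocked(lock, seconds);
    }

    // Discards pending notifications, then waits for a fresh one.
    bool subscribeNow(double seconds = -1.0) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_pending = 0;
        return waitLocked(lock, seconds);
    }

private:
    bool waitLocked(std::unique_lock<std::mutex>& lock, double seconds) {
        const unsigned long seen = m_generation;
        auto ready = [&] { return m_pending > 0 || m_generation != seen; };
        bool ok = true;
        ++m_waiting;
        if (seconds < 0.0) {
            m_cond.wait(lock, ready);
        } else {
            ok = m_cond.wait_for(lock, std::chrono::duration<double>(seconds), ready);
        }
        --m_waiting;
        if (!ok) return false;                  // timeout
        if (m_generation != seen) return true;  // released by notifyAll()
        assert(m_pending > 0);
        --m_pending;                            // consumed a single notify()
        return true;
    }

    std::mutex m_mutex;
    std::condition_variable m_cond;
    int m_pending = 0;               // notify() calls not yet served
    int m_waiting = 0;               // threads currently blocked
    unsigned long m_generation = 0;  // bumped by each effective notifyAll()
};
```

In this model a notifyAll() issued with no waiter changes nothing, so a later subscribe() only succeeds if an earlier notify() is still pending, and subscribeNow() with a zero timeout always fails because it has just cleared the pending count.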

Subscriptions can be timed. A subscriber may decide to block only for a certain amount of time, or even to peek for available notification messages without blocking; if no notification is issued in the meantime, control returns to the calling thread.

The most interesting part of the subscribe/notify model is that notifiers can present some data to the subscribers. An object of class Referenced can be passed to the subscriber receiving the notification. An application will subclass Referenced adding the data that has to be transferred to subscribers.

If a notifier doesn't want to send any particular data to the subscriber, the subscriber will receive 0 as the reference parameter. If the subscriber fails to get a notification (because of a timeout or an interrupt), subscribe() will return false.
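The data hand-off can be illustrated with a small queue-based model. This is a sketch only: PayloadSketch stands in for an application subclass of Referenced, DataSubscriptionSketch is a hypothetical name, and leaving *result untouched on failure is a choice of this sketch, not something this page states.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical payload type standing in for a Referenced subclass.
struct PayloadSketch {
    int value;
};

// Hypothetical stand-in for illustration; NOT the Wefts implementation.
class DataSubscriptionSketch {
public:
    // data may be null when the notifier has nothing to send.
    void notify(PayloadSketch* data = nullptr) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_items.push_back(data);
        m_cond.notify_one();
    }

    // Returns false on timeout; on success *result receives the payload
    // of the oldest pending notification (possibly null).
    bool subscribe(double seconds, PayloadSketch** result) {
        std::unique_lock<std::mutex> lock(m_mutex);
        auto ready = [this] { return !m_items.empty(); };
        if (seconds < 0.0) {
            m_cond.wait(lock, ready);
        } else if (!m_cond.wait_for(lock, std::chrono::duration<double>(seconds), ready)) {
            return false;  // sketch choice: *result is left untouched
        }
        PayloadSketch* data = m_items.front();
        m_items.pop_front();
        if (result) *result = data;
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::deque<PayloadSketch*> m_items;  // pending notifications, oldest first
};

// One notification with data, one without, then an empty poll.
inline void dataDemo() {
    DataSubscriptionSketch s;
    PayloadSketch job{42};
    s.notify(&job);
    s.notify();
    PayloadSketch* got = nullptr;
    assert(s.subscribe(0.0, &got) && got == &job);
    assert(s.subscribe(0.0, &got) && got == nullptr);
    assert(!s.subscribe(0.0, &got));
}
```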

The Referenced class also supports disposability. A notifier may issue some read-only data to the subscribers and allocate this data, for example, on a private stack instead of creating a dynamic memory object that would soon be destroyed by the subscriber.

notifyAll() can also send a reference to the notified object to the subscribers, as it adds one reference for each subscribed thread. Each subscriber must then treat the value as read-only data, and can dispose of it by decrementing the reference count with the decRef() method.
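The reference counting involved in a broadcast can be pictured as follows. Only decRef() is named on this page; RefCountedSketch, addRefs() and the destruction counter are assumptions made for this sketch, which models the counting alone and involves no threads.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for Referenced; NOT the Wefts class.
class RefCountedSketch {
public:
    explicit RefCountedSketch(int* destroyedCounter)
        : m_refs(0), m_destroyed(destroyedCounter) {}

    // What a notifyAll()-like call would do: one reference per subscriber.
    void addRefs(int n) { m_refs.fetch_add(n); }

    // Each subscriber calls this once when done with the read-only data.
    void decRef() {
        if (m_refs.fetch_sub(1) == 1) delete this;  // last reader frees it
    }

private:
    ~RefCountedSketch() { ++*m_destroyed; }

    std::atomic<int> m_refs;
    int* m_destroyed;  // lets the demo observe destruction
};

// Shares one object among `subscribers` readers (must be > 0) and returns
// how many times it was destroyed.
inline int broadcastDemo(int subscribers) {
    assert(subscribers > 0);
    int destroyed = 0;
    RefCountedSketch* data = new RefCountedSketch(&destroyed);
    data->addRefs(subscribers);
    for (int i = 0; i < subscribers; ++i) {
        assert(destroyed == 0);  // still alive while any reader remains
        data->decRef();
    }
    return destroyed;
}
```

The object is destroyed exactly once, by whichever subscriber releases it last, which is why every subscriber must treat it as read-only until then.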

This class is fairly inefficient. In an environment where the notifiers and the subscribers have well-known interactions and limited complexity, it is better to just derive a new class from FastCondition, using its data as a means to communicate between threads.

However, this class does its own job quite efficiently: allowing thread-safe communication between subscribers and notifiers in complex patterns, especially where the internal workings of each agent of the process are isolated or abstracted.

In any case, use this class to process relatively time-sparse data, as the synchronization of the subscribers and the notifiers may require several lock/signal cycles. Finally, this class works better with one subscriber and many notifiers than with many subscribers and one notifier; the processing time of notifications is O( notifiers * subscribers^2 ). In the most common case, with one subscriber and many notifiers, the overhead is minimal.

See also:
Referenced


Public Member Functions

 Subscription (bool fairPolicy=true)
 Initializes the internal data of the subscription.

bool subscribe (double time=-1.0, Referenced **result=0)
 Subscribe to this object for a given amount of seconds.

bool subscribeNow (double time=-1.0, Referenced **result=0)
 Subscribe to this object, clearing all previous notifications.

bool subscribe (Referenced **result)
 Subscribe to this object.

bool subscribeNow (Referenced **result)
 Subscribe to this object, clearing previous notifications.

void notify (Referenced *data=0)
 Notifies that the subscription needs attention.

void notifyAll (Referenced *data=0)
 Notifies all the waiting threads at once. notifyAll() sends a notification (or a "proceed" request) to every thread that is waiting in the subscribe() or subscribeNow() method at the moment of its call.

int subscribers ()
 Returns the count of the subscribers currently waiting for this object to get a notification.

void setPolicy (bool fair)
 Sets or resets fair policy.

virtual void handleCleanup (int, void *)
 Reimplementation of cleanup.


Protected Member Functions

virtual bool subscriberCanGo (OSThread &pself)
 Subclass hook for the scheduling policy.

void limboSubscribe ()
 Puts the thread in limbo while subscribing, if needed.


Private Member Functions

bool subscribeInternal (double time, Referenced **result)

Private Attributes

Mutex m_mutex
 Synchronizer.

Condition m_condNotify
 Signaled when a new notify is ready.

Condition m_condFinish
 Signaled when a notifyAll is done.

std::list< Referenced * > m_items
std::list< OSThread * > m_waiting
bool m_front
 True if fair policy is set.

int m_notifyAll
 Amount of notified ones.

int m_subscribed
int m_notifies
int m_limbo
 Count of threads waiting to be admitted as subscribers while a notifyAll is in progress.


Constructor & Destructor Documentation

Wefts::Subscription::Subscription ( bool fairPolicy = true )
 

Initializes the internal data of the subscription.

This basic subscription object can be created with a fair or unfair policy. Fair policy means that the first subscriber that enters the waiting list will be served with the first available notification. If the policy is not fair and more than one subscriber is waiting, any one of them can be awakened, in no particular order. Notified objects are served to the awakened subscribers in the order they are posted.

Parameters:
fairPolicy true if policy has to be fair


Member Function Documentation

void Wefts::Subscription::handleCleanup ( int, void * ) [virtual]
 

Reimplementation of cleanup.

Called when a subscription thread is canceled.

Implements Wefts::CleanupHandler.

void Wefts::Subscription::limboSubscribe ( ) [protected]
 

Puts the thread in limbo while subscribing, if needed.

The notifyAll process is relatively long. While a notifyAll is being processed and all the subscribers are being released, it is important not to accept new subscriptions or new notifyAll() calls.

A subscribe() request may cause a rearrangement of the priority queue so that the newly subscribed thread goes ahead, leaving a notified thread still in the queue instead of waking it.

A subscribeNow() would be disastrous: it would delete the whole queue of notified objects, forcing the notified threads that have not yet gone to stay in the waiting queue.

A notifyAll() could be even worse: it would create a copy of the object to be notified for each subscriber that has not yet gone. So, an equal number of threads would get an invalid object and proceed, while the notifyAll() should have done nothing (as there is no non-notified thread in the queue).

notify() is safe, as it just adds a notification that will be grabbed by the first subscriber after the notifyAll phase has ended.

However, notify() has to be delayed until the last limbo-sleeping thread has been released, or the notification could be taken by the wrong thread. The same holds for notifyAll(): all the limbo subscribers should be notified as soon as they are able to reactivate themselves, and not just a random subset of them.

void Wefts::Subscription::notify ( Referenced * data = 0 )
 

Notifies that the subscription needs attention.

The notify() method leaves a notification so that the first thread that has subscribed in the past, or the first thread that will subscribe in the future, is immediately notified to proceed with processing. notify() can also pass some data to the subscribers; if the data parameter is given, that pointer will be returned by the subscribe() method in the waiting thread.

Parameters:
data the data that must be returned by subscribe() or 0 for none.

void Wefts::Subscription::notifyAll ( Referenced * data = 0 )
 

Notifies all the waiting threads at once. notifyAll() sends a notification (or a "proceed" request) to every thread that is waiting in the subscribe() or subscribeNow() method at the moment of its call.

notifyAll() can also pass some data to the subscribers; if the data parameter is given, it will be returned by the subscribe() method in each waiting thread. For each subscription, a reference is added to data. If no subscribers are waiting when notifyAll() is called, the notification is lost; notice that in this case, if the data passed as parameter is disposable, it is destroyed anyway.

Parameters:
data the data that must be returned by subscribe() or 0 for none.
See also:
subscribe() Referenced

void Wefts::Subscription::setPolicy ( bool fair ) [inline]
 

Sets or resets fair policy.

A fair policy subscription serves the subscribers from a FIFO queue; the first subscriber will get the first object that has been notified. If the policy is set to unfair, then the first subscriber that gains control will receive the notification.

Parameters:
fair true to set policy to fair (FIFO), false to set it to unfair (OS dependent)

bool Wefts::Subscription::subscribe ( Referenced ** result ) [inline]
 

Subscribe to this object.

Syntactic sugar for subscribe( -1.0, result );

bool Wefts::Subscription::subscribe ( double time = -1.0, Referenced ** result = 0 )
 

Subscribe to this object for a given amount of seconds.

If time is set to -1, the method waits until a notify is issued or until the wait is otherwise interrupted. If time is 0, the function returns immediately, returning a notification only if one was already present. If the timeout expires or the wait is interrupted, the method returns false; otherwise it returns true. If the notification that woke up the subscriber carries a valid Referenced object, the result parameter will be filled with a valid pointer; otherwise it will be set to 0.

The subscriber may also discard the notified object automatically: if the result parameter is set to zero, the referenced object will be dereferenced (by calling its decRef() method) by the subscribe method itself.

Parameters:
time maximum time to wait for a notification in seconds, or -1 to wait forever.
result a pointer to a referenced object returned as notified object, if given. If not given, it is ignored.
Returns:
true if subscription is notified, false otherwise
See also:
Referenced
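The automatic discard described above (a zero result pointer makes subscribe() release the object itself) can be modelled in isolation. This is a single-threaded sketch of the hand-off step only, with no locking or waiting; CountedSketch, HandOffSketch and take() are hypothetical names, and only decRef() comes from this page.

```cpp
#include <cassert>
#include <deque>

// Hypothetical reference-counted payload; NOT the Wefts Referenced class.
struct CountedSketch {
    int refs = 1;
    bool* destroyed;  // lets the caller observe destruction
    explicit CountedSketch(bool* flag) : destroyed(flag) {}
    void decRef() {
        if (--refs == 0) {
            *destroyed = true;
            delete this;
        }
    }
};

// Single-threaded model of the hand-off step only.
class HandOffSketch {
public:
    void notify(CountedSketch* data) { m_items.push_back(data); }

    // Returns false if nothing is pending. With a null result pointer the
    // payload is released here instead of being handed to the caller.
    bool take(CountedSketch** result) {
        if (m_items.empty()) return false;
        CountedSketch* data = m_items.front();
        m_items.pop_front();
        if (result) {
            *result = data;   // caller now owns one reference
        } else if (data) {
            data->decRef();   // nobody wants it: discard automatically
        }
        return true;
    }

private:
    std::deque<CountedSketch*> m_items;
};

inline void handOffDemo() {
    bool gone = false;
    HandOffSketch h;
    h.notify(new CountedSketch(&gone));
    assert(h.take(nullptr));  // notification consumed, payload discarded
    assert(gone);
}
```

When a result pointer is supplied instead, the caller receives the object and becomes responsible for calling decRef() once it has finished with it.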

bool Wefts::Subscription::subscribeInternal ( double time, Referenced ** result ) [private]
 

bool Wefts::Subscription::subscribeNow ( Referenced ** result ) [inline]
 

Subscribe to this object, clearing previous notifications.

Syntactic sugar for subscribeNow( -1.0, result );

bool Wefts::Subscription::subscribeNow ( double time = -1.0, Referenced ** result = 0 )
 

Subscribe to this object, clearing all previous notifications.

This method works the same as subscribe(), except for the fact that all the pending notifications are removed before the thread begins to wait for new notifications.

Parameters:
time time to wait for a notification in seconds
result the notified object (if any) or 0. If not given, it is ignored.
Returns:
true on success or false on error or timeout
See also:
subscribe()

virtual bool Wefts::Subscription::subscriberCanGo ( OSThread & pself ) [inline, protected, virtual]
 

Subclass hook for the scheduling policy.

This function returns true if this subscriber is the one authorized to proceed at this moment. It is meant for subclasses willing to implement their own scheduling policy. If all the waiting subscribers are notified, this function is NOT called, so it only needs to check whether this thread should go when in competition with other threads. Only one thread is elected; if two or more notifications arrive before a single subscriber is able to proceed, a new election will be held after the first elected thread is gone, so the criterion of this function is "one at a time".

This method implements the FIFO scheduling, unless policy is set to unfair, in which case, the function returns true allowing the first checked item to go.

Parameters:
pself the thread-id of the waiting thread.
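The election rule can be sketched without any threading at all. The model below is an assumption-laden illustration: ElectionSketch, enqueue() and leave() are invented names, and subscribers are identified by plain integers instead of OSThread objects; only subscriberCanGo() and setPolicy() are names from this page.

```cpp
#include <cassert>
#include <list>

// Hypothetical model of the election rule; NOT the Wefts implementation.
class ElectionSketch {
public:
    explicit ElectionSketch(bool fair = true) : m_fair(fair) {}

    void setPolicy(bool fair) { m_fair = fair; }

    // A subscriber joins the back of the waiting list.
    void enqueue(int id) { m_waiting.push_back(id); }

    // True if `self` may take the next notification: under the fair policy
    // only the head of the list may go, otherwise whoever asks first.
    bool subscriberCanGo(int self) const {
        if (!m_fair) return true;
        return !m_waiting.empty() && m_waiting.front() == self;
    }

    // The elected subscriber leaves the list once it has been served.
    void leave(int self) { m_waiting.remove(self); }

private:
    bool m_fair;
    std::list<int> m_waiting;  // subscribers in arrival order
};

inline void electionDemo() {
    ElectionSketch e(true);
    e.enqueue(1);
    e.enqueue(2);
    assert(!e.subscriberCanGo(2));  // 2 arrived later, must wait
    assert(e.subscriberCanGo(1));
    e.leave(1);
    assert(e.subscriberCanGo(2));   // now at the head of the list
}
```

A subclass wishing to schedule differently would, in this model, replace only the body of subscriberCanGo(), for instance by comparing priorities instead of positions in the list.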

int Wefts::Subscription::subscribers ( ) [inline]
 

Returns the count of the subscribers currently waiting for this object to get a notification.

Notice that the returned value may already be out of date by the time the calling thread is able to check or process it.


Member Data Documentation

Condition Wefts::Subscription::m_condFinish [private]
 

Signaled when a notifyAll is done.

Condition Wefts::Subscription::m_condNotify [private]
 

Signaled when a new notify is ready.

bool Wefts::Subscription::m_front [private]
 

True if fair policy is set.

std::list<Referenced *> Wefts::Subscription::m_items [private]
 

int Wefts::Subscription::m_limbo [private]
 

Count of threads waiting to be admitted as subscribers while a notifyAll is in progress.

Mutex Wefts::Subscription::m_mutex [private]
 

Synchronizer.

int Wefts::Subscription::m_notifies [private]
 

int Wefts::Subscription::m_notifyAll [private]
 

Amount of notified ones.

int Wefts::Subscription::m_subscribed [private]
 

std::list<OSThread *> Wefts::Subscription::m_waiting [private]
 


Generated on Tue Oct 5 14:57:03 2004 for Wefts by doxygen 1.3.7