
Some examples

How to use condition variables and mutexes

The minimal use of a condition variable requires something like the following pseudo-code:

      lock the mutex
      While the predicate is false
            Wait for the condition
      (optionally change some data)
      unlock the mutex.
   

A thread that may alter the outcome of the predicate should do something like:

      lock the mutex
      change the data
      signal the condition
      unlock the mutex
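
For readers who do not have Wefts at hand, the two pseudo-code sequences above can be sketched with the standard C++ library. This is only an illustration, assuming std::mutex and std::condition_variable as stand-ins for the Wefts Mutex and Condition classes; Gate, waitForPayload(), publish() and demoGate() are hypothetical names that do not belong to Wefts.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Shared state guarded by `mtx`; `ready` is the data the predicate tests.
struct Gate {
    std::mutex mtx;
    std::condition_variable cond;
    bool ready = false;
    int payload = 0;
};

// Waiter: lock, wait while the predicate is false, use the data, unlock.
int waitForPayload(Gate& g) {
    std::unique_lock<std::mutex> lock(g.mtx);  // lock the mutex
    while (!g.ready)                           // while the predicate is false
        g.cond.wait(lock);                     // wait (mutex released meanwhile)
    return g.payload;                          // destructor unlocks the mutex
}

// Signaler: lock, change the data, signal the condition, unlock.
void publish(Gate& g, int value) {
    std::lock_guard<std::mutex> lock(g.mtx);
    g.payload = value;
    g.ready = true;
    g.cond.notify_one();
}

// Hypothetical demo helper: one waiter thread, one signaler (the caller).
int demoGate(int value) {
    Gate g;
    int seen = 0;
    std::thread waiter([&] { seen = waitForPayload(g); });
    publish(g, value);
    waiter.join();
    return seen;
}
```

Because the waiter tests the predicate before waiting, it does not matter whether publish() runs before or after the waiter takes the mutex.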
   

To demonstrate how this works in practice, we'll reinvent a mutex(!). Our code will make a thread wait until it can acquire the lock on the mutex (the achive() method in the listing below).

// $Id: fakemutex.cpp,v 1.1 2003/08/16 20:45:57 jonnymind Exp $
// fakemutex.cpp - an introductory test
//
// This program is free software - GPL 2.0 license
// (C) Giancarlo Niccolai
//
// NOTE: this program is demonstrative and hence not compiled
//       in the make process of Wefts test suite.
//

#include <wefts.h>
using namespace Wefts;

class FakeMutex {
private:
   // our condition
   Condition m_cond;

   // the mutex related with the condition.
   Mutex m_mutex;

   // Status of our mutex. Is it locked?
   bool m_locked;

public:
   FakeMutex() {
      // we have to tell the condition what mutex to use.
      m_cond.setMutex( &m_mutex );
      // initially unlocked
      m_locked = false;
   }

   void achive() {
      m_mutex.lock();

      // our predicate: we must wait for the lock to be free
      while( m_locked )
         m_cond.wait(); // releases m_mutex while waiting and re-locks it on return

      // good news here: m_locked is false AND we hold m_mutex, and
      // this means we are the only ones that can touch m_locked.
      m_locked = true;

      // done, we can release internal mutex
      m_mutex.unlock();
   }

   void release() {
      // this ensures we are the only ones handling m_locked.
      m_mutex.lock();

      m_locked = false;

      // hey, we have changed the predicate outcome!
      m_cond.signal();

      // ok, we are done with our data. Lets let it go.
      m_mutex.unlock();
   }
};

More complete example

Now, we can do far better than this. First of all, we signal each time release() is called; if the mutex is not locked, or if no thread is waiting to acquire it, the signal is wasted (it is not a great waste, but it can be avoided with a very small expedient). Moreover, the mutex is not reentrant: if a thread that already holds the lock tries to acquire it again, it will block forever, as no one will be able to release the mutex.

So, a better pseudocode for the waiter can be:

      lock the mutex
      IF the predicate is false
         change some data saying that we are waiting
         do preparation things
         While the predicate is false
            wait
         undo preparation things
         change data to state we are not waiting anymore
      ENDIF
      work with shared data
      unlock the mutex.
   

And the signaler should be just a little smarter:

      lock the mutex
      change the data
      IF there are threads waiting
         signal the condition
      unlock the mutex
   

To make our mutex reentrant, we must be able to tell whether the current thread is the one that previously locked the mutex. Wefts provides a low-level wrapper class called OSThread that supports such tests. We are interested in four methods: OSThread::setCurrent() sets the internal data of the OSThread object so that it reflects the current thread; OSThread::invalidate() clears that data, so that the object no longer references any existing thread; OSThread::same() returns true if the thread object matches the current thread (that is to say, if the calling thread is the same one that called OSThread::setCurrent() before); and OSThread::equal( OSThread & ) returns true if the object represents the same system thread as the parameter.

When a thread that already holds the lock tries to acquire it again, a counter is incremented; when it unlocks, the counter is decremented, and when it reaches 0, other threads are allowed to try to get the lock.

Here is the change to our code:

// $Id: rfakemutex.cpp,v 1.1 2003/08/16 20:45:57 jonnymind Exp $
// rfakemutex.cpp - an introductory test
//
// This program is free software - GPL 2.0 license
// (C) Giancarlo Niccolai
//
// NOTE: this program is demonstrative and hence not compiled
//       in the make process of Wefts test suite.
//

#include <wefts.h>
using namespace Wefts;

class RFakeMutex {
private:
   Condition m_cond;
   Mutex m_mutex;
   int m_locked; // notice: now we have a counter here
   // We also need a count to specify that we are waiting.
   int m_waiting;

   // this is the thread owning the mutex
   OSThread m_owner;

public:
   RFakeMutex() {
      m_cond.setMutex( &m_mutex );
      // initial count of locks
      m_locked = 0;
      // initial count of threads waiting to acquire the lock
      m_waiting = 0;
      //the owner is invalidated in its constructor; we don't need to
      // set it here.
   }

   void achive() {
      m_mutex.lock();

      // Now we must see if we must begin our waiting process.
      if ( m_locked > 0 && ! m_owner.same() ) {

         // Communicate to the world that there is someone waiting
         m_waiting++;

         while( m_locked > 0)
            m_cond.wait();

         // we are not waiting anymore here.
         m_waiting--;
      }
      // ok, if we are here we have the right to acquire the lock
      m_locked++;

      // and we want to mark the mutex as ours
      if ( m_locked == 1 ) // first lock request ?
         m_owner.setCurrent();

      m_mutex.unlock();
   }

   void release() {
      m_mutex.lock();

      // are we the rightful owners
      if ( m_locked > 0  && m_owner.same() ) {
         m_locked--;
         // has this changed the predicate outcome?
         if ( m_locked == 0 ) {
            m_owner.invalidate(); // we are not the owners anymore
            //...yes..., we have changed it, but is there someone interested?
            if ( m_waiting > 0 )
               m_cond.signal();
         }
      }

      m_mutex.unlock();
   }
};

In this example code, the m_owner.invalidate() call in release() is not strictly necessary, as the owner is simply overwritten when the lock count gets to 1; it is better to invalidate it anyway, as the overhead is minimal and in the future you may wish to add more sophisticated checks on the current owner of the mutex.

Note:
While the overhead of invalidating an OSThread object is minimal, the burden of setCurrent() may be significant, depending on the underlying system implementation.
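
For comparison, the same reentrant scheme can be sketched with the standard C++ library. This is a sketch under stated assumptions, not Wefts code: std::thread::id stands in for OSThread (a default-constructed id plays the role of an invalidated owner), and ReentrantLock and demoReentrant() are hypothetical names.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

class ReentrantLock {
    std::mutex m_mutex;
    std::condition_variable m_cond;
    int m_locked = 0;          // nesting count of the current owner
    int m_waiting = 0;         // threads blocked in acquire()
    std::thread::id m_owner;   // default-constructed id means "no owner"

public:
    void acquire() {
        std::unique_lock<std::mutex> guard(m_mutex);
        const std::thread::id me = std::this_thread::get_id();
        if (m_locked > 0 && m_owner != me) {
            ++m_waiting;                       // tell release() someone waits
            while (m_locked > 0)
                m_cond.wait(guard);
            --m_waiting;
        }
        if (++m_locked == 1)                   // first lock request?
            m_owner = me;
    }

    void release() {
        std::lock_guard<std::mutex> guard(m_mutex);
        if (m_locked > 0 && m_owner == std::this_thread::get_id()) {
            if (--m_locked == 0) {
                m_owner = std::thread::id();   // invalidate the owner
                if (m_waiting > 0)             // signal only if needed
                    m_cond.notify_one();
            }
        }
    }
};

// Hypothetical demo helper: each thread takes the lock twice (nested),
// bumps an unprotected counter, then releases twice.
int demoReentrant(int threads, int rounds) {
    ReentrantLock lock;
    int counter = 0;
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back([&] {
            for (int i = 0; i < rounds; ++i) {
                lock.acquire();
                lock.acquire();   // would deadlock on a non-reentrant lock
                ++counter;
                lock.release();
                lock.release();
            }
        });
    for (auto& th : pool) th.join();
    return counter;
}
```

If the lock excludes other threads correctly, the counter ends up at exactly threads * rounds.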

Timed waits and wait tests.

Adding a timeout to our RFakeMutex class is simple. You just have to change the return type of the achive() method from void to bool, so that the calling thread can know whether the lock has been acquired or not. Also add a time parameter; a double stating how many seconds we are willing to wait will do.

Then replace the lines:

      while( m_locked > 0)
         m_cond.wait();

with:

      while( m_locked > 0 ) {
         // seconds == the parameter you added to the achive() declaration
         if( ! m_cond.wait( seconds ) ) {
            //timed out!
            m_waiting--; // we are not waiting anymore
            m_mutex.unlock(); // even on timeout, wait() returns with the mutex locked
            return false;
         }
      }

Finally add a return true; after the last line of the method.

Note:
If the wait is interrupted by a system signal, Condition::wait() returns true anyway. This ensures that false is returned only if an explicit timeout has elapsed, that is, if the caller is no longer willing to wait.

To build a try-lock function, which tries to lock the mutex and returns true on success or false if the lock can't be acquired, just test the predicate once, and carry on with the lock operations only if no wait would be needed.
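
The timed wait and the try-lock can be sketched together with the standard C++ library. The sketch below is a non-reentrant simplification and makes assumptions that are not part of Wefts: TimedLock and demoTimedLock() are hypothetical names, and the predicate overload of std::condition_variable::wait_for() replaces the explicit while loop (it bounds the total waiting time rather than restarting the timeout after every wakeup).

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

class TimedLock {
    std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_locked = false;

public:
    // Timed acquire: false means the timeout elapsed with the lock still held.
    bool acquire(double seconds) {
        std::unique_lock<std::mutex> guard(m_mutex);
        const bool freed = m_cond.wait_for(
            guard, std::chrono::duration<double>(seconds),
            [this] { return !m_locked; });
        if (!freed)
            return false;        // guard unlocks m_mutex on the way out
        m_locked = true;
        return true;
    }

    // Try-lock: test the predicate once, never wait.
    bool tryAcquire() {
        std::lock_guard<std::mutex> guard(m_mutex);
        if (m_locked)
            return false;
        m_locked = true;
        return true;
    }

    void release() {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_locked = false;
        m_cond.notify_one();
    }
};

// Hypothetical single-threaded walk-through of the three operations.
bool demoTimedLock() {
    TimedLock lock;
    const bool first = lock.tryAcquire();    // free: succeeds
    const bool second = lock.tryAcquire();   // held: fails at once
    const bool timed = lock.acquire(0.05);   // held: gives up after ~50 ms
    lock.release();
    const bool after = lock.acquire(0.05);   // free again: succeeds
    return first && !second && !timed && after;
}
```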

Deferred cancelation handling.

What if your condition wait is stopped from another thread?

You have two options here: you may rely on kind cancelation, using only timed waits and periodically checking whether a cancelation request has been sent, or you can use the automated Wefts condition wait cancelation scheme.
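
The first option, kind cancelation, can be sketched with the standard C++ library. This is only an illustration under assumptions: the atomic flag stands in for whatever cancelation request your threading layer provides, the 10 ms polling interval is arbitrary, and CancelableWait and demoWait() are hypothetical names.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

struct CancelableWait {
    std::mutex mtx;
    std::condition_variable cond;
    bool ready = false;                        // the predicate's data
    std::atomic<bool> cancelRequested{false};  // set by the canceling thread

    // Returns true when the predicate holds, false when canceled.
    bool waitReady() {
        std::unique_lock<std::mutex> lock(mtx);
        while (!ready) {
            if (cancelRequested.load())
                return false;                  // lock released by destructor
            // Short timed wait, so the flag is re-checked periodically.
            cond.wait_for(lock, std::chrono::milliseconds(10));
        }
        return true;
    }
};

// Hypothetical demo helper: either cancels the waiter or satisfies it.
bool demoWait(bool cancel) {
    CancelableWait w;
    bool result = false;
    std::thread waiter([&] { result = w.waitReady(); });
    if (cancel) {
        w.cancelRequested.store(true);
    } else {
        std::lock_guard<std::mutex> lock(w.mtx);
        w.ready = true;
        w.cond.notify_one();
    }
    waiter.join();
    return result;
}
```

The price of this approach is latency: a cancelation request is noticed only at the next timeout, which is the motivation for the handler-based scheme.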

If the thread is cancelable when entering a condition wait, then a stop() call on the waiting thread will cause the wait to be interrupted and a pre-set routine to be called. Just derive your class from CleanupHandler and add a void handleCleanup( int position ) method. Then, before engaging the wait, add a Condition::onStop( this ) call to set the handler to your method. You can pass an optional integer parameter that will be given to the method; this allows you to handle different condition wait cleanups from the same handler, if your class or your code needs it.

After wait() returns, you may want to call Condition::onStop() (without parameters) to clear the cleanup function. This class wouldn't really need it, as each condition wait is preceded by an onStop() call; but we do this for code clarity.

In our case, the cleanup routine will have to undo all the things the thread has done before entering the wait: the thread has been canceled, and this means that it must release all the claims it has made while waiting for the condition to become true.

This is the final code of our fake mutex:

// $Id: xfakemutex.cpp,v 1.2 2003/08/17 01:15:19 jonnymind Exp $
// xfakemutex.cpp - an introductory test
//
// This program is free software - GPL 2.0 license
// (C) Giancarlo Niccolai
//
// NOTE: this program is demonstrative and hence not compiled
//       in the make process of Wefts test suite.
//

#include <wefts.h>
using namespace Wefts;

class XFakeMutex: public CleanupHandler {
private:
   Condition m_cond;
   Mutex m_mutex;
   int m_locked;
   int m_waiting;
   OSThread m_owner;

public:
   XFakeMutex() {
      m_cond.setMutex( &m_mutex );
      m_locked = 0;
      m_waiting = 0;
   }

   void achive() {
      m_mutex.lock();

      if ( m_locked > 0 && ! m_owner.same() ) {
         m_waiting++;

         m_cond.onStop( this, 0 ); // optional integer parameter
         while( m_locked > 0)
            m_cond.wait();

         m_cond.onStop(); // clear cleanup handler
         m_waiting--;
      }
      m_locked++;

      if ( m_locked == 1 )
         m_owner.setCurrent();

      m_mutex.unlock();
   }

   bool tryachive() {
      m_mutex.lock();

      // can we lock?
      if ( m_locked > 0 && ! m_owner.same() ) {
         // no? ... well we give up, releasing the internal mutex first.
         m_mutex.unlock();
         return false;
      }
      // yes? GREAT!
      m_locked++;

      if ( m_locked == 1 )
         m_owner.setCurrent();

      m_mutex.unlock();
      return true;
   }


   //timed wait
   bool achive( double seconds ) {
      m_mutex.lock();

      // Now we must see if we must begin our waiting process.
      if ( m_locked > 0 && ! m_owner.same() ) {
         m_waiting++;

         m_cond.onStop( this, 0 ); // optional integer parameter
         while( m_locked > 0) {
            if ( ! m_cond.wait( seconds ) ) {
               m_waiting--;
               m_cond.onStop();
               m_mutex.unlock();
               return false;
            }
         }

         m_cond.onStop();
         m_waiting--;
      }
      // ok, if we are here we have the right to acquire the lock
      m_locked++;

      // and we want to mark the mutex as ours
      if ( m_locked == 1 ) // first lock request ?
         m_owner.setCurrent();

      m_mutex.unlock();
      return true;
   }


   void release() {
      m_mutex.lock();

      // are we the rightful owners
      if ( m_locked > 0  && m_owner.same() ) {
         m_locked--;
         // has this changed the predicate outcome?
         if ( m_locked == 0 ) {
            m_owner.invalidate(); // we are not the owners anymore
            //...yes..., we have changed it, but is there someone interested?
            if ( m_waiting > 0 )
               m_cond.signal();
         }
      }

      m_mutex.unlock();
   }

   // we don't need the parameter here...
   void handleCleanup( int )
   {
      // undoing all the preparations
      m_waiting --; // no more waiting
      m_cond.onStop(); // clearing cleanup function
      m_mutex.unlock(); // releasing the mutex we are given on entrance.
   }
};

As you can see, our fake mutex is not so fake anymore; it implements reentrancy, try-locks, timed locks and cancelation during lock acquisition. In fact, this code is a modified version of Wefts::XMutex.

Complex conditions.

As condition variables are often associated with a mutex in a 1:1 relationship, Wefts provides two classes that derive from both Condition and Mutex, and that use themselves as the condition's mutex. The net effect is a powerful and very fast object that can be used both as a mutex and as a condition, providing lock(), unlock() and trylock() methods together with wait() and signal().

Two basic classes are provided: FastCondition and RCondition. The first is implemented with a flat non-reentrant (and thus fast) mutex, while the second is derived from a reentrant mutex and a condition. So, if your program does not need mutex reentrancy (and a well-designed application should not), you can safely use FastCondition, while you can use RCondition if you plan to use recursive locks.

To understand how a FastCondition may work, take the XFakeMutex class and:

  1. change Condition m_cond to FastCondition m_cond;
  2. remove the declaration of m_mutex;
  3. remove the m_cond.setMutex() call in the constructor;
  4. search & replace m_mutex with m_cond.
Voilà, you are done: more readable code and better encapsulation.

The best aspect of this class is that you can add the data related to the condition predicate to a subclass of your own, like this:

      class MyCondition: public FastCondition, public CleanupHandler
      {
      public:
         int m_waiting;
         int m_locked;

         MyCondition()
            :FastCondition()
         {
            m_waiting = 0;
            m_locked = 0;
         }
         ...
      };

This encapsulates all the data that can be shared among threads in one object, which already has synchronization primitives both for excluding concurrent access to the shared data AND for waiting for the data to satisfy a predicate. If you wish, you can even code the predicate as a member of your class... The advantage is that you only need to pass a pointer around to move a whole lot of data, the methods associated with it and the synchronization needed to handle it correctly across threads.
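
The idea of keeping data, predicate and synchronization in one object can be sketched with the standard C++ library. This is not FastCondition: the sketch uses composition (a std::mutex and a std::condition_variable as members) instead of deriving from a Wefts class, and Mailbox and demoMailbox() are hypothetical names.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

class Mailbox {
    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::queue<int> m_items;   // the shared data

    // The predicate, coded as a member; call it with m_mutex held.
    bool hasMail() const { return !m_items.empty(); }

public:
    void post(int value) {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_items.push(value);
        m_cond.notify_one();
    }

    int take() {
        std::unique_lock<std::mutex> guard(m_mutex);
        while (!hasMail())
            m_cond.wait(guard);
        const int value = m_items.front();
        m_items.pop();
        return value;
    }
};

// Hypothetical demo helper: only a pointer travels between the threads,
// yet it carries the data, its methods and its synchronization.
int demoMailbox(int count) {
    Mailbox box;
    Mailbox* shared = &box;
    std::thread producer([shared, count] {
        for (int i = 1; i <= count; ++i)
            shared->post(i);
    });
    int sum = 0;
    for (int i = 0; i < count; ++i)
        sum += shared->take();
    producer.join();
    return sum;
}
```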

More complex conditions.

You may begin to wonder "what if I need something like that, but with a read-write access?"

Well, complex mutexes such as XMutex, RWMutex and RRWMutex are not descendants of the Mutex class; a read-write mutex cannot be used to lock the predicate that a condition must wait for. To explain why, I need just one word: mess. Using a complex mutex to wait for a condition raises too many problems, such as "Should we use a read-only lock or a read-write one?" or "Should we starve readers or writers?", not to mention cancelation issues. What would be the cleanup sequence? Would you cancel the wait or the mutex lock? Too messy, really.

Still, it can be useful to provide timed, interruptible, read-only or read-write access to data whose changes you may also want to wait for.

The answer is to implement a mix of a condition and a complex mutex like the one we need.

Observe this class:

      class MyThing: public RRWMutex
      {
         void * data;
         ...
      };

      class Pulsar: public CleanupHandler
      {
         ...
         MyThing *cargo;
         bool m_changed;
         FastCondition m_cond;
         ...

         void WaitForAChange() {
            m_cond.lock();
            m_cond.onStop(this, 0 );
            while( ! m_changed ) m_cond.wait();
            m_cond.unlock();
            // IMPORTANT: do it after.
            cargo->lockRead(); // or the kind of lock you need.
         }
      };

Done. When there is something worth the attention of a waiter, other threads working on the data will just set m_changed and signal the condition. The only drawback is that anything could happen before WaitForAChange() can acquire its lock on the cargo, but this is probably unimportant, as nothing will change the fact that m_changed told us that we must take care of something in the cargo that has changed.

Note:
NEVER cross mutexes. Don't ever lock a mutex while you are holding another lock, unless:
  1. The process cannot be interrupted while holding the two mutexes and
  2. The second mutex is released before you try to unlock the first and
  3. The order in which you lock the mutexes is never inverted in any part of the program and
  4. Avoid it anyway.
If you don't understand why... well, what happens if thread 1 locks mutex A, then locks mutex B, while thread 2 tries to lock mutex B? Thread 2 is stopped, and you are ok. But what if thread 1 locks A, THEN thread 2 locks B... and then thread 1 tries to lock B? Thread 1 blocks until thread 2 releases B. And what if thread 2 now wants to lock A?

This is the most common example of deadlock.
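
When two mutexes really must be held together, the inverted-order scenario above can be avoided by taking both in a single step. The sketch below uses std::lock() from the standard C++ library, which locks several mutexes with a deadlock-avoidance algorithm; it is not a Wefts facility, and Account, transfer() and demoTransfers() are hypothetical names.

```cpp
#include <mutex>
#include <thread>

struct Account {
    std::mutex mtx;
    int balance = 0;
};

// Moves `amount` between two DIFFERENT accounts. Both mutexes are taken
// by std::lock(), so the order of the arguments cannot cause a deadlock.
void transfer(Account& from, Account& to, int amount) {
    std::lock(from.mtx, to.mtx);
    std::lock_guard<std::mutex> g1(from.mtx, std::adopt_lock);
    std::lock_guard<std::mutex> g2(to.mtx, std::adopt_lock);
    from.balance -= amount;
    to.balance += amount;
}

// Hypothetical demo helper: two threads transfer in opposite directions,
// which is exactly the A-then-B versus B-then-A pattern described above.
int demoTransfers(int rounds) {
    Account a, b;
    a.balance = 1000;
    b.balance = 1000;
    std::thread t1([&] { for (int i = 0; i < rounds; ++i) transfer(a, b, 1); });
    std::thread t2([&] { for (int i = 0; i < rounds; ++i) transfer(b, a, 1); });
    t1.join();
    t2.join();
    return a.balance + b.balance;   // money is neither created nor lost
}
```

With two plain lock() calls in each direction instead of std::lock(), the same demo could hang.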

If you need to serve some data exactly when a change happens in another thread, without interference while you wait to get the locks, try the Subscription class.

Multiple conditions for a mutex.

It is possible that a correlated set of data (which must then be guarded by a single mutex) is of interest partly to some threads and partly to others. In this case, you may have two or more predicates that refer to the same set of data, and you may want more than one condition to signal, so as to specify which of the possible predicates may have changed.

This can also be handled with a single condition variable, but doing so would at times wake up threads whose predicate is known not to have changed, causing useless wakeups, that is to say, useless context switches.

A typical example is a set of reader and writer threads operating on a finite ring buffer (see Wefts::RingBuffer).

Using just one condition, if the buffer becomes full and the writers are put in wait, a reader would signal to every thread that it has freed a cell; obviously, all the other readers may be uninterested in this.

Note:
It is improbable that the readers are waiting if the buffer is big enough, but it is highly possible that they are waiting if the buffer has fewer cells than the sum of the readers and writers.

Wefts allows using the base Condition class to create a set of conditions that reference the same mutex. This class implements our smart ring buffer:

// $Id: smartring.cpp,v 1.2 2003/08/17 01:15:19 jonnymind Exp $
// smartring.cpp
//
// This program is free software - GPL 2.0 license
// (C) Giancarlo Niccolai
//
// NOTE: this program is demonstrative and hence not compiled
//       in the make process of Wefts test suite.
//

#include <wefts.h>
using namespace Wefts;

// an arbitrary small size
#define BUFDIM 10

class SmartRing: public CleanupHandler
{
   // the mutex
   Mutex m_mutex;

   // reader's condition
   Condition m_rCond;

   // writer's condition
   Condition m_wCond;

   int m_buffer[ BUFDIM ];   // a buffer
   // write pointer
   int m_wPos;
   // read pointer
   int m_rPos;

   // writers waiting
   int m_wWaiting;
   // readers waiting
   int m_rWaiting;

public:
   SmartRing()
   {
      m_wPos = 1; // read + 1 == write means empty
      m_rPos = 0;

      // no thread waiting.
      m_wWaiting = 0;
      m_rWaiting = 0;

      // setting here our mutex to work with two conditions
      m_wCond.setMutex( &m_mutex );
      m_rCond.setMutex( &m_mutex );

      // we have just one kind of cleanup per condition.
      m_wCond.onStop( this, 0 ); // 0 == cleanup a writer
      m_rCond.onStop( this, 1 ); // 1 == cleanup a reader
   }

   // is our ring empty?
   bool empty() {
      return (m_wPos == m_rPos + 1 || (m_wPos == 0 && m_rPos == BUFDIM-1));
   }

   // is our ring full?
   bool full() {
      return (m_wPos == m_rPos);
   }

   void write( int data )
   {
      m_mutex.lock();

      // have we to wait?
      if( full() ) {
         m_wWaiting++;
         while ( full() ) {
            m_wCond.wait();
         }
         m_wWaiting--;
      }

      // ok, we can write;
      // we take note of the other predicate to signal if it changes
      bool was_empty = empty();

      // write then advance
      m_buffer[ m_wPos ++ ] = data;

      // ring-around
      if ( m_wPos == BUFDIM ) m_wPos = 0;

      //now, if the predicate changed, and if someone is listening, signal.
      if( was_empty && m_rWaiting != 0 ) {
         m_rCond.signal(); // tell the readers situation has changed.
      }

      // done
      m_mutex.unlock();
   }

   int read()
   {
      m_mutex.lock();

      // have we to wait?
      if( empty() ) {
         m_rWaiting++;
         while ( empty() ) {
            m_rCond.wait();
         }
         m_rWaiting--;
      }

      // ok, we can read
      // we take note of the other predicate to signal if it changes
      bool was_full = full();

      // advance and then read, wrapping around at the end of the buffer
      m_rPos++;
      if ( m_rPos == BUFDIM )
         m_rPos = 0;

      int data = m_buffer[ m_rPos ];

      //now, if the predicate changed, and if someone is listening, signal.
      if( was_full && m_wWaiting != 0 ) {
         m_wCond.signal(); // tell the writers situation has changed.
      }

      // done
      m_mutex.unlock();
      return data;
   }

   void handleCleanup( int pos )
   {
      if ( pos == 0 ) // cleanup a writer
         m_wWaiting--;
      else
         m_rWaiting--;

      m_mutex.unlock();
   }
};

As you can see from the two setMutex() calls in the constructor, you can assign a mutex to any number of conditions. Moreover, when a condition is destroyed, its mutex remains intact, so it is possible to dynamically allocate and destroy conditions while holding a pivotal mutex that is assigned to all of them.

Also, notice the hack in the two onStop() calls in the constructor: as long as there is only one kind of cleanup sequence for a given condition (as in our case), it is possible to set the cleanup handler only once. This handler is in fact copied to the waiting thread's stack when a wait() is issued, so there is no need to re-set it each time if it does not change.

As you can see from the m_rCond.signal() call in write() and the m_wCond.signal() call in read(), using two conditions makes it possible to have more focused signaling, and thus more efficient code; the cost of an additional condition is really small (as Condition is a very small class), while if we used just one condition, we would have had to 1) call onStop() before each condition wait and 2) signal threads that would probably not have been interested.
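
The two-conditions-on-one-mutex layout can also be sketched with the standard C++ library. The sketch below is a simplification and not a port of SmartRing: it keeps an element count instead of comparing read and write positions, signals unconditionally instead of tracking waiting threads, and has no cancelation handling. BoundedQueue and demoQueue() are hypothetical names.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

class BoundedQueue {
    std::mutex m_mutex;                  // one mutex for the whole buffer
    std::condition_variable m_notEmpty;  // readers wait on this one
    std::condition_variable m_notFull;   // writers wait on this one
    std::vector<int> m_cells;
    std::size_t m_head = 0;              // index of the oldest element
    std::size_t m_count = 0;             // elements currently stored

public:
    explicit BoundedQueue(std::size_t capacity) : m_cells(capacity) {}

    void write(int value) {
        std::unique_lock<std::mutex> guard(m_mutex);
        while (m_count == m_cells.size())      // full: wait as a writer
            m_notFull.wait(guard);
        m_cells[(m_head + m_count) % m_cells.size()] = value;
        ++m_count;
        m_notEmpty.notify_one();               // wakes a reader, never a writer
    }

    int read() {
        std::unique_lock<std::mutex> guard(m_mutex);
        while (m_count == 0)                   // empty: wait as a reader
            m_notEmpty.wait(guard);
        const int value = m_cells[m_head];
        m_head = (m_head + 1) % m_cells.size();
        --m_count;
        m_notFull.notify_one();                // wakes a writer, never a reader
        return value;
    }
};

// Hypothetical demo helper: one writer pushes 1..items through a 4-cell
// buffer while the caller reads them back and sums them.
long demoQueue(int items) {
    BoundedQueue queue(4);
    std::thread writer([&] {
        for (int i = 1; i <= items; ++i)
            queue.write(i);
    });
    long sum = 0;
    for (int i = 0; i < items; ++i)
        sum += queue.read();
    writer.join();
    return sum;
}
```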


Generated on Mon Aug 18 05:53:44 2003 for Wefts by doxygen1.2.18