[cpp-threads] Comments on n2094

Ion Gaztañaga igaztanaga at gmail.com
Fri Sep 29 10:11:54 BST 2006


Hi to all,

I've finally found some time to implement the N2094 thread-launching
interface on top of Boost (code attached). I've used library-based
pseudo-move semantics, so my test looks something like this:

   for(int i = 0; i < Repeat; ++i){
      task<int> t (launch_in_pool(boost::bind(func, i)));
      int r (t());
      if(r != i){
         return 1;
      }
   }

The code also works for movable-only types, for example a movable-only int:

class movable_only_int_t
{
   movable_only_int_t(const movable_only_int_t &);
   movable_only_int_t &operator=(const movable_only_int_t &);
   int int_;

   public:

   movable_only_int_t(int i = 0)
      : int_(i)
   {}

   ~movable_only_int_t()
   {  int_ = 0;   }

   movable_only_int_t(n2094::moved_object<movable_only_int_t> mo)
   {  int_ = mo.get().int_;   mo.get().int_ = 0;   }

   movable_only_int_t & operator=
      (n2094::moved_object<movable_only_int_t> mo)
   {  std::swap(int_, mo.get().int_);  return *this;  }

   friend bool operator ==
      (const movable_only_int_t &a, const movable_only_int_t &b)
   {  return a.int_ == b.int_;  }

   friend bool operator !=
      (const movable_only_int_t &a, const movable_only_int_t &b)
   {  return a.int_ != b.int_;  }
};

namespace n2094 {

//Mark movable_only_int_t as movable
template<>
struct is_movable<movable_only_int_t>
{
   enum {value = true};
};

}  //namespace n2094 {

You can test it like this:

   for(int i = 0; i < Repeat; ++i){
      task<movable_only_int_t> t(launch_in_pool(boost::bind(movable_func, i)));
      movable_only_int_t r (t());
      movable_only_int_t c(i);
      if(r != c){
         return 1;
      }
   }

The code is not optimized, but it was pretty easy to implement based
on Howard's and Peter's code. It's just toy code, not production code.

Exception propagation is an experiment. With task/future it implements
Beman's cloneable proposal (n2061): it clones and rethrows the
exception if the exception derives from cloneable; otherwise it throws
thread_exception_error. However, in the current implementation,
std::terminate is not called if the last future/task is destroyed.
class thread does not implement exception propagation: it's Peter's
set_exception approach, using cloneable exceptions. It's an
experiment, so don't take the implemented exception propagation too
seriously.
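
To make the clone-and-rethrow idea concrete, here is a minimal sketch.
It is not the attached code and not n2061's actual interface: the
cloneable base, my_error, captured_failure and run_captured are
hypothetical names used only for illustration; only
thread_exception_error is a name taken from the description above.

```cpp
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical interface in the spirit of a "cloneable" exception base.
struct cloneable {
    virtual ~cloneable() = default;
    virtual std::unique_ptr<cloneable> clone() const = 0;
    virtual void rethrow() const = 0;   // throws a copy of *this
};

// Thrown in the joiner when the original exception could not be cloned.
struct thread_exception_error : std::runtime_error {
    thread_exception_error()
        : std::runtime_error("uncloneable exception in thread") {}
};

// Example of a user exception that opts in to propagation.
struct my_error : std::runtime_error, cloneable {
    explicit my_error(const std::string& what) : std::runtime_error(what) {}
    std::unique_ptr<cloneable> clone() const override
    {  return std::make_unique<my_error>(*this);  }
    void rethrow() const override {  throw *this;  }
};

// What the executor side stores for the joiner side.
struct captured_failure {
    bool failed = false;
    std::unique_ptr<cloneable> copy;    // set only for cloneable exceptions

    void rethrow_if_failed() const
    {
        if (!failed) return;
        if (copy) copy->rethrow();      // same dynamic type as the original
        throw thread_exception_error(); // anything else is reported generically
    }
};

// Runs f and records how it failed, if it did.
template <class F>
captured_failure run_captured(F f)
{
    captured_failure r;
    try {  f();  }
    catch (const cloneable& e) {  r.failed = true;  r.copy = e.clone();  }
    catch (...)                {  r.failed = true;  }
    return r;
}
```

The joiner would call rethrow_if_failed() from the task/future
accessor, so a my_error thrown in the worker surfaces as a my_error in
the joiner, and any other exception surfaces as thread_exception_error.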

After reviewing n2094 a bit, I have some comments. I think exception
specification is a hard issue here. Here we go:

------------------------------------
0. Exceptions
------------------------------------

0.0 Mutexes
-----------

In the proposal, several mutex operations can throw:

-> Constructors
-> xxx_lock() functions
-> Promotion transitions(sharable->upgradable, upgradable->exclusive,
sharable->exclusive).
-> From the proposal: "The try-locking, unlocking, and non-blocking
conversion functions are specified to not throw an exception."

a) The first consequence is that implementing an upgradable_mutex
using the suggested two condition variables + a mutex is not correct,
since all operations (including unlock()) will need to lock the
internal mutex, and locking that internal mutex can throw. Not that
this worries me much, but it's a bit annoying.

b) This supposes that when promoting from upgradable to exclusive
ownership, the mutex must acquire all the needed resources for
sharable ownership. Otherwise, unlock_and_lock_sharable(), or
unlock_and_lock_upgradable() +  unlock_upgradable_and_lock_sharable()
combo could throw.

c) One might think that the mutex constructor should acquire all the
needed resources for mutex locking so mutex.lock() could be a no-throw
operation. My N2043 suggested that. However, some mutex
implementations use lazy resource allocation, a key feature for
process-shared, persistent mutexes and I don't know if no-throw can be
guaranteed. Other implementations can detect that the thread/process
owning the mutex has died, returning an error in the lock function in
other threads.

d) A throwing lock() operation worries me a bit, since in real-time,
embedded environments exceptions are normally disabled. In those
systems, threads/mutexes (usually with priority inversion/ceiling
capabilities) are present. I know that it's a niche segment, but I
would like to use standard C++ in those systems too. Filesystem
library has functions taking "system_error_code& ec" as an argument
guaranteeing no-throw operations. The throwing constructor might be
more problematic, though.

e) If try_xxx operations don't throw, how is the user supposed to know
whether the operation has failed because another thread has the
ownership of the mutex or because there are no resources available?
Several generic lock functions based on try_xxx functions might behave
badly if try_xxx returns false when there are no resources.

We might think about a non-throwing mutex lock but I really don't know
if some lazy initialization implementations (glibc, for example) can
guarantee this.
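
As an illustration of the error-code style mentioned in d), here is a
small sketch of a lock function that reports failure through an error
code instead of an exception. lock_nothrow is a hypothetical name, and
std::error_code / std::system_error are used here as stand-ins for the
"system_error_code& ec" idea. On a system with exceptions disabled the
body would call the OS primitive directly instead of wrapping a
throwing lock(); the point is only the shape of the interface.

```cpp
#include <mutex>
#include <system_error>

// Hypothetical helper: returns true if the mutex was locked. On failure
// it returns false and stores the reason in ec. It never throws.
template <class Mutex>
bool lock_nothrow(Mutex& m, std::error_code& ec) noexcept
{
    try {
        m.lock();
        ec.clear();
        return true;
    }
    catch (const std::system_error& e) {
        ec = e.code();      // keep the error reported by the mutex
    }
    catch (...) {
        // Unknown failure: report a generic "try again" condition.
        ec = std::make_error_code(std::errc::resource_unavailable_try_again);
    }
    return false;
}
```

A caller tests the return value and only then touches the protected
data; the same pattern could give try_xxx a way to distinguish "busy"
from "no resources", which is the problem raised in e).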

0.1 Condition variables
------------------------

Condition variables and exceptions are a very tricky combination,
IMHO. In this proposal, the situation is more tricky, since condition
variables can work with user-defined mutexes (which is a great
feature). If mutex locking can throw, that means that condition wait
can also throw, since it must lock the mutex after it's been notified.

void my_func()
{
  exclusive_lock<mutex> lock(mut_);

  //modify shared variables
  //...

  try {
     while(/**/){
       cond_.wait(lock);
     }
  }
  catch(/**/){
     //can I change shared variables?
  }
}


void condition::wait(Mutex &external_mutex)
{
   //pseudo code
   this->wait_impl_prepare(); // Can this throw?
   external_mutex.unlock();   // This can't throw
   this->wait_impl();         // Can this throw?
   external_mutex.lock();     // This can throw
}

Basically, we need to define if we can have errors while waiting in
the condition (e.g. unavailable resources in the OS condition queue).
The second question is if the external mutex should be relocked when
the wait throws. If the internal wait can throw and the mutex is
relocked, we can have these situations:

a) wait_impl() throws, the mutex relock does not throw: the user can
do a rollback, and change shared variables modified before waiting in
the condition variable.

b) wait_impl() doesn't throw, but the mutex relock throws: the user
can't touch the modified shared variables.

c) both wait_impl() and mutex.lock() throw. Really frightening.

What is the user supposed to do if a condition throws? Can the user
recover resources and try it again? Can we guarantee that the mutex
will be always relocked?

I can imagine several (bad) alternatives to this problem:

a) Abort if the mutex relock throws.

b) Throw different exceptions to inform the user which type of
exception has been thrown, so the user can know if modifying shared
data is allowed.

c) Guarantee that mutex relock won't throw. This would require
allocating resources before releasing the mutex. Since the condition
variable can be used with user-defined mutexes, this would require a
public function/protocol to request an unlock operation that
guarantees no errors when relocking. The last possibility can be
implemented adding something like unlock_and_guarantee_lock(). Sadly,
some errors reported by some POSIX/Windows implementations (for
example, the thread/process holding the mutex has died) should be
ignored.
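
Alternative b) can be sketched as follows. This is only an
illustration under assumptions: the two exception types and
wait_reporting_lock_state are hypothetical names, and wait_impl is a
callable standing in for the internal blocking step of the pseudo code
above.

```cpp
#include <stdexcept>

// Hypothetical exception types; the names are illustrative only.
struct wait_failed_mutex_locked : std::runtime_error {
    wait_failed_mutex_locked()
        : std::runtime_error("wait failed, mutex relocked") {}
};
struct wait_failed_mutex_unlocked : std::runtime_error {
    wait_failed_mutex_unlocked()
        : std::runtime_error("mutex relock failed") {}
};

// Unlocks, waits, relocks, and tells the caller through the exception
// type whether the external mutex is held when the exception escapes.
template <class Mutex, class WaitImpl>
void wait_reporting_lock_state(Mutex& external_mutex, WaitImpl wait_impl)
{
    external_mutex.unlock();            // specified not to throw

    bool wait_threw = false;
    try {  wait_impl();  }
    catch (...) {  wait_threw = true;  } // remember it, but relock first

    try {  external_mutex.lock();  }
    catch (...) {
        // Relock failed (whether or not the wait failed too):
        // shared data must not be touched.
        throw wait_failed_mutex_unlocked();
    }

    // Only the wait failed: the mutex is held, so a rollback is allowed.
    if (wait_threw) throw wait_failed_mutex_locked();
}
```

The user can then catch wait_failed_mutex_locked to roll back shared
state, and treat wait_failed_mutex_unlocked as "hands off". Note that
the original exception is discarded here; a real design would have to
decide how to preserve it.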

To sum up: a throwing mutex.lock() means a potentially throwing
condition.wait(), and we need to state what the user should do if this
happens. A throwing condition variable can also be used as a
cancellation point, where the implementation checks the flag
suggesting thread termination.

------------------------------------
1. task_wrapper/task/future
------------------------------------

1.0 task_wrapper overhead
-------------------------

I find Peter Dimov's approach to transferring values very useful. I
needed some time to understand it correctly, but it makes communication
between the executor thread and the joiner thread (or threads) really
easy. Having a single transfer method for all the executors also has
its drawbacks, because some executors might find a more efficient
way to pass the value to the task/future by merging several
operations into one:

One could merge the function object plus the return value in a single
object, avoiding one allocation. For example, in N2094 we have two
executors:

template <class F>
task<typename result_of<F()>::type>
   launch_in_thread(F f)
{
    typedef typename result_of<F()>::type R;
    task<R> t;

    thread(task_wrapper<R, F>(std::move(f), t));

    return t;
}

task_wrapper and task share the return value. In many implementations
the shared value will be allocated dynamically (which will surely use
a mutex to protect the heap), because task_wrapper and task can be
destroyed at any moment. Apart from that, the thread launching will
pass the function object to the thread (using a C API) using another
memory allocation (another mutex lock/unlock).

Another alternative is to pass the function object by pointer and wait
until the thread has copied/moved it to its own stack. But this
requires synchronization/context switch and it might be also
expensive.
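
The merging idea (function object plus return value in a single
object) could look roughly like the sketch below. It is written with
today's std::thread, std::optional and std::make_shared rather than the
N2094 types, and call_state, joined_task and launch_single_alloc are
hypothetical names. It assumes a non-void result and a function that
does not throw; std::thread may still allocate internally for its own
start-up state.

```cpp
#include <memory>
#include <optional>
#include <thread>
#include <type_traits>
#include <utility>

// Hypothetical combined state: the callable and the slot for its result
// live in the same heap block, so one allocation serves both.
template <class F>
struct call_state {
    using result_type = std::invoke_result_t<F&>;
    explicit call_state(F fn) : f(std::move(fn)) {}
    F f;
    std::optional<result_type> result;
};

template <class F>
class joined_task {
public:
    using result_type = typename call_state<F>::result_type;

    explicit joined_task(F f)
        : state_(std::make_shared<call_state<F>>(std::move(f))),
          // The thread only receives a pointer to the combined state.
          worker_([s = state_] { s->result.emplace(s->f()); })
    {}

    ~joined_task() {  if (worker_.joinable()) worker_.join();  }

    // Joining first means the result can be read without a mutex:
    // thread completion synchronizes with join().
    result_type get()
    {
        if (worker_.joinable()) worker_.join();
        return *state_->result;
    }

private:
    std::shared_ptr<call_state<F>> state_;  // declared before worker_
    std::thread worker_;
};

template <class F>
joined_task<F> launch_single_alloc(F f)
{
    return joined_task<F>(std::move(f));
}
```

Usage would be `auto t = launch_single_alloc([] { return 6 * 7; });`
followed by `t.get()`. The price of the merge is that this handle is
tied to one executor, which is exactly the trade-off against a single
transfer method for all executors.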

With the thread pool example:

template <class F>
task<typename result_of<F()>::type>
thread_pool::operator()(F f)
{
    typedef typename result_of<F()>::type R;

    task<R> t;
    {
        exclusive_lock<mutex> lock(mut_);
        queue_.push_back(task_wrapper<R, F>(std::move(f), t));
    }

    cond_.notify_one();
    return t;
}

we also have two allocations: the internal return value allocation
plus another one, since I guess that the queue will be something like
std::queue<function<void()>> and F can be an arbitrary function object
(the second allocation would be avoided if function<> has an internal
buffer).

1.1 timed_wait
-------------------------

task/future have a timed_wait() function, so I suppose that a
mutex/condition variable pair will be used for each task. Constructing
a condition/mutex pair for each task can be expensive in some systems
(we can use pooling/caching, though). Previous proposals
(Boost.Thread, for example) only use infinite waiting, which could be
implemented without a condition/mutex pair (a semaphore, for example,
or just using the return value of pthread_join()).
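
To show where the per-task mutex/condition pair comes from, here is a
minimal sketch of the state a task with timed_wait() might hold. It is
an assumption about a possible implementation, not N2094's:
result_slot is a hypothetical name, and
std::chrono::steady_clock::time_point stands in for absolute_time.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <utility>

template <class R>
class result_slot {
public:
    // Assumption: a steady_clock time point plays the role of absolute_time.
    using absolute_time = std::chrono::steady_clock::time_point;

    // Called by the executor thread when the function has returned.
    void set(R value)
    {
        {
            std::lock_guard<std::mutex> lock(mut_);
            value_.emplace(std::move(value));
        }
        cond_.notify_all();
    }

    // Returns true if the value is ready before abs_time, false otherwise.
    bool timed_wait(const absolute_time& abs_time)
    {
        std::unique_lock<std::mutex> lock(mut_);
        return cond_.wait_until(lock, abs_time,
                                [this] { return value_.has_value(); });
    }

    // Blocks until the value is ready, then returns a copy of it.
    R get()
    {
        std::unique_lock<std::mutex> lock(mut_);
        cond_.wait(lock, [this] { return value_.has_value(); });
        return *value_;
    }

private:
    std::mutex mut_;                 // one mutex per task
    std::condition_variable cond_;   // one condition variable per task
    std::optional<R> value_;
};
```

Every task pays for mut_ and cond_ even if nobody ever calls
timed_wait(), which is the cost being questioned here.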

------------------------------------
2. more synchronization utilities
------------------------------------

2.0 semaphore
----------------------

My n2043 asked whether we should standardize semaphores. Since n2094
is the most complete proposal so far, I want to ask whether
semaphores are seen as too low-level for a C++ standard.

Semaphore was removed from Boost.Thread because it was considered too
error prone. The claim was that the same effect can be achieved with
greater safety by combining a mutex and a condition variable.

Certainly, the semaphore interface can be emulated with a condition
and a mutex, but this emulation can't be used for important system
programming tasks, like device driver implementations, signal handling
or interrupt handlers. Apart from this, a semaphore is much more
efficient for notification tasks than a mutex/condition combination.
It's low-level, but when you need it, you badly need it.

Condition variables can't be used in interrupt handlers because
interrupt handlers should never block and thus, they shouldn't acquire
locks. The post() semaphore operation turns out to be the only
synchronization operation that can be safely used in an interrupt
handler. Shared variables protected by locks cannot be safely accessed
in interrupt handlers. Another argument for semaphores is that they
are natively implemented in most operating systems. A proposed
interface might be:

class semaphore
{
   //non-copyable and non-movable
   semaphore(const semaphore &);
   semaphore &operator =(const semaphore &);

   public:

   semaphore(int initial_count);

   ~semaphore();

   void post();

   void wait();

   bool try_wait();

   bool timed_wait(const absolute_time &abs_time);
};

void post()

    * Effects: Increments the semaphore count. If there are
processes/threads blocked waiting for the semaphore, then one of these
processes will return successfully from its wait function.

void wait()

    * Effects: Decrements the semaphore. If the semaphore value is not
greater than zero, then the calling process/thread blocks until it can
decrement the counter.

bool try_wait()

    * Effects: Decrements the semaphore if the semaphore's value is
greater than zero and returns true. If the value is not greater than
zero returns false.

bool timed_wait(const absolute_time &abs_time)

    * Effects: Decrements the semaphore if the semaphore's value is
greater than zero and returns true. Otherwise, waits for the semaphore
to be posted or for the timeout to expire. If the timeout expires, the
function returns false. If the semaphore is posted the function
returns true.
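
For reference, the mutex + condition variable emulation of this
interface is short. The sketch below uses today's std::mutex and
std::condition_variable; semaphore_sketch is a hypothetical name and
std::chrono::steady_clock::time_point is assumed as absolute_time.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

class semaphore_sketch {
public:
    // Assumption: a steady_clock time point plays the role of absolute_time.
    using absolute_time = std::chrono::steady_clock::time_point;

    explicit semaphore_sketch(int initial_count) : count_(initial_count) {}

    //non-copyable and non-movable
    semaphore_sketch(const semaphore_sketch&) = delete;
    semaphore_sketch& operator=(const semaphore_sketch&) = delete;

    void post()
    {
        {
            std::lock_guard<std::mutex> lock(mut_);   // post() takes a lock
            ++count_;
        }
        cond_.notify_one();
    }

    void wait()
    {
        std::unique_lock<std::mutex> lock(mut_);
        cond_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    bool try_wait()
    {
        std::lock_guard<std::mutex> lock(mut_);
        if (count_ <= 0) return false;
        --count_;
        return true;
    }

    bool timed_wait(const absolute_time& abs_time)
    {
        std::unique_lock<std::mutex> lock(mut_);
        if (!cond_.wait_until(lock, abs_time, [this] { return count_ > 0; }))
            return false;                             // timeout expired
        --count_;
        return true;
    }

private:
    std::mutex mut_;
    std::condition_variable cond_;
    int count_;
};
```

Note that post() has to lock mut_ in this emulation, which is
precisely why it can't replace a native semaphore in an interrupt
handler.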

I have no idea about what memory visibility requirements should be
demanded for semaphores ;-)

2.1 message queue
----------------------------

In Boost.Thread discussions, a message queue has been requested many
times. Certainly, it's an important communication mechanism. Apart from
memory visibility issues (should send() be a release operation and
receive() an acquire operation, or just full barriers?) I think that we
should provide higher-level communication mechanisms than
mutex/condition. Basically we can follow the POSIX mqueue interface and
propose:

class message_queue
{
   public:

   message_queue(std::size_t max_num_msg,  std::size_t max_msg_size);

   //Send functions. We can specify a priority, so higher priority
   //(lower number) messages are picked before lower priority ones.
   void send (const void *buffer,     std::size_t buffer_size,
              unsigned int priority);

   bool try_send (const void *buffer,     std::size_t buffer_size,
                  unsigned int priority);

   bool timed_send (const void *buffer,     std::size_t buffer_size,
                    unsigned int priority,  const absolute_time &abs_time);

   //Receive functions.
   void receive (void *buffer,           std::size_t buffer_size,
                 std::size_t &recvd_size,unsigned int &priority);

   bool try_receive (void *buffer,           std::size_t buffer_size,
                     std::size_t &recvd_size,unsigned int &priority);

   bool timed_receive (void *buffer,           std::size_t buffer_size,
                       std::size_t &recvd_size,unsigned int &priority,
                       const absolute_time &abs_time);

   //Get the maximum number of messages
   std::size_t get_max_msg() const;

   //Get the maximum size of a message
   std::size_t get_max_msg_size() const;

   //Get the current message count
   std::size_t get_num_msg();
};

We can also add more System V-like functions, like the possibility to
choose a priority when receiving. We can also add a "source"
identifier so that a user can identify the sender with a number. Or
just allow sending raw bytes and let the user implement these
features. Implementing the presented interface on top of
mutex/condition variables is easy.
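
A rough sketch of such an implementation follows, using today's
std::mutex and std::condition_variable. It is only an illustration:
message_queue_sketch is a hypothetical name, the timed variants are
omitted, and two choices are my assumptions rather than part of the
interface above: size violations are reported with std::length_error,
and the receive buffer must be able to hold max_msg_size bytes.

```cpp
#include <condition_variable>
#include <cstddef>
#include <cstring>
#include <map>
#include <mutex>
#include <stdexcept>
#include <vector>

class message_queue_sketch {
public:
    message_queue_sketch(std::size_t max_num_msg, std::size_t max_msg_size)
        : max_num_msg_(max_num_msg), max_msg_size_(max_msg_size) {}

    void send(const void* buffer, std::size_t buffer_size, unsigned int priority)
    {
        if (buffer_size > max_msg_size_) throw std::length_error("message too big");
        std::unique_lock<std::mutex> lock(mut_);
        not_full_.wait(lock, [this] { return msgs_.size() < max_num_msg_; });
        push(buffer, buffer_size, priority);
    }

    bool try_send(const void* buffer, std::size_t buffer_size, unsigned int priority)
    {
        if (buffer_size > max_msg_size_) throw std::length_error("message too big");
        std::lock_guard<std::mutex> lock(mut_);
        if (msgs_.size() >= max_num_msg_) return false;   // queue is full
        push(buffer, buffer_size, priority);
        return true;
    }

    void receive(void* buffer, std::size_t buffer_size,
                 std::size_t& recvd_size, unsigned int& priority)
    {
        if (buffer_size < max_msg_size_) throw std::length_error("buffer too small");
        std::unique_lock<std::mutex> lock(mut_);
        not_empty_.wait(lock, [this] { return !msgs_.empty(); });
        pop(buffer, recvd_size, priority);
    }

    bool try_receive(void* buffer, std::size_t buffer_size,
                     std::size_t& recvd_size, unsigned int& priority)
    {
        if (buffer_size < max_msg_size_) throw std::length_error("buffer too small");
        std::lock_guard<std::mutex> lock(mut_);
        if (msgs_.empty()) return false;                  // nothing to receive
        pop(buffer, recvd_size, priority);
        return true;
    }

    std::size_t get_max_msg() const      {  return max_num_msg_;   }
    std::size_t get_max_msg_size() const {  return max_msg_size_;  }
    std::size_t get_num_msg()
    {
        std::lock_guard<std::mutex> lock(mut_);
        return msgs_.size();
    }

private:
    // Both helpers must be called with mut_ held.
    void push(const void* buffer, std::size_t size, unsigned int priority)
    {
        const char* p = static_cast<const char*>(buffer);
        msgs_.emplace(priority, std::vector<char>(p, p + size));
        not_empty_.notify_one();
    }
    void pop(void* buffer, std::size_t& recvd_size, unsigned int& priority)
    {
        auto first = msgs_.begin();   // lowest number = highest priority
        priority   = first->first;
        recvd_size = first->second.size();
        if (recvd_size != 0) std::memcpy(buffer, first->second.data(), recvd_size);
        msgs_.erase(first);
        not_full_.notify_one();
    }

    const std::size_t max_num_msg_;
    const std::size_t max_msg_size_;
    std::mutex mut_;
    std::condition_variable not_full_, not_empty_;
    // multimap keeps equal-priority messages in insertion (FIFO) order.
    std::multimap<unsigned int, std::vector<char>> msgs_;
};
```

Each message costs a heap allocation in this sketch; a production
version would preallocate max_num_msg * max_msg_size bytes in the
constructor, as the constructor arguments suggest.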

------------------------------------
EOF
------------------------------------

Regards,

Ion
-------------- next part --------------
A non-text attachment was scrubbed...
Name: n2094.zip
Type: application/zip
Size: 8792 bytes
Desc: not available
Url : http://www.decadentplace.org.uk/pipermail/cpp-threads/attachments/20060929/d7757cdf/n2094.zip

