[cpp-threads] Thread API interface tweaks
Peter Dimov
pdimov at mmltd.net
Mon Aug 28 18:59:27 BST 2006
Howard Hinnant wrote:
> On Aug 27, 2006, at 3:09 PM, Peter Dimov wrote:
>
>> Cancellation complicates matters a bit, and so does "multiple wait"
>> support, and I haven't implemented these yet.
>
> I like the idea of a completely layered approach that is non-
> intrusive of the thread (single owner) model. Indeed this is what I
> was trying to implement. But I haven't figured out how to implement
> it yet. Specifically the "multiple wait", or even really the "single
> wait", has me stumped for the moment. I studied your outline but
> don't yet understand how a future can be created, linked to the
> return value of a thread, but still have the choice of blocking on it
> or not (and be copyable).
In my proposed model, threads have no return value. The mechanism for
transporting a return value of type R or an exception from point A to point
B is a future<R>. Points A and B can be two separate threads, but futures
are not tied to this specific use case; point B can be a remote RPC
endpoint, for example.
A future<R> starts out empty. When someone calls set_value( r ), it is
stored. When someone calls operator(), the stored value is returned. If
there is no value yet, operator() blocks.
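
The blocking behaviour described above can be sketched in a few lines. This is only an illustration under some assumptions: it uses std::mutex and std::condition_variable in place of the Boost.Thread types, it requires R to be default-constructible, and the names mini_future and demo_blocking_get are hypothetical, not part of the proposal.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical minimal future: copies share one state (reference semantics).
// Assumes R is default-constructible, which keeps the sketch short.
template<class R> class mini_future
{
    struct state
    {
        std::mutex mx;
        std::condition_variable cn;
        bool ready = false;
        R value{};
    };

    std::shared_ptr<state> st_;

public:
    mini_future(): st_( std::make_shared<state>() ) {}

    // Store the value and wake every waiter.
    void set_value( R const & r )
    {
        std::lock_guard<std::mutex> lock( st_->mx );
        assert( !st_->ready );
        st_->value = r;
        st_->ready = true;
        st_->cn.notify_all();
    }

    // Return the stored value, blocking until one has been set.
    R operator()() const
    {
        std::unique_lock<std::mutex> lock( st_->mx );
        st_->cn.wait( lock, [this]{ return st_->ready; } );
        return st_->value;
    }
};

// Point A (a worker thread) sets the value; point B (the caller) blocks on it.
inline int demo_blocking_get()
{
    mini_future<int> ft;
    std::thread producer( [ft]() mutable { ft.set_value( 42 ); } );
    int r = ft();   // blocks if the producer has not run yet
    producer.join();
    return r;
}
```

Because the copies share one state, the producer can hold its own copy of the future while the caller keeps another, which is what makes the future copyable and still safe to block on.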
Spawning a function object with a return value in a separate thread is
performed with the help of a wrapper that executes this function object and
then puts its return value in the provided future. I call this
future_wrapper in the code, but task<> seems much more appropriate, so I'll
switch to that name.
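
A rough sketch of such a wrapper follows. It swaps in std::promise for the write side of future<R> and std::exception_ptr for exception transport, neither of which is part of the proposal, and mini_task, answer, fail and the demo functions are hypothetical names chosen for illustration.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <memory>
#include <stdexcept>

// Hypothetical task wrapper: runs fn_ and delivers its result through a
// std::promise, which stands in for the write side of future<R>.
template<class R, class Fn> class mini_task
{
    Fn fn_;
    std::shared_ptr< std::promise<R> > pr_;  // shared so the task stays copyable

public:
    mini_task( Fn fn, std::shared_ptr< std::promise<R> > pr ): fn_( fn ), pr_( pr )
    {
        assert( pr_ );
    }

    // Forwards either the return value or the exception thrown by fn_.
    void operator()()
    {
        try
        {
            pr_->set_value( fn_() );
        }
        catch( ... )
        {
            pr_->set_exception( std::current_exception() );
        }
    }
};

inline int answer() { return 7; }
inline int fail() { throw std::runtime_error( "boom" ); }

// The task knows nothing about threads; here it simply runs inline.
inline int demo_task()
{
    auto pr = std::make_shared< std::promise<int> >();
    std::future<int> ft = pr->get_future();
    mini_task<int, int (*)()> t( &answer, pr );
    t();
    return ft.get();
}

// An exception thrown by the wrapped function resurfaces at the reader.
inline bool demo_task_exception()
{
    auto pr = std::make_shared< std::promise<int> >();
    std::future<int> ft = pr->get_future();
    mini_task<int, int (*)()> t( &fail, pr );
    t();
    try { ft.get(); }
    catch( std::runtime_error const & ) { return true; }
    return false;
}
```

The wrapper is an ordinary nullary function object, so anything that can execute one (a thread, a pool, a direct call) can run it.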
This is completely independent of the thread spawning API; it only needs to
support execution. Return values and exceptions are the domain of future<>
and task<>. Here's threaded_executor again (with future_wrapper renamed to
task):

class threaded_executor
{
public:

    template<class Fn> future< typename boost::result_of<Fn()>::type >
        execute( Fn fn )
    {
        typedef typename boost::result_of<Fn()>::type R;

        future<R> ft;                 // storage for the result
        task<R, Fn> fw( fn, ft );     // runs fn, stores its result in ft
        boost::thread th( fw );       // executes the task in a new thread

        return ft;
    }
};

future<R> provides storage for the return value (or the exception). task<R,
Fn> wraps Fn and puts its return value into the future. boost::thread spawns
a thread that executes the task. The future, which has full reference
semantics, is returned to the caller, who can use wait(), timed_wait() or
operator(), as needed.
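
For comparison, the same layering can be sketched with standard-library pieces. This is an assumption-laden analogue, not the proposed API: std::packaged_task plays the role of task<>, std::future replaces future<R> (and, unlike it, is move-only), and std_threaded_executor and demo_execute are hypothetical names.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <utility>

// Hypothetical std-based analogue of threaded_executor.
class std_threaded_executor
{
public:

    template<class Fn>
    auto execute( Fn fn ) -> std::future< decltype( fn() ) >
    {
        typedef decltype( fn() ) R;

        std::packaged_task<R()> task( std::move( fn ) );  // wraps fn, owns the result slot
        std::future<R> ft = task.get_future();            // read side for the caller

        std::thread th( std::move( task ) );              // the thread only executes
        th.detach();                                      // nobody joins; the future is the link

        return ft;
    }
};

// Caller side: try a timed wait first, then fall back to a blocking wait.
inline int demo_execute()
{
    std_threaded_executor ex;
    std::future<int> ft = ex.execute( []{ return 6 * 7; } );
    assert( ft.valid() );

    if( ft.wait_for( std::chrono::milliseconds( 10 ) ) != std::future_status::ready )
    {
        ft.wait();
    }

    return ft.get();
}
```

The thread itself has no return value here either; the result travels only through the future, so the executor could be replaced by a pool or an inline call without touching the caller.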
I'll try to prepare a paper for Portland.
Here's my sketchy future<> implementation for reference:
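
#include <cassert>
#include <exception>
#include <new>
#include <stdexcept>
#include <string>
#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/xtime.hpp>

template<class R> class future_impl;

// Throws a default-constructed E; used when no message is stored.
template<class E, class R> void throw_exception_0( future_impl<R> * /*this_*/ )
{
    throw E();
}

// Throws an E constructed from the message stored in the future.
template<class E, class R> void throw_exception_1( future_impl<R> * this_ )
{
    throw E( this_->what_ );
}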

template<class R> class future_impl
{
private:

    bool ready_;
    boost::optional<R> r_;

    mutable boost::mutex mx_;
    boost::condition cn_;

    std::string what_;

    typedef void (*throw_exception_type) ( future_impl<R> * this_ );
    throw_exception_type throw_exception_;

    // noncopyable
    future_impl( future_impl const & );
    future_impl & operator=( future_impl const & );

    // E2/R2: a friend template may not reuse the enclosing parameter name R
    template<class E2, class R2> friend void throw_exception_1( future_impl<R2> * this_ );

public:

    future_impl(): ready_( false ), throw_exception_( 0 )
    {
    }

    bool ready() const // throw()
    {
        boost::mutex::scoped_lock lock( mx_ );
        return ready_;
    }

    void wait() // throw( thread::cancelation_exception )
    {
        boost::mutex::scoped_lock lock( mx_ );

        while( !ready_ )
        {
            cn_.wait( lock );
        }
    }

    bool timed_wait( boost::xtime const & xt ) // throw( thread::cancelation_exception )
    {
        boost::mutex::scoped_lock lock( mx_ );

        while( !ready_ )
        {
            // returns false once the absolute time xt has passed
            if( !cn_.timed_wait( lock, xt ) ) return false;
        }

        return true;
    }

    R const & operator()() // throw( thread::cancelation_exception, ... )
    {
        wait();

        if( throw_exception_ != 0 )
        {
            throw_exception_( this );
        }

        assert( r_.is_initialized() );
        return r_.get();
    }

    void set_value( R const & r )
    {
        boost::mutex::scoped_lock lock( mx_ );

        assert( !ready_ );
        assert( !r_.is_initialized() );
        assert( throw_exception_ == 0 );

        r_.reset( r );
        ready_ = true;

        cn_.notify_all();
    }

    template<class E> void set_exception() // throw()
    {
        boost::mutex::scoped_lock lock( mx_ );

        assert( !ready_ );
        assert( !r_.is_initialized() );
        assert( throw_exception_ == 0 );

        throw_exception_ = &throw_exception_0<E, R>;
        ready_ = true;

        cn_.notify_all();
    }

    template<class E> void set_exception( char const * what ) // throw()
    {
        boost::mutex::scoped_lock lock( mx_ );

        assert( !ready_ );
        assert( !r_.is_initialized() );
        assert( throw_exception_ == 0 );

        try
        {
            what_ = what;
            throw_exception_ = &throw_exception_1<E, R>;
        }
        catch( std::bad_alloc const & )
        {
            throw_exception_ = &throw_exception_0<std::bad_alloc, R>;
        }
        catch( ... )
        {
            throw_exception_ = &throw_exception_0<std::bad_exception, R>;
        }

        ready_ = true;

        cn_.notify_all();
    }
};

template<class R> class future
{
private:

    boost::shared_ptr< future_impl<R> > pi_;

public:

    future(): pi_( new future_impl<R>() )
    {
    }

    bool ready() const
    {
        return pi_->ready();
    }

    void wait()
    {
        pi_->wait();
    }

    bool timed_wait( boost::xtime const & xt )
    {
        return pi_->timed_wait( xt );
    }

    R const & operator()() const
    {
        return pi_->operator()();
    }

    void set_value( R const & r )
    {
        pi_->set_value( r );
    }

    template<class E> void set_exception() // throw()
    {
        pi_->template set_exception<E>();
    }

    template<class E> void set_exception( char const * what ) // throw()
    {
        pi_->template set_exception<E>( what );
    }
};

template<class R, class Fn> class future_wrapper
{
private:

    Fn fn_;
    future<R> ft_;

public:

    future_wrapper( Fn fn, future<R> const & ft ): fn_( fn ), ft_( ft )
    {
    }

    void operator()() // throw()
    {
        try
        {
            ft_.set_value( fn_() );
        }
        catch( std::bad_alloc const & )
        {
            ft_.template set_exception<std::bad_alloc>();
        }
        catch( std::logic_error const & x )
        {
            ft_.template set_exception<std::logic_error>( x.what() );
        }
        catch( std::exception const & x )
        {
            ft_.template set_exception<std::runtime_error>( x.what() );
        }
        catch( ... )
        {
            ft_.template set_exception<std::bad_exception>();
        }
    }
};
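
The exception transport used above (record a pointer to a function template instantiation, call it later to throw) can be looked at in isolation. The following is a stripped-down sketch of that one trick with no threading involved; deferred_exception and demo_deferred are hypothetical names, and the bad_alloc fallback from set_exception is left out for brevity.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical holder that remembers which exception type to throw later.
class deferred_exception
{
    typedef void (*thrower)( deferred_exception const & );

    thrower throw_;
    std::string what_;

    // Each instantiation "remembers" E through its own address.
    template<class E> static void throw_0( deferred_exception const & )
    {
        throw E();
    }

    template<class E> static void throw_1( deferred_exception const & self )
    {
        throw E( self.what_ );
    }

public:

    deferred_exception(): throw_( 0 ) {}

    // Record "throw E()" for later.
    template<class E> void set()
    {
        throw_ = &deferred_exception::throw_0<E>;
    }

    // Record "throw E( what )" for later; the message is copied.
    template<class E> void set( char const * what )
    {
        assert( what != 0 );
        what_ = what;
        throw_ = &deferred_exception::throw_1<E>;
    }

    // Throw the recorded exception; does nothing if none was recorded.
    void rethrow() const
    {
        if( throw_ != 0 ) throw_( *this );
    }
};

// Producer records the type and message; consumer sees a real E.
inline std::string demo_deferred()
{
    deferred_exception d;
    d.set<std::runtime_error>( "worker failed" );

    try { d.rethrow(); }
    catch( std::runtime_error const & x ) { return x.what(); }

    return "";
}
```

Only the static type named at the set call survives the trip, which is why the wrapper above has to map what it catches onto a short list of known exception types.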