[cpp-threads] Thread API interface tweaks

Peter Dimov pdimov at mmltd.net
Mon Aug 28 18:59:27 BST 2006


Howard Hinnant wrote:
> On Aug 27, 2006, at 3:09 PM, Peter Dimov wrote:
>
>> Cancellation complicates matters a bit, and so does "multiple wait"
>> support, and I haven't implemented these yet.
>
> I like the idea of a completely layered approach that is non-
> intrusive of the thread (single owner) model.  Indeed this is what I
> was trying to implement.  But I haven't figured out how to implement
> it yet.  Specifically the "multiple wait", or even really the "single
> wait", has me stumped for the moment.  I studied your outline but
> don't yet understand how a future can be created, linked to the
> return value of a thread, but still have the choice of blocking on it
> or not (and be copyable).

In my proposed model, threads have no return value. The mechanism for 
transporting a return value of type R or an exception from point A to point 
B is a future<R>. Points A and B can be two separate threads, but futures 
are not tied to this specific use case; point B can be a remote RPC 
endpoint, for example.

A future<R> starts out empty. When someone calls set_value( r ), r is 
stored in the future. When someone calls operator(), the stored value is 
returned; if there is no value yet, operator() blocks until one is set.
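
To make the blocking behaviour concrete, here is a minimal sketch of the 
empty / set / blocking-read cycle. It assumes a C++11 standard library 
(std::mutex, std::condition_variable) in place of the Boost primitives. The 
names simple_future and future_demo are hypothetical and not part of the 
proposal. Unlike the implementation later in this message, the sketch 
requires R to be default-constructible and does not transport exceptions.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical minimal future: a shared slot that starts out empty.
template<class R> class simple_future
{
    struct state
    {
        std::mutex mx;
        std::condition_variable cn;
        bool ready = false;
        R value{}; // assumption: R is default-constructible
    };

    std::shared_ptr<state> ps_; // copies refer to the same slot

public:

    simple_future(): ps_( std::make_shared<state>() ) {}

    // Stores r and wakes every thread blocked in operator().
    void set_value( R const & r )
    {
        {
            std::lock_guard<std::mutex> lock( ps_->mx );
            ps_->value = r;
            ps_->ready = true;
        }
        ps_->cn.notify_all();
    }

    // Blocks until a value has been stored, then returns a copy of it.
    R operator()() const
    {
        std::unique_lock<std::mutex> lock( ps_->mx );
        ps_->cn.wait( lock, [this]{ return ps_->ready; } );
        return ps_->value;
    }
};

// Point A (a worker thread) stores the value; point B (the caller) blocks for it.
inline int future_demo()
{
    simple_future<int> ft;
    simple_future<int> copy = ft; // same underlying slot as ft

    std::thread producer( [copy]() mutable {
        std::this_thread::sleep_for( std::chrono::milliseconds( 20 ) );
        copy.set_value( 42 );
    } );

    int r = ft(); // blocks until the producer has called set_value
    producer.join();
    return r;
}
```

Calling future_demo() from main returns 42 once the producer thread has 
stored the value; the reader and the writer only share the future, nothing 
else.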

Spawning a function object with a return value in a separate thread is 
performed with the help of a wrapper that executes this function object and 
then puts its return value in the provided future. I call this 
future_wrapper in the code, but task<> seems much more appropriate, so I'll 
switch to that name.
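
As an illustration of what such a wrapper does, here is a rough sketch built 
on std::promise and std::future from a C++11 standard library, which are 
assumptions standing in for the future<> of this message. simple_task, 
compute and broken are hypothetical names. The sketch captures any exception 
with std::current_exception, whereas the implementation at the end of this 
message maps exceptions onto a small fixed set of types.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <stdexcept>
#include <utility>

// Hypothetical task wrapper built on std::promise; not the task<> of this message.
template<class Fn, class R> class simple_task
{
    Fn fn_;
    std::shared_ptr< std::promise<R> > pr_; // shared so that the task stays copyable

public:

    explicit simple_task( Fn fn ):
        fn_( std::move( fn ) ), pr_( std::make_shared< std::promise<R> >() )
    {
    }

    // Hands out the receiving end; may be called only once.
    std::future<R> get_future()
    {
        return pr_->get_future();
    }

    // Runs fn_ and stores either its result or the exception it threw.
    void operator()()
    {
        try
        {
            pr_->set_value( fn_() );
        }
        catch( ... )
        {
            pr_->set_exception( std::current_exception() );
        }
    }
};

// Hypothetical functions used to exercise the wrapper.
inline int compute() { return 6 * 7; }
inline int broken() { throw std::logic_error( "broken" ); }
```

The task can be run by anything that is able to call a nullary function 
object, including the current thread: after "simple_task<int (*)(), int> 
t( compute ); std::future<int> f = t.get_future(); t();", f.get() yields the 
result, and with broken in place of compute it rethrows the std::logic_error.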

This is completely independent of the thread spawning API; it only needs to 
support execution. Return values and exceptions are the domain of future<> 
and task<>. Here's threaded_executor again (with future_wrapper renamed to 
task):

class threaded_executor
{
public:

    template<class Fn>
    future< typename boost::result_of<Fn()>::type > execute( Fn fn )
    {
        typedef typename boost::result_of<Fn()>::type R;

        future<R> ft;
        task<Fn, R> fw( fn, ft );

        boost::thread th( fw );

        return ft;
    }
};

future<R> provides storage for the return value (or the exception). task<Fn, 
R> wraps Fn and puts its return value into the future. boost::thread spawns 
a thread that executes the task. The future (it has full reference 
semantics) is returned to the caller, who can use wait(), timed_wait() or 
operator(), as needed.
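
The same three-layer split (storage, wrapper, thread) can be sketched with 
C++11 standard library types, assuming such a library is available: 
std::future stands in for future<>, std::packaged_task for task<>, and 
std::thread for boost::thread. This is an analogy, not the proposed 
interface; std_threaded_executor, answer and fail are hypothetical names.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <thread>
#include <utility>

// Hypothetical executor: std::packaged_task plays the role of task<>,
// std::future the role of future<>. Not the classes from this message.
class std_threaded_executor
{
public:

    template<class Fn>
    auto execute( Fn fn ) -> std::future< decltype( fn() ) >
    {
        typedef decltype( fn() ) R;

        // The wrapper runs fn and stores its result or exception.
        std::packaged_task<R()> tk( std::move( fn ) );

        // The future is the caller's handle to that stored result.
        std::future<R> ft = tk.get_future();

        // The thread only needs to be able to execute the wrapper.
        std::thread th( std::move( tk ) );
        th.detach(); // std::thread must be detached or joined explicitly

        return ft;
    }
};

// Hypothetical functions used to exercise the executor.
inline int answer() { return 42; }
inline int fail() { throw std::runtime_error( "failed" ); }
```

With these definitions, "std_threaded_executor().execute( answer ).get()" 
blocks until the spawned thread has finished and then yields the result, 
while the same call with fail rethrows the std::runtime_error in the caller.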

I'll try to prepare a paper for Portland.

Here's my sketchy future<> implementation, for reference:


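#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/xtime.hpp>
#include <boost/utility/result_of.hpp>
#include <cassert>
#include <exception>
#include <new>
#include <stdexcept>
#include <string>

template<class R> class future_impl;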

template<class E, class R> void throw_exception_0( future_impl<R> * /*this_*/ )
{
    throw E();
}

template<class E, class R> void throw_exception_1( future_impl<R> * this_ )
{
    throw E( this_->what_ );
}

template<class R> class future_impl
{
private:

    bool ready_;
    boost::optional<R> r_;

    mutable boost::mutex mx_;
    boost::condition cn_;

    std::string what_;

    typedef void (*throw_exception_type) ( future_impl<R> * this_ );

    throw_exception_type throw_exception_;

    future_impl( future_impl const & );
    future_impl & operator=( future_impl const & );

    // R2 avoids shadowing the enclosing template parameter R
    template<class E, class R2> friend void throw_exception_1( future_impl<R2> * this_ );

public:

    future_impl(): ready_( false ), throw_exception_( 0 )
    {
    }

    bool ready() const // throw()
    {
        boost::mutex::scoped_lock lock( mx_ );
        return ready_;
    }

    void wait() // throw( thread::cancelation_exception )
    {
        boost::mutex::scoped_lock lock( mx_ );

        while( !ready_ )
        {
            cn_.wait( lock );
        }
    }

    bool timed_wait( boost::xtime const & xt ) // throw( thread::cancelation_exception )
    {
        boost::mutex::scoped_lock lock( mx_ );

        while( !ready_ )
        {
            if( !cn_.timed_wait( lock, xt ) ) return false;
        }

        return true;
    }

    R const & operator()() // throw( thread::cancelation_exception, ... )
    {
        wait();

        if( throw_exception_ != 0 )
        {
            throw_exception_( this );
        }

        assert( r_.is_initialized() );
        return r_.get();
    }

    void set_value( R const & r )
    {
        boost::mutex::scoped_lock lock( mx_ );

        assert( !ready_ );
        assert( !r_.is_initialized() );
        assert( throw_exception_ == 0 );

        r_.reset( r );

        ready_ = true;
        cn_.notify_all();
    }

    template<class E> void set_exception() // throw()
    {
        boost::mutex::scoped_lock lock( mx_ );

        assert( !ready_ );
        assert( !r_.is_initialized() );
        assert( throw_exception_ == 0 );

        throw_exception_ = &throw_exception_0<E, R>;

        ready_ = true;
        cn_.notify_all();
    }

    template<class E> void set_exception( char const * what ) // throw()
    {
        boost::mutex::scoped_lock lock( mx_ );

        assert( !ready_ );
        assert( !r_.is_initialized() );
        assert( throw_exception_ == 0 );

        try
        {
            what_ = what;
            throw_exception_ = &throw_exception_1<E, R>;
        }
        catch( std::bad_alloc const & )
        {
            throw_exception_ = &throw_exception_0<std::bad_alloc, R>;
        }
        catch( ... )
        {
            throw_exception_ = &throw_exception_0<std::bad_exception, R>;
        }

        ready_ = true;
        cn_.notify_all();
    }
};

template<class R> class future
{
private:

    boost::shared_ptr< future_impl<R> > pi_;

public:

    future(): pi_( new future_impl<R>() )
    {
    }

    bool ready() const
    {
        return pi_->ready();
    }

    void wait()
    {
        pi_->wait();
    }

    bool timed_wait( boost::xtime const & xt )
    {
        return pi_->timed_wait( xt );
    }

    R const & operator()() const
    {
        return pi_->operator()();
    }

    void set_value( R const & r )
    {
        pi_->set_value( r );
    }

    template<class E> void set_exception() // throw()
    {
        pi_->template set_exception<E>();
    }

    template<class E> void set_exception( char const * what ) // throw()
    {
        pi_->template set_exception<E>( what );
    }
};

// Referred to as task<Fn, R> above; parameter order matches that usage.
template<class Fn, class R> class future_wrapper
{
private:

    Fn fn_;
    future<R> ft_;

public:

    future_wrapper( Fn fn, future<R> const & ft ): fn_( fn ), ft_( ft )
    {
    }

    void operator()() // throw()
    {
        try
        {
            ft_.set_value( fn_() );
        }
        catch( std::bad_alloc const & )
        {
            ft_.template set_exception<std::bad_alloc>();
        }
        catch( std::logic_error const & x )
        {
            ft_.template set_exception<std::logic_error>( x.what() );
        }
        catch( std::exception const & x )
        {
            ft_.template set_exception<std::runtime_error>( x.what() );
        }
        catch( ... )
        {
            ft_.template set_exception<std::bad_exception>();
        }
    }
};



