Ticket #3862: futures-patch.hpp

File futures-patch.hpp, 37.9 KB (added by dauphic@…, 13 years ago)

Patched futures.hpp

Line 
1// (C) Copyright 2008-9 Anthony Williams
2//
3// Distributed under the Boost Software License, Version 1.0. (See
4// accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#ifndef BOOST_THREAD_FUTURE_HPP
8#define BOOST_THREAD_FUTURE_HPP
9#include <stdexcept>
10#include <boost/thread/detail/move.hpp>
11#include <boost/thread/thread_time.hpp>
12#include <boost/exception_ptr.hpp>
13#include <boost/shared_ptr.hpp>
14#include <boost/scoped_ptr.hpp>
15#include <boost/type_traits/is_fundamental.hpp>
16#include <boost/type_traits/is_convertible.hpp>
17#include <boost/mpl/if.hpp>
18#include <boost/config.hpp>
19#include <boost/throw_exception.hpp>
20#include <algorithm>
21#include <boost/function.hpp>
22#include <boost/bind.hpp>
23#include <boost/ref.hpp>
24#include <boost/scoped_array.hpp>
25#include <boost/utility/enable_if.hpp>
26#include <list>
27#include <boost/next_prior.hpp>
28#include <vector>
29
30namespace boost
31{
32 class future_uninitialized:
33 public std::logic_error
34 {
35 public:
36 future_uninitialized():
37 std::logic_error("Future Uninitialized")
38 {}
39 };
40 class broken_promise:
41 public std::logic_error
42 {
43 public:
44 broken_promise():
45 std::logic_error("Broken promise")
46 {}
47 };
48 class future_already_retrieved:
49 public std::logic_error
50 {
51 public:
52 future_already_retrieved():
53 std::logic_error("Future already retrieved")
54 {}
55 };
56 class promise_already_satisfied:
57 public std::logic_error
58 {
59 public:
60 promise_already_satisfied():
61 std::logic_error("Promise already satisfied")
62 {}
63 };
64
65 class task_already_started:
66 public std::logic_error
67 {
68 public:
69 task_already_started():
70 std::logic_error("Task already started")
71 {}
72 };
73
74 class task_moved:
75 public std::logic_error
76 {
77 public:
78 task_moved():
79 std::logic_error("Task moved")
80 {}
81 };
82
83 namespace future_state
84 {
85 enum state { uninitialized, waiting, ready, moved };
86 }
87
88 namespace detail
89 {
90 struct future_object_base
91 {
92 boost::exception_ptr exception;
93 bool done;
94 boost::mutex mutex;
95 boost::condition_variable waiters;
96 typedef std::list<boost::condition_variable_any*> waiter_list;
97 waiter_list external_waiters;
98 boost::function<void()> callback;
99
100 future_object_base():
101 done(false)
102 {}
103 virtual ~future_object_base()
104 {}
105
106 waiter_list::iterator register_external_waiter(boost::condition_variable_any& cv)
107 {
108 boost::unique_lock<boost::mutex> lock(mutex);
109 do_callback(lock);
110 return external_waiters.insert(external_waiters.end(),&cv);
111 }
112
113 void remove_external_waiter(waiter_list::iterator it)
114 {
115 boost::lock_guard<boost::mutex> lock(mutex);
116 external_waiters.erase(it);
117 }
118
119 void mark_finished_internal()
120 {
121 done=true;
122 waiters.notify_all();
123 for(waiter_list::const_iterator it=external_waiters.begin(),
124 end=external_waiters.end();it!=end;++it)
125 {
126 (*it)->notify_all();
127 }
128 }
129
130 struct relocker
131 {
132 boost::unique_lock<boost::mutex>& lock;
133
134 relocker(boost::unique_lock<boost::mutex>& lock_):
135 lock(lock_)
136 {
137 lock.unlock();
138 }
139 ~relocker()
140 {
141 lock.lock();
142 }
143 };
144
145 void do_callback(boost::unique_lock<boost::mutex>& lock)
146 {
147 if(callback && !done)
148 {
149 boost::function<void()> local_callback=callback;
150 relocker relock(lock);
151 local_callback();
152 }
153 }
154
155
156 void wait(bool rethrow=true)
157 {
158 boost::unique_lock<boost::mutex> lock(mutex);
159 do_callback(lock);
160 while(!done)
161 {
162 waiters.wait(lock);
163 }
164 if(rethrow && exception)
165 {
166 boost::rethrow_exception(exception);
167 }
168 }
169
170 bool timed_wait_until(boost::system_time const& target_time)
171 {
172 boost::unique_lock<boost::mutex> lock(mutex);
173 do_callback(lock);
174 while(!done)
175 {
176 bool const success=waiters.timed_wait(lock,target_time);
177 if(!success && !done)
178 {
179 return false;
180 }
181 }
182 return true;
183 }
184
185 void mark_exceptional_finish_internal(boost::exception_ptr const& e)
186 {
187 exception=e;
188 mark_finished_internal();
189 }
190 void mark_exceptional_finish()
191 {
192 boost::lock_guard<boost::mutex> lock(mutex);
193 mark_exceptional_finish_internal(boost::current_exception());
194 }
195
196 bool has_value()
197 {
198 boost::lock_guard<boost::mutex> lock(mutex);
199 return done && !exception;
200 }
201 bool has_exception()
202 {
203 boost::lock_guard<boost::mutex> lock(mutex);
204 return done && exception;
205 }
206
207 template<typename F,typename U>
208 void set_wait_callback(F f,U* u)
209 {
210 callback=boost::bind(f,boost::ref(*u));
211 }
212
213 private:
214 future_object_base(future_object_base const&);
215 future_object_base& operator=(future_object_base const&);
216 };
217
218 template<typename T>
219 struct future_traits
220 {
221 typedef boost::scoped_ptr<T> storage_type;
222#ifdef BOOST_HAS_RVALUE_REFS
223 typedef T const& source_reference_type;
224 struct dummy;
225 typedef typename boost::mpl::if_<boost::is_fundamental<T>,dummy&,T&&>::type rvalue_source_type;
226 typedef typename boost::mpl::if_<boost::is_fundamental<T>,T,T&&>::type move_dest_type;
227#else
228 typedef T& source_reference_type;
229 typedef typename boost::mpl::if_<boost::is_convertible<T&,boost::detail::thread_move_t<T> >,boost::detail::thread_move_t<T>,T const&>::type rvalue_source_type;
230 typedef typename boost::mpl::if_<boost::is_convertible<T&,boost::detail::thread_move_t<T> >,boost::detail::thread_move_t<T>,T>::type move_dest_type;
231#endif
232
233 static void init(storage_type& storage,source_reference_type t)
234 {
235 storage.reset(new T(t));
236 }
237
238 static void init(storage_type& storage,rvalue_source_type t)
239 {
240 storage.reset(new T(static_cast<rvalue_source_type>(t)));
241 }
242
243 static void cleanup(storage_type& storage)
244 {
245 storage.reset();
246 }
247 };
248
249 template<typename T>
250 struct future_traits<T&>
251 {
252 typedef T* storage_type;
253 typedef T& source_reference_type;
254 struct rvalue_source_type
255 {};
256 typedef T& move_dest_type;
257
258 static void init(storage_type& storage,T& t)
259 {
260 storage=&t;
261 }
262
263 static void cleanup(storage_type& storage)
264 {
265 storage=0;
266 }
267 };
268
269 template<>
270 struct future_traits<void>
271 {
272 typedef bool storage_type;
273 typedef void move_dest_type;
274
275 static void init(storage_type& storage)
276 {
277 storage=true;
278 }
279
280 static void cleanup(storage_type& storage)
281 {
282 storage=false;
283 }
284
285 };
286
287 template<typename T>
288 struct future_object:
289 detail::future_object_base
290 {
291 typedef typename future_traits<T>::storage_type storage_type;
292 typedef typename future_traits<T>::source_reference_type source_reference_type;
293 typedef typename future_traits<T>::rvalue_source_type rvalue_source_type;
294 typedef typename future_traits<T>::move_dest_type move_dest_type;
295
296 storage_type result;
297
298 future_object():
299 result(0)
300 {}
301
302 void mark_finished_with_result_internal(source_reference_type result_)
303 {
304 future_traits<T>::init(result,result_);
305 mark_finished_internal();
306 }
307 void mark_finished_with_result_internal(rvalue_source_type result_)
308 {
309 future_traits<T>::init(result,static_cast<rvalue_source_type>(result_));
310 mark_finished_internal();
311 }
312
313 void mark_finished_with_result(source_reference_type result_)
314 {
315 boost::lock_guard<boost::mutex> lock(mutex);
316 mark_finished_with_result_internal(result_);
317 }
318 void mark_finished_with_result(rvalue_source_type result_)
319 {
320 boost::lock_guard<boost::mutex> lock(mutex);
321 mark_finished_with_result_internal(result_);
322 }
323
324 move_dest_type get()
325 {
326 wait();
327 return *result;
328 }
329
330 future_state::state get_state()
331 {
332 boost::lock_guard<boost::mutex> guard(mutex);
333 if(!done)
334 {
335 return future_state::waiting;
336 }
337 else
338 {
339 return future_state::ready;
340 }
341 }
342
343 private:
344 future_object(future_object const&);
345 future_object& operator=(future_object const&);
346 };
347
348 template<>
349 struct future_object<void>:
350 detail::future_object_base
351 {
352 future_object()
353 {}
354
355 void mark_finished_with_result_internal()
356 {
357 mark_finished_internal();
358 }
359
360 void mark_finished_with_result()
361 {
362 boost::lock_guard<boost::mutex> lock(mutex);
363 mark_finished_with_result_internal();
364 }
365
366 void get()
367 {
368 wait();
369 }
370
371 future_state::state get_state()
372 {
373 boost::lock_guard<boost::mutex> guard(mutex);
374 if(!done)
375 {
376 return future_state::waiting;
377 }
378 else
379 {
380 return future_state::ready;
381 }
382 }
383
384 private:
385 future_object(future_object const&);
386 future_object& operator=(future_object const&);
387 };
388
389 class future_waiter
390 {
391 struct registered_waiter
392 {
393 boost::shared_ptr<detail::future_object_base> future;
394 detail::future_object_base::waiter_list::iterator wait_iterator;
395 unsigned index;
396
397 registered_waiter(boost::shared_ptr<detail::future_object_base> const& future_,
398 detail::future_object_base::waiter_list::iterator wait_iterator_,
399 unsigned index_):
400 future(future_),wait_iterator(wait_iterator_),index(index_)
401 {}
402
403 };
404
405 struct all_futures_lock
406 {
407 std::vector<registered_waiter>::size_type count;
408 boost::scoped_array<boost::unique_lock<boost::mutex> > locks;
409
410 all_futures_lock(std::vector<registered_waiter>& futures):
411 count(futures.size()),locks(new boost::unique_lock<boost::mutex>[count])
412 {
413 for (std::vector<registered_waiter>::size_type i=0;i<count;++i)
414 {
415 locks[i]=boost::unique_lock<boost::mutex>(futures[i].future->mutex);
416 }
417 }
418
419 void lock()
420 {
421 boost::lock(locks.get(),locks.get()+count);
422 }
423
424 void unlock()
425 {
426 for (std::vector<registered_waiter>::size_type i=0;i<count;++i)
427 {
428 locks[i].unlock();
429 }
430 }
431 };
432
433 boost::condition_variable_any cv;
434 std::vector<registered_waiter> futures;
435 unsigned future_count;
436
437 public:
438 future_waiter():
439 future_count(0)
440 {}
441
442 template<typename F>
443 void add(F& f)
444 {
445 if(f.future)
446 {
447 futures.push_back(registered_waiter(f.future,f.future->register_external_waiter(cv),future_count));
448 }
449 ++future_count;
450 }
451
452 unsigned wait()
453 {
454 all_futures_lock lk(futures);
455 for(;;)
456 {
457 for(unsigned i=0;i<futures.size();++i)
458 {
459 if(futures[i].future->done)
460 {
461 return futures[i].index;
462 }
463 }
464 cv.wait(lk);
465 }
466 }
467
468 ~future_waiter()
469 {
470 for(unsigned i=0;i<futures.size();++i)
471 {
472 futures[i].future->remove_external_waiter(futures[i].wait_iterator);
473 }
474 }
475
476 };
477
478 }
479
480 template <typename R>
481 class unique_future;
482
483 template <typename R>
484 class shared_future;
485
486 template<typename T>
487 struct is_future_type
488 {
489 BOOST_STATIC_CONSTANT(bool, value=false);
490 };
491
492 template<typename T>
493 struct is_future_type<unique_future<T> >
494 {
495 BOOST_STATIC_CONSTANT(bool, value=true);
496 };
497
498 template<typename T>
499 struct is_future_type<shared_future<T> >
500 {
501 BOOST_STATIC_CONSTANT(bool, value=true);
502 };
503
504 template<typename Iterator>
505 typename boost::disable_if<is_future_type<Iterator>,void>::type wait_for_all(Iterator begin,Iterator end)
506 {
507 for(Iterator current=begin;current!=end;++current)
508 {
509 current->wait();
510 }
511 }
512
513 template<typename F1,typename F2>
514 typename boost::enable_if<is_future_type<F1>,void>::type wait_for_all(F1& f1,F2& f2)
515 {
516 f1.wait();
517 f2.wait();
518 }
519
520 template<typename F1,typename F2,typename F3>
521 void wait_for_all(F1& f1,F2& f2,F3& f3)
522 {
523 f1.wait();
524 f2.wait();
525 f3.wait();
526 }
527
528 template<typename F1,typename F2,typename F3,typename F4>
529 void wait_for_all(F1& f1,F2& f2,F3& f3,F4& f4)
530 {
531 f1.wait();
532 f2.wait();
533 f3.wait();
534 f4.wait();
535 }
536
537 template<typename F1,typename F2,typename F3,typename F4,typename F5>
538 void wait_for_all(F1& f1,F2& f2,F3& f3,F4& f4,F5& f5)
539 {
540 f1.wait();
541 f2.wait();
542 f3.wait();
543 f4.wait();
544 f5.wait();
545 }
546
547 template<typename Iterator>
548 typename boost::disable_if<is_future_type<Iterator>,Iterator>::type wait_for_any(Iterator begin,Iterator end)
549 {
550 detail::future_waiter waiter;
551 for(Iterator current=begin;current!=end;++current)
552 {
553 waiter.add(*current);
554 }
555 return boost::next(begin,waiter.wait());
556 }
557
558 template<typename F1,typename F2>
559 typename boost::enable_if<is_future_type<F1>,unsigned>::type wait_for_any(F1& f1,F2& f2)
560 {
561 detail::future_waiter waiter;
562 waiter.add(f1);
563 waiter.add(f2);
564 return waiter.wait();
565 }
566
567 template<typename F1,typename F2,typename F3>
568 unsigned wait_for_any(F1& f1,F2& f2,F3& f3)
569 {
570 detail::future_waiter waiter;
571 waiter.add(f1);
572 waiter.add(f2);
573 waiter.add(f3);
574 return waiter.wait();
575 }
576
577 template<typename F1,typename F2,typename F3,typename F4>
578 unsigned wait_for_any(F1& f1,F2& f2,F3& f3,F4& f4)
579 {
580 detail::future_waiter waiter;
581 waiter.add(f1);
582 waiter.add(f2);
583 waiter.add(f3);
584 waiter.add(f4);
585 return waiter.wait();
586 }
587
588 template<typename F1,typename F2,typename F3,typename F4,typename F5>
589 unsigned wait_for_any(F1& f1,F2& f2,F3& f3,F4& f4,F5& f5)
590 {
591 detail::future_waiter waiter;
592 waiter.add(f1);
593 waiter.add(f2);
594 waiter.add(f3);
595 waiter.add(f4);
596 waiter.add(f5);
597 return waiter.wait();
598 }
599
600 template <typename R>
601 class promise;
602
603 template <typename R>
604 class packaged_task;
605
606 template <typename R>
607 class unique_future
608 {
609 unique_future(unique_future & rhs);// = delete;
610 unique_future& operator=(unique_future& rhs);// = delete;
611
612 typedef boost::shared_ptr<detail::future_object<R> > future_ptr;
613
614 future_ptr future;
615
616 friend class shared_future<R>;
617 friend class promise<R>;
618 friend class packaged_task<R>;
619 friend class detail::future_waiter;
620
621 typedef typename detail::future_traits<R>::move_dest_type move_dest_type;
622
623 unique_future(future_ptr future_):
624 future(future_)
625 {}
626
627 public:
628 typedef future_state::state state;
629
630 unique_future()
631 {}
632
633 ~unique_future()
634 {}
635
636#ifdef BOOST_HAS_RVALUE_REFS
637 unique_future(unique_future && other)
638 {
639 future.swap(other.future);
640 }
641 unique_future& operator=(unique_future && other)
642 {
643 future=other.future;
644 other.future.reset();
645 return *this;
646 }
647#else
648 unique_future(boost::detail::thread_move_t<unique_future> other):
649 future(other->future)
650 {
651 other->future.reset();
652 }
653
654 unique_future& operator=(boost::detail::thread_move_t<unique_future> other)
655 {
656 future=other->future;
657 other->future.reset();
658 return *this;
659 }
660
661 operator boost::detail::thread_move_t<unique_future>()
662 {
663 return boost::detail::thread_move_t<unique_future>(*this);
664 }
665#endif
666
667 void swap(unique_future& other)
668 {
669 future.swap(other.future);
670 }
671
672 // retrieving the value
673 move_dest_type get()
674 {
675 if(!future)
676 {
677 boost::throw_exception(future_uninitialized());
678 }
679
680 return future->get();
681 }
682
683 // functions to check state, and wait for ready
684 state get_state() const
685 {
686 if(!future)
687 {
688 return future_state::uninitialized;
689 }
690 return future->get_state();
691 }
692
693
694 bool is_ready() const
695 {
696 return get_state()==future_state::ready;
697 }
698
699 bool has_exception() const
700 {
701 return future && future->has_exception();
702 }
703
704 bool has_value() const
705 {
706 return future && future->has_value();
707 }
708
709 void wait() const
710 {
711 if(!future)
712 {
713 boost::throw_exception(future_uninitialized());
714 }
715 future->wait(false);
716 }
717
718 template<typename Duration>
719 bool timed_wait(Duration const& rel_time) const
720 {
721 return timed_wait_until(boost::get_system_time()+rel_time);
722 }
723
724 bool timed_wait_until(boost::system_time const& abs_time) const
725 {
726 if(!future)
727 {
728 boost::throw_exception(future_uninitialized());
729 }
730 return future->timed_wait_until(abs_time);
731 }
732
733 };
734
735 template <typename R>
736 class shared_future
737 {
738 typedef boost::shared_ptr<detail::future_object<R> > future_ptr;
739
740 future_ptr future;
741
742// shared_future(const unique_future<R>& other);
743// shared_future& operator=(const unique_future<R>& other);
744
745 friend class detail::future_waiter;
746 friend class promise<R>;
747 friend class packaged_task<R>;
748
749 shared_future(future_ptr future_):
750 future(future_)
751 {}
752
753 public:
754 shared_future(shared_future const& other):
755 future(other.future)
756 {}
757
758 typedef future_state::state state;
759
760 shared_future()
761 {}
762
763 ~shared_future()
764 {}
765
766 shared_future& operator=(shared_future const& other)
767 {
768 future=other.future;
769 return *this;
770 }
771#ifdef BOOST_HAS_RVALUE_REFS
772 shared_future(shared_future && other)
773 {
774 future.swap(other.future);
775 }
776 shared_future(unique_future<R> && other)
777 {
778 future.swap(other.future);
779 }
780 shared_future& operator=(shared_future && other)
781 {
782 future.swap(other.future);
783 other.future.reset();
784 return *this;
785 }
786 shared_future& operator=(unique_future<R> && other)
787 {
788 future.swap(other.future);
789 other.future.reset();
790 return *this;
791 }
792#else
793 shared_future(boost::detail::thread_move_t<shared_future> other):
794 future(other->future)
795 {
796 other->future.reset();
797 }
798// shared_future(const unique_future<R> &) = delete;
799 shared_future(boost::detail::thread_move_t<unique_future<R> > other):
800 future(other->future)
801 {
802 other->future.reset();
803 }
804 shared_future& operator=(boost::detail::thread_move_t<shared_future> other)
805 {
806 future.swap(other->future);
807 other->future.reset();
808 return *this;
809 }
810 shared_future& operator=(boost::detail::thread_move_t<unique_future<R> > other)
811 {
812 future.swap(other->future);
813 other->future.reset();
814 return *this;
815 }
816
817 operator boost::detail::thread_move_t<shared_future>()
818 {
819 return boost::detail::thread_move_t<shared_future>(*this);
820 }
821
822#endif
823
824 void swap(shared_future& other)
825 {
826 future.swap(other.future);
827 }
828
829 // retrieving the value
830 R get()
831 {
832 if(!future)
833 {
834 boost::throw_exception(future_uninitialized());
835 }
836
837 return future->get();
838 }
839
840 // functions to check state, and wait for ready
841 state get_state() const
842 {
843 if(!future)
844 {
845 return future_state::uninitialized;
846 }
847 return future->get_state();
848 }
849
850
851 bool is_ready() const
852 {
853 return get_state()==future_state::ready;
854 }
855
856 bool has_exception() const
857 {
858 return future && future->has_exception();
859 }
860
861 bool has_value() const
862 {
863 return future && future->has_value();
864 }
865
866 void wait() const
867 {
868 if(!future)
869 {
870 boost::throw_exception(future_uninitialized());
871 }
872 future->wait(false);
873 }
874
875 template<typename Duration>
876 bool timed_wait(Duration const& rel_time) const
877 {
878 return timed_wait_until(boost::get_system_time()+rel_time);
879 }
880
881 bool timed_wait_until(boost::system_time const& abs_time) const
882 {
883 if(!future)
884 {
885 boost::throw_exception(future_uninitialized());
886 }
887 return future->timed_wait_until(abs_time);
888 }
889
890 };
891
892 template <typename R>
893 class promise
894 {
895 typedef boost::shared_ptr<detail::future_object<R> > future_ptr;
896
897 future_ptr future;
898 bool future_obtained;
899
900 promise(promise & rhs);// = delete;
901 promise & operator=(promise & rhs);// = delete;
902
903 void lazy_init()
904 {
905 if(!future)
906 {
907 future_obtained=false;
908 future.reset(new detail::future_object<R>);
909 }
910 }
911
912 public:
913// template <class Allocator> explicit promise(Allocator a);
914
915 promise():
916 future(),future_obtained(false)
917 {}
918
919 ~promise()
920 {
921 if(future)
922 {
923 boost::lock_guard<boost::mutex> lock(future->mutex);
924
925 if(!future->done)
926 {
927 future->mark_exceptional_finish_internal(boost::copy_exception(broken_promise()));
928 }
929 }
930 }
931
932 // Assignment
933#ifdef BOOST_HAS_RVALUE_REFS
934 promise(promise && rhs):
935 future_obtained(rhs.future_obtained)
936 {
937 future.swap(rhs.future);
938 }
939 promise & operator=(promise&& rhs)
940 {
941 future.swap(rhs.future);
942 future_obtained=rhs.future_obtained;
943 rhs.future.reset();
944 return *this;
945 }
946#else
947 promise(boost::detail::thread_move_t<promise> rhs):
948 future(rhs->future),future_obtained(rhs->future_obtained)
949 {
950 rhs->future.reset();
951 }
952 promise & operator=(boost::detail::thread_move_t<promise> rhs)
953 {
954 future=rhs->future;
955 future_obtained=rhs->future_obtained;
956 rhs->future.reset();
957 return *this;
958 }
959
960 operator boost::detail::thread_move_t<promise>()
961 {
962 return boost::detail::thread_move_t<promise>(*this);
963 }
964#endif
965
966 void swap(promise& other)
967 {
968 future.swap(other.future);
969 std::swap(future_obtained,other.future_obtained);
970 }
971
972 // Result retrieval
973 unique_future<R> get_future()
974 {
975 lazy_init();
976 if(future_obtained)
977 {
978 boost::throw_exception(future_already_retrieved());
979 }
980 future_obtained=true;
981 return unique_future<R>(future);
982 }
983
984 void set_value(typename detail::future_traits<R>::source_reference_type r)
985 {
986 lazy_init();
987 boost::lock_guard<boost::mutex> lock(future->mutex);
988 if(future->done)
989 {
990 boost::throw_exception(promise_already_satisfied());
991 }
992 future->mark_finished_with_result_internal(r);
993 }
994
995// void set_value(R && r);
996 void set_value(typename detail::future_traits<R>::rvalue_source_type r)
997 {
998 lazy_init();
999 boost::lock_guard<boost::mutex> lock(future->mutex);
1000 if(future->done)
1001 {
1002 boost::throw_exception(promise_already_satisfied());
1003 }
1004 future->mark_finished_with_result_internal(static_cast<typename detail::future_traits<R>::rvalue_source_type>(r));
1005 }
1006
1007 void set_exception(boost::exception_ptr p)
1008 {
1009 lazy_init();
1010 boost::lock_guard<boost::mutex> lock(future->mutex);
1011 if(future->done)
1012 {
1013 boost::throw_exception(promise_already_satisfied());
1014 }
1015 future->mark_exceptional_finish_internal(p);
1016 }
1017
1018 template<typename F>
1019 void set_wait_callback(F f)
1020 {
1021 lazy_init();
1022 future->set_wait_callback(f,this);
1023 }
1024
1025 };
1026
1027 template <>
1028 class promise<void>
1029 {
1030 typedef boost::shared_ptr<detail::future_object<void> > future_ptr;
1031
1032 future_ptr future;
1033 bool future_obtained;
1034
1035 promise(promise & rhs);// = delete;
1036 promise & operator=(promise & rhs);// = delete;
1037
1038 void lazy_init()
1039 {
1040 if(!future)
1041 {
1042 future_obtained=false;
1043 future.reset(new detail::future_object<void>);
1044 }
1045 }
1046 public:
1047// template <class Allocator> explicit promise(Allocator a);
1048
1049 promise():
1050 future(),future_obtained(false)
1051 {}
1052
1053 ~promise()
1054 {
1055 if(future)
1056 {
1057 boost::lock_guard<boost::mutex> lock(future->mutex);
1058
1059 if(!future->done)
1060 {
1061 future->mark_exceptional_finish_internal(boost::copy_exception(broken_promise()));
1062 }
1063 }
1064 }
1065
1066 // Assignment
1067#ifdef BOOST_HAS_RVALUE_REFS
1068 promise(promise && rhs):
1069 future_obtained(rhs.future_obtained)
1070 {
1071 future.swap(rhs.future);
1072 }
1073 promise & operator=(promise&& rhs)
1074 {
1075 future.swap(rhs.future);
1076 future_obtained=rhs.future_obtained;
1077 rhs.future.reset();
1078 return *this;
1079 }
1080#else
1081 promise(boost::detail::thread_move_t<promise> rhs):
1082 future(rhs->future),future_obtained(rhs->future_obtained)
1083 {
1084 rhs->future.reset();
1085 }
1086 promise & operator=(boost::detail::thread_move_t<promise> rhs)
1087 {
1088 future=rhs->future;
1089 future_obtained=rhs->future_obtained;
1090 rhs->future.reset();
1091 return *this;
1092 }
1093
1094 operator boost::detail::thread_move_t<promise>()
1095 {
1096 return boost::detail::thread_move_t<promise>(*this);
1097 }
1098#endif
1099
1100 void swap(promise& other)
1101 {
1102 future.swap(other.future);
1103 std::swap(future_obtained,other.future_obtained);
1104 }
1105
1106 // Result retrieval
1107 unique_future<void> get_future()
1108 {
1109 lazy_init();
1110
1111 if(future_obtained)
1112 {
1113 boost::throw_exception(future_already_retrieved());
1114 }
1115 future_obtained=true;
1116 return unique_future<void>(future);
1117 }
1118
1119 void set_value()
1120 {
1121 lazy_init();
1122 boost::lock_guard<boost::mutex> lock(future->mutex);
1123 if(future->done)
1124 {
1125 boost::throw_exception(promise_already_satisfied());
1126 }
1127 future->mark_finished_with_result_internal();
1128 }
1129
1130 void set_exception(boost::exception_ptr p)
1131 {
1132 lazy_init();
1133 boost::lock_guard<boost::mutex> lock(future->mutex);
1134 if(future->done)
1135 {
1136 boost::throw_exception(promise_already_satisfied());
1137 }
1138 future->mark_exceptional_finish_internal(p);
1139 }
1140
1141 template<typename F>
1142 void set_wait_callback(F f)
1143 {
1144 lazy_init();
1145 future->set_wait_callback(f,this);
1146 }
1147
1148 };
1149
1150 namespace detail
1151 {
1152 template<typename R>
1153 struct task_base:
1154 detail::future_object<R>
1155 {
1156 bool started;
1157
1158 task_base():
1159 started(false)
1160 {}
1161
1162 void run()
1163 {
1164 {
1165 boost::lock_guard<boost::mutex> lk(this->mutex);
1166 if(started)
1167 {
1168 boost::throw_exception(task_already_started());
1169 }
1170 started=true;
1171 }
1172 do_run();
1173 }
1174
1175 void owner_destroyed()
1176 {
1177 boost::lock_guard<boost::mutex> lk(this->mutex);
1178 if(!started)
1179 {
1180 started=true;
1181 this->mark_exceptional_finish_internal(boost::copy_exception(boost::broken_promise()));
1182 }
1183 }
1184
1185
1186 virtual void do_run()=0;
1187 };
1188
1189
1190 template<typename R,typename F>
1191 struct task_object:
1192 task_base<R>
1193 {
1194 F f;
1195 task_object(F const& f_):
1196 f(f_)
1197 {}
1198 task_object(boost::detail::thread_move_t<F> f_):
1199 f(f_)
1200 {}
1201
1202 void do_run()
1203 {
1204 try
1205 {
1206 this->mark_finished_with_result(f());
1207 }
1208 catch(...)
1209 {
1210 this->mark_exceptional_finish();
1211 }
1212 }
1213 };
1214
1215 template<typename F>
1216 struct task_object<void,F>:
1217 task_base<void>
1218 {
1219 F f;
1220 task_object(F const& f_):
1221 f(f_)
1222 {}
1223 task_object(boost::detail::thread_move_t<F> f_):
1224 f(f_)
1225 {}
1226
1227 void do_run()
1228 {
1229 try
1230 {
1231 f();
1232 this->mark_finished_with_result();
1233 }
1234 catch(...)
1235 {
1236 this->mark_exceptional_finish();
1237 }
1238 }
1239 };
1240
1241 }
1242
1243
1244 template<typename R>
1245 class packaged_task
1246 {
1247 boost::shared_ptr<detail::task_base<R> > task;
1248 bool future_obtained;
1249
1250 packaged_task(packaged_task&);// = delete;
1251 packaged_task& operator=(packaged_task&);// = delete;
1252
1253 public:
1254 packaged_task():
1255 future_obtained(false)
1256 {}
1257
1258 // construction and destruction
1259 template <class F>
1260 explicit packaged_task(F const& f):
1261 task(new detail::task_object<R,F>(f)),future_obtained(false)
1262 {}
1263 explicit packaged_task(R(*f)()):
1264 task(new detail::task_object<R,R(*)()>(f)),future_obtained(false)
1265 {}
1266
1267 template <class F>
1268 explicit packaged_task(boost::detail::thread_move_t<F> f):
1269 task(new detail::task_object<R,F>(f)),future_obtained(false)
1270 {}
1271
1272// template <class F, class Allocator>
1273// explicit packaged_task(F const& f, Allocator a);
1274// template <class F, class Allocator>
1275// explicit packaged_task(F&& f, Allocator a);
1276
1277
1278 ~packaged_task()
1279 {
1280 if(task)
1281 {
1282 task->owner_destroyed();
1283 }
1284 }
1285
1286 // assignment
1287#ifdef BOOST_HAS_RVALUE_REFS
1288 packaged_task(packaged_task&& other):
1289 future_obtained(other.future_obtained)
1290 {
1291 task.swap(other.task);
1292 other.future_obtained=false;
1293 }
1294 packaged_task& operator=(packaged_task&& other)
1295 {
1296 packaged_task temp(static_cast<packaged_task&&>(other));
1297 swap(temp);
1298 return *this;
1299 }
1300#else
1301 packaged_task(boost::detail::thread_move_t<packaged_task> other):
1302 future_obtained(other->future_obtained)
1303 {
1304 task.swap(other->task);
1305 other->future_obtained=false;
1306 }
1307 packaged_task& operator=(boost::detail::thread_move_t<packaged_task> other)
1308 {
1309 packaged_task temp(other);
1310 swap(temp);
1311 return *this;
1312 }
1313 operator boost::detail::thread_move_t<packaged_task>()
1314 {
1315 return boost::detail::thread_move_t<packaged_task>(*this);
1316 }
1317#endif
1318
1319 void swap(packaged_task& other)
1320 {
1321 task.swap(other.task);
1322 std::swap(future_obtained,other.future_obtained);
1323 }
1324
1325 // result retrieval
1326 unique_future<R> get_future()
1327 {
1328 if(!task)
1329 {
1330 boost::throw_exception(task_moved());
1331 }
1332 else if(!future_obtained)
1333 {
1334 future_obtained=true;
1335 return unique_future<R>(task);
1336 }
1337 else
1338 {
1339 boost::throw_exception(future_already_retrieved());
1340 }
1341 }
1342
1343
1344 // execution
1345 void operator()()
1346 {
1347 if(!task)
1348 {
1349 boost::throw_exception(task_moved());
1350 }
1351 task->run();
1352 }
1353
1354 template<typename F>
1355 void set_wait_callback(F f)
1356 {
1357 task->set_wait_callback(f,this);
1358 }
1359
1360 };
1361
1362}
1363
1364
1365#endif