Ticket #11895: Source.cpp

File Source.cpp, 2.8 KB (added by Chris White <chriswhitemsu@…>, 7 years ago)
Line 
1#include <deque>
2#include <iostream>
3#include <boost/function.hpp>
4#include <boost/asio/io_service.hpp>
5#include <boost/asio/strand.hpp>
6#include <boost/chrono/chrono.hpp>
7#include <boost/thread/thread.hpp>
8
9using namespace std;
10using namespace boost::asio;
11using namespace boost::chrono;
12using namespace boost::this_thread;
13using boost::mutex;
14using boost::thread;
15using boost::thread_group;
16
17// simplified strand implementation that fixes scheduling issue
18struct CStrand
19{
20 CStrand(io_service& IoService) :
21 IoService_(IoService),
22 bSignaled_(false) { }
23
24 void post(function<void()> Fxn)
25 {
26 mutex::scoped_lock Lock(Mutex_);
27 PendingWorkQueue_.push_back(move(Fxn));
28 if (bSignaled_)
29 return;
30 bSignaled_ = true;
31 Lock.unlock();
32 IoService_.post([this] { service(); });
33 }
34
35 // don't give control back to the io_service until the queue is drained!
36 void service(void)
37 {
38 for (;;) {
39 deque<function<void()>> WorkQueue;
40 mutex::scoped_lock Lock(Mutex_);
41 WorkQueue.swap(PendingWorkQueue_);
42 Lock.unlock();
43 for (auto& Fxn : WorkQueue)
44 Fxn();
45 Lock.lock();
46 if (PendingWorkQueue_.empty()) {
47 bSignaled_ = false;
48 return;
49 }
50 }
51 }
52
53 io_service& IoService_;
54
55 mutable mutex Mutex_;
56 bool bSignaled_;
57 deque<function<void()>> PendingWorkQueue_;
58};
59
60
61template<typename TStrand>
62void Test(const string& sName)
63{
64 cout << "__________" << sName << "__________" << endl << endl;
65
66 io_service IoService;
67 auto pWork = make_unique<io_service::work>(IoService);
68 auto Cores = thread::hardware_concurrency();
69
70 thread_group Threads;
71 for (unsigned i = 0; i < Cores; ++i)
72 Threads.create_thread([&]{ IoService.run(); });
73
74 TStrand Strand(IoService);
75 mutex Mutex;
76
77 // post() a thread-proportional amount of work to the io_service and the strand
78 // for 4 cores that would be 30 seconds of work on the io_service and 10 second of work on the strand.
79 // optimally, this should only take 10 seconds in parallel but you'll see that's not the case.
80 auto Start = high_resolution_clock::now();
81 for (int i = 0, Count = 0; i < 10; ++i) {
82 for (unsigned u = 0; u < Cores - 1; ++u) {
83 ++Count;
84 IoService.post([&, Count] {
85 mutex::scoped_lock Lock(Mutex);
86 cout << "io_service " << Count << endl;
87 Lock.unlock();
88 sleep_for(seconds(1));
89 });
90 }
91 ++Count;
92 Strand.post([&, Count] {
93 mutex::scoped_lock Lock(Mutex);
94 cout << "strand " << Count << endl;
95 Lock.unlock();
96 sleep_for(seconds(1));
97 });
98 }
99 pWork.reset();
100 Threads.join_all();
101 auto Stop = high_resolution_clock::now();
102
103 auto Elapsed = duration_cast<seconds>(Stop - Start);
104 cout << endl << Elapsed.count() << " seconds" << endl << endl;
105}
106
107
108void main()
109{
110 Test<strand>("strand");
111 Test<CStrand>("CStrand");
112 cin.get();
113}