1 | // Copyright (c) 2017 Diego Barrios Romero <eldruin@gmail.com>
|
---|
2 |
|
---|
3 | #include <iostream>
|
---|
4 | #include <string>
|
---|
5 | #include <memory>
|
---|
6 | #include <boost/asio.hpp>
|
---|
7 | #include <boost/bind.hpp>
|
---|
8 | #include <boost/thread.hpp>
|
---|
9 | #include <boost/chrono.hpp>
|
---|
10 |
|
---|
11 | class ThreadPool
|
---|
12 | {
|
---|
13 | public:
|
---|
14 | ThreadPool(const size_t threadCount = boost::thread::hardware_concurrency())
|
---|
15 | : work(new boost::asio::io_service::work(service))
|
---|
16 | {
|
---|
17 | for (size_t i = 0; i < threadCount; ++i)
|
---|
18 | {
|
---|
19 | threads.create_thread(boost::bind(&boost::asio::io_service::run, &service));
|
---|
20 | }
|
---|
21 | }
|
---|
22 | template<typename FunctionType>
|
---|
23 | void post(FunctionType f)
|
---|
24 | {
|
---|
25 | service.post(f);
|
---|
26 | }
|
---|
27 |
|
---|
28 | void interrupt()
|
---|
29 | {
|
---|
30 | threads.interrupt_all();
|
---|
31 | }
|
---|
32 |
|
---|
33 | void cancel()
|
---|
34 | {
|
---|
35 | work.reset();
|
---|
36 | service.stop();
|
---|
37 | }
|
---|
38 |
|
---|
39 | ~ThreadPool()
|
---|
40 | {
|
---|
41 | work.reset();
|
---|
42 | threads.join_all();
|
---|
43 | }
|
---|
44 | private:
|
---|
45 | boost::asio::io_service service;
|
---|
46 | boost::thread_group threads;
|
---|
47 | std::unique_ptr<boost::asio::io_service::work> work;
|
---|
48 | };
|
---|
49 |
|
---|
50 |
|
---|
51 | struct Functor
|
---|
52 | {
|
---|
53 | void operator()()
|
---|
54 | {
|
---|
55 | boost::this_thread::sleep(boost::posix_time::milliseconds(200));
|
---|
56 | boost::lock_guard<boost::mutex> lock(mutex);
|
---|
57 | wasCalled_ = true;
|
---|
58 | }
|
---|
59 | bool wasCalled() const
|
---|
60 | {
|
---|
61 | boost::lock_guard<boost::mutex> lock(mutex);
|
---|
62 | return wasCalled_;
|
---|
63 | }
|
---|
64 |
|
---|
65 | private:
|
---|
66 | bool wasCalled_ = false;
|
---|
67 | mutable boost::mutex mutex;
|
---|
68 | };
|
---|
69 |
|
---|
70 | struct UninterruptableFunctor : public Functor
|
---|
71 | {
|
---|
72 | void operator()()
|
---|
73 | {
|
---|
74 | boost::this_thread::disable_interruption disableInterruptions;
|
---|
75 | Functor::operator()();
|
---|
76 | }
|
---|
77 | };
|
---|
78 |
|
---|
/// Evaluates `compare(expected, current)` and reports the outcome on
/// standard output.
///
/// @param compare  Binary predicate invoked as compare(expected, current).
/// @param expected Reference value; printed when the predicate fails.
/// @param current  Observed value; printed when the predicate fails.
/// @param msg      Label included in the failure report.
///
/// Prints "\tpassed." when the predicate holds. Otherwise prints an error
/// line with both values; std::boolalpha is set on std::cout in that case
/// and stays set afterwards.
template<typename F, typename T1, typename T2>
void check(F compare, T1 expected, T2 current, const std::string& msg)
{
  if (!compare(expected, current))
  {
    std::cout << std::boolalpha;
    std::cout << "\tError: " << msg;
    std::cout << " expected: " << expected;
    std::cout << " current: " << current << std::endl;
    return;
  }

  std::cout << "\tpassed." << std::endl;
}
|
---|
93 |
|
---|
94 | struct ThreadPoolTest
|
---|
95 | {
|
---|
96 | boost::int_least64_t getRunningTimeInMS() const
|
---|
97 | {
|
---|
98 | auto executionTime = boost::chrono::high_resolution_clock::now() - start;
|
---|
99 | return boost::chrono::duration_cast<boost::chrono::milliseconds>(executionTime).count();
|
---|
100 | }
|
---|
101 |
|
---|
102 | template<typename FunctorType, typename F>
|
---|
103 | void runTest(F f, bool shouldFunctor1BeCalled, bool shouldFunctor2BeCalled)
|
---|
104 | {
|
---|
105 | FunctorType functor1, functor2;
|
---|
106 | {
|
---|
107 | ThreadPool pool(1);
|
---|
108 | pool.post(boost::bind(&FunctorType::operator(), &functor1));
|
---|
109 | pool.post(boost::bind(&FunctorType::operator(), &functor2));
|
---|
110 | f(pool);
|
---|
111 | }
|
---|
112 |
|
---|
113 | auto eq = [](bool a, bool b) { return a == b; };
|
---|
114 | check(eq, shouldFunctor1BeCalled, functor1.wasCalled(), "functor 1 call");
|
---|
115 | check(eq, shouldFunctor2BeCalled, functor2.wasCalled(), "functor 2 call");
|
---|
116 | }
|
---|
117 |
|
---|
118 | private:
|
---|
119 | boost::chrono::high_resolution_clock::time_point start = boost::chrono::high_resolution_clock::now();
|
---|
120 | };
|
---|
121 |
|
---|
122 | void doNothing(ThreadPool&) { }
|
---|
123 | void cancel(ThreadPool& pool)
|
---|
124 | {
|
---|
125 | pool.cancel();
|
---|
126 | }
|
---|
127 | void waitForStartThenInterruptThenCancel(ThreadPool& pool)
|
---|
128 | {
|
---|
129 | boost::this_thread::sleep(boost::posix_time::milliseconds(100));
|
---|
130 | pool.interrupt();
|
---|
131 | pool.cancel();
|
---|
132 | }
|
---|
133 |
|
---|
134 | bool lessEq (const boost::int_least64_t a, const boost::int_least64_t b) { return a <= b; }
|
---|
135 | bool greaterEq (const boost::int_least64_t a, const boost::int_least64_t b) { return a >= b; }
|
---|
136 |
|
---|
137 | void checkAllWorkIsProcessedBeforeDestruction()
|
---|
138 | {
|
---|
139 | ThreadPoolTest test;
|
---|
140 | std::cout << "checkAllWorkIsProcessedBeforeDestruction\n";
|
---|
141 | test.runTest<Functor>(doNothing, true, true);
|
---|
142 | check(lessEq, 350, test.getRunningTimeInMS(), "running time");
|
---|
143 | }
|
---|
144 |
|
---|
145 | void checkWorkCanBeCancelled()
|
---|
146 | {
|
---|
147 | ThreadPoolTest test;
|
---|
148 | std::cout << "checkWorkCanBeCancelled\n";
|
---|
149 | test.runTest<Functor>(cancel, false, false);
|
---|
150 | check(greaterEq, 150, test.getRunningTimeInMS(), "running time");
|
---|
151 | }
|
---|
152 |
|
---|
153 | void checkWorkCanBeInterrupted()
|
---|
154 | {
|
---|
155 | ThreadPoolTest test;
|
---|
156 | std::cout << "checkWorkCanBeInterrupted\n";
|
---|
157 | test.runTest<Functor>(waitForStartThenInterruptThenCancel, false, false);
|
---|
158 | check(greaterEq, 150, test.getRunningTimeInMS(), "running time");
|
---|
159 | }
|
---|
160 |
|
---|
161 | void checkUninterruptableWorkIsNotInterruptedButCanBeDropped()
|
---|
162 | {
|
---|
163 | ThreadPoolTest test;
|
---|
164 | std::cout << "checkUninterruptableWorkIsNotInterruptedButCanBeDropped\n";
|
---|
165 | test.runTest<UninterruptableFunctor>(waitForStartThenInterruptThenCancel, true, false);
|
---|
166 | check(lessEq, 150, test.getRunningTimeInMS(), "running time");
|
---|
167 | check(greaterEq, 250, test.getRunningTimeInMS(), "running time");
|
---|
168 | }
|
---|
169 |
|
---|
170 | int main(int, char*[])
|
---|
171 | {
|
---|
172 | checkAllWorkIsProcessedBeforeDestruction();
|
---|
173 | checkWorkCanBeCancelled();
|
---|
174 | checkWorkCanBeInterrupted();
|
---|
175 | checkUninterruptableWorkIsNotInterruptedButCanBeDropped();
|
---|
176 | }
|
---|
177 |
|
---|