// Copyright (c) 2017 Diego Barrios Romero #include #include #include #include #include #include #include class ThreadPool { public: ThreadPool(const size_t threadCount = boost::thread::hardware_concurrency()) : work(new boost::asio::io_service::work(service)) { for (size_t i = 0; i < threadCount; ++i) { threads.create_thread(boost::bind(&boost::asio::io_service::run, &service)); } } template void post(FunctionType f) { service.post(f); } void interrupt() { threads.interrupt_all(); } void cancel() { work.reset(); service.stop(); } ~ThreadPool() { work.reset(); threads.join_all(); } private: boost::asio::io_service service; boost::thread_group threads; std::unique_ptr work; }; struct Functor { void operator()() { boost::this_thread::sleep(boost::posix_time::milliseconds(200)); boost::lock_guard lock(mutex); wasCalled_ = true; } bool wasCalled() const { boost::lock_guard lock(mutex); return wasCalled_; } private: bool wasCalled_ = false; mutable boost::mutex mutex; }; struct UninterruptableFunctor : public Functor { void operator()() { boost::this_thread::disable_interruption disableInterruptions; Functor::operator()(); } }; template void check(F compare, T1 expected, T2 current, const std::string& msg) { if (compare(expected, current)) { std::cout << "\tpassed." << std::endl; } else { std::cout << std::boolalpha << "\tError: " << msg << " expected: " << expected << " current: " << current << std::endl; } } struct ThreadPoolTest { boost::int_least64_t getRunningTimeInMS() const { auto executionTime = boost::chrono::high_resolution_clock::now() - start; return boost::chrono::duration_cast(executionTime).count(); } template void runTest(F f, bool shouldFunctor1BeCalled, bool shouldFunctor2BeCalled) { FunctorType functor1, functor2; { ThreadPool pool(1); pool.post(boost::bind(&FunctorType::operator(), &functor1)); pool.post(boost::bind(&FunctorType::operator(), &functor2)); f(pool); } auto eq = [](bool a, bool b) { return a == b; }; check(eq, shouldFunctor1BeCalled, functor1.wasCalled(), "functor 1 call"); check(eq, shouldFunctor2BeCalled, functor2.wasCalled(), "functor 2 call"); } private: boost::chrono::high_resolution_clock::time_point start = boost::chrono::high_resolution_clock::now(); }; void doNothing(ThreadPool&) { } void cancel(ThreadPool& pool) { pool.cancel(); } void waitForStartThenInterruptThenCancel(ThreadPool& pool) { boost::this_thread::sleep(boost::posix_time::milliseconds(100)); pool.interrupt(); pool.cancel(); } bool lessEq (const boost::int_least64_t a, const boost::int_least64_t b) { return a <= b; } bool greaterEq (const boost::int_least64_t a, const boost::int_least64_t b) { return a >= b; } void checkAllWorkIsProcessedBeforeDestruction() { ThreadPoolTest test; std::cout << "checkAllWorkIsProcessedBeforeDestruction\n"; test.runTest(doNothing, true, true); check(lessEq, 350, test.getRunningTimeInMS(), "running time"); } void checkWorkCanBeCancelled() { ThreadPoolTest test; std::cout << "checkWorkCanBeCancelled\n"; test.runTest(cancel, false, false); check(greaterEq, 150, test.getRunningTimeInMS(), "running time"); } void checkWorkCanBeInterrupted() { ThreadPoolTest test; std::cout << "checkWorkCanBeInterrupted\n"; test.runTest(waitForStartThenInterruptThenCancel, false, false); check(greaterEq, 150, test.getRunningTimeInMS(), "running time"); } void checkUninterruptableWorkIsNotInterruptedButCanBeDropped() { ThreadPoolTest test; std::cout << "checkUninterruptableWorkIsNotInterruptedButCanBeDropped\n"; test.runTest(waitForStartThenInterruptThenCancel, true, false); check(lessEq, 150, test.getRunningTimeInMS(), "running time"); check(greaterEq, 250, test.getRunningTimeInMS(), "running time"); } int main(int, char*[]) { checkAllWorkIsProcessedBeforeDestruction(); checkWorkCanBeCancelled(); checkWorkCanBeInterrupted(); checkUninterruptableWorkIsNotInterruptedButCanBeDropped(); }