// UnitTest_Concurrency_Test.cpp : Defines the entry point for the console application. // #include "stdafx.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include // std::accumulate typedef boost::coroutines::coroutine< void > coro_t; static std::atomic_size_t iterCounters[3]; struct JobTask1 { JobTask1( const uint32_t kIters, bool foo ) : kIters(kIters) {} //virtual ~JobTask1(){} const uint32_t kIters; #ifdef _DEBUG size_t iterCounter_; #endif void operator() ( coro_t::push_type& yield ) { #ifdef _DEBUG iterCounter_ = 0; #endif for( uint32_t i = 0; i < kIters; ++i) { ++iterCounters[0]; //< Atomic increment, woudl think it would be okay to not be! #ifdef _DEBUG ++iterCounter_; #endif // std::cout << "fn(): local variable i == " << i << " of " << kIters << std::endl; // save current coroutine // value of local variable is preserved // transfer execution control back to main() yield(); // coroutine<>::operator()() was called // execution control transferred back from main() } #ifdef _DEBUG assert( iterCounter_ == kIters ); #endif } }; struct JobTask2 { JobTask2( const uint32_t kIters ) : kIters(kIters) {} const uint32_t kIters; void operator() ( coro_t::push_type& yield ) { for( uint32_t i = 0; i < kIters; ++i) { ++iterCounters[1]; //< Atomic increment, woudl think it would be okay to not be! yield(); } } }; struct JobTask3 { JobTask3( const uint32_t kIters ) : kIters(kIters) {} const uint32_t kIters; void operator() ( coro_t::push_type& yield ) { for( uint32_t i = 0; i < kIters; ++i) { ++iterCounters[2]; //< Atomic increment, woudl think it would be okay to not be! yield(); } } }; class Job { public: virtual ~Job(){} //Job( Job&& rhs ) : coroutine_( rhs ) {} typedef void (*Fn)( Job* inst, coro_t::push_type& yield ); //template< class Task > Job( Fn fn ) : fn_(fn) { if ( !fn ) std::terminate(); } Fn fn() const { return fn_; } private: Fn fn_; }; template< class Task > class Job_t : public Job, public Task { public: Job_t( Task&& task ) : Job( &sproc ) , Task(task) { int i = 0; } static void sproc( Job* inst, coro_t::push_type& yield ) { Task task( *static_cast(inst) ); task( yield ); } }; class JobRun : public coro_t { public: JobRun( JobRun&& rhs ) : coroutine_( std::move(rhs.coroutine_) ) {} JobRun( Job* job ) : coroutine_( boost::bind( job->fn(), job, _1 ) ) { } void run() { coroutine_(); } operator bool() const { return coroutine_; } bool isFinished() const { return !coroutine_; } private: pull_type coroutine_; }; static const size_t kThreadBufferSize = 1024;//*4; //< more than enough jobs queued up, scheduler may have more than this still but these are pending work queues! #define SPSC_QUEUE 1 //< spsc_queue technically the one to chose as it is wait free (i.e. no chance fo deadlock as it uses no locks!) #if SPSC_QUEUE typedef boost::lockfree::spsc_queue > JobQueue; //< Single-producer-single-consumer (94-119ns) mult-thread (30ns) #else typedef boost::lockfree::queue > JobQueue; //< many to many queue (94-121ns) mult-thread (32ns) #endif #define CIRCULAR_BUFFER 0 //< Circular-buffer has preallocate and may technically be faster as new/malloc can block, however cannot grow but queue is limited by JobQueue being fixed size anyway! #if CIRCULAR_BUFFER typedef std::queue > JobRunQueue; //,circular_buffer (94-113ns) mult-thread (30ns) #else typedef std::queue JobRunQueue; //(kThreadBufferSize) ) #endif {} volatile bool done; void addJob( Job* job ) { //TODO: deal with full queue somehow? pending.push( job ); //pending is a lock-free queue so is thread safe } void process() { //Move all the jobs into our local running-queue // - This reduces the round-cost for coroutine executions as the local-queue does not need to be thread safe and therefore more lightweight pending.consume_all( [&]( Job* job ) { //assert( running.size() < kThreadBufferSize ); //< TODO: we could avoid moving the job to the runnign queue if the queu is full! running.push( JobRun(job) ); } ); while ( !running.empty() ) { JobRun run( std::move(running.front()) ); running.pop(); run.run(); if ( !run.isFinished() ) { assert( running.size() < kThreadBufferSize ); running.push( std::move(run) ); } } } void operator()() { while ( !done ) process(); process(); assert( pending.empty() ); assert( running.empty() ); } protected: JobQueue pending; JobRunQueue running; //, jobs that have further work to do }; void test() { for ( auto& iterCount: iterCounters ) iterCount = 0; uint32_t kJobTotal = 1024; //std::random_device rd; const uint32_t kSeed = 3; std::default_random_engine randomEngine( kSeed/*rd()*/ ); std::uniform_int_distribution jobWorkDistribution(100, 200 ); //, number of iterations per job size_t kItersTotal = 0; std::vector jobs; uint32_t iJob = 0; std::generate_n(std::back_inserter(jobs), kJobTotal, [&]() { uint32_t iters = jobWorkDistribution(randomEngine); kItersTotal += iters; Job* newJob = 0; switch ( (iJob++) % 3 ) { case 0: newJob = new Job_t( JobTask1(iters, false) ); break; case 1: newJob = new Job_t( JobTask2(iters) ); break; case 2: newJob = new Job_t( JobTask3(iters) ); break; default: assert(false); break; } assert(newJob); return newJob; }); assert( jobs.size() <= kThreadBufferSize ); //std::cout << "main() starts coroutine c" << std::endl; //Target: coroutine time = 40ns //Local-running queue: 49-51ns typedef boost::chrono::high_resolution_clock Clock; //typedef high_resolution_clock Clock; #define THREAD 1 #define ASYNC 0 const uint32_t kOverOccupancy = 2; const uint32_t kWorkerCount = std::thread::hardware_concurrency() * kOverOccupancy; std::vector< std::unique_ptr > workers; //(new Worker()); }); #if THREAD std::vector threads; uint32_t iThread = 0; std::generate_n(std::back_inserter(threads), kWorkerCount, [&]() { return std::thread( std::ref(*workers[iThread++]) ); }); #endif auto t1 = Clock::now(); for ( uint32_t iJob = 0; iJob != jobs.size(); ++iJob ) workers[iJob%kWorkerCount]->addJob( jobs[iJob] ); for ( auto& worker: workers ) worker->done = true; //< Signal we want the worker to complete all its work so the thread may end #if THREAD for ( auto& thread: threads ) thread.join(); threads.clear(); #elif ASYNC std::async(std::launch::async, std::ref(worker) ).wait(); #else for ( auto& worker: workers ) (*worker)(); #endif auto t2 = Clock::now(); size_t iterCounter = std::accumulate( std::begin(iterCounters), std::end(iterCounters), 0); std::cout << "Iters = " << iterCounter << " of " << kItersTotal << " [" << (iterCounter==kItersTotal ? "TRUE" : "FALSE") << "]\n"; assert( iterCounter == kItersTotal ); auto nanos = (t2-t1).count(); std::cout << "Time: " << nanos / 1000000.f << "ms (iteration=" << nanos/kItersTotal << "ns)\n"; std::cout << "Done" << std::endl; } int main( int argc, char * argv[]) { #if 1 //< Issue seems to only occur on first run of tests, related to startup of application? int i = 3; while ( i-- ) #endif test(); return EXIT_SUCCESS; }