/*
 * Ticket #9355: UnitTest_Concurrency_Test.cpp
 *
 * File UnitTest_Concurrency_Test.cpp, 8.3 KB (added by craig@…, 9 years ago)
 *
 * Coroutine multi-thread crash test code.
 */
// UnitTest_Concurrency_Test.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"   // precompiled header; keep it first

#include <algorithm>  // std::generate_n
#include <atomic>
#include <cassert>
#include <cstdint>    // uint32_t
#include <cstdio>
#include <cstdlib>    // EXIT_SUCCESS
#include <exception>  // std::terminate
#include <functional> // std::ref
#include <future>
#include <iostream>
#include <iterator>   // std::back_inserter, std::begin, std::end
#include <memory>     // std::unique_ptr
#include <numeric>    // std::accumulate
#include <queue>
#include <random>
#include <thread>
#include <utility>    // std::move
#include <vector>

#include <boost/bind.hpp>
#include <boost/chrono.hpp>
#include <boost/circular_buffer.hpp>
#include <boost/coroutine/all.hpp>
#include <boost/lockfree/queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>

20typedef boost::coroutines::coroutine< void > coro_t;
21static std::atomic_size_t iterCounters[3];
22
23struct JobTask1
24{
25 JobTask1( const uint32_t kIters, bool foo )
26 : kIters(kIters)
27 {}
28
29 //virtual ~JobTask1(){}
30 const uint32_t kIters;
31#ifdef _DEBUG
32 size_t iterCounter_;
33#endif
34
35 void operator() ( coro_t::push_type& yield )
36 {
37#ifdef _DEBUG
38 iterCounter_ = 0;
39#endif
40
41 for( uint32_t i = 0; i < kIters; ++i)
42 {
43 ++iterCounters[0]; //< Atomic increment, woudl think it would be okay to not be!
44#ifdef _DEBUG
45 ++iterCounter_;
46#endif
47 // std::cout << "fn(): local variable i == " << i << " of " << kIters << std::endl;
48
49 // save current coroutine
50 // value of local variable is preserved
51 // transfer execution control back to main()
52 yield();
53
54 // coroutine<>::operator()() was called
55 // execution control transferred back from main()
56 }
57
58#ifdef _DEBUG
59 assert( iterCounter_ == kIters );
60#endif
61 }
62};
63
64
65struct JobTask2
66{
67 JobTask2( const uint32_t kIters )
68 : kIters(kIters)
69 {}
70 const uint32_t kIters;
71
72 void operator() ( coro_t::push_type& yield )
73 {
74 for( uint32_t i = 0; i < kIters; ++i)
75 {
76 ++iterCounters[1]; //< Atomic increment, woudl think it would be okay to not be!
77 yield();
78 }
79 }
80};
81
82struct JobTask3
83{
84 JobTask3( const uint32_t kIters )
85 : kIters(kIters)
86 {}
87 const uint32_t kIters;
88
89 void operator() ( coro_t::push_type& yield )
90 {
91 for( uint32_t i = 0; i < kIters; ++i)
92 {
93 ++iterCounters[2]; //< Atomic increment, woudl think it would be okay to not be!
94 yield();
95 }
96 }
97};
98
99
100class Job
101{
102public:
103 virtual ~Job(){}
104
105 //Job( Job&& rhs ) : coroutine_( rhs ) {}
106
107 typedef void (*Fn)( Job* inst, coro_t::push_type& yield );
108
109 //template< class Task >
110 Job( Fn fn )
111 : fn_(fn)
112 {
113 if ( !fn )
114 std::terminate();
115 }
116
117
118 Fn fn() const
119 { return fn_; }
120
121private:
122 Fn fn_;
123};
124
125template< class Task >
126class Job_t : public Job, public Task
127{
128public:
129
130
131 Job_t( Task&& task )
132 : Job( &sproc )
133 , Task(task)
134 {
135 int i = 0;
136 }
137
138 static void sproc( Job* inst, coro_t::push_type& yield )
139 {
140 Task task( *static_cast<Job_t*>(inst) );
141 task( yield );
142 }
143
144};
145
146
147class JobRun : public coro_t
148{
149public:
150 JobRun( JobRun&& rhs )
151 : coroutine_( std::move(rhs.coroutine_) )
152 {}
153
154 JobRun( Job* job )
155 : coroutine_( boost::bind( job->fn(), job, _1 ) )
156 {
157 }
158
159 void run()
160 { coroutine_(); }
161
162 operator bool() const
163 { return coroutine_; }
164
165 bool isFinished() const
166 { return !coroutine_; }
167
168private:
169 pull_type coroutine_;
170};
171
172
173static const size_t kThreadBufferSize = 1024;//*4; //< more than enough jobs queued up, scheduler may have more than this still but these are pending work queues!
174
175#define SPSC_QUEUE 1 //< spsc_queue technically the one to chose as it is wait free (i.e. no chance fo deadlock as it uses no locks!)
176#if SPSC_QUEUE
177typedef boost::lockfree::spsc_queue<Job*, boost::lockfree::capacity<kThreadBufferSize> > JobQueue; //< Single-producer-single-consumer (94-119ns) mult-thread (30ns)
178#else
179typedef boost::lockfree::queue<Job*, boost::lockfree::capacity<kThreadBufferSize> > JobQueue; //< many to many queue (94-121ns) mult-thread (32ns)
180#endif
181
182#define CIRCULAR_BUFFER 0 //< Circular-buffer has preallocate and may technically be faster as new/malloc can block, however cannot grow but queue is limited by JobQueue being fixed size anyway!
183#if CIRCULAR_BUFFER
184typedef std::queue<JobRun, boost::circular_buffer<JobRun> > JobRunQueue; //,circular_buffer (94-113ns) mult-thread (30ns)
185#else
186typedef std::queue<JobRun > JobRunQueue; //<deque (117-128ns) mult-thread (34ns)
187#endif
188
189class Worker
190{
191public:
192 Worker()
193 : done(false)
194#if CIRCULAR_BUFFER
195 , running( boost::circular_buffer<JobRun>(kThreadBufferSize) )
196#endif
197 {}
198
199 volatile bool done;
200
201 void addJob( Job* job )
202 {
203 //TODO: deal with full queue somehow?
204 pending.push( job ); //pending is a lock-free queue so is thread safe
205 }
206
207 void process()
208 {
209 //Move all the jobs into our local running-queue
210 // - This reduces the round-cost for coroutine executions as the local-queue does not need to be thread safe and therefore more lightweight
211 pending.consume_all( [&]( Job* job )
212 {
213 //assert( running.size() < kThreadBufferSize ); //< TODO: we could avoid moving the job to the runnign queue if the queu is full!
214 running.push( JobRun(job) );
215 } );
216
217 while ( !running.empty() )
218 {
219 JobRun run( std::move(running.front()) );
220 running.pop();
221 run.run();
222
223 if ( !run.isFinished() )
224 {
225 assert( running.size() < kThreadBufferSize );
226 running.push( std::move(run) );
227 }
228 }
229 }
230
231 void operator()()
232 {
233 while ( !done )
234 process();
235
236 process();
237
238 assert( pending.empty() );
239 assert( running.empty() );
240 }
241
242protected:
243 JobQueue pending;
244 JobRunQueue running; //, jobs that have further work to do
245};
246
247
248void test()
249{
250 for ( auto& iterCount: iterCounters )
251 iterCount = 0;
252
253 uint32_t kJobTotal = 1024;
254
255 //std::random_device rd;
256 const uint32_t kSeed = 3;
257 std::default_random_engine randomEngine( kSeed/*rd()*/ );
258 std::uniform_int_distribution<uint32_t> jobWorkDistribution(100, 200 ); //, number of iterations per job
259
260
261 size_t kItersTotal = 0;
262 std::vector<Job*> jobs;
263 uint32_t iJob = 0;
264 std::generate_n(std::back_inserter(jobs), kJobTotal, [&]()
265 {
266 uint32_t iters = jobWorkDistribution(randomEngine);
267 kItersTotal += iters;
268
269 Job* newJob = 0;
270 switch ( (iJob++) % 3 )
271 {
272 case 0: newJob = new Job_t<JobTask1>( JobTask1(iters, false) ); break;
273 case 1: newJob = new Job_t<JobTask2>( JobTask2(iters) ); break;
274 case 2: newJob = new Job_t<JobTask3>( JobTask3(iters) ); break;
275 default: assert(false); break;
276 }
277 assert(newJob);
278 return newJob;
279 });
280
281 assert( jobs.size() <= kThreadBufferSize );
282
283 //std::cout << "main() starts coroutine c" << std::endl;
284
285 //Target: coroutine time = 40ns
286 //Local-running queue: 49-51ns
287
288 typedef boost::chrono::high_resolution_clock Clock;
289 //typedef high_resolution_clock Clock;
290
291#define THREAD 1
292#define ASYNC 0
293
294 const uint32_t kOverOccupancy = 2;
295 const uint32_t kWorkerCount = std::thread::hardware_concurrency() * kOverOccupancy;
296
297 std::vector< std::unique_ptr<Worker> > workers; //<TODO: Worker not copyable so must use pointers!? TODO: solve this albeit jsut a niggle!
298 std::generate_n(std::back_inserter(workers), kWorkerCount, [&]()
299 {
300 return std::unique_ptr<Worker>(new Worker());
301 });
302
303#if THREAD
304 std::vector<std::thread> threads;
305 uint32_t iThread = 0;
306 std::generate_n(std::back_inserter(threads), kWorkerCount, [&]()
307 {
308 return std::thread( std::ref(*workers[iThread++]) );
309 });
310#endif
311
312 auto t1 = Clock::now();
313
314 for ( uint32_t iJob = 0; iJob != jobs.size(); ++iJob )
315 workers[iJob%kWorkerCount]->addJob( jobs[iJob] );
316
317 for ( auto& worker: workers )
318 worker->done = true; //< Signal we want the worker to complete all its work so the thread may end
319
320#if THREAD
321 for ( auto& thread: threads )
322 thread.join();
323 threads.clear();
324
325#elif ASYNC
326 std::async(std::launch::async, std::ref(worker) ).wait();
327#else
328 for ( auto& worker: workers )
329 (*worker)();
330#endif
331
332 auto t2 = Clock::now();
333
334 size_t iterCounter = std::accumulate( std::begin(iterCounters), std::end(iterCounters), 0);
335 std::cout << "Iters = " << iterCounter << " of " << kItersTotal << " [" << (iterCounter==kItersTotal ? "TRUE" : "FALSE") << "]\n";
336
337 assert( iterCounter == kItersTotal );
338
339 auto nanos = (t2-t1).count();
340 std::cout << "Time: " << nanos / 1000000.f << "ms (iteration=" << nanos/kItersTotal << "ns)\n";
341
342 std::cout << "Done" << std::endl;
343}
344
345int main( int argc, char * argv[])
346{
347#if 1 //< Issue seems to only occur on first run of tests, related to startup of application?
348 int i = 3;
349 while ( i-- )
350#endif
351 test();
352
353 return EXIT_SUCCESS;
354}
355