Ticket #12474: test_engine_client_dbg_2.cpp

File test_engine_client_dbg_2.cpp, 11.9 KB (added by michele.de.stefano@…, 6 years ago)

This is the code that has the race condition

Line 
#include <cstdlib>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <thread>

#include <boost/asio/io_service.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/program_options.hpp>
14
15using namespace std;
16using namespace boost;
17
18namespace po = program_options;
19
20typedef unsigned short port_type;
21
22typedef asio::io_service scheduler_type;
23typedef asio::ip::tcp::acceptor acceptor_type;
24typedef asio::ip::tcp::endpoint endpoint_type;
25typedef asio::ip::tcp::resolver resolver_type;
26typedef resolver_type::query query_type;
27typedef asio::ip::tcp::socket socket_type;
28
29typedef boost::system::error_code system_error_type;
30
31std::unique_ptr<scheduler_type::work>
32 work(nullptr);
33
34
35class ClientSockets {
36
37public:
38
39 ClientSockets() :
40 mSendSocketIsSet(false)
41 , mRecvSocketIsSet(false)
42 , mSendSocket(nullptr)
43 , mRecvSocket(nullptr)
44 {}
45
46 void setSendSocket(std::unique_ptr<socket_type> pSocket) {
47 mSendSocket = std::move(pSocket);
48 mSendSocketIsSet = true;
49 }
50
51 void setRecvSocket(std::unique_ptr<socket_type> pSocket) {
52 mRecvSocket = std::move(pSocket);
53 mRecvSocketIsSet = true;
54 }
55
56 bool messagingConnIsReady() const {
57 return mSendSocketIsSet && mRecvSocketIsSet;
58 }
59
60private:
61
62 bool
63 mSendSocketIsSet
64 , mRecvSocketIsSet;
65
66 std::unique_ptr<socket_type>
67 mSendSocket
68 , mRecvSocket;
69};
70
71
72class EngineClient {
73
74public:
75
76 typedef std::function<void()> callback_type;
77
78public:
79
80 EngineClient(
81 scheduler_type *pScheduler
82 , ClientSockets *pClientSockets
83 , std::string pServerAddress
84 , port_type pServerPort
85 , callback_type pOnConnected
86 ) :
87 mScheduler(pScheduler)
88 , mClientSockets(pClientSockets)
89 , mResolver(*mScheduler)
90 , mSocket(nullptr)
91 , mNumMsgConn(0)
92 , mOnConnected(pOnConnected)
93 {
94 std::shared_ptr<query_type>
95 query(new query_type(
96 asio::ip::tcp::v4()
97 , pServerAddress
98 , std::to_string(static_cast<int>(pServerPort))
99 ));
100
101 mResolver.async_resolve(
102 *query
103 , [this,query](
104 system_error_type const& pError
105 , resolver_type::iterator pIterator
106 ) {
107 handleEndpointResolved(pError,pIterator);
108 }
109 );
110 }
111
112private:
113
114 void handleEndpointResolved(
115 system_error_type const& pError
116 , resolver_type::iterator pIterator
117 ) {
118 if (pError) {
119 ostringstream oss;
120 oss << "Could not resolve peer.\n" <<
121 "Error: " << pError.message();
122 throw runtime_error(oss.str());
123 }
124
125 mEndpointIterator = pIterator;
126
127 connectMessagingNext();
128 }
129
130
131 void connectMessagingNext() {
132
133 mSocket.reset(new socket_type(*mScheduler));
134
135 asio::async_connect(
136 *mSocket
137 , mEndpointIterator
138 , [this](
139 system_error_type const& pError
140 , resolver_type::iterator
141 ) {
142 handleMessagingConnected(pError);
143 }
144 );
145 }
146
147
148 void handleMessagingConnected(system_error_type const& pError) {
149
150 if (pError) {
151 // Continue to try connecting (maybe the server is not up yet)
152 connectMessagingNext();
153 return;
154 }
155
156 ++mNumMsgConn;
157
158 if (mNumMsgConn == 1) {
159 mClientSockets->setSendSocket(std::move(mSocket));
160 } else {
161 mClientSockets->setRecvSocket(std::move(mSocket));
162 }
163
164 if (mClientSockets->messagingConnIsReady()) {
165 mOnConnected();
166 } else {
167 connectMessagingNext();
168 }
169 }
170
171private:
172
173 scheduler_type *mScheduler;
174
175 ClientSockets *mClientSockets;
176
177 resolver_type mResolver;
178
179 resolver_type::iterator
180 mEndpointIterator;
181
182 std::unique_ptr<socket_type>
183 mSocket;
184
185 unsigned short mNumMsgConn;
186
187 callback_type mOnConnected;
188
189};
190
191
192class Server {
193public:
194 Server(
195 scheduler_type *pScheduler
196 , port_type pPort
197 ) :
198 mScheduler(pScheduler)
199 , mAcceptor(
200 *mScheduler
201 , endpoint_type(asio::ip::tcp::v4(),pPort)
202 , false
203 )
204 , mSocket(nullptr)
205 , mControlSocket(nullptr)
206 , mNumAccepted(0)
207 , mAuxChar(0)
208 {
209 acceptNext();
210 }
211
212private:
213
214 void acceptNext() {
215 mSocket.reset(new socket_type(*mScheduler));
216 mAcceptor.async_accept(
217 *mSocket
218 , [this](system_error_type const& pError) {
219 handleAccepted(pError);
220 }
221 );
222 }
223
224 void handleAccepted(system_error_type const& pError) {
225
226 if (pError) {
227 ostringstream oss;
228 oss << "Failure accepting connection " << mNumAccepted +1 <<
229 ".\nError: " << pError.message();
230 throw runtime_error(oss.str());
231 }
232
233 ++mNumAccepted;
234
235 if (mNumAccepted == 1) {
236 mControlSocket = std::move(mSocket);
237 acceptNext();
238 asio::async_read(
239 *mControlSocket
240 , asio::buffer(&mAuxChar,1)
241 , [this](system_error_type const& pError,size_t) {
242 handleStopSignalReceived(pError);
243 }
244 );
245 } else if (mNumAccepted < 3) { // Server accepts 3 connections
246 acceptNext();
247 }
248 }
249
250 void handleStopSignalReceived(system_error_type const& pError) {
251 if (pError) {
252 ostringstream oss;
253 oss << "Error while reading on the control socket.\n" <<
254 "Error: " << pError.message();
255 throw runtime_error(oss.str());
256 }
257
258 work.reset(nullptr);
259 }
260
261
262private:
263
264 scheduler_type *mScheduler;
265
266 acceptor_type mAcceptor;
267
268 unique_ptr<socket_type>
269 mSocket
270 , mControlSocket;
271
272 unsigned short mNumAccepted;
273
274 char mAuxChar;
275};
276
277class Client {
278public:
279
280 Client(
281 scheduler_type *pScheduler
282 , port_type pPort
283 ) :
284 mScheduler(pScheduler)
285 , mServerPort(pPort)
286 , mResolver(*pScheduler)
287 , mControlSocket(*pScheduler)
288 , mSocket(nullptr)
289 , mEngineClient(nullptr)
290 , mAuxChar(1)
291 {
292 std::shared_ptr<query_type>
293 query(new query_type(
294 asio::ip::tcp::v4()
295 , "localhost"
296 , std::to_string(static_cast<int>(pPort))
297 ));
298
299 mResolver.async_resolve(
300 *query
301 , [this,query](
302 system_error_type const& pError
303 , resolver_type::iterator pIterator
304 ) {
305 handleControlEndpointResolved(pError,pIterator);
306 }
307 );
308 }
309
310private:
311
312 void handleControlEndpointResolved(
313 system_error_type const& pError
314 , resolver_type::iterator pIterator
315 ) {
316 if (pError) {
317 ostringstream oss;
318 oss << "Could not resolver peer.\n" <<
319 "Error: " << pError.message();
320
321 throw runtime_error(oss.str());
322 }
323
324 mEndPointIterator = pIterator;
325
326 asio::async_connect(
327 mControlSocket
328 , mEndPointIterator
329 , [this](
330 system_error_type const& pError
331 , resolver_type::iterator
332 ) {
333 handleControlConnected(pError);
334 }
335 );
336 }
337
338
339 void handleControlConnected(system_error_type const& pError) {
340 if (pError) {
341 ostringstream oss;
342 oss << "Could not connect the control socket.\n" <<
343 "Error: " << pError.message();
344
345 throw runtime_error(oss.str());
346 }
347
348 mEngineClient.reset(new EngineClient(
349 mScheduler
350 , &mClientSockets
351 , "localhost"
352 , mServerPort
353 , [this](){
354 onConnectionReady();
355 }
356 ));
357 }
358
359
360 void onConnectionReady() {
361 if (mEngineClient.get() == nullptr) {
362 ostringstream oss;
363 oss << "mEngineClient address: " << hex << mEngineClient.get() << dec << endl;
364 throw runtime_error(oss.str());
365 }
366
367 cout << "\nSending the stop signal to the server ..." << endl;
368
369 asio::async_write(
370 mControlSocket
371 , asio::buffer(&mAuxChar,1)
372 , [this](system_error_type const& pError,size_t) {
373 handleWaitServerSwitchOff(pError);
374 }
375 );
376 }
377
378
379 void handleWaitServerSwitchOff(system_error_type const& pError) {
380 if (pError) {
381 ostringstream oss;
382 oss << "Error while sending termination signal to server\n" <<
383 "Error: " << pError.message();
384 throw runtime_error(oss.str());
385 }
386
387 cout << "\nWaiting to detect the server shut down ..." << endl;
388
389 asio::async_read(
390 mControlSocket
391 , asio::buffer(&mAuxChar,1)
392 , [this](system_error_type const& pError,size_t) {
393 handleTerminate(pError);
394 }
395 );
396 }
397
398
399 void handleTerminate(system_error_type const& pError) {
400
401 if (pError != asio::error::eof) {
402 ostringstream oss;
403 oss << "Unexpeced error on termination\n" <<
404 "Error: " << pError.message();
405 throw runtime_error(oss.str());
406 }
407
408 work.reset(nullptr);
409 }
410
411
412private:
413
414 scheduler_type *mScheduler;
415
416 port_type mServerPort;
417
418 ClientSockets mClientSockets;
419
420 resolver_type mResolver;
421
422 resolver_type::iterator mEndPointIterator;
423
424 socket_type mControlSocket;
425
426 unique_ptr<socket_type> mSocket;
427
428 unique_ptr<EngineClient>
429 mEngineClient;
430
431 char mAuxChar;
432};
433
434int main(int argc,char *argv[]) {
435 port_type
436 port(0);
437
438 po::options_description
439 opt_desc("OPTIONS:");
440
441 opt_desc.add_options()
442 ("server,s","Runs the server")
443 ("client,c","Runs the client")
444 ("port,p",po::value<port_type>(&port),"Server port")
445 ("help,h","Produce this help message");
446
447 po::variables_map vm;
448 po::store(po::parse_command_line(argc,argv,opt_desc),vm);
449 po::notify(vm);
450
451 if (argc == 1 || vm.count("help") > 0) {
452 cout << "\nSyntax:\n"
453" " << argv[0] << " [OPTIONS]\n" << endl;
454 cout << opt_desc << "\n" << endl;
455
456 if (vm.count("help") == 0) {
457 return EXIT_FAILURE;
458 }
459
460 return EXIT_SUCCESS;
461 }
462
463 bool
464 is_server(vm.count("server") > 0)
465 , is_client(vm.count("client") > 0);
466
467 if (is_server && is_client) {
468 throw invalid_argument("Decide if running either a server or a client");
469 }
470
471 scheduler_type io_service;
472
473 work.reset(new scheduler_type::work(io_service));
474
475 std::unique_ptr<Server>
476 server(nullptr);
477 std::unique_ptr<Client>
478 client(nullptr);
479
480 if (is_server) {
481 server.reset(new Server(&io_service,port));
482 } else {
483 client.reset(new Client(&io_service,port));
484 }
485
486 std::thread
487 thr1([&io_service](){ io_service.run(); })
488 , thr2([&io_service](){ io_service.run(); });
489
490 thr1.join();
491 thr2.join();
492
493 if (is_server) {
494 server.reset(nullptr);
495 } else {
496 client.reset(nullptr);
497 }
498
499 return EXIT_SUCCESS;
500}