Ticket #12474: test_engine_client_dbg_4.cpp

File test_engine_client_dbg_4.cpp, 11.0 KB (added by michele.de.stefano@…, 6 years ago)

This is the code that does NOT have the race condition

Line 
1#include <functional>
2#include <iostream>
3#include <memory>
4#include <sstream>
5#include <stdexcept>
6#include <thread>
7
8#include <boost/asio/io_service.hpp>
9#include <boost/asio/connect.hpp>
10#include <boost/asio/read.hpp>
11#include <boost/asio/write.hpp>
12#include <boost/asio/ip/tcp.hpp>
13#include <boost/program_options.hpp>
14
15using namespace std;
16using namespace boost;
17
18namespace po = program_options;
19
20typedef unsigned short port_type;
21
22typedef asio::io_service scheduler_type;
23typedef asio::ip::tcp::acceptor acceptor_type;
24typedef asio::ip::tcp::endpoint endpoint_type;
25typedef asio::ip::tcp::resolver resolver_type;
26typedef resolver_type::query query_type;
27typedef asio::ip::tcp::socket socket_type;
28
29typedef boost::system::error_code system_error_type;
30
31std::unique_ptr<scheduler_type::work>
32 work(nullptr);
33
34
35class ClientSockets {
36
37public:
38
39 ClientSockets() :
40 mSendSocketIsSet(false)
41 , mRecvSocketIsSet(false)
42 , mSendSocket(nullptr)
43 , mRecvSocket(nullptr)
44 {}
45
46 void setSendSocket(std::unique_ptr<socket_type> pSocket) {
47 mSendSocket = std::move(pSocket);
48 mSendSocketIsSet = true;
49 }
50
51 void setRecvSocket(std::unique_ptr<socket_type> pSocket) {
52 mRecvSocket = std::move(pSocket);
53 mRecvSocketIsSet = true;
54 }
55
56 bool messagingConnIsReady() const {
57 return mSendSocketIsSet && mRecvSocketIsSet;
58 }
59
60private:
61
62 bool
63 mSendSocketIsSet
64 , mRecvSocketIsSet;
65
66 std::unique_ptr<socket_type>
67 mSendSocket
68 , mRecvSocket;
69};
70
71
72class EngineClient {
73
74public:
75
76 typedef std::function<void()> callback_type;
77
78public:
79
80 EngineClient(
81 scheduler_type *pScheduler
82 , ClientSockets *pClientSockets
83 , resolver_type::iterator pEndPointIterator
84 , callback_type pOnConnected
85 ) :
86 mScheduler(pScheduler)
87 , mClientSockets(pClientSockets)
88 , mEndPointIterator(pEndPointIterator)
89 , mSocket(nullptr)
90 , mNumMsgConn(0)
91 , mOnConnected(pOnConnected)
92 {
93 connectMessagingNext();
94 }
95
96private:
97
98 void connectMessagingNext() {
99
100 mSocket.reset(new socket_type(*mScheduler));
101
102 asio::async_connect(
103 *mSocket
104 , mEndPointIterator
105 , [this](
106 system_error_type const& pError
107 , resolver_type::iterator
108 ) {
109 handleMessagingConnected(pError);
110 }
111 );
112 }
113
114
115 void handleMessagingConnected(system_error_type const& pError) {
116
117 if (pError) {
118 // Continue to try connecting (maybe the server is not up yet)
119 connectMessagingNext();
120 return;
121 }
122
123 ++mNumMsgConn;
124
125 if (mNumMsgConn == 1) {
126 mClientSockets->setSendSocket(std::move(mSocket));
127 } else {
128 mClientSockets->setRecvSocket(std::move(mSocket));
129 }
130
131 if (mClientSockets->messagingConnIsReady()) {
132 mOnConnected();
133 } else {
134 connectMessagingNext();
135 }
136 }
137
138private:
139
140 scheduler_type *mScheduler;
141
142 ClientSockets *mClientSockets;
143
144 resolver_type::iterator
145 mEndPointIterator;
146
147 std::unique_ptr<socket_type>
148 mSocket;
149
150 unsigned short mNumMsgConn;
151
152 callback_type mOnConnected;
153
154};
155
156
157class Server {
158public:
159 Server(
160 scheduler_type *pScheduler
161 , port_type pPort
162 ) :
163 mScheduler(pScheduler)
164 , mAcceptor(
165 *mScheduler
166 , endpoint_type(asio::ip::tcp::v4(),pPort)
167 , false
168 )
169 , mSocket(nullptr)
170 , mControlSocket(nullptr)
171 , mNumAccepted(0)
172 , mAuxChar(0)
173 {
174 acceptNext();
175 }
176
177private:
178
179 void acceptNext() {
180 mSocket.reset(new socket_type(*mScheduler));
181 mAcceptor.async_accept(
182 *mSocket
183 , [this](system_error_type const& pError) {
184 handleAccepted(pError);
185 }
186 );
187 }
188
189 void handleAccepted(system_error_type const& pError) {
190
191 if (pError) {
192 ostringstream oss;
193 oss << "Failure accepting connection " << mNumAccepted +1 <<
194 ".\nError: " << pError.message();
195 throw runtime_error(oss.str());
196 }
197
198 ++mNumAccepted;
199
200 if (mNumAccepted == 1) {
201 mControlSocket = std::move(mSocket);
202 acceptNext();
203 asio::async_read(
204 *mControlSocket
205 , asio::buffer(&mAuxChar,1)
206 , [this](system_error_type const& pError,size_t) {
207 handleStopSignalReceived(pError);
208 }
209 );
210 } else if (mNumAccepted < 3) { // Server accepts 3 connections
211 acceptNext();
212 }
213 }
214
215 void handleStopSignalReceived(system_error_type const& pError) {
216 if (pError) {
217 ostringstream oss;
218 oss << "Error while reading on the control socket.\n" <<
219 "Error: " << pError.message();
220 throw runtime_error(oss.str());
221 }
222
223 work.reset(nullptr);
224 }
225
226
227private:
228
229 scheduler_type *mScheduler;
230
231 acceptor_type mAcceptor;
232
233 unique_ptr<socket_type>
234 mSocket
235 , mControlSocket;
236
237 unsigned short mNumAccepted;
238
239 char mAuxChar;
240};
241
242class Client {
243public:
244
245 Client(
246 scheduler_type *pScheduler
247 , port_type pPort
248 ) :
249 mScheduler(pScheduler)
250 , mServerPort(pPort)
251 , mResolver(*pScheduler)
252 , mControlSocket(*pScheduler)
253 , mSocket(nullptr)
254 , mEngineClient(nullptr)
255 , mAuxChar(1)
256 {
257 std::shared_ptr<query_type>
258 query(new query_type(
259 asio::ip::tcp::v4()
260 , "localhost"
261 , std::to_string(static_cast<int>(pPort))
262 ));
263
264 mResolver.async_resolve(
265 *query
266 , [this,query](
267 system_error_type const& pError
268 , resolver_type::iterator pIterator
269 ) {
270 handleControlEndpointResolved(pError,pIterator);
271 }
272 );
273 }
274
275private:
276
277 void handleControlEndpointResolved(
278 system_error_type const& pError
279 , resolver_type::iterator pIterator
280 ) {
281 if (pError) {
282 ostringstream oss;
283 oss << "Could not resolver peer.\n" <<
284 "Error: " << pError.message();
285
286 throw runtime_error(oss.str());
287 }
288
289 mEndPointIterator = pIterator;
290
291 asio::async_connect(
292 mControlSocket
293 , mEndPointIterator
294 , [this](
295 system_error_type const& pError
296 , resolver_type::iterator
297 ) {
298 handleControlConnected(pError);
299 }
300 );
301 }
302
303
304 void handleControlConnected(system_error_type const& pError) {
305 if (pError) {
306 ostringstream oss;
307 oss << "Could not connect the control socket.\n" <<
308 "Error: " << pError.message();
309
310 throw runtime_error(oss.str());
311 }
312
313 mEngineClient.reset(new EngineClient(
314 mScheduler
315 , &mClientSockets
316 , mEndPointIterator
317 , [this](){
318 onConnectionReady();
319 }
320 ));
321 }
322
323
324 void onConnectionReady() {
325 if (mEngineClient.get() == nullptr) {
326 ostringstream oss;
327 oss << "mEngineClient address: " << hex << mEngineClient.get() << dec << endl;
328 throw runtime_error(oss.str());
329 }
330
331 cout << "\nSending the stop signal to the server ..." << endl;
332
333 asio::async_write(
334 mControlSocket
335 , asio::buffer(&mAuxChar,1)
336 , [this](system_error_type const& pError,size_t) {
337 handleWaitServerSwitchOff(pError);
338 }
339 );
340 }
341
342
343 void handleWaitServerSwitchOff(system_error_type const& pError) {
344 if (pError) {
345 ostringstream oss;
346 oss << "Error while sending termination signal to server\n" <<
347 "Error: " << pError.message();
348 throw runtime_error(oss.str());
349 }
350
351 cout << "\nWaiting to detect the server shut down ..." << endl;
352
353 asio::async_read(
354 mControlSocket
355 , asio::buffer(&mAuxChar,1)
356 , [this](system_error_type const& pError,size_t) {
357 handleTerminate(pError);
358 }
359 );
360 }
361
362
363 void handleTerminate(system_error_type const& pError) {
364
365 if (pError != asio::error::eof) {
366 ostringstream oss;
367 oss << "Unexpeced error on termination\n" <<
368 "Error: " << pError.message();
369 throw runtime_error(oss.str());
370 }
371
372 work.reset(nullptr);
373 }
374
375
376private:
377
378 scheduler_type *mScheduler;
379
380 port_type mServerPort;
381
382 ClientSockets mClientSockets;
383
384 resolver_type mResolver;
385
386 resolver_type::iterator mEndPointIterator;
387
388 socket_type mControlSocket;
389
390 unique_ptr<socket_type> mSocket;
391
392 unique_ptr<EngineClient>
393 mEngineClient;
394
395 char mAuxChar;
396};
397
398int main(int argc,char *argv[]) {
399 port_type
400 port(0);
401
402 po::options_description
403 opt_desc("OPTIONS:");
404
405 opt_desc.add_options()
406 ("server,s","Runs the server")
407 ("client,c","Runs the client")
408 ("port,p",po::value<port_type>(&port),"Server port")
409 ("help,h","Produce this help message");
410
411 po::variables_map vm;
412 po::store(po::parse_command_line(argc,argv,opt_desc),vm);
413 po::notify(vm);
414
415 if (argc == 1 || vm.count("help") > 0) {
416 cout << "\nSyntax:\n"
417" " << argv[0] << " [OPTIONS]\n" << endl;
418 cout << opt_desc << "\n" << endl;
419
420 if (vm.count("help") == 0) {
421 return EXIT_FAILURE;
422 }
423
424 return EXIT_SUCCESS;
425 }
426
427 bool
428 is_server(vm.count("server") > 0)
429 , is_client(vm.count("client") > 0);
430
431 if (is_server && is_client) {
432 throw invalid_argument("Decide if running either a server or a client");
433 }
434
435 scheduler_type io_service;
436
437 work.reset(new scheduler_type::work(io_service));
438
439 std::unique_ptr<Server>
440 server(nullptr);
441 std::unique_ptr<Client>
442 client(nullptr);
443
444 if (is_server) {
445 server.reset(new Server(&io_service,port));
446 } else {
447 client.reset(new Client(&io_service,port));
448 }
449
450 std::thread
451 thr1([&io_service](){ io_service.run(); })
452 , thr2([&io_service](){ io_service.run(); });
453
454 thr1.join();
455 thr2.join();
456
457 if (is_server) {
458 server.reset(nullptr);
459 } else {
460 client.reset(nullptr);
461 }
462
463 return EXIT_SUCCESS;
464}