#include #include #include #include #include #include #include #include #include #include #include #include using namespace std; using namespace boost; namespace po = program_options; typedef unsigned short port_type; typedef asio::io_service scheduler_type; typedef asio::ip::tcp::acceptor acceptor_type; typedef asio::ip::tcp::endpoint endpoint_type; typedef asio::ip::tcp::resolver resolver_type; typedef resolver_type::query query_type; typedef asio::ip::tcp::socket socket_type; typedef boost::system::error_code system_error_type; std::unique_ptr work(nullptr); class ClientSockets { public: ClientSockets() : mSendSocketIsSet(false) , mRecvSocketIsSet(false) , mSendSocket(nullptr) , mRecvSocket(nullptr) {} void setSendSocket(std::unique_ptr pSocket) { mSendSocket = std::move(pSocket); mSendSocketIsSet = true; } void setRecvSocket(std::unique_ptr pSocket) { mRecvSocket = std::move(pSocket); mRecvSocketIsSet = true; } bool messagingConnIsReady() const { return mSendSocketIsSet && mRecvSocketIsSet; } private: bool mSendSocketIsSet , mRecvSocketIsSet; std::unique_ptr mSendSocket , mRecvSocket; }; class EngineClient { public: typedef std::function callback_type; public: EngineClient( scheduler_type *pScheduler , ClientSockets *pClientSockets , resolver_type::iterator pEndPointIterator , callback_type pOnConnected ) : mScheduler(pScheduler) , mClientSockets(pClientSockets) , mEndPointIterator(pEndPointIterator) , mSocket(nullptr) , mNumMsgConn(0) , mOnConnected(pOnConnected) { connectMessagingNext(); } private: void connectMessagingNext() { mSocket.reset(new socket_type(*mScheduler)); asio::async_connect( *mSocket , mEndPointIterator , [this]( system_error_type const& pError , resolver_type::iterator ) { handleMessagingConnected(pError); } ); } void handleMessagingConnected(system_error_type const& pError) { if (pError) { // Continue to try connecting (maybe the server is not up yet) connectMessagingNext(); return; } ++mNumMsgConn; if (mNumMsgConn == 1) { mClientSockets->setSendSocket(std::move(mSocket)); } else { mClientSockets->setRecvSocket(std::move(mSocket)); } if (mClientSockets->messagingConnIsReady()) { mOnConnected(); } else { connectMessagingNext(); } } private: scheduler_type *mScheduler; ClientSockets *mClientSockets; resolver_type::iterator mEndPointIterator; std::unique_ptr mSocket; unsigned short mNumMsgConn; callback_type mOnConnected; }; class Server { public: Server( scheduler_type *pScheduler , port_type pPort ) : mScheduler(pScheduler) , mAcceptor( *mScheduler , endpoint_type(asio::ip::tcp::v4(),pPort) , false ) , mSocket(nullptr) , mControlSocket(nullptr) , mNumAccepted(0) , mAuxChar(0) { acceptNext(); } private: void acceptNext() { mSocket.reset(new socket_type(*mScheduler)); mAcceptor.async_accept( *mSocket , [this](system_error_type const& pError) { handleAccepted(pError); } ); } void handleAccepted(system_error_type const& pError) { if (pError) { ostringstream oss; oss << "Failure accepting connection " << mNumAccepted +1 << ".\nError: " << pError.message(); throw runtime_error(oss.str()); } ++mNumAccepted; if (mNumAccepted == 1) { mControlSocket = std::move(mSocket); acceptNext(); asio::async_read( *mControlSocket , asio::buffer(&mAuxChar,1) , [this](system_error_type const& pError,size_t) { handleStopSignalReceived(pError); } ); } else if (mNumAccepted < 3) { // Server accepts 3 connections acceptNext(); } } void handleStopSignalReceived(system_error_type const& pError) { if (pError) { ostringstream oss; oss << "Error while reading on the control socket.\n" << "Error: " << pError.message(); throw runtime_error(oss.str()); } work.reset(nullptr); } private: scheduler_type *mScheduler; acceptor_type mAcceptor; unique_ptr mSocket , mControlSocket; unsigned short mNumAccepted; char mAuxChar; }; class Client { public: Client( scheduler_type *pScheduler , port_type pPort ) : mScheduler(pScheduler) , mServerPort(pPort) , mResolver(*pScheduler) , mControlSocket(*pScheduler) , mSocket(nullptr) , mEngineClient(nullptr) , mAuxChar(1) { std::shared_ptr query(new query_type( asio::ip::tcp::v4() , "localhost" , std::to_string(static_cast(pPort)) )); mResolver.async_resolve( *query , [this,query]( system_error_type const& pError , resolver_type::iterator pIterator ) { handleControlEndpointResolved(pError,pIterator); } ); } private: void handleControlEndpointResolved( system_error_type const& pError , resolver_type::iterator pIterator ) { if (pError) { ostringstream oss; oss << "Could not resolver peer.\n" << "Error: " << pError.message(); throw runtime_error(oss.str()); } mEndPointIterator = pIterator; asio::async_connect( mControlSocket , mEndPointIterator , [this]( system_error_type const& pError , resolver_type::iterator ) { handleControlConnected(pError); } ); } void handleControlConnected(system_error_type const& pError) { if (pError) { ostringstream oss; oss << "Could not connect the control socket.\n" << "Error: " << pError.message(); throw runtime_error(oss.str()); } mEngineClient.reset(new EngineClient( mScheduler , &mClientSockets , mEndPointIterator , [this](){ onConnectionReady(); } )); } void onConnectionReady() { if (mEngineClient.get() == nullptr) { ostringstream oss; oss << "mEngineClient address: " << hex << mEngineClient.get() << dec << endl; throw runtime_error(oss.str()); } cout << "\nSending the stop signal to the server ..." << endl; asio::async_write( mControlSocket , asio::buffer(&mAuxChar,1) , [this](system_error_type const& pError,size_t) { handleWaitServerSwitchOff(pError); } ); } void handleWaitServerSwitchOff(system_error_type const& pError) { if (pError) { ostringstream oss; oss << "Error while sending termination signal to server\n" << "Error: " << pError.message(); throw runtime_error(oss.str()); } cout << "\nWaiting to detect the server shut down ..." << endl; asio::async_read( mControlSocket , asio::buffer(&mAuxChar,1) , [this](system_error_type const& pError,size_t) { handleTerminate(pError); } ); } void handleTerminate(system_error_type const& pError) { if (pError != asio::error::eof) { ostringstream oss; oss << "Unexpeced error on termination\n" << "Error: " << pError.message(); throw runtime_error(oss.str()); } work.reset(nullptr); } private: scheduler_type *mScheduler; port_type mServerPort; ClientSockets mClientSockets; resolver_type mResolver; resolver_type::iterator mEndPointIterator; socket_type mControlSocket; unique_ptr mSocket; unique_ptr mEngineClient; char mAuxChar; }; int main(int argc,char *argv[]) { port_type port(0); po::options_description opt_desc("OPTIONS:"); opt_desc.add_options() ("server,s","Runs the server") ("client,c","Runs the client") ("port,p",po::value(&port),"Server port") ("help,h","Produce this help message"); po::variables_map vm; po::store(po::parse_command_line(argc,argv,opt_desc),vm); po::notify(vm); if (argc == 1 || vm.count("help") > 0) { cout << "\nSyntax:\n" " " << argv[0] << " [OPTIONS]\n" << endl; cout << opt_desc << "\n" << endl; if (vm.count("help") == 0) { return EXIT_FAILURE; } return EXIT_SUCCESS; } bool is_server(vm.count("server") > 0) , is_client(vm.count("client") > 0); if (is_server && is_client) { throw invalid_argument("Decide if running either a server or a client"); } scheduler_type io_service; work.reset(new scheduler_type::work(io_service)); std::unique_ptr server(nullptr); std::unique_ptr client(nullptr); if (is_server) { server.reset(new Server(&io_service,port)); } else { client.reset(new Client(&io_service,port)); } std::thread thr1([&io_service](){ io_service.run(); }) , thr2([&io_service](){ io_service.run(); }); thr1.join(); thr2.join(); if (is_server) { server.reset(nullptr); } else { client.reset(nullptr); } return EXIT_SUCCESS; }