#include #include #include #include #include #include #include #include #include #include #include #include #include typedef std::shared_ptr socket_t; // some global variables boost::asio::io_service io_service; std::vector worker_threads; std::atomic io_service_running{true}; auto work = std::make_unique(io_service); boost::asio::deadline_timer timer{io_service}; auto acceptor = boost::asio::ip::tcp::acceptor{io_service}; std::uint16_t port{3210}; // this is for debugging purposes: prints time, line and message #define TIMELINE(s) time_line(__LINE__, s) inline void time_line(int line, const std::string &s) { std::ostringstream oss; oss << std::fixed << std::setprecision(9) << std::chrono::duration( std::chrono::high_resolution_clock::now().time_since_epoch()) .count() << ": " << line << ": " << s << '\n'; std::cerr << oss.str(); } // Declarations template void server_start(std::uint16_t port, Handler); template void server_accept(Handler); template void client_connect(const std::string &ip, std::uint16_t port, Handler); template void awrite(socket_t socket, const void *buff, std::size_t length, Handler); // Implementation template void server_start(std::uint16_t port, Handler on_connect_handler) { TIMELINE("server_start(" + std::to_string(port) + ")"); boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address_v6::any(), port); acceptor.open(ep.protocol()); acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); boost::system::error_code ec; acceptor.bind(ep, ec); if (ec) return on_connect_handler(ec, nullptr); acceptor.listen(boost::asio::socket_base::max_connections, ec); if (ec) return on_connect_handler(ec, nullptr); server_accept(on_connect_handler); TIMELINE("server_start() -> end"); } template void server_accept(Handler on_connect_handler) { TIMELINE("server_accept()"); auto socket = std::make_shared(io_service); // the socket was created and the program waits for a connection TIMELINE("async_accept()"); acceptor.async_accept( *socket.get(), [socket, on_connect_handler](const boost::system::error_code &ec) { TIMELINE("async_accept() callback"); if (ec == boost::asio::error::operation_aborted) { // this happens if the timer expired TIMELINE("async_accept() callback: operation aborted"); } else { TIMELINE("async_accept() callback: operation not aborted"); // First keep the server listening, this is asynchronous: // start new connection in parallel server_accept(on_connect_handler); // Now handle this connection if (ec) { on_connect_handler(ec, nullptr); } else { on_connect_handler(ec, socket); } } TIMELINE("async_accept() callback -> end"); }); TIMELINE("server_accept() -> end"); } template void client_connect(const std::string &ip, std::uint16_t port, Handler on_connect_handler) { TIMELINE("client_connect(" + ip + ":" + std::to_string(port) + ")"); boost::asio::ip::tcp::resolver resolver{io_service}; boost::asio::ip::tcp::resolver::query query{ip, std::to_string(port)}; TIMELINE("resolver.resolve(" + ip + ":" + std::to_string(port) + ")"); boost::system::error_code ec; auto ep = resolver.resolve(query, ec); if (ec) return on_connect_handler(ec, nullptr); assert(ep != boost::asio::ip::tcp::resolver::iterator{}); auto socket = std::make_shared(io_service); socket->async_connect( *ep, [socket, on_connect_handler](const boost::system::error_code &ec) { TIMELINE("client callback"); on_connect_handler(ec, ec ? nullptr : socket); }); TIMELINE("client_connect() -> end"); } template void awrite(socket_t socket, const void *buff, std::size_t length, Handler on_write_handler) { TIMELINE("awrite()"); //if the timeout is set, enable the timer TIMELINE("setting timer"); timer.expires_from_now(boost::posix_time::microseconds(25)); timer.async_wait([socket](const boost::system::error_code &ec) { TIMELINE("timer callback"); if (ec != boost::asio::error::operation_aborted) { TIMELINE("socket->shutdown"); socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both); TIMELINE("socket->close"); socket->close(); } TIMELINE("timer callback -> end"); }); boost::asio::async_write( *(socket.get()), boost::asio::buffer(buff, length), [socket, on_write_handler](const boost::system::error_code &ec, std::size_t len) { TIMELINE("async_write callback"); // checks if the timer expired, if so pass an error to the handler if (timer.cancel() == 0) { on_write_handler( boost::system::error_code(boost::system::errc::timed_out, boost::system::generic_category()), len); } else { on_write_handler(ec, len); } TIMELINE("async_write callback -> end"); }); TIMELINE("awrite() -> end"); } // this is to wait until client and server have finished their work void increment_ended(int &ended, std::mutex &mutex, std::condition_variable &cond) { { std::unique_lock lock(mutex); ++ended; } cond.notify_all(); } void timeout_write_test() { auto ended = 0; // number of server/client that ended std::mutex mutex; std::condition_variable cond; // the server never reads TIMELINE("server_start()"); server_start(port, [&](const boost::system::error_code &ec, socket_t socket) { TIMELINE("server callback"); assert(socket != nullptr); assert(not ec); if (socket and not ec) { { TIMELINE("waiting for condition(ended==1)"); std::unique_lock lock(mutex); cond.wait(lock, [&]() { return ended == 1; }); TIMELINE("got condition(ended==1)"); } } increment_ended(ended, mutex, cond); TIMELINE("server callback -> end"); }); // the client writes but the sever does not reads so that the timeout is called TIMELINE("client.connect()"); client_connect( "::1", port, [&](const boost::system::error_code &ec, socket_t socket) { TIMELINE("client callback"); assert(socket != nullptr and not ec); if (socket and not ec) { boost::asio::socket_base::receive_buffer_size option; socket->get_option(option); auto buf_size = 200 * option.value(); auto buff = std::make_shared>(buf_size, 0); awrite(socket, buff->data(), buf_size, [socket, buff, &ended, &mutex, &cond]( const boost::system::error_code &ec, std::size_t /*len*/) { TIMELINE("awrite callback"); assert(ec == boost::system::errc::timed_out); increment_ended(ended, mutex, cond); TIMELINE("awrite callback -> end"); }); } else { increment_ended(ended, mutex, cond); } TIMELINE("client callback -> end"); }); { TIMELINE("waiting for condition(ended==2)"); std::unique_lock lock(mutex); cond.wait(lock, [&]() { return ended == 2; }); TIMELINE("got condition(ended==2)"); } } int main() { TIMELINE("starting worker threads"); const auto nthreads = 3; worker_threads.reserve(nthreads); for (std::size_t i = 0; i < nthreads; ++i) { worker_threads.emplace_back([]() { while (io_service_running) { try { io_service.run(); } catch (...) { } } }); } TIMELINE("starting worker threads -> done"); timeout_write_test(); TIMELINE("accpetor.close()"); acceptor.close(); TIMELINE("joining worker threads"); io_service_running = false; work.reset(); for (auto &t : worker_threads) t.join(); return 0; }