Ticket #7611: asio_bug.cpp

File asio_bug.cpp, 8.3 KB (added by bronf, 5 years ago)

minimal example to reproduce the bug

Line 
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <exception>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
15
16typedef std::shared_ptr<boost::asio::ip::tcp::socket> socket_t;
17
18// some global variables
19boost::asio::io_service io_service;
20std::vector<std::thread> worker_threads;
21std::atomic<bool> io_service_running{true};
22auto work = std::make_unique<boost::asio::io_service::work>(io_service);
23boost::asio::deadline_timer timer{io_service};
24auto acceptor = boost::asio::ip::tcp::acceptor{io_service};
25std::uint16_t port{3210};
26
// Debug tracing: TIMELINE(msg) logs msg together with the current time and
// the source line of the call site.
#define TIMELINE(s) time_line(__LINE__, s)

// Writes "<seconds since the clock's epoch>: <line>: <s>\n" to std::cerr.
// The record is assembled in a local buffer and emitted with one stream
// insertion (presumably so that records from different threads are less
// likely to interleave).
inline void time_line(int line, const std::string &s) {
  const auto since_epoch =
      std::chrono::high_resolution_clock::now().time_since_epoch();
  const std::chrono::duration<double> seconds(since_epoch);

  std::ostringstream record;
  record << std::fixed << std::setprecision(9) << seconds.count();
  record << ": " << line << ": " << s << '\n';
  std::cerr << record.str();
}
39
40// Declarations
41
42template <class Handler>
43void server_start(std::uint16_t port, Handler);
44
45template <class Handler>
46void server_accept(Handler);
47
48template <class Handler>
49void client_connect(const std::string &ip, std::uint16_t port, Handler);
50
51template <class Handler>
52void awrite(socket_t socket, const void *buff, std::size_t length, Handler);
53
54// Implementation
55
56template <class Handler>
57void server_start(std::uint16_t port, Handler on_connect_handler) {
58 TIMELINE("server_start(" + std::to_string(port) + ")");
59 boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address_v6::any(), port);
60 acceptor.open(ep.protocol());
61 acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
62 boost::system::error_code ec;
63 acceptor.bind(ep, ec);
64 if (ec) return on_connect_handler(ec, nullptr);
65 acceptor.listen(boost::asio::socket_base::max_connections, ec);
66 if (ec) return on_connect_handler(ec, nullptr);
67 server_accept(on_connect_handler);
68 TIMELINE("server_start() -> end");
69}
70
71template <class Handler>
72void server_accept(Handler on_connect_handler) {
73 TIMELINE("server_accept()");
74 auto socket = std::make_shared<socket_t::element_type>(io_service);
75 // the socket was created and the program waits for a connection
76 TIMELINE("async_accept()");
77 acceptor.async_accept(
78 *socket.get(),
79 [socket, on_connect_handler](const boost::system::error_code &ec) {
80 TIMELINE("async_accept() callback");
81 if (ec == boost::asio::error::operation_aborted) {
82 // this happens if the timer expired
83 TIMELINE("async_accept() callback: operation aborted");
84 } else {
85 TIMELINE("async_accept() callback: operation not aborted");
86 // First keep the server listening, this is asynchronous:
87 // start new connection in parallel
88 server_accept(on_connect_handler);
89 // Now handle this connection
90 if (ec) {
91 on_connect_handler(ec, nullptr);
92 } else {
93 on_connect_handler(ec, socket);
94 }
95 }
96 TIMELINE("async_accept() callback -> end");
97 });
98 TIMELINE("server_accept() -> end");
99}
100
101template <class Handler>
102void client_connect(const std::string &ip, std::uint16_t port,
103 Handler on_connect_handler) {
104 TIMELINE("client_connect(" + ip + ":" + std::to_string(port) + ")");
105 boost::asio::ip::tcp::resolver resolver{io_service};
106 boost::asio::ip::tcp::resolver::query query{ip, std::to_string(port)};
107 TIMELINE("resolver.resolve(" + ip + ":" + std::to_string(port) + ")");
108 boost::system::error_code ec;
109 auto ep = resolver.resolve(query, ec);
110 if (ec) return on_connect_handler(ec, nullptr);
111 assert(ep != boost::asio::ip::tcp::resolver::iterator{});
112 auto socket = std::make_shared<boost::asio::ip::tcp::socket>(io_service);
113 socket->async_connect(
114 *ep, [socket, on_connect_handler](const boost::system::error_code &ec) {
115 TIMELINE("client callback");
116 on_connect_handler(ec, ec ? nullptr : socket);
117 });
118 TIMELINE("client_connect() -> end");
119}
120
121template <class Handler>
122void awrite(socket_t socket, const void *buff, std::size_t length,
123 Handler on_write_handler) {
124 TIMELINE("awrite()");
125 //if the timeout is set, enable the timer
126 TIMELINE("setting timer");
127 timer.expires_from_now(boost::posix_time::microseconds(25));
128 timer.async_wait([socket](const boost::system::error_code &ec) {
129 TIMELINE("timer callback");
130 if (ec != boost::asio::error::operation_aborted) {
131 TIMELINE("socket->shutdown");
132 socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
133 TIMELINE("socket->close");
134 socket->close();
135 }
136 TIMELINE("timer callback -> end");
137 });
138 boost::asio::async_write(
139 *(socket.get()), boost::asio::buffer(buff, length),
140 [socket, on_write_handler](const boost::system::error_code &ec,
141 std::size_t len) {
142 TIMELINE("async_write callback");
143 // checks if the timer expired, if so pass an error to the handler
144 if (timer.cancel() == 0) {
145 on_write_handler(
146 boost::system::error_code(boost::system::errc::timed_out,
147 boost::system::generic_category()),
148 len);
149 } else {
150 on_write_handler(ec, len);
151 }
152 TIMELINE("async_write callback -> end");
153 });
154 TIMELINE("awrite() -> end");
155}
156
// Records that one participant (server or client) has finished: bumps the
// shared counter while holding the mutex, then wakes every thread waiting on
// the condition variable. The lock is released before notifying.
void increment_ended(int &ended, std::mutex &mutex,
                     std::condition_variable &cond) {
  {
    const std::lock_guard<std::mutex> guard(mutex);
    ended += 1;
  }
  cond.notify_all();
}
166
167void timeout_write_test() {
168 auto ended = 0; // number of server/client that ended
169 std::mutex mutex;
170 std::condition_variable cond;
171
172 // the server never reads
173 TIMELINE("server_start()");
174 server_start(port, [&](const boost::system::error_code &ec, socket_t socket) {
175 TIMELINE("server callback");
176 assert(socket != nullptr);
177 assert(not ec);
178 if (socket and not ec) {
179 {
180 TIMELINE("waiting for condition(ended==1)");
181 std::unique_lock<std::mutex> lock(mutex);
182 cond.wait(lock, [&]() { return ended == 1; });
183 TIMELINE("got condition(ended==1)");
184 }
185 }
186 increment_ended(ended, mutex, cond);
187 TIMELINE("server callback -> end");
188 });
189
190 // the client writes but the sever does not reads so that the timeout is called
191 TIMELINE("client.connect()");
192 client_connect(
193 "::1", port, [&](const boost::system::error_code &ec, socket_t socket) {
194 TIMELINE("client callback");
195 assert(socket != nullptr and not ec);
196 if (socket and not ec) {
197 boost::asio::socket_base::receive_buffer_size option;
198 socket->get_option(option);
199 auto buf_size = 200 * option.value();
200 auto buff = std::make_shared<std::vector<std::uint8_t>>(buf_size, 0);
201 awrite(socket, buff->data(), buf_size,
202 [socket, buff, &ended, &mutex, &cond](
203 const boost::system::error_code &ec, std::size_t /*len*/) {
204 TIMELINE("awrite callback");
205 assert(ec == boost::system::errc::timed_out);
206 increment_ended(ended, mutex, cond);
207 TIMELINE("awrite callback -> end");
208 });
209 } else {
210 increment_ended(ended, mutex, cond);
211 }
212 TIMELINE("client callback -> end");
213 });
214 {
215 TIMELINE("waiting for condition(ended==2)");
216 std::unique_lock<std::mutex> lock(mutex);
217 cond.wait(lock, [&]() { return ended == 2; });
218 TIMELINE("got condition(ended==2)");
219 }
220}
221
222int main() {
223 TIMELINE("starting worker threads");
224 const auto nthreads = 3;
225 worker_threads.reserve(nthreads);
226 for (std::size_t i = 0; i < nthreads; ++i) {
227 worker_threads.emplace_back([]() {
228 while (io_service_running) {
229 try {
230 io_service.run();
231 }
232 catch (...) {
233 }
234 }
235 });
236 }
237 TIMELINE("starting worker threads -> done");
238
239 timeout_write_test();
240
241 TIMELINE("accpetor.close()");
242 acceptor.close();
243
244 TIMELINE("joining worker threads");
245 io_service_running = false;
246 work.reset();
247 for (auto &t : worker_threads) t.join();
248
249 return 0;
250}