Ticket #12320: 12320.patch
File 12320.patch, 26.7 KB (added by , 6 years ago) |
---|
-
doc/examples.qbk
diff --git a/doc/examples.qbk b/doc/examples.qbk index 0c7b1b8..6701ee5 100644
a b and asynchronous operations. 364 364 * [@boost_asio/example/cpp11/echo/blocking_tcp_echo_server.cpp] 365 365 * [@boost_asio/example/cpp11/echo/blocking_udp_echo_client.cpp] 366 366 * [@boost_asio/example/cpp11/echo/blocking_udp_echo_server.cpp] 367 * [@boost_asio/example/cpp11/echo/byte_buffer.hpp] 368 * [@boost_asio/example/cpp11/echo/echo_client.cpp] 369 * [@boost_asio/example/cpp11/echo/echo_server.cpp] 367 370 368 371 369 372 [heading Futures] -
example/cpp11/echo/Jamfile
diff --git a/example/cpp11/echo/Jamfile b/example/cpp11/echo/Jamfile index 278bd38..644a378 100644
a b exe blocking_udp_echo_server 61 61 : <template>asio_echo_example 62 62 blocking_udp_echo_server.cpp 63 63 ; 64 65 exe echo_client 66 : <template>asio_echo_example 67 echo_client.cpp 68 ; 69 70 exe echo_server 71 : <template>asio_echo_example 72 echo_server.cpp 73 ; -
example/cpp11/echo/Jamfile.v2
diff --git a/example/cpp11/echo/Jamfile.v2 b/example/cpp11/echo/Jamfile.v2 index 44cd7d4..84d61bb 100644
a b exe blocking_tcp_echo_client : blocking_tcp_echo_client.cpp ; 49 49 exe blocking_tcp_echo_server : blocking_tcp_echo_server.cpp ; 50 50 exe blocking_udp_echo_client : blocking_udp_echo_client.cpp ; 51 51 exe blocking_udp_echo_server : blocking_udp_echo_server.cpp ; 52 exe echo_client : echo_client.cpp ; 53 exe echo_server : echo_server.cpp ; -
new file example/cpp11/echo/byte_buffer.hpp
diff --git a/example/cpp11/echo/byte_buffer.hpp b/example/cpp11/echo/byte_buffer.hpp new file mode 100644 index 0000000..457e384
- + 1 /// 2 /// Copyright (C) 2016 Xiaoshuang LU (luxiaoshuang at qiyi dot com) 3 /// 4 /// Distributed under the Boost Software License, Version 1.0. (See accompanying 5 /// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 /// 7 8 # ifndef _BYTE_BUFFER_HPP_ 9 # define _BYTE_BUFFER_HPP_ 1 10 11 # include <cstdint> 12 13 /// 14 /// Header Size 15 /// Only 1, 2, 4, and 8 are allowed. 16 /// 17 /// Body Size 18 /// Arbitrary. 19 /// 20 class byte_buffer 21 { 22 public: 23 uintmax_t header_length_; 24 25 uintmax_t body_length_; 26 27 // where header and body reside 28 uint8_t * data_; 29 }; 30 31 # endif -
new file example/cpp11/echo/echo_client.cpp
diff --git a/example/cpp11/echo/echo_client.cpp b/example/cpp11/echo/echo_client.cpp new file mode 100644 index 0000000..eaa6392
- + 1 /// 2 /// Copyright (C) 2016 Xiaoshuang LU (luxiaoshuang at qiyi dot com) 3 /// 4 /// Distributed under the Boost Software License, Version 1.0. (See accompanying 5 /// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 /// 7 8 # include <cstdlib> 9 # include <cstring> 10 11 # include <atomic> 12 # include <chrono> 13 # include <iostream> 14 # include <string> 15 # include <thread> 16 17 # include <boost/asio.hpp> 18 # include <boost/lockfree/queue.hpp> 19 20 # include "byte_buffer.hpp" 21 22 class echo_client 23 { 24 public: 25 echo_client(boost::asio::io_service & io_service, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) 26 : 27 io_service_(io_service), 28 socket_(io_service), 29 write_queue_(1024), 30 handle_write_flag_(false) 31 { 32 this->do_connect(endpoint_iterator); 33 } 34 35 void close() 36 { 37 io_service_.post 38 ( 39 [this]() 40 { 41 socket_.close(); 42 } 43 ); 44 } 45 46 void read() 47 { 48 byte_buffer * byte_buffer_pointer = new byte_buffer; 49 50 byte_buffer_pointer->header_length_ = 4; 51 byte_buffer_pointer->body_length_ = 0; 52 byte_buffer_pointer->data_ = nullptr; 53 54 boost::asio::async_read 55 ( 56 this->socket_, 57 boost::asio::buffer(& (byte_buffer_pointer->body_length_), byte_buffer_pointer->header_length_), 58 [this, byte_buffer_pointer](boost::system::error_code const & error_code, std::size_t length) 59 { 60 # if defined(DEBUG_MODE) 61 std::cout << "async_read header complete." << std::endl; 62 # endif 63 if (! error_code) 64 { 65 if (byte_buffer_pointer->header_length_ != length) 66 { 67 delete byte_buffer_pointer; 68 69 std::cerr << "async_read header failed. Number of bytes read successfully does not equal expectation." << std::endl; 70 71 boost::system::error_code temporary; 72 73 this->socket_.close(temporary); 74 } 75 else 76 { 77 this->handle_header_read(byte_buffer_pointer); 78 } 79 } 80 else 81 { 82 delete byte_buffer_pointer; 83 84 std::cerr << "async_read header failed. (error_code = " << error_code.value() << ", error_message = " << error_code.message() << ", error_category = " << error_code.category().name() << ")" << std::endl; 85 86 boost::system::error_code temporary; 87 88 this->socket_.close(temporary); 89 } 90 } 91 ); 92 } 93 94 void write(byte_buffer * byte_buffer_pointer) 95 { 96 this->write_queue_.push(byte_buffer_pointer); 97 98 this->handle_write(); 99 } 100 101 private: 102 void do_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iterator) 103 { 104 boost::asio::async_connect 105 ( 106 this->socket_, 107 endpoint_iterator, 108 [this](boost::system::error_code const & error_code, boost::asio::ip::tcp::resolver::iterator) 109 { 110 # if defined(DEBUG_MODE) 111 std::cout << "async_connect complete." << std::endl; 112 # endif 113 if (! error_code) 114 { 115 this->read(); 116 } 117 else 118 { 119 std::cerr << "async_connect failed. (error_code = " << error_code.value() << ", error_message = " << error_code.message() << ", error_category = " << error_code.category().name() << ")" << std::endl; 120 } 121 } 122 ); 123 } 124 125 void handle_header_read(byte_buffer * byte_buffer_pointer) 126 { 127 byte_buffer_pointer->data_ = (uint8_t *)malloc(byte_buffer_pointer->header_length_ + byte_buffer_pointer->body_length_); 128 # if defined(DEBUG_MODE) 129 std::cout << "(header_length = " << byte_buffer_pointer->header_length_ << ", body_length = " << byte_buffer_pointer->body_length_ << ", data = " << (uintmax_t)(byte_buffer_pointer->data_) << ")" << std::endl; 130 # endif 131 memcpy(byte_buffer_pointer->data_, & (byte_buffer_pointer->body_length_), byte_buffer_pointer->header_length_); 132 133 boost::asio::async_read 134 ( 135 this->socket_, 136 boost::asio::buffer(byte_buffer_pointer->data_ + byte_buffer_pointer->header_length_, byte_buffer_pointer->body_length_), 137 [this, byte_buffer_pointer](boost::system::error_code const & error_code, std::size_t length) 138 { 139 # if defined(DEBUG_MODE) 140 std::cout << "async_read body complete." << std::endl; 141 # endif 142 if (! error_code) 143 { 144 if (byte_buffer_pointer->body_length_ != length) 145 { 146 free(byte_buffer_pointer->data_); 147 delete byte_buffer_pointer; 148 149 std::cerr << "async_read body failed. Number of bytes read successfully does not equal expectation." << std::endl; 150 151 boost::system::error_code temporary; 152 153 this->socket_.close(temporary); 154 } 155 else 156 { 157 this->handle_body_read(byte_buffer_pointer); 158 } 159 } 160 else 161 { 162 free(byte_buffer_pointer->data_); 163 delete byte_buffer_pointer; 164 165 std::cerr << "async_read body failed. (error_code = " << error_code.value() << ", error_message = " << error_code.message() << ", error_category = " << error_code.category().name() << ")" << std::endl; 166 167 boost::system::error_code temporary; 168 169 this->socket_.close(temporary); 170 } 171 } 172 ); 173 } 174 175 void handle_body_read(byte_buffer * byte_buffer_pointer) 176 { 177 std::string string((char *)(byte_buffer_pointer->data_ + byte_buffer_pointer->header_length_), byte_buffer_pointer->body_length_); 178 179 std::cout << "server: " << string << std::endl; 180 181 free(byte_buffer_pointer->data_); 182 delete byte_buffer_pointer; 183 184 this->read(); 185 } 186 187 void handle_write() 188 { 189 if (! this->write_queue_.empty()) 190 { 191 bool expected = false; 192 bool desired = true; 193 194 if (!this->handle_write_flag_.compare_exchange_weak(expected, desired)) 195 { 196 return; 197 } 198 199 byte_buffer * byte_buffer_pointer; 200 201 if (this->write_queue_.pop(byte_buffer_pointer)) 202 { 203 boost::asio::async_write 204 ( 205 this->socket_, 206 boost::asio::buffer(byte_buffer_pointer->data_, byte_buffer_pointer->header_length_ + byte_buffer_pointer->body_length_), 207 [this, byte_buffer_pointer](boost::system::error_code const & error_code, std::size_t length) 208 { 209 # if defined(DEBUG_MODE) 210 std::cout << "async_write complete." << std::endl; 211 # endif 212 this->handle_write_flag_.store(false); 213 214 if (! error_code) 215 { 216 if (byte_buffer_pointer->header_length_ + byte_buffer_pointer->body_length_ != length) 217 { 218 std::cerr << "async_write failed. Number of bytes written successfully does not equal expectation." << std::endl; 219 220 boost::system::error_code temporary; 221 222 this->socket_.close(temporary); 223 } 224 else 225 { 226 this->handle_write(); 227 } 228 } 229 else 230 { 231 std::cerr << "async_write failed. (error_code = " << error_code.value() << ", error_message = " << error_code.message() << ", error_category = " << error_code.category().name() << ")" << std::endl; 232 233 boost::system::error_code temporary; 234 235 this->socket_.close(temporary); 236 } 237 238 free(byte_buffer_pointer->data_); 239 delete byte_buffer_pointer; 240 } 241 ); 242 } 243 else 244 { 245 std::cerr << "It is impossible to reach here." << std::endl;; 246 247 this->handle_write_flag_.store(false); 248 } 249 } 250 } 251 252 boost::asio::io_service & io_service_; 253 boost::asio::ip::tcp::socket socket_; 254 // high performance concurrent queue is required 255 boost::lockfree::queue<byte_buffer *> write_queue_; 256 std::atomic<bool> handle_write_flag_; 257 }; 258 259 int main(int argc, char ** argv) 260 { 261 if (argc != 3) 262 { 263 std::cerr << "SYNOPSIS: echo_client <address> <port>" << std::endl; 264 return -1; 265 } 266 267 try 268 { 269 boost::asio::io_service io_service; 270 271 boost::asio::io_service::work work(io_service); 272 273 boost::asio::ip::tcp::resolver resolver(io_service); 274 275 boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve({ argv[1], argv[2] }); 276 277 echo_client client(io_service, endpoint_iterator); 278 279 std::atomic<uintmax_t> call_id_generator(0); 280 281 int io_service_thread_number = 8; 282 283 std::vector<std::thread *> io_service_thread_vector; 284 285 int business_logic_thread_vector_number = 8; 286 287 std::vector<std::thread *> business_logic_thread_vector; 288 289 for (int i = 0; i < io_service_thread_number; ++ i) 290 { 291 std::thread * thread = 292 new std::thread 293 ( 294 [& io_service] 295 { 296 io_service.run(); 297 } 298 ); 299 300 io_service_thread_vector.push_back(thread); 301 } 302 303 for (int i = 0; i < business_logic_thread_vector_number; ++ i) 304 { 305 std::thread * thread = 306 new std::thread 307 ( 308 [& client, & call_id_generator] 309 { 310 while (true) 311 { 312 std::string string = "who's your daddy? " + std::to_string(call_id_generator.fetch_add(1)); 313 314 byte_buffer * byte_buffer_pointer = new byte_buffer; 315 316 byte_buffer_pointer->header_length_ = 4; 317 byte_buffer_pointer->body_length_ = string.length(); 318 byte_buffer_pointer->data_ = (uint8_t *)malloc(byte_buffer_pointer->header_length_ + byte_buffer_pointer->body_length_); 319 320 memcpy(byte_buffer_pointer->data_, &(byte_buffer_pointer->body_length_), byte_buffer_pointer->header_length_); 321 memcpy(byte_buffer_pointer->data_ + byte_buffer_pointer->header_length_, string.c_str(), string.length()); 322 323 client.write(byte_buffer_pointer); 324 325 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 326 } 327 } 328 ); 329 330 business_logic_thread_vector.push_back(thread); 331 } 332 333 while (true) 334 { 335 std::this_thread::sleep_for(std::chrono::hours(INTMAX_MAX)); 336 } 337 338 for (int i = 0; i < business_logic_thread_vector_number; ++ i) 339 { 340 business_logic_thread_vector[i]->join(); 341 } 342 343 client.close(); 344 345 for (int i = 0; i < io_service_thread_number; ++ i) 346 { 347 io_service_thread_vector[i]->join(); 348 } 349 } 350 catch (std::exception & exception) 351 { 352 std::cerr << "Exception: " << exception.what() << std::endl; 353 } 354 355 return 0; 356 } -
new file example/cpp11/echo/echo_server.cpp
diff --git a/example/cpp11/echo/echo_server.cpp b/example/cpp11/echo/echo_server.cpp new file mode 100644 index 0000000..0ba32d8
- + 1 /// 2 /// Copyright (C) 2016 Xiaoshuang LU (luxiaoshuang at qiyi dot com) 3 /// 4 /// Distributed under the Boost Software License, Version 1.0. (See accompanying 5 /// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 /// 7 8 # include <cstdlib> 9 # include <cstring> 10 11 # include <chrono> 12 # include <iostream> 13 # include <memory> 14 # include <string> 15 # include <thread> 16 17 # include <boost/asio.hpp> 18 # include <boost/lockfree/queue.hpp> 19 20 # include "byte_buffer.hpp" 21 22 class server_socket 23 : 24 public std::enable_shared_from_this<server_socket> 25 { 26 public: 27 server_socket(boost::asio::io_service & io_service) 28 : 29 socket_(io_service), 30 write_queue_(1024), 31 handle_write_flag_(false) 32 { 33 } 34 35 boost::asio::ip::tcp::socket & get_socket() 36 { 37 return this->socket_; 38 } 39 40 void read() 41 { 42 byte_buffer * byte_buffer_pointer = new byte_buffer; 43 44 byte_buffer_pointer->header_length_ = 4; 45 byte_buffer_pointer->body_length_ = 0; 46 byte_buffer_pointer->data_ = nullptr; 47 48 std::shared_ptr<server_socket> self(shared_from_this()); 49 50 boost::asio::async_read 51 ( 52 this->socket_, 53 boost::asio::buffer(& (byte_buffer_pointer->body_length_), byte_buffer_pointer->header_length_), 54 [this, self, byte_buffer_pointer](boost::system::error_code const & error_code, std::size_t length) 55 { 56 # if defined(DEBUG_MODE) 57 std::cout << "async_read header complete." << std::endl; 58 # endif 59 if (! error_code) 60 { 61 if (byte_buffer_pointer->header_length_ != length) 62 { 63 delete byte_buffer_pointer; 64 65 std::cerr << "async_read header failed. Number of bytes read successfully does not equal expectation." << std::endl; 66 67 boost::system::error_code temporary; 68 69 this->socket_.close(temporary); 70 } 71 else 72 { 73 this->handle_header_read(byte_buffer_pointer); 74 } 75 } 76 else 77 { 78 delete byte_buffer_pointer; 79 80 std::cerr << "async_read header failed. (error_code = " << error_code.value() << ", error_message = " << error_code.message() << ", error_category = " << error_code.category().name() << ")" << std::endl; 81 82 boost::system::error_code temporary; 83 84 this->socket_.close(temporary); 85 } 86 } 87 ); 88 } 89 90 void write(byte_buffer * byte_buffer_pointer) 91 { 92 this->write_queue_.push(byte_buffer_pointer); 93 94 this->handle_write(); 95 } 96 97 private: 98 void handle_header_read(byte_buffer * byte_buffer_pointer) 99 { 100 byte_buffer_pointer->data_ = (uint8_t *)malloc(byte_buffer_pointer->header_length_ + byte_buffer_pointer->body_length_); 101 # if defined(DEBUG_MODE) 102 std::cout << "(header_length = " << byte_buffer_pointer->header_length_ << ", body_length = " << byte_buffer_pointer->body_length_ << ", data = " << (uintmax_t)(byte_buffer_pointer->data_) << ")" << std::endl; 103 # endif 104 memcpy(byte_buffer_pointer->data_, & (byte_buffer_pointer->body_length_), byte_buffer_pointer->header_length_); 105 106 std::shared_ptr<server_socket> self(shared_from_this()); 107 108 boost::asio::async_read 109 ( 110 this->socket_, 111 boost::asio::buffer(byte_buffer_pointer->data_ + byte_buffer_pointer->header_length_, byte_buffer_pointer->body_length_), 112 [this, self, byte_buffer_pointer](boost::system::error_code const & error_code, std::size_t length) 113 { 114 # if defined(DEBUG_MODE) 115 std::cout << "async_read body complete." << std::endl; 116 # endif 117 if (! error_code) 118 { 119 if (byte_buffer_pointer->body_length_ != length) 120 { 121 free(byte_buffer_pointer->data_); 122 delete byte_buffer_pointer; 123 124 std::cerr << "async_read body failed. Number of bytes read successfully does not equal expectation." << std::endl; 125 126 boost::system::error_code temporary; 127 128 this->socket_.close(temporary); 129 } 130 else 131 { 132 this->handle_body_read(byte_buffer_pointer); 133 } 134 } 135 else 136 { 137 free(byte_buffer_pointer->data_); 138 delete byte_buffer_pointer; 139 140 std::cerr << "async_read body failed. (error_code = " << error_code.value() << ", error_message = " << error_code.message() << ", error_category = " << error_code.category().name() << ")" << std::endl; 141 142 boost::system::error_code temporary; 143 144 this->socket_.close(temporary); 145 } 146 } 147 ); 148 } 149 150 void handle_body_read(byte_buffer * byte_buffer_pointer) 151 { 152 std::string string((char *)(byte_buffer_pointer->data_ + byte_buffer_pointer->header_length_), byte_buffer_pointer->body_length_); 153 154 std::cout << "client: " << string << std::endl; 155 156 this->write(byte_buffer_pointer); 157 158 this->read(); 159 } 160 161 void handle_write() 162 { 163 if (! this->write_queue_.empty()) 164 { 165 bool expected = false; 166 bool desired = true; 167 168 if (!this->handle_write_flag_.compare_exchange_weak(expected, desired)) 169 { 170 return; 171 } 172 173 byte_buffer * byte_buffer_pointer; 174 175 if (this->write_queue_.pop(byte_buffer_pointer)) 176 { 177 std::shared_ptr<server_socket> self(shared_from_this()); 178 179 boost::asio::async_write 180 ( 181 this->socket_, 182 boost::asio::buffer(byte_buffer_pointer->data_, byte_buffer_pointer->header_length_ + byte_buffer_pointer->body_length_), 183 [this, self, byte_buffer_pointer](boost::system::error_code const & error_code, std::size_t length) 184 { 185 # if defined(DEBUG_MODE) 186 std::cout << "async_write complete." << std::endl; 187 # endif 188 this->handle_write_flag_.store(false); 189 190 if (! error_code) 191 { 192 if (byte_buffer_pointer->header_length_ + byte_buffer_pointer->body_length_ != length) 193 { 194 std::cerr << "async_write failed. Number of bytes written successfully does not equal expectation." << std::endl; 195 196 boost::system::error_code temporary; 197 198 this->socket_.close(temporary); 199 } 200 else 201 { 202 this->handle_write(); 203 } 204 } 205 else 206 { 207 std::cerr << "async_write failed. (error_code = " << error_code.value() << ", error_message = " << error_code.message() << ", error_category = " << error_code.category().name() << ")" << std::endl; 208 209 boost::system::error_code temporary; 210 211 this->socket_.close(temporary); 212 } 213 214 free(byte_buffer_pointer->data_); 215 delete byte_buffer_pointer; 216 } 217 ); 218 } 219 else 220 { 221 std::cerr << "It is impossible to reach here." << std::endl;; 222 223 this->handle_write_flag_.store(false); 224 } 225 } 226 } 227 228 boost::asio::ip::tcp::socket socket_; 229 // high performance concurrent queue is required 230 boost::lockfree::queue<byte_buffer *> write_queue_; 231 std::atomic<bool> handle_write_flag_; 232 }; 233 234 class echo_server 235 { 236 public: 237 echo_server(boost::asio::io_service & io_service, short port) 238 : 239 io_service_(io_service), 240 acceptor_(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) 241 { 242 this->do_accept(); 243 } 244 245 private: 246 void do_accept() 247 { 248 std::shared_ptr<server_socket> serverSocket{ new server_socket(this->io_service_) }; 249 250 this->acceptor_.async_accept 251 ( 252 serverSocket->get_socket(), 253 [this, serverSocket](boost::system::error_code const & error_code) 254 { 255 # if defined(DEBUG_MODE) 256 std::cout << "async_accept complete." << std::endl; 257 # endif 258 if (! error_code) 259 { 260 serverSocket->read(); 261 262 this->do_accept(); 263 } 264 else 265 { 266 std::cerr << "async_accept failed. (error_code = " << error_code.value() << ", error_message = " << error_code.message() << ", error_category = " << error_code.category().name() << ")" << std::endl; 267 } 268 } 269 ); 270 } 271 272 boost::asio::io_service & io_service_; 273 boost::asio::ip::tcp::acceptor acceptor_; 274 }; 275 276 int main(int argc, char ** argv) 277 { 278 if (argc != 2) 279 { 280 std::cerr << "SYNOPSIS: echo_server <port>" << std::endl; 281 282 return -1; 283 } 284 285 try 286 { 287 boost::asio::io_service io_service; 288 289 boost::asio::io_service::work work(io_service); 290 291 echo_server server(io_service, std::atoi(argv[1])); 292 293 int io_service_thread_number = 8; 294 295 std::vector<std::thread *> io_service_thread_vector; 296 297 for (int i = 0; i < io_service_thread_number; ++ i) 298 { 299 std::thread * thread = 300 new std::thread 301 ( 302 [& io_service] 303 { 304 io_service.run(); 305 } 306 ); 307 308 io_service_thread_vector.push_back(thread); 309 } 310 311 while (true) 312 { 313 std::this_thread::sleep_for(std::chrono::hours(INTMAX_MAX)); 314 } 315 316 for (int i = 0; i < io_service_thread_number; ++ i) 317 { 318 io_service_thread_vector[i]->join(); 319 } 320 } 321 catch (std::exception & exception) 322 { 323 std::cerr << "Exception: " << exception.what() << std::endl; 324 } 325 326 return 0; 327 }