Ticket #7027: boost-message-queue-customize.diff
File boost-message-queue-customize.diff, 21.4 KB (added by , 10 years ago) |
---|
-
boost/interprocess/ipc/message_queue.hpp
diff -ru boost_1_50_0_beta1.org/boost/interprocess/ipc/message_queue.hpp boost_1_50_0_beta1/boost/interprocess/ipc/message_queue.hpp
old new 43 43 namespace boost{ namespace interprocess{ 44 44 45 45 //!A class that allows sending messages 46 //!between processes. 47 template<class VoidPointer >48 class message_queue_ t46 //!between processes. Allows customization of the queue data storage. 47 template<class VoidPointer, class Impl> 48 class message_queue_base_t 49 49 { 50 protected: 50 51 /// @cond 51 52 //Blocking modes 52 53 enum block_t { blocking, timed, non_blocking }; 53 54 54 message_queue_ t();55 message_queue_base_t() {} 55 56 /// @endcond 56 57 57 58 public: … … 62 63 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; 63 64 typedef typename boost::make_unsigned<difference_type>::type size_type; 64 65 65 //!Creates a process shared message queue with name "name". For this message queue,66 //!the maximum number of messages will be "max_num_msg" and the maximum message size67 //!will be "max_msg_size". Throws on error and if the queue was previously created.68 message_queue_t(create_only_t create_only,69 const char *name,70 size_type max_num_msg,71 size_type max_msg_size,72 const permissions &perm = permissions());73 74 //!Opens or creates a process shared message queue with name "name".75 //!If the queue is created, the maximum number of messages will be "max_num_msg"76 //!and the maximum message size will be "max_msg_size". If queue was previously77 //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters78 //!are ignored. Throws on error.79 message_queue_t(open_or_create_t open_or_create,80 const char *name,81 size_type max_num_msg,82 size_type max_msg_size,83 const permissions &perm = permissions());84 85 //!Opens a previously created process shared message queue with name "name".86 //!If the queue was not previously created or there are no free resources,87 //!throws an error.88 message_queue_t(open_only_t open_only,89 const char *name);90 91 //!Destroys *this and indicates that the calling process is finished using92 //!the resource. All opened message queues are still93 //!valid after destruction. The destructor function will deallocate94 //!any system resources allocated by the system for use by this process for95 //!this resource. The resource can still be opened again calling96 //!the open constructor overload. To erase the message queue from the system97 //!use remove().98 ~message_queue_t();99 100 66 //!Sends a message stored in buffer "buffer" with size "buffer_size" in the 101 67 //!message queue with priority "priority". If the message queue is full 102 68 //!the sender is blocked. Throws interprocess_error on error.*/ … … 157 123 //!Never throws 158 124 size_type get_num_msg(); 159 125 160 //!Removes the message queue from the system.161 //!Returns false on error. Never throws162 static bool remove(const char *name);163 164 126 /// @cond 165 127 private: 166 128 typedef boost::posix_time::ptime ptime; … … 173 135 const void *buffer, size_type buffer_size, 174 136 unsigned int priority, const ptime &abs_time); 175 137 138 protected: 176 139 //!Returns the needed memory size for the shared message queue. 177 140 //!Never throws 178 141 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg); 179 180 ipcdetail::managed_open_or_create_impl<shared_memory_object> m_shmem;181 142 /// @endcond 143 144 //!This is the atomic functor to be executed when creating or opening 145 //!shared memory. Never throws 146 class initialization_func_t 147 { 148 public: 149 typedef typename boost::intrusive:: 150 pointer_traits<void_pointer>::template 151 rebind_pointer<char>::type char_ptr; 152 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; 153 typedef typename boost::make_unsigned<difference_type>::type size_type; 154 155 initialization_func_t(size_type maxmsg = 0, 156 size_type maxmsgsize = 0) 157 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} 158 159 bool operator()(void *address, size_type, bool created); 160 const size_type m_maxmsg; 161 const size_type m_maxmsgsize; 162 }; 182 163 }; 183 164 184 165 /// @cond … … 324 305 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, 325 306 r_index_size = ipcdetail::get_rounded_size(sizeof(msg_hdr_ptr_t)*max_num_msg, msg_hdr_align), 326 307 r_max_msg_size = ipcdetail::get_rounded_size(max_msg_size, msg_hdr_align) + sizeof(msg_header); 327 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) + 328 ipcdetail::managed_open_or_create_impl<shared_memory_object>::ManagedOpenOrCreateUserOffset; 308 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size); 329 309 } 330 310 331 311 //!Initializes the memory structures to preallocate messages and constructs the … … 376 356 }; 377 357 378 358 379 //!This is the atomic functor to be executed when creating or opening 380 //!shared memory. Never throws 381 template<class VoidPointer> 382 class initialization_func_t 383 { 384 public: 385 typedef typename boost::intrusive:: 386 pointer_traits<VoidPointer>::template 387 rebind_pointer<char>::type char_ptr; 388 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; 389 typedef typename boost::make_unsigned<difference_type>::type size_type; 390 391 initialization_func_t(size_type maxmsg = 0, 392 size_type maxmsgsize = 0) 393 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} 359 } //namespace ipcdetail { 394 360 395 bool operator()(void *address, size_type, bool created) 396 { 397 char *mptr; 361 template<class VoidPointer, class Impl> 362 bool message_queue_base_t<VoidPointer, Impl>::initialization_func_t::operator()(void *address, size_type, bool created) 363 { 364 char *mptr; 398 365 399 if(created){ 400 mptr = reinterpret_cast<char*>(address); 401 //Construct the message queue header at the beginning 402 BOOST_TRY{ 403 new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize); 404 } 405 BOOST_CATCH(...){ 406 return false; 407 } 408 BOOST_CATCH_END 366 if(created){ 367 mptr = reinterpret_cast<char*>(address); 368 //Construct the message queue header at the beginning 369 BOOST_TRY{ 370 new (mptr) ipcdetail::mq_hdr_t<void_pointer>(m_maxmsg, m_maxmsgsize); 409 371 } 410 return true; 372 BOOST_CATCH(...){ 373 return false; 374 } 375 BOOST_CATCH_END 411 376 } 412 const size_type m_maxmsg; 413 const size_type m_maxmsgsize; 414 }; 415 416 } //namespace ipcdetail { 417 418 template<class VoidPointer> 419 inline message_queue_t<VoidPointer>::~message_queue_t() 420 {} 377 return true; 378 } 421 379 422 template<class VoidPointer> 423 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size 380 template<class VoidPointer, class Impl> 381 inline typename message_queue_base_t<VoidPointer, Impl>::size_type 382 message_queue_base_t<VoidPointer, Impl>::get_mem_size 424 383 (size_type max_msg_size, size_type max_num_msg) 425 { return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg); } 426 427 template<class VoidPointer> 428 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t create_only, 429 const char *name, 430 size_type max_num_msg, 431 size_type max_msg_size, 432 const permissions &perm) 433 //Create shared memory and execute functor atomically 434 : m_shmem(create_only, 435 name, 436 get_mem_size(max_msg_size, max_num_msg), 437 read_write, 438 static_cast<void*>(0), 439 //Prepare initialization functor 440 ipcdetail::initialization_func_t<VoidPointer> (max_num_msg, max_msg_size), 441 perm) 442 {} 443 444 template<class VoidPointer> 445 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t open_or_create, 446 const char *name, 447 size_type max_num_msg, 448 size_type max_msg_size, 449 const permissions &perm) 450 //Create shared memory and execute functor atomically 451 : m_shmem(open_or_create, 452 name, 453 get_mem_size(max_msg_size, max_num_msg), 454 read_write, 455 static_cast<void*>(0), 456 //Prepare initialization functor 457 ipcdetail::initialization_func_t<VoidPointer> (max_num_msg, max_msg_size), 458 perm) 459 {} 460 461 template<class VoidPointer> 462 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t open_only, 463 const char *name) 464 //Create shared memory and execute functor atomically 465 : m_shmem(open_only, 466 name, 467 read_write, 468 static_cast<void*>(0), 469 //Prepare initialization functor 470 ipcdetail::initialization_func_t<VoidPointer> ()) 471 {} 384 { return ipcdetail::mq_hdr_t<void_pointer>::get_mem_size (max_msg_size, max_num_msg) + Impl::get_data_offset(); } 472 385 473 template<class VoidPointer >474 inline void message_queue_ t<VoidPointer>::send386 template<class VoidPointer, class Impl> 387 inline void message_queue_base_t<VoidPointer, Impl>::send 475 388 (const void *buffer, size_type buffer_size, unsigned int priority) 476 389 { this->do_send(blocking, buffer, buffer_size, priority, ptime()); } 477 390 478 template<class VoidPointer >479 inline bool message_queue_ t<VoidPointer>::try_send391 template<class VoidPointer, class Impl> 392 inline bool message_queue_base_t<VoidPointer, Impl>::try_send 480 393 (const void *buffer, size_type buffer_size, unsigned int priority) 481 394 { return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); } 482 395 483 template<class VoidPointer >484 inline bool message_queue_ t<VoidPointer>::timed_send396 template<class VoidPointer, class Impl> 397 inline bool message_queue_base_t<VoidPointer, Impl>::timed_send 485 398 (const void *buffer, size_type buffer_size 486 399 ,unsigned int priority, const boost::posix_time::ptime &abs_time) 487 400 { … … 492 405 return this->do_send(timed, buffer, buffer_size, priority, abs_time); 493 406 } 494 407 495 template<class VoidPointer >496 inline bool message_queue_ t<VoidPointer>::do_send(block_t block,408 template<class VoidPointer, class Impl> 409 inline bool message_queue_base_t<VoidPointer, Impl>::do_send(block_t block, 497 410 const void *buffer, size_type buffer_size, 498 411 unsigned int priority, const boost::posix_time::ptime &abs_time) 499 412 { 500 ipcdetail::mq_hdr_t< VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());413 ipcdetail::mq_hdr_t<void_pointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<void_pointer>*>(static_cast<Impl&> (*this).get_data()); 501 414 //Check if buffer is smaller than maximum allowed 502 415 if (buffer_size > p_hdr->m_max_msg_size) { 503 416 throw interprocess_exception(size_error); … … 538 451 } 539 452 540 453 //Get the first free message from free message queue 541 ipcdetail::msg_hdr_t< VoidPointer> *free_msg = p_hdr->free_msg();454 ipcdetail::msg_hdr_t<void_pointer> *free_msg = p_hdr->free_msg(); 542 455 if (free_msg == 0) { 543 456 throw interprocess_exception("boost::interprocess::message_queue corrupted"); 544 457 } … … 563 476 return true; 564 477 } 565 478 566 template<class VoidPointer >567 inline void message_queue_ t<VoidPointer>::receive(void *buffer, size_type buffer_size,479 template<class VoidPointer, class Impl> 480 inline void message_queue_base_t<VoidPointer, Impl>::receive(void *buffer, size_type buffer_size, 568 481 size_type &recvd_size, unsigned int &priority) 569 482 { this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); } 570 483 571 template<class VoidPointer >484 template<class VoidPointer, class Impl> 572 485 inline bool 573 message_queue_ t<VoidPointer>::try_receive(void *buffer, size_type buffer_size,486 message_queue_base_t<VoidPointer, Impl>::try_receive(void *buffer, size_type buffer_size, 574 487 size_type &recvd_size, unsigned int &priority) 575 488 { return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); } 576 489 577 template<class VoidPointer >490 template<class VoidPointer, class Impl> 578 491 inline bool 579 message_queue_ t<VoidPointer>::timed_receive(void *buffer, size_type buffer_size,492 message_queue_base_t<VoidPointer, Impl>::timed_receive(void *buffer, size_type buffer_size, 580 493 size_type &recvd_size, unsigned int &priority, 581 494 const boost::posix_time::ptime &abs_time) 582 495 { … … 587 500 return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time); 588 501 } 589 502 590 template<class VoidPointer >503 template<class VoidPointer, class Impl> 591 504 inline bool 592 message_queue_ t<VoidPointer>::do_receive(block_t block,505 message_queue_base_t<VoidPointer, Impl>::do_receive(block_t block, 593 506 void *buffer, size_type buffer_size, 594 507 size_type &recvd_size, unsigned int &priority, 595 508 const boost::posix_time::ptime &abs_time) 596 509 { 597 ipcdetail::mq_hdr_t< VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());510 ipcdetail::mq_hdr_t<void_pointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<void_pointer>*>(static_cast<Impl&> (*this).get_data()); 598 511 //Check if buffer is big enough for any message 599 512 if (buffer_size < p_hdr->m_max_msg_size) { 600 513 throw interprocess_exception(size_error); … … 636 549 } 637 550 638 551 //Thre is at least message ready to pick, get the top one 639 ipcdetail::msg_hdr_t< VoidPointer> *top_msg = p_hdr->top_msg();552 ipcdetail::msg_hdr_t<void_pointer> *top_msg = p_hdr->top_msg(); 640 553 641 554 //Paranoia check 642 555 if (top_msg == 0) { … … 664 577 return true; 665 578 } 666 579 667 template<class VoidPointer> 668 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const 580 template<class VoidPointer, class Impl> 581 inline typename message_queue_base_t<VoidPointer, Impl>::size_type 582 message_queue_base_t<VoidPointer, Impl>::get_max_msg() const 669 583 { 670 ipcdetail::mq_hdr_t< VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());584 ipcdetail::mq_hdr_t<void_pointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<void_pointer>*>(static_cast<Impl&> (*this).get_data()); 671 585 return p_hdr ? p_hdr->m_max_num_msg : 0; } 672 586 673 template<class VoidPointer> 674 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const 587 template<class VoidPointer, class Impl> 588 inline typename message_queue_base_t<VoidPointer, Impl>::size_type 589 message_queue_base_t<VoidPointer, Impl>::get_max_msg_size() const 675 590 { 676 ipcdetail::mq_hdr_t< VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());591 ipcdetail::mq_hdr_t<void_pointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<void_pointer>*>(static_cast<Impl&> (*this).get_data()); 677 592 return p_hdr ? p_hdr->m_max_msg_size : 0; 678 593 } 679 594 680 template<class VoidPointer> 681 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() 595 template<class VoidPointer, class Impl> 596 inline typename message_queue_base_t<VoidPointer, Impl>::size_type 597 message_queue_base_t<VoidPointer, Impl>::get_num_msg() 682 598 { 683 ipcdetail::mq_hdr_t< VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());599 ipcdetail::mq_hdr_t<void_pointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<void_pointer>*>(static_cast<Impl&> (*this).get_data()); 684 600 if(p_hdr){ 685 601 //--------------------------------------------- 686 602 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); … … 691 607 return 0; 692 608 } 693 609 610 /// @endcond 611 612 //!A class that allows sending messages 613 //!between processes. 614 template<class VoidPointer> 615 class message_queue_t : public message_queue_base_t<VoidPointer, message_queue_t<VoidPointer> > 616 { 617 typedef message_queue_base_t<VoidPointer, message_queue_t<VoidPointer> > message_queue_base; 618 619 message_queue_t(); 620 621 public: 622 typedef typename message_queue_base::size_type size_type; 623 624 protected: 625 ipcdetail::managed_open_or_create_impl<shared_memory_object> m_shmem; 626 627 friend class message_queue_base_t<VoidPointer, message_queue_t<VoidPointer> >; 628 //!Get pointer to message queue data 629 void* get_data() { return m_shmem.get_user_address(); } 630 //!Offset to add to message queue size (for extra data required by backend) 631 static size_type get_data_offset() 632 { return ipcdetail::managed_open_or_create_impl<shared_memory_object>::ManagedOpenOrCreateUserOffset; } 633 634 public: 635 //!Creates a process shared message queue with name "name". For this message queue, 636 //!the maximum number of messages will be "max_num_msg" and the maximum message size 637 //!will be "max_msg_size". Throws on error and if the queue was previously created. 638 message_queue_t(create_only_t create_only, 639 const char *name, 640 size_type max_num_msg, 641 size_type max_msg_size, 642 const permissions &perm = permissions()); 643 644 //!Opens or creates a process shared message queue with name "name". 645 //!If the queue is created, the maximum number of messages will be "max_num_msg" 646 //!and the maximum message size will be "max_msg_size". If queue was previously 647 //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters 648 //!are ignored. Throws on error. 649 message_queue_t(open_or_create_t open_or_create, 650 const char *name, 651 size_type max_num_msg, 652 size_type max_msg_size, 653 const permissions &perm = permissions()); 654 655 //!Opens a previously created process shared message queue with name "name". 656 //!If the queue was not previously created or there are no free resources, 657 //!throws an error. 658 message_queue_t(open_only_t open_only, 659 const char *name); 660 661 //!Destroys *this and indicates that the calling process is finished using 662 //!the resource. All opened message queues are still 663 //!valid after destruction. The destructor function will deallocate 664 //!any system resources allocated by the system for use by this process for 665 //!this resource. The resource can still be opened again calling 666 //!the open constructor overload. To erase the message queue from the system 667 //!use remove(). 668 ~message_queue_t(); 669 670 //!Removes the message queue from the system. 671 //!Returns false on error. Never throws 672 static bool remove(const char *name); 673 }; 674 675 template<class VoidPointer> 676 inline message_queue_t<VoidPointer>::~message_queue_t() 677 {} 678 679 /// @cond 680 694 681 template<class VoidPointer> 695 682 inline bool message_queue_t<VoidPointer>::remove(const char *name) 696 683 { return shared_memory_object::remove(name); } 697 684 685 template<class VoidPointer> 686 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t create_only, 687 const char *name, 688 size_type max_num_msg, 689 size_type max_msg_size, 690 const permissions &perm) 691 //Create shared memory and execute functor atomically 692 : m_shmem(create_only, 693 name, 694 get_mem_size(max_msg_size, max_num_msg), 695 read_write, 696 static_cast<void*>(0), 697 //Prepare initialization functor 698 typename message_queue_base::initialization_func_t (max_num_msg, max_msg_size), 699 perm) 700 {} 701 702 template<class VoidPointer> 703 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t open_or_create, 704 const char *name, 705 size_type max_num_msg, 706 size_type max_msg_size, 707 const permissions &perm) 708 //Create shared memory and execute functor atomically 709 : m_shmem(open_or_create, 710 name, 711 get_mem_size(max_msg_size, max_num_msg), 712 read_write, 713 static_cast<void*>(0), 714 //Prepare initialization functor 715 typename message_queue_base::initialization_func_t (max_num_msg, max_msg_size), 716 perm) 717 {} 718 719 template<class VoidPointer> 720 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t open_only, 721 const char *name) 722 //Create shared memory and execute functor atomically 723 : m_shmem(open_only, 724 name, 725 read_write, 726 static_cast<void*>(0), 727 //Prepare initialization functor 728 typename message_queue_base::initialization_func_t ()) 729 {} 730 698 731 /// @endcond 699 732 700 733 }} //namespace boost{ namespace interprocess{