diff -ru boost_1_50_0_beta1.org/boost/interprocess/ipc/message_queue.hpp boost_1_50_0_beta1/boost/interprocess/ipc/message_queue.hpp --- boost_1_50_0_beta1.org/boost/interprocess/ipc/message_queue.hpp 2012-05-24 18:43:57.000000000 +0200 +++ boost_1_50_0_beta1/boost/interprocess/ipc/message_queue.hpp 2012-06-26 12:07:29.026509306 +0200 @@ -43,15 +43,16 @@ namespace boost{ namespace interprocess{ //!A class that allows sending messages -//!between processes. -template -class message_queue_t +//!between processes. Allows customization of the queue data storage. +template +class message_queue_base_t { + protected: /// @cond //Blocking modes enum block_t { blocking, timed, non_blocking }; - message_queue_t(); + message_queue_base_t() {} /// @endcond public: @@ -62,41 +63,6 @@ typedef typename boost::intrusive::pointer_traits::difference_type difference_type; typedef typename boost::make_unsigned::type size_type; - //!Creates a process shared message queue with name "name". For this message queue, - //!the maximum number of messages will be "max_num_msg" and the maximum message size - //!will be "max_msg_size". Throws on error and if the queue was previously created. - message_queue_t(create_only_t create_only, - const char *name, - size_type max_num_msg, - size_type max_msg_size, - const permissions &perm = permissions()); - - //!Opens or creates a process shared message queue with name "name". - //!If the queue is created, the maximum number of messages will be "max_num_msg" - //!and the maximum message size will be "max_msg_size". If queue was previously - //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters - //!are ignored. Throws on error. - message_queue_t(open_or_create_t open_or_create, - const char *name, - size_type max_num_msg, - size_type max_msg_size, - const permissions &perm = permissions()); - - //!Opens a previously created process shared message queue with name "name". - //!If the queue was not previously created or there are no free resources, - //!throws an error. - message_queue_t(open_only_t open_only, - const char *name); - - //!Destroys *this and indicates that the calling process is finished using - //!the resource. All opened message queues are still - //!valid after destruction. The destructor function will deallocate - //!any system resources allocated by the system for use by this process for - //!this resource. The resource can still be opened again calling - //!the open constructor overload. To erase the message queue from the system - //!use remove(). - ~message_queue_t(); - //!Sends a message stored in buffer "buffer" with size "buffer_size" in the //!message queue with priority "priority". If the message queue is full //!the sender is blocked. Throws interprocess_error on error.*/ @@ -157,10 +123,6 @@ //!Never throws size_type get_num_msg(); - //!Removes the message queue from the system. - //!Returns false on error. Never throws - static bool remove(const char *name); - /// @cond private: typedef boost::posix_time::ptime ptime; @@ -173,12 +135,31 @@ const void *buffer, size_type buffer_size, unsigned int priority, const ptime &abs_time); + protected: //!Returns the needed memory size for the shared message queue. //!Never throws static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg); - - ipcdetail::managed_open_or_create_impl m_shmem; /// @endcond + + //!This is the atomic functor to be executed when creating or opening + //!shared memory. Never throws + class initialization_func_t + { + public: + typedef typename boost::intrusive:: + pointer_traits::template + rebind_pointer::type char_ptr; + typedef typename boost::intrusive::pointer_traits::difference_type difference_type; + typedef typename boost::make_unsigned::type size_type; + + initialization_func_t(size_type maxmsg = 0, + size_type maxmsgsize = 0) + : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} + + bool operator()(void *address, size_type, bool created); + const size_type m_maxmsg; + const size_type m_maxmsgsize; + }; }; /// @cond @@ -324,8 +305,7 @@ r_hdr_size = ipcdetail::ct_rounded_size::value, r_index_size = ipcdetail::get_rounded_size(sizeof(msg_hdr_ptr_t)*max_num_msg, msg_hdr_align), r_max_msg_size = ipcdetail::get_rounded_size(max_msg_size, msg_hdr_align) + sizeof(msg_header); - return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) + - ipcdetail::managed_open_or_create_impl::ManagedOpenOrCreateUserOffset; + return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size); } //!Initializes the memory structures to preallocate messages and constructs the @@ -376,112 +356,45 @@ }; -//!This is the atomic functor to be executed when creating or opening -//!shared memory. Never throws -template -class initialization_func_t -{ - public: - typedef typename boost::intrusive:: - pointer_traits::template - rebind_pointer::type char_ptr; - typedef typename boost::intrusive::pointer_traits::difference_type difference_type; - typedef typename boost::make_unsigned::type size_type; - - initialization_func_t(size_type maxmsg = 0, - size_type maxmsgsize = 0) - : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} +} //namespace ipcdetail { - bool operator()(void *address, size_type, bool created) - { - char *mptr; +template +bool message_queue_base_t::initialization_func_t::operator()(void *address, size_type, bool created) +{ + char *mptr; - if(created){ - mptr = reinterpret_cast(address); - //Construct the message queue header at the beginning - BOOST_TRY{ - new (mptr) mq_hdr_t(m_maxmsg, m_maxmsgsize); - } - BOOST_CATCH(...){ - return false; - } - BOOST_CATCH_END + if(created){ + mptr = reinterpret_cast(address); + //Construct the message queue header at the beginning + BOOST_TRY{ + new (mptr) ipcdetail::mq_hdr_t(m_maxmsg, m_maxmsgsize); } - return true; + BOOST_CATCH(...){ + return false; + } + BOOST_CATCH_END } - const size_type m_maxmsg; - const size_type m_maxmsgsize; -}; - -} //namespace ipcdetail { - -template -inline message_queue_t::~message_queue_t() -{} + return true; +} -template -inline typename message_queue_t::size_type message_queue_t::get_mem_size +template +inline typename message_queue_base_t::size_type +message_queue_base_t::get_mem_size (size_type max_msg_size, size_type max_num_msg) -{ return ipcdetail::mq_hdr_t::get_mem_size(max_msg_size, max_num_msg); } - -template -inline message_queue_t::message_queue_t(create_only_t create_only, - const char *name, - size_type max_num_msg, - size_type max_msg_size, - const permissions &perm) - //Create shared memory and execute functor atomically - : m_shmem(create_only, - name, - get_mem_size(max_msg_size, max_num_msg), - read_write, - static_cast(0), - //Prepare initialization functor - ipcdetail::initialization_func_t (max_num_msg, max_msg_size), - perm) -{} - -template -inline message_queue_t::message_queue_t(open_or_create_t open_or_create, - const char *name, - size_type max_num_msg, - size_type max_msg_size, - const permissions &perm) - //Create shared memory and execute functor atomically - : m_shmem(open_or_create, - name, - get_mem_size(max_msg_size, max_num_msg), - read_write, - static_cast(0), - //Prepare initialization functor - ipcdetail::initialization_func_t (max_num_msg, max_msg_size), - perm) -{} - -template -inline message_queue_t::message_queue_t(open_only_t open_only, - const char *name) - //Create shared memory and execute functor atomically - : m_shmem(open_only, - name, - read_write, - static_cast(0), - //Prepare initialization functor - ipcdetail::initialization_func_t ()) -{} +{ return ipcdetail::mq_hdr_t::get_mem_size (max_msg_size, max_num_msg) + Impl::get_data_offset(); } -template -inline void message_queue_t::send +template +inline void message_queue_base_t::send (const void *buffer, size_type buffer_size, unsigned int priority) { this->do_send(blocking, buffer, buffer_size, priority, ptime()); } -template -inline bool message_queue_t::try_send +template +inline bool message_queue_base_t::try_send (const void *buffer, size_type buffer_size, unsigned int priority) { return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); } -template -inline bool message_queue_t::timed_send +template +inline bool message_queue_base_t::timed_send (const void *buffer, size_type buffer_size ,unsigned int priority, const boost::posix_time::ptime &abs_time) { @@ -492,12 +405,12 @@ return this->do_send(timed, buffer, buffer_size, priority, abs_time); } -template -inline bool message_queue_t::do_send(block_t block, +template +inline bool message_queue_base_t::do_send(block_t block, const void *buffer, size_type buffer_size, unsigned int priority, const boost::posix_time::ptime &abs_time) { - ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); + ipcdetail::mq_hdr_t *p_hdr = static_cast*>(static_cast (*this).get_data()); //Check if buffer is smaller than maximum allowed if (buffer_size > p_hdr->m_max_msg_size) { throw interprocess_exception(size_error); @@ -538,7 +451,7 @@ } //Get the first free message from free message queue - ipcdetail::msg_hdr_t *free_msg = p_hdr->free_msg(); + ipcdetail::msg_hdr_t *free_msg = p_hdr->free_msg(); if (free_msg == 0) { throw interprocess_exception("boost::interprocess::message_queue corrupted"); } @@ -563,20 +476,20 @@ return true; } -template -inline void message_queue_t::receive(void *buffer, size_type buffer_size, +template +inline void message_queue_base_t::receive(void *buffer, size_type buffer_size, size_type &recvd_size, unsigned int &priority) { this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); } -template +template inline bool - message_queue_t::try_receive(void *buffer, size_type buffer_size, + message_queue_base_t::try_receive(void *buffer, size_type buffer_size, size_type &recvd_size, unsigned int &priority) { return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); } -template +template inline bool - message_queue_t::timed_receive(void *buffer, size_type buffer_size, + message_queue_base_t::timed_receive(void *buffer, size_type buffer_size, size_type &recvd_size, unsigned int &priority, const boost::posix_time::ptime &abs_time) { @@ -587,14 +500,14 @@ return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time); } -template +template inline bool - message_queue_t::do_receive(block_t block, + message_queue_base_t::do_receive(block_t block, void *buffer, size_type buffer_size, size_type &recvd_size, unsigned int &priority, const boost::posix_time::ptime &abs_time) { - ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); + ipcdetail::mq_hdr_t *p_hdr = static_cast*>(static_cast (*this).get_data()); //Check if buffer is big enough for any message if (buffer_size < p_hdr->m_max_msg_size) { throw interprocess_exception(size_error); @@ -636,7 +549,7 @@ } //Thre is at least message ready to pick, get the top one - ipcdetail::msg_hdr_t *top_msg = p_hdr->top_msg(); + ipcdetail::msg_hdr_t *top_msg = p_hdr->top_msg(); //Paranoia check if (top_msg == 0) { @@ -664,23 +577,26 @@ return true; } -template -inline typename message_queue_t::size_type message_queue_t::get_max_msg() const +template +inline typename message_queue_base_t::size_type +message_queue_base_t::get_max_msg() const { - ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); + ipcdetail::mq_hdr_t *p_hdr = static_cast*>(static_cast (*this).get_data()); return p_hdr ? p_hdr->m_max_num_msg : 0; } -template -inline typename message_queue_t::size_type message_queue_t::get_max_msg_size() const +template +inline typename message_queue_base_t::size_type +message_queue_base_t::get_max_msg_size() const { - ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); + ipcdetail::mq_hdr_t *p_hdr = static_cast*>(static_cast (*this).get_data()); return p_hdr ? p_hdr->m_max_msg_size : 0; } -template -inline typename message_queue_t::size_type message_queue_t::get_num_msg() +template +inline typename message_queue_base_t::size_type +message_queue_base_t::get_num_msg() { - ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); + ipcdetail::mq_hdr_t *p_hdr = static_cast*>(static_cast (*this).get_data()); if(p_hdr){ //--------------------------------------------- scoped_lock lock(p_hdr->m_mutex); @@ -691,10 +607,127 @@ return 0; } +/// @endcond + +//!A class that allows sending messages +//!between processes. +template +class message_queue_t : public message_queue_base_t > +{ + typedef message_queue_base_t > message_queue_base; + + message_queue_t(); + + public: + typedef typename message_queue_base::size_type size_type; + + protected: + ipcdetail::managed_open_or_create_impl m_shmem; + + friend class message_queue_base_t >; + //!Get pointer to message queue data + void* get_data() { return m_shmem.get_user_address(); } + //!Offset to add to message queue size (for extra data required by backend) + static size_type get_data_offset() + { return ipcdetail::managed_open_or_create_impl::ManagedOpenOrCreateUserOffset; } + + public: + //!Creates a process shared message queue with name "name". For this message queue, + //!the maximum number of messages will be "max_num_msg" and the maximum message size + //!will be "max_msg_size". Throws on error and if the queue was previously created. + message_queue_t(create_only_t create_only, + const char *name, + size_type max_num_msg, + size_type max_msg_size, + const permissions &perm = permissions()); + + //!Opens or creates a process shared message queue with name "name". + //!If the queue is created, the maximum number of messages will be "max_num_msg" + //!and the maximum message size will be "max_msg_size". If queue was previously + //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters + //!are ignored. Throws on error. + message_queue_t(open_or_create_t open_or_create, + const char *name, + size_type max_num_msg, + size_type max_msg_size, + const permissions &perm = permissions()); + + //!Opens a previously created process shared message queue with name "name". + //!If the queue was not previously created or there are no free resources, + //!throws an error. + message_queue_t(open_only_t open_only, + const char *name); + + //!Destroys *this and indicates that the calling process is finished using + //!the resource. All opened message queues are still + //!valid after destruction. The destructor function will deallocate + //!any system resources allocated by the system for use by this process for + //!this resource. The resource can still be opened again calling + //!the open constructor overload. To erase the message queue from the system + //!use remove(). + ~message_queue_t(); + + //!Removes the message queue from the system. + //!Returns false on error. Never throws + static bool remove(const char *name); +}; + +template +inline message_queue_t::~message_queue_t() +{} + +/// @cond + template inline bool message_queue_t::remove(const char *name) { return shared_memory_object::remove(name); } +template +inline message_queue_t::message_queue_t(create_only_t create_only, + const char *name, + size_type max_num_msg, + size_type max_msg_size, + const permissions &perm) + //Create shared memory and execute functor atomically + : m_shmem(create_only, + name, + get_mem_size(max_msg_size, max_num_msg), + read_write, + static_cast(0), + //Prepare initialization functor + typename message_queue_base::initialization_func_t (max_num_msg, max_msg_size), + perm) +{} + +template +inline message_queue_t::message_queue_t(open_or_create_t open_or_create, + const char *name, + size_type max_num_msg, + size_type max_msg_size, + const permissions &perm) + //Create shared memory and execute functor atomically + : m_shmem(open_or_create, + name, + get_mem_size(max_msg_size, max_num_msg), + read_write, + static_cast(0), + //Prepare initialization functor + typename message_queue_base::initialization_func_t (max_num_msg, max_msg_size), + perm) +{} + +template +inline message_queue_t::message_queue_t(open_only_t open_only, + const char *name) + //Create shared memory and execute functor atomically + : m_shmem(open_only, + name, + read_write, + static_cast(0), + //Prepare initialization functor + typename message_queue_base::initialization_func_t ()) +{} + /// @endcond }} //namespace boost{ namespace interprocess{ Only in boost_1_50_0_beta1/boost/interprocess/ipc: windows_message_queue.hpp