Ticket #7027: boost-message-queue-customize.diff

File boost-message-queue-customize.diff, 21.4 KB (added by frank.richter@…, 10 years ago)

interprocess: message queue generalization to support different storages

  • boost/interprocess/ipc/message_queue.hpp

    diff -ru boost_1_50_0_beta1.org/boost/interprocess/ipc/message_queue.hpp boost_1_50_0_beta1/boost/interprocess/ipc/message_queue.hpp
    old new  
    4343namespace boost{  namespace interprocess{
    4444
    4545//!A class that allows sending messages
    46 //!between processes.
    47 template<class VoidPointer>
    48 class message_queue_t
     46//!between processes. Allows customization of the queue data storage.
     47template<class VoidPointer, class Impl>
     48class message_queue_base_t
    4949{
     50   protected:
    5051   /// @cond
    5152   //Blocking modes
    5253   enum block_t   {  blocking,   timed,   non_blocking   };
    5354
    54    message_queue_t();
     55   message_queue_base_t() {}
    5556   /// @endcond
    5657
    5758   public:
     
    6263   typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
    6364   typedef typename boost::make_unsigned<difference_type>::type        size_type;
    6465
    65    //!Creates a process shared message queue with name "name". For this message queue,
    66    //!the maximum number of messages will be "max_num_msg" and the maximum message size
    67    //!will be "max_msg_size". Throws on error and if the queue was previously created.
    68    message_queue_t(create_only_t create_only,
    69                  const char *name,
    70                  size_type max_num_msg,
    71                  size_type max_msg_size,
    72                  const permissions &perm = permissions());
    73 
    74    //!Opens or creates a process shared message queue with name "name".
    75    //!If the queue is created, the maximum number of messages will be "max_num_msg"
    76    //!and the maximum message size will be "max_msg_size". If queue was previously
    77    //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters
    78    //!are ignored. Throws on error.
    79    message_queue_t(open_or_create_t open_or_create,
    80                  const char *name,
    81                  size_type max_num_msg,
    82                  size_type max_msg_size,
    83                  const permissions &perm = permissions());
    84 
    85    //!Opens a previously created process shared message queue with name "name".
    86    //!If the queue was not previously created or there are no free resources,
    87    //!throws an error.
    88    message_queue_t(open_only_t open_only,
    89                  const char *name);
    90 
    91    //!Destroys *this and indicates that the calling process is finished using
    92    //!the resource. All opened message queues are still
    93    //!valid after destruction. The destructor function will deallocate
    94    //!any system resources allocated by the system for use by this process for
    95    //!this resource. The resource can still be opened again calling
    96    //!the open constructor overload. To erase the message queue from the system
    97    //!use remove().
    98    ~message_queue_t();
    99 
    10066   //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
    10167   //!message queue with priority "priority". If the message queue is full
    10268   //!the sender is blocked. Throws interprocess_error on error.*/
     
    157123   //!Never throws
    158124   size_type get_num_msg();
    159125
    160    //!Removes the message queue from the system.
    161    //!Returns false on error. Never throws
    162    static bool remove(const char *name);
    163 
    164126   /// @cond 
    165127   private:
    166128   typedef boost::posix_time::ptime ptime;
     
    173135                const void *buffer,      size_type buffer_size,
    174136                unsigned int priority,   const ptime &abs_time);
    175137
     138   protected:
    176139   //!Returns the needed memory size for the shared message queue.
    177140   //!Never throws
    178141   static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
    179 
    180    ipcdetail::managed_open_or_create_impl<shared_memory_object> m_shmem;
    181142   /// @endcond
     143   
     144   //!This is the atomic functor to be executed when creating or opening
     145   //!shared memory. Never throws
     146   class initialization_func_t
     147   {
     148      public:
     149      typedef typename boost::intrusive::
     150         pointer_traits<void_pointer>::template
     151            rebind_pointer<char>::type                                    char_ptr;
     152      typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
     153      typedef typename boost::make_unsigned<difference_type>::type        size_type;
     154
     155      initialization_func_t(size_type maxmsg = 0,
     156                            size_type maxmsgsize = 0)
     157         : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
     158
     159      bool operator()(void *address, size_type, bool created);
     160      const size_type m_maxmsg;
     161      const size_type m_maxmsgsize;
     162   };
    182163};
    183164
    184165/// @cond
     
    324305         r_hdr_size     = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
    325306         r_index_size   = ipcdetail::get_rounded_size(sizeof(msg_hdr_ptr_t)*max_num_msg, msg_hdr_align),
    326307         r_max_msg_size = ipcdetail::get_rounded_size(max_msg_size, msg_hdr_align) + sizeof(msg_header);
    327       return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
    328          ipcdetail::managed_open_or_create_impl<shared_memory_object>::ManagedOpenOrCreateUserOffset;
     308      return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size);
    329309   }
    330310
    331311   //!Initializes the memory structures to preallocate messages and constructs the
     
    376356};
    377357
    378358
    379 //!This is the atomic functor to be executed when creating or opening
    380 //!shared memory. Never throws
    381 template<class VoidPointer>
    382 class initialization_func_t
    383 {
    384    public:
    385    typedef typename boost::intrusive::
    386       pointer_traits<VoidPointer>::template
    387          rebind_pointer<char>::type                                    char_ptr;
    388    typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
    389    typedef typename boost::make_unsigned<difference_type>::type        size_type;
    390 
    391    initialization_func_t(size_type maxmsg = 0,
    392                          size_type maxmsgsize = 0)
    393       : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
     359}  //namespace ipcdetail {
    394360
    395    bool operator()(void *address, size_type, bool created)
    396    {
    397       char      *mptr;
     361template<class VoidPointer, class Impl>
     362bool message_queue_base_t<VoidPointer, Impl>::initialization_func_t::operator()(void *address, size_type, bool created)
     363{
     364   char      *mptr;
    398365
    399       if(created){
    400          mptr     = reinterpret_cast<char*>(address);
    401          //Construct the message queue header at the beginning
    402          BOOST_TRY{
    403             new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
    404          }
    405          BOOST_CATCH(...){
    406             return false; 
    407          }
    408          BOOST_CATCH_END
     366   if(created){
     367      mptr     = reinterpret_cast<char*>(address);
     368      //Construct the message queue header at the beginning
     369      BOOST_TRY{
     370         new (mptr) ipcdetail::mq_hdr_t<void_pointer>(m_maxmsg, m_maxmsgsize);
    409371      }
    410       return true;
     372      BOOST_CATCH(...){
     373         return false; 
     374      }
     375      BOOST_CATCH_END
    411376   }
    412    const size_type m_maxmsg;
    413    const size_type m_maxmsgsize;
    414 };
    415 
    416 }  //namespace ipcdetail {
    417 
    418 template<class VoidPointer>
    419 inline message_queue_t<VoidPointer>::~message_queue_t()
    420 {}
     377   return true;
     378}
    421379
    422 template<class VoidPointer>
    423 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size
     380template<class VoidPointer, class Impl>
     381inline typename message_queue_base_t<VoidPointer, Impl>::size_type
     382message_queue_base_t<VoidPointer, Impl>::get_mem_size
    424383   (size_type max_msg_size, size_type max_num_msg)
    425 {  return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg);   }
    426 
    427 template<class VoidPointer>
    428 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t create_only,
    429                                     const char *name,
    430                                     size_type max_num_msg,
    431                                     size_type max_msg_size,
    432                                     const permissions &perm)
    433       //Create shared memory and execute functor atomically
    434    :  m_shmem(create_only,
    435               name,
    436            get_mem_size(max_msg_size, max_num_msg),
    437               read_write,
    438               static_cast<void*>(0),
    439               //Prepare initialization functor
    440               ipcdetail::initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
    441               perm)
    442 {}
    443 
    444 template<class VoidPointer>
    445 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t open_or_create,
    446                                     const char *name,
    447                                     size_type max_num_msg,
    448                                     size_type max_msg_size,
    449                                     const permissions &perm)
    450       //Create shared memory and execute functor atomically
    451    :  m_shmem(open_or_create,
    452               name,
    453               get_mem_size(max_msg_size, max_num_msg),
    454               read_write,
    455               static_cast<void*>(0),
    456               //Prepare initialization functor
    457               ipcdetail::initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
    458               perm)
    459 {}
    460 
    461 template<class VoidPointer>
    462 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t open_only,
    463                                     const char *name)
    464    //Create shared memory and execute functor atomically
    465    :  m_shmem(open_only,
    466               name,
    467               read_write,
    468               static_cast<void*>(0),
    469               //Prepare initialization functor
    470               ipcdetail::initialization_func_t<VoidPointer> ())
    471 {}
     384{  return ipcdetail::mq_hdr_t<void_pointer>::get_mem_size (max_msg_size, max_num_msg) + Impl::get_data_offset();   }
    472385
    473 template<class VoidPointer>
    474 inline void message_queue_t<VoidPointer>::send
     386template<class VoidPointer, class Impl>
     387inline void message_queue_base_t<VoidPointer, Impl>::send
    475388   (const void *buffer, size_type buffer_size, unsigned int priority)
    476389{  this->do_send(blocking, buffer, buffer_size, priority, ptime()); }
    477390
    478 template<class VoidPointer>
    479 inline bool message_queue_t<VoidPointer>::try_send
     391template<class VoidPointer, class Impl>
     392inline bool message_queue_base_t<VoidPointer, Impl>::try_send
    480393   (const void *buffer, size_type buffer_size, unsigned int priority)
    481394{  return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); }
    482395
    483 template<class VoidPointer>
    484 inline bool message_queue_t<VoidPointer>::timed_send
     396template<class VoidPointer, class Impl>
     397inline bool message_queue_base_t<VoidPointer, Impl>::timed_send
    485398   (const void *buffer, size_type buffer_size
    486399   ,unsigned int priority, const boost::posix_time::ptime &abs_time)
    487400{
     
    492405   return this->do_send(timed, buffer, buffer_size, priority, abs_time);
    493406}
    494407
    495 template<class VoidPointer>
    496 inline bool message_queue_t<VoidPointer>::do_send(block_t block,
     408template<class VoidPointer, class Impl>
     409inline bool message_queue_base_t<VoidPointer, Impl>::do_send(block_t block,
    497410                                const void *buffer,      size_type buffer_size,
    498411                                unsigned int priority,   const boost::posix_time::ptime &abs_time)
    499412{
    500    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
     413   ipcdetail::mq_hdr_t<void_pointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<void_pointer>*>(static_cast<Impl&> (*this).get_data());
    501414   //Check if buffer is smaller than maximum allowed
    502415   if (buffer_size > p_hdr->m_max_msg_size) {
    503416      throw interprocess_exception(size_error);
     
    538451      }
    539452     
    540453      //Get the first free message from free message queue
    541       ipcdetail::msg_hdr_t<VoidPointer> *free_msg = p_hdr->free_msg();
     454      ipcdetail::msg_hdr_t<void_pointer> *free_msg = p_hdr->free_msg();
    542455      if (free_msg == 0) {
    543456         throw interprocess_exception("boost::interprocess::message_queue corrupted");
    544457      }
     
    563476   return true;
    564477}
    565478
    566 template<class VoidPointer>
    567 inline void message_queue_t<VoidPointer>::receive(void *buffer,        size_type buffer_size,
     479template<class VoidPointer, class Impl>
     480inline void message_queue_base_t<VoidPointer, Impl>::receive(void *buffer,        size_type buffer_size,
    568481                        size_type &recvd_size,   unsigned int &priority)
    569482{  this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
    570483
    571 template<class VoidPointer>
     484template<class VoidPointer, class Impl>
    572485inline bool
    573    message_queue_t<VoidPointer>::try_receive(void *buffer,              size_type buffer_size,
     486   message_queue_base_t<VoidPointer, Impl>::try_receive(void *buffer,              size_type buffer_size,
    574487                              size_type &recvd_size,   unsigned int &priority)
    575488{  return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
    576489
    577 template<class VoidPointer>
     490template<class VoidPointer, class Impl>
    578491inline bool
    579    message_queue_t<VoidPointer>::timed_receive(void *buffer,            size_type buffer_size,
     492   message_queue_base_t<VoidPointer, Impl>::timed_receive(void *buffer,            size_type buffer_size,
    580493                                size_type &recvd_size,   unsigned int &priority,
    581494                                const boost::posix_time::ptime &abs_time)
    582495{
     
    587500   return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time);
    588501}
    589502
    590 template<class VoidPointer>
     503template<class VoidPointer, class Impl>
    591504inline bool
    592    message_queue_t<VoidPointer>::do_receive(block_t block,
     505   message_queue_base_t<VoidPointer, Impl>::do_receive(block_t block,
    593506                          void *buffer,            size_type buffer_size,
    594507                          size_type &recvd_size,   unsigned int &priority,
    595508                          const boost::posix_time::ptime &abs_time)
    596509{
    597    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
     510   ipcdetail::mq_hdr_t<void_pointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<void_pointer>*>(static_cast<Impl&> (*this).get_data());
    598511   //Check if buffer is big enough for any message
    599512   if (buffer_size < p_hdr->m_max_msg_size) {
    600513      throw interprocess_exception(size_error);
     
    636549      }
    637550
    638551      //Thre is at least message ready to pick, get the top one
    639      ipcdetail::msg_hdr_t<VoidPointer> *top_msg = p_hdr->top_msg();
     552     ipcdetail::msg_hdr_t<void_pointer> *top_msg = p_hdr->top_msg();
    640553
    641554      //Paranoia check
    642555      if (top_msg == 0) {
     
    664577   return true;
    665578}
    666579
    667 template<class VoidPointer>
    668 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
     580template<class VoidPointer, class Impl>
     581inline typename message_queue_base_t<VoidPointer, Impl>::size_type
     582message_queue_base_t<VoidPointer, Impl>::get_max_msg() const
    669583{
    670    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
     584   ipcdetail::mq_hdr_t<void_pointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<void_pointer>*>(static_cast<Impl&> (*this).get_data());
    671585   return p_hdr ? p_hdr->m_max_num_msg : 0;  }
    672586
    673 template<class VoidPointer>
    674 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
     587template<class VoidPointer, class Impl>
     588inline typename message_queue_base_t<VoidPointer, Impl>::size_type
     589message_queue_base_t<VoidPointer, Impl>::get_max_msg_size() const
    675590{
    676    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
     591   ipcdetail::mq_hdr_t<void_pointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<void_pointer>*>(static_cast<Impl&> (*this).get_data());
    677592   return p_hdr ? p_hdr->m_max_msg_size : 0;
    678593}
    679594
    680 template<class VoidPointer>
    681 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg()
     595template<class VoidPointer, class Impl>
     596inline typename message_queue_base_t<VoidPointer, Impl>::size_type
     597message_queue_base_t<VoidPointer, Impl>::get_num_msg()
    682598{
    683    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
     599   ipcdetail::mq_hdr_t<void_pointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<void_pointer>*>(static_cast<Impl&> (*this).get_data());
    684600   if(p_hdr){
    685601      //---------------------------------------------
    686602      scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
     
    691607   return 0;
    692608}
    693609
     610/// @endcond
     611
     612//!A class that allows sending messages
     613//!between processes.
     614template<class VoidPointer>
     615class message_queue_t : public message_queue_base_t<VoidPointer, message_queue_t<VoidPointer> >
     616{
     617   typedef message_queue_base_t<VoidPointer, message_queue_t<VoidPointer> > message_queue_base;
     618   
     619   message_queue_t();
     620   
     621   public:
     622   typedef typename message_queue_base::size_type size_type;
     623
     624   protected:
     625   ipcdetail::managed_open_or_create_impl<shared_memory_object> m_shmem;
     626
     627   friend class message_queue_base_t<VoidPointer, message_queue_t<VoidPointer> >;
     628   //!Get pointer to message queue data
     629   void* get_data() { return m_shmem.get_user_address(); }
     630   //!Offset to add to message queue size (for extra data required by backend)
     631   static size_type get_data_offset()
     632   { return ipcdetail::managed_open_or_create_impl<shared_memory_object>::ManagedOpenOrCreateUserOffset; }
     633
     634   public:
     635   //!Creates a process shared message queue with name "name". For this message queue,
     636   //!the maximum number of messages will be "max_num_msg" and the maximum message size
     637   //!will be "max_msg_size". Throws on error and if the queue was previously created.
     638   message_queue_t(create_only_t create_only,
     639                 const char *name,
     640                 size_type max_num_msg,
     641                 size_type max_msg_size,
     642                 const permissions &perm = permissions());
     643
     644   //!Opens or creates a process shared message queue with name "name".
     645   //!If the queue is created, the maximum number of messages will be "max_num_msg"
     646   //!and the maximum message size will be "max_msg_size". If queue was previously
     647   //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters
     648   //!are ignored. Throws on error.
     649   message_queue_t(open_or_create_t open_or_create,
     650                 const char *name,
     651                 size_type max_num_msg,
     652                 size_type max_msg_size,
     653                 const permissions &perm = permissions());
     654
     655   //!Opens a previously created process shared message queue with name "name".
     656   //!If the queue was not previously created or there are no free resources,
     657   //!throws an error.
     658   message_queue_t(open_only_t open_only,
     659                 const char *name);
     660
     661   //!Destroys *this and indicates that the calling process is finished using
     662   //!the resource. All opened message queues are still
     663   //!valid after destruction. The destructor function will deallocate
     664   //!any system resources allocated by the system for use by this process for
     665   //!this resource. The resource can still be opened again calling
     666   //!the open constructor overload. To erase the message queue from the system
     667   //!use remove().
     668   ~message_queue_t();
     669
     670   //!Removes the message queue from the system.
     671   //!Returns false on error. Never throws
     672   static bool remove(const char *name);
     673};
     674
     675template<class VoidPointer>
     676inline message_queue_t<VoidPointer>::~message_queue_t()
     677{}
     678
     679/// @cond
     680
    694681template<class VoidPointer>
    695682inline bool message_queue_t<VoidPointer>::remove(const char *name)
    696683{  return shared_memory_object::remove(name);  }
    697684
     685template<class VoidPointer>
     686inline message_queue_t<VoidPointer>::message_queue_t(create_only_t create_only,
     687                                    const char *name,
     688                                    size_type max_num_msg,
     689                                    size_type max_msg_size,
     690                                    const permissions &perm)
     691      //Create shared memory and execute functor atomically
     692   :  m_shmem(create_only,
     693              name,
     694           get_mem_size(max_msg_size, max_num_msg),
     695              read_write,
     696              static_cast<void*>(0),
     697              //Prepare initialization functor
     698              typename message_queue_base::initialization_func_t (max_num_msg, max_msg_size),
     699              perm)
     700{}
     701
     702template<class VoidPointer>
     703inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t open_or_create,
     704                                    const char *name,
     705                                    size_type max_num_msg,
     706                                    size_type max_msg_size,
     707                                    const permissions &perm)
     708      //Create shared memory and execute functor atomically
     709   :  m_shmem(open_or_create,
     710              name,
     711              get_mem_size(max_msg_size, max_num_msg),
     712              read_write,
     713              static_cast<void*>(0),
     714              //Prepare initialization functor
     715              typename message_queue_base::initialization_func_t (max_num_msg, max_msg_size),
     716              perm)
     717{}
     718
     719template<class VoidPointer>
     720inline message_queue_t<VoidPointer>::message_queue_t(open_only_t open_only,
     721                                    const char *name)
     722   //Create shared memory and execute functor atomically
     723   :  m_shmem(open_only,
     724              name,
     725              read_write,
     726              static_cast<void*>(0),
     727              //Prepare initialization functor
     728              typename message_queue_base::initialization_func_t ())
     729{}
     730
    698731/// @endcond
    699732
    700733}} //namespace boost{  namespace interprocess{