Ticket #7027: boost-windows-message-queue.diff

File boost-windows-message-queue.diff, 14.8 KB (added by frank.richter@…, 10 years ago)

interprocess: message queue using windows_shared_memory

  • boost/interprocess/ipc/windows_message_queue.hpp

    diff -ruN boost_1_50_0_beta1.org/boost/interprocess/ipc/windows_message_queue.hpp boost_1_50_0_beta1/boost/interprocess/ipc/windows_message_queue.hpp
    old new  
     1//////////////////////////////////////////////////////////////////////////////
     2//
     3// (C) Copyright Ion Gaztanaga 2005-2011. Distributed under the Boost
     4// Software License, Version 1.0. (See accompanying file
     5// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
     6//
     7// See http://www.boost.org/libs/interprocess for documentation.
     8//
     9//////////////////////////////////////////////////////////////////////////////
     10
     11#ifndef BOOST_INTERPROCESS_WINDOWS_MESSAGE_QUEUE_HPP
     12#define BOOST_INTERPROCESS_WINDOWS_MESSAGE_QUEUE_HPP
     13
     14#include <boost/interprocess/detail/config_begin.hpp>
     15#include <boost/interprocess/detail/workaround.hpp>
     16
     17#include <boost/interprocess/ipc/message_queue.hpp>
     18#include <boost/interprocess/windows_shared_memory.hpp>
     19
     20
     21//!\file
     22//!Inter-process message queue using Windows shared memory.
     23
     24namespace boost{  namespace interprocess{
     25
     26//!A class that allows sending messages
     27//!between processesusing Windows shared memory.
     28template<class VoidPointer>
     29class windows_message_queue_t : public message_queue_base_t<VoidPointer, windows_message_queue_t<VoidPointer> >
     30{
     31   typedef message_queue_base_t<VoidPointer, windows_message_queue_t<VoidPointer> > message_queue_base;
     32   
     33   windows_message_queue_t();
     34   
     35   protected:
     36   ipcdetail::managed_open_or_create_impl<windows_shared_memory, 0, false> m_shmem;
     37
     38   friend class message_queue_base;
     39   //!Get pointer to message queue data
     40   void* get_data() { return m_shmem.get_user_address(); }
     41   //!Offset to add to message queue size (for extra data required by backend)
     42   static size_type get_data_offset()
     43   { return ipcdetail::managed_open_or_create_impl<windows_shared_memory>::ManagedOpenOrCreateUserOffset; }
     44
     45   public:
     46   typedef typename message_queue_base::size_type size_type;
     47
     48   //!Creates a process shared message queue with name "name". For this message queue,
     49   //!the maximum number of messages will be "max_num_msg" and the maximum message size
     50   //!will be "max_msg_size". Throws on error and if the queue was previously created.
     51   windows_message_queue_t(create_only_t create_only,
     52                 const char *name,
     53                 size_type max_num_msg,
     54                 size_type max_msg_size,
     55                 const permissions &perm = permissions());
     56
     57   //!Opens or creates a process shared message queue with name "name".
     58   //!If the queue is created, the maximum number of messages will be "max_num_msg"
     59   //!and the maximum message size will be "max_msg_size". If queue was previously
     60   //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters
     61   //!are ignored. Throws on error.
     62   windows_message_queue_t(open_or_create_t open_or_create,
     63                 const char *name,
     64                 size_type max_num_msg,
     65                 size_type max_msg_size,
     66                 const permissions &perm = permissions());
     67
     68   //!Opens a previously created process shared message queue with name "name".
     69   //!If the queue was not previously created or there are no free resources,
     70   //!throws an error.
     71   windows_message_queue_t(open_only_t open_only,
     72                 const char *name);
     73
     74   //!Destroys *this and indicates that the calling process is finished using
     75   //!the resource. All opened message queues are still
     76   //!valid after destruction. The destructor function will deallocate
     77   //!any system resources allocated by the system for use by this process for
     78   //!this resource. The resource can still be opened again calling
     79   //!the open constructor overload. To erase the message queue from the system
     80   //!use remove().
     81   ~windows_message_queue_t();
     82};
     83
     84template<class VoidPointer>
     85inline windows_message_queue_t<VoidPointer>::~windows_message_queue_t()
     86{}
     87
     88/// @cond
     89
     90template<class VoidPointer>
     91inline windows_message_queue_t<VoidPointer>::windows_message_queue_t(create_only_t create_only,
     92                                    const char *name,
     93                                    size_type max_num_msg,
     94                                    size_type max_msg_size,
     95                                    const permissions &perm)
     96      //Create shared memory and execute functor atomically
     97   :  m_shmem(create_only,
     98              name,
     99           get_mem_size(max_msg_size, max_num_msg),
     100              read_write,
     101              static_cast<void*>(0),
     102              //Prepare initialization functor
     103              message_queue_base::initialization_func_t (max_num_msg, max_msg_size),
     104              perm)
     105{}
     106
     107template<class VoidPointer>
     108inline windows_message_queue_t<VoidPointer>::windows_message_queue_t(open_or_create_t open_or_create,
     109                                    const char *name,
     110                                    size_type max_num_msg,
     111                                    size_type max_msg_size,
     112                                    const permissions &perm)
     113      //Create shared memory and execute functor atomically
     114   :  m_shmem(open_or_create,
     115              name,
     116              get_mem_size(max_msg_size, max_num_msg),
     117              read_write,
     118              static_cast<void*>(0),
     119              //Prepare initialization functor
     120              message_queue_base::initialization_func_t (max_num_msg, max_msg_size),
     121              perm)
     122{}
     123
     124template<class VoidPointer>
     125inline windows_message_queue_t<VoidPointer>::windows_message_queue_t(open_only_t open_only,
     126                                    const char *name)
     127   //Create shared memory and execute functor atomically
     128   :  m_shmem(open_only,
     129              name,
     130              read_write,
     131              static_cast<void*>(0),
     132              //Prepare initialization functor
     133              message_queue_base::initialization_func_t ())
     134{}
     135
     136typedef windows_message_queue_t<offset_ptr<void> > windows_message_queue;
     137
     138}} //namespace boost{  namespace interprocess{
     139
     140#include <boost/interprocess/detail/config_end.hpp>
     141
     142#endif   //#ifndef BOOST_INTERPROCESS_WINDOWS_MESSAGE_QUEUE_HPP
  • libs/interprocess/test/windows_message_queue_test.cpp

    diff -ruN boost_1_50_0_beta1.org/libs/interprocess/test/windows_message_queue_test.cpp boost_1_50_0_beta1/libs/interprocess/test/windows_message_queue_test.cpp
    old new  
     1//////////////////////////////////////////////////////////////////////////////
     2//
     3// (C) Copyright Ion Gaztanaga 2004-2011. Distributed under the Boost
     4// Software License, Version 1.0. (See accompanying file
     5// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
     6//
     7// See http://www.boost.org/libs/interprocess for documentation.
     8//
     9//////////////////////////////////////////////////////////////////////////////
     10
     11#include <boost/interprocess/detail/config_begin.hpp>
     12
     13#ifdef BOOST_INTERPROCESS_WINDOWS
     14
     15#include <boost/interprocess/ipc/windows_message_queue.hpp>
     16#include <boost/interprocess/managed_external_buffer.hpp>
     17#include <boost/interprocess/managed_heap_memory.hpp>
     18#include <boost/interprocess/containers/map.hpp>
     19#include <boost/interprocess/containers/set.hpp>
     20#include <boost/interprocess/allocators/node_allocator.hpp>
     21#include <vector>
     22#include <cstddef>
     23#include <limits>
     24#include <boost/thread.hpp>
     25#include <memory>
     26#include <string>
     27#include "get_process_id_name.hpp"
     28
     29////////////////////////////////////////////////////////////////////////////////
     30//                                                                            //
     31//  This example tests the process shared message queue.                      //
     32//                                                                            //
     33////////////////////////////////////////////////////////////////////////////////
     34
     35using namespace boost::interprocess;
     36
     37//This test inserts messages with different priority and marks them with a
     38//time-stamp to check if receiver obtains highest priority messages first and
     39//messages with same priority are received in fifo order
     40bool test_priority_order()
     41{
     42   {
     43      windows_message_queue mq1
     44         (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),
     45         mq2
     46         (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));
     47
     48      //We test that the queue is ordered by priority and in the
     49      //same priority, is a FIFO
     50      windows_message_queue::size_type recvd = 0;
     51      unsigned int priority = 0;
     52      std::size_t tstamp;
     53
     54      //We will send 100 message with priority 0-9
     55      //The message will contain the timestamp of the message
     56      for(std::size_t i = 0; i < 100; ++i){
     57         tstamp = i;
     58         mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));
     59      }
     60
     61      unsigned int priority_prev = (std::numeric_limits<unsigned int>::max)();
     62      std::size_t  tstamp_prev = 0;
     63
     64      //Receive all messages and test those are ordered
     65      //by priority and by FIFO in the same priority
     66      for(std::size_t i = 0; i < 100; ++i){
     67         mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
     68         if(priority > priority_prev)
     69            return false;
     70         if(priority == priority_prev &&
     71            tstamp   <= tstamp_prev){
     72            return false;
     73         }
     74         priority_prev  = priority;
     75         tstamp_prev    = tstamp;
     76      }
     77   }
     78   return true;
     79}
     80
     81//[message_queue_test_test_serialize_db
     82//This test creates a in memory data-base using Interprocess machinery and
     83//serializes it through a message queue. Then rebuilds the data-base in
     84//another buffer and checks it against the original data-base
     85bool test_serialize_db()
     86{
     87   //Typedef data to create a Interprocess map 
     88   typedef std::pair<const std::size_t, std::size_t> MyPair;
     89   typedef std::less<std::size_t>   MyLess;
     90   typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
     91      node_allocator_t;
     92   typedef map<std::size_t,
     93               std::size_t,
     94               std::less<std::size_t>,
     95               node_allocator_t>
     96               MyMap;
     97
     98   //Some constants
     99   const std::size_t BufferSize  = 65536;
     100   const std::size_t MaxMsgSize  = 100;
     101
     102   //Allocate a memory buffer to hold the destiny database using vector<char>
     103   std::vector<char> buffer_destiny(BufferSize, 0);
     104
     105   {
     106      //Create the message-queues
     107      windows_message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);
     108
     109      //Open previously created message-queue simulating other process
     110      windows_message_queue mq2(open_only, test::get_process_id_name());
     111
     112      //A managed heap memory to create the origin database
     113      managed_heap_memory db_origin(buffer_destiny.size());
     114
     115      //Construct the map in the first buffer
     116      MyMap *map1 = db_origin.construct<MyMap>("MyMap")
     117                                       (MyLess(),
     118                                       db_origin.get_segment_manager());
     119      if(!map1)
     120         return false;
     121
     122      //Fill map1 until is full
     123      try{
     124         std::size_t i = 0;
     125         while(1){
     126            (*map1)[i] = i;
     127            ++i;
     128         }
     129      }
     130      catch(boost::interprocess::bad_alloc &){}
     131
     132      //Data control data sending through the message queue
     133      std::size_t sent = 0;
     134      windows_message_queue::size_type recvd = 0;
     135      windows_message_queue::size_type total_recvd = 0;
     136      unsigned int priority;
     137
     138      //Send whole first buffer through the mq1, read it
     139      //through mq2 to the second buffer
     140      while(1){
     141         //Send a fragment of buffer1 through mq1
     142                 std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?
     143                                       MaxMsgSize : (db_origin.get_size() - sent);
     144         mq1.send( &static_cast<char*>(db_origin.get_address())[sent]
     145               , bytes_to_send
     146               , 0);
     147         sent += bytes_to_send;
     148         //Receive the fragment through mq2 to buffer_destiny
     149                 mq2.receive( &buffer_destiny[total_recvd]
     150                          , BufferSize - recvd
     151                  , recvd
     152                  , priority);
     153         total_recvd += recvd;
     154
     155         //Check if we have received all the buffer
     156         if(total_recvd == BufferSize){
     157            break;
     158         }
     159      }
     160     
     161      //The buffer will contain a copy of the original database
     162      //so let's interpret the buffer with managed_external_buffer
     163      managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);
     164
     165      //Let's find the map
     166      std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap");
     167      MyMap *map2 = ret.first;
     168
     169      //Check if we have found it
     170      if(!map2){
     171         return false;
     172      }
     173
     174      //Check if it is a single variable (not an array)
     175      if(ret.second != 1){
     176         return false;
     177      }
     178
     179      //Now let's compare size
     180      if(map1->size() != map2->size()){
     181         return false;
     182      }
     183
     184      //Now let's compare all db values
     185          MyMap::size_type num_elements = map1->size();
     186          for(std::size_t i = 0; i < num_elements; ++i){
     187         if((*map1)[i] != (*map2)[i]){
     188            return false;
     189         }
     190      }
     191     
     192      //Destroy maps from db-s
     193      db_origin.destroy_ptr(map1);
     194      db_destiny.destroy_ptr(map2);
     195   }
     196   return true;
     197}
     198//]
     199static const int MsgSize = 10;
     200static const int NumMsg  = 1000;
     201static char msgsend [10];
     202static char msgrecv [10];
     203
     204
     205static boost::interprocess::windows_message_queue *pmessage_queue;
     206
     207void receiver()
     208{
     209   boost::interprocess::windows_message_queue::size_type recvd_size;
     210   unsigned int priority;
     211   int nummsg = NumMsg;
     212
     213   while(nummsg--){
     214      pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);
     215   }
     216}
     217
     218bool test_buffer_overflow()
     219{
     220   {
     221      std::auto_ptr<boost::interprocess::windows_message_queue>
     222         ptr(new boost::interprocess::windows_message_queue
     223               (create_only, test::get_process_id_name(), 10, 10));
     224      pmessage_queue = ptr.get();
     225
     226      //Launch the receiver thread
     227      boost::thread thread(&receiver);
     228      boost::thread::yield();
     229
     230      int nummsg = NumMsg;
     231
     232      while(nummsg--){
     233         pmessage_queue->send(msgsend, MsgSize, 0);
     234      }
     235
     236      thread.join();
     237   }
     238   return true;
     239}
     240
     241int main ()
     242{
     243   if(!test_priority_order()){
     244      return 1;
     245   }
     246
     247   if(!test_serialize_db()){
     248      return 1;
     249   }
     250
     251   if(!test_buffer_overflow()){
     252      return 1;
     253   }
     254
     255   return 0;
     256}
     257
     258#else
     259
     260int main ()
     261{
     262   return 0;
     263}
     264
     265#endif
     266
     267#include <boost/interprocess/detail/config_end.hpp>