diff -ruN boost_1_50_0_beta1.org/boost/interprocess/ipc/windows_message_queue.hpp boost_1_50_0_beta1/boost/interprocess/ipc/windows_message_queue.hpp --- boost_1_50_0_beta1.org/boost/interprocess/ipc/windows_message_queue.hpp Thu Jan 1 00:00:00 1970 +++ boost_1_50_0_beta1/boost/interprocess/ipc/windows_message_queue.hpp Tue Jun 26 09:28:15 2012 @@ -0,0 +1,142 @@ +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Ion Gaztanaga 2005-2011. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/interprocess for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#ifndef BOOST_INTERPROCESS_WINDOWS_MESSAGE_QUEUE_HPP +#define BOOST_INTERPROCESS_WINDOWS_MESSAGE_QUEUE_HPP + +#include +#include + +#include +#include + + +//!\file +//!Inter-process message queue using Windows shared memory. + +namespace boost{ namespace interprocess{ + +//!A class that allows sending messages +//!between processesusing Windows shared memory. +template +class windows_message_queue_t : public message_queue_base_t > +{ + typedef message_queue_base_t > message_queue_base; + + windows_message_queue_t(); + + protected: + ipcdetail::managed_open_or_create_impl m_shmem; + + friend class message_queue_base; + //!Get pointer to message queue data + void* get_data() { return m_shmem.get_user_address(); } + //!Offset to add to message queue size (for extra data required by backend) + static size_type get_data_offset() + { return ipcdetail::managed_open_or_create_impl::ManagedOpenOrCreateUserOffset; } + + public: + typedef typename message_queue_base::size_type size_type; + + //!Creates a process shared message queue with name "name". For this message queue, + //!the maximum number of messages will be "max_num_msg" and the maximum message size + //!will be "max_msg_size". Throws on error and if the queue was previously created. + windows_message_queue_t(create_only_t create_only, + const char *name, + size_type max_num_msg, + size_type max_msg_size, + const permissions &perm = permissions()); + + //!Opens or creates a process shared message queue with name "name". + //!If the queue is created, the maximum number of messages will be "max_num_msg" + //!and the maximum message size will be "max_msg_size". If queue was previously + //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters + //!are ignored. Throws on error. + windows_message_queue_t(open_or_create_t open_or_create, + const char *name, + size_type max_num_msg, + size_type max_msg_size, + const permissions &perm = permissions()); + + //!Opens a previously created process shared message queue with name "name". + //!If the queue was not previously created or there are no free resources, + //!throws an error. + windows_message_queue_t(open_only_t open_only, + const char *name); + + //!Destroys *this and indicates that the calling process is finished using + //!the resource. All opened message queues are still + //!valid after destruction. The destructor function will deallocate + //!any system resources allocated by the system for use by this process for + //!this resource. The resource can still be opened again calling + //!the open constructor overload. To erase the message queue from the system + //!use remove(). + ~windows_message_queue_t(); +}; + +template +inline windows_message_queue_t::~windows_message_queue_t() +{} + +/// @cond + +template +inline windows_message_queue_t::windows_message_queue_t(create_only_t create_only, + const char *name, + size_type max_num_msg, + size_type max_msg_size, + const permissions &perm) + //Create shared memory and execute functor atomically + : m_shmem(create_only, + name, + get_mem_size(max_msg_size, max_num_msg), + read_write, + static_cast(0), + //Prepare initialization functor + message_queue_base::initialization_func_t (max_num_msg, max_msg_size), + perm) +{} + +template +inline windows_message_queue_t::windows_message_queue_t(open_or_create_t open_or_create, + const char *name, + size_type max_num_msg, + size_type max_msg_size, + const permissions &perm) + //Create shared memory and execute functor atomically + : m_shmem(open_or_create, + name, + get_mem_size(max_msg_size, max_num_msg), + read_write, + static_cast(0), + //Prepare initialization functor + message_queue_base::initialization_func_t (max_num_msg, max_msg_size), + perm) +{} + +template +inline windows_message_queue_t::windows_message_queue_t(open_only_t open_only, + const char *name) + //Create shared memory and execute functor atomically + : m_shmem(open_only, + name, + read_write, + static_cast(0), + //Prepare initialization functor + message_queue_base::initialization_func_t ()) +{} + +typedef windows_message_queue_t > windows_message_queue; + +}} //namespace boost{ namespace interprocess{ + +#include + +#endif //#ifndef BOOST_INTERPROCESS_WINDOWS_MESSAGE_QUEUE_HPP diff -ruN boost_1_50_0_beta1.org/libs/interprocess/test/windows_message_queue_test.cpp boost_1_50_0_beta1/libs/interprocess/test/windows_message_queue_test.cpp --- boost_1_50_0_beta1.org/libs/interprocess/test/windows_message_queue_test.cpp Thu Jan 1 00:00:00 1970 +++ boost_1_50_0_beta1/libs/interprocess/test/windows_message_queue_test.cpp Tue Jun 26 10:00:45 2012 @@ -0,0 +1,267 @@ +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Ion Gaztanaga 2004-2011. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/interprocess for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#include + +#ifdef BOOST_INTERPROCESS_WINDOWS + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "get_process_id_name.hpp" + +//////////////////////////////////////////////////////////////////////////////// +// // +// This example tests the process shared message queue. // +// // +//////////////////////////////////////////////////////////////////////////////// + +using namespace boost::interprocess; + +//This test inserts messages with different priority and marks them with a +//time-stamp to check if receiver obtains highest priority messages first and +//messages with same priority are received in fifo order +bool test_priority_order() +{ + { + windows_message_queue mq1 + (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)), + mq2 + (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)); + + //We test that the queue is ordered by priority and in the + //same priority, is a FIFO + windows_message_queue::size_type recvd = 0; + unsigned int priority = 0; + std::size_t tstamp; + + //We will send 100 message with priority 0-9 + //The message will contain the timestamp of the message + for(std::size_t i = 0; i < 100; ++i){ + tstamp = i; + mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10)); + } + + unsigned int priority_prev = (std::numeric_limits::max)(); + std::size_t tstamp_prev = 0; + + //Receive all messages and test those are ordered + //by priority and by FIFO in the same priority + for(std::size_t i = 0; i < 100; ++i){ + mq1.receive(&tstamp, sizeof(tstamp), recvd, priority); + if(priority > priority_prev) + return false; + if(priority == priority_prev && + tstamp <= tstamp_prev){ + return false; + } + priority_prev = priority; + tstamp_prev = tstamp; + } + } + return true; +} + +//[message_queue_test_test_serialize_db +//This test creates a in memory data-base using Interprocess machinery and +//serializes it through a message queue. Then rebuilds the data-base in +//another buffer and checks it against the original data-base +bool test_serialize_db() +{ + //Typedef data to create a Interprocess map + typedef std::pair MyPair; + typedef std::less MyLess; + typedef node_allocator + node_allocator_t; + typedef map, + node_allocator_t> + MyMap; + + //Some constants + const std::size_t BufferSize = 65536; + const std::size_t MaxMsgSize = 100; + + //Allocate a memory buffer to hold the destiny database using vector + std::vector buffer_destiny(BufferSize, 0); + + { + //Create the message-queues + windows_message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize); + + //Open previously created message-queue simulating other process + windows_message_queue mq2(open_only, test::get_process_id_name()); + + //A managed heap memory to create the origin database + managed_heap_memory db_origin(buffer_destiny.size()); + + //Construct the map in the first buffer + MyMap *map1 = db_origin.construct("MyMap") + (MyLess(), + db_origin.get_segment_manager()); + if(!map1) + return false; + + //Fill map1 until is full + try{ + std::size_t i = 0; + while(1){ + (*map1)[i] = i; + ++i; + } + } + catch(boost::interprocess::bad_alloc &){} + + //Data control data sending through the message queue + std::size_t sent = 0; + windows_message_queue::size_type recvd = 0; + windows_message_queue::size_type total_recvd = 0; + unsigned int priority; + + //Send whole first buffer through the mq1, read it + //through mq2 to the second buffer + while(1){ + //Send a fragment of buffer1 through mq1 + std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ? + MaxMsgSize : (db_origin.get_size() - sent); + mq1.send( &static_cast(db_origin.get_address())[sent] + , bytes_to_send + , 0); + sent += bytes_to_send; + //Receive the fragment through mq2 to buffer_destiny + mq2.receive( &buffer_destiny[total_recvd] + , BufferSize - recvd + , recvd + , priority); + total_recvd += recvd; + + //Check if we have received all the buffer + if(total_recvd == BufferSize){ + break; + } + } + + //The buffer will contain a copy of the original database + //so let's interpret the buffer with managed_external_buffer + managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize); + + //Let's find the map + std::pair ret = db_destiny.find("MyMap"); + MyMap *map2 = ret.first; + + //Check if we have found it + if(!map2){ + return false; + } + + //Check if it is a single variable (not an array) + if(ret.second != 1){ + return false; + } + + //Now let's compare size + if(map1->size() != map2->size()){ + return false; + } + + //Now let's compare all db values + MyMap::size_type num_elements = map1->size(); + for(std::size_t i = 0; i < num_elements; ++i){ + if((*map1)[i] != (*map2)[i]){ + return false; + } + } + + //Destroy maps from db-s + db_origin.destroy_ptr(map1); + db_destiny.destroy_ptr(map2); + } + return true; +} +//] +static const int MsgSize = 10; +static const int NumMsg = 1000; +static char msgsend [10]; +static char msgrecv [10]; + + +static boost::interprocess::windows_message_queue *pmessage_queue; + +void receiver() +{ + boost::interprocess::windows_message_queue::size_type recvd_size; + unsigned int priority; + int nummsg = NumMsg; + + while(nummsg--){ + pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority); + } +} + +bool test_buffer_overflow() +{ + { + std::auto_ptr + ptr(new boost::interprocess::windows_message_queue + (create_only, test::get_process_id_name(), 10, 10)); + pmessage_queue = ptr.get(); + + //Launch the receiver thread + boost::thread thread(&receiver); + boost::thread::yield(); + + int nummsg = NumMsg; + + while(nummsg--){ + pmessage_queue->send(msgsend, MsgSize, 0); + } + + thread.join(); + } + return true; +} + +int main () +{ + if(!test_priority_order()){ + return 1; + } + + if(!test_serialize_db()){ + return 1; + } + + if(!test_buffer_overflow()){ + return 1; + } + + return 0; +} + +#else + +int main () +{ + return 0; +} + +#endif + +#include