#include #include #include #include #include #include #include #include #include #include #include #include #include // ------------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------------ //! \brief Representation for a shared memory segment manager. typedef boost::interprocess::managed_shared_memory::segment_manager segman_t; // ----- Allocators For Shared Memory ----- typedef boost::interprocess::allocator void_alloc; typedef boost::interprocess::allocator char_alloc; typedef boost::interprocess::allocator uint32_alloc; typedef boost::interprocess::allocator uint64_alloc; typedef boost::interprocess::allocator dbl_alloc; typedef boost::interprocess::allocator bool_alloc; // ----- Basic Shared Memory Types/Containers ----- typedef boost::interprocess::basic_string, char_alloc> char_str; typedef boost::interprocess::allocator str_alloc; typedef boost::interprocess::set, str_alloc> str_set; typedef boost::interprocess::set, uint64_alloc> uint64_set; typedef std::pair strmap_t; typedef std::pair movable_strmap_t; typedef boost::interprocess::allocator strmap_t_alloc; typedef boost::interprocess::map, strmap_t_alloc> str_uint32_map; // ------------------------------------------------------------------------------------------------------------------------- class SMStore_OrderID { public: typedef boost::interprocess::interprocess_mutex SMMutex; typedef boost::interprocess::scoped_lock SMMutScpLck; typedef boost::interprocess::managed_shared_memory BIPCMSM; // ----------------------------------------------------------------------- SMStore_OrderID(const std::string& branch, const uint32_t& memsize = 1024) : m_branch(branch), m_memsize(memsize), m_msm(0), m_alloc(0), m_set(0) { try { m_msm = new BIPCMSM(boost::interprocess::open_or_create, m_branch.c_str(), m_memsize*m_memmult); m_alloc = new void_alloc(m_msm->get_segment_manager()); m_set = m_msm->find_or_construct("OrderIDs")(std::less(), *m_alloc); } catch(...) { cleanup(); throw std::runtime_error("ERR: SMStore_OrderID instance encountered issues on construction."); } } // ----------------------------------------------------------------------- ~SMStore_OrderID() { } // ----------------------------------------------------------------------- inline void cleanup() { delete m_msm; m_msm = 0; delete m_alloc; m_alloc = 0; delete m_set; m_set = 0; } // ----------------------------------------------------------------------- inline const bool remove(const std::string& name) { if(name.empty()) return false; return boost::interprocess::shared_memory_object::remove(name.c_str()); } // ----------------------------------------------------------------------- inline const bool exists(const uint64_t& oid) { SMMutScpLck lock(mtx); return m_set->find(oid) != m_set->cend(); } // ----------------------------------------------------------------------- inline const bool add_id(const uint64_t& oid) { SMMutScpLck lock(mtx); return m_set->insert(oid).second; } // ----------------------------------------------------------------------- inline const bool remove_id(const uint64_t& oid) { SMMutScpLck lock(mtx); return m_set->erase(oid) != 0; } private: // ----------------------------------------------------------------------- enum {m_memmult = 8}; //!< Size of the ID chunk (64 bit|8 bytes) std::string m_branch; //!< Name of the shared memory segment, by SQF6 branch uint32_t m_memsize; //!< Size of the segment, multiple of uint64_t size BIPCMSM *m_msm; //!< Pointer to Boost IPC managed shared memory segment void_alloc *m_alloc; //!< Pointer to void allocator for the memory segement uint64_set *m_set; //!< Pointer to underlying set storing the order IDs SMMutex mtx; //!< Access protection mutex }; typedef boost::shared_ptr SMStore_OrderID_ptr; class ASIOThdPool { public: typedef boost::shared_ptr IOSWorkPtr; // ----------------------------------------------------------------------- //! \brief Ctor. ASIOThdPool(const uint32_t nthds) : IOS(), WRK(new boost::asio::io_service::work(IOS)), IOSIG(IOS, SIGTERM, SIGINT, SIGBREAK) { m_thd_cnt = nthds > 8 ? 8 : nthds; m_is_running = false; IOSIG.async_wait(boost::bind(&ASIOThdPool::shutdown, this, boost::asio::placeholders::error, boost::asio::placeholders::signal_number)); } // ----------------------------------------------------------------------- //! \brief Dtor. ~ASIOThdPool() {} // ----------------------------------------------------------------------- //! \brief Method to start up the thread pool. void run() { m_is_running = true; for(uint32_t i = 0; i < m_thd_cnt; ++i) TGRP.create_thread(boost::bind(&boost::asio::io_service::run, &IOS)); } // ----------------------------------------------------------------------- //! \brief Method to wait on the thread pool to finish and issue a shutdown. void join() { if(!m_is_running) return; TGRP.join_all(); shutdown(boost::system::error_code(), 100); } // ----------------------------------------------------------------------- //! \brief Method to shutdown the thread pool service. void shutdown(const boost::system::error_code &err, int signum) { if(!m_is_running) return; m_is_running = false; WRK.reset(); IOSIG.cancel(); IOS.stop(); } // ----------------------------------------------------------------------- boost::asio::io_service IOS; //!< Boost ASIO IO service object for scheduling work. IOSWorkPtr WRK; //!< Work object to ensure the ASIO event loop has work. boost::asio::signal_set IOSIG; //!< Signals set for handling shutdowns. boost::thread_group TGRP; //!< Thread group for handling the scheduled tasks. uint32_t m_thd_cnt; //!< Number of threads within the thread pool. bool m_is_running; //!< Control variable to indicate running state. }; class SomeSocket { public: // ----------------------------------------------------------------------- SomeSocket(ASIOThdPool& ATP) : ATP(ATP), m_socket(ATP.IOS) { } // ----------------------------------------------------------------------- ~SomeSocket() { } // ----------------------------------------------------------------------- void connect(const std::string& id, const std::string& ip, const std::string& port) { SMOID.reset(new SMStore_OrderID(id, 1)); boost::asio::ip::tcp::resolver resolver(ATP.IOS); boost::asio::ip::tcp::resolver::query query(ip, port); boost::asio::ip::tcp::resolver::iterator endpt = resolver.resolve(query); boost::system::error_code ec; while(endpt != boost::asio::ip::tcp::resolver::iterator()) { m_socket.connect(endpt->endpoint(), ec); if(!ec) { std::printf("Connected to [%s:%s]\n", ip.c_str(), port.c_str()); } ++endpt; } } // ----------------------------------------------------------------------- void run() { SMOID->add_id(12);// read stuff } // ----------------------------------------------------------------------- ASIOThdPool &ATP; boost::asio::ip::tcp::socket m_socket; SMStore_OrderID_ptr SMOID; }; int main(int argc, char* argv[]) { ASIOThdPool ATP(3); SomeSocket SS(ATP); SS.connect("ID", "209.213.219.189", "10001"); SS.run(); ATP.run(); return 0; }