Ticket #7553: test-asio.cpp

File test-asio.cpp, 8.1 KB (added by Todd Chadwick <ctchadwick@…>, 10 years ago)
Line 
1
#include <csignal>
#include <cstddef>
#include <cstdio>
#include <exception>
#include <fstream>
#include <stdexcept>
#include <string>

#include <stdint.h>

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/containers/set.hpp>
#include <boost/interprocess/containers/map.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

#include <boost/asio.hpp>
#include <boost/thread.hpp>
17
18// ------------------------------------------------------------------------------------------------------
19// ------------------------------------------------------------------------------------------------------
20//! \brief Representation for a shared memory segment manager.
21typedef boost::interprocess::managed_shared_memory::segment_manager segman_t;
22
23// ----- Allocators For Shared Memory -----
24typedef boost::interprocess::allocator<void, segman_t> void_alloc;
25typedef boost::interprocess::allocator<char, segman_t> char_alloc;
26typedef boost::interprocess::allocator<uint32_t, segman_t> uint32_alloc;
27typedef boost::interprocess::allocator<uint64_t, segman_t> uint64_alloc;
28typedef boost::interprocess::allocator<double, segman_t> dbl_alloc;
29typedef boost::interprocess::allocator<bool, segman_t> bool_alloc;
30
31// ----- Basic Shared Memory Types/Containers -----
32typedef boost::interprocess::basic_string<char, std::char_traits<char>, char_alloc> char_str;
33typedef boost::interprocess::allocator<char_str, segman_t> str_alloc;
34typedef boost::interprocess::set<char_str, std::less<char_str>, str_alloc> str_set;
35typedef boost::interprocess::set<uint64_t, std::less<uint64_t>, uint64_alloc> uint64_set;
36typedef std::pair<const char_str, uint32_t> strmap_t;
37typedef std::pair<char_str, uint32_t> movable_strmap_t;
38typedef boost::interprocess::allocator<strmap_t, segman_t> strmap_t_alloc;
39typedef boost::interprocess::map<char_str, uint32_t, std::less<char_str>, strmap_t_alloc> str_uint32_map;
40
41
42// -------------------------------------------------------------------------------------------------------------------------
43class SMStore_OrderID
44{
45public:
46 typedef boost::interprocess::interprocess_mutex SMMutex;
47 typedef boost::interprocess::scoped_lock<SMMutex> SMMutScpLck;
48 typedef boost::interprocess::managed_shared_memory BIPCMSM;
49
50 // -----------------------------------------------------------------------
51 SMStore_OrderID(const std::string& branch, const uint32_t& memsize = 1024)
52 : m_branch(branch),
53 m_memsize(memsize),
54 m_msm(0),
55 m_alloc(0),
56 m_set(0)
57 {
58 try
59 {
60 m_msm = new BIPCMSM(boost::interprocess::open_or_create, m_branch.c_str(),
61 m_memsize*m_memmult);
62 m_alloc = new void_alloc(m_msm->get_segment_manager());
63 m_set = m_msm->find_or_construct<uint64_set>("OrderIDs")(std::less<uint64_t>(), *m_alloc);
64 }
65 catch(...)
66 {
67 cleanup();
68 throw std::runtime_error("ERR: SMStore_OrderID instance encountered issues on construction.");
69 }
70 }
71
72 // -----------------------------------------------------------------------
73 ~SMStore_OrderID()
74 {
75 }
76
77 // -----------------------------------------------------------------------
78 inline void cleanup()
79 {
80 delete m_msm; m_msm = 0;
81 delete m_alloc; m_alloc = 0;
82 delete m_set; m_set = 0;
83 }
84
85 // -----------------------------------------------------------------------
86 inline const bool remove(const std::string& name)
87 {
88 if(name.empty()) return false;
89 return boost::interprocess::shared_memory_object::remove(name.c_str());
90 }
91
92 // -----------------------------------------------------------------------
93 inline const bool exists(const uint64_t& oid)
94 {
95 SMMutScpLck lock(mtx);
96 return m_set->find(oid) != m_set->cend();
97 }
98
99 // -----------------------------------------------------------------------
100 inline const bool add_id(const uint64_t& oid)
101 {
102 SMMutScpLck lock(mtx);
103 return m_set->insert(oid).second;
104 }
105
106 // -----------------------------------------------------------------------
107 inline const bool remove_id(const uint64_t& oid)
108 {
109 SMMutScpLck lock(mtx);
110 return m_set->erase(oid) != 0;
111 }
112
113private:
114
115 // -----------------------------------------------------------------------
116 enum {m_memmult = 8}; //!< Size of the ID chunk (64 bit|8 bytes)
117 std::string m_branch; //!< Name of the shared memory segment, by SQF6 branch
118 uint32_t m_memsize; //!< Size of the segment, multiple of uint64_t size
119 BIPCMSM *m_msm; //!< Pointer to Boost IPC managed shared memory segment
120 void_alloc *m_alloc; //!< Pointer to void allocator for the memory segement
121 uint64_set *m_set; //!< Pointer to underlying set storing the order IDs
122 SMMutex mtx; //!< Access protection mutex
123};
124
125typedef boost::shared_ptr<SMStore_OrderID> SMStore_OrderID_ptr;
126
127
128class ASIOThdPool
129{
130public:
131 typedef boost::shared_ptr<boost::asio::io_service::work> IOSWorkPtr;
132
133 // -----------------------------------------------------------------------
134 //! \brief Ctor.
135 ASIOThdPool(const uint32_t nthds)
136 : IOS(),
137 WRK(new boost::asio::io_service::work(IOS)),
138 IOSIG(IOS, SIGTERM, SIGINT, SIGBREAK)
139 {
140 m_thd_cnt = nthds > 8 ? 8 : nthds;
141 m_is_running = false;
142 IOSIG.async_wait(boost::bind(&ASIOThdPool::shutdown, this, boost::asio::placeholders::error,
143 boost::asio::placeholders::signal_number));
144 }
145
146 // -----------------------------------------------------------------------
147 //! \brief Dtor.
148 ~ASIOThdPool() {}
149
150 // -----------------------------------------------------------------------
151 //! \brief Method to start up the thread pool.
152 void run()
153 {
154 m_is_running = true;
155 for(uint32_t i = 0; i < m_thd_cnt; ++i)
156 TGRP.create_thread(boost::bind(&boost::asio::io_service::run, &IOS));
157 }
158
159 // -----------------------------------------------------------------------
160 //! \brief Method to wait on the thread pool to finish and issue a shutdown.
161 void join()
162 {
163 if(!m_is_running) return;
164 TGRP.join_all();
165 shutdown(boost::system::error_code(), 100);
166 }
167
168 // -----------------------------------------------------------------------
169 //! \brief Method to shutdown the thread pool service.
170 void shutdown(const boost::system::error_code &err, int signum)
171 {
172 if(!m_is_running) return;
173 m_is_running = false;
174
175 WRK.reset();
176 IOSIG.cancel();
177 IOS.stop();
178 }
179
180 // -----------------------------------------------------------------------
181 boost::asio::io_service IOS; //!< Boost ASIO IO service object for scheduling work.
182 IOSWorkPtr WRK; //!< Work object to ensure the ASIO event loop has work.
183 boost::asio::signal_set IOSIG; //!< Signals set for handling shutdowns.
184 boost::thread_group TGRP; //!< Thread group for handling the scheduled tasks.
185 uint32_t m_thd_cnt; //!< Number of threads within the thread pool.
186 bool m_is_running; //!< Control variable to indicate running state.
187};
188
189
190class SomeSocket
191{
192public:
193 // -----------------------------------------------------------------------
194 SomeSocket(ASIOThdPool& ATP)
195 : ATP(ATP),
196 m_socket(ATP.IOS)
197 {
198 }
199
200 // -----------------------------------------------------------------------
201 ~SomeSocket()
202 {
203 }
204
205 // -----------------------------------------------------------------------
206 void connect(const std::string& id, const std::string& ip, const std::string& port)
207 {
208
209 SMOID.reset(new SMStore_OrderID(id, 1));
210
211 boost::asio::ip::tcp::resolver resolver(ATP.IOS);
212 boost::asio::ip::tcp::resolver::query query(ip, port);
213 boost::asio::ip::tcp::resolver::iterator endpt = resolver.resolve(query);
214
215 boost::system::error_code ec;
216 while(endpt != boost::asio::ip::tcp::resolver::iterator())
217 {
218 m_socket.connect(endpt->endpoint(), ec);
219 if(!ec)
220 {
221 std::printf("Connected to [%s:%s]\n", ip.c_str(), port.c_str());
222 }
223 ++endpt;
224 }
225 }
226
227 // -----------------------------------------------------------------------
228 void run()
229 {
230 SMOID->add_id(12);// read stuff
231 }
232
233 // -----------------------------------------------------------------------
234 ASIOThdPool &ATP;
235 boost::asio::ip::tcp::socket m_socket;
236 SMStore_OrderID_ptr SMOID;
237
238};
239
240
241
242int main(int argc, char* argv[])
243{
244
245 ASIOThdPool ATP(3);
246
247 SomeSocket SS(ATP);
248
249 SS.connect("ID", "209.213.219.189", "10001");
250 SS.run();
251
252 ATP.run();
253
254 return 0;
255
256}