Opened 14 years ago
Closed 14 years ago
#2810 closed Patches (fixed)
Optimization for list::merge performance
Reported by: | Andrey Semashev | Owned by: | Ion Gaztañaga |
---|---|---|---|
Milestone: | Boost 1.39.0 | Component: | intrusive |
Version: | Boost 1.38.0 | Severity: | Optimization |
Keywords: | list merge performance | Cc: |
Description
The current implementation of the list::merge method is quite inefficient since it always iterates through the *this sequence, even if there are no more elements to insert. It also inserts elements from the other sequence one by one, which may generate a lot of overhead for node pointers adjustments in case if there are equivalent nodes in either of the sequences.
I suggest the following optimized version of the merge method (see the my_merge function):
#include <cstdlib> #include <iostream> #include <boost/checked_delete.hpp> #include <boost/date_time/microsec_time_clock.hpp> #include <boost/date_time/posix_time/posix_time_types.hpp> #include <boost/intrusive/options.hpp> #include <boost/intrusive/list.hpp> #include <boost/intrusive/list_hook.hpp> namespace bi = boost::intrusive; using boost::posix_time::ptime; typedef bi::list_base_hook< bi::link_mode< bi::auto_unlink > > BaseHook_t; struct MyData : public BaseHook_t { struct OrderByKey { typedef bool result_type; result_type operator() (MyData const& left, MyData const& right) const { return left.m_Key < right.m_Key; } }; struct Cloner { typedef MyData* result_type; result_type operator() (MyData const& that) const { return new MyData(that); } }; int m_Key; explicit MyData(int k) : m_Key(k) {} }; typedef bi::list< MyData, bi::base_hook< BaseHook_t >, bi::constant_time_size< false > > List_t; template< typename PredicateT > void my_merge(List_t& this_, List_t& x, PredicateT p) { typedef List_t::const_iterator const_iterator; typedef List_t::size_type size_type; const_iterator b = this_.cbegin(), e = this_.cend(); const_iterator ex = x.cend(); while (!x.empty()) { const_iterator bx = x.cbegin(); while (b != e && p(*bx, *b)) { ++b; } if (b != e) { // determine the end of the range of x to inject at b size_type n = 0; do { ++bx; ++n; } while (bx != ex && p(*bx, *b)); this_.splice(b, x, x.cbegin(), bx, n); } else { // the rest of the list is to be appended to the end this_.splice(e, x); break; } } } int main(int, char*[]) { enum { LIST1_SIZE = 1000000, LIST2_SIZE = LIST1_SIZE }; List_t l11, l12; for (unsigned int i = 0; i < LIST1_SIZE; ++i) { MyData* p = new MyData(std::rand()); l11.push_back(*p); } l11.sort(MyData::OrderByKey()); for (unsigned int i = 0; i < LIST2_SIZE; ++i) { MyData* p = new MyData(std::rand()); l12.push_back(*p); } l12.sort(MyData::OrderByKey()); List_t l21, l22; l21.clone_from(l11, MyData::Cloner(), boost::checked_deleter< MyData >()); l22.clone_from(l12, MyData::Cloner(), boost::checked_deleter< MyData >()); ptime start1, end1, start2, end2; start1 = boost::date_time::microsec_clock< ptime >::universal_time(); l11.merge(l12, MyData::OrderByKey()); end1 = boost::date_time::microsec_clock< ptime >::universal_time(); start2 = boost::date_time::microsec_clock< ptime >::universal_time(); my_merge(l21, l22, MyData::OrderByKey()); end2 = boost::date_time::microsec_clock< ptime >::universal_time(); unsigned int duration1 = end1.time_of_day().total_microseconds() - start1.time_of_day().total_microseconds(); unsigned int duration2 = end2.time_of_day().total_microseconds() - start2.time_of_day().total_microseconds(); std::cout << "Boost.Intrusive version: " << duration1 << " us, optimized: " << duration2 << " us" << std::endl; l11.clear_and_dispose(boost::checked_deleter< MyData >()); l12.clear_and_dispose(boost::checked_deleter< MyData >()); l21.clear_and_dispose(boost::checked_deleter< MyData >()); l22.clear_and_dispose(boost::checked_deleter< MyData >()); return 0; }
This test case shows that my_merge is faster than list::merge by an order of magnitude and even more if LIST1_SIZE != LIST2_SIZE.
Attachments (1)
Change History (5)
by , 14 years ago
Attachment: | list.hpp.patch added |
---|
follow-up: 3 comment:1 by , 14 years ago
I think your proposal has a bug in the first loop (it shouldn't be "!p(*bx, b)"?). I've tested the following:
template<class Predicate> void merge(list_impl& x, Predicate p) {
const_iterator e(this->cend()), ex(x.cend()); const_iterator b(this->cbegin()); while(!x.empty()){
const_iterator ix(x.cbegin()); while (b != e && !p(*ix, *b)){
++b;
} if(b == e){
Now transfer the rest to the end of the container this->splice(e, x); break;
} else{
size_type n(0); do{
++ix; ++n;
} while(ix != ex && p(*ix, *b)); this->splice(b, x, x.begin(), ix, n);
}
}
}
And similarly for slist:
template<class Predicate> iterator merge(slist_impl& x, Predicate p) {
const_iterator e(this->cend()), ex(x.cend()), bb(this->cbefore_begin()),
bb_next, last_inserted(e);
while(!x.empty()){
const_iterator ibx_next(x.cbefore_begin()), ibx(ibx_next++); while (++(bb_next = bb) != e && !p(*ibx_next, *bb_next)){
bb = bb_next;
} if(bb_next == e){
Now transfer the rest to the end of the container last_inserted = this->splice_after(bb, x); break;
} else{
size_type n(0); do{
ibx = ibx_next; ++n;
} while(++(ibx_next = ibx) != ex && p(*ibx_next, *bb_next)); this->splice_after(bb, x, x.before_begin(), ibx, n); last_inserted = ibx;
}
} return last_inserted.unconst();
}
Those are not thorougly tested so let me know if you find any error.
comment:2 by , 14 years ago
Sorry, here's the same with preformatted text:
template<class Predicate> void merge(list_impl& x, Predicate p) { const_iterator e(this->cend()), ex(x.cend()); const_iterator b(this->cbegin()); while(!x.empty()){ const_iterator ix(x.cbegin()); while (b != e && !p(*ix, *b)){ ++b; } if(b == e){ //Now transfer the rest to the end of the container this->splice(e, x); break; } else{ size_type n(0); do{ ++ix; ++n; } while(ix != ex && p(*ix, *b)); this->splice(b, x, x.begin(), ix, n); } } } template<class Predicate> iterator merge(slist_impl& x, Predicate p) { const_iterator e(this->cend()), ex(x.cend()), bb(this->cbefore_begin()), bb_next, last_inserted(e); while(!x.empty()){ const_iterator ibx_next(x.cbefore_begin()), ibx(ibx_next++); while (++(bb_next = bb) != e && !p(*ibx_next, *bb_next)){ bb = bb_next; } if(bb_next == e){ //Now transfer the rest to the end of the container last_inserted = this->splice_after(bb, x); break; } else{ size_type n(0); do{ ibx = ibx_next; ++n; } while(++(ibx_next = ibx) != ex && p(*ibx_next, *bb_next)); this->splice_after(bb, x, x.before_begin(), ibx, n); last_inserted = ibx; } } return last_inserted.unconst(); }
comment:3 by , 14 years ago
Replying to igaztanaga:
I think your proposal has a bug in the first loop (it shouldn't be "!p(*bx, b)"?).
Yeah, looks like you're right. Thanks for spotting this.
Your corrected version seems to be working flawlessly in my application. At least, the list::merge method. I hope, it will be released in 1.39.
comment:4 by , 14 years ago
Resolution: | → fixed |
---|---|
Status: | new → closed |
Applied modified patch. Revision 51971.
The patch that applies the optimization to list::merge (against branches/release)