Opened 14 years ago
Closed 14 years ago
#2810 closed Patches (fixed)
Optimization for list::merge performance
| Reported by: | Andrey Semashev | Owned by: | Ion Gaztañaga |
|---|---|---|---|
| Milestone: | Boost 1.39.0 | Component: | intrusive |
| Version: | Boost 1.38.0 | Severity: | Optimization |
| Keywords: | list merge performance | Cc: |
Description
The current implementation of the list::merge method is quite inefficient since it always iterates through the *this sequence, even if there are no more elements to insert. It also inserts elements from the other sequence one by one, which may generate a lot of overhead for node pointers adjustments in case if there are equivalent nodes in either of the sequences.
I suggest the following optimized version of the merge method (see the my_merge function):
#include <cstdlib>
#include <iostream>
#include <boost/checked_delete.hpp>
#include <boost/date_time/microsec_time_clock.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/intrusive/options.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/list_hook.hpp>
namespace bi = boost::intrusive;
using boost::posix_time::ptime;
typedef bi::list_base_hook<
bi::link_mode< bi::auto_unlink >
> BaseHook_t;
struct MyData :
public BaseHook_t
{
struct OrderByKey
{
typedef bool result_type;
result_type operator() (MyData const& left, MyData const& right) const
{
return left.m_Key < right.m_Key;
}
};
struct Cloner
{
typedef MyData* result_type;
result_type operator() (MyData const& that) const
{
return new MyData(that);
}
};
int m_Key;
explicit MyData(int k) : m_Key(k) {}
};
typedef bi::list<
MyData,
bi::base_hook< BaseHook_t >,
bi::constant_time_size< false >
> List_t;
template< typename PredicateT >
void my_merge(List_t& this_, List_t& x, PredicateT p)
{
typedef List_t::const_iterator const_iterator;
typedef List_t::size_type size_type;
const_iterator b = this_.cbegin(), e = this_.cend();
const_iterator ex = x.cend();
while (!x.empty())
{
const_iterator bx = x.cbegin();
while (b != e && p(*bx, *b))
{
++b;
}
if (b != e)
{
// determine the end of the range of x to inject at b
size_type n = 0;
do
{
++bx;
++n;
}
while (bx != ex && p(*bx, *b));
this_.splice(b, x, x.cbegin(), bx, n);
}
else
{
// the rest of the list is to be appended to the end
this_.splice(e, x);
break;
}
}
}
int main(int, char*[])
{
enum
{
LIST1_SIZE = 1000000,
LIST2_SIZE = LIST1_SIZE
};
List_t l11, l12;
for (unsigned int i = 0; i < LIST1_SIZE; ++i)
{
MyData* p = new MyData(std::rand());
l11.push_back(*p);
}
l11.sort(MyData::OrderByKey());
for (unsigned int i = 0; i < LIST2_SIZE; ++i)
{
MyData* p = new MyData(std::rand());
l12.push_back(*p);
}
l12.sort(MyData::OrderByKey());
List_t l21, l22;
l21.clone_from(l11, MyData::Cloner(), boost::checked_deleter< MyData >());
l22.clone_from(l12, MyData::Cloner(), boost::checked_deleter< MyData >());
ptime start1, end1, start2, end2;
start1 = boost::date_time::microsec_clock< ptime >::universal_time();
l11.merge(l12, MyData::OrderByKey());
end1 = boost::date_time::microsec_clock< ptime >::universal_time();
start2 = boost::date_time::microsec_clock< ptime >::universal_time();
my_merge(l21, l22, MyData::OrderByKey());
end2 = boost::date_time::microsec_clock< ptime >::universal_time();
unsigned int duration1 = end1.time_of_day().total_microseconds() - start1.time_of_day().total_microseconds();
unsigned int duration2 = end2.time_of_day().total_microseconds() - start2.time_of_day().total_microseconds();
std::cout << "Boost.Intrusive version: " << duration1 << " us, optimized: " << duration2 << " us" << std::endl;
l11.clear_and_dispose(boost::checked_deleter< MyData >());
l12.clear_and_dispose(boost::checked_deleter< MyData >());
l21.clear_and_dispose(boost::checked_deleter< MyData >());
l22.clear_and_dispose(boost::checked_deleter< MyData >());
return 0;
}
This test case shows that my_merge is faster than list::merge by an order of magnitude and even more if LIST1_SIZE != LIST2_SIZE.
Attachments (1)
Change History (5)
by , 14 years ago
| Attachment: | list.hpp.patch added |
|---|
follow-up: 3 comment:1 by , 14 years ago
I think your proposal has a bug in the first loop (it shouldn't be "!p(*bx, b)"?). I've tested the following:
template<class Predicate> void merge(list_impl& x, Predicate p) {
const_iterator e(this->cend()), ex(x.cend()); const_iterator b(this->cbegin()); while(!x.empty()){
const_iterator ix(x.cbegin()); while (b != e && !p(*ix, *b)){
++b;
} if(b == e){
Now transfer the rest to the end of the container this->splice(e, x); break;
} else{
size_type n(0); do{
++ix; ++n;
} while(ix != ex && p(*ix, *b)); this->splice(b, x, x.begin(), ix, n);
}
}
}
And similarly for slist:
template<class Predicate> iterator merge(slist_impl& x, Predicate p) {
const_iterator e(this->cend()), ex(x.cend()), bb(this->cbefore_begin()),
bb_next, last_inserted(e);
while(!x.empty()){
const_iterator ibx_next(x.cbefore_begin()), ibx(ibx_next++); while (++(bb_next = bb) != e && !p(*ibx_next, *bb_next)){
bb = bb_next;
} if(bb_next == e){
Now transfer the rest to the end of the container last_inserted = this->splice_after(bb, x); break;
} else{
size_type n(0); do{
ibx = ibx_next; ++n;
} while(++(ibx_next = ibx) != ex && p(*ibx_next, *bb_next)); this->splice_after(bb, x, x.before_begin(), ibx, n); last_inserted = ibx;
}
} return last_inserted.unconst();
}
Those are not thorougly tested so let me know if you find any error.
comment:2 by , 14 years ago
Sorry, here's the same with preformatted text:
template<class Predicate>
void merge(list_impl& x, Predicate p)
{
const_iterator e(this->cend()), ex(x.cend());
const_iterator b(this->cbegin());
while(!x.empty()){
const_iterator ix(x.cbegin());
while (b != e && !p(*ix, *b)){
++b;
}
if(b == e){
//Now transfer the rest to the end of the container
this->splice(e, x);
break;
}
else{
size_type n(0);
do{
++ix; ++n;
} while(ix != ex && p(*ix, *b));
this->splice(b, x, x.begin(), ix, n);
}
}
}
template<class Predicate>
iterator merge(slist_impl& x, Predicate p)
{
const_iterator e(this->cend()), ex(x.cend()), bb(this->cbefore_begin()),
bb_next, last_inserted(e);
while(!x.empty()){
const_iterator ibx_next(x.cbefore_begin()), ibx(ibx_next++);
while (++(bb_next = bb) != e && !p(*ibx_next, *bb_next)){
bb = bb_next;
}
if(bb_next == e){
//Now transfer the rest to the end of the container
last_inserted = this->splice_after(bb, x);
break;
}
else{
size_type n(0);
do{
ibx = ibx_next; ++n;
} while(++(ibx_next = ibx) != ex && p(*ibx_next, *bb_next));
this->splice_after(bb, x, x.before_begin(), ibx, n);
last_inserted = ibx;
}
}
return last_inserted.unconst();
}
comment:3 by , 14 years ago
Replying to igaztanaga:
I think your proposal has a bug in the first loop (it shouldn't be "!p(*bx, b)"?).
Yeah, looks like you're right. Thanks for spotting this.
Your corrected version seems to be working flawlessly in my application. At least, the list::merge method. I hope, it will be released in 1.39.
comment:4 by , 14 years ago
| Resolution: | → fixed |
|---|---|
| Status: | new → closed |
Applied modified patch. Revision 51971.

The patch that applies the optimization to list::merge (against branches/release)