Opened 5 years ago
Last modified 5 years ago
#13043 new Bugs
Serialized MPI doesn't properly handle cancellation of a request
Reported by: | Owned by: | Matthias Troyer | |
---|---|---|---|
Milestone: | To Be Determined | Component: | mpi |
Version: | Boost 1.62.0 | Severity: | Problem |
Keywords: | Cc: |
Description
Hi - I think I've found a bug. If you run version A with mpiexec -n 2 you get a clean exit; if you run version B, it hangs indefinitely. request::handle_serialized_irecv isn't handling cancellation. This is bothersome when you have to MPI_Comm_disconnect from things. I can work around it by wrapping the transmission, but that's not optimal. Please advise if you need more info; I'm using MS-MPI and MSVC. I had previously posted this on Stack Overflow, where it was thought to be a bug. Thanks for the library, and I look forward to hearing from you all soon.
#include "boost/mpi.hpp"
#include "mpi.h"
#include <list>
#include "boost/serialization/list.hpp"

// Reproduction for Boost.MPI #13043: cancelling a request for a serialized
// (archive-based) irecv and then waiting on it hangs (version A), whereas
// the identical sequence on a primitive-type irecv exits cleanly (version B).
// Run with: mpiexec -n 2 <exe>
int main() {
    MPI_Init(NULL, NULL);

    MPI_Comm regional;
    // NOTE: "&regional" was mangled to "®ional" in the original posting
    // (the tracker rendered the "&reg" prefix as an HTML entity); restored
    // here, and likewise in MPI_Comm_disconnect below.
    MPI_Comm_dup(MPI_COMM_WORLD, &regional);
    boost::mpi::communicator comm =
        boost::mpi::communicator(regional, boost::mpi::comm_attach);

    if (comm.rank() == 1) {
        // VERSION A (serialized payload, std::list): hangs in z.wait()
        // after z.cancel() because handle_serialized_irecv never observes
        // the cancellation.
        std::list<int> q;
        boost::mpi::request z = comm.irecv<std::list<int>>(1, 0, q);
        z.cancel();
        z.wait();

        // VERSION B (primitive payload): exits cleanly.
        // int q;
        // boost::mpi::request z = comm.irecv<int>(1, 0, q);
        // z.cancel();
        // z.wait();
    }

    MPI_Comm_disconnect(&regional);
    MPI_Finalize();
    return 0;
}
Change History (4)
comment:1 by , 5 years ago
comment:2 by , 5 years ago
I was close, there was a mistake in it.
// First proposed fix (superseded — see comment:3 and comment:4).  It adds an
// early return when the count-message wait/test reports cancellation.  Known
// mistakes, per the follow-up comments: in the ra_test branch,
// stat.cancelled() is consulted BEFORE checking `flag`, i.e. before MPI_Test
// has actually completed and filled in the status; and the ra_cancel branch
// cancels only m_requests[0] (the count message), never the payload request.
// Kept verbatim below as posted, for the ticket's historical record.
template<typename T> optional<status> request::handle_serialized_irecv(request* self, request_action action) { typedef detail::serialized_irecv_data<T> data_t; shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data); if (action == ra_wait) { status stat; if (self->m_requests[0] == MPI_REQUEST_NULL) { return optional<status>(); //done } if (self->m_requests[1] == MPI_REQUEST_NULL) { // Wait for the count message to complete BOOST_MPI_CHECK_RESULT(MPI_Wait, (self->m_requests, &stat.m_status)); if (stat.cancelled()) { return stat; } // Resize our buffer and get ready to receive its data data->ia.resize(data->count); BOOST_MPI_CHECK_RESULT(MPI_Irecv, (data->ia.address(), data->ia.size(), MPI_PACKED, stat.source(), stat.tag(), MPI_Comm(data->comm), self->m_requests + 1)); } // Wait until we have received the entire message BOOST_MPI_CHECK_RESULT(MPI_Wait, (self->m_requests + 1, &stat.m_status)); data->deserialize(stat); return stat; } else if (action == ra_test) { status stat; int flag = 0; if (self->m_requests[0] == MPI_REQUEST_NULL) { return optional<status>(); //done } if (self->m_requests[1] == MPI_REQUEST_NULL) { // Check if the count message has completed BOOST_MPI_CHECK_RESULT(MPI_Test, (self->m_requests, &flag, &stat.m_status)); if (stat.cancelled()) { return stat; } if (flag) { // Resize our buffer and get ready to receive its data data->ia.resize(data->count); BOOST_MPI_CHECK_RESULT(MPI_Irecv, (data->ia.address(), data->ia.size(),MPI_PACKED, stat.source(), stat.tag(), MPI_Comm(data->comm), self->m_requests + 1)); } else return optional<status>(); // We have not finished yet } // Check if we have received the message data BOOST_MPI_CHECK_RESULT(MPI_Test, (self->m_requests + 1, &flag, &stat.m_status)); if (flag) { data->deserialize(stat); return stat; } else return optional<status>(); } else if (action == ra_cancel) { status stat; BOOST_MPI_CHECK_RESULT(MPI_Cancel, (&self->m_requests[0])); return optional<status>(); } else { return 
optional<status>(); } }
comment:3 by , 5 years ago
Still had a mistake. Don't worry, this is the last one, till I find a mistake in it.
// Second proposed fix (superseded — see comment:4).  Improvement over the
// first draft: in the ra_test branch, stat.cancelled() is now checked only
// inside `if (flag)`, i.e. after MPI_Test has reported completion and the
// status is valid.  Remaining issues, addressed by the final draft: the
// ra_wait path does not check for cancellation after the payload wait, and
// ra_cancel still cancels only m_requests[0] (the count message), never the
// pending payload request.  Kept verbatim below as posted, for the ticket's
// historical record.
template<typename T> optional<status> request::handle_serialized_irecv(request* self, request_action action) { typedef detail::serialized_irecv_data<T> data_t; shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data); if (action == ra_wait) { status stat; if (self->m_requests[0] == MPI_REQUEST_NULL) { return optional<status>(); //done } if (self->m_requests[1] == MPI_REQUEST_NULL) { // Wait for the count message to complete BOOST_MPI_CHECK_RESULT(MPI_Wait, (self->m_requests, &stat.m_status)); if (stat.cancelled()) { return stat; } // Resize our buffer and get ready to receive its data data->ia.resize(data->count); BOOST_MPI_CHECK_RESULT(MPI_Irecv, (data->ia.address(), data->ia.size(), MPI_PACKED, stat.source(), stat.tag(), MPI_Comm(data->comm), self->m_requests + 1)); } // Wait until we have received the entire message BOOST_MPI_CHECK_RESULT(MPI_Wait, (self->m_requests + 1, &stat.m_status)); data->deserialize(stat); return stat; } else if (action == ra_test) { status stat; int flag = 0; if (self->m_requests[0] == MPI_REQUEST_NULL) { return optional<status>(); //done } if (self->m_requests[1] == MPI_REQUEST_NULL) { // Check if the count message has completed BOOST_MPI_CHECK_RESULT(MPI_Test, (self->m_requests, &flag, &stat.m_status)); if (flag) { if (stat.cancelled()) { return stat; } // Resize our buffer and get ready to receive its data data->ia.resize(data->count); BOOST_MPI_CHECK_RESULT(MPI_Irecv, (data->ia.address(), data->ia.size(),MPI_PACKED, stat.source(), stat.tag(), MPI_Comm(data->comm), self->m_requests + 1)); } else return optional<status>(); // We have not finished yet } // Check if we have received the message data BOOST_MPI_CHECK_RESULT(MPI_Test, (self->m_requests + 1, &flag, &stat.m_status)); if (flag) { data->deserialize(stat); return stat; } else return optional<status>(); } else if (action == ra_cancel) { status stat; BOOST_MPI_CHECK_RESULT(MPI_Cancel, (&self->m_requests[0])); return optional<status>(); } else { return 
optional<status>(); } }
comment:4 by , 5 years ago
I was wrong. It's simpler than all that. Sorry.
// Final proposed fix for #13043.  A serialized irecv is a two-message
// protocol: m_requests[0] receives the packed-size count, m_requests[1]
// receives the packed payload.  handle_serialized_irecv must (a) recognise
// cancellation at every stage of that protocol, returning the cancelled
// status instead of deserializing, and (b) on ra_cancel, cancel WHICHEVER of
// the two underlying MPI requests is still active.
//
// BUG FIX vs. the version as posted: in the ra_cancel branch, the second
// MPI_Cancel — guarded by m_requests[1] != MPI_REQUEST_NULL — passed
// &self->m_requests[0] instead of &self->m_requests[1] (copy/paste slip),
// so the pending payload receive was never cancelled and wait() could still
// hang.  The unused `status stat;` local in that branch is also removed.
template<typename T>
optional<status>
request::handle_serialized_irecv(request* self, request_action action)
{
  typedef detail::serialized_irecv_data<T> data_t;
  shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data);

  if (action == ra_wait) {
    status stat;
    if (self->m_requests[1] == MPI_REQUEST_NULL) {
      // Wait for the count message to complete
      BOOST_MPI_CHECK_RESULT(MPI_Wait,
                             (self->m_requests, &stat.m_status));
      if (stat.cancelled()) {
        // Cancelled before the count arrived: report it, post no payload recv.
        return stat;
      }

      // Resize our buffer and get ready to receive its data
      data->ia.resize(data->count);
      BOOST_MPI_CHECK_RESULT(MPI_Irecv,
                             (data->ia.address(), data->ia.size(),
                              MPI_PACKED,
                              stat.source(), stat.tag(),
                              MPI_Comm(data->comm),
                              self->m_requests + 1));
    }

    // Wait until we have received the entire message
    BOOST_MPI_CHECK_RESULT(MPI_Wait,
                           (self->m_requests + 1, &stat.m_status));
    if (stat.cancelled()) {
      // Payload receive was cancelled: nothing to deserialize.
      return stat;
    }
    data->deserialize(stat);
    return stat;
  } else if (action == ra_test) {
    status stat;
    int flag = 0;

    if (self->m_requests[1] == MPI_REQUEST_NULL) {
      // Check if the count message has completed
      BOOST_MPI_CHECK_RESULT(MPI_Test,
                             (self->m_requests, &flag, &stat.m_status));
      if (flag) {
        // Only consult the status once MPI_Test reports completion;
        // before that, stat is not filled in.
        if (stat.cancelled()) {
          return stat;
        }

        // Resize our buffer and get ready to receive its data
        data->ia.resize(data->count);
        BOOST_MPI_CHECK_RESULT(MPI_Irecv,
                               (data->ia.address(), data->ia.size(),
                                MPI_PACKED,
                                stat.source(), stat.tag(),
                                MPI_Comm(data->comm),
                                self->m_requests + 1));
      } else {
        return optional<status>(); // We have not finished yet
      }
    }

    // Check if we have received the message data
    BOOST_MPI_CHECK_RESULT(MPI_Test,
                           (self->m_requests + 1, &flag, &stat.m_status));
    if (flag) {
      if (stat.cancelled()) {
        return stat;
      }
      data->deserialize(stat);
      return stat;
    } else {
      return optional<status>();
    }
  } else if (action == ra_cancel) {
    // Cancel whichever underlying request is still pending.
    if (self->m_requests[0] != MPI_REQUEST_NULL) {
      BOOST_MPI_CHECK_RESULT(MPI_Cancel, (&self->m_requests[0]));
    }
    if (self->m_requests[1] != MPI_REQUEST_NULL) {
      // FIX: was &self->m_requests[0] in the posted patch.
      BOOST_MPI_CHECK_RESULT(MPI_Cancel, (&self->m_requests[1]));
    }
    return optional<status>();
  } else {
    return optional<status>();
  }
}
The following changes seem to fix it, with analogous changes to the array version. I guarantee nothing, draw your own conclusions, of course.