Opened 5 years ago

Last modified 5 years ago

#13043 new Bugs

Serialized MPI doesn't properly handle cancellation of a request

Reported by: Mike Williams <michael.williams@…> Owned by: Matthias Troyer
Milestone: To Be Determined Component: mpi
Version: Boost 1.62.0 Severity: Problem
Keywords: Cc:

Description

Hi - I think I've found a bug. If you try version A with mpiexec -n 2 then you get a clean exit; if you try version B, it hangs indefinitely. request::handle_serialized_irecv isn't handling cancellation. This is bothersome when you have to MPI_Comm_disconnect from things. I can work around it by wrapping the transmission, but that's not optimal. Please advise if you need more info; I'm using MSMPI and MSVC. I had previously posted this on Stack Overflow, where responders thought it was a bug. Thanks for the library, and I look forward to hearing from you all soon.

#include "boost/mpi.hpp"
#include "mpi.h"
#include <list>
#include "boost/serialization/list.hpp"

// Minimal reproducer: rank 1 posts a non-blocking receive that can never be
// matched (the source argument is 1, i.e. itself, and no matching send is
// ever issued), cancels it, then waits. Version A uses a serialized payload
// (std::list<int>), which goes through request::handle_serialized_irecv;
// Version B uses a plain int, which takes the trivial MPI path. Per the
// ticket, the serialized path does not honour the cancellation, so wait()
// never returns.
int main()
{
    MPI_Init(NULL, NULL);
    MPI_Comm regional;
    MPI_Comm_dup(MPI_COMM_WORLD, &regional);
    boost::mpi::communicator comm = boost::mpi::communicator(regional, boost::mpi::comm_attach);
    if (comm.rank() == 1)
    {


        //VERSION A:
        // Serialized receive (std::list requires serialization).
        std::list<int> q;
        boost::mpi::request z = comm.irecv<std::list<int>>(1, 0, q);
        z.cancel();
        z.wait();


        //VERSION B:
        // Plain-int receive for comparison; uses the non-serialized path.
//      int q;
//      boost::mpi::request z = comm.irecv<int>(1, 0, q);
//      z.cancel();
//      z.wait();

    }
    MPI_Comm_disconnect(&regional);
    MPI_Finalize();
    return 0;
}

Change History (4)

comment:1 by michael.williams@…, 5 years ago

The following changes seem to fix it, with analogous changes to the array version. I guarantee nothing, draw your own conclusions, of course.

// Proposed patch, iteration 1: add cancellation handling to the completion
// logic of a serialized irecv. A serialized receive is implemented as two
// underlying MPI requests: m_requests[0] carries the payload byte count,
// m_requests[1] the packed payload (posted only after the count arrives).
//
// NOTE(review): superseded by later iterations on this ticket. Known issues:
//  - ra_cancel only cancels m_requests[0]; a pending payload receive in
//    m_requests[1] is left uncancelled.
//  - in ra_test, stat.cancelled() is consulted before flag is examined, but
//    MPI_Test only defines the output status when flag is true.
template<typename T>
optional<status> 
request::handle_serialized_irecv(request* self, request_action action)
{
  typedef detail::serialized_irecv_data<T> data_t;
  shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data);
  if (action == ra_wait) {
    status stat;
    if (self->m_requests[1] == MPI_REQUEST_NULL) {
      // Wait for the count message to complete
      BOOST_MPI_CHECK_RESULT(MPI_Wait,
                             (self->m_requests, &stat.m_status));
	  // New in this patch: bail out with the cancelled status instead of
	  // posting the payload receive.
	  if (stat.cancelled())
	  {
		  return stat;
	  }
      // Resize our buffer and get ready to receive its data
      data->ia.resize(data->count);
      BOOST_MPI_CHECK_RESULT(MPI_Irecv,
                             (data->ia.address(), data->ia.size(), MPI_PACKED,
                              stat.source(), stat.tag(), 
                              MPI_Comm(data->comm), self->m_requests + 1));
    }

    // Wait until we have received the entire message
    BOOST_MPI_CHECK_RESULT(MPI_Wait,
                           (self->m_requests + 1, &stat.m_status));

    data->deserialize(stat);
    return stat;
  } else if (action == ra_test) {
    status stat;
    int flag = 0;

    if (self->m_requests[1] == MPI_REQUEST_NULL) {
      // Check if the count message has completed
      BOOST_MPI_CHECK_RESULT(MPI_Test,
                             (self->m_requests, &flag, &stat.m_status));
	  // NOTE(review): stat is only defined here when flag is true; this
	  // check is relocated under `if (flag)` in iteration 3.
	  if (stat.cancelled())
	  {
		  return stat;
	  }
      if (flag) {
        // Resize our buffer and get ready to receive its data
        data->ia.resize(data->count);
        BOOST_MPI_CHECK_RESULT(MPI_Irecv,
                               (data->ia.address(), data->ia.size(),MPI_PACKED,
                                stat.source(), stat.tag(), 
                                MPI_Comm(data->comm), self->m_requests + 1));
      } else
        return optional<status>(); // We have not finished yet
    } 

    // Check if we have received the message data
    BOOST_MPI_CHECK_RESULT(MPI_Test,
                           (self->m_requests + 1, &flag, &stat.m_status));
    if (flag) {
      data->deserialize(stat);
      return stat;
    } else 
      return optional<status>();
  }
  else if (action == ra_cancel)
  {
	  status stat;
	  // NOTE(review): only m_requests[0] is cancelled; if the payload
	  // receive in m_requests[1] was already posted it stays pending.
	  BOOST_MPI_CHECK_RESULT(MPI_Cancel, (&self->m_requests[0])); 
	  return optional<status>();
  }
  else
  {
    return optional<status>();
  }
}

comment:2 by michael.williams@…, 5 years ago

I was close, there was a mistake in it.

// Proposed patch, iteration 2: like iteration 1, but additionally treats a
// null m_requests[0] as "already complete" at the top of ra_wait/ra_test so
// a finished (or cancelled-and-completed) request is not waited on again.
//
// NOTE(review): still superseded — ra_test reads stat.cancelled() before
// flag is known (fixed in iteration 3), and ra_cancel still cancels only
// m_requests[0].
template<typename T>
optional<status> 
request::handle_serialized_irecv(request* self, request_action action)
{
  typedef detail::serialized_irecv_data<T> data_t;
  shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data);
  if (action == ra_wait) {
	status stat;
	  // New in this iteration: nothing left to wait for.
	  if (self->m_requests[0] == MPI_REQUEST_NULL)
	  {
		  return optional<status>(); //done
	  }
    if (self->m_requests[1] == MPI_REQUEST_NULL) {
      // Wait for the count message to complete
      BOOST_MPI_CHECK_RESULT(MPI_Wait,
                             (self->m_requests, &stat.m_status));
	  if (stat.cancelled())
	  {
		  return stat;
	  }
      // Resize our buffer and get ready to receive its data
      data->ia.resize(data->count);
      BOOST_MPI_CHECK_RESULT(MPI_Irecv,
                             (data->ia.address(), data->ia.size(), MPI_PACKED,
                              stat.source(), stat.tag(), 
                              MPI_Comm(data->comm), self->m_requests + 1));
    }

    // Wait until we have received the entire message
    BOOST_MPI_CHECK_RESULT(MPI_Wait,
                           (self->m_requests + 1, &stat.m_status));

    data->deserialize(stat);
    return stat;
  } else if (action == ra_test) {
    status stat;
    int flag = 0;
	// New in this iteration: nothing left to test.
	if (self->m_requests[0] == MPI_REQUEST_NULL)
	{
		return optional<status>(); //done
	}
    if (self->m_requests[1] == MPI_REQUEST_NULL) {
      // Check if the count message has completed
      BOOST_MPI_CHECK_RESULT(MPI_Test,
                             (self->m_requests, &flag, &stat.m_status));
	  // NOTE(review): stat is only defined when flag is true; moved under
	  // `if (flag)` in iteration 3.
	  if (stat.cancelled())
	  {
		  return stat;
	  }
      if (flag) {
        // Resize our buffer and get ready to receive its data
        data->ia.resize(data->count);
        BOOST_MPI_CHECK_RESULT(MPI_Irecv,
                               (data->ia.address(), data->ia.size(),MPI_PACKED,
                                stat.source(), stat.tag(), 
                                MPI_Comm(data->comm), self->m_requests + 1));
      } else
        return optional<status>(); // We have not finished yet
    } 

    // Check if we have received the message data
    BOOST_MPI_CHECK_RESULT(MPI_Test,
                           (self->m_requests + 1, &flag, &stat.m_status));
    if (flag) {
      data->deserialize(stat);
      return stat;
    } else 
      return optional<status>();
  }
  else if (action == ra_cancel)
  {
	  status stat;
	  // NOTE(review): only m_requests[0] is cancelled; a posted payload
	  // receive in m_requests[1] stays pending.
	  BOOST_MPI_CHECK_RESULT(MPI_Cancel, (&self->m_requests[0])); 
	  return optional<status>();
  }
  else
  {
    return optional<status>();
  }
}

comment:3 by michael.williams@…, 5 years ago

Still had a mistake. Don't worry, this is the last one, till I find a mistake in it.

// Proposed patch, iteration 3: like iteration 2, but in ra_test the
// stat.cancelled() check is now correctly performed only after MPI_Test has
// reported completion (flag true), since MPI_Test defines the output status
// only in that case.
//
// NOTE(review): still superseded by the final iteration — ra_cancel cancels
// only m_requests[0], and the ra_wait path does not re-check cancellation
// after waiting on the payload request.
template<typename T>
optional<status> 
request::handle_serialized_irecv(request* self, request_action action)
{
  typedef detail::serialized_irecv_data<T> data_t;
  shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data);
  if (action == ra_wait) {
	  status stat;
	  // Nothing left to wait for.
	  if (self->m_requests[0] == MPI_REQUEST_NULL)
	  {
		  return optional<status>(); //done
	  }
    if (self->m_requests[1] == MPI_REQUEST_NULL) {
      // Wait for the count message to complete
      BOOST_MPI_CHECK_RESULT(MPI_Wait,
                             (self->m_requests, &stat.m_status));
	  if (stat.cancelled())
	  {
		  return stat;
	  }
      // Resize our buffer and get ready to receive its data
      data->ia.resize(data->count);
      BOOST_MPI_CHECK_RESULT(MPI_Irecv,
                             (data->ia.address(), data->ia.size(), MPI_PACKED,
                              stat.source(), stat.tag(), 
                              MPI_Comm(data->comm), self->m_requests + 1));
    }

    // Wait until we have received the entire message
    BOOST_MPI_CHECK_RESULT(MPI_Wait,
                           (self->m_requests + 1, &stat.m_status));

    data->deserialize(stat);
    return stat;
  } else if (action == ra_test) {
    status stat;
    int flag = 0;
	// Nothing left to test.
	if (self->m_requests[0] == MPI_REQUEST_NULL)
	{
		return optional<status>(); //done
	}
    if (self->m_requests[1] == MPI_REQUEST_NULL) {
      // Check if the count message has completed
      BOOST_MPI_CHECK_RESULT(MPI_Test,
                             (self->m_requests, &flag, &stat.m_status));
      if (flag) {
		// Fixed relative to iteration 2: stat is consulted only once
		// MPI_Test has defined it.
		if (stat.cancelled())
		{
			return stat;
		}
        // Resize our buffer and get ready to receive its data
        data->ia.resize(data->count);
        BOOST_MPI_CHECK_RESULT(MPI_Irecv,
                               (data->ia.address(), data->ia.size(),MPI_PACKED,
                                stat.source(), stat.tag(), 
                                MPI_Comm(data->comm), self->m_requests + 1));
      } else
        return optional<status>(); // We have not finished yet
    } 

    // Check if we have received the message data
    BOOST_MPI_CHECK_RESULT(MPI_Test,
                           (self->m_requests + 1, &flag, &stat.m_status));
    if (flag) {
      data->deserialize(stat);
      return stat;
    } else 
      return optional<status>();
  }
  else if (action == ra_cancel)
  {
	  status stat;
	  // NOTE(review): only m_requests[0] is cancelled; a posted payload
	  // receive in m_requests[1] stays pending.
	  BOOST_MPI_CHECK_RESULT(MPI_Cancel, (&self->m_requests[0])); 
	  return optional<status>();
  }
  else
  {
    return optional<status>();
  }
}

comment:4 by michael.williams@…, 5 years ago

I was wrong. It's simpler than all that. Sorry.

// Proposed fix (final iteration): make request::handle_serialized_irecv
// honour MPI_Cancel for serialized receives.
//
// A serialized receive is implemented as two underlying MPI requests:
//   m_requests[0] - the preliminary "count" message
//   m_requests[1] - the packed payload (posted only once the count arrives)
//
// Changes relative to the shipped implementation:
//  - ra_wait / ra_test return the cancelled status instead of proceeding to
//    post the payload receive or deserialize an absent message;
//  - ra_cancel cancels whichever underlying request is still active.
//    BUGFIX over the previous comment's version: the second branch there
//    tested m_requests[1] but cancelled m_requests[0] again (copy-paste
//    error), leaving the payload receive uncancellable.
//
// Returns the completion status, or an empty optional if the request has
// not finished (ra_test) or the action produces no status (ra_cancel).
template<typename T>
optional<status>
request::handle_serialized_irecv(request* self, request_action action)
{
  typedef detail::serialized_irecv_data<T> data_t;
  shared_ptr<data_t> data = static_pointer_cast<data_t>(self->m_data);

  if (action == ra_wait) {
    status stat;
    if (self->m_requests[1] == MPI_REQUEST_NULL) {
      // Wait for the count message to complete.
      BOOST_MPI_CHECK_RESULT(MPI_Wait,
                             (self->m_requests, &stat.m_status));
      if (stat.cancelled())
        return stat;            // cancelled before the count arrived

      // Resize our buffer and get ready to receive its data.
      data->ia.resize(data->count);
      BOOST_MPI_CHECK_RESULT(MPI_Irecv,
                             (data->ia.address(), data->ia.size(), MPI_PACKED,
                              stat.source(), stat.tag(),
                              MPI_Comm(data->comm), self->m_requests + 1));
    }

    // Wait until we have received the entire message.
    BOOST_MPI_CHECK_RESULT(MPI_Wait,
                           (self->m_requests + 1, &stat.m_status));
    if (stat.cancelled())
      return stat;              // cancelled before the payload arrived

    data->deserialize(stat);
    return stat;
  } else if (action == ra_test) {
    status stat;
    int flag = 0;

    if (self->m_requests[1] == MPI_REQUEST_NULL) {
      // Check whether the count message has completed. Note that MPI_Test
      // defines stat only when flag is true.
      BOOST_MPI_CHECK_RESULT(MPI_Test,
                             (self->m_requests, &flag, &stat.m_status));
      if (flag) {
        if (stat.cancelled())
          return stat;          // cancelled before the count arrived

        // Resize our buffer and get ready to receive its data.
        data->ia.resize(data->count);
        BOOST_MPI_CHECK_RESULT(MPI_Irecv,
                               (data->ia.address(), data->ia.size(), MPI_PACKED,
                                stat.source(), stat.tag(),
                                MPI_Comm(data->comm), self->m_requests + 1));
      } else {
        return optional<status>(); // count not received yet
      }
    }

    // Check whether we have received the message data.
    BOOST_MPI_CHECK_RESULT(MPI_Test,
                           (self->m_requests + 1, &flag, &stat.m_status));
    if (flag) {
      if (stat.cancelled())
        return stat;            // cancelled before the payload arrived
      data->deserialize(stat);
      return stat;
    } else {
      return optional<status>();
    }
  } else if (action == ra_cancel) {
    // Cancel whichever underlying request(s) are still pending.
    if (self->m_requests[0] != MPI_REQUEST_NULL) {
      BOOST_MPI_CHECK_RESULT(MPI_Cancel, (&self->m_requests[0]));
    }
    if (self->m_requests[1] != MPI_REQUEST_NULL) {
      // BUGFIX: previously cancelled m_requests[0] here.
      BOOST_MPI_CHECK_RESULT(MPI_Cancel, (&self->m_requests[1]));
    }
    return optional<status>();
  } else {
    return optional<status>();
  }
}
Note: See TracTickets for help on using tickets.