DOLFINx C++ interface
MPI.h
1 // Copyright (C) 2007-2014 Magnus Vikstrøm and Garth N. Wells
2 //
3 // This file is part of DOLFINx (https://www.fenicsproject.org)
4 //
5 // SPDX-License-Identifier: LGPL-3.0-or-later
6 
7 #pragma once
8 
9 #include <algorithm>
10 #include <array>
11 #include <cassert>
12 #include <complex>
13 #include <cstdint>
14 #include <dolfinx/common/Timer.h>
15 #include <dolfinx/common/log.h>
16 #include <dolfinx/graph/AdjacencyList.h>
17 #include <numeric>
18 #include <set>
19 #include <span>
20 #include <tuple>
21 #include <type_traits>
22 #include <utility>
23 #include <vector>
24 
25 #define MPICH_IGNORE_CXX_SEEK 1
26 #include <mpi.h>
27 
29 namespace dolfinx::MPI
30 {
31 
33 enum class tag : int
34 {
35  consensus_pcx,
36  consensus_pex
37 };
38 
41 class Comm
42 {
43 public:
45  explicit Comm(MPI_Comm comm, bool duplicate = true);
46 
48  Comm(const Comm& comm) noexcept;
49 
51  Comm(Comm&& comm) noexcept;
52 
53  // Disable copy assignment operator
54  Comm& operator=(const Comm& comm) = delete;
55 
57  Comm& operator=(Comm&& comm) noexcept;
58 
60  ~Comm();
61 
63  MPI_Comm comm() const noexcept;
64 
65 private:
66  // MPI communicator
67  MPI_Comm _comm;
68 };
69 
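A minimal usage sketch (editor's addition, not part of MPI.h): Comm duplicates the communicator it is given by default and frees the duplicate in its destructor, so it can be stored as a class member without manual MPI_Comm_free calls.

  // Illustrative only; assumes MPI_Init has already been called.
  {
    dolfinx::MPI::Comm comm(MPI_COMM_WORLD); // wraps a duplicate of MPI_COMM_WORLD
    int r = dolfinx::MPI::rank(comm.comm());
    int s = dolfinx::MPI::size(comm.comm());
    (void)r;
    (void)s;
  } // the duplicated communicator is freed here
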
71 int rank(MPI_Comm comm);
72 
75 int size(MPI_Comm comm);
76 
84 constexpr std::array<std::int64_t, 2> local_range(int rank, std::int64_t N,
85  int size)
86 {
87  assert(rank >= 0);
88  assert(N >= 0);
89  assert(size > 0);
90 
91  // Compute number of items per rank and remainder
92  const std::int64_t n = N / size;
93  const std::int64_t r = N % size;
94 
95  // Compute local range
96  if (rank < r)
97  return {rank * (n + 1), rank * (n + 1) + n + 1};
98  else
99  return {rank * n + r, rank * n + r + n};
100 }
101 
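A worked example (editor's addition): for N = 10 items over size = 4 ranks, n = 2 and r = 2, so ranks 0 and 1 receive three items each and ranks 2 and 3 receive two.

  // Illustrative only; these hold at compile time (C++20).
  static_assert(dolfinx::MPI::local_range(0, 10, 4) == std::array<std::int64_t, 2>{0, 3});
  static_assert(dolfinx::MPI::local_range(1, 10, 4) == std::array<std::int64_t, 2>{3, 6});
  static_assert(dolfinx::MPI::local_range(2, 10, 4) == std::array<std::int64_t, 2>{6, 8});
  static_assert(dolfinx::MPI::local_range(3, 10, 4) == std::array<std::int64_t, 2>{8, 10});
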
108 constexpr int index_owner(int size, std::size_t index, std::size_t N)
109 {
110  assert(index < N);
111 
112  // Compute number of items per rank and remainder
113  const std::size_t n = N / size;
114  const std::size_t r = N % size;
115 
116  if (index < r * (n + 1))
117  {
118  // First r ranks own n + 1 indices
119  return index / (n + 1);
120  }
121  else
122  {
123  // Remaining ranks own n indices
124  return r + (index - r * (n + 1)) / n;
125  }
126 }
127 
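Correspondingly (editor's addition), index_owner is the inverse map: it returns the rank whose local_range contains a given global index. Continuing the N = 10, size = 4 example:

  // Illustrative only.
  static_assert(dolfinx::MPI::index_owner(4, 2, 10) == 0); // index 2 lies in [0, 3)
  static_assert(dolfinx::MPI::index_owner(4, 5, 10) == 1); // index 5 lies in [3, 6)
  static_assert(dolfinx::MPI::index_owner(4, 9, 10) == 3); // index 9 lies in [8, 10)
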
151 std::vector<int> compute_graph_edges_pcx(MPI_Comm comm,
152  const std::span<const int>& edges);
153 
178 std::vector<int> compute_graph_edges_nbx(MPI_Comm comm,
179  const std::span<const int>& edges);
180 
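Both functions answer the same question: given the ranks this process will send to (edges), which ranks will send to this process? The NBX variant is the one used by the distribution functions below. A hedged usage sketch (editor's addition; the destination ranks are hypothetical):

  // Illustrative only.
  std::vector<int> dests = {1, 3}; // ranks this process will send to
  std::vector<int> sources
      = dolfinx::MPI::compute_graph_edges_nbx(MPI_COMM_WORLD, dests);
  // 'sources' now lists the ranks that named this rank among their edges.
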
200 template <typename T>
201 std::pair<std::vector<std::int32_t>, std::vector<T>>
202 distribute_to_postoffice(MPI_Comm comm, const std::span<const T>& x,
203  std::array<std::int64_t, 2> shape,
204  std::int64_t rank_offset);
205 
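A hedged sketch (editor's addition; comm, x_local, shape0_global, shape1 and rank_offset are placeholder names): each rank forwards its rows to the 'post office' rank that owns the corresponding global index, and receives from other ranks the rows for which it is itself the post office.

  // Illustrative only; rank_offset is this rank's first global row index.
  auto [local_idx, rows] = dolfinx::MPI::distribute_to_postoffice<double>(
      comm, x_local, {shape0_global, shape1}, rank_offset);
  // local_idx[i] is relative to local_range(rank, shape0_global, size)[0];
  // rows is row-major with shape1 values per received row.
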
227 template <typename T>
228 std::vector<T> distribute_from_postoffice(
229  MPI_Comm comm, const std::span<const std::int64_t>& indices,
230  const std::span<const T>& x, std::array<std::int64_t, 2> shape,
231  std::int64_t rank_offset);
232 
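distribute_from_postoffice needs the global number of rows and this rank's row offset. A sketch (editor's addition; x_local, shape1 and indices are placeholders) of how these are typically obtained, mirroring what distribute_data does below:

  // Illustrative only.
  std::int64_t shape0_local = x_local.size() / shape1;
  std::int64_t shape0 = 0, rank_offset = 0;
  MPI_Allreduce(&shape0_local, &shape0, 1, MPI_INT64_T, MPI_SUM, comm);
  MPI_Exscan(&shape0_local, &rank_offset, 1, MPI_INT64_T, MPI_SUM, comm);
  std::vector<double> rows = dolfinx::MPI::distribute_from_postoffice<double>(
      comm, indices, x_local, {shape0, shape1}, rank_offset);
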
256 template <typename T>
257 std::vector<T> distribute_data(MPI_Comm comm,
258  const std::span<const std::int64_t>& indices,
259  const std::span<const T>& x, int shape1);
260 
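An end-to-end usage sketch (editor's addition; the data and the requested indices are hypothetical): every rank contributes a contiguous block of rows and asks for arbitrary rows by global index.

  // Illustrative only; assumes the global array has at least six rows.
  constexpr int shape1 = 2;                           // columns per row
  std::vector<double> x_local = {0.0, 1.0, 2.0, 3.0}; // two local rows
  std::vector<std::int64_t> wanted = {0, 5};          // global row indices
  std::vector<double> rows = dolfinx::MPI::distribute_data<double>(
      MPI_COMM_WORLD, wanted, x_local, shape1);
  // rows holds the requested rows in the order given by 'wanted'.
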
261 template <typename T>
262 struct dependent_false : std::false_type
263 {
264 };
265 
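dependent_false is the usual helper for triggering a static_assert only when an unsupported template branch is actually instantiated. A minimal sketch of the idiom (editor's addition; the function name is hypothetical, and mpi_type() below uses the equivalent expression !std::is_same_v<T, T> directly):

  // Illustrative only.
  template <typename T>
  constexpr MPI_Datatype mpi_type_sketch()
  {
    if constexpr (std::is_same_v<T, double>)
      return MPI_DOUBLE;
    else
      static_assert(dolfinx::MPI::dependent_false<T>::value, "Unsupported type");
  }
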
267 template <typename T>
268 constexpr MPI_Datatype mpi_type()
269 {
270  if constexpr (std::is_same_v<T, float>)
271  return MPI_FLOAT;
272  else if constexpr (std::is_same_v<T, double>)
273  return MPI_DOUBLE;
274  else if constexpr (std::is_same_v<T, std::complex<double>>)
275  return MPI_C_DOUBLE_COMPLEX;
276  else if constexpr (std::is_same_v<T, std::complex<float>>)
277  return MPI_C_FLOAT_COMPLEX;
278  else if constexpr (std::is_same_v<T, short int>)
279  return MPI_SHORT;
280  else if constexpr (std::is_same_v<T, int>)
281  return MPI_INT;
282  else if constexpr (std::is_same_v<T, unsigned int>)
283  return MPI_UNSIGNED;
284  else if constexpr (std::is_same_v<T, long int>)
285  return MPI_LONG;
286  else if constexpr (std::is_same_v<T, unsigned long>)
287  return MPI_UNSIGNED_LONG;
288  else if constexpr (std::is_same_v<T, long long>)
289  return MPI_LONG_LONG;
290  else if constexpr (std::is_same_v<T, unsigned long long>)
291  return MPI_UNSIGNED_LONG_LONG;
292  else if constexpr (std::is_same_v<T, bool>)
293  return MPI_C_BOOL;
294  else if constexpr (std::is_same_v<T, std::int8_t>)
295  return MPI_INT8_T;
296  else
297  // Issue compile time error
298  static_assert(!std::is_same_v<T, T>);
299 }
300 
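A brief usage sketch (editor's addition; the function name is hypothetical): mpi_type<T>() lets templated code pass the matching MPI datatype, for example in a reduction.

  // Illustrative only.
  template <typename T>
  T sum_over_ranks(MPI_Comm comm, T local_value)
  {
    T global_sum{};
    MPI_Allreduce(&local_value, &global_sum, 1, dolfinx::MPI::mpi_type<T>(),
                  MPI_SUM, comm);
    return global_sum;
  }
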
301 //---------------------------------------------------------------------------
302 template <typename T>
303 std::pair<std::vector<std::int32_t>, std::vector<T>>
304 distribute_to_postoffice(MPI_Comm comm, const std::span<const T>& x,
305  std::array<std::int64_t, 2> shape,
306  std::int64_t rank_offset)
307 {
308  const int size = dolfinx::MPI::size(comm);
309  const int rank = dolfinx::MPI::rank(comm);
310  assert(x.size() % shape[1] == 0);
311  const std::int32_t shape0_local = x.size() / shape[1];
312 
313  LOG(2) << "Sending data to post offices (distribute_to_postoffice)";
314 
315  // Post office ranks will receive data from this rank
316  std::vector<int> row_to_dest(shape0_local);
317  for (std::int32_t i = 0; i < shape0_local; ++i)
318  {
319  int dest = MPI::index_owner(size, i + rank_offset, shape[0]);
320  row_to_dest[i] = dest;
321  }
322 
323  // Build list of (dest, positions) for each row that doesn't belong to
324  // this rank, then sort
325  std::vector<std::array<std::int32_t, 2>> dest_to_index;
326  dest_to_index.reserve(shape0_local);
327  for (std::int32_t i = 0; i < shape0_local; ++i)
328  {
329  std::size_t idx = i + rank_offset;
330  if (int dest = MPI::index_owner(size, idx, shape[0]); dest != rank)
331  dest_to_index.push_back({dest, i});
332  }
333  std::sort(dest_to_index.begin(), dest_to_index.end());
334 
335  // Build list of neighbour dest (post office) ranks and count the number
336  // of items (rows of x) to send to each one (by neighbourhood rank)
337  std::vector<int> dest;
338  std::vector<std::int32_t> num_items_per_dest,
339  pos_to_neigh_rank(shape0_local, -1);
340  {
341  auto it = dest_to_index.begin();
342  while (it != dest_to_index.end())
343  {
344  const int neigh_rank = dest.size();
345 
346  // Store global rank
347  dest.push_back((*it)[0]);
348 
349  // Find iterator to next global rank
350  auto it1
351  = std::find_if(it, dest_to_index.end(),
352  [r = dest.back()](auto& idx) { return idx[0] != r; });
353 
354  // Store number of items for current rank
355  num_items_per_dest.push_back(std::distance(it, it1));
356 
357  // Map from local x index to local destination rank
358  for (auto e = it; e != it1; ++e)
359  pos_to_neigh_rank[(*e)[1]] = neigh_rank;
360 
361  // Advance iterator
362  it = it1;
363  }
364  }
365 
366  // Determine source ranks
367  const std::vector<int> src = MPI::compute_graph_edges_nbx(comm, dest);
368  LOG(INFO)
369  << "Number of neighbourhood source ranks in distribute_to_postoffice: "
370  << src.size();
371 
372  // Create neighbourhood communicator for sending data to post offices
373  MPI_Comm neigh_comm;
374  MPI_Dist_graph_create_adjacent(comm, src.size(), src.data(), MPI_UNWEIGHTED,
375  dest.size(), dest.data(), MPI_UNWEIGHTED,
376  MPI_INFO_NULL, false, &neigh_comm);
377 
378  // Compute send displacements
379  std::vector<std::int32_t> send_disp = {0};
380  std::partial_sum(num_items_per_dest.begin(), num_items_per_dest.end(),
381  std::back_inserter(send_disp));
382 
383  // Pack send buffers
384  std::vector<T> send_buffer_data(shape[1] * send_disp.back());
385  std::vector<std::int64_t> send_buffer_index(send_disp.back());
386  {
387  std::vector<std::int32_t> send_offsets = send_disp;
388  for (std::int32_t i = 0; i < shape0_local; ++i)
389  {
390  if (int neigh_dest = pos_to_neigh_rank[i]; neigh_dest != -1)
391  {
392  std::size_t pos = send_offsets[neigh_dest];
393  send_buffer_index[pos] = i + rank_offset;
394  std::copy_n(std::next(x.begin(), i * shape[1]), shape[1],
395  std::next(send_buffer_data.begin(), shape[1] * pos));
396  ++send_offsets[neigh_dest];
397  }
398  }
399  }
400 
401  // Send to each post office (destination) the number of items that I
402  // will be sending it
403  std::vector<int> num_items_recv(src.size());
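  // Note (editor's comment): the reserve(1) calls below guarantee that .data()
  // does not return a null pointer for empty vectors, which some MPI
  // implementations reject as a buffer argument.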
404  num_items_per_dest.reserve(1);
405  num_items_recv.reserve(1);
406  MPI_Neighbor_alltoall(num_items_per_dest.data(), 1, MPI_INT,
407  num_items_recv.data(), 1, MPI_INT, neigh_comm);
408 
409  // Prepare receive displacement and buffers
410  std::vector<std::int32_t> recv_disp(num_items_recv.size() + 1, 0);
411  std::partial_sum(num_items_recv.begin(), num_items_recv.end(),
412  std::next(recv_disp.begin()));
413 
414  // Send/receive global indices
415  std::vector<std::int64_t> recv_buffer_index(recv_disp.back());
416  MPI_Neighbor_alltoallv(send_buffer_index.data(), num_items_per_dest.data(),
417  send_disp.data(), MPI_INT64_T,
418  recv_buffer_index.data(), num_items_recv.data(),
419  recv_disp.data(), MPI_INT64_T, neigh_comm);
420 
421  // Send/receive data (x)
422  MPI_Datatype compound_type;
423  MPI_Type_contiguous(shape[1], dolfinx::MPI::mpi_type<T>(), &compound_type);
424  MPI_Type_commit(&compound_type);
425  std::vector<T> recv_buffer_data(shape[1] * recv_disp.back());
426  MPI_Neighbor_alltoallv(send_buffer_data.data(), num_items_per_dest.data(),
427  send_disp.data(), compound_type,
428  recv_buffer_data.data(), num_items_recv.data(),
429  recv_disp.data(), compound_type, neigh_comm);
430  MPI_Type_free(&compound_type);
431  MPI_Comm_free(&neigh_comm);
432 
433  LOG(2) << "Completed sending data to post offices.";
434 
435  // Convert to local indices
436  const std::int64_t r0 = MPI::local_range(rank, shape[0], size)[0];
437  std::vector<std::int32_t> index_local(recv_buffer_index.size());
438  std::transform(recv_buffer_index.cbegin(), recv_buffer_index.cend(),
439  index_local.begin(), [r0](auto idx) { return idx - r0; });
440 
441  return {index_local, recv_buffer_data};
442 }
443 //---------------------------------------------------------------------------
444 template <typename T>
445 std::vector<T> distribute_from_postoffice(
446  MPI_Comm comm, const std::span<const std::int64_t>& indices,
447  const std::span<const T>& x, std::array<std::int64_t, 2> shape,
448  std::int64_t rank_offset)
449 {
450  common::Timer timer("Distribute row-wise data (scalable)");
451  assert(shape[1] > 0);
452 
453  const int size = dolfinx::MPI::size(comm);
454  const int rank = dolfinx::MPI::rank(comm);
455  assert(x.size() % shape[1] == 0);
456  const std::int64_t shape0_local = x.size() / shape[1];
457 
458  // 0. Send x data to/from post offices
459 
460  // Send/receive x data to post office ranks (only for rows that need to
461  // be communicated)
462  auto [post_indices, post_x] = MPI::distribute_to_postoffice(
463  comm, x, {shape[0], shape[1]}, rank_offset);
464  assert(post_indices.size() == post_x.size() / shape[1]);
465 
466  // 1. Send request to post office ranks for data
467 
468  // Build list of (src rank, global index, index position) for each
469  // entry in 'indices' that doesn't belong to this rank, then sort
470  std::vector<std::tuple<int, std::int64_t, std::int32_t>> src_to_index;
471  for (std::size_t i = 0; i < indices.size(); ++i)
472  {
473  std::size_t idx = indices[i];
474  if (int src = MPI::index_owner(size, idx, shape[0]); src != rank)
475  src_to_index.push_back({src, idx, i});
476  }
477  std::sort(src_to_index.begin(), src_to_index.end());
478 
479  // Build list of neighbour src ranks and count the number of items (rows
480  // of x) to receive from each src post office (by neighbourhood rank)
481  std::vector<std::int32_t> num_items_per_src;
482  std::vector<int> src;
483  {
484  auto it = src_to_index.begin();
485  while (it != src_to_index.end())
486  {
487  src.push_back(std::get<0>(*it));
488  auto it1 = std::find_if(it, src_to_index.end(),
489  [r = src.back()](auto& idx)
490  { return std::get<0>(idx) != r; });
491  num_items_per_src.push_back(std::distance(it, it1));
492  it = it1;
493  }
494  }
495 
496  // Determine 'delivery' destination ranks (ranks that want data from
497  // me)
498  const std::vector<int> dest
499  = MPI::compute_graph_edges_nbx(comm, src);
500  LOG(INFO) << "Neighbourhood destination ranks from post office in "
501  "distribute_data (rank, num dests, num dests/mpi_size): "
502  << rank << ", " << dest.size() << ", "
503  << static_cast<double>(dest.size()) / size;
504 
505  // Create neighbourhood communicator for sending requests to post offices
506  // (src) and receiving requests from ranks needing data from my post office
507  MPI_Comm neigh_comm0;
508  MPI_Dist_graph_create_adjacent(comm, dest.size(), dest.data(), MPI_UNWEIGHTED,
509  src.size(), src.data(), MPI_UNWEIGHTED,
510  MPI_INFO_NULL, false, &neigh_comm0);
511 
512  // Communicate number of requests to each source
513  std::vector<int> num_items_recv(dest.size());
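  // Note (editor's comment): as above, reserve(1) keeps .data() non-null for
  // empty vectors before the MPI neighbourhood calls.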
514  num_items_per_src.reserve(1);
515  num_items_recv.reserve(1);
516  MPI_Neighbor_alltoall(num_items_per_src.data(), 1, MPI_INT,
517  num_items_recv.data(), 1, MPI_INT, neigh_comm0);
518 
519  // Prepare send/receive displacements
520  std::vector<std::int32_t> send_disp = {0};
521  std::partial_sum(num_items_per_src.begin(), num_items_per_src.end(),
522  std::back_inserter(send_disp));
523  std::vector<std::int32_t> recv_disp = {0};
524  std::partial_sum(num_items_recv.begin(), num_items_recv.end(),
525  std::back_inserter(recv_disp));
526 
527  // Pack my requested indices (global) in send buffer ready to send to
528  // post offices
529  assert(send_disp.back() == (int)src_to_index.size());
530  std::vector<std::int64_t> send_buffer_index(src_to_index.size());
531  std::transform(src_to_index.cbegin(), src_to_index.cend(),
532  send_buffer_index.begin(),
533  [](auto& x) { return std::get<1>(x); });
534 
535  // Prepare the receive buffer
536  std::vector<std::int64_t> recv_buffer_index(recv_disp.back());
537  MPI_Neighbor_alltoallv(send_buffer_index.data(), num_items_per_src.data(),
538  send_disp.data(), MPI_INT64_T,
539  recv_buffer_index.data(), num_items_recv.data(),
540  recv_disp.data(), MPI_INT64_T, neigh_comm0);
541 
542  MPI_Comm_free(&neigh_comm0);
543 
544  // 2. Send data (rows of x) back to requesting ranks (transpose of the
545  // preceding communication pattern)
546 
547  // Build map from local index to post_indices position. Set to -1 for
548  // data that was already on this rank and was therefore not
549  // sent/received via a post office.
550  const std::array<std::int64_t, 2> postoffice_range
551  = MPI::local_range(rank, shape[0], size);
552  std::vector<std::int32_t> post_indices_map(
553  postoffice_range[1] - postoffice_range[0], -1);
554  for (std::size_t i = 0; i < post_indices.size(); ++i)
555  {
556  assert(post_indices[i] < (int)post_indices_map.size());
557  post_indices_map[post_indices[i]] = i;
558  }
559 
560  // Build send buffer
561  std::vector<T> send_buffer_data(shape[1] * recv_disp.back());
562  for (std::size_t p = 0; p < recv_disp.size() - 1; ++p)
563  {
564  int offset = recv_disp[p];
565  for (std::int32_t i = recv_disp[p]; i < recv_disp[p + 1]; ++i)
566  {
567  std::int64_t index = recv_buffer_index[i];
568  if (index >= rank_offset and index < (rank_offset + shape0_local))
569  {
570  // I already had this index before any communication
571  std::int32_t local_index = index - rank_offset;
572  std::copy_n(std::next(x.begin(), shape[1] * local_index), shape[1],
573  std::next(send_buffer_data.begin(), shape[1] * offset));
574  }
575  else
576  {
577  // Take from my 'post bag'
578  auto local_index = index - postoffice_range[0];
579  std::int32_t pos = post_indices_map[local_index];
580  assert(pos != -1);
581  std::copy_n(std::next(post_x.begin(), shape[1] * pos), shape[1],
582  std::next(send_buffer_data.begin(), shape[1] * offset));
583  }
584 
585  ++offset;
586  }
587  }
588 
589  MPI_Dist_graph_create_adjacent(comm, src.size(), src.data(), MPI_UNWEIGHTED,
590  dest.size(), dest.data(), MPI_UNWEIGHTED,
591  MPI_INFO_NULL, false, &neigh_comm0);
592 
593  MPI_Datatype compound_type0;
594  MPI_Type_contiguous(shape[1], dolfinx::MPI::mpi_type<T>(), &compound_type0);
595  MPI_Type_commit(&compound_type0);
596 
597  std::vector<T> recv_buffer_data(shape[1] * send_disp.back());
598  MPI_Neighbor_alltoallv(send_buffer_data.data(), num_items_recv.data(),
599  recv_disp.data(), compound_type0,
600  recv_buffer_data.data(), num_items_per_src.data(),
601  send_disp.data(), compound_type0, neigh_comm0);
602 
603  MPI_Type_free(&compound_type0);
604  MPI_Comm_free(&neigh_comm0);
605 
606  std::vector<std::int32_t> index_pos_to_buffer(indices.size(), -1);
607  for (std::size_t i = 0; i < src_to_index.size(); ++i)
608  index_pos_to_buffer[std::get<2>(src_to_index[i])] = i;
609 
610  // Assemble the data (x_new) to return
611  std::vector<T> x_new(shape[1] * indices.size());
612  for (std::size_t i = 0; i < indices.size(); ++i)
613  {
614  const std::int64_t index = indices[i];
615  if (index >= rank_offset and index < (rank_offset + shape0_local))
616  {
617  // Had data from the start in x
618  auto local_index = index - rank_offset;
619  std::copy_n(std::next(x.begin(), shape[1] * local_index), shape[1],
620  std::next(x_new.begin(), shape[1] * i));
621  }
622  else
623  {
624  if (int src = MPI::index_owner(size, index, shape[0]); src == rank)
625  {
626  // In my post office bag
627  auto local_index = index - postoffice_range[0];
628  std::int32_t pos = post_indices_map[local_index];
629  assert(pos != -1);
630  std::copy_n(std::next(post_x.begin(), shape[1] * pos), shape[1],
631  std::next(x_new.begin(), shape[1] * i));
632  }
633  else
634  {
635  // In my received post
636  std::int32_t pos = index_pos_to_buffer[i];
637  assert(pos != -1);
638  std::copy_n(std::next(recv_buffer_data.begin(), shape[1] * pos),
639  shape[1], std::next(x_new.begin(), shape[1] * i));
640  }
641  }
642  }
643 
644  return x_new;
645 }
646 //---------------------------------------------------------------------------
647 template <typename T>
648 std::vector<T> distribute_data(MPI_Comm comm,
649  const std::span<const std::int64_t>& indices,
650  const std::span<const T>& x, int shape1)
651 {
652  assert(shape1 > 0);
653  assert(x.size() % shape1 == 0);
654  const std::int64_t shape0_local = x.size() / shape1;
655 
656  std::int64_t shape0(0), rank_offset(0);
657  MPI_Allreduce(&shape0_local, &shape0, 1, MPI_INT64_T, MPI_SUM, comm);
658  MPI_Exscan(&shape0_local, &rank_offset, 1, MPI_INT64_T, MPI_SUM, comm);
659 
660  return distribute_from_postoffice(comm, indices, x, {shape0, shape1},
661  rank_offset);
662 }
663 //---------------------------------------------------------------------------
664 
665 } // namespace dolfinx::MPI
A wrapper that duplicates an MPI communicator and manages the lifetime of the duplicate.
Definition: MPI.h:42
Comm(MPI_Comm comm, bool duplicate=true)
Duplicate communicator and wrap duplicate.
Definition: MPI.cpp:12
~Comm()
Destructor (frees wrapped communicator)
Definition: MPI.cpp:39
MPI_Comm comm() const noexcept
Return the underlying MPI_Comm object.
Definition: MPI.cpp:73
A timer can be used for timing tasks.
Definition: Timer.h:31
MPI support functionality.
Definition: MPI.h:30
std::pair< std::vector< std::int32_t >, std::vector< T > > distribute_to_postoffice(MPI_Comm comm, const std::span< const T > &x, std::array< std::int64_t, 2 > shape, std::int64_t rank_offset)
Distribute row data to 'post office' ranks.
Definition: MPI.h:304
std::vector< T > distribute_from_postoffice(MPI_Comm comm, const std::span< const std::int64_t > &indices, const std::span< const T > &x, std::array< std::int64_t, 2 > shape, std::int64_t rank_offset)
Distribute rows of a rectangular data array from post office ranks to ranks where they are required.
Definition: MPI.h:445
std::vector< int > compute_graph_edges_nbx(MPI_Comm comm, const std::span< const int > &edges)
Determine incoming graph edges using the NBX consensus algorithm.
Definition: MPI.cpp:151
constexpr int index_owner(int size, std::size_t index, std::size_t N)
Return which rank owns index in global range [0, N - 1] (inverse of MPI::local_range).
Definition: MPI.h:108
std::vector< int > compute_graph_edges_pcx(MPI_Comm comm, const std::span< const int > &edges)
Determine incoming graph edges using the PCX consensus algorithm.
Definition: MPI.cpp:91
std::vector< T > distribute_data(MPI_Comm comm, const std::span< const std::int64_t > &indices, const std::span< const T > &x, int shape1)
Distribute rows of a rectangular data array to ranks where they are required (scalable version).
Definition: MPI.h:648
int size(MPI_Comm comm)
Return size of the group (number of processes) associated with the communicator.
Definition: MPI.cpp:83
int rank(MPI_Comm comm)
Return process rank for the communicator.
Definition: MPI.cpp:75
constexpr MPI_Datatype mpi_type()
MPI Type.
Definition: MPI.h:268
constexpr std::array< std::int64_t, 2 > local_range(int rank, std::int64_t N, int size)
Return local range for the calling process, partitioning the global [0, N - 1] range across all ranks...
Definition: MPI.h:84
tag
MPI communication tags.
Definition: MPI.h:34
Definition: MPI.h:263