/*
This NVSHMEM MPI benchmark uses the nvshmemx_*_sum_reduce_on_stream host
functions. Each device contributes a buffer of random data to a sum
reduction over the node team, and if DEBUG is defined the first elements of
the local and reduced buffers are printed.
*/

#include "../../cuda_util/cuda_util.h"
#include "../../cuda_util/random_fill.h"
#include "../../util/argparse.h"
#include "../../util/mpi_util.h"
#include "../../util/simple_utils.h"
#include "mpi.h"
#include "nvshmem.h"
#include "nvshmemx.h"
#include <cstdlib>
#include <cstring>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#define DEBUG 1
#define CUDA_CHECK(stmt)                                                       \
  do {                                                                         \
    cudaError_t result = (stmt);                                               \
    if (cudaSuccess != result) {                                               \
      fprintf(stderr, "[%s:%d] CUDA failed with %s\n", __FILE__, __LINE__,     \
              cudaGetErrorString(result));                                     \
      exit(-1);                                                                \
    }                                                                          \
  } while (0)

#define MY_SOURCE(source, mype, numbytes)                                      \
  ((void *)(((char *)source) + (mype * numbytes)))

static struct options opts;
static struct parser_doc parser_doc;

clock_t start, endparse, cusetup, endwarmup, enditer, c_end;

void bench_iter(int nDev, void *sendbuff, void *recvbuff, int size,
                int data_type, cudaStream_t s);

int main(int argc, char *argv[]) {
  start = clock();
  build_parser_doc("MPI with NVSHMEM using a collective "
                   "sum reduction on the host, performed "
                   "over the node team",
                   "", "1", "egencer20@ku.edu.tr", &parser_doc);
  argument_parse(&opts, &parser_doc, argc, argv);

  int myRank, nRanks, localRank = 0;
  int size = opts.data_len;

  int data_size = 0;
  int data_type = opts.data_type;

  switch (opts.data_type) {
  case options::OPTION_CHAR:
    data_size = sizeof(char);
    break;
  case options::OPTION_FLOAT:
    data_size = sizeof(float);
    break;
  case options::OPTION_INT:
    data_size = sizeof(int);
    break;
  }

  int mype_node;
  cudaStream_t stream;
  MPI_Comm mpi_comm = MPI_COMM_WORLD;
  nvshmemx_init_attr_t attr;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
  MPI_Comm_size(MPI_COMM_WORLD, &nRanks);

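  // Bootstrap NVSHMEM on top of the existing MPI communicator; the node-local
  // PE id returned by nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE) is then used to
  // select the GPU for this process.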
  attr.mpi_comm = &mpi_comm;
  nvshmemx_init_attr(NVSHMEMX_INIT_WITH_MPI_COMM, &attr);
  mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
  int nDev = nRanks;

  void *sendbuff;
  void *recvbuff;

  REPORT("NDEV: %d myrank: %d\n", nDev, mype_node);
  report_options(&opts);
  endparse = clock();

  CUDA_CHECK(cudaSetDevice(mype_node));
  CUDA_CHECK(cudaStreamCreate(&stream));

  // CUDA_CHECK(cudaMalloc(&(sendbuff), size * data_size));

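  // Source and destination buffers for the reduction must live on the NVSHMEM
  // symmetric heap, so they are allocated with nvshmem_malloc rather than
  // cudaMalloc.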
  recvbuff = nvshmem_malloc(data_size * size);
  sendbuff = nvshmem_malloc(data_size * size);

  void *tmp = malloc(data_size * size);
  memset(tmp, 0, data_size * size);
  random_fill_host(tmp, data_size * size);

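  // Copy the host-side random data into the symmetric sendbuff on PE
  // mype_node (this PE itself on a single-node run), then barrier so every PE
  // has initialized its contribution before the benchmark starts.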
  nvshmemx_putmem_on_stream(sendbuff, tmp, data_size * size, mype_node, stream);
  CUDA_CHECK(cudaStreamSynchronize(stream));
  nvshmemx_barrier_all_on_stream(stream);

  free(tmp);

  cusetup = clock();

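  // Warmup iterations: identical to the timed iterations, but reported
  // separately so one-time costs do not skew the per-iteration average.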
  for (int iter = 0; iter < opts.warmup_iterations; iter++) {
    bench_iter(nDev, sendbuff, recvbuff, size, data_type, stream);
  }

  endwarmup = clock();

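  // Timed iterations: each call enqueues one sum reduction plus a barrier on
  // the stream; the stream is synchronized once after the loop.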
  for (int iter = 0; iter < opts.iterations; iter++) {
    bench_iter(nDev, sendbuff, recvbuff, size, data_type, stream);
  }

  CUDA_CHECK(cudaStreamSynchronize(stream));

#ifdef DEBUG

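  // Copy the local contribution and the reduced result back to the host so
  // their first elements can be printed as a quick sanity check.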
  void *local_sendbuff = malloc(size * data_size);
  void *local_recvbuff = malloc(size * data_size);

  CUDA_CHECK(cudaMemcpyAsync(local_sendbuff, sendbuff, size * data_size,
                             cudaMemcpyDeviceToHost, stream));
  CUDA_CHECK(cudaMemcpyAsync(local_recvbuff, recvbuff, size * data_size,
                             cudaMemcpyDeviceToHost, stream));
  CUDA_CHECK(cudaStreamSynchronize(stream));

  REPORT("My data: %d\n", ((int *)local_sendbuff)[0]);
  REPORT("The reduced sum is: <-> %d\n", ((int *)local_recvbuff)[0]);

  free(local_sendbuff);
  free(local_recvbuff);

#endif

  enditer = clock();

  // free device buffers

  nvshmem_free(sendbuff);
  nvshmem_free(recvbuff);

  nvshmem_finalize();
  MPICHECK(MPI_Finalize());

  c_end = clock();

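  // Convert clock() ticks to seconds and report the time spent in each phase,
  // plus per-iteration averages for the warmup and timed loops.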
#define CLOCK_CONVERT(x) (((double)(x)) / CLOCKS_PER_SEC)

  REPORT("Completed Successfully\n"
         "parsing arguments: %.2f\n"
         "cuda setup: %.2f\n"
         "warmup, avg: %.2f, %.2f\n"
         "iterations, avg: %.2f, %.2f\n"
         "cleanup: %.2f\n"
         "total: %.2f\n\n",
         CLOCK_CONVERT(endparse - start), CLOCK_CONVERT(cusetup - endparse),
         CLOCK_CONVERT(endwarmup - cusetup),
         (CLOCK_CONVERT(endwarmup - cusetup)) /
             (opts.warmup_iterations > 0 ? opts.warmup_iterations : 1),
         CLOCK_CONVERT(enditer - endwarmup),
         (CLOCK_CONVERT(enditer - endwarmup)) /
             (opts.iterations > 0 ? opts.iterations : 1),
         CLOCK_CONVERT(c_end - enditer), CLOCK_CONVERT(c_end - start));
  return 0;
}

void bench_iter(int nDev, void *sendbuff, void *recvbuff, int size,
                int data_type, cudaStream_t stream) {

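  // Enqueue one sum reduction over the node team for the selected element
  // type, then a barrier on the same stream so the reduction has completed on
  // every PE before the next iteration is issued.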
  if (data_type == options::OPTION_CHAR) {
    nvshmemx_char_sum_reduce_on_stream(NVSHMEMX_TEAM_NODE, (char *)recvbuff,
                                       (const char *)sendbuff, size, stream);
  }
  if (data_type == options::OPTION_FLOAT) {
    nvshmemx_float_sum_reduce_on_stream(NVSHMEMX_TEAM_NODE, (float *)recvbuff,
                                        (const float *)sendbuff, size, stream);
  }
  if (data_type == options::OPTION_INT) {
    nvshmemx_int_sum_reduce_on_stream(NVSHMEMX_TEAM_NODE, (int *)recvbuff,
                                      (const int *)sendbuff, size, stream);
  }

  nvshmemx_barrier_all_on_stream(stream);
}