MPI: max time

This commit is contained in:
violette 2024-04-10 16:37:23 -04:00
parent a95776ac11
commit 03d8401d5f
4 changed files with 263 additions and 48 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
build build
.vimspector.json

Binary file not shown.

Binary file not shown.

View file

@ -1,12 +1,37 @@
/* #define _XOPEN_SOURCE 700
"Hello World" MPI Test Program #include <sys/types.h>
*/
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <mpi.h>
int main(int argc, char** argv) { #include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
enum HEADER {
TRIP_ID = 0,
ARRIVAL_TIME,
DEPARTURE_TIME,
STOP_ID,
STOP_SEQUENCE,
STOP_HEADSIGN,
PICKUP_TYPE,
DROP_OFF_TYPE,
SHAPE_DIST_TRAVELED,
TIMEPOINT,
END
};
#define DELIM ','
#define OVERLAP 100
#define STOP_FILE "./stop_times.txt"
int do_map(char *, size_t);
void do_reduce(size_t);
int parprocess(MPI_File *, const int, const int);
int
main(int argc, char **argv)
{
// Initialize the MPI environment // Initialize the MPI environment
MPI_Init(NULL, NULL); MPI_Init(NULL, NULL);
@ -14,6 +39,8 @@ int main(int argc, char** argv) {
MPI_Comm_size(MPI_COMM_WORLD, &world_size); MPI_Comm_size(MPI_COMM_WORLD, &world_size);
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int localmax = 0, globalmax = 0;
// Get the name of the processor // Get the name of the processor
char processor_name[MPI_MAX_PROCESSOR_NAME]; char processor_name[MPI_MAX_PROCESSOR_NAME];
int name_len; int name_len;
@ -23,37 +50,224 @@ int main(int argc, char** argv) {
printf("Hello world from processor %s, rank %d out of %d processors\n", printf("Hello world from processor %s, rank %d out of %d processors\n",
processor_name, world_rank, world_size); processor_name, world_rank, world_size);
int number, size;
ssize_t err;
MPI_File in;
if ((err = MPI_File_open(MPI_COMM_WORLD, STOP_FILE, MPI_MODE_RDONLY,
// We are assuming at least 2 processes for this task MPI_INFO_NULL, &in))) {
if (world_size < 2) { fprintf(stderr, "%s: Couldn't open file %s\n", argv[0],
fprintf(stderr, "World size must be greater than 1 for %s\n", argv[0]); STOP_FILE);
MPI_Abort(MPI_COMM_WORLD, 1); exit(-1);
}
int number;
if (world_rank == 0) {
// If we are rank 0, set the number to -1 and send it to process 1
number = -1;
MPI_Send(
/* data = */ &number,
/* count = */ 1,
/* datatype = */ MPI_INT,
/* destination = */ 1,
/* tag = */ 0,
/* communicator = */ MPI_COMM_WORLD);
} else if (world_rank == 1) {
MPI_Recv(
/* data = */ &number,
/* count = */ 1,
/* datatype = */ MPI_INT,
/* source = */ 0,
/* tag = */ 0,
/* communicator = */ MPI_COMM_WORLD,
/* status = */ MPI_STATUS_IGNORE);
printf("Process 1 received number %d from process 0\n", number);
} }
MPI_Comm_size(MPI_COMM_WORLD, &size);
// Finalize the MPI environment. localmax = parprocess(&in, world_rank, world_size);
MPI_Reduce(&localmax, &globalmax, 1, MPI_INT, MPI_MAX, 0,
MPI_COMM_WORLD);
if (world_rank == 0)
printf("globalmax: %d\n", globalmax);
MPI_File_close(&in);
MPI_Finalize(); MPI_Finalize();
exit(0);
} }
time_t
substr_time(struct tm a, struct tm b)
{
return (a.tm_hour * 3600 + a.tm_min * 60 + a.tm_sec) -
(b.tm_hour * 3600 + b.tm_min * 60 + b.tm_sec);
}
char *
get_word(char *lines, size_t x, size_t y, size_t num_attr, size_t max_attr)
{
size_t offset = (x * (num_attr * max_attr) + y * (max_attr));
return lines + offset;
}
void
fill_lines(char *chunk, size_t num_char, size_t num_lines, size_t num_attr,
size_t max_attr, char *lines)
{
size_t attr_pos = 0, line_pos = 0, word_pos = 0;
for (size_t k = 0; k < num_char; ++k) {
if (chunk[k] == DELIM) {
// go to next attrib
char *word = get_word(lines, line_pos, attr_pos,
num_attr, max_attr);
memcpy(word, chunk + k - word_pos, word_pos);
word[word_pos] = '\0';
++attr_pos;
word_pos = 0;
}
else if (chunk[k] == '\n') {
++line_pos;
attr_pos = 0;
word_pos = 0;
}
else if (chunk[k] == '\r') {}
else {
++word_pos;
}
}
}
void
get_lines_info(char *chunk, size_t num_char, size_t *max_attr_size,
size_t *num_lines)
{
// count max line size and number of lines
size_t current_attr_size = 0;
for (int k = 0; k < num_char; ++k) {
// LINE
if (chunk[k] == '\n' || chunk[k] == '\r')
++(*num_lines);
// ATTRIBUTES
if (chunk[k] == DELIM) {
if (current_attr_size > *max_attr_size)
*max_attr_size = current_attr_size;
current_attr_size = 0;
} else
++current_attr_size;
}
}
// get num of cols of csv
size_t
get_num_attr(char *chunk)
{
size_t num_attr = 0;
for (size_t k = 0 ; chunk[k] != '\n' ; ++k)
if (chunk[k] == DELIM)
++num_attr;
return num_attr;
}
int
do_map(char *chunk, size_t num_char)
{
size_t num_lines = 0, num_attr = 0;
size_t max_attr_size = 0;
get_lines_info(chunk, num_char, &max_attr_size, &num_lines);
num_attr = get_num_attr(chunk);
// allocate lines (just a big continuous chunk)
// is a 2d arr of char*
char *lines;
lines = calloc(1, num_lines * num_attr * max_attr_size);
fill_lines(chunk, num_char, num_lines, num_attr, max_attr_size, lines);
struct tm dep_time, arr_time;
time_t max_time = 0;
for (size_t k = 0; k < num_lines; ++k) {
memcpy(&dep_time,
get_word(lines, k, DEPARTURE_TIME, num_attr,
max_attr_size),
sizeof(struct tm));
memcpy(&arr_time,
get_word(lines, k, ARRIVAL_TIME, num_attr,
max_attr_size),
sizeof(struct tm));
// arr_time.tm_sec);
strptime(get_word(lines, k, DEPARTURE_TIME, num_attr,
max_attr_size),
"%H:%M:%S", &dep_time);
strptime(get_word(lines, k, ARRIVAL_TIME, num_attr,
max_attr_size),
"%H:%M:%S", &arr_time);
time_t tmp = substr_time(arr_time, dep_time);
if (tmp > max_time)
max_time = tmp;
}
free(lines);
free(chunk);
return max_time;
}
// help from https://stackoverflow.com/questions/12939279/mpi-reading-from-a-text-file
// is hosted under a permissive licence, ty Jonathan Dursi :)
int
parprocess(MPI_File *in, const int rank, const int size)
{
// reads revelant lines from file to chunk.
// IN OUR CASE we will use overlap to reach EOF of this line.
// Duplicates dont matter in our case ; res will be the same either way.
size_t proc_size, total_size, total_size_overlap;
char *chunk;
MPI_Offset globalstart;
MPI_Offset globalend;
MPI_Offset filesize;
MPI_File_get_size(*in, &filesize);
filesize--; /* get rid of text file eof */
proc_size = filesize / size;
globalstart = rank * proc_size;
globalend = globalstart + proc_size - 1;
if (rank == size - 1)
globalend = filesize - 1;
/* add overlap to the end of everyone's chunk except last
* proc... */
size_t globalend_overlap = globalend;
if (rank != size - 1)
globalend_overlap += OVERLAP;
total_size_overlap = globalend_overlap - globalstart + 1;
total_size = globalend - globalstart + 1;
/* allocate memory, filled with 0 */
chunk = calloc(1, total_size);
printf("[PARPROCESS] begin\n");
ssize_t err;
{
err = MPI_File_read_at_all_begin(*in, globalstart, chunk,
total_size, MPI_CHAR);
if (err) {
printf("error %lu\n", err);
MPI_Finalize();
}
err = MPI_File_read_at_all_end(*in, chunk, MPI_STATUS_IGNORE);
if (err) {
printf("error %lu\n", err);
MPI_Finalize();
}
}
// fills the first incoherent bytes with \0
//size_t k = 0;
//if (rank != 0) { // first has no incoherece at begining
// for (; chunk[k] != '\r' && chunk[k] != '\n';
// ++k) // get number of incoherent bytes :)
// ;
// // reset
// memmove(chunk, chunk + k, total_size); // - 2: dont count \n\r
//}
// fill char after next EOL wiht \0 ; starting from proc_size to end of
// overlap
//if (rank != size) { // last doesnt have padding, dont check it
// for (; (chunk[globalend] != '\n' && chunk[globalend] != '\r') &&
// globalend < globalend_overlap;
// ++globalend)
// ;
// memset(chunk + globalend, '\0', OVERLAP);
// //
//}
//chunk[total_size_overlap] = '\0'; // just to be sure!
int max = do_map(chunk, total_size);
printf("[PARPROCESS] end, max: %d\n", max);
return max;
}