diff --git a/.gitignore b/.gitignore index 378eac2..1bc4e0d 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ build +.vimspector.json diff --git a/src/ift630_sts3 b/src/ift630_sts3 deleted file mode 100755 index 8da96fa..0000000 Binary files a/src/ift630_sts3 and /dev/null differ diff --git a/src/ift630_sts3d b/src/ift630_sts3d deleted file mode 100755 index 8da96fa..0000000 Binary files a/src/ift630_sts3d and /dev/null differ diff --git a/src/main.c b/src/main.c index 68df94a..282a6d1 100644 --- a/src/main.c +++ b/src/main.c @@ -1,59 +1,273 @@ -/* - "Hello World" MPI Test Program -*/ -#include -#include -#include +#define _XOPEN_SOURCE 700 +#include + #include +#include +#include +#include +#include -int main(int argc, char** argv) { - // Initialize the MPI environment - MPI_Init(NULL, NULL); +enum HEADER { + TRIP_ID = 0, + ARRIVAL_TIME, + DEPARTURE_TIME, + STOP_ID, + STOP_SEQUENCE, + STOP_HEADSIGN, + PICKUP_TYPE, + DROP_OFF_TYPE, + SHAPE_DIST_TRAVELED, + TIMEPOINT, + END +}; - int world_size, world_rank; - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); +#define DELIM ',' +#define OVERLAP 100 +#define STOP_FILE "./stop_times.txt" - // Get the name of the processor - char processor_name[MPI_MAX_PROCESSOR_NAME]; - int name_len; - MPI_Get_processor_name(processor_name, &name_len); +int do_map(char *, size_t); +void do_reduce(size_t); +int parprocess(MPI_File *, const int, const int); - // Print off a hello world message - printf("Hello world from processor %s, rank %d out of %d processors\n", - processor_name, world_rank, world_size); +int +main(int argc, char **argv) +{ + // Initialize the MPI environment + MPI_Init(NULL, NULL); + int world_size, world_rank; + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); + int localmax = 0, globalmax = 0; - // We are assuming at least 2 processes for this task - if (world_size < 2) { - fprintf(stderr, "World size must be greater than 1 for %s\n", argv[0]); - MPI_Abort(MPI_COMM_WORLD, 1); - } - int number; - if (world_rank == 0) { - // If we are rank 0, set the number to -1 and send it to process 1 - number = -1; - MPI_Send( - /* data = */ &number, - /* count = */ 1, - /* datatype = */ MPI_INT, - /* destination = */ 1, - /* tag = */ 0, - /* communicator = */ MPI_COMM_WORLD); - } else if (world_rank == 1) { - MPI_Recv( - /* data = */ &number, - /* count = */ 1, - /* datatype = */ MPI_INT, - /* source = */ 0, - /* tag = */ 0, - /* communicator = */ MPI_COMM_WORLD, - /* status = */ MPI_STATUS_IGNORE); - printf("Process 1 received number %d from process 0\n", number); - } + // Get the name of the processor + char processor_name[MPI_MAX_PROCESSOR_NAME]; + int name_len; + MPI_Get_processor_name(processor_name, &name_len); - // Finalize the MPI environment. - MPI_Finalize(); + // Print off a hello world message + printf("Hello world from processor %s, rank %d out of %d processors\n", + processor_name, world_rank, world_size); + + int number, size; + ssize_t err; + MPI_File in; + + if ((err = MPI_File_open(MPI_COMM_WORLD, STOP_FILE, MPI_MODE_RDONLY, + MPI_INFO_NULL, &in))) { + fprintf(stderr, "%s: Couldn't open file %s\n", argv[0], + STOP_FILE); + exit(-1); + } + MPI_Comm_size(MPI_COMM_WORLD, &size); + + localmax = parprocess(&in, world_rank, world_size); + + MPI_Reduce(&localmax, &globalmax, 1, MPI_INT, MPI_MAX, 0, + MPI_COMM_WORLD); + if (world_rank == 0) + printf("globalmax: %d\n", globalmax); + + MPI_File_close(&in); + MPI_Finalize(); + exit(0); } +time_t +substr_time(struct tm a, struct tm b) +{ + return (a.tm_hour * 3600 + a.tm_min * 60 + a.tm_sec) - + (b.tm_hour * 3600 + b.tm_min * 60 + b.tm_sec); +} + +char * +get_word(char *lines, size_t x, size_t y, size_t num_attr, size_t max_attr) +{ + size_t offset = (x * (num_attr * max_attr) + y * (max_attr)); + return lines + offset; +} + +void +fill_lines(char *chunk, size_t num_char, size_t num_lines, size_t num_attr, + size_t max_attr, char *lines) +{ + size_t attr_pos = 0, line_pos = 0, word_pos = 0; + for (size_t k = 0; k < num_char; ++k) { + if (chunk[k] == DELIM) { + // go to next attrib + char *word = get_word(lines, line_pos, attr_pos, + num_attr, max_attr); + memcpy(word, chunk + k - word_pos, word_pos); + word[word_pos] = '\0'; + ++attr_pos; + word_pos = 0; + } + else if (chunk[k] == '\n') { + ++line_pos; + attr_pos = 0; + word_pos = 0; + } + else if (chunk[k] == '\r') {} + else { + ++word_pos; + } + } +} + +void +get_lines_info(char *chunk, size_t num_char, size_t *max_attr_size, + size_t *num_lines) +{ + // count max line size and number of lines + size_t current_attr_size = 0; + for (int k = 0; k < num_char; ++k) { + // LINE + if (chunk[k] == '\n' || chunk[k] == '\r') + ++(*num_lines); + // ATTRIBUTES + if (chunk[k] == DELIM) { + if (current_attr_size > *max_attr_size) + *max_attr_size = current_attr_size; + current_attr_size = 0; + } else + ++current_attr_size; + } +} + +// get num of cols of csv +size_t +get_num_attr(char *chunk) +{ + size_t num_attr = 0; + for (size_t k = 0 ; chunk[k] != '\n' ; ++k) + if (chunk[k] == DELIM) + ++num_attr; + return num_attr; +} + +int +do_map(char *chunk, size_t num_char) +{ + size_t num_lines = 0, num_attr = 0; + size_t max_attr_size = 0; + + get_lines_info(chunk, num_char, &max_attr_size, &num_lines); + num_attr = get_num_attr(chunk); + + // allocate lines (just a big continuous chunk) + // is a 2d arr of char* + char *lines; + lines = calloc(1, num_lines * num_attr * max_attr_size); + + + fill_lines(chunk, num_char, num_lines, num_attr, max_attr_size, lines); + + struct tm dep_time, arr_time; + time_t max_time = 0; + for (size_t k = 0; k < num_lines; ++k) { + memcpy(&dep_time, + get_word(lines, k, DEPARTURE_TIME, num_attr, + max_attr_size), + sizeof(struct tm)); + memcpy(&arr_time, + get_word(lines, k, ARRIVAL_TIME, num_attr, + max_attr_size), + sizeof(struct tm)); + // arr_time.tm_sec); + + strptime(get_word(lines, k, DEPARTURE_TIME, num_attr, + max_attr_size), + "%H:%M:%S", &dep_time); + + strptime(get_word(lines, k, ARRIVAL_TIME, num_attr, + max_attr_size), + "%H:%M:%S", &arr_time); + time_t tmp = substr_time(arr_time, dep_time); + if (tmp > max_time) + max_time = tmp; + } + free(lines); + free(chunk); + return max_time; +} + +// help from https://stackoverflow.com/questions/12939279/mpi-reading-from-a-text-file +// is hosted under a permissive licence, ty Jonathan Dursi :) +int +parprocess(MPI_File *in, const int rank, const int size) +{ + // reads revelant lines from file to chunk. + // IN OUR CASE we will use overlap to reach EOF of this line. + // Duplicates dont matter in our case ; res will be the same either way. + size_t proc_size, total_size, total_size_overlap; + char *chunk; + MPI_Offset globalstart; + MPI_Offset globalend; + MPI_Offset filesize; + + MPI_File_get_size(*in, &filesize); + filesize--; /* get rid of text file eof */ + proc_size = filesize / size; + globalstart = rank * proc_size; + globalend = globalstart + proc_size - 1; + if (rank == size - 1) + globalend = filesize - 1; + + /* add overlap to the end of everyone's chunk except last + * proc... */ + size_t globalend_overlap = globalend; + if (rank != size - 1) + globalend_overlap += OVERLAP; + + total_size_overlap = globalend_overlap - globalstart + 1; + total_size = globalend - globalstart + 1; + + /* allocate memory, filled with 0 */ + chunk = calloc(1, total_size); + printf("[PARPROCESS] begin\n"); + + ssize_t err; + { + err = MPI_File_read_at_all_begin(*in, globalstart, chunk, + total_size, MPI_CHAR); + if (err) { + printf("error %lu\n", err); + MPI_Finalize(); + } + err = MPI_File_read_at_all_end(*in, chunk, MPI_STATUS_IGNORE); + if (err) { + printf("error %lu\n", err); + MPI_Finalize(); + } + } + + + // fills the first incoherent bytes with \0 + //size_t k = 0; + //if (rank != 0) { // first has no incoherece at begining + // for (; chunk[k] != '\r' && chunk[k] != '\n'; + // ++k) // get number of incoherent bytes :) + // ; + // // reset + + // memmove(chunk, chunk + k, total_size); // - 2: dont count \n\r + //} + + // fill char after next EOL wiht \0 ; starting from proc_size to end of + // overlap + //if (rank != size) { // last doesnt have padding, dont check it + // for (; (chunk[globalend] != '\n' && chunk[globalend] != '\r') && + // globalend < globalend_overlap; + // ++globalend) + // ; + // memset(chunk + globalend, '\0', OVERLAP); + // // + //} + //chunk[total_size_overlap] = '\0'; // just to be sure! + + int max = do_map(chunk, total_size); + printf("[PARPROCESS] end, max: %d\n", max); + + return max; +}