diff --git a/CMakeLists.txt b/CMakeLists.txt index df55db3..896abec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,9 +14,9 @@ set(src set(CMAKE_DEBUG_POSTFIX d) add_executable(ift630_sts3 ${src}) -find_package(MPI) #make it REQUIRED, if you want -include_directories(SYSTEM ${MPI_INCLUDE_PATH}) -target_link_libraries(ift630_sts3 ${MPI_C_LIBRARIES}) +find_package(OpenMP) #make it REQUIRED, if you want +include_directories(SYSTEM ${OpenMP_INCLUDE_PATH}) +target_link_libraries(ift630_sts3 ${OpenMP_C_LIBRARIES}) set_target_properties(ift630_sts3 PROPERTIES DEBUG_POSTFIX ${CMAKE_DEBUG_POSTFIX}) target_compile_features(ift630_sts3 PRIVATE c_std_99) diff --git a/src/main.c b/src/main.c index 3b0e6da..a768773 100644 --- a/src/main.c +++ b/src/main.c @@ -1,7 +1,7 @@ #define _XOPEN_SOURCE 700 #include -#include +#include #include #include #include @@ -26,59 +26,36 @@ enum HEADER { #define STOP_FILE "./stop_times.txt" // adding key here! -int do_map(char *, size_t, char *, int); +int do_map(char *, size_t); void do_reduce(size_t); -int parprocess(MPI_File *, const int, const int, char *); +int parprocess(char *, size_t); int main(int argc, char **argv) { - // Initialize the MPI environment - MPI_Init(NULL, NULL); - - int world_size, world_rank; - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); - - char *key; - if (argc != 2) { - if (world_rank == 0) - printf("no args ; indexing all\n"); - key = NULL; - } - else - key = argv[2]; - size_t res = 0; - int number, size; ssize_t err; - MPI_File in; + FILE *in = fopen("./stop_times.txt", "rb"); - if ((err = MPI_File_open(MPI_COMM_WORLD, STOP_FILE, MPI_MODE_RDONLY, - MPI_INFO_NULL, &in))) { - fprintf(stderr, "%s: Couldn't open file %s\n", argv[0], - STOP_FILE); - exit(-1); - } - MPI_Comm_size(MPI_COMM_WORLD, &size); - - res = parprocess(&in, world_rank, world_size, key); - - if (world_rank == 0) { - for (char k = 0; k < world_size; k++) { - MPI_Recv(&res, 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 0, - MPI_COMM_WORLD, MPI_STATUS_IGNORE); - if (res) { - //printf("res: found in %lu ns\n", res); break; - } - else - printf("res: not found\n"); - } + if (!in) { + fputs("File error", stderr); + exit(1); } - MPI_File_close(&in); - MPI_Finalize(); + fseek(in, 0, SEEK_END); + int file_size = ftell(in); + rewind(in); + + char *buffer = (char *)malloc(sizeof(char) * file_size); + // copy the file into the buffer: + fread(buffer, 1, file_size, in); + + omp_set_num_threads(16); + //do_map(buffer, file_size); + parprocess(buffer, file_size); + + fclose(in); exit(0); } @@ -192,7 +169,7 @@ get_max_time(char *lines, size_t num_lines, size_t num_attr, size_t max_attr) } int -do_map(char *chunk, size_t num_char, char *key, int rank) +do_map(char *chunk, size_t num_char) { size_t num_lines = 0, num_attr = 0; size_t max_attr_size = 0; @@ -211,10 +188,8 @@ do_map(char *chunk, size_t num_char, char *key, int rank) struct timespec start_time, stop_time; // test all ; print all - if (key == NULL) { - char file_name[10]; + char file_name[] = "Out.txt"; - sprintf(file_name, "Out%d.txt", rank); FILE *file = fopen(file_name, "w"); for (size_t k = 0; k < num_lines; ++k) { char *trip_name = get_word(lines, k, TRIP_ID, num_attr, @@ -236,25 +211,7 @@ do_map(char *chunk, size_t num_char, char *key, int rank) } fclose(file); // just so we dont lock - MPI_Send(&time, 1, MPI_UNSIGNED_LONG, 0, 0, MPI_COMM_WORLD); - } else { // search for key - clock_gettime(CLOCK_MONOTONIC, &start_time); - // int res = get_max_time(lines, num_lines, num_attr, - // max_attr_size); - res = search_key(lines, num_lines, num_attr, max_attr_size, - key); - clock_gettime(CLOCK_MONOTONIC, &stop_time); - - time = (stop_time.tv_sec - start_time.tv_sec) * 100000000 + - (stop_time.tv_nsec - start_time.tv_nsec); - if (res) - MPI_Send(&time, 1, MPI_UNSIGNED_LONG, 0, 0, - MPI_COMM_WORLD); - else - MPI_Send(&res, 1, MPI_UNSIGNED_LONG, 0, 0, - MPI_COMM_WORLD); - } free(lines); free(chunk); return res; @@ -263,78 +220,42 @@ do_map(char *chunk, size_t num_char, char *key, int rank) // help from https://stackoverflow.com/questions/12939279/mpi-reading-from-a-text-file // is hosted under a permissive licence, ty Jonathan Dursi :) int -parprocess(MPI_File *in, const int rank, const int size, char *key) +parprocess(char *buff, size_t file_size) { // reads revelant lines from file to chunk. // IN OUR CASE we will use overlap to reach EOF of this line. // Duplicates dont matter in our case ; res will be the same either way. size_t proc_size, total_size, total_size_overlap; - char *chunk; - MPI_Offset globalstart; - MPI_Offset globalend; - MPI_Offset filesize; + size_t start; + size_t end; - MPI_File_get_size(*in, &filesize); - filesize--; /* get rid of text file eof */ - proc_size = filesize / size; - globalstart = rank * proc_size; - globalend = globalstart + proc_size - 1; - if (rank == size - 1) - globalend = filesize - 1; - - /* add overlap to the end of everyone's chunk except last - * proc... */ - size_t globalend_overlap = globalend; - if (rank != size - 1) - globalend_overlap += OVERLAP; - - total_size_overlap = globalend_overlap - globalstart + 1; - total_size = globalend - globalstart + 1; - - /* allocate memory, filled with 0 */ - chunk = calloc(1, total_size); - - ssize_t err; + #pragma omp parallel { - err = MPI_File_read_at_all_begin(*in, globalstart, chunk, - total_size, MPI_CHAR); - if (err) { - printf("error %lu\n", err); - MPI_Finalize(); - } - err = MPI_File_read_at_all_end(*in, chunk, MPI_STATUS_IGNORE); - if (err) { - printf("error %lu\n", err); - MPI_Finalize(); - } + size_t size = omp_get_num_threads(); + size_t rank = omp_get_thread_num(); + + printf("%lu / %lu\n", rank, size); + proc_size = file_size / size; + start = rank * proc_size; + end = start + proc_size - 1; + if (rank == size - 1) + end = file_size - 1; + + /* add overlap to the end of everyone's chunk except last + * proc... */ + size_t end_overlap = end; + if (rank != size - 1) + end_overlap += OVERLAP; + + total_size_overlap = end_overlap - start + 1; + total_size = end - start + 1; + + /* allocate memory, filled with 0 */ + char *chunk = calloc(1, total_size); + memcpy(chunk, buff + start, proc_size); + + ssize_t err; + int max = do_map(chunk, total_size); } - - // eh commenting this out, at worst we'll have one unusable line, but this - // still works w/ padding - // fills the first incoherent bytes with \0 - //size_t k = 0; - //if (rank != 0) { // first has no incoherece at begining - // for (; chunk[k] != '\r' && chunk[k] != '\n'; - // ++k) // get number of incoherent bytes :) - // ; - // // reset - - // memmove(chunk, chunk + k, total_size); // - 2: dont count \n\r - //} - - // fill char after next EOL wiht \0 ; starting from proc_size to end of - // overlap - //if (rank != size) { // last doesnt have padding, dont check it - // for (; (chunk[globalend] != '\n' && chunk[globalend] != '\r') && - // globalend < globalend_overlap; - // ++globalend) - // ; - // memset(chunk + globalend, '\0', OVERLAP); - // // - //} - //chunk[total_size_overlap] = '\0'; // just to be sure! - - int max = do_map(chunk, total_size, key, rank); - - return max; + return 0; }