/*
 * Times a key lookup for every row of a stop_times CSV file.
 *
 * The input file is read into memory, cut into line-aligned chunks (one per
 * OpenMP thread), and each chunk is indexed into a fixed-stride table of
 * NUL-terminated fields.  For every row of a chunk the time needed to find
 * its trip id inside that chunk is written to OUT_FILE as "<trip_id>:<ns>".
 */
#define _XOPEN_SOURCE 700 /* strptime, clock_gettime, CLOCK_MONOTONIC */

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>

#ifdef _OPENMP
#include <omp.h>
#else
/* Serial fallbacks so the file still builds and runs without -fopenmp. */
static inline void omp_set_num_threads(int n) { (void)n; }
static inline int omp_get_num_threads(void) { return 1; }
static inline int omp_get_thread_num(void) { return 0; }
static inline int omp_in_parallel(void) { return 0; }
#endif

/* Column indices of the CSV rows (presumably the stop_times.txt layout). */
enum HEADER {
    TRIP_ID = 0,
    ARRIVAL_TIME,
    DEPARTURE_TIME,
    STOP_ID,
    STOP_SEQUENCE,
    STOP_HEADSIGN,
    PICKUP_TYPE,
    DROP_OFF_TYPE,
    SHAPE_DIST_TRAVELED,
    TIMEPOINT,
    END
};

#define DELIM ','
#define STOP_FILE "./stop_times.txt"
#define OUT_FILE "Out.txt"
#define NUM_THREADS 16
#define NSEC_PER_SEC 1000000000LL

int do_map(char *, size_t);
void do_reduce(size_t); /* declared only; no definition in this file */
int parprocess(char *, size_t);

/**
 * Reads STOP_FILE into memory and processes it with parprocess().
 *
 * @return 0 on success, 1 if the file could not be read or processing failed.
 */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    FILE *in = fopen(STOP_FILE, "rb");
    if (!in) {
        fputs("File error", stderr);
        exit(1);
    }

    if (fseek(in, 0, SEEK_END) != 0) {
        perror("fseek");
        fclose(in);
        return 1;
    }
    /* ftell returns long (and -1 on error); an int would truncate big files. */
    long file_size = ftell(in);
    if (file_size < 0) {
        perror("ftell");
        fclose(in);
        return 1;
    }
    rewind(in);

    /* +1 keeps the request non-zero for an empty file and leaves room for a
     * terminating NUL. */
    char *buffer = malloc((size_t)file_size + 1);
    if (!buffer) {
        perror("malloc");
        fclose(in);
        return 1;
    }

    /* copy the file into the buffer */
    size_t got = fread(buffer, 1, (size_t)file_size, in);
    fclose(in);
    if (got != (size_t)file_size) {
        fputs("Read error", stderr);
        free(buffer);
        return 1;
    }
    buffer[got] = '\0';

    omp_set_num_threads(NUM_THREADS);
    int rc = parprocess(buffer, (size_t)file_size);

    free(buffer);
    return rc == 0 ? 0 : 1;
}

/**
 * Difference a - b in seconds, using only the hour/minute/second fields.
 */
time_t substr_time(struct tm a, struct tm b)
{
    return (a.tm_hour * 3600 + a.tm_min * 60 + a.tm_sec) -
           (b.tm_hour * 3600 + b.tm_min * 60 + b.tm_sec);
}

/**
 * Address of the field slot for row x, column y in the flat table `lines`.
 *
 * @param num_attr number of slots per row
 * @param max_attr size of one slot in bytes
 */
char *get_word(char *lines, size_t x, size_t y, size_t num_attr,
               size_t max_attr)
{
    size_t offset = (x * (num_attr * max_attr) + y * (max_attr));
    return lines + offset;
}

/**
 * Copies every delimiter-terminated field of `chunk` into the table `lines`.
 *
 * The last field of a row (terminated by '\n', not DELIM) is not stored.
 * '\r' characters are skipped.
 *
 * @param chunk     CSV text, num_char bytes (need not be NUL-terminated)
 * @param num_lines number of rows available in `lines`
 * @param num_attr  number of slots per row in `lines`
 * @param max_attr  size of one slot in bytes, INCLUDING the terminating NUL;
 *                  longer fields are truncated to max_attr - 1 characters
 * @param lines     table of num_lines * num_attr * max_attr bytes
 *
 * Rows beyond num_lines and fields beyond num_attr are ignored instead of
 * being written out of bounds.
 */
void fill_lines(char *chunk, size_t num_char, size_t num_lines,
                size_t num_attr, size_t max_attr, char *lines)
{
    size_t attr_pos = 0, line_pos = 0, word_pos = 0;

    if (!chunk || !lines || max_attr == 0) {
        return;
    }

    for (size_t k = 0; k < num_char; ++k) {
        char c = chunk[k];

        if (c == DELIM) {
            /* end of a field: store it if it has a slot */
            if (line_pos < num_lines && attr_pos < num_attr) {
                size_t len = word_pos < max_attr - 1 ? word_pos : max_attr - 1;
                char *word =
                    get_word(lines, line_pos, attr_pos, num_attr, max_attr);
                memcpy(word, chunk + k - word_pos, len);
                word[len] = '\0';
            }
            ++attr_pos;
            word_pos = 0;
        } else if (c == '\n') {
            ++line_pos;
            attr_pos = 0;
            word_pos = 0;
        } else if (c == '\r') {
            /* ignored (CRLF line endings) */
        } else {
            ++word_pos;
        }
    }
}

/**
 * Scans `chunk` and accumulates the row count and the longest field length.
 *
 * Both outputs are updated in place, so the caller initialises them (to 0
 * for a fresh scan).  A row ends at '\n'; a final row without '\n' is
 * counted too.  '\r' is neither counted as a row nor as a field character.
 *
 * @param max_attr_size raised to the longest field length seen (no NUL)
 * @param num_lines     incremented once per row
 */
void get_lines_info(char *chunk, size_t num_char, size_t *max_attr_size,
                    size_t *num_lines)
{
    size_t current_attr_size = 0;

    for (size_t k = 0; k < num_char; ++k) {
        char c = chunk[k];

        if (c == '\n') {
            ++(*num_lines);
        }

        if (c == DELIM || c == '\n') {
            if (current_attr_size > *max_attr_size) {
                *max_attr_size = current_attr_size;
            }
            current_attr_size = 0;
        } else if (c != '\r') {
            ++current_attr_size;
        }
    }

    /* trailing row that is not terminated by a newline */
    if (num_char > 0 && chunk[num_char - 1] != '\n') {
        ++(*num_lines);
        if (current_attr_size > *max_attr_size) {
            *max_attr_size = current_attr_size;
        }
    }
}

/**
 * Number of DELIM characters on the first line of `chunk`, i.e. the number
 * of delimiter-terminated fields (one less than the number of columns).
 *
 * The scan stops at the first '\n' or NUL, so `chunk` must contain one of
 * them.
 */
size_t get_num_attr(char *chunk)
{
    size_t num_attr = 0;

    for (size_t k = 0; chunk[k] != '\n' && chunk[k] != '\0'; ++k) {
        if (chunk[k] == DELIM) {
            ++num_attr;
        }
    }
    return num_attr;
}

/* Length-bounded variant of get_num_attr() for buffers without a sentinel. */
static size_t count_first_line_attr(const char *chunk, size_t num_char)
{
    size_t num_attr = 0;

    for (size_t k = 0; k < num_char && chunk[k] != '\n'; ++k) {
        if (chunk[k] == DELIM) {
            ++num_attr;
        }
    }
    return num_attr;
}

/**
 * Linear search for `key` in the TRIP_ID column of the table.
 *
 * @return 1 if some row's trip id equals `key`, 0 otherwise.
 */
int search_key(char *lines, size_t num_lines, size_t num_attr,
               size_t max_attr, char *key)
{
    if (!lines || !key || num_attr == 0) {
        return 0;
    }

    for (size_t k = 0; k < num_lines; ++k) {
        if (strcmp(get_word(lines, k, TRIP_ID, num_attr, max_attr), key) == 0) {
            return 1;
        }
    }
    return 0;
}

/**
 * Largest value of (arrival - departure), in seconds, over all rows.
 *
 * Rows whose time fields do not parse as "%H:%M:%S" are skipped.
 * NOTE(review): the operand order (arrival minus departure) is kept from the
 * original code; confirm it is the intended one.
 *
 * @return the maximum found, or 0 if no row yields a positive value.
 */
int get_max_time(char *lines, size_t num_lines, size_t num_attr,
                 size_t max_attr)
{
    time_t max_time = 0;

    /* both time columns must exist in the table */
    if (!lines || num_attr <= DEPARTURE_TIME) {
        return 0;
    }

    for (size_t k = 0; k < num_lines; ++k) {
        struct tm dep_time = {0};
        struct tm arr_time = {0};

        if (!strptime(get_word(lines, k, DEPARTURE_TIME, num_attr, max_attr),
                      "%H:%M:%S", &dep_time) ||
            !strptime(get_word(lines, k, ARRIVAL_TIME, num_attr, max_attr),
                      "%H:%M:%S", &arr_time)) {
            continue;
        }

        time_t tmp = substr_time(arr_time, dep_time);
        if (tmp > max_time) {
            max_time = tmp;
        }
    }
    return (int)max_time;
}

/**
 * Indexes one chunk of CSV text and writes "<trip_id>:<nanoseconds>" to
 * OUT_FILE for every row, where the number is the time search_key() took to
 * find that row's trip id within the chunk.
 *
 * Takes ownership of `chunk` and frees it before returning.
 *
 * OUT_FILE is truncated when called serially.  When called from inside an
 * active OpenMP parallel region the records are appended, one thread at a
 * time, so that threads do not overwrite each other's output.
 *
 * @param chunk    heap-allocated CSV text (need not be NUL-terminated)
 * @param num_char number of bytes in `chunk`
 * @return 0 on success, -1 on allocation or output failure.
 */
int do_map(char *chunk, size_t num_char)
{
    int rc = -1;
    char *lines = NULL;
    long long *elapsed_ns = NULL; /* per row; -1 means "key not found" */
    size_t num_lines = 0;
    size_t max_attr_size = 0;

    if (!chunk) {
        return -1;
    }

    get_lines_info(chunk, num_char, &max_attr_size, &num_lines);
    size_t num_attr = count_first_line_attr(chunk, num_char);
    size_t slot = max_attr_size + 1; /* room for the terminating NUL */

    if (num_attr == 0) {
        num_lines = 0; /* no delimiter-terminated field: nothing to index */
    }

    if (num_lines > 0) {
        if (num_lines > SIZE_MAX / num_attr ||
            num_lines > SIZE_MAX / sizeof *elapsed_ns) {
            goto out;
        }

        /* one big contiguous table: num_lines x num_attr slots */
        lines = calloc(num_lines * num_attr, slot);
        elapsed_ns = malloc(num_lines * sizeof *elapsed_ns);
        if (!lines || !elapsed_ns) {
            perror("do_map");
            goto out;
        }

        fill_lines(chunk, num_char, num_lines, num_attr, slot, lines);

        for (size_t k = 0; k < num_lines; ++k) {
            struct timespec start_time, stop_time;
            char *trip_name = get_word(lines, k, TRIP_ID, num_attr, slot);

            clock_gettime(CLOCK_MONOTONIC, &start_time);
            int found = search_key(lines, num_lines, num_attr, slot, trip_name);
            clock_gettime(CLOCK_MONOTONIC, &stop_time);

            if (!found) {
                elapsed_ns[k] = -1;
                continue;
            }
            elapsed_ns[k] =
                (long long)(stop_time.tv_sec - start_time.tv_sec) * NSEC_PER_SEC +
                (stop_time.tv_nsec - start_time.tv_nsec);
        }
    }

    /* Only one thread at a time may touch the shared output file. */
#pragma omp critical(stop_times_out)
    {
        FILE *file = fopen(OUT_FILE, omp_in_parallel() ? "a" : "w");
        if (!file) {
            perror(OUT_FILE);
        } else {
            int write_ok = 1;
            for (size_t k = 0; k < num_lines; ++k) {
                if (elapsed_ns[k] < 0) {
                    continue; /* key not found: nothing to report */
                }
                if (fprintf(file, "%s:%lld\n",
                            get_word(lines, k, TRIP_ID, num_attr, slot),
                            elapsed_ns[k]) < 0) {
                    write_ok = 0;
                    break;
                }
            }
            if (fclose(file) != 0) {
                write_ok = 0;
            }
            if (write_ok) {
                rc = 0;
            }
        }
    }

out:
    free(elapsed_ns);
    free(lines);
    free(chunk);
    return rc;
}

/*
 * Maps a nominal split position to the start of a line: the position just
 * after the first '\n' at or after pos - 1, or file_size if there is none.
 * Positions 0 and >= file_size are returned unchanged (clamped).
 */
static size_t align_to_line(const char *buff, size_t pos, size_t file_size)
{
    if (pos == 0) {
        return 0;
    }
    if (pos >= file_size) {
        return file_size;
    }

    const char *nl = memchr(buff + pos - 1, '\n', file_size - (pos - 1));
    return nl ? (size_t)(nl - buff) + 1 : file_size;
}

/**
 * Splits `buff` into one line-aligned chunk per OpenMP thread and runs
 * do_map() on a private copy of each chunk.
 *
 * Chunking idea from
 * https://stackoverflow.com/questions/12939279/mpi-reading-from-a-text-file
 * (Jonathan Dursi).  Neighbouring threads move their shared boundary to the
 * same line start, so every line is processed exactly once.
 *
 * OUT_FILE is truncated before the threads start appending to it.
 *
 * @param buff      file contents (not modified, not freed)
 * @param file_size number of bytes in `buff`
 * @return 0 on success, -1 if the output file could not be created or any
 *         chunk failed.
 */
int parprocess(char *buff, size_t file_size)
{
    int failed = 0;

    FILE *out = fopen(OUT_FILE, "w");
    if (!out) {
        perror(OUT_FILE);
        return -1;
    }
    fclose(out);

    if (!buff || file_size == 0) {
        return 0;
    }

    /* Everything below is declared inside the region so that it is private
     * to each thread. */
#pragma omp parallel reduction(| : failed)
    {
        size_t size = (size_t)omp_get_num_threads();
        size_t rank = (size_t)omp_get_thread_num();
        printf("%zu / %zu\n", rank, size);

        size_t proc_size = file_size / size;
        size_t start = align_to_line(buff, rank * proc_size, file_size);
        /* the last thread also takes the remainder of the division */
        size_t end = (rank == size - 1)
                         ? file_size
                         : align_to_line(buff, (rank + 1) * proc_size, file_size);

        if (end > start) {
            size_t total_size = end - start;
            char *chunk = malloc(total_size + 1);

            if (!chunk) {
                failed |= 1;
            } else {
                memcpy(chunk, buff + start, total_size);
                chunk[total_size] = '\0';
                /* do_map() frees chunk */
                if (do_map(chunk, total_size) != 0) {
                    failed |= 1;
                }
            }
        }
    }

    return failed ? -1 : 0;
}