MPI: search key
This commit is contained in:
parent
ce4e52a5ac
commit
6496329914
1 changed files with 92 additions and 36 deletions
126
src/main.c
126
src/main.c
|
@ -25,9 +25,10 @@ enum HEADER {
|
||||||
#define OVERLAP 100
|
#define OVERLAP 100
|
||||||
#define STOP_FILE "./stop_times.txt"
|
#define STOP_FILE "./stop_times.txt"
|
||||||
|
|
||||||
int do_map(char *, size_t);
|
// adding key here!
|
||||||
|
int do_map(char *, size_t, char *, int);
|
||||||
void do_reduce(size_t);
|
void do_reduce(size_t);
|
||||||
int parprocess(MPI_File *, const int, const int);
|
int parprocess(MPI_File *, const int, const int, char *);
|
||||||
|
|
||||||
int
|
int
|
||||||
main(int argc, char **argv)
|
main(int argc, char **argv)
|
||||||
|
@ -39,22 +40,16 @@ main(int argc, char **argv)
|
||||||
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
|
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
|
||||||
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
|
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
|
||||||
|
|
||||||
|
char *key;
|
||||||
if (argc != 2) {
|
if (argc != 2) {
|
||||||
printf("Please enter key as arg");
|
if (world_rank == 0)
|
||||||
exit(-1);
|
printf("no args ; indexing all\n");
|
||||||
|
key = NULL;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
key = argv[2];
|
||||||
|
|
||||||
char *key = argv[1];
|
size_t res = 0;
|
||||||
int localmax = 0, globalmax = 0;
|
|
||||||
|
|
||||||
// Get the name of the processor
|
|
||||||
char processor_name[MPI_MAX_PROCESSOR_NAME];
|
|
||||||
int name_len;
|
|
||||||
MPI_Get_processor_name(processor_name, &name_len);
|
|
||||||
|
|
||||||
// Print off a hello world message
|
|
||||||
printf("Hello world from processor %s, rank %d out of %d processors\n",
|
|
||||||
processor_name, world_rank, world_size);
|
|
||||||
|
|
||||||
int number, size;
|
int number, size;
|
||||||
ssize_t err;
|
ssize_t err;
|
||||||
|
@ -68,12 +63,19 @@ main(int argc, char **argv)
|
||||||
}
|
}
|
||||||
MPI_Comm_size(MPI_COMM_WORLD, &size);
|
MPI_Comm_size(MPI_COMM_WORLD, &size);
|
||||||
|
|
||||||
localmax = parprocess(&in, world_rank, world_size);
|
res = parprocess(&in, world_rank, world_size, key);
|
||||||
|
|
||||||
MPI_Reduce(&localmax, &globalmax, 1, MPI_INT, MPI_MAX, 0,
|
if (world_rank == 0) {
|
||||||
MPI_COMM_WORLD);
|
for (char k = 0; k < world_size; k++) {
|
||||||
if (world_rank == 0)
|
MPI_Recv(&res, 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 0,
|
||||||
printf("globalmax: %d\n", globalmax);
|
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
||||||
|
if (res) {
|
||||||
|
//printf("res: found in %lu ns\n", res); break;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
printf("res: not found\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MPI_File_close(&in);
|
MPI_File_close(&in);
|
||||||
MPI_Finalize();
|
MPI_Finalize();
|
||||||
|
@ -152,7 +154,20 @@ get_num_attr(char *chunk)
|
||||||
return num_attr;
|
return num_attr;
|
||||||
}
|
}
|
||||||
|
|
||||||
int get_max_time(char *lines, size_t num_lines, size_t num_attr, size_t max_attr)
|
int
|
||||||
|
search_key(char *lines, size_t num_lines, size_t num_attr, size_t max_attr,
|
||||||
|
char *key)
|
||||||
|
{
|
||||||
|
for (size_t k = 0; k < num_lines; ++k) {
|
||||||
|
if (!strcmp(get_word(lines, k, TRIP_ID, num_attr, max_attr),
|
||||||
|
key))
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
get_max_time(char *lines, size_t num_lines, size_t num_attr, size_t max_attr)
|
||||||
{
|
{
|
||||||
struct tm dep_time, arr_time;
|
struct tm dep_time, arr_time;
|
||||||
time_t max_time = 0;
|
time_t max_time = 0;
|
||||||
|
@ -163,14 +178,11 @@ int get_max_time(char *lines, size_t num_lines, size_t num_attr, size_t max_attr
|
||||||
memcpy(&arr_time,
|
memcpy(&arr_time,
|
||||||
get_word(lines, k, ARRIVAL_TIME, num_attr, max_attr),
|
get_word(lines, k, ARRIVAL_TIME, num_attr, max_attr),
|
||||||
sizeof(struct tm));
|
sizeof(struct tm));
|
||||||
// arr_time.tm_sec);
|
|
||||||
|
|
||||||
strptime(get_word(lines, k, DEPARTURE_TIME, num_attr,
|
strptime(get_word(lines, k, DEPARTURE_TIME, num_attr, max_attr),
|
||||||
max_attr),
|
|
||||||
"%H:%M:%S", &dep_time);
|
"%H:%M:%S", &dep_time);
|
||||||
|
|
||||||
strptime(get_word(lines, k, ARRIVAL_TIME, num_attr,
|
strptime(get_word(lines, k, ARRIVAL_TIME, num_attr, max_attr),
|
||||||
max_attr),
|
|
||||||
"%H:%M:%S", &arr_time);
|
"%H:%M:%S", &arr_time);
|
||||||
time_t tmp = substr_time(arr_time, dep_time);
|
time_t tmp = substr_time(arr_time, dep_time);
|
||||||
if (tmp > max_time)
|
if (tmp > max_time)
|
||||||
|
@ -180,10 +192,11 @@ int get_max_time(char *lines, size_t num_lines, size_t num_attr, size_t max_attr
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
do_map(char *chunk, size_t num_char)
|
do_map(char *chunk, size_t num_char, char *key, int rank)
|
||||||
{
|
{
|
||||||
size_t num_lines = 0, num_attr = 0;
|
size_t num_lines = 0, num_attr = 0;
|
||||||
size_t max_attr_size = 0;
|
size_t max_attr_size = 0;
|
||||||
|
size_t time, res;
|
||||||
|
|
||||||
get_lines_info(chunk, num_char, &max_attr_size, &num_lines);
|
get_lines_info(chunk, num_char, &max_attr_size, &num_lines);
|
||||||
num_attr = get_num_attr(chunk);
|
num_attr = get_num_attr(chunk);
|
||||||
|
@ -193,20 +206,64 @@ do_map(char *chunk, size_t num_char)
|
||||||
char *lines;
|
char *lines;
|
||||||
lines = calloc(1, num_lines * num_attr * max_attr_size);
|
lines = calloc(1, num_lines * num_attr * max_attr_size);
|
||||||
|
|
||||||
|
|
||||||
fill_lines(chunk, num_char, num_lines, num_attr, max_attr_size, lines);
|
fill_lines(chunk, num_char, num_lines, num_attr, max_attr_size, lines);
|
||||||
|
|
||||||
int max_time = get_max_time(lines, num_lines, num_attr, max_attr_size);
|
struct timespec start_time, stop_time;
|
||||||
|
|
||||||
|
// test all ; print all
|
||||||
|
if (key == NULL) {
|
||||||
|
char file_name[10];
|
||||||
|
|
||||||
|
sprintf(file_name, "Out%d.txt", rank);
|
||||||
|
FILE *file = fopen(file_name, "w");
|
||||||
|
for (size_t k = 0; k < num_lines; ++k) {
|
||||||
|
char *trip_name = get_word(lines, k, TRIP_ID, num_attr,
|
||||||
|
max_attr_size);
|
||||||
|
clock_gettime(CLOCK_MONOTONIC, &start_time);
|
||||||
|
// int res = get_max_time(lines, num_lines, num_attr,
|
||||||
|
// max_attr_size);
|
||||||
|
size_t res = search_key(lines, num_lines, num_attr,
|
||||||
|
max_attr_size, trip_name);
|
||||||
|
|
||||||
|
clock_gettime(CLOCK_MONOTONIC, &stop_time);
|
||||||
|
|
||||||
|
if (!res)
|
||||||
|
continue; // dont print if err
|
||||||
|
time = (stop_time.tv_sec - start_time.tv_sec) *
|
||||||
|
100000000 +
|
||||||
|
(stop_time.tv_nsec - start_time.tv_nsec);
|
||||||
|
fprintf(file, "%s:%lu\n", trip_name, time);
|
||||||
|
}
|
||||||
|
fclose(file);
|
||||||
|
// just so we dont lock
|
||||||
|
MPI_Send(&time, 1, MPI_UNSIGNED_LONG, 0, 0, MPI_COMM_WORLD);
|
||||||
|
} else { // search for key
|
||||||
|
clock_gettime(CLOCK_MONOTONIC, &start_time);
|
||||||
|
// int res = get_max_time(lines, num_lines, num_attr,
|
||||||
|
// max_attr_size);
|
||||||
|
res = search_key(lines, num_lines, num_attr, max_attr_size,
|
||||||
|
key);
|
||||||
|
|
||||||
|
clock_gettime(CLOCK_MONOTONIC, &stop_time);
|
||||||
|
|
||||||
|
time = (stop_time.tv_sec - start_time.tv_sec) * 100000000 +
|
||||||
|
(stop_time.tv_nsec - start_time.tv_nsec);
|
||||||
|
if (res)
|
||||||
|
MPI_Send(&time, 1, MPI_UNSIGNED_LONG, 0, 0,
|
||||||
|
MPI_COMM_WORLD);
|
||||||
|
else
|
||||||
|
MPI_Send(&res, 1, MPI_UNSIGNED_LONG, 0, 0,
|
||||||
|
MPI_COMM_WORLD);
|
||||||
|
}
|
||||||
free(lines);
|
free(lines);
|
||||||
free(chunk);
|
free(chunk);
|
||||||
return max_time;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
// help from https://stackoverflow.com/questions/12939279/mpi-reading-from-a-text-file
|
// help from https://stackoverflow.com/questions/12939279/mpi-reading-from-a-text-file
|
||||||
// is hosted under a permissive licence, ty Jonathan Dursi :)
|
// is hosted under a permissive licence, ty Jonathan Dursi :)
|
||||||
int
|
int
|
||||||
parprocess(MPI_File *in, const int rank, const int size)
|
parprocess(MPI_File *in, const int rank, const int size, char *key)
|
||||||
{
|
{
|
||||||
// reads revelant lines from file to chunk.
|
// reads revelant lines from file to chunk.
|
||||||
// IN OUR CASE we will use overlap to reach EOF of this line.
|
// IN OUR CASE we will use overlap to reach EOF of this line.
|
||||||
|
@ -236,7 +293,6 @@ parprocess(MPI_File *in, const int rank, const int size)
|
||||||
|
|
||||||
/* allocate memory, filled with 0 */
|
/* allocate memory, filled with 0 */
|
||||||
chunk = calloc(1, total_size);
|
chunk = calloc(1, total_size);
|
||||||
printf("[PARPROCESS] begin\n");
|
|
||||||
|
|
||||||
ssize_t err;
|
ssize_t err;
|
||||||
{
|
{
|
||||||
|
@ -253,7 +309,8 @@ parprocess(MPI_File *in, const int rank, const int size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// eh commenting this out, at worst we'll have one unusable line, but this
|
||||||
|
// still works w/ padding
|
||||||
// fills the first incoherent bytes with \0
|
// fills the first incoherent bytes with \0
|
||||||
//size_t k = 0;
|
//size_t k = 0;
|
||||||
//if (rank != 0) { // first has no incoherece at begining
|
//if (rank != 0) { // first has no incoherece at begining
|
||||||
|
@ -277,8 +334,7 @@ parprocess(MPI_File *in, const int rank, const int size)
|
||||||
//}
|
//}
|
||||||
//chunk[total_size_overlap] = '\0'; // just to be sure!
|
//chunk[total_size_overlap] = '\0'; // just to be sure!
|
||||||
|
|
||||||
int max = do_map(chunk, total_size);
|
int max = do_map(chunk, total_size, key, rank);
|
||||||
printf("[PARPROCESS] end, max: %d\n", max);
|
|
||||||
|
|
||||||
return max;
|
return max;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue