
/**
	primes_pthread.c 

	usage:  primes_pthread [-tc <thread_count>] [-it <iterations>] [-s <server_for_slave_task>]

	The main program (master) creates a pvm task (slave). 
	Both master and slave spawn threads (-tc <thread_count>).
	for given number of iterations (-it <iterations>)
		Master thread sends boundary values (begin and end) for finding total prime numbers to a slave thread. 
		Slave thread counts the prime numbers in that range and sends the count back to its master thread.
		Master thread receives the result and prints on the screen.
			- The results may be out of order due to asynchronous thread execution.
	end for
	program exits after all its threads complete their tasks. 

**/	

#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <signal.h>
#include <pthread.h>
#include <pvm3.h>

/* Fallback values main() uses when the corresponding count is 0 after option parsing. */
#define DEFAULT_THREAD_COUNT      5
#define DEFAULT_ITERATIONS_COUNT  5
/* Role of this process: main() sets MASTER, or SLAVE when "-slave" is passed;
   pthread_function() reads it to pick the master or slave loop. */
enum ExecutionType { MASTER = 0, SLAVE = 1 } execution_type;

/* Master side: task id filled in by pvm_spawn(); target of the master's
   pvm_send/pvm_recv calls and of pvm_kill() in stop_slave_task(). */
int pvm_slave_tid;
/* Slave side: task id returned by pvm_parent(); peer for the slave's
   pvm_recv/pvm_send calls. */
int parent_task_id;
/* Number of request/reply rounds each thread performs (set by -it / -iterations). */
int iterations;

/* prime number functions */

/*
 * Signal handler registered by main() in the master process.
 * Kills the spawned slave task (pvm_slave_tid) and then terminates this
 * process with exit status 0. The delivered signal number is not used.
 *
 * NOTE(review): exit() is not async-signal-safe, and pvm_kill() presumably
 * is not either -- confirm whether this handler needs hardening.
 */
void stop_slave_task (int signal_id) {
	(void) signal_id;

	pvm_kill (pvm_slave_tid);
	exit (0);
}

/*
 * Primality test by trial division.
 *
 * number: value to test.
 * returns: 1 if number is prime, 0 otherwise. Values below 2 (0, 1 and all
 *          negative numbers) are never prime.
 */
int isprime (int number) {
	int i;

	if (number < 2) {
		return 0;
	}

	/* Divisors only need checking up to sqrt(number); the bound is written
	   as i <= number / i so that i * i can never overflow. */
	for (i = 2; i <= number / i; i++) {
		if (number % i == 0) {
			return 0;
		}
	}
	return 1;
}

/*
 * Counts the values in the half-open range [begin, end) for which
 * isprime() returns non-zero.
 *
 * Returns 0 immediately when either bound is negative or when the range is
 * empty or inverted (end <= begin). Otherwise the count is returned after a
 * fixed 2-second sleep.
 */
int prime_count (int begin, int end) {
	int found = 0;
	int candidate;

	if (!(begin >= 0 && end >= 0 && end > begin)) {
		return 0;
	}

	candidate = begin;
	while (candidate < end) {
		if (isprime (candidate)) {
			found++;
		}
		candidate++;
	}

	/* NOTE(review): the reason for this fixed delay is not evident from the
	   code; it is kept exactly as in the original. */
	sleep (2);
	return found;
}

/*
 * check_status (status, routine)
 *
 * If status is negative, prints "<routine> at line <n> failed, error code:
 * <status>" to stderr and executes `break`.
 *
 * - Must be expanded directly inside a loop (or switch) body: the `break`
 *   is meant to leave the caller's enclosing loop. For that reason the
 *   macro deliberately is NOT wrapped in do { ... } while (0), which would
 *   capture the break.
 * - status is evaluated exactly once and is parenthesized, so expressions
 *   such as `a & b` or calls with side effects are safe to pass.
 * - status is treated as an int.
 */
#define check_status(status, routine) {                              \
	int check_status_code_ = (status);                               \
	if (check_status_code_ < 0) {                                    \
		fprintf (stderr, "%s at line %d failed, error code: %d\n",   \
			(routine), __LINE__, check_status_code_);                \
		break;                                                       \
	}                                                                \
}

/*
 * Thread body used by both the master and the slave process.
 *
 * arg: the thread's message tag, passed by value inside the pointer
 *      (main() passes i + 1). The tag is used for every pvm_send/pvm_recv
 *      this thread issues.
 *
 * Master (execution_type == MASTER), for each of `iterations` rounds:
 *   packs begin = i * 1000 and end = (i + 1) * 1000, sends them to
 *   pvm_slave_tid, waits for the reply carrying the same tag, unpacks the
 *   count and prints it to stderr.
 * Slave, for each of `iterations` rounds:
 *   receives begin/end from parent_task_id, calls prime_count (begin, end)
 *   and sends the result back with the same tag.
 *
 * Any PVM call returning a negative status is reported by check_status()
 * and ends the loop early. The thread never returns normally: it finishes
 * through pthread_exit (NULL).
 */
void *pthread_function (void *arg) {

	/* Convert through intptr_t: a direct pointer-to-int cast is a size
	   mismatch on platforms where pointers are wider than int. */
	int tag = (int)(intptr_t)arg;
	int i, status;
	int begin, end, total_primes;

	if (execution_type == MASTER) {

		for (i = 0; i < iterations; i++) {

			begin = i * 1000;
			end   = (i + 1) * 1000;

			status = pvm_initsend (PvmDataDefault);
			check_status (status, "pvm_initsend");

			status = pvm_pkint (&begin, 1, 1);
			check_status (status, "pvm_pkint");

			status = pvm_pkint (&end, 1, 1);
			check_status (status, "pvm_pkint");

			status = pvm_send (pvm_slave_tid, tag);
			check_status (status, "pvm_send");

			status = pvm_recv (pvm_slave_tid, tag);
			check_status (status, "pvm_recv");

			status = pvm_upkint (&total_primes, 1, 1);
			check_status (status, "pvm_upkint");

			fprintf (stderr, "%d: prime_count (%d, %d) = %d\n", tag, begin, end, total_primes);
		}
	} else {

		for (i = 0; i < iterations; i++) {
			status = pvm_recv (parent_task_id, tag);
			check_status (status, "pvm_recv");

			status = pvm_upkint (&begin, 1, 1);
			check_status (status, "pvm_upkint");

			status = pvm_upkint (&end, 1, 1);
			check_status (status, "pvm_upkint");

			total_primes = prime_count (begin, end);

			status = pvm_initsend (PvmDataDefault);
			check_status (status, "pvm_initsend");

			status = pvm_pkint (&total_primes, 1, 1);
			check_status (status, "pvm_pkint");

			status = pvm_send (parent_task_id, tag);
			check_status (status, "pvm_send");
		}
	}

	/* NOTE(review): pvm_pthread_exit() is presumably supplied by a
	   thread-aware PVM build; it is not declared in this file -- confirm. */
	pvm_pthread_exit ();
	pthread_exit (NULL);

	/* Not reached; kept so compilers that do not treat pthread_exit() as
	   noreturn still see a return value. */
	return NULL;
}

/*
 * usage ()
 *
 * Prints the command-line synopsis and the default option values to stderr.
 * Expects `argv` to be in scope at the expansion site (it prints argv [0]).
 * The defaults shown come from DEFAULT_THREAD_COUNT and
 * DEFAULT_ITERATIONS_COUNT, so the help text cannot drift from the values
 * actually used. Wrapped in do { ... } while (0) so that `usage ();` is a
 * single statement.
 */
#define usage() do { \
	fprintf (stderr, "usage: %s [-s <server_for_slave_task>] [-tc <thread_count>] [-it <iterations>]\n", argv [0]); \
	fprintf (stderr, "default: %s -tc %d -it %d (slave host chosen by PVM unless -s is given)\n", \
		argv [0], DEFAULT_THREAD_COUNT, DEFAULT_ITERATIONS_COUNT); \
} while (0)

main (int argc, char *argv[]) {
   int i = 0;
   pthread_t thread_id;
   pthread_t *children;
   int     status;
   int thread_count = DEFAULT_THREAD_COUNT;
   execution_type = MASTER;
   int slave_tid = 0;
	char host [64];
	strcpy (host, "");
	
   for (i = 1; i < argc; i++) {

      if (!strcmp (argv [i],  "-slave")) {
         execution_type = SLAVE;
			continue;  /* it will not conflict with -s for server option */
      } 

      if (!strncmp (argv [i], "-tc", 3) || !strcmp (argv [i], "-thread_count")) {
         if ((i+1) < argc) {
            thread_count = atoi (argv [i+1]);
         } else {
				usage ();
			}
      }

      if (!strncmp (argv [i], "-it", 3) || !strcmp (argv [i], "-iterations")) {
         if ((i+1) < argc) {
            iterations = atoi (argv [i+1]);
         } else {
				usage ();
			}
      }

      if (!strncmp (argv [i], "-s", 2) || !strcmp (argv [i], "-server")) {
         if ((i+1) < argc) {
            strcpy (host, argv [i+1]);
         } else {
				usage ();
			}
      }

      if (!strncmp (argv [i], "-h", 2) || !strcmp (argv [i], "?")) {
			usage ();
         exit (0);
      }
   }

   thread_count = (thread_count == 0 ?  DEFAULT_THREAD_COUNT : thread_count);
   iterations = (iterations == 0 ?  DEFAULT_ITERATIONS_COUNT : iterations);
   children = (pthread_t *) malloc (thread_count * sizeof (pthread_t));

   /* master creates a slave task */
   if (execution_type == MASTER) {

   	fprintf (stderr, "%d threads and %d iterations at '%s' server\n", thread_count, iterations, 
				(strcmp (host, "") ? host : "default"));

      char thread_count_str [32];  sprintf (thread_count_str, "%d", thread_count);
      char iterations_str [32];  sprintf (iterations_str, "%d", iterations);
      char *argv_to_slave_task [6] = { 
							"-slave", 
							"-tc", thread_count_str, 
							"-it", iterations_str, NULL };

		if (!strcmp (host, ""))
      	status = pvm_spawn (argv [0], argv_to_slave_task, PvmTaskDefault, "", 1, &pvm_slave_tid);
		else 
      	status = pvm_spawn (argv [0], argv_to_slave_task, PvmTaskHost, host, 1, &pvm_slave_tid);

		if (pvm_slave_tid < 0) {
			extern char *pvm_errlist []; /* pvm private variable, not for public */
			fprintf (stderr, "pvm_spwan() call failed, error code: %d & description : %s\n", pvm_slave_tid, pvm_errlist [-pvm_slave_tid]);
			pvm_exit ();
			exit (1);
		}
	
		signal (SIGINT,  stop_slave_task);
      signal (SIGQUIT, stop_slave_task);
      signal (SIGKILL, stop_slave_task);
      signal (SIGTERM, stop_slave_task);
   } else {
		parent_task_id = pvm_parent ();
		if (parent_task_id < 0) {
			fprintf (stderr, "pvm_parent() call failed, error code: %d\n", parent_task_id);
			pvm_exit ();
			exit (1);
		}
	}

   /* create threads */
   for (i = 0; i < thread_count; i++) {
		int tag = i + 1;

      status = pthread_create (&thread_id, NULL, &pthread_function, (void *)tag);
      if (status == 0) {
         children [i] = (thread_id ? thread_id : 0);
      } else {
         fprintf (stderr, "pthread_create() call failed, error code: %d\n", status);
      }
   }

   for (i = 0; i < thread_count; i++) {
      if (children [i] != 0) {
         if (pthread_join (children [i], NULL)) {
            perror ("pthread_join: ");   
         } 
      }
   }
   fprintf (stderr, "completed\n");
	pvm_exit ();
	exit (0);
}
