EECE-4029 Operating Systems Fall 2016
Lab 5


Stream Sharing: One Producer, Many Consumers

Due: Oct 7

Rationale:
    You have seen how to join more than one stream through the merge example in 07-prod-cons.c. In this lab you will generalize the stream definition in that file so that any number of consumers can connect to a single producer. Hopefully we will make use of this implementation when we get to device drivers.
 
Lab:
    Sample application in Java. The graphics are only for ease of illustration and are not a requirement of the lab.

Generalize the Stream structure and its operations in 07-prod-cons.c to support multiple consumers. To relieve you of some worry, it is OK to change the type of object the queue holds to int (it was void* in 07-prod-cons.c); the code for such a queue is in queue_a.h and queue_a.c. If you make this change, don't forget to update put, get, and the other functions accordingly.
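queue_a.c is not reproduced on this page, so the following is only a hypothetical stand-in (IntRing, CAP and the ring_* names are illustrative, not queue_a's actual API). It shows the one queue operation the multi-consumer get leans on: reading the i-th oldest element without removing it, which the listing further down does with peek(q, ce->first - *first_pos).

```c
#include <assert.h>

#define CAP 3   /* hypothetical capacity, playing the role of BUFFER_SIZE */

/* Hypothetical stand-in for queue_a: a fixed-size ring of ints. */
typedef struct {
   int items[CAP];
   int head;    /* index of the oldest element */
   int n;       /* number of elements currently stored */
} IntRing;

static void ring_init(IntRing *q) { q->head = 0; q->n = 0; }

static int ring_nelem(const IntRing *q) { return q->n; }

/* Append at the tail; the caller must already have waited while full. */
static void ring_enqueue(IntRing *q, int v) {
   assert(q->n < CAP);
   q->items[(q->head + q->n) % CAP] = v;
   q->n++;
}

/* Remove and return the oldest element. */
static int ring_dequeue(IntRing *q) {
   assert(q->n > 0);
   int v = q->items[q->head];
   q->head = (q->head + 1) % CAP;
   q->n--;
   return v;
}

/* Read the i-th oldest element without removing it (0 = oldest). */
static int ring_peek(const IntRing *q, int i) {
   assert(i >= 0 && i < q->n);
   return q->items[(q->head + i) % CAP];
}
```

In a multi-consumer get, the offset handed to the peek operation would be the caller's next token number minus first_pos, the number of the oldest token still buffered, so each consumer reads its own position without disturbing the others.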

Specifically, the following is required for 8 of the 10 points:

  1. Each token that the producer generates is produced exactly one time.
  2. The producer will not put a token into the stream until a token it has not yet put into the stream is demanded by one of the consumers.
  3. All consumers see all the tokens generated by the producer (unless they are not active - see the next list).
  4. Since the buffer is of limited size, a token must be removed from the buffer as soon as it is certain that all active consumers have gotten it. OK, maybe leave the last one in the buffer if it makes life easy for you.
For the full 10 points:
  1. A consumer may connect to and disconnect from the stream dynamically. That is, when a consumer connects to a stream, it will see only those tokens generated since it connected, and when a consumer disconnects, the stream monitor will forget about that consumer and will not be responsible for saving tokens for it until it reconnects.
  2. A consumer may cancel a request for a token. The consumer will not lose information in the stream, because a canceled token can be requested again later.
The applet above illustrates the operation of the generalized stream. Open the applet by clicking the "Applet" button. Click "Connect A", then click "Consume A" three times. The buffer contains only the number 3, and the field marked "A:" contains "1 2 3", the tokens consumed by "A". Now click "Connect B" and click "Consume B" three times. The buffer now holds "3 4 5 6" because consumer "A" has not yet seen 4, 5, 6; the "B:" field shows "4 5 6", the tokens consumed by "B". Click "Consume A" six times. The "A:" field now shows "1 2 3 4 5 6 7 8 9", the tokens consumed by "A", while the "B:" field has not changed. The buffer now holds "6 7 8 9" because the "B" thread has not yet consumed 7, 8, 9. Click "Disconnect B" and the buffer shrinks to "9". Play around with this more as needed.
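The walkthrough can be replayed without threads. The model below is a sketch of the bookkeeping only, not the lab solution: it ignores the mutex, the condition variables and BUFFER_SIZE, and it assumes (1) the producer makes a token only when a consumer asks for one, and (2) the buffer keeps the newest token that every active consumer has already read, which is my reading of how the applet ends up showing "3" and later "9". Model and the model_* functions are hypothetical names.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define NCONS 2   /* consumers A (index 0) and B (index 1) */

/* Single-threaded model of the applet's bookkeeping: no locking, no
   capacity limit. Tokens are numbered 1, 2, 3, ... and each token's
   value equals its number. */
typedef struct {
   int hi;              /* newest token produced so far (0 = none) */
   int lo;              /* oldest token still in the buffer */
   int next[NCONS];     /* next token each consumer will read */
   int active[NCONS];   /* 1 while the consumer is connected */
} Model;

/* Drop tokens every active consumer has read, but keep the newest such
   token ("leave the last one in the buffer"). */
static void trim(Model *m) {
   int min_next = m->hi + 1;
   for (int c = 0; c < NCONS; c++)
      if (m->active[c] && m->next[c] < min_next)
         min_next = m->next[c];
   if (min_next - 1 > m->lo)
      m->lo = min_next - 1;
}

static void model_init(Model *m) {
   memset(m, 0, sizeof *m);
   m->lo = 1;                       /* lo > hi means the buffer is empty */
}

static void model_connect(Model *m, int c) {
   m->active[c] = 1;
   m->next[c] = m->hi + 1;          /* see only tokens produced from now on */
}

static int model_consume(Model *m, int c) {
   assert(m->active[c]);
   if (m->next[c] > m->hi)
      m->hi++;                      /* produce a new token only on demand */
   int tok = m->next[c]++;
   trim(m);
   return tok;
}

static void model_disconnect(Model *m, int c) {
   m->active[c] = 0;                /* forget this consumer entirely */
   trim(m);
}

/* Write the buffered tokens, e.g. "3 4 5 6", into out (size n). */
static void model_buffer(const Model *m, char *out, size_t n) {
   size_t len = 0;
   out[0] = '\0';
   for (int t = m->lo; t <= m->hi && len < n; t++)
      len += (size_t)snprintf(out + len, n - len, len ? " %d" : "%d", t);
}
```

Running the same clicks (connect A, consume A three times, connect B, consume B three times, consume A six times, disconnect B) through these functions yields the buffer contents "3", "3 4 5 6", "6 7 8 9" and "9", matching the walkthrough, and consumer B's first token is 4.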
 
Assistance:
Question: What new structures do you think I will need and which do you think I will need to modify?

  1. A buffer: this is already supplied
  2. A connections array: used to check the validity of a 'get' request and to maintain, for each consumer, an index into the buffer location of that consumer's next unread token. Elements stored in the array should be ConnectEntry objects (see below), each containing an authorized consumer thread (or id). Authorization occurs during the first invocation of get, when the consumer thread (or id) is passed in as an argument. Then, in 'get', the connections array can be checked for a ConnectEntry object whose thread matches the thread argument.
  3. ConnectEntry struct: an object of this type contains a token index and a consumer thread. The consumer thread is used to weakly authenticate the consumer (as above), and the index is used to facilitate cancelling and disconnecting. To disconnect, a consumer passes a pointer to itself through the argument list of the disconnect function. The connections array is checked to see whether this consumer is legitimately connected to the stream. If it is, its entry is removed from the array; if not, an error is reported.
  4. Stream struct: a complete Stream struct would look as follows:
      typedef struct stream_struct {
         struct stream_struct *next;
         ConnectEntry *ce;       /* the database (linked list) of the connection
                                    details of all connected consumers */
         pthread_t *thread;      /* needed to cancel the thread and in 'connect' */
         int last_pos;           /* token number of last buffer element */
         int first_pos;          /* token number of first buffer element */
         pthread_mutex_t mutex;  /* A lock on the critical sections of code */
         pthread_cond_t  full;   /* A producer will wait if its buffer is full */
         pthread_cond_t  empty;  /* A consumer will wait if the buffer it is getting
                                    tokens from is empty or doesn't yet have its 
                                    requested token */
         queue buffer;   /* a buffer of values of any type      */
         void *args;     /* arguments for the producing stream  */
         int id;         /* identity of this stream             */
      } Stream;
    
  5. Args struct: unchanged
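The authentication described for the connections array, and step 4 of get below, both come down to a walk over the s->ce list. A minimal sketch, assuming the caller identifies itself with a pthread_t value (for instance one obtained from pthread_self(), which the page's listing does not show); find_entry is a hypothetical name.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef struct connectentry {
   struct connectentry *next;
   pthread_t *thread;   /* the consumer thread this entry authorizes */
   int first;           /* number of that consumer's next unread token */
} ConnectEntry;

/* Return the entry registered for thread 'who', or NULL if 'who' is not
   connected. pthread_t is an opaque type, so it is compared with
   pthread_equal() rather than ==. */
static ConnectEntry *find_entry(ConnectEntry *head, pthread_t who) {
   for (ConnectEntry *ce = head; ce != NULL; ce = ce->next)
      if (ce->thread != NULL && pthread_equal(*ce->thread, who))
         return ce;
   return NULL;
}
```

A NULL result is the "give up" case of step 4; a non-NULL result is the entry whose first field the rest of get reads and advances.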

Question: How should get work?

In order, after entry:

  1. If the buffer is empty and the caller is the asker, cancel the request here (return immediately) instead of getting stuck waiting, because the asker is the human-operated consumer.
  2. Lock the mutex.
  3. Check whether any connections have been made (the connections array exists).
  4. Look for the calling thread in one of the ConnectEntry objects of the connections array. If it is not found, give up; otherwise keep the ConnectEntry object found, because its first field (the number of the first token in the stream that the calling thread has not yet seen) is used below.
  5. Wait here (on a condition variable) as long as the buffer is empty or the requested token has not yet been added to the buffer (this is where last_pos comes in). Do not blindly cut-and-paste. Do not assume that "empty" means use the "empty" cond variable. Think about it. Also, consider what happens if the buffer is still empty when the thread resumes at this point. This is probably the only tricky line in the whole program, and it illustrates an important principle of locking that we will consider later in the course.
  6. Take a peek into the buffer and grab the requested token but do not dequeue (this is where first_pos comes in).
  7. If the token about to be returned was at the beginning of the buffer, check whether any other connected consumer still needs to see it and remove the token from the buffer if there is none (that is, dequeue), and signal one of the cond variables that the buffer has one more empty space now. Use first_pos again. Be careful to avoid testing against the calling thread.
  8. Increment first in the ConnectEntry object.
  9. Unlock the mutex.
  10. Return the token.
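Step 5 is where a loop, not an if, matters: after pthread_cond_wait returns, the condition must be tested again, because another thread may have changed the state first and POSIX also permits spurious wakeups. The sketch isolates that one step on a hypothetical TokenMonitor with its own condition variable named added; it deliberately leaves out the rest of get (finding the ConnectEntry, peeking, trimming) and does not settle which of the Stream's two condition variables get should wait on.

```c
#include <assert.h>
#include <pthread.h>

typedef struct {
   pthread_mutex_t mutex;
   pthread_cond_t  added;    /* broadcast each time a token is added */
   int last_pos;             /* number of the newest token, -1 if none */
} TokenMonitor;

static void monitor_init(TokenMonitor *m) {
   pthread_mutex_init(&m->mutex, NULL);
   pthread_cond_init(&m->added, NULL);
   m->last_pos = -1;
}

/* Block until token number 'want' exists. A wakeup only means "something
   may have changed", so the predicate is re-tested in a while loop. */
static void wait_for_token(TokenMonitor *m, int want) {
   pthread_mutex_lock(&m->mutex);
   while (want > m->last_pos)
      pthread_cond_wait(&m->added, &m->mutex);
   assert(want <= m->last_pos);      /* guaranteed here, mutex still held */
   pthread_mutex_unlock(&m->mutex);
}

/* Producer side: publish one more token and wake every waiter, since
   each waiter may be waiting for a different token number. */
static void add_token(TokenMonitor *m) {
   pthread_mutex_lock(&m->mutex);
   m->last_pos++;
   pthread_cond_broadcast(&m->added);
   pthread_mutex_unlock(&m->mutex);
}
```

With an if instead of the while, a consumer woken by a broadcast for some other consumer's token would go on to peek at a token that is not in the buffer yet.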

Question: How should put work?

In order, after entry:

  1. Get a mutex lock.
  2. If the buffer is full, wait until a space opens up due to step 7 of get.
  3. Enqueue the value to be put into the buffer.
  4. Increment last_pos (because a new token has been added to the stream).
  5. Signal one of the cond variables that there is one added token in the buffer so that any consumers waiting on such a token may resume.
  6. Unlock the mutex.
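The six put steps map almost line for line onto C. A minimal sketch, assuming a hypothetical PutStream that holds its own ring buffer of ints in place of queue_a (PutStream, put_stream_init and put_token are illustrative names, not the listing's):

```c
#include <assert.h>
#include <pthread.h>

#define BUFFER_SIZE 3

typedef struct {
   int items[BUFFER_SIZE];   /* ring buffer storage */
   int head, count;          /* index of the oldest token, tokens held */
   int last_pos;             /* number of the newest token, -1 if none */
   pthread_mutex_t mutex;
   pthread_cond_t  full;     /* the producer waits here while full */
   pthread_cond_t  empty;    /* consumers wait here for a new token */
} PutStream;

static void put_stream_init(PutStream *s) {
   s->head = s->count = 0;
   s->last_pos = -1;
   pthread_mutex_init(&s->mutex, NULL);
   pthread_cond_init(&s->full, NULL);
   pthread_cond_init(&s->empty, NULL);
}

static void put_token(PutStream *s, int value) {
   pthread_mutex_lock(&s->mutex);                          /* step 1 */
   while (s->count == BUFFER_SIZE)                         /* step 2: loop, not if */
      pthread_cond_wait(&s->full, &s->mutex);
   assert(s->count < BUFFER_SIZE);
   s->items[(s->head + s->count) % BUFFER_SIZE] = value;   /* step 3: enqueue */
   s->count++;
   s->last_pos++;                                          /* step 4 */
   pthread_cond_broadcast(&s->empty);                      /* step 5 */
   pthread_mutex_unlock(&s->mutex);                        /* step 6 */
}
```

Step 5 uses broadcast rather than signal because several consumers may be waiting at once, each for a different token number; a single signal could wake a consumer whose token still is not there while the one that could proceed keeps sleeping.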

Question: How should connect work?

Function connect just connects a consumer to a producer. Steps to do this follow:

  1. With s the producer stream, push a new ConnectEntry onto the front of s's connection list:
       ConnectEntry *ptr = s->ce;
       s->ce = (ConnectEntry*)malloc(sizeof(ConnectEntry));
       s->ce->next = ptr;
    
  2. Initialize first and thread (these are the consumer's data).
  3. Add s to the consumer's input list (arg->prod in the code below), linking it through s's next field.
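For requirement 1 of the 10-point list, a consumer that connects while the producer is already running should start at the next token to be produced, which is last_pos + 1 under the numbering set up by init_stream (last_pos starts at -1). The listing's connect sets first = 0, which gives the same value only because main connects everything before any thread starts. A minimal sketch of a connect that also works later, assuming the connection list and last_pos are guarded by the stream's mutex; ConnStream (holding only the Stream fields used here) and connect_consumer are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct connectentry {
   struct connectentry *next;
   pthread_t *thread;
   int first;                 /* number of the next token to read */
} ConnectEntry;

typedef struct {              /* only the Stream fields this sketch needs */
   ConnectEntry *ce;          /* connection list, newest first */
   int last_pos;              /* number of the newest token, -1 if none */
   pthread_mutex_t mutex;
} ConnStream;

/* Register consumer thread 'who' with producer stream s.
   Returns the new entry, or NULL if malloc fails. */
static ConnectEntry *connect_consumer(ConnStream *s, pthread_t *who) {
   assert(s != NULL && who != NULL);
   ConnectEntry *e = malloc(sizeof *e);
   if (e == NULL)
      return NULL;
   e->thread = who;
   pthread_mutex_lock(&s->mutex);   /* the producer may be running */
   e->first = s->last_pos + 1;      /* only tokens produced from now on */
   e->next = s->ce;                 /* push onto the front of the list */
   s->ce = e;
   pthread_mutex_unlock(&s->mutex);
   return e;
}
```

Reading last_pos and linking the entry under the same lock matters: otherwise a put could slip in between, and the new consumer would either miss that token or be told to read one that was never meant for it.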

Question: How should disconnect work?

Remove the consumer's ConnectEntry object from the connections array of the producer it is connected to.
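Disconnecting has two parts: unlinking the entry, and dropping whatever no remaining consumer still needs, since the departed consumer may have been the only reason those tokens stayed (in the applet, "Disconnect B" shrinks the buffer to "9"). A minimal sketch, assuming first is each consumer's next unread token number and the applet's keep-the-newest-token policy. The buffer is represented only by first_pos, last_pos and a count, and a real version would dequeue where the sketch increments first_pos. DiscStream and disconnect_consumer are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct connectentry {
   struct connectentry *next;
   pthread_t *thread;
   int first;                 /* next token number this consumer reads */
} ConnectEntry;

typedef struct {              /* only the Stream fields this sketch needs */
   ConnectEntry *ce;
   int first_pos, last_pos;   /* numbers of the oldest and newest tokens */
   int count;                 /* tokens in the buffer (stand-in for nelem) */
   pthread_mutex_t mutex;
   pthread_cond_t  full;      /* the producer waits here while full */
} DiscStream;

/* Forget consumer 'who'. Returns 0 on success, -1 if 'who' was not connected. */
static int disconnect_consumer(DiscStream *s, pthread_t who) {
   assert(s != NULL);
   int rc = -1;
   pthread_mutex_lock(&s->mutex);
   for (ConnectEntry **pp = &s->ce; *pp != NULL; pp = &(*pp)->next) {
      if ((*pp)->thread != NULL && pthread_equal(*(*pp)->thread, who)) {
         ConnectEntry *gone = *pp;
         *pp = gone->next;                    /* unlink the entry */
         free(gone);
         rc = 0;
         break;
      }
   }
   if (rc == 0) {
      /* Oldest token any remaining consumer still needs. */
      int min_first = s->last_pos + 1;
      for (ConnectEntry *ce = s->ce; ce != NULL; ce = ce->next)
         if (ce->first < min_first)
            min_first = ce->first;
      /* Drop tokens everyone has read, keeping the newest of them. */
      while (s->count > 1 && s->first_pos < min_first - 1) {
         s->first_pos++;                      /* a real version would dequeue */
         s->count--;
      }
      pthread_cond_broadcast(&s->full);       /* the producer may have room now */
   }
   pthread_mutex_unlock(&s->mutex);
   return rc;
}
```

The -1 return is the "If not, an error occurs" case from the ConnectEntry description.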

Question: When you say "put a token into the stream", do you mean calling the put method? So the producer should be waiting for some condition variable to indicate that a consumer needs another token that is not yet at the end of the buffer?

  1. A call to "put" puts a token into the stream, although the monitor may hold that token for a while before a consumer asks for it.
  2. The producer can put a token into the stream without waiting - if the monitor's buffer is not full and the producer has a token for the stream it should go right ahead and put it in. The producer will have to wait if the buffer is full, though.

Question: Right now, it looks like example 7 (07-prod-cons.c) has the producer create as many tokens as possible. Is this wrong?

The producer creates tokens without being concerned about the monitor at all. If it takes a long time for the producer to produce a token, so be it; consumers may have to wait. But when a producer is ready to put a token into the stream, it wants that token to go into the stream without delay (although it will have to wait if the buffer is full).

Question: For the two items on the second list of requirements (the ones required for all 10 points), how do you recommend that we test these features? Should we make some kind of GUI for our program, like your Java example?

Nope. You can write functions whose names correspond to the indicated features, for example a 'cancel' function, a 'connect' function, a 'disconnect' function and so on, and then simply write a straight-line program that exercises them in some way, printing the results of each to the screen for validation. Here is what I did (using int instead of void* objects). Warning: if you blindly cut-and-paste you will lose time. Use your brain to figure things out, and use the code below to check whether you can improve on your ideas:

...
#include "queue_a.h"

#define BUFFER_SIZE 3
int idcnt = 1;
pthread_t s1, t1, t2, a1;

typedef struct connectentry {
   struct connectentry *next;
   pthread_t *thread;
   int first;
} ConnectEntry;

/* One of these per stream.
   Holds:  the mutex lock and the full and empty condition variables;
           a buffer of tokens taken from a producer
           a structure with information regarding the processing of tokens 
           an identity
   Note: tokens of a stream are implicitly numbered - the first token 
         has number 0, the second number 1 and so on (init_stream sets
         last_pos = -1 and first_pos = 0)
*/
typedef struct stream_struct {
   struct stream_struct *next;
   ConnectEntry *ce;       /* the database (linked list) of the connection
                              details of all connected consumers */
   pthread_t *thread;      /* needed to cancel the thread and in 'connect' */
   int last_pos;           /* token number of last buffer element */
   int first_pos;          /* token number of first buffer element */
   pthread_mutex_t mutex;  /* A lock on the critical sections of code */
   pthread_cond_t  full;   /* A producer will wait if its buffer is full */
   pthread_cond_t  empty;  /* A consumer will wait if the buffer it is getting
                              tokens from is empty or doesn't yet have its 
                              requested token */
   queue buffer;   /* a buffer of values of any type      */
   void *args;     /* arguments for the producing stream  */
   int id;         /* identity of this stream             */
} Stream;

/* prod: linked list of streams that this producer consumes tokens from
   self: the producer's output stream */
typedef struct {
   Stream *self, *prod;
} Args;

/* Return 'ret', which should hold a value from a call to put.
   If p is the asker and the queue is empty, return -2 to the asker
   at once instead of blocking.
   The asker is the function that translates user console input to
   requests for tokens from the consumers connected to the producer */
int get (void *stream, pthread_t *p) {
   int ret;
   ...
   if (p != NULL && isEmpty(q)) return -2;

   pthread_mutex_lock(mutex);
   ...
   ret = peek(q, ce->first - *first_pos);
   ...
   pthread_mutex_unlock(mutex);
   return ret;
}

/* 'value' is the value to move to the consumer */
void put(void *stream,  int value) {
   ...
   pthread_mutex_lock(mutex);
   while (nelem(q) >= BUFFER_SIZE)     /* re-test after every wakeup */
      pthread_cond_wait(full, mutex);
   enqueue(q, value);
   ...
   pthread_cond_broadcast(empty);
   pthread_mutex_unlock(mutex);
   return;
}

/* Put 1,2,3,4,5... into the self stream */
void *producer (void *streams) {
   Stream *self = ((Args*)streams)->self;
   int i;
   
   for (i=1 ; ; i++) {
      put(self, i);
      ... some print statement here ...
   }
   pthread_exit(NULL);
}

/* A consumer that gets connected to the producer's output stream 
   The consumer multiplies the input token by self->args which is
   set during initialization (see below) */
void *consumer (void *streams) {
   ...
   int in;
   
   ... some print statement here ...
   while (true) {
      in = get(prod, NULL);
      if (in != -1) in = (in)*(int)(self->args); else in = -1;
      put(self, in);
      print(&self->buffer, "cons", self->id);
   }
   pthread_exit(NULL);
}

/* This is the function that translates user console input to requests
   for tokens from the consumers.  It assumes there are two consumers.
   If the user inputs 1, the next token is fetched from consumer 1, if 
   the user inputs 2, the next token is fetched from consumer 2, a 0 
   input quits, any other input does nothing.  */
void *asker (void *streams) {
   Stream *s1 = ((Args*)streams)->prod;
   Stream *s2 = (((Args*)streams)->prod)->next;
   char line[10000];
   int val;

   while (true) {
      sleep(1);
      printf("[1 or 2 or 0 to quit] >>> "); fflush(stdout);
      if (fgets(line, sizeof(line), stdin) == NULL) break;   /* EOF or read error */
      if (line[0] == '1' || line[0] == '2') {
         Stream *s = (line[0] == '1') ? s1 : s2;   /* pick the consumer */
         val = get(s, &a1);
         if (val >= 0)
            printf("--------> Asker: got %d from Consumer(%d)\n", val, s->id);
         else if (val == -1)
            printf("          Buffer full and request not present\n");
         else
            printf("          Buffer empty\n");
      } else if (line[0] == '0') {
         printf("--------> Asker: have a good day!\n");
         break;
      } else
         printf("--------> Asker: not valid\n");
   }
   pthread_exit(NULL);
}

/* initialize streams - see also queue_a.h and queue_a.c */
void init_stream (Args *args, pthread_t *thread, Stream *self, void *data) {
   if (self != NULL) {
      self->thread = thread;
      self->ce = NULL;
      self->next = NULL;
      self->args = data;
      self->id = idcnt++;
      self->last_pos = -1;
      self->first_pos = 0;
      init_queue(&self->buffer);
      pthread_mutex_init(&self->mutex, NULL);
      pthread_cond_init(&self->full, NULL);
      pthread_cond_init(&self->empty, NULL);
   }
   args->self = self;
   args->prod = NULL;
}

/* free allocated space in the queue - see queue_a.h and queue_a.c */
void kill_stream(Stream *stream) { 
   destroy_queue(&stream->buffer);    /* provided by queue_a.c */
   pthread_cancel(*stream->thread);
}

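/* pushes stream s onto the front of the consumer's input list (arg->prod)
   and registers the consumer's thread in s's connection list */
void connect (Args *arg, Stream *s) {
   s->next = arg->prod;
   arg->prod = s;
   ConnectEntry *ptr = s->ce;
   s->ce = (ConnectEntry*)malloc(sizeof(ConnectEntry));
   if (s->ce == NULL) { perror("malloc"); exit(EXIT_FAILURE); }
   s->ce->next = ptr;
   s->ce->first = 0;   /* main connects before any token exists, so this is last_pos + 1 */
   s->ce->thread = arg->self->thread;
}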

int main () {
   Stream suc1, con1, con2, ask1;
   Args suc1_args, con1_args, con2_args, ask1_args;
   pthread_attr_t attr;

   init_stream(&suc1_args, &s1, &suc1, NULL);   /* initialize producer stream */

   init_stream(&con1_args, &t1, &con1, (void*)7);/* initialize times 7 stream */
   connect(&con1_args, &suc1);                   /* connect to producer */

   init_stream(&con2_args, &t2, &con2, (void*)5);/* initialize times 5 stream */
   connect(&con2_args, &suc1);                   /* connect to producer */

   init_stream(&ask1_args, &a1, &ask1, NULL);    /* initialize asker stream */
   connect(&ask1_args, &con1);                   /* connect to times 7 stream */
   connect(&ask1_args, &con2);                   /* connect to times 5 stream */

   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
   pthread_create(&s1, &attr, producer, (void*)&suc1_args);
   pthread_create(&t1, &attr, consumer, (void*)&con1_args);
   pthread_create(&t2, &attr, consumer, (void*)&con2_args);
   pthread_create(&a1, &attr, asker,    (void*)&ask1_args);

   pthread_join(a1, NULL);
   kill_stream(&suc1);
   kill_stream(&con1);
   kill_stream(&con2);
   destroy_queue(&ask1.buffer);   /* a1 was already joined: its thread ID must not be cancelled */
}