Producer Consumer problem - CSU360 - Shoolini U

Practical 12: Write a C program to implement Producer Consumer problem using semaphores

12. Introduction to the Producer-Consumer Problem using semaphores

The Producer-Consumer problem is a classic synchronization issue within multi-processing where two processes, the producer and the consumer, share a common, fixed-size buffer. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer consumes the data from the buffer. The challenge is to make sure that the producer won't try to add data into the buffer if it's full and the consumer won't try to remove data from an empty buffer.

12.1. Understanding Semaphores

Semaphores are synchronization tools used to control access to a common resource by multiple processes in a concurrent system such as a multitasking operating system. A semaphore is simply a variable that is non-negative and shared between threads. This variable is used to solve the critical section problem and to achieve process synchronization in the inter-process communication process.

12.1.1. Algorithm for Producer-Consumer Problem using semaphores
Producer Consumer Problem
Figure 12.1.1.1: Output of Producer Consumer Problem
Producer Consumer Problem
Figure 12.1.1.1: Output of Producer Consumer Problem

12.2 Implementing the Producer-Consumer Problem in C using Semaphores

This implementation of the Producer-Consumer problem uses POSIX semaphores. Semaphores are typically used for protecting resources that have multiple instances.

Key components of the solution include:

12.2.1 Producer Code

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>

#define MAX_ITEMS 10
#define BUFFER_SIZE 5

int buffer[BUFFER_SIZE];
int in = 0;
int out = 0;

sem_t empty;
sem_t full;
pthread_mutex_t mutex;

void *producer(void *pno)
{
    int item;
    for(int i = 0; i < MAX_ITEMS; i++) {
        item = rand(); // Produce an item
        sem_wait(&empty);
        pthread_mutex_lock(&mutex);
        buffer[in] = item;
        printf("Producer %d: Insert Item %d at %d\n", *((int *)pno),buffer[in],in);
        in = (in+1)%BUFFER_SIZE;
        pthread_mutex_unlock(&mutex);
        sem_post(&full);
    }
}
12.2.2 Consumer Code

void *consumer(void *cno)
{   
    for(int i = 0; i < MAX_ITEMS; i++) {
        sem_wait(&full);
        pthread_mutex_lock(&mutex);
        int item = buffer[out];
        printf("Consumer %d: Remove Item %d from %d\n", *((int *)cno), item, out);
        out = (out+1)%BUFFER_SIZE;
        pthread_mutex_unlock(&mutex);
        sem_post(&empty);
    }
}
12.2.3 Main Function

int main()
{
    pthread_t pro,con;
    pthread_mutex_init(&mutex, NULL);
    sem_init(&empty, 0, BUFFER_SIZE);
    sem_init(&full, 0, 0);

    int a = 1;
    pthread_create(&pro, NULL, (void *)producer, (void *)&a);
    int b = 2;
    pthread_create(&con, NULL, (void *)consumer, (void *)&b);

    pthread_join(pro, NULL);
    pthread_join(con, NULL);

    pthread_mutex_destroy(&mutex);
    sem_destroy(&empty);
    sem_destroy(&full);

    return 0;
}

This C program will successfully implement the Producer-Consumer problem using semaphores on a Kali Linux system, showcasing how semaphores can be used to manage synchronization between threads safely.

12.3 Alternate Implementation of Producer Consumer problem in C

Below is an alternate implementation for the Producer Consumer problem in C.

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

/*
use the pthread flag with gcc to compile this code
~$ gcc -pthread producer_consumer.c -o producer_consumer
*/


pthread_t *producers;
pthread_t *consumers;

sem_t buf_mutex,empty_count,fill_count;

int *buf,buf_pos=-1,prod_count,con_count,buf_len;


int produce(pthread_t self){
	int i = 0;
	int p = 1 + rand()%40;
	while(!pthread_equal(*(producers+i),self) && i < prod_count){
		i++;
	}
	printf("Producer %d produced %d \n",i+1,p);
	return p;
}


void consume(int p,pthread_t self){
	int i = 0;
	while(!pthread_equal(*(consumers+i),self) && i < con_count){
		i++;
	}

	printf("Buffer:");
	for(i=0;i<=buf_pos;++i)
		printf("%d ",*(buf+i));
	printf("\nConsumer %d consumed %d \nCurrent buffer len: %d\n",i+1,p,buf_pos);
	
}


void* producer(void *args){

	while(1){
		int p = produce(pthread_self());
		sem_wait(&empty_count);
		sem_wait(&buf_mutex);
		++buf_pos;			// critical section
		*(buf + buf_pos) = p; 
		sem_post(&buf_mutex);
		sem_post(&fill_count);
		sleep(1 + rand()%3);
	}
	
	return NULL;
}


void* consumer(void *args){
	int c;
	while(1){
		sem_wait(&fill_count);
		sem_wait(&buf_mutex);
		c = *(buf+buf_pos);
		consume(c,pthread_self());
		--buf_pos;
		sem_post(&buf_mutex);
		sem_post(&empty_count);
		sleep(1+rand()%5);
	}

	return NULL;
}

int main(void){
	
	int i,err;

	srand(time(NULL));

	sem_init(&buf_mutex,0,1);
	sem_init(&fill_count,0,0);

	printf("Enter the number of Producers:");
	scanf("%d",&prod_count);
	producers = (pthread_t*) malloc(prod_count*sizeof(pthread_t));

	printf("Enter the number of Consumers:");
	scanf("%d",&con_count);
	consumers = (pthread_t*) malloc(con_count*sizeof(pthread_t));

	printf("Enter buffer capacity:");
	scanf("%d",&buf_len);
	buf = (int*) malloc(buf_len*sizeof(int));

	sem_init(&empty_count,0,buf_len);

	for(i=0;i<prod_count;i++){
		err = pthread_create(producers+i,NULL,&producer,NULL);
		if(err != 0){
			printf("Error creating producer %d: %s\n",i+1,strerror(err));
		}else{
			printf("Successfully created producer %d\n",i+1);
		}
	}

	for(i=0;i<con_count;i++){
		err = pthread_create(consumers+i,NULL,&consumer,NULL);
		if(err != 0){
			printf("Error creating consumer %d: %s\n",i+1,strerror(err));
		}else{
			printf("Successfully created consumer %d\n",i+1);
		}
	}

	for(i=0;i<prod_count;i++){
		pthread_join(*(producers+i),NULL);
	}
	for(i=0;i<con_count;i++){
		pthread_join(*(consumers+i),NULL);
	}


	return 0;
}

Image Reference