El problema del productor/consumidor es uno de los problemas más habituales que se presentan en el ámbito de la programación concurrente. Este problema es una abstracción de algunas de las situaciones más típicas que se pueden encontrar en muchas aplicaciones de programación concurrente. Representa un buen ejemplo de los problemas de comunicación entre procesos concurrentes.
El problema del productor/consumidor consiste en suponer que tenemos un productor que se encarga de generar una serie de datos y depositarlos en un determinado lugar, almacén de datos o buzón de datos. El consumidor es el encargado de recoger los datos del buzón y procesarlos. Se debe tener en cuenta que el productor no puede depositar más datos hasta que el consumidor no haya recogido los datos anteriores del buzón. Además el consumidor no debe intentar consumir datos si no existen en el buzón.
Debe existir, pues, una cierta coordinación o sincronización entre las acciones del productor y las del consumidor. Como normalmente el ritmo del productor en la generación de los datos suele ser diferente del ritmo de procesamiento de datos del consumidor, es necesario la utilización de un lugar de almacenamiento, como es el buzón o buffer. El buffer es una zona de memoria común al productor y al consumidor. Si el buffer es suficientemente grande, entonces el productor podrá generar datos sin esperar a que el consumidor haya procesado los anteriores. Sólo eventualmente, cuando el buffer esté llenó deberá detener su producción, y esperar a que el consumidor procese alguno de los datos anteriores, de tal forma que libere parte del espacio del buffer. También puede ocurrir lo contrario, es decir, que el consumidor procese los datos de forma más rápida de lo que lo hace el productor, debiendo esperar cuando el buffer se encuentre vacío.
Un buffer es empleado en un gran número de
tareas como las lecturas y escrituras en discos y cintas, que
sólo pueden manejar bloques de datos, así como terminales
de E/S, mensajes de red, etc.
El problema del productor/consumidor se puede plantear de distintas formas, según las distintas suposiciones que se planteen en el problema :
1) Buffer infinito : Supongamos que no existe límite en el tamaño del buffer, es decir, tenemos un buffer infinito. El productor puede introducir datos en el buffer continuamente sin preocuparse del tamaño del buffer. Por su parte el consumidor sólo debe tener en cuenta la existencia de datos en el buffer, es decir, comprobar que el buffer no está vacío. Tendremos dos variables globales in y out que representan los índices del buffer para la entrada y la salida de datos, respectivamente. in cuenta el número de datos producidos y out el número de datos consumidos. Inicialmente in y out valen 0.
Una primera aproximación al problema se muestra en el siguiente seudocódigo, en el que el productor y el consumidor producen y consumen datos indefinidamente.
Productor :
repetir producir(dato); buffer[in] = dato; in = in + 1; fin_repetir;
Consumidor :
repetir esperar hasta que (in > out); dato = buffer[out]; out = out + 1; consumir(dato); fin_repetir;
Principal :
entera in = 0; entera out = 0; cobegin /* Ejecución concurrente */ Productor; Consumidor; coend;
Sea s = in - out, entonces s es el número de datos en el buffer en cualquier momento. Inicialmente s es 0, y puede incrementar o reducir su valor durante la ejecución del productor y el consumidor, pero nunca alcanzará valores por debajo de 0, puesto que el consumidor no consume datos del buffer cuando éste se encuentre vacío, sino que espera a que el productor introduzca un nuevo dato en el buffer. De esta forma se observa que s equivale a un semáforo. De hecho, la sentencia esperar hasta que (in > out) en el consumidor puede implementarse mediante la primitiva de semáforos P o wait(s) introducida por Dijkstra, y añadiendo la primitiva V o signal(s) en el productor, para que despierte al consumidor. Suponiendo que tenemos el semáforo elementos, que equivale a s, y que representa el número de elementos en el buffer, el seudocódigo correspondiente sería el siguiente :
Productor :
repetir producir(dato); buffer[in] = dato; in = in + 1; signal(elementos); fin_repetir;
Consumidor :
repetir wait(elementos); dato = buffer[out]; out = out + 1; consumir(dato); fin_repetir;
Principal :
entera in = 0; entera out = 0; semaforo elementos = 0; cobegin /* Ejecución concurrente */ Productor; Consumidor; coend;
En esta solución se pueden observar los siguientes invariantes que siempre se verifican, independientemente de la situación en la que nos encontremos :
Supongamos ahora, que la parte de código que añade datos al buffer y que obtiene datos del buffer en el productor y el consumidor, respectivamente, son secciones críticas y no se pueden realizar simultáneamente. Esto puede suceder ya que no se puede leer y escribir simultáneamente en una posición de memoria, y además se puede dar la situación de múltiples productores o consumidores. Un caso de múltiples productores es un sistema con terminales de transacciones que colocan sus peticiones en una misma cola. Si se añaden nuevos procesadores de transacciones al sistema, estaríamos en un caso de múltiples consumidores.
A continuación se presenta una solución que consigue la exclusión mutua mediante la utilización de un semáforo binario s.
Productor :
repetir producir(dato); wait(s); /* Solicitar acceso a Región Crítica */ buffer[in] = dato; /* Región */ in = in + 1; /* crítica */ signal(s); /* Liberar acceso a Región Crítica */ signal(elementos); /* Indicar existencia de elementos */ fin_repetir;
Consumidor :
repetir wait(elementos); /* Esperar existencia de elementos */ wait(s); /* Solicitar acceso a Región Crítica */ dato = buffer[out]; /* Región */ out = out + 1; /* crítica */ signal(s); /* Liberar acceso a Región Crítica */ consumir(dato); fin_repetir;
Principal :
entera in = 0; entera out = 0; semaforo elementos = 0; semaforo_binario s = 1; cobegin /* Ejecución concurrente */ Productor; Consumidor; coend;
Una solución equivalente a la anterior pero empleando semáforos binarios en vez de semáforos normales es la siguiente :
Productor :
repetir producir(dato); wait(s); /* Solicitar acceso a Región Crítica */ buffer[in] = dato; /* Región */ in = in + 1; /* crítica */ n = n + 1; si (n==1) entonces signal(hay_elementos); /* Indicar existencia de elementos*/ signal(s); /* Liberar acceso a Región Crítica */ fin_repetir;
Consumidor :
entera m = 0; repetir si (m == 0) entonces wait(hay_elementos); /* Esperar existencia de elementos */ wait(s); /* Solicitar acceso a Región Crítica */ dato = buffer[out]; /* Región */ out = out + 1; /* crítica */ n = n - 1 m = n; signal(s); /* Liberar acceso a Región Crítica */ consumir(dato); fin_repetir;
Principal :
entera in = 0; entera out = 0; entera n = 0; semaforo_binario hay_elementos = 0; semaforo_binario s = 1; cobegin /* Ejecución concurrente */ Productor; Consumidor; coend;
Se puede observar que existen dos semáforos
binarios : hay_elementos, que
indica la existencia o no de elementos en el buffer, y s,
que indica la situación de acceso a la región crítica.
También existe la variable global n,
que indica el número de elementos en el buffer y cuyo acceso
es crítico. La variable m
es local al consumidor y sirve como almacén temporal del
valor de n, y permite comprobar si
al consumir el último dato, el número de elementos
se hizo 0, y entonces se debe esperar a que nuevamente haya elementos,
antes de consumir el siguiente. Inicialmente el consumidor espera
a que existan elementos, ya que m
se inicializa a 0.
2) Buffer limitado : Como un buffer infinito es una situación imposible en la realidad, la solución al problema del productor/consumidor debe modificarse en cierta medida. Existen dos técnicas de abordar esta nueva situación :
Buffer Circular :
Si empleamos la técnica de buffer circular, la solución al límite del buffer consiste en calcular la posición en el buffer realizando una operación modular con el tamaño del buffer. De esta forma al llegar al final del buffer regresamos al inicio del mismo La variable tam es el tamaño del buffer. Una aproximación a la solución de este caso es la siguiente :
Productor :
repetir producir(dato); esperar hasta que ( ((in>=out) y (in-out<n)) o ((in <out) y (out-in>1)) ); buffer[in] = dato; /* Región */ in = in + 1; /* crítica */ si (in == tam_buffer) entonces in = 1; sino in = in + 1; fin_repetir;
Consumidor :
repetir esperar hasta que (in<>out); dato = buffer[out]; /* Región */ out = out + 1; /* crítica */ if (out == tam_buffer) entonces out = 0; else out = out + 1; consumir(dato); fin_repetir;
Principal :
entera in = 0; entera out = 0; cobegin /* Ejecución concurrente */ Productor; Consumidor; coend;
A continuación se presenta una implementación del problema para el caso de buffer limitado circular, en la que se emplean dos semáforos : espacios, que indica el número de posiciones de buffer libre, y elementos, que indica el número de elementos en el buffer. El tamaño del buffer viene indicado por tam_buffer. La actualización de los índices in y out relativos al buffer se realiza con la operación módulo mod tam_buffer.
Una posible implementación sería la siguiente :
Productor :
repetir producir(dato); wait(espacios); /* Esperar espacio libre */ wait(s); /* Solicitar acceso a Región Crítica */ buffer[in] = dato; /* Región */ in = (in + 1) mod tam_buffer; /* crítica */ signal(s); /* Liberar acceso a Región Crítica */ signal(elementos);/* Indicar elementos disponibles */ fin_repetir;
Consumidor :
repetir wait(elementos); /* Esperar elementos disponibles */ wait(s); /* Solicitar acceso a Región Crítica */ dato = buffer[out]; /* Región */ out = (out + 1) mod tam_buffer; /* crítica */ signal(s); /* Liberar acceso a Región Crítica */ signal(espacios); /* Indicar espacio disponible */ consumir(dato); fin_repetir;
Principal :
entera in = 0; entera out = 0; semaforo_binario s = 1; semaforo elementos = 0; semaforo espacios = tam_buffer; cobegin /* Ejecución concurrente */ Productor; Consumidor; coend;
Observando este listado se puede comprobar que se cumplen los siguientes invariantes :
De esta forma se garantiza que el consumidor no extrae elementos si el buffer está vacío, ni el productor añade elementos si el buffer está lleno.
Si el buffer está vacío entonces elementos = 0 ya que el número de elementos es 0, y así el consumidor estará bloqueado en wait(elementos).
Si el buffer está lleno entonces el número de elementos es tam_buffer y por esta razón espacios = 0, con lo que el productor se encontraría en wait(espacios).
Además no existe deadlock, ya que para ello
tanto productor como consumidor se deben encontrar suspendidos
en wait(), pero esto implicaría
que espacios = elementos = 0, y entonces
teniendo en cuenta los invariantes anteriores, tendríamos
que #E = 0 y a la vez #E
= tam_buffer - 0, lo cual es imposible, por
lo que se puede concluir en la no existencia de bloqueo mutuo
o deadlock.
Múltiples buffers :
Esta técnica es necesaria en algunas situaciones en las que la técnica del buffer circular no se puede emplear, como sucede en equipos de E/S que emplean DMA para trabajar, y que sólo admiten una dirección de memoria y una longitud donde establecer el buffer en el que escribir los datos. También es empleado en sistemas de comunicación donde los requisitos de buffer varían con el tiempo, y con esta técnica se pueden añadir nuevos pequeños buffers según la necesidad, en vez de tener reservado un gran buffer inicial.
Esta técnica consiste en emplear dos o más
buffers, normalmente del mismo tamaño, de tal forma que
cuando es rellenado por el productor, se le pasa al consumidor
que comienza a procesar, y devuelve el buffer vacío al
productor para que le vuelva a rellenar. Esta técnica sin
embargo puede desperdiciar espacio, ya que se puede dar la situación
de que el productor haya rellenado el segundo buffer pero el consumidor
aún no haya terminado con el primer buffer aunque la mayor
parte del mismo se encuentre vacía.
Las soluciones presentadas hasta ahora emplean a los semáforos como herramienta de sincronización. Pero existe una herramienta más adecuada, las regiones críticas condicionales.
La forma de emplear regiones condicionales pasa por definir un recurso con las variables de memoria críticas (buffer, in, out y una variable contador del número de elementos en el buffer). Además se asocia una condición con la región crítica, de tal forma que el acceso a la misma sólo se produce cuando se cumple la condición, y el acceso sólo es posible por un único proceso al mismo tiempo, lo que garantiza la exclusión mutua.
La implementación sería de la siguiente forma :
Productor :
repetir producir(dato); region B cuando (contador < tam_buffer) hacer buffer[in] = dato; /* Región */ in = (in + 1) mod tam_buffer; /* crítica */ contador = contador + 1; fin_region; fin_repetir;
Consumidor :
repetir region B cuando (contador > 0) hacer dato = buffer[out]; /* Región */ out = (out + 1) mod tam_buffer; /* crítica */ contador = contador - 1; fin_region; consumir(dato); fin_repetir;
Principal :
entera in = 0; entera out = 0; recurso B:: buffer, in, out, contador; cobegin /* Ejecución concurrente */ Productor; Consumidor; coend;
El problema del productor/consumidor que presentamos a continuación es una particularización. El programa que se muestra como ejemplo consiste en un programa que copia un archivo origen en un archivo destino, indicando ambos nombres en la línea de comandos. El objetivo de la demostración se basa en que existen dos agentes, el productor y el consumidor. El productor leerá los datos del fichero origen y los almacenará en un buffer compartido con el consumidor. El consumidor leerá los datos del buffer compartido y los escribirá en el fichero destino. El objetivo es mostrar como se deben sincronizar el proceso productor y el proceso consumidor, para evitar interferencias en las lecturas y escrituras en el buffer.
La implementación que se presenta se ha realizado para el sistema operativo Linux 1.2, y se han empleado procesos. Se crea un proceso hijo mediante la sentencia fork() de UNIX, que realizará la función del consumidor. Mientras que el proceso padre asumirá la función del productor. La sincronización del acceso al buffer se ha conseguido empleando variables condicionales (que delimitan regiones críticas condicionales) y mútex (equivalentes a semáforos). Tanto las variables condicionales como los mútex son estructuras que no existen en un sistema UNIX habitual, pero se han simulado mediante semáforos a través de una serie de pequeñas funciones que encapsulan su funcionamiento. La razón para hacer esto es presentar un aspecto similar, a fin de comparar el programa con la versión multihilo del mismo programa que se presenta posteriormente, y distinguir claramente las pequeñas diferencias.
La comunicación entre el proceso Productor y el proceso Consumidor se realiza a través del buffer de memoria compartida, y usando los mútex y variables de condición para sincronizar el acceso.
En este ejemplo vamos a emplear las variables de condición para controlar el acceso al buffer. El productor añade elementos al buffer que son leídos por el consumidor. Existen dos variables de condición que controlan el acceso al buffer. Una variable de condición cond_rem, es empleada por el productor a la espera de que el consumidor lea algún elemento del buffer y libere alguna posición del mismo, para que el productor puede escribir en el buffer. La variable de condición cond_add, es empleada por el consumidor a la espera de que el productor escriba algún elemento en el buffer y el consumidor puede leer del mismo.
Básicamente el Productor realiza las siguientes acciones de forma repetitiva :
- Bloquea el acceso al buffer.
- Mientras el buffer esté lleno
- Esperar a que el Consumidor me despierte y libero el acceso al buffer.
- Leer datos del fichero de Entrada y almacenarlos en al buffer.
- Si es el final del fichero
- Bloquear acceso a flag de Finalización, para indicárselo al consumidor.
- Activar flag de finalización
- Liberar acceso a flag de Finalización.
- Despertar al Consumidor, si estaba bloqueado.
- Liberar el acceso al buffer.
- Final del Productor.
- Actualiza el índice del buffer para escritura.
- Incrementar el contador de posiciones del buffer ocupadas.
- Despertar al Consumidor, si estaba bloqueado.
- Liberar el acceso al buffer.
- Volver al principio, hasta finalizar.
El Consumidor por su parte realiza las siguientes acciones repetitivas :
- Bloquea el acceso al buffer.
- Si el buffer está vacío y el flag de Finalización está activo
- Liberar el acceso al buffer.
- Final del Consumidor.
- Mientras el buffer está vacío y el flag de Finalización no está activo
- Esperar a que el Productor me despierte y libero el acceso al buffer.
- Si el buffer está vacío y el flag de Finalización está activo
- Liberar el acceso al buffer.
- Final del Consumidor.
- Escribir datos del buffer al fichero de Salida.
- Actualiza el índice del buffer para lectura.
- Decrementar el contador de posiciones del buffer ocupadas.
- Despertar al Productor, si estaba bloqueado.
- Liberar el acceso al buffer.
- Volver al principio, hasta finalizar.
El listado del programa fuente es el siguiente :
PROC.C /**************************************************************************** EJEMPLO DEL PRODUCTOR/CONSUMIDOR. Aplicacion: copy Version: 1.0p Descripcion: Programa demostrativo del caso del productor/consumidor que realiza el copiado de un fichero fuente en un fichero destino. La implementacion se ha realizado mediante dos procesos que se comunican a traves de un buffer usando memoria compartida y empleando mutex y variables de condicion para realizar la sincronizacion. Los mutex y las variables de condicion se han implementado mediante semaforos, encapsulando las llamadas mediante una serie de funciones. Autor: Jose Maria Toribio Vicente Mail: jmtoribi@poboxes.com *****************************************************************************/ #define VERSION "copy 1.0p - Productor/Consumidor (vers. procesos)\n" /* #define DEBUG */ /* Define para compilar en modo DEBUG */ #include <stdio.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h> #include <sys/stat.h> #include <sys/time.h> #include <fcntl.h> /* Librerias locales */ #include "semaforo.h" #include "mutex.h" #include "cond.h" #include "error.h" #include "getstats.h" #include "tiempo.h" /* Definicion de Identificadores de Claves de Recursos del sistema */ #define MUTEX1_KEY ((key_t)188761L) #define MUTEX2_KEY ((key_t)188762L) #define COND1_KEY ((key_t)188763L) #define COND2_KEY ((key_t)188764L) #define SHMEM_KEY ((key_t)188765L) /* Definicion de constantes relativas al tamano del Buffer */ #define BUFSIZE 512 #define BUFCNT 10 /* Definicion de una funcion de impresion de salida inmediata */ void Printf(char *mensaje) { printf(mensaje); fflush(stdout); } /* Esta es la estructura de datos empleada entre el hilo consumidor y el hilo productor. Memoria Compartida */ typedef struct s_buf_t { mutex_t buf_lock; /* Mutex de acceso a region critica */ mutex_t fin_lock; /* Mutex de acceso a flag finalizacion */ cond_t cond_add; /* Var.cond anadir datos */ cond_t cond_rem; /* Var.cond eliminar datos */ int nocup; /* Contador de posiciones ocupadas */ int in, /* Indice al lugar de Entrada */ out; /* Indice al lugar de Salida */ int fin; /* Flag de Finalizacion */ char buffer[BUFCNT][BUFSIZE]; /* Buffer de datos */ int lbuffer[BUFCNT]; /* Long. de bytes en cada Buffer */ } buf_t; buf_t *Buf; /* Variable del Buffer que apunta a la memoria compartida */ /* Prototipos del Productor y el Consumidor */ void *Consumidor(void *); void *Productor(void *); /* PROGRAMA PRINCIPAL */ main(int argc, char **argv) { int ifd, ofd; /* Descriptores de los fichero de Entrada y Salida */ int pid_cons, estado_cons; /* PID y estado de salida del proceso Consumidor */ int shmem_id; /* Identificador para la zona de memoria compartida */ struct timeval curtime, newtime; /* Var. de tiempo */ estad_t estad[2]; /* Datos estadisticos de la aplicacion */ /* Comprobar el numero de argumentos de la linea de comandos */ if (argc != 3) { printf(VERSION); printf("Sintaxis: %s <fic_origen> <fic_destino>\n", argv[0]); exit(0); } /* Abrir el fichero de entrada para el productor */ if ((ifd = open(argv[1], O_RDONLY)) == -1) { fprintf(stderr, "No se puede abrir el fichero %s\n", argv[1]); exit(1); } /* Abrir el fichero de salida para el consumidor */ if ((ofd = open(argv[2], O_WRONLY|O_CREAT, 0666)) == -1) { fprintf(stderr, "No se puede abrir el fichero %s\n", argv[2]); exit(1); } /* Crea el Buffer en Memoria Compartida */ if ((shmem_id = shmget(SHMEM_KEY, sizeof(buf_t), 0666 | IPC_CREAT)) <0) ErrorFatal("No se pudo crear la memoria compartida"); /* Establece el acceso a la memoria compartida */ if ((Buf = (buf_t *) shmat(shmem_id, (char *)0, 0)) == (buf_t *) -1) ErrorFatal("No se pudo enlazar con la memoria compartida"); /* Inicializar los indices, flag finalizacion, y num.de buffers ocupados */ Buf->in = Buf->out = Buf->fin = Buf->nocup = 0; /* Inicializa los Mutex */ if ( mutex_init(&Buf->buf_lock, MUTEX1_KEY) ) ErrorFatal("No se puede crear Mutex: buf_lock"); if ( mutex_init(&Buf->fin_lock, MUTEX2_KEY) ) ErrorFatal("No se puede crear Mutex: fin_lock"); /* Inicializa las Var.de Condicion */ if ( cond_init(&Buf->cond_add, COND1_KEY) ) ErrorFatal("No se puede crear Var.Condicion: cond_add"); if ( cond_init(&Buf->cond_rem, COND2_KEY) ) ErrorFatal("No se puede crear Var.Condicion: cond_rem"); /* Inicializa el reloj de tiempo */ gettimeofday(&curtime, NULL); # ifdef DEBUG PrintTime("Padre: Voy a crear el proceso Consumidor", &curtime, &newtime); # endif /* Obtener los tiempos iniciales */ getestad(&estad[0]); /* Crea el proceso consumidor */ if ((pid_cons = fork()) == -1) { fprintf(stderr, "No se puede crear el proceso Consumidor\n"); exit(1); } if (pid_cons == 0) { /* Ejecutamos el hilo Consumidor */ Consumidor((void *)ofd); exit(0); } /* El proceso padre ejecuta el Productor en concurrencia con el hijo Consumidor */ Productor((void *)ifd); close(ifd); /* Espera a que finalice el proceso consumidor */ if (wait(&estado_cons) == -1) ErrorFatal("Error en la espera del proceso consumidor"); close(ofd); /* Obtener tiempos finales */ getestad(&estad[1]); /* Destruye los Mutex */ if ( mutex_destroy(&Buf->buf_lock) ) ErrorFatal("Error en la destruccion del mutex buf_lock"); if ( mutex_destroy(&Buf->fin_lock) ) ErrorFatal("Error en la destruccion del mutex fin_lock"); /* Destruye las Var.de Condicion */ if ( cond_destroy(&Buf->cond_add) ) ErrorFatal("Error en la destruccion de la var.cond cond_add"); if ( cond_destroy(&Buf->cond_rem) ) ErrorFatal("Error en la destruccion de la var.cond cond_rem"); /* Destruye la Memoria Compartida */ if ( shmdt((char *)Buf) <0 ) ErrorFatal("No se pudo desenlazar la memoria compartida"); if ( shmctl(shmem_id, IPC_RMID, (struct shmid_ds *)0) <0 ) ErrorFatal("No se pudo liberar la memoria compartida"); # ifdef DEBUG PrintTime("Padre: He finalizado", &curtime, &newtime); # endif /* Imprimir los tiempos */ printestad(stderr, &estad[0], &estad[1]); /* Finaliza el programa */ return(0); } /* Productor */ void *Productor(void *arg) { int ifd = (int) arg; /* Descriptor del fichero de Entrada */ struct timeval curtime, newtime; /* Inicializa el reloj de tiempo */ gettimeofday(&curtime, NULL); # ifdef DEBUG PrintTime("Productor: Voy a comenzar", &curtime, &newtime); # endif /* Realiza el papel de Productor */ while (1) { /* Bloquea el mutex de acceso a la region critica */ mutex_lock(&Buf->buf_lock); # ifdef DEBUG printf("Productor: Ante FIN = %d - No.Buffers lleno = %d\n", Buf->fin, Buf->nocup); fflush(stdout); # endif /* Comprobar si hay espacio o los buffers estan llenos */ /* si estan llenos espera */ while (Buf->nocup == BUFCNT) { /* Comprueba la condicion */ # ifdef DEBUG Printf("Productor: Bloqueado en la condicion\n"); # endif cond_wait(&Buf->cond_rem, &Buf->buf_lock); } /* PrintTime("Productor: Voy a leer", &curtime, &newtime); */ # ifdef DEBUG printf("Hilo Productor: Desp FIN = %d - No.Buffers lleno = %d\n", Buf->fin, Buf->nocup); fflush(stdout); # endif /* Lee datos del fichero y los coloca en un buffer */ Buf->lbuffer[Buf->in] = read(ifd, Buf->buffer[Buf->in], BUFSIZE); /* Comprueba si se ha realizado la lectura con exito o es fin */ if (Buf->lbuffer[Buf->in] == 0) { /* Bloquea el mutex de bloqueo_realizado */ mutex_lock(&Buf->fin_lock); /* Establece el flag bloqueo_realizado y libera el mutex */ Buf->fin = 1; mutex_unlock(&Buf->fin_lock); /* Indica la existencia de elementos al consumidor */ cond_signal(&Buf->cond_add); /* Libera el mutex del buffer */ mutex_unlock(&Buf->buf_lock); # ifdef DEBUG PrintTime("Productor: He finalizado", &curtime, &newtime); # endif /* Finaliza el bucle */ break; } /* Establece el siguiente buffer a llenar */ Buf->in = ++Buf->in % BUFCNT; /* Incrementa el contador de buffers llenos */ Buf->nocup++; /* Indica al consumidor la existencia de elementos */ cond_signal(&Buf->cond_add); /* Libera el mutex del buffer */ mutex_unlock(&Buf->buf_lock); } return(0); } /* Consumidor */ void *Consumidor(void *arg) { int fd = (int) arg; /* Descriptor del fichero de Salida */ struct timeval curtime, newtime; /* Inicializa el reloj de tiempo */ gettimeofday(&curtime, NULL); # ifdef DEBUG PrintTime("Consumidor: Voy a comenzar", &curtime, &newtime); # endif /* Realiza el papel de Consumidor */ while (1) { /* Bloquea el mutex del buffer */ mutex_lock(&Buf->buf_lock); # ifdef DEBUG printf("Consumidor: Ante FIN = %d - No.Buffers lleno = %d\n", Buf->fin, Buf->nocup); fflush(stdout); # endif /* Si no hay ningun buffer lleno y flag fin esta activo => finaliza */ if (!Buf->nocup && Buf->fin) { mutex_unlock(&Buf->buf_lock); # ifdef DEBUG PrintTime("Consumidor: He finalizado", &curtime, &newtime); # endif break; } /* Si no hay ningun buffer lleno y no es el fin => espera */ while (Buf->nocup == 0 && !Buf->fin) { # ifdef DEBUG Printf("Consumidor: Bloqueado en la condicion\n"); # endif cond_wait(&Buf->cond_add, &Buf->buf_lock); } /* Si no hay ningun buffer lleno y flag fin esta activo => finaliza */ if (!Buf->nocup && Buf->fin) { mutex_unlock(&Buf->buf_lock); # ifdef DEBUG PrintTime("Consumidor: He finalizado", &curtime, &newtime); # endif break; } # ifdef DEBUG printf("Consumidor: Desp FIN = %d - No.Buffers lleno = %d\n", Buf->fin, Buf->nocup); Printf("Consumidor: voy a escribir en el fichero\n"); # endif /* Escribe los datos del buffer al fichero */ write(fd, Buf->buffer[Buf->out], Buf->lbuffer[Buf->out]); /* Establece el siguiente buffer desde el que escribir */ Buf->out = ++Buf->out % BUFCNT; /* Decrementa el numero de buffers lleno */ Buf->nocup--; /* Indica al productor que un buffer esta vacio */ cond_signal(&Buf->cond_rem); /* Libera el mutex del buffer */ mutex_unlock(&Buf->buf_lock); } /* Finalizar el proceso Consumidor */ return(0); }
El código fuente necesita las biblioteca de rutinas de comunes a todos los ejemplos para poder compilar adecuadamente.
El fichero Makefile para compilar el ejemplo del productor/consumidor en su versión de procesos es el siguiente :
MAKEFILE : ############## ## Makefile ## ############## #DEFS=-DDEBUG -DYIELD DEFS= INSDIR=../../.. INCDIR=$(INSDIR)/include OBJDIR=$(INSDIR)/libproc OBJ=$(OBJDIR)/tiempo.o $(OBJDIR)/error.o $(OBJDIR)/getstats.o \ $(OBJDIR)/semaforo.o $(OBJDIR)/mutex.o $(OBJDIR)/cond.o CC=gcc CFLAGS=$(DEFS) -I$(INCDIR) LINK=$(CFLAGS) $(OBJ) #Definicion de dependencias all : proc .c.o: $(CC) $(CFLAGS) -c $< proc : proc.o $(CC) $(LINK) proc.o -o proc proc.o : proc.c
Este ejemplo es exactamente el mismo que el caso anterior, excepto que ahora hemos implementado el programa mediante hilos en vez de procesos. Para ello se ha utilizado la librería Pthreads 1.60 beta5 del paquete de hilos de Chris Provenzano, aunque es muy posible que el programa funciona igualmente con cualquier otro paquete que siga el estándar POSIX.1c sobre hilos. El programa se ha realizado con dicho paquete instalado bajo el sistema operativo Linux 1.2.
La apariencia de la versión con procesos y esta versión multihilo es casi idéntica, de tal forma que cambian muy pocas cosas. Existe un hilo Productor, que corresponde al hilo principal del proceso, y un hilo Consumidor que es creado inicialmente por el hilo principal. El programa también se puede compilar, mediante la directiva de compilación #define DOS_HILOS , en una versión en la que el hilo principal crea dos nuevos hilos, uno para el Productor y uno para el Consumidor, y se limita a esperar a que finalicen. Los algoritmos del productor y del consumidor son idénticos a la versión con procesos.
Se emplea, también, una zona de memoria compartida, aunque a diferencia de la implementación mediante procesos, esta zona de memoria compartida es una simple variable global, lo cual simplifica mucho más las cosas haciendo todo más sencillo. Recordemos que en la programación multihilo las variables definidas como globales en un programa, pueden ser accedidas por todos los hilos que maneje el proceso, en cambio, las variables definidas localmente en cada hilo son locales al ámbito del hilo, ya que se encuentran definidos en su pila, y todo hilo tiene una pila independiente.
La sincronización entre los hilos en el acceso al buffer compartido se realiza mediante mútex y variables de condición. En esta ocasión, estas herramientas de programación son proporcionadas por el paquete de hilos sobre el que se ha realizado el programa.
En primer lugar podemos observar que el nombre de las funciones de gestión de mútex y variables de condición es similar al empleado en la versión de procesos, salvo el prefijo pthread_funcion ; incluso sus parámetros son muy similares.
Cuando el productor quiere añadir un elemento al buffer, comprueba si el buffer está lleno, si lo está el productor se bloquea a la espera de la condición, hasta que el consumidor procesa un elemento. Cuando el consumidor procesa un elemento le extrae del buffer, e informa de que el buffer no está lleno, y así el productor despierta de la espera por la variable de condición. De esta forma el productor puede añadir un nuevo elemento al buffer.
El consumidor funciona de forma similar al productor. Emplea una variable de condición para determinar si el buffer está vacío. Cuando el consumidor quiere eliminar un elemento del buffer, comprueba si éste está vacío, si es así, se bloquea a la espera de la variable de condición, esperando que el productor añada un nuevo elemento al buffer. Cuando el productor lo hace, se satisface la condición y entonces el consumidor puede continuar, procesando un nuevo elemento del buffer.
En el ejemplo, el productor copia un fichero, leyendo las datos y almacenándolos dentro de un buffer compartido, y el consumidor obtiene los datos del buffer y los escribe en un nuevo fichero de salida. La estructura de datos Buf es empleada para contener tanto el buffer de datos como las variables de condición que controlan el acceso al buffer.
El hilo principal abre ambos ficheros, inicializa la estructura Buf, crea el hilo consumidor, y asume el papel de productor. El productor lee los datos del fichero de entrada, y coloca los datos dentro de una posición libre del buffer. Si no existen posiciones libres en el buffer, el productor espera en la instrucción pthread_cond_wait().
El hilo consumidor lee los datos del buffer compartido y los escribe en el fichero de salida. Si no existen datos disponibles, espera a que el productor los introduzca.
Si el fichero de entrada y el de salida estuvieran en discos físicos distintos, entonces este ejemplo podría ejecutar el productor y el consumidor realmente en paralelo. Este paralelismo incrementaría significativamente el rendimiento del ejemplo mediante el uso de hilos.
Existe la posibilidad de compilar el programa mediante la directiva de compilación #define YIELD que permite que el productor otorgue aleatoriamente la CPU al consumidor usando la primitiva pthread_yield() del paquete Pthreads, aunque no es estándar POSIX, con el fin de entremezclar la ejecución de los hilos Productor y Consumidor de una forma más segura, y poder estudiar la evolución.
El listado para la versión multihilo es el siguiente :
PROD.C /**************************************************************************** EJEMPLO DEL PRODUCTOR/CONSUMIDOR. Aplicacion: copy Version: 1.0h Fecha: 20/11/1996 Descripcion: Programa demostrativo del caso del productor/consumidor que realiza el copiado de un fichero fuente en un fichero destino. La implementacion se ha realizado mediante dos hilos que emplean un buffer de memoria compartida y empleando mutex y variables de condicion para realizar la sincronizacion. El programa emplea las rutinas de la libreria Pthreads, que es compatible POSIX.1c Autor: Jose Maria Toribio Vicente Mail: jmtoribi@poboxes.com *****************************************************************************/ #define VERSION "copy 1.0h - Productor/Consumidor (vers. hilos)\n" /* #define PTHREAD_KERNEL */ /* #define DEBUG */ /* Define para compilar en modo DEBUG */ /* #define DOS_HILOS */ /* Define para la version con dos hilos ademas del hilo principal */ /* #define YIELD */ /* Define para compilar con pthread_yield() No-POSIX */ #include <pthread.h> #include <stdio.h> #include <sys/stat.h> #include <fcntl.h> #include <errno.h> #include "error.h" #include "getstats.h" #include "tiempo.h" /* Definicion de constantes relativas al tamano del Buffer */ #define BUFSIZE 512 #define BUFCNT 10 /* Definicion de una funcion de impresion de salida inmediata */ void Printf(char *mensaje) { printf(mensaje); fflush(stdout); } /* Esta es la estructura de datos empleada entre el hilo consumidor y el hilo productor */ typedef struct s_buf_t { pthread_mutex_t buf_lock; /* Mutex de acceso a region crtica */ pthread_mutex_t fin_lock; /* Mutex de acceso a flag finalizacion */ pthread_cond_t cond_add; /* Var.cond anadir datos */ pthread_cond_t cond_rem; /* Var.cond eliminar datos */ int nocup; /* Contador de posiciones ocupadas */ int in, /* Indice al lugar de Entrada */ out; /* Indice al lugar de Salida */ int fin; /* Flag de Finalizacion */ char buffer[BUFCNT][BUFSIZE]; /* Buffer de datos */ int lbuffer[BUFCNT]; /* Long. de bytes en cada Buffer */ } buf_t; /* Variable global del Buffer empleada como memoria compartida */ buf_t Buf = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, 0, 0 }; /* Prototipos del Productor y el Consumidor */ void *Consumidor(void *); void *Productor(void *); /* PROGRAMA PRINCIPAL */ main(int argc, char **argv) { int ifd, ofd; /* Descriptores de los fichero de Entrada y Salida */ pthread_t hilo_cons; /* ID del hilo Consumidor */ # ifdef DOS_HILOS pthread_t hilo_prod; /* ID del hilo Productor */ # endif struct timeval curtime, newtime; /* Var. de tiempo */ estad_t estad[2]; /* Datos estadisticos de la aplicacion */ /* Comprobar el numero de argumentos de la linea de comandos */ if (argc != 3) { printf(VERSION); printf("Sintaxis: %s <fic_origen> <fic_destino>\n", argv[0]); exit(0); } /* Abrir el fichero de entrada para el productor */ if ((ifd = open(argv[1], O_RDONLY)) == -1) { fprintf(stderr, "No se puede abrir el fichero %s\n", argv[1]); exit(1); } /* Abrir el fichero de salida para el consumidor */ if ((ofd = open(argv[2], O_WRONLY|O_CREAT, 0666)) == -1) { fprintf(stderr, "No se puede abrir el fichero %s\n", argv[2]); exit(1); } /* Inicializar los indices, flag finalizacion, y num.de buffers ocupados */ Buf.in = Buf.out = Buf.fin = Buf.nocup = 0; /* Inicializa los Mutex */ pthread_mutex_init(&Buf.buf_lock, NULL); pthread_mutex_init(&Buf.fin_lock, NULL); /* Inicializa las Var.de Condicion */ pthread_cond_init(&Buf.cond_add, NULL); pthread_cond_init(&Buf.cond_rem, NULL); /* Inicializa el reloj de tiempo */ gettimeofday(&curtime, NULL); # ifdef DEBUG PrintTime("Padre: Voy a crear al Hilo Consumidor", &curtime, &newtime); # endif /* Obtener los tiempos iniciales */ getestad(&estad[0]); /* Crea el hilo Consumidor */ if (pthread_create(&hilo_cons, NULL, Consumidor, (void *)ofd)) ErrorFatal("Error creando el hilo del Consumidor"); #ifdef DOS_HILOS /* Crea el hilo Productor */ if (pthread_create(&hilo_prod, NULL, Productor, (void *)ifd)) ErrorFatal("Error creando el hilo del Productor"); # ifdef DEBUG PrintTime("Padre: He creado los hilos Productor y Consumidor", &curtime, &newtime); # endif /* Espera a que finalice el hilo Productor */ pthread_join(hilo_prod, NULL); #else /* UN_HILO */ Productor( (void *)ifd ); #endif close(ifd); /* Espera a que finalice el hilo Consumidor */ pthread_join(hilo_cons, NULL); close(ofd); /* Obtener tiempos finales */ getestad(&estad[1]); /* Destruye los Mutex */ pthread_mutex_destroy(&Buf.buf_lock); pthread_mutex_destroy(&Buf.fin_lock); /* Destruye las Var.de Condicion */ pthread_cond_destroy(&Buf.cond_add); pthread_cond_destroy(&Buf.cond_rem); # ifdef DEBUG PrintTime("Padre: He finalizado", &curtime, &newtime); # endif /* Imprimir los tiempos */ printestad(stderr, &estad[0], &estad[1]); /* Finaliza el programa */ return(0); } /* El hilo Productor */ void *Productor(void *arg) { int ifd = (int) arg; /* Descriptor del fichero de Entrada */ struct timeval curtime, newtime; /* Inicializa el reloj de tiempo */ gettimeofday(&curtime, NULL); # ifdef DEBUG PrintTime("Hilo Productor: Voy a comenzar", &curtime, &newtime); # endif /* Realiza el papel de Productor */ while (1) { /* Bloquea el mutex de acceso a la region critica */ pthread_mutex_lock(&Buf.buf_lock); # ifdef DEBUG printf("Hilo Productor: Ante FIN = %d - No.Buffers lleno = %d\n", Buf.fin, Buf.nocup); fflush(stdout); # endif /* Comprobar si hay espacio o los buffers estan llenos */ /* si estan llenos espera */ while (Buf.nocup == BUFCNT) { /* Comprueba la condicion */ # ifdef DEBUG Printf("Hilo Productor: Bloqueado en la condicion\n"); # endif pthread_cond_wait(&Buf.cond_rem, &Buf.buf_lock); } /* PrintTime("Hilo Productor: Voy a leer", &curtime, &newtime); */ # ifdef DEBUG printf("Hilo Productor: Desp FIN = %d - No.Buffers lleno = %d\n", Buf.fin, Buf.nocup); fflush(stdout); # endif /* Lee datos del fichero y los coloca en un buffer */ Buf.lbuffer[Buf.in] = read(ifd, Buf.buffer[Buf.in], BUFSIZE); /* Comprueba si se ha realizado la lectura con exito o es fin */ if (Buf.lbuffer[Buf.in] == 0) { /* Bloquea el mutex de bloqueo_realizado */ pthread_mutex_lock(&Buf.fin_lock); /* Establece el flag bloqueo_realizado y libera el mutex */ Buf.fin = 1; pthread_mutex_unlock(&Buf.fin_lock); /* Indica la existencia de elementos al consumidor */ pthread_cond_signal(&Buf.cond_add); /* Libera el mutex del buffer */ pthread_mutex_unlock(&Buf.buf_lock); # ifdef DEBUG PrintTime("Hilo Productor: He finalizado", &curtime, &newtime); # endif /* Finaliza el bucle */ break; } /* Establece el siguiente buffer a llenar */ Buf.in = ++Buf.in % BUFCNT; /* Incrementa el contador de buffers llenos */ Buf.nocup++; /* Indica al consumidor la existencia de elementos */ pthread_cond_signal(&Buf.cond_add); /* Libera el mutex del buffer */ pthread_mutex_unlock(&Buf.buf_lock); # ifdef YIELD /* Aleatoriamente otorga la CPU a otro hilo, para entremezclarlos */ if ( (rand() % 100) <= 50 ) pthread_yield(); # endif } # ifdef DOS_HILOS /* Finalizar el hilo */ pthread_exit((void *)0); # endif } /* El hilo Consumidor */ void *Consumidor(void *arg) { int fd = (int) arg; /* Descriptor del fichero de Salida */ struct timeval curtime, newtime; /* Inicializa el reloj de tiempo */ gettimeofday(&curtime, NULL); # ifdef DEBUG PrintTime("Hilo Consumidor: Voy a comenzar", &curtime, &newtime); # endif /* Realiza el papel de Consumidor */ while (1) { /* Bloquea el mutex del buffer */ pthread_mutex_lock(&Buf.buf_lock); # ifdef DEBUG printf("Hilo Consumidor: Ante FIN = %d - No.Buffers lleno = %d\n", Buf.fin, Buf.nocup); fflush(stdout); # endif /* Si no hay ningun buffer lleno y flag fin esta activo => finaliza */ if (!Buf.nocup && Buf.fin) { pthread_mutex_unlock(&Buf.buf_lock); # ifdef DEBUG PrintTime("Hilo Consumidor: He finalizado", &curtime, &newtime); # endif break; } /* Si no hay ningun buffer lleno y no es el fin => espera */ while (Buf.nocup == 0 && !Buf.fin) { # ifdef DEBUG Printf("Hilo Consumidor: Bloqueado en la condicion\n"); # endif pthread_cond_wait(&Buf.cond_add, &Buf.buf_lock); } /* Si no hay ningun buffer lleno y flag fin esta activo => finaliza */ if (!Buf.nocup && Buf.fin) { pthread_mutex_unlock(&Buf.buf_lock); # ifdef DEBUG PrintTime("Hilo Consumidor: He finalizado", &curtime, &newtime); # endif break; } # ifdef DEBUG printf("Hilo Consumidor: Desp FIN = %d - No.Buffers lleno = %d\n", Buf.fin, Buf.nocup); Printf("Consumidor: voy a escribir en el fichero\n"); # endif /* Escribe los datos del buffer al fichero */ write(fd, Buf.buffer[Buf.out], Buf.lbuffer[Buf.out]); /* Establece el siguiente buffer desde el que escribir */ Buf.out = ++Buf.out % BUFCNT; /* Decrementa el numero de buffers lleno */ Buf.nocup--; /* Indica al productor que un buffer esta vacio */ pthread_cond_signal(&Buf.cond_rem); /* Libera el mutex del buffer */ pthread_mutex_unlock(&Buf.buf_lock); } /* Finalizar el hilo Consumidor */ pthread_exit((void *)0); }
El fichero Makefile que nos permitirá la compilación de esta versión es el siguiente :
MAKEFILE : ############## ## Makefile ## ############## #DEFS=-DDOS_HILOS -DDEBUG -DYIELD DEFS= INSDIR=../../.. INCDIR=$(INSDIR)/include OBJDIR=$(INSDIR)/libpthread OBJ=$(OBJDIR)/tiempo.o $(OBJDIR)/error.o $(OBJDIR)/getstats.o CC=pgcc CFLAGS=$(DEFS) -I$(INCDIR) LINK=$(CFLAGS) $(OBJ) #Definicion de dependencias .c.o: $(CC) $(CFLAGS) -c $< prod : prod.o $(CC) $(LINK) prod.o -o prod prod.o : prod.c
Primeramente decir que ambas implementaciones, como
se podrá haber observado, son muy similares no solo conceptualmente
sino a nivel de programación, si se dispone de las rutinas
adecuadas que nos proporciona la biblioteca de rutinas comunes
a los distintos ejemplos.
Como referencia para discutir las ventajas e inconvenientes
de las dos implementaciones : multihilo y procesos, se indican
los siguientes tiempos de ejecución (en segundos)
obtenidos como muestra en un sistema formado por una CPU Intel
486-66Mhz - 9Mb RAM - Linux 1.2.3, y con carga mínima en
el sistema, en el que se copiaba un archivo de 931.356 bytes,
repitiéndose la prueba en siete ocasiones para las dos
implementaciones :
MUESTRAS | ||||||||
Hilos T. usuario | ||||||||
Hilos T. Kernel | ||||||||
Hilos T. TOTAL | ||||||||
Proc. T. usuario | ||||||||
Proc. T. Kernel | ||||||||
Proc. T. TOTAL |
Inconvenientes de la versión de procesos :
Ventajas de la versión de procesos :
Inconvenientes de la versión multihilo :
Como inconvenientes de la versión multihilo se pueden destacar los siguientes :
Ventajas de la versión multihilo :
El programa emplea menos recursos del kernel, lo cual es ventajoso si tenemos en cuenta que los recursos del kernel son limitados y empleados por otros programas y el resto del sistema.