In questo nuovo articolo estenderemo ancora il concetto di threading con un modello molto utilizzato nell’ingegneria del software: il modello Producer-Consumer che implementeremo tramite due thread. In particolare svilupperemo una Pipeline per la comunicazione interna tra i due thread.
- Thread in Python – Threading (parte 1)
- Thread in Python – Join (parte 2)
- Thread in Python – Multithreading (parte 3)
- Thread in Python – Lock and Deadlock (parte 4)
Producer-Consumer Threading
Il problema Producer-Consumer è un problema standard e molto comune nel mondo della programmazione. Questo problema si evidenzia quando si deve gestire la sincronizzazione dei processi in generale, tra cui anche i thread.
Immaginiamo il caso in cui un programma deve ricevere (non leggere) dei dati da una risorsa (come per esempio la rete). Il programma non riceverà i dati al momento della richiesta (infatti non è una lettura), ma attenderà la ricezione rimanendo in ascolto. I dati provenienti dalla risorsa non sono emessi in maniera regolare. Identifichiamo questa risorsa, o sorgente di dati come producer.
Dall’altra parte vi è il programma in ascolto, che una volta ottenuti i dati dalla risorsa, li deve inviare (non scrivere) ad un’altra risorsa, come per esempio, un database. L’accesso al database non sarà istantaneo, ma facciamo conto che sia veloce a sufficienza per poter ricevere tutti i dati provenienti dal programma in un determinato e ragionevole periodo di tempo. Il programma quindi dovrà, a volte, attendere che il database sia pronto per ricevere i dati. Questa parte del programma la identificheremo come consumer.
Abbiamo quindi due entità, Producer e Consumer, tre le quali si creerà una Pipeline, che avrà il compito di gestire i dati tra di esse.
Modello Producer-Consumer utilizzando i Lock
Dato che stiamo parlando di threading, vediamo di risolvere questo problema usando i Lock.
Come abbiamo detto in precedenza, abbiamo un thread producer che riceve i dati dalla rete e li pone in una Pipeline. Per simulare la rete creeremo una fake network.
Importiamo il modulo random e il modulo time per avere dei valori casuali e simulare l’indeterminatezza della ricezione dei dati da parte della rete, combinandoli avremo dei periodi di tempo casuali. Poi il modulo concurrent.futures per il ThreadPoolExecutor che utilizzeremo come context manager per la gestione dei thread (come abbiamo visto in precedenza). Infine importeremo la classe Pipeline da un file pipeline.py che implementeremo più tardi.
import random import concurrent.futures import time from pipeline import Pipeline
Definiamo un oggetto generico END che utilizzeremo per segnalare la fine della ricezione dei messaggi. Questo valore verrà generato dal Producer per indicare il termine dei messaggi ricevuti, ed inserito nella Pipeline. Quando il Consumer leggerà questo valore END dalla Pipeline, interromperà il programma.
END = object()
Per prima cosa, quindi, implementiamo il thread producer che prende come argomento una pipeline (pl).
def producer(pl): for index in range(10): time.sleep(random.randint(1,11)) message = random.randint(1, 101) print("Producer received data: ", message, " (", index+1, " of 10 )") pl.set_message(message, "Producer") pl.set_message(END, "Producer")
Il thread dovrà simulare la ricezione di 10 dati (numeri casuali da 1 a 100) che verrranno acquisiti in tempi non regolari (si usa time.sleep con un numero di tempo casuale tra 1 e 10 secondi). In questo modo il thread potrà ricevere una serie di messaggi in tempi variabili.
Il dato ricevuto viene inserito in message e poi inviato alla pipeline tramite la funzione .set_message(). Sarà poi dalla pipeline che il thread consumer riceverà il valore.
Alla fine dei 10 dati, il thread producer inserirà nella pipeline un valore particolare (END) che segnalerà al thread consumer la ricezione di tutti i messaggi, e quindi della chiusura del programma.
Dall’altra parte della pipeline vi è il thread consumer, che implementeremo in maniera seguente.
def consumer(pl): message = 0 while message is not END: message = pl.get_message("Consumer") if message is not END: time.sleep(random.randint(1,11)) print("Consumer stored data: ", message)
Il thread consumer legge un messaggio dalla pipeline e per simulare l’inserimento del dato in un database si usa anche qui un tempo casuale tra 1 e 10 secondi. Questo per fare in modo che i tempi di processamento dei dati da rete, alla pipeline per arrivare al database siano simili. Essendo tempi irregolari e diversi tra ricezione e inserimento nel database, ci aspetteremo così l’accumulo nella pipeline di qualche dato in più (message) prima che il thread consumer sia in grado di processarli tutti, inserendoli in un database.
Quando il thread consumer riceverà dalla pipeline il messaggio END, comprenderà che i messaggi da ricevere sono terminati ed uscirà dall’esecuzione.
Definiti i due thread (consumer e produre) definiamo il programma principale. Per prima cosa dovremo attivare la pipeline (pl). Poi con un ThreadPoolExecutor attivare i due thread, assegnando a ciascuno di essi la stessa pipeline.
pl = Pipeline() with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: executor.submit(producer, pl) executor.submit(consumer, pl)
Ultima cosa che ci è rimasta da definire è proprio la Pipeline. Implementiamola con una classe in un file esterno pipeline.py.
import threading class Pipeline: def __init__(self): self.message = 0 self.producer_lock = threading.Lock() self.consumer_lock = threading.Lock() self.consumer_lock.acquire() def get_message(self, name): self.consumer_lock.acquire() message = self.message self.producer_lock.release() return message def set_message(self, message, name): self.producer_lock.acquire() self.message = message self.consumer_lock.release()
La classe Pipeline ha tre membri:
- .message che immagazzina il valore da passare (messaggio)
- .producer_lock è un threading.Lock che restringe l’accesso al messaggio solo al thread producer
- .consumer_lock è un threading.Lock che restringe l’accesso al messaggio solo al thread consumer
La prima funzione che troviamo definita all’interno della classe è __init__() . La sua funzione è quella di inizializzare i tre membri della classe e poi chiamare acquire() sul consumer_lock. Facendo in questo modo, impostiamo lo stato di partenza da noi desiderato: il producer ha il permesso di aggiungere un nuovo messaggio alla pipeline, mentre il consumer deve attendere.
Nella classe Pipeline, poi sono definiti altri due metodi .get_message() e .set_messages().
.get_message() effettua la chiamata .acquire() sul consumer_lock. In questo modo il consumer attenderà finchè un messaggio non sarà pronto nella Pipeline. Quando poi il consumer avrà acquisito il .consumer_lock, copierà il valore in message. Successivamente chiamerà .release() sul .producer_lock. Il consumer rilascerà quindi il lock, permettendo al producer di inserire un altro messaggio nella pipeline.
Poniamo una particolare attenzione alla parte finale. Si potrebbe esser tentati di disfarsi di message, chiudendo la funzione con return self.message. Ma non dovresti farlo.
Ecco la risposta. Non appena il consumer chiama .producer_lock.release(), si ferma per far ripartire il producer. Ma il producer potrebbe ripartire prima che la funzione .release() restituisca il valore. Se questo dovesse accadere, il consumer restituirebbe un self.message che è in realtà il successivo messaggio generato, perdendo il primo messaggio per sempre. Questo è un esempio di race condition.
Spostandoci su .set_message(), si può notare il lato opposto della transazione. Il producer chiamando questa funzione acquisirà il .producer_lock. Imposta poi .message ed infine chiama .release() sul consumer_lock, permettendo al consumer di leggere quel valore.
Se eseguiamo otterremo un risultato simile al seguente.
Conclusioni
In questa quinta parte sui Thread in Python abbiamo introdotto il modello Producer – Consumer utilizzando i thread e con una Pipeline. Nella prossima parte vedremo come implementare lo stesso modello sostituendo la Pipeline con una Queue.
Buongiorno,
chiedo un chiarimento sull’esempio della Pipeline.
All’inizio del programma la situazione che ipotizzo dovrebbe essere che il producer può accedere alla pipeline scrivendo dei dati ed il consumer rimane in attesa di possibili dati sulla pipeline.
Per fare questo vedrei quindi l’utilizzo nella __init__ della classe Pipeline del self.producer_lock.acquire() più che del consumer, in questo modo il producer ha accesso alla pipeline per scrivere i dati, ppoi una volta inseriti dovrebbe rilasciare il lock che potrà essere quindi acquisito dal thread del consumer per accedere alla pipeline e vedere se sono arrivati dati ed in caso affermativo precedere allo scaricamento degli stessi.
Potrei avere fatto confusione, ma non mi torna la frase:
“La prima funzione che troviamo definita all’interno della classe è __init__() . La sua funzione è quella di inizializzare i tre membri della classe e poi chiamare acquire() sul consumer_lock. Facendo in questo modo, impostiamo lo stato di partenza da noi desiderato: il producer ha il permesso di aggiungere un nuovo messaggio alla pipeline, mentre il consumer deve attendere.”
infatti se nella init specifichiamo “self.consumer_lock.acquire()” il thread del producer non sarebbe bloccato fino al rilascio di questo lock? Inoltre all’inizio la pipeline sarebbe comunque vuota quindi non capisco il senso di iniziare con questa assegnazione.
Grazie per il vostro tempo ed il magnifico sito!
Saluti.
DrByte
Buongiorno,
rileggendo il comportamento dei Lock anche sulle specifiche Python penso di avere afferrato.
In sostanza quando si ha acquire il lock rimane bloccato fino a quando in un altro thread si ha lo sblocco.
In questo caso si inizia impostando self.consumer_lock.acquire().
Se il prio thread ad essere invocato è il producer questo acquisisce il suo lock e scrive il messaggio poi sblocca il thread consumer. A questo punto se fosse nuovamente chiamato il thread producer questo sarebbe bloccato e non sovrascriverebbe il dato. Alla prima chiamata del thread consumer essendo il lock non acquisito può leggere il dato ed a questo punto sblocca il producer con la self.producer_lock.release(), in questo modo sono sicuro di non sovrascrivere mai il messaggio.
Corretto?
Grazie
DrByte
Continuando il mio post precedente, se non ho preso un abbaglio, il codice della classe Pipeline dovrebbe essere modificato come segue:
import threading
class Pipeline:
def __init__(self):
self.message = 0
self.producer_lock = threading.Lock()
self.consumer_lock = threading.Lock()
self.producer_lock.acquire()
def get_message(self, name):
self.consumer_lock.acquire()
message = self.message
self.consumer_lock.release()
return message
def set_message(self, message, name):
self.producer_lock.acquire()
self.message = message
self.producer_lock.release()
Grazie per l’attenzione e per aiutarmi a comprendere meglio.
DrByte
Si può cancellare questo post, in quanto penso la risposta sia contenuta nel mio precedente.
Grazie!!