DEV Community

David Mezzetti for NeuML

Posted on • Edited on • Originally published at neuml.hashnode.dev

Run pipeline workflows

txtai has a growing list of models available through its pipeline framework. Pipelines wrap a machine learning model and transform data. Currently, pipelines can wrap Hugging Face models, Hugging Face pipelines or PyTorch models (support for TensorFlow is in the backlog).

The following is a list of the currently implemented pipelines.

  • Questions - Answer questions using a text context
  • Labels - Apply labels to text using a zero-shot classification model. Also supports similarity comparisons.
  • Summary - Abstractive text summarization
  • Textractor - Extract text from documents
  • Transcription - Transcribe audio to text
  • Translation - Machine translation

Pipelines are great and make using a variety of machine learning models easier. But what if we want to glue the results of different pipelines together? For example, extract text, summarize it, translate it to English and load it into an Embeddings index. That would require code to join those operations together efficiently.

Enter workflows. Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows don't know they are working with pipelines but enable efficient processing of pipeline data. Workflows are streaming by nature and work on data in batches, allowing large volumes of data to be processed efficiently.
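To make the idea of streaming, batched processing concrete, here is a minimal pure-Python sketch. It is not txtai's implementation; `batches`, `run_workflow`, `strip_all` and `upper_all` are hypothetical names used only for illustration, and the stand-in "pipelines" are ordinary functions.

```python
from itertools import islice


def batches(items, size):
    """Yield successive lists of up to `size` items from any iterable."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def run_workflow(tasks, items, batch_size=2):
    """Stream items through each task in order, one batch at a time."""
    for batch in batches(items, batch_size):
        for task in tasks:
            # Each task is a plain callable: list in, list out
            batch = task(batch)
        yield from batch


# Two stand-in "pipelines": ordinary functions that work on a batch
def strip_all(texts):
    return [text.strip() for text in texts]


def upper_all(texts):
    return [text.upper() for text in texts]


# The workflow is a generator, so results are read into a list for display
results = list(run_workflow([strip_all, upper_all], ["  sky ", "forest", " trees"]))
print(results)  # ['SKY', 'FOREST', 'TREES']
```

Because `run_workflow` is a generator, only one batch is held in memory at a time, which is the property that lets this style of processing scale to large inputs.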

Install dependencies

Install txtai and all dependencies. Since this article uses optional pipelines and workflows, the pipeline and workflow extras need to be installed as well.

    pip install txtai[pipeline,workflow] sacremoses

    # Get test data
    wget -N https://github.com/neuml/txtai/releases/download/v2.0.0/tests.tar.gz
    tar -xvzf tests.tar.gz

Create a series of pipelines to use in this notebook

    from txtai.pipeline import Summary, Textractor, Transcription, Translation

    # Summary instance
    summary = Summary()

    # Text extraction
    textractor = Textractor()

    # Transcription instance
    transcribe = Transcription("facebook/wav2vec2-large-960h")

    # Create a translation instance
    translate = Translation()

Basic workflow

The following shows a basic workflow in action!

    from txtai.workflow import Workflow, Task

    # Workflow that translates text to French
    workflow = Workflow([Task(lambda x: translate(x, "fr"))])

    # Data to run through the pipeline
    data = ["The sky is blue", "Forest through the trees"]

    # Workflows are generators for efficiency, read results to list for display
    list(workflow(data))

['Le ciel est bleu', 'Forêt à travers les arbres']

This isn't too different from previous pipeline examples. The only difference is that data is fed through the workflow. In this example, the workflow calls the translation pipeline and translates text to French. Let's look at a more complex example.

Multistep workflow

The following workflow reads a series of audio files, transcribes them to text and translates the text to French. This is based on the classic txtai example from Introducing txtai.

Workflow tasks take two main parameters: the action to execute, which is a callable, and a pattern to filter data with. Data accepted by the filter is processed by the task; all other data is passed through unchanged to the next task.

    from txtai.workflow import FileTask

    tasks = [
        FileTask(transcribe, r"\.wav$"),
        Task(lambda x: translate(x, "fr"))
    ]

    # List of files to process
    data = [
        "txtai/US_tops_5_million.wav",
        "txtai/Canadas_last_fully.wav",
        "txtai/Beijing_mobilises.wav",
        "txtai/The_National_Park.wav",
        "txtai/Maine_man_wins_1_mil.wav",
        "txtai/Make_huge_profits.wav"
    ]

    # Workflow that transcribes audio files and translates the text to French
    workflow = Workflow(tasks)

    # Run workflow
    list(workflow(data))

["Les cas de virus U sont en tête d'un million", "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten", "Bagage mobilise les embarcations d'invasion le long des côtes à mesure que les tensions tiwaniennes s'intensifient", "Le service des parcs nationaux met en garde contre le sacrifice d'amis plus lents dans une attaque nue", "L'homme principal gagne du billet de loterie", "Faire d'énormes profits sans travailler faire jusqu'à cent mille dollars par jour"]
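The filtering behaviour used in this example (an action plus a pattern, with unmatched data passed along untouched) can be pictured with a small pure-Python sketch. This is an illustration under stated assumptions, not txtai's code: `SketchTask` and `fake_transcribe` are hypothetical names, and the placeholder action simply builds a string instead of running a model.

```python
import re


class SketchTask:
    """Hypothetical stand-in for a task: an action plus an optional regex filter."""

    def __init__(self, action, select=None):
        self.action = action
        self.select = re.compile(select) if select else None

    def accepts(self, element):
        # With no pattern, every element is accepted
        if self.select is None:
            return True
        return isinstance(element, str) and self.select.search(element) is not None

    def __call__(self, elements):
        # Run the action on accepted elements only, keeping the original order
        accepted = [i for i, element in enumerate(elements) if self.accepts(element)]
        outputs = self.action([elements[i] for i in accepted])
        results = list(elements)
        for i, output in zip(accepted, outputs):
            results[i] = output
        return results


def fake_transcribe(paths):
    # Placeholder for a transcription model
    return [f"text of {path}" for path in paths]


task = SketchTask(fake_transcribe, r"\.wav$")
print(task(["a.wav", "already text", "b.wav"]))
# ['text of a.wav', 'already text', 'text of b.wav']
```

Only the two `.wav` entries reach the action; the middle element does not match the pattern and is handed on unchanged, which is what lets a later task (translation, in the example above) handle every element.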

Complex workflow

Let's put this all together into a full-fledged workflow to build an embeddings index. This workflow will work with both documents and audio files. Documents will have text extracted and summarized. Audio files will be transcribed. Both results will be joined, translated into French and loaded into an Embeddings index.

    from txtai.embeddings import Embeddings, Documents
    from txtai.workflow import FileTask, WorkflowTask

    # Embeddings index
    embeddings = Embeddings({"path": "sentence-transformers/paraphrase-multilingual-mpnet-base-v2", "content": True})
    documents = Documents()

    # List of files to process
    files = [
        "txtai/article.pdf",
        "txtai/US_tops_5_million.wav",
        "txtai/Canadas_last_fully.wav",
        "txtai/Beijing_mobilises.wav",
        "txtai/The_National_Park.wav",
        "txtai/Maine_man_wins_1_mil.wav",
        "txtai/Make_huge_profits.wav"
    ]

    # Wrap each file as an (id, data, tags) tuple
    data = [(x, element, None) for x, element in enumerate(files)]

    # Workflow that extracts text and builds a summary
    articles = Workflow([
        FileTask(textractor),
        Task(summary)
    ])

    # Define workflow tasks. Workflows can also be tasks!
    tasks = [
        WorkflowTask(articles, r".\.pdf$"),
        FileTask(transcribe, r"\.wav$"),
        Task(lambda x: translate(x, "fr")),
        Task(documents.add, unpack=False)
    ]

    # Workflow that summarizes documents, transcribes audio, translates the text to French and stages it for indexing
    workflow = Workflow(tasks)

    # Run workflow and show results to be indexed
    for x in workflow(data):
        print(x)

    # Build the embeddings index
    embeddings.index(documents)

    # Cleanup temporary storage
    documents.close()

(0, "Txtai, un moteur de recherche alimenté par l'IA construit sur Transformers, permet la recherche basée sur la compréhension du langage naturel (NLU) dans n'importe quelle application. Le champ de traitement du langage naturel (NLP) évolue rapidement avec un certain nombre de nouveaux développements. Le moteur de recherche open-source est open source et disponible sur GitHub.", None)
(1, "Les cas de virus U sont en tête d'un million", None)
(2, "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten", None)
(3, "Bagage mobilise les embarcations d'invasion le long des côtes à mesure que les tensions tiwaniennes s'intensifient", None)
(4, "Le service des parcs nationaux met en garde contre le sacrifice d'amis plus lents dans une attaque nue", None)
(5, "L'homme principal gagne du billet de loterie", None)
(6, "Faire d'énormes profits sans travailler faire jusqu'à cent mille dollars par jour", None)
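The input here is a list of (id, data, tags) tuples, and the last task is created with unpack=False so that it receives whole tuples rather than just the data field. A rough pure-Python picture of that difference follows. It is a sketch under assumptions, not txtai's implementation: `apply_unpacked`, `shout`, `stage` and `staged` are hypothetical names, and `stage` only imitates a task that collects tuples for later indexing.

```python
def apply_unpacked(action, elements):
    """Apply action to the data field of (id, data, tags) tuples, then repack."""
    data = [element[1] for element in elements]
    outputs = action(data)
    return [(uid, output, tags) for (uid, _, tags), output in zip(elements, outputs)]


def shout(texts):
    # Placeholder for a pipeline such as translation: sees plain text only
    return [text.upper() for text in texts]


staged = []


def stage(elements):
    # Placeholder for a collecting step: sees whole tuples and passes them on
    staged.extend(elements)
    return elements


rows = [(0, "article text", None), (1, "audio text", None)]
rows = apply_unpacked(shout, rows)  # action works on the data field only
rows = stage(rows)                  # action works on the full tuples
print(rows)
# [(0, 'ARTICLE TEXT', None), (1, 'AUDIO TEXT', None)]
```

Keeping the id attached to each element through every step is what allows the search results later on to report which input a match came from.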

Query for results in French

    # Run a search query and show the result.
    embeddings.search("changement climatique", 1)[0]

{'id': '2', 'score': 0.2982647716999054, 'text': "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten"}

    # Run a search query and show the result.
    embeddings.search("traitement du langage naturel", 1)[0]

{'id': '0', 'score': 0.47031939029693604, 'text': "Txtai, un moteur de recherche alimenté par l'IA construit sur Transformers, permet la recherche basée sur la compréhension du langage naturel (NLU) dans n'importe quelle application. Le champ de traitement du langage naturel (NLP) évolue rapidement avec un certain nombre de nouveaux développements. Le moteur de recherche open-source est open source et disponible sur GitHub."}

Configuration-driven workflow

Workflows can also be defined with YAML and run as an application. Applications can run standalone or as a FastAPI instance. More information can be found in the txtai documentation.

    workflow = """
    writable: true
    embeddings:
      path: sentence-transformers/paraphrase-multilingual-mpnet-base-v2
      content: True

    # Summarize text
    summary:

    # Extract text from documents
    textractor:

    # Transcribe audio to text
    transcription:
      path: facebook/wav2vec2-large-960h

    # Translate text between languages
    translation:

    workflow:
      summarize:
        tasks:
          - action: textractor
            task: file
          - summary
      index:
        tasks:
          - action: summarize
            select: '\\.pdf$'
          - action: transcription
            select: '\\.wav$'
            task: file
          - action: translation
            args: ['fr']
          - action: index
    """

    from txtai.app import Application

    # Create and run the workflow
    app = Application(workflow)
    list(app.workflow("index", files))

["Txtai, un moteur de recherche alimenté par l'IA construit sur Transformers, permet la recherche basée sur la compréhension du langage naturel (NLU) dans n'importe quelle application. Le champ de traitement du langage naturel (NLP) évolue rapidement avec un certain nombre de nouveaux développements. Le moteur de recherche open-source est open source et disponible sur GitHub.", "Les cas de virus U sont en tête d'un million", "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten", "Bagage mobilise les embarcations d'invasion le long des côtes à mesure que les tensions tiwaniennes s'intensifient", "Le service des parcs nationaux met en garde contre le sacrifice d'amis plus lents dans une attaque nue", "L'homme principal gagne du billet de loterie", "Faire d'énormes profits sans travailler faire jusqu'à cent mille dollars par jour"]

    # Run a search query and show the result.
    app.search("changement climatique", 1)[0]

{'id': '2', 'score': 0.2982647716999054, 'text': "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten"}

    # Run a search query and show the result.
    app.search("traitement du langage naturel", 1)[0]

{'id': '0', 'score': 0.47031939029693604, 'text': "Txtai, un moteur de recherche alimenté par l'IA construit sur Transformers, permet la recherche basée sur la compréhension du langage naturel (NLU) dans n'importe quelle application. Le champ de traitement du langage naturel (NLP) évolue rapidement avec un certain nombre de nouveaux développements. Le moteur de recherche open-source est open source et disponible sur GitHub."}
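One way to picture how a configuration of this shape becomes something runnable is a lookup from action names to callables. The sketch below uses a plain dict in place of parsed YAML and is only an illustration of the idea; `ACTIONS`, `CONFIG`, `build` and `run` are hypothetical names, and none of this reflects how txtai's Application is actually implemented.

```python
# Hypothetical registry of named actions; real pipelines would be model-backed
ACTIONS = {
    "strip": lambda texts: [text.strip() for text in texts],
    "upper": lambda texts: [text.upper() for text in texts],
}

# Plain dict standing in for a parsed YAML document
CONFIG = {
    "workflow": {
        "clean": {"tasks": [{"action": "strip"}, {"action": "upper"}]},
    }
}


def build(config, registry):
    """Turn each named workflow in the config into a list of callables."""
    workflows = {}
    for name, spec in config["workflow"].items():
        workflows[name] = [registry[task["action"]] for task in spec["tasks"]]
    return workflows


def run(workflows, name, elements):
    """Run the named workflow over a list of elements."""
    for action in workflows[name]:
        elements = action(elements)
    return elements


workflows = build(CONFIG, ACTIONS)
print(run(workflows, "clean", ["  le ciel ", "bleu "]))
# ['LE CIEL', 'BLEU']
```

The appeal of the configuration-driven style is that the list of steps lives in data rather than code, so a workflow can be changed or shared without editing Python.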

Wrapping up

Results are good! We can see the power of workflows and how they can join a series of pipelines together efficiently. Workflows can work with any callable, not just pipelines; they simply transform data from one format to another. Workflows are an exciting and promising development for txtai.
