
👂 Schedule jobs using listeners

This guide gives you a brief introduction to Argilla Listeners. Listeners let you build fine-grained, complex workflows that run as background processes. They are a lightweight alternative to job scheduling, integrated directly with Argilla.

Their main goal is to let you define and customize your Argilla experience, and they are the backbone of Argilla Plugins. The tutorial on active learning with small-text is a great example of how powerful listeners can be. Alternatively, you can check the Python Client documentation to get acquainted.

This feature is experimental, so expect some changes to the Python API. Please report any issues you encounter on GitHub. Also, Jupyter notebooks might need to be fully restarted to ensure that all background processes are properly stopped.

Install dependencies

To use listeners, install Argilla with the `listeners` extra:

```python
%pip install argilla[listeners] -qqq
```

Basics

Listeners are decorators that wrap a function you would like to schedule. Given a dataset and a query, the decorated `update_records` function receives two arguments: 1) `records`, the records from the dataset that match the query, and 2) `ctx`, a context object that holds listener information such as the query and the dataset.

```python
import argilla as rg
from argilla.listeners import listener

@listener(
    dataset="my_dataset",  # dataset to get records from
    query="lucene query",  # https://docs.argilla.io/en/latest/guides/features/queries.html
    execution_interval_in_seconds=3,  # how often (in seconds) to check whether to run `update_records`
)
def update_records(records, ctx):
    # `records` holds the records that match the query
    for rec in records:
        # do something, e.g., train a model or change the records
        rec.metadata = {"updated": True}

    # `ctx` holds the listener info, e.g., the dataset name
    name = ctx.__listener__.dataset
    # log the updated records back to the same dataset
    rg.log(records=records, name=name)
```
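Here is a minimal pure-Python sketch of the decorator pattern that makes the `records`/`ctx` contract concrete without a running Argilla server. It is not Argilla's implementation. `toy_listener`, `ToyContext`, `run_once`, `STORE` and `fetch` are hypothetical names, and records are plain dicts rather than Argilla record objects.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class ToyContext:
    """Hypothetical stand-in for `ctx`: carries the listener's dataset and query."""
    dataset: str
    query: str


def toy_listener(dataset: str, query: str,
                 fetch: Callable[[str, str], List[Dict]]):
    """Hypothetical decorator: each run fetches the records matching `query`
    and passes them, together with a context object, to the wrapped function."""
    def decorator(action: Callable[[List[Dict], ToyContext], None]):
        def run_once() -> None:
            ctx = ToyContext(dataset=dataset, query=query)
            records = fetch(dataset, query)
            action(records, ctx)

        action.run_once = run_once  # attach a manual trigger for one iteration
        return action

    return decorator


# In-memory "datasets" standing in for Argilla (hypothetical data)
STORE: Dict[str, List[Dict]] = {
    "my_dataset": [
        {"text": "great app", "metadata": {}},
        {"text": "slow app", "metadata": {}},
    ]
}


def fetch(dataset: str, query: str) -> List[Dict]:
    # This toy ignores `query` and returns every record in the dataset
    return STORE[dataset]


@toy_listener(dataset="my_dataset", query="*", fetch=fetch)
def update_records(records, ctx):
    for rec in records:
        rec["metadata"] = {"updated": True, "dataset": ctx.dataset}


update_records.run_once()
```

The toy `fetch` returns the stored dicts themselves, so the changes are visible in `STORE` directly. With Argilla, the original example instead logs the changed records back with `rg.log`.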

Start and stop listening

```python
update_records.start()  # start checking the query in the background
update_records.stop()   # stop the background process
```
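`start()` and `stop()` control a process that keeps polling in the background. The sketch below shows one common way to build such a loop, using a daemon thread and a `threading.Event`. It illustrates the general mechanism only and is not Argilla's code. `BackgroundJob` is a hypothetical name.

```python
import threading
import time
from typing import Callable, Optional


class BackgroundJob:
    """Hypothetical stand-in for a started listener: runs `job` every
    `interval_s` seconds on a daemon thread until stop() is called."""

    def __init__(self, job: Callable[[], None], interval_s: float) -> None:
        self._job = job
        self._interval_s = interval_s
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def _loop(self) -> None:
        while not self._stop_event.is_set():
            self._job()
            # Sleep for one interval, but wake up immediately if stop() is called
            self._stop_event.wait(self._interval_s)

    def start(self) -> None:
        if self.running:
            return  # already started; don't spawn a second loop
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()  # wait for the current iteration to finish
            self._thread = None

    @property
    def running(self) -> bool:
        return self._thread is not None and self._thread.is_alive()


calls = []
job = BackgroundJob(lambda: calls.append(time.monotonic()), interval_s=0.05)
job.start()
time.sleep(0.2)  # let the loop run a few times
job.stop()
```

Waiting on the event instead of calling `time.sleep` lets `stop()` return without waiting out a full interval. A daemon thread keeps running for as long as the Python process (for example, a notebook kernel) is alive, unless it is stopped explicitly.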

Advanced

Conditional execution

You can set a condition on the search results, for example on the number of matching records, that must hold before the decorated function is executed.

```python
@listener(
    dataset="my_dataset",  # dataset to get records from
    condition=lambda search: search.total == 10,  # only executes if the search returns exactly 10 records
)
def update_records(records, ctx):
    ...

@listener(
    dataset="my_dataset",  # dataset to get records from
    condition=lambda search: search.total > 10,  # only executes if the search returns more than 10 records
)
def update_records(records, ctx):
    ...
```
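A condition acts as a gate between the search and the decorated function. The following pure-Python sketch models that gate. It uses a hypothetical `Search` dataclass that exposes only `total`, the attribute used in the condition examples, and a hypothetical `run_if` helper. Argilla's own search object may carry more information.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Search:
    """Hypothetical search summary; only the number of hits is modelled."""
    total: int


def run_if(condition: Callable[[Search], bool],
           records: List[Dict],
           action: Callable[[List[Dict]], None]) -> bool:
    """Call `action` only if `condition` holds for the search summary.
    Returns True when the action ran."""
    search = Search(total=len(records))
    if condition(search):
        action(records)
        return True
    return False


handled: List[Dict] = []
few = [{"id": i} for i in range(5)]
many = [{"id": i} for i in range(11)]

ran_few = run_if(lambda search: search.total > 10, few, handled.extend)    # False: 5 hits
ran_many = run_if(lambda search: search.total > 10, many, handled.extend)  # True: 11 hits
```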

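Updatable query_params

During an execution loop, you can update `query_params` from inside the decorated function, so each iteration can query based on the results of the previous one. Extra keyword arguments passed to `@listener`, like `batch_id` below, fill the matching `{...}` placeholders in the query.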

```python
@listener(
    dataset="uber-reviews",  # dataset to get records from
    query="metadata.batch_id:{batch_id}",  # {batch_id} is filled from query_params
    batch_id=0,  # initial value of the `batch_id` query parameter
)
def update_records(records, ctx):
    # in the next iteration, the query is executed with batch_id = 1
    ctx.query_params["batch_id"] += 1
```
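The batch example suggests that the query is re-rendered from `query_params` on every iteration. The loop below is a sketch of that behavior. It assumes Python `str.format`-style placeholder filling, which this page does not confirm for Argilla. `run_iterations` and `next_batch` are hypothetical helpers.

```python
from typing import Any, Callable, Dict, List


def run_iterations(template: str,
                   query_params: Dict[str, Any],
                   action: Callable[[Dict[str, Any]], None],
                   iterations: int) -> List[str]:
    """Render the query from the current params, then let the action
    mutate the params so the next iteration uses the new values."""
    queries = []
    for _ in range(iterations):
        queries.append(template.format(**query_params))
        action(query_params)  # plays the role of ctx.query_params updates
    return queries


def next_batch(params: Dict[str, Any]) -> None:
    # Same update as in the listener example: move on to the next batch
    params["batch_id"] += 1


params = {"batch_id": 0}
queries = run_iterations("metadata.batch_id:{batch_id}", params, next_batch, 3)
# queries == ["metadata.batch_id:0", "metadata.batch_id:1", "metadata.batch_id:2"]
```

If placeholders were filled this way, literal braces in the query would need doubling (`{{`, `}}`). This matters for Lucene exclusive range queries such as `{1 TO 5}`.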

Metrics

You can request metrics provided by Argilla through the `metrics` argument and act on them in the decorated function, for example to report on a dataset's progress.

```python
@listener(
    dataset="my_dataset",  # dataset to get records from
    metrics=["F1"],  # metrics to make available in `ctx.metrics`
)
def update_records(records, ctx):
    # ctx.metrics holds the requested metrics, e.g., for reporting
    print(ctx.metrics)
```
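One way to report metrics is to turn them into a readable summary before printing them or sending them elsewhere. The sketch flattens a nested dictionary into `name: value` lines. The shape and values of `sample` are invented for illustration and do not reflect the actual structure of `ctx.metrics`. `flatten_metrics` and `format_report` are hypothetical helpers.

```python
from typing import Any, Dict, List


def flatten_metrics(metrics: Dict[str, Any], prefix: str = "") -> List[str]:
    """Recursively flatten nested metric dicts into sorted 'a.b: value' lines."""
    lines: List[str] = []
    for key in sorted(metrics):
        value = metrics[key]
        name = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict):
            lines.extend(flatten_metrics(value, name))
        else:
            lines.append(f"{name}: {value}")
    return lines


def format_report(metrics: Dict[str, Any]) -> str:
    return "\n".join(flatten_metrics(metrics))


# Hypothetical metrics payload, invented for this example
sample = {"F1": {"macro": 0.8, "micro": 0.85}}
report = format_report(sample)
```

Assuming `ctx.metrics` is a (possibly nested) dictionary with string keys, the listener could call `print(format_report(ctx.metrics))` instead of printing the raw object.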

Without loading records

Sometimes you might want to listen to a dataset without loading and processing its records directly. Set `query_records=False`, and the decorated function then receives only `ctx`.

```python
@listener(
    dataset="my_dataset",  # dataset to listen to
    query_records=False,  # don't load the records
)
def update_records(ctx):
    # no records are passed, only the listener context
    pass
```
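The two signatures on this page, `update_records(records, ctx)` and `update_records(ctx)`, depend on whether records are queried. The sketch below models that dispatch with a plain dict as the context. It shows that with the flag off, the records are never fetched at all. `dispatch` and `fetch` are hypothetical names, not Argilla internals.

```python
from typing import Any, Callable, Dict, List


def dispatch(action: Callable[..., Any],
             ctx: Dict[str, Any],
             query_records: bool,
             fetch: Callable[[], List[Dict]]) -> Any:
    """Hypothetical dispatcher: pass (records, ctx) when records are queried,
    otherwise skip fetching entirely and pass only ctx."""
    if query_records:
        return action(fetch(), ctx)
    return action(ctx)


fetch_calls: List[int] = []


def fetch() -> List[Dict]:
    fetch_calls.append(1)  # count how often records are loaded
    return [{"text": "example"}]


with_records = dispatch(lambda records, ctx: len(records), {"dataset": "my_dataset"}, True, fetch)
ctx_only = dispatch(lambda ctx: ctx["dataset"], {"dataset": "my_dataset"}, False, fetch)
```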

Next steps

If you want to continue learning Argilla:

🙋‍♀️ Join the Argilla Slack community!

⭐ Argilla GitHub repo to stay updated.

📚 Argilla documentation for more guides and tutorials.