How-to-guides-basic

Define LLMs as local or remote models

Working with LLMs

LLM subclasses are designed to be used within a [Task][distilabel.steps.tasks.Task], but they can also be used standalone.

from distilabel.llms import OpenAILLM

llm = OpenAILLM(model="gpt-4")
llm.load()

llm.generate(
    inputs=[
        [{"role": "user", "content": "What's the capital of Spain?"}],
    ],
)
# "The capital of Spain is Madrid."

!!! NOTE Always call the LLM.load or Task.load method when using LLMs standalone or as part of a Task. If using a Pipeline, this is done automatically in Pipeline.run().

Within a Task

Pass the LLM as an argument to the [Task][distilabel.steps.tasks.Task], and the task will handle the rest.

from distilabel.llms import OpenAILLM
from distilabel.steps.tasks import TextGeneration

llm = OpenAILLM(model="gpt-4")
task = TextGeneration(name="text_generation", llm=llm)

task.load()

next(task.process(inputs=[{"instruction": "What's the capital of Spain?"}]))
# [{'instruction': "What's the capital of Spain?", "generation": "The capital of Spain is Madrid."}]

Runtime Parameters

LLMs can have runtime parameters, such as generation_kwargs, provided via the Pipeline.run() method using the params argument.

!!! NOTE Runtime parameters can differ between LLM subclasses, due to the different functionalities offered by each LLM provider.

from distilabel.pipeline import Pipeline
from distilabel.llms import OpenAILLM
from distilabel.steps import LoadDataFromDicts
from distilabel.steps.tasks import TextGeneration

with Pipeline(name="text-generation-pipeline") as pipeline:
    load_dataset = LoadDataFromDicts(
        name="load_dataset",
        data=[{"instruction": "Write a short story about a dragon that saves a princess from a tower."}],
    )

    text_generation = TextGeneration(
        name="text_generation",
        llm=OpenAILLM(model="gpt-4"),
    )

    load_dataset >> text_generation

if __name__ == "__main__":
    pipeline.run(
        parameters={
            text_generation.name: {"llm": {"generation_kwargs": {"temperature": 0.3}}},
        },
    )

Creating custom LLMs

To create custom LLMs, subclass either [LLM][distilabel.llms.LLM] for synchronous or [AsyncLLM][distilabel.llms.AsyncLLM] for asynchronous LLMs. Implement the following methods:

  • model_name: A property containing the model's name.

  • generate: A method that takes a list of prompts and returns generated texts.

  • agenerate: A method that takes a single prompt and returns generated texts. This method is used within the generate method of the AsyncLLM class.

  • (optional) get_last_hidden_state: a method that takes a list of prompts and returns a list of hidden states. This method is optional and will be used by some tasks, such as the [GenerateEmbeddings][distilabel.steps.tasks.GenerateEmbeddings] task.

=== "Custom LLM"

```python
from typing import Any, List

from pydantic import validate_call

from distilabel.llms import LLM
from distilabel.llms.typing import GenerateOutput, HiddenState
from distilabel.steps.tasks.typing import ChatType

class CustomLLM(LLM):
    @property
    def model_name(self) -> str:
        return "my-model"

    @validate_call
    def generate(self, inputs: List[ChatType], num_generations: int = 1, **kwargs: Any) -> List[GenerateOutput]:
        for _ in range(num_generations):
            ...

    def get_last_hidden_state(self, inputs: List[ChatType]) -> List[HiddenState]:
        ...
```

=== "Custom AsyncLLM"

```python
from typing import Any, List

from pydantic import validate_call

from distilabel.llms import AsyncLLM
from distilabel.llms.typing import GenerateOutput, HiddenState
from distilabel.steps.tasks.typing import ChatType

class CustomAsyncLLM(AsyncLLM):
    @property
    def model_name(self) -> str:
        return "my-model"

    @validate_call
    async def agenerate(self, input: ChatType, num_generations: int = 1, **kwargs: Any) -> GenerateOutput:
        for _ in range(num_generations):
            ...

    def get_last_hidden_state(self, inputs: List[ChatType]) -> List[HiddenState]:
        ...
```
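
As a concrete illustration, here is a minimal sketch of a synchronous LLM subclass. The `EchoLLM` class below is a made-up example (not part of distilabel) that simply repeats the last user message, which can be handy for testing pipelines without calling a real provider:

```python
from typing import Any, List

from pydantic import validate_call

from distilabel.llms import LLM
from distilabel.llms.typing import GenerateOutput
from distilabel.steps.tasks.typing import ChatType

class EchoLLM(LLM):
    """Hypothetical LLM for testing: echoes the last message of each conversation."""

    @property
    def model_name(self) -> str:
        return "echo-llm"

    @validate_call
    def generate(
        self, inputs: List[ChatType], num_generations: int = 1, **kwargs: Any
    ) -> List[GenerateOutput]:
        # For each conversation, return `num_generations` copies of its last message content.
        return [
            [conversation[-1]["content"]] * num_generations for conversation in inputs
        ]
```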

The generate and agenerate keyword arguments (except input/inputs and num_generations) are considered RuntimeParameters, so a value can be passed to them via the parameters argument of the Pipeline.run method.

!!! NOTE The validate_call decorator is used so that the arguments of generate and agenerate are automatically coerced to the expected types, raising an error if that is not possible. This is especially useful when providing a value for an argument of generate or agenerate from the CLI, since the CLI will always provide the arguments as strings.
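
For reference, this is roughly what validate_call does on its own; the function below is a made-up example, not part of distilabel:

```python
from pydantic import validate_call

@validate_call
def sample(temperature: float = 1.0, max_new_tokens: int = 128) -> None:
    # Values arriving as strings (e.g. from the CLI) are coerced to the annotated types.
    print(type(temperature).__name__, temperature, type(max_new_tokens).__name__, max_new_tokens)

sample(temperature="0.3", max_new_tokens="256")
# float 0.3 int 256
```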

Execute Steps and Tasks in a Pipeline

How to create a pipeline

A [Pipeline][distilabel.pipeline.Pipeline] organises the Steps and Tasks in a sequence, where the output of one step is the input of the next one. A [Pipeline][distilabel.pipeline.Pipeline] should be created using the context manager, passing a name and, optionally, a description.

from distilabel.pipeline import Pipeline

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

Connecting steps with the Step.connect method

Now, we can define the steps of our [Pipeline][distilabel.pipeline.Pipeline].

!!! NOTE Steps without predecessors (i.e. root steps), need to be [GeneratorStep][distilabel.steps.GeneratorStep]s such as [LoadDataFromDicts][distilabel.steps.LoadDataFromDicts] or [LoadDataFromHub][distilabel.steps.LoadDataFromHub]. After this, other steps can be defined.

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(name="load_dataset")
    ...

!!! Tip "Easily load your datasets"

If you are already used to working with Hugging Face's `Dataset` via `load_dataset` or with a `pd.DataFrame`, you can create the `GeneratorStep` directly from the dataset (or dataframe) with the help of [`make_generator_step`][distilabel.steps.generators.utils.make_generator_step]:

=== "From a list of dicts"

    ```python
    from distilabel.pipeline import Pipeline
    from distilabel.steps import make_generator_step

    dataset = [{"instruction": "Tell me a joke."}]

    with Pipeline("pipe-name", description="My first pipe") as pipeline:
        loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
        ...
    ```

=== "From `datasets.Dataset`"

    ```python
    from datasets import load_dataset
    from distilabel.pipeline import Pipeline
    from distilabel.steps import make_generator_step

    dataset = load_dataset(
        "DIBT/10k_prompts_ranked",
        split="train"
    ).filter(
        lambda r: r["avg_rating"]>=4 and r["num_responses"]>=2
    ).select(range(500))

    with Pipeline("pipe-name", description="My first pipe") as pipeline:
        loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
        ...
    ```

=== "From `pd.DataFrame`"

    ```python
    import pandas as pd
    from distilabel.pipeline import Pipeline
    from distilabel.steps import make_generator_step

    dataset = pd.read_csv("path/to/dataset.csv")

    with Pipeline("pipe-name", description="My first pipe") as pipeline:
        loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
        ...
    ```

Next, we will use the prompt column from the dataset obtained through LoadDataFromHub and several LLMs to execute a TextGeneration task. We will also use the Task.connect() method to connect the steps, so the output of one step is the input of the next one.

!!! NOTE The order of the execution of the steps will be determined by the connections of the steps. In this case, the TextGeneration tasks will be executed after the LoadDataFromHub step.

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(name="load_dataset")

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        task.connect(load_dataset)

    ...

For each row of the dataset, the TextGeneration task will generate text based on the instruction column and the LLM, and store the result (a single string) in a new column called generation. Because we need all the responses in the same column, we will add a GroupColumns step to combine them into a single column as a list of strings.

!!! NOTE In this case, the GroupColumns step will be executed after all the TextGeneration steps.

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(name="load_dataset")

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)

Connecting steps with the >> operator

Besides the Step.connect method (step1.connect(step2)), steps can also be connected with the >> operator, which reads more naturally and makes it possible to connect multiple steps at once.

=== "Step per step"

Each call to `step1.connect(step2)` has been replaced with `step1 >> step2` within the loop.

```python
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(name="load_dataset")

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset >> task >> combine_generations
```

=== "Multiple steps at once"

Each task is first appended to a list, and then all the connections are made in a single call.

```python
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(name="load_dataset")

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    load_dataset >> tasks >> combine_generations
```

Routing batches to specific downstream steps

In some pipelines, you may want to send batches from a single upstream step to specific downstream steps based on certain conditions. To achieve this, you can use a routing_batch_function. This function takes a list of downstream steps and returns a list of step names to which each batch should be routed.

Let's update the example above to route the batches loaded by the LoadDataFromHub step to just 2 of the TextGeneration tasks. First, we will create our custom [routing_batch_function][distilabel.pipeline.routing_batch_function.routing_batch_function], and then we will update the pipeline to use it:

import random
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline, routing_batch_function
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

@routing_batch_function
def sample_two_steps(steps: list[str]) -> list[str]:
    return random.sample(steps, 2)

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
    )

    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    load_dataset >> sample_two_steps >> tasks >> combine_generations

The routing_batch_function that we just built is a common one, so distilabel comes with a built-in function that can be used to achieve the same behavior:

from distilabel.pipeline import sample_n_steps

sample_two_steps = sample_n_steps(2)
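
For instance, reusing the load_dataset, tasks, and combine_generations steps from the previous example, the connection could then be written as:

```python
from distilabel.pipeline import sample_n_steps

load_dataset >> sample_n_steps(2) >> tasks >> combine_generations
```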

Running the pipeline

Pipeline.dry_run

Before running the Pipeline we can check if the pipeline is valid using the Pipeline.dry_run() method. It takes the same parameters as the run method which we will discuss in the following section, plus the batch_size we want the dry run to use (by default set to 1).

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.dry_run(parameters=..., batch_size=1)

Pipeline.run

After testing, we can now execute the full Pipeline using the Pipeline.run() method.

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.0-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )

But if we run the pipeline above, we will see that the run method will fail:

ValueError: Step 'text_generation_with_gpt-4-0125-preview' requires inputs ['instruction'], but only the inputs=['prompt', 'completion', 'meta'] are available, which means that the inputs=['instruction'] are missing or not available
when the step gets to be executed in the pipeline. Please make sure previous steps to 'text_generation_with_gpt-4-0125-preview' are generating the required inputs.

This is because, before actually running the pipeline, we must ensure each step has the necessary input columns to be executed. In this case, the TextGeneration task requires the instruction column, but the LoadDataFromHub step generates the prompt column. To solve this, we can use the output_mappings or input_mappings arguments of individual Steps to map columns from one step to another.

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(
        name="load_dataset",
        output_mappings={"prompt": "instruction"}
    )

    ...

If we execute the pipeline again, it will run successfully and we will have a Distiset with the outputs of all the leaf steps of the pipeline which we can push to the Hugging Face Hub.

if __name__ == "__main__":
    distiset = pipeline.run(...)
    distiset.push_to_hub("distilabel-internal-testing/instruction-dataset-mini-with-generations")

Pipeline.run with a dataset

Note that in most cases, if you don't need the extra flexibility that [GeneratorStep][distilabel.steps.base.GeneratorStep]s provide, you can create a dataset as you normally would and pass it directly to the [Pipeline.run][distilabel.pipeline.base.BasePipeline.run] method. The example below shows the updated lines:

import random

from datasets import load_dataset
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline, routing_batch_function
from distilabel.steps import GroupColumns
from distilabel.steps.tasks import TextGeneration

@routing_batch_function
def sample_two_steps(steps: list[str]) -> list[str]:
    return random.sample(steps, 2)

dataset = load_dataset(
    "distilabel-internal-testing/instruction-dataset-mini",
    split="test"
)

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    sample_two_steps >> tasks >> combine_generations


if __name__ == "__main__":
    distiset = pipeline.run(
        dataset=dataset,
        parameters=...
    )

Stopping the pipeline

In case you want to stop the pipeline while it's running, you can press ++ctrl+c++ or ++cmd+c++ depending on your OS (or send a SIGINT to the main process), and the outputs will be stored in the cache. Pressing it a second time will force the pipeline to stop immediately, but this can lead to losing the generated outputs for certain batches.

Cache

If for some reason, the pipeline execution stops (for example by pressing Ctrl+C), the state of the pipeline and the outputs will be stored in the cache, so we can resume the pipeline execution from the point where it was stopped.

If we want to force the pipeline to run again without using the cache, then we can use the use_cache argument of the Pipeline.run() method:

if __name__ == "__main__":
    distiset = pipeline.run(parameters={...}, use_cache=False)

!!! NOTE For more information on caching, we refer the reader to the caching section.

Adjusting the batch size for each step

Memory issues can arise when processing large datasets or when using large models. To avoid this, we can use the input_batch_size argument of individual tasks. In the following example, each TextGeneration task will receive 5 dictionaries per batch, while the LoadDataFromHub step will send batches of 10 dictionaries:

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
        batch_size=10
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(
            name=f"text_generation_with_{llm.model_name}",
            llm=llm,
            input_batch_size=5,
        )

    ...

Serializing the pipeline

Sharing a pipeline with others is very easy, as we can serialize the pipeline object using the save method. We can save the pipeline in different formats, such as yaml or json:

=== "yaml" python if __name__ == "__main__": pipeline.save("pipeline.yaml", format="yaml")

=== "json" python if __name__ == "__main__": pipeline.save("pipeline.json", format="json")

To load the pipeline, we can use the from_yaml or from_json methods:

=== "yaml" python pipeline = Pipeline.from_yaml("pipeline.yaml")

=== "json" python pipeline = Pipeline.from_json("pipeline.json")

Serializing the pipeline is very useful when we want to share the pipeline with others, or when we want to store the pipeline for future use. It can even be hosted online, so the pipeline can be executed directly using the CLI.

Fully working example

To sum up, here is the full code of the pipeline we have created in this section. Note that you will need to change the name of the Hugging Face repository where the resulting dataset will be pushed, set the OPENAI_API_KEY environment variable, set MISTRAL_API_KEY, and have gcloud installed and configured:

```python
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
    )

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.0-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )
    distiset.push_to_hub(
        "distilabel-internal-testing/instruction-dataset-mini-with-generations"
    )
```

GeneratorStep

The [GeneratorStep][distilabel.steps.GeneratorStep] is a subclass of [Step][distilabel.steps.Step] that is intended to be used as the first step within a [Pipeline][distilabel.pipeline.Pipeline], because it doesn't require input and generates data that can be used by other steps. Alternatively, it can also be used as a standalone.

from typing import List
from typing_extensions import override

from distilabel.steps import GeneratorStep
from distilabel.steps.typing import GeneratorStepOutput

class MyGeneratorStep(GeneratorStep):
    instructions: List[str]

    @override
    def process(self, offset: int = 0) -> GeneratorStepOutput:
        if offset:
            self.instructions = self.instructions[offset:]

        while self.instructions:
            batch = [
                {
                    "instruction": instruction
                } for instruction in self.instructions[: self.batch_size]
            ]
            self.instructions = self.instructions[self.batch_size :]
            yield (
                batch,
                True if len(self.instructions) == 0 else False,
            )

    @property
    def outputs(self) -> List[str]:
        return ["instruction"]

Then we can use it as follows:

step = MyGeneratorStep(
    name="my-generator-step",
    instructions=["Tell me a joke.", "Tell me a story."],
    batch_size=1,
)
step.load()

next(step.process(offset=0))
# ([{'instruction': 'Tell me a joke.'}], False)
next(step.process(offset=1))
# ([{'instruction': 'Tell me a story.'}], True)

!!! NOTE The Step.load() always needs to be executed when being used as a standalone. Within a pipeline, this will be done automatically during pipeline execution.

Defining custom GeneratorSteps

We can define a custom generator step by creating a new subclass of the [GeneratorStep][distilabel.steps.GeneratorStep] and defining the following:

  • outputs: is a property that returns a list of strings with the names of the output fields.

  • process: is a method that yields output data and a boolean flag indicating whether that's the last batch to be generated.

!!! NOTE The default signature for the process method is process(self, offset: int = 0) -> GeneratorStepOutput. The argument offset should be respected; no other arguments can be provided, and the type-hints and return type-hints should be respected as well, since they are used for the validation and serialization performed over each [Step][distilabel.steps.Step] in the [Pipeline][distilabel.pipeline.Pipeline].

!!! WARNING For the custom [Step][distilabel.steps.Step] subclasses to work properly with distilabel and with the validation and serialization performed by default over each [Step][distilabel.steps.Step] in the [Pipeline][distilabel.pipeline.Pipeline], the type-hint for both [StepInput][distilabel.steps.StepInput] and [StepOutput][distilabel.steps.typing.StepOutput] should be used and not surrounded with double-quotes or imported under typing.TYPE_CHECKING, otherwise, the validation and/or serialization will fail.

=== "Inherit from GeneratorStep"

We can inherit from the `GeneratorStep` class and define the `outputs`, and `process` methods as follows:


```python
from typing import List
from typing_extensions import override

from distilabel.steps import GeneratorStep
from distilabel.steps.typing import GeneratorStepOutput

class MyGeneratorStep(GeneratorStep):
    instructions: List[str]

    @override
    def process(self, offset: int = 0) -> GeneratorStepOutput:
        ...

    @property
    def outputs(self) -> List[str]:
        ...
```

=== "Using the @step decorator"

The `@step` decorator will take care of the boilerplate code, and will allow you to define the `outputs` and `process` methods in a more straightforward way. One downside is that it won't let you access or set the `self` attributes, so if you need to access or set any attribute, you should go with the first approach of defining a custom [`GeneratorStep`][distilabel.steps.GeneratorStep] subclass.

```python
from distilabel.steps import step
from distilabel.steps.typing import GeneratorStepOutput

@step(outputs=[...], step_type="generator")
def CustomGeneratorStep(offset: int = 0) -> GeneratorStepOutput:
    yield (
        ...,
        True if offset == 10 else False,
    )

step = CustomGeneratorStep(name="my-step")
```

GlobalStep

The [GlobalStep][distilabel.steps.GlobalStep] is a subclass of [Step][distilabel.steps.Step] that waits until all the input batches from the previous steps have been received before running, so it is useful when a step needs all the input data to be available at once. Alternatively, it can also be used as a standalone.

Defining custom GlobalSteps

We can define a custom step by creating a new subclass of the [GlobalStep][distilabel.steps.GlobalStep] and defining the following:

  • inputs: is a property that returns a list of strings with the names of the required input fields.

  • outputs: is a property that returns a list of strings with the names of the output fields.

  • process: is a method that receives the input data and returns the output data, and it should be a generator, meaning that it should yield the output data.

!!! NOTE The default signature for the process method is process(self, *inputs: StepInput) -> StepOutput. The argument inputs should be respected, no more arguments can be provided, and the type-hints and return type-hints should be respected too because it should be able to receive any number of inputs by default i.e. more than one [Step][distilabel.steps.Step] at a time could be connected to the current one.

!!! WARNING For the custom [GlobalStep][distilabel.steps.GlobalStep] subclasses to work properly with distilabel and with the validation and serialization performed by default over each [Step][distilabel.steps.Step] in the [Pipeline][distilabel.pipeline.Pipeline], the type-hint for both [StepInput][distilabel.steps.StepInput] and [StepOutput][distilabel.steps.typing.StepOutput] should be used and not surrounded with double-quotes or imported under typing.TYPE_CHECKING, otherwise, the validation and/or serialization will fail.

=== "Inherit from GlobalStep"

We can inherit from the `GlobalStep` class and define the `inputs`, `outputs`, and `process` methods as follows:

```python
from typing import List

from distilabel.steps import GlobalStep, StepInput
from distilabel.steps.typing import StepOutput

class CustomStep(GlobalStep):
    @property
    def inputs(self) -> List[str]:
        ...

    @property
    def outputs(self) -> List[str]:
        ...

    def process(self, *inputs: StepInput) -> StepOutput:
        for upstream_step_inputs in inputs:
            for item in upstream_step_inputs:
                ...
            yield upstream_step_inputs

    # When overridden (ideally under the `typing_extensions.override` decorator)
    # @typing_extensions.override
    # def process(self, inputs: StepInput) -> StepOutput:
    #     for input in inputs:
    #         ...
    #     yield inputs
```

=== "Using the @step decorator"

The `@step` decorator will take care of the boilerplate code, and will allow you to define the `inputs`, `outputs`, and `process` methods in a more straightforward way. One downside is that it won't let you access or set the `self` attributes, so if you need to access or set any attribute, you should go with the first approach of defining a custom [`GlobalStep`][distilabel.steps.GlobalStep] subclass.

```python
from distilabel.steps import StepInput, step
from distilabel.steps.typing import StepOutput

@step(inputs=[...], outputs=[...], step_type="global")
def CustomStep(inputs: StepInput) -> StepOutput:
    for input in inputs:
        ...
    yield inputs

step = CustomStep(name="my-step")
```
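
As a concrete illustration of a GlobalStep (the class and column names below are made up), a step that needs the whole dataset at once to compute a dataset-level statistic could be sketched as:

```python
from typing import List

from distilabel.steps import GlobalStep, StepInput
from distilabel.steps.typing import StepOutput

class ComputeMeanLength(GlobalStep):
    """Hypothetical GlobalStep: annotates every row with the mean generation length."""

    @property
    def inputs(self) -> List[str]:
        return ["generation"]

    @property
    def outputs(self) -> List[str]:
        return ["mean_generation_length"]

    def process(self, *inputs: StepInput) -> StepOutput:
        for upstream_step_inputs in inputs:
            lengths = [len(item["generation"]) for item in upstream_step_inputs]
            mean_length = sum(lengths) / len(lengths) if lengths else 0.0
            for item in upstream_step_inputs:
                item["mean_generation_length"] = mean_length
            yield upstream_step_inputs
```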

Define Steps for your Pipeline

Working with Steps

The [Step][distilabel.steps.Step] is intended to be used within the scope of a [Pipeline][distilabel.pipeline.Pipeline], which will orchestrate the different steps defined but can also be used standalone.

Assuming that we have a [Step][distilabel.steps.Step] already defined as follows:

from typing import List

from distilabel.steps import Step, StepInput
from distilabel.steps.typing import StepOutput

class MyStep(Step):
    @property
    def inputs(self) -> List[str]:
        return ["input_field"]

    @property
    def outputs(self) -> List[str]:
        return ["output_field"]

    def process(self, inputs: StepInput) -> StepOutput:
        for input in inputs:
            input["output_field"] = input["input_field"]
        yield inputs

Then we can use it as follows:

step = MyStep(name="my-step")
step.load()

next(step.process([{"input_field": "value"}]))
# [{'input_field': 'value', 'output_field': 'value'}]

!!! NOTE The Step.load() always needs to be executed when being used as a standalone. Within a pipeline, this will be done automatically during pipeline execution.

Arguments

  • input_mappings is a dictionary that maps keys from the input dictionaries to the keys expected by the step. For example, input_mappings={"instruction": "prompt"} means that the input key prompt will be used as the key instruction for the current step.

  • output_mappings is a dictionary that can be used to rename the outputs of the step. For example, output_mappings={"conversation": "prompt"} means that the output key conversation will be renamed to prompt for the next step.

  • input_batch_size (50 by default) is independent for every step and determines how many input dictionaries the step will process at once, as shown in the sketch below.
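
A minimal sketch of these arguments using the MyStep class defined above (the column names are illustrative):

```python
step = MyStep(
    name="my-step",
    # The incoming "prompt" column is used as this step's "input_field" input.
    input_mappings={"input_field": "prompt"},
    # The step's "output_field" output is renamed to "response" for the next step.
    output_mappings={"output_field": "response"},
    # Process 20 input dictionaries at a time instead of the default 50.
    input_batch_size=20,
)
```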

Runtime parameters

Steps can also have a RuntimeParameter, i.e. parameters whose value can only be provided after the pipeline has been initialised, when calling Pipeline.run.

from pydantic import Field, PositiveInt

from distilabel.mixins.runtime_parameters import RuntimeParameter

class Step(...):
    input_batch_size: RuntimeParameter[PositiveInt] = Field(
        default=DEFAULT_INPUT_BATCH_SIZE,
        description="The number of rows that will contain the batches processed by the"
        " step.",
    )
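
As an illustration, here is a sketch of declaring and setting a runtime parameter on a custom step; the step, field, and column names are made up:

```python
from typing import List

from pydantic import Field

from distilabel.mixins.runtime_parameters import RuntimeParameter
from distilabel.steps import Step, StepInput
from distilabel.steps.typing import StepOutput

class ScaleValues(Step):
    # Runtime parameter: its value can be provided later via Pipeline.run(parameters=...).
    multiplier: RuntimeParameter[int] = Field(
        default=1, description="Factor applied to the 'value' column of each row."
    )

    @property
    def inputs(self) -> List[str]:
        return ["value"]

    @property
    def outputs(self) -> List[str]:
        return ["scaled_value"]

    def process(self, inputs: StepInput) -> StepOutput:
        for item in inputs:
            item["scaled_value"] = item["value"] * self.multiplier
        yield inputs

# Later, assuming the step was named "scale_values" within a pipeline:
# pipeline.run(parameters={"scale_values": {"multiplier": 3}})
```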

Types of Steps

There are a few special types of [Step][distilabel.steps.Step] in distilabel:

  • [GeneratorStep][distilabel.steps.GeneratorStep]: is a step that only generates data, and it doesn't need any input data from previous steps and normally is the first node in a [Pipeline][distilabel.pipeline.Pipeline]. More information: Components -> Step - GeneratorStep.

  • [GlobalStep][distilabel.steps.GlobalStep]: is a step with the standard interface i.e. receives inputs and generates outputs, but it processes all the data at once, and is often the final step in the [Pipeline][distilabel.pipeline.Pipeline]. Note that a [GlobalStep][distilabel.steps.GlobalStep] requires all the previous steps to finish before it can start. More information: Components - Step - GlobalStep.

  • [Task][distilabel.steps.tasks.Task]: is essentially the same as a default [Step][distilabel.steps.Step], but it relies on an [LLM][distilabel.llms.LLM] as an attribute, and the process method will be in charge of calling that LLM. More information: Components - Task.

Defining custom Steps

We can define a custom step by creating a new subclass of the [Step][distilabel.steps.Step] and defining the following:

  • inputs: is a property that returns a list of strings with the names of the required input fields.

  • outputs: is a property that returns a list of strings with the names of the output fields.

  • process: is a method that receives the input data and returns the output data, and it should be a generator, meaning that it should yield the output data.

!!! NOTE The default signature for the process method is process(self, *inputs: StepInput) -> StepOutput. The argument inputs should be respected, no more arguments can be provided, and the type-hints and return type-hints should be respected too because it should be able to receive any number of inputs by default i.e. more than one [Step][distilabel.steps.Step] at a time could be connected to the current one.

!!! WARNING For the custom [Step][distilabel.steps.Step] subclasses to work properly with distilabel and with the validation and serialization performed by default over each [Step][distilabel.steps.Step] in the [Pipeline][distilabel.pipeline.Pipeline], the type-hint for both [StepInput][distilabel.steps.StepInput] and [StepOutput][distilabel.steps.typing.StepOutput] should be used and not surrounded with double-quotes or imported under typing.TYPE_CHECKING, otherwise, the validation and/or serialization will fail.

=== "Inherit from Step"

We can inherit from the `Step` class and define the `inputs`, `outputs`, and `process` methods as follows:

```python
from typing import List

from distilabel.steps import Step, StepInput
from distilabel.steps.typing import StepOutput

class CustomStep(Step):
    @property
    def inputs(self) -> List[str]:
        ...

    @property
    def outputs(self) -> List[str]:
        ...

    def process(self, *inputs: StepInput) -> StepOutput:
        for upstream_step_inputs in inputs:
            for item in upstream_step_inputs:
                ...
            yield upstream_step_inputs

    # When overridden (ideally under the `typing_extensions.override` decorator)
    # @typing_extensions.override
    # def process(self, inputs: StepInput) -> StepOutput:
    #     for input in inputs:
    #         ...
    #     yield inputs
```

=== "Using the @step decorator"

The `@step` decorator will take care of the boilerplate code, and will allow you to define the `inputs`, `outputs`, and `process` methods in a more straightforward way. One downside is that it won't let you access or set the `self` attributes, so if you need to access or set any attribute, you should go with the first approach of defining a custom [`Step`][distilabel.steps.Step] subclass.


```python
from distilabel.steps import StepInput, step
from distilabel.steps.typing import StepOutput

@step(inputs=[...], outputs=[...])
def CustomStep(inputs: StepInput) -> StepOutput:
    for input in inputs:
        ...
    yield inputs

step = CustomStep(name="my-step")
```
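
As a concrete (hypothetical) example of the decorator approach, a step that adds an upper-cased copy of the instruction column could be sketched as:

```python
from distilabel.steps import StepInput, step
from distilabel.steps.typing import StepOutput

@step(inputs=["instruction"], outputs=["instruction_upper"])
def UpperCaseInstruction(inputs: StepInput) -> StepOutput:
    # Add an upper-cased copy of the instruction to every row.
    for item in inputs:
        item["instruction_upper"] = item["instruction"].upper()
    yield inputs

uppercase = UpperCaseInstruction(name="uppercase-instruction")
```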

Define Tasks that rely on LLMs

Working with Tasks

The [Task][distilabel.steps.tasks.Task] is a special kind of [Step][distilabel.steps.Step] that includes the [LLM][distilabel.llms.LLM] as a mandatory argument. As with a [Step][distilabel.steps.Step], it is normally used within a [Pipeline][distilabel.pipeline.Pipeline] but can also be used standalone.

For example, the most basic task is the [TextGeneration][distilabel.steps.tasks.TextGeneration] task, which generates text based on a given instruction.

from distilabel.llms import InferenceEndpointsLLM
from distilabel.steps.tasks import TextGeneration

task = TextGeneration(
    name="text-generation",
    llm=InferenceEndpointsLLM(
        model_id="meta-llama/Meta-Llama-3-70B-Instruct",
        tokenizer_id="meta-llama/Meta-Llama-3-70B-Instruct",
    ),
)
task.load()

next(task.process([{"instruction": "What's the capital of Spain?"}]))
# [
#     {
#         'instruction': "What's the capital of Spain?",
#         'generation': 'The capital of Spain is Madrid.',
#         'distilabel_metadata': {'raw_output_text-generation': 'The capital of Spain is Madrid.'},
#         'model_name': 'meta-llama/Meta-Llama-3-70B-Instruct'
#     }
# ]

!!! NOTE The Step.load() always needs to be executed when being used as a standalone. Within a pipeline, this will be done automatically during pipeline execution.

As shown above, the [TextGeneration][distilabel.steps.tasks.TextGeneration] task adds a generation based on the instruction. Additionally, it provides some metadata about the LLM call through distilabel_metadata. This can be disabled by setting the add_raw_output attribute to False when creating the task.
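
For example, to omit that metadata, a sketch based on the task above:

```python
task = TextGeneration(
    name="text-generation",
    llm=InferenceEndpointsLLM(
        model_id="meta-llama/Meta-Llama-3-70B-Instruct",
        tokenizer_id="meta-llama/Meta-Llama-3-70B-Instruct",
    ),
    # Do not include the raw LLM output in `distilabel_metadata`.
    add_raw_output=False,
)
```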

Specifying the number of generations and grouping generations

All the Tasks have a num_generations attribute that allows defining the number of generations that we want to have per input. We can update the example above to generate 3 completions per input:

from distilabel.llms import InferenceEndpointsLLM
from distilabel.steps.tasks import TextGeneration

task = TextGeneration(
    name="text-generation",
    llm=InferenceEndpointsLLM(
        model_id="meta-llama/Meta-Llama-3-70B-Instruct",
        tokenizer_id="meta-llama/Meta-Llama-3-70B-Instruct",
    ),
    num_generations=3,
)
task.load()

next(task.process([{"instruction": "What's the capital of Spain?"}]))
# [
#     {
#         'instruction': "What's the capital of Spain?",
#         'generation': 'The capital of Spain is Madrid.',
#         'distilabel_metadata': {'raw_output_text-generation': 'The capital of Spain is Madrid.'},
#         'model_name': 'meta-llama/Meta-Llama-3-70B-Instruct'
#     },
#     {
#         'instruction': "What's the capital of Spain?",
#         'generation': 'The capital of Spain is Madrid.',
#         'distilabel_metadata': {'raw_output_text-generation': 'The capital of Spain is Madrid.'},
#         'model_name': 'meta-llama/Meta-Llama-3-70B-Instruct'
#     },
#     {
#         'instruction': "What's the capital of Spain?",
#         'generation': 'The capital of Spain is Madrid.',
#         'distilabel_metadata': {'raw_output_text-generation': 'The capital of Spain is Madrid.'},
#         'model_name': 'meta-llama/Meta-Llama-3-70B-Instruct'
#     }
# ]

In addition, we might want to group the generations into a single output row, as a downstream step may expect a single row with multiple generations. We can achieve this by setting the group_generations attribute to True:

from distilabel.llms import InferenceEndpointsLLM
from distilabel.steps.tasks import TextGeneration

task = TextGeneration(
    name="text-generation",
    llm=InferenceEndpointsLLM(
        model_id="meta-llama/Meta-Llama-3-70B-Instruct",
        tokenizer_id="meta-llama/Meta-Llama-3-70B-Instruct",
    ),
    num_generations=3,
    group_generations=True
)
task.load()

next(task.process([{"instruction": "What's the capital of Spain?"}]))
# [
#     {
#         'instruction': "What's the capital of Spain?",
#         'generation': ['The capital of Spain is Madrid.', 'The capital of Spain is Madrid.', 'The capital of Spain is Madrid.'],
#         'distilabel_metadata': [
#             {'raw_output_text-generation': 'The capital of Spain is Madrid.'},
#             {'raw_output_text-generation': 'The capital of Spain is Madrid.'},
#             {'raw_output_text-generation': 'The capital of Spain is Madrid.'}
#         ],
#         'model_name': 'meta-llama/Meta-Llama-3-70B-Instruct'
#     }
# ]

Defining custom Tasks

We can define a custom task by creating a new subclass of the [Task][distilabel.steps.tasks.Task] and defining the following:

  • inputs: is a property that returns a list of strings with the names of the required input fields.

  • format_input: is a method that receives a dictionary with the input data and returns a [ChatType][distilabel.steps.tasks.ChatType] following the chat-completion OpenAI message formatting.

  • outputs: is a property that returns a list of strings with the names of the output fields. This property should always include model_name as one of the outputs, since it is automatically injected from the LLM.

  • format_output: is a method that receives the output from the [LLM][distilabel.llms.LLM] and optionally also the input data (which may be useful to build the output in some scenarios), and returns a dictionary with the output data formatted as needed i.e. with the values for the columns in outputs. Note that there's no need to include the model_name in the output.

from typing import Any, Dict, List, Union

from distilabel.steps.tasks.base import Task
from distilabel.steps.tasks.typing import ChatType


class MyCustomTask(Task):
    @property
    def inputs(self) -> List[str]:
        return ["input_field"]

    def format_input(self, input: Dict[str, Any]) -> ChatType:
        return [
            {
                "role": "user",
                "content": input["input_field"],
            },
        ]

    @property
    def outputs(self) -> List[str]:
        return ["output_field", "model_name"]

    def format_output(
        self, output: Union[str, None], input: Dict[str, Any]
    ) -> Dict[str, Any]:
        return {"output_field": output}
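
The custom task can then be used standalone like any other task. A sketch (assuming an OpenAI API key is configured; the output shown is illustrative):

```python
from distilabel.llms import OpenAILLM

task = MyCustomTask(name="my-custom-task", llm=OpenAILLM(model="gpt-4"))
task.load()

next(task.process([{"input_field": "What's the capital of Spain?"}]))
# [{'input_field': "What's the capital of Spain?", 'output_field': 'The capital of Spain is Madrid.', 'model_name': 'gpt-4'}]
```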

GeneratorTask

Working with GeneratorTasks

The [GeneratorTask][distilabel.steps.tasks.GeneratorTask] is a custom implementation of a [Task][distilabel.steps.tasks.Task] based on the [GeneratorStep][distilabel.steps.GeneratorStep]. As with a [Task][distilabel.steps.tasks.Task], it is normally used within a [Pipeline][distilabel.pipeline.Pipeline] but can also be used standalone.

!!! WARNING This task is still experimental and may be subject to changes in the future.

from typing import Any, Dict, List, Union
from typing_extensions import override

from distilabel.steps.tasks.base import GeneratorTask
from distilabel.steps.tasks.typing import ChatType
from distilabel.steps.typing import GeneratorOutput


class MyCustomTask(GeneratorTask):
    instruction: str

    @override
    def process(self, offset: int = 0) -> GeneratorOutput:
        # Keep the LLM generations in a separate variable so they are not overwritten below.
        generations = self.llm.generate(
            inputs=[
                [
                    {"role": "user", "content": self.instruction},
                ],
            ],
        )
        output = {"model_name": self.llm.model_name}
        output.update(
            self.format_output(output=generations, input=None)
        )
        yield output

    @property
    def outputs(self) -> List[str]:
        return ["output_field", "model_name"]

    def format_output(
        self, output: Union[str, None], input: Dict[str, Any]
    ) -> Dict[str, Any]:
        return {"output_field": output}

We can then use it as follows:

task = MyCustomTask(
    name="custom-generation",
    instruction="Tell me a joke.",
    llm=OpenAILLM(model="gpt-4"),
)
task.load()

next(task.process())
# [{'output_field": "Why did the scarecrow win an award? Because he was outstanding!", "model_name": "gpt-4"}]

!!! NOTE Most of the time you will need to override the default process method, as it is suited for the standard [Task][distilabel.steps.tasks.Task] and not for the [GeneratorTask][distilabel.steps.tasks.GeneratorTask]. But within the context of the process function you can freely use the llm to generate data in any way.

!!! NOTE The Step.load() always needs to be executed when being used as a standalone. Within a pipeline, this will be done automatically during pipeline execution.

Defining custom GeneratorTasks

We can define a custom generator task by creating a new subclass of the [GeneratorTask][distilabel.steps.tasks.GeneratorTask] and defining the following:

  • process: is a method that generates the data based on the [LLM][distilabel.llms.LLM] and the instruction provided within the class instance, and returns a dictionary with the output data formatted as needed i.e. with the values for the columns in outputs. Note that the inputs argument is not allowed in this function since this is a [GeneratorTask][distilabel.steps.tasks.GeneratorTask]. The signature only expects the offset argument, which is used to keep track of the current iteration in the generator.

  • outputs: is a property that returns a list of strings with the names of the output fields. This property should always include model_name as one of the outputs, since it is automatically injected from the LLM.

  • format_output: is a method that receives the output from the [LLM][distilabel.llms.LLM] and optionally also the input data (which may be useful to build the output in some scenarios), and returns a dictionary with the output data formatted as needed i.e. with the values for the columns in outputs. Note that there's no need to include the model_name in the output.

from typing import Any, Dict, List, Union
from typing_extensions import override

from distilabel.steps.tasks.base import GeneratorTask
from distilabel.steps.tasks.typing import ChatType
from distilabel.steps.typing import GeneratorOutput


class MyCustomTask(GeneratorTask):
    @override
    def process(self, offset: int = 0) -> GeneratorOutput:
        # Keep the LLM generations in a separate variable so they are not overwritten below.
        generations = self.llm.generate(
            inputs=[
                [{"role": "user", "content": "Tell me a joke."}],
            ],
        )
        output = {"model_name": self.llm.model_name}
        output.update(
            self.format_output(output=generations, input=None)
        )
        yield output

    @property
    def outputs(self) -> List[str]:
        return ["output_field", "model_name"]

    def format_output(
        self, output: Union[str, None], input: Dict[str, Any]
    ) -> Dict[str, Any]:
        return {"output_field": output}