Aller au contenu principal

User Guides

MLFlow simplifie l'enregistrement et la gestion des modèles de machine learning. Pour transférer un modèle vers MLFlow en Python, suivez ces étapes. Cet exemple vous guidera tout au long du processus de suivi d'une expérience, d'entraînement d'un modèle simple et de son enregistrement dans le registre de modèles MLFlow.

astuce

Si vous ne pouvez pas exécuter le code ci-dessous sur votre ordinateur portable, vous pouvez utiliser le notebook Jupyter fourni par la plateforme Kosmos.

Avant de commencer, assurez-vous que les packages MLFlow et Scikit-learn sont installés dans votre environnement Python. Vous pouvez les installer avec pip :

pip install mlflow sklearn
MLFlow basic sample
import mlflow
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier


# you need to set environment variables, it is not recommended to do them via python like this example
os.environ["MLFLOW_TRACKING_URI"] = "http://mlflow.kosmos-data:5000"

# s3 env vars to be able to push artifacts
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://minio.kosmos-s3"
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"

# for automatic system metrics logging use this, you need psutil for more metrics, and pynvml for GPU metrics
os.environ['MLFLOW_ENABLE_SYSTEM_METRICS_LOGGING']= 'true'

# Load the Iris dataset
iris = load_iris()
X = iris.data
y = iris.target

# Split into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Start an MLFlow run
with mlflow.start_run():
# Create and train the model
model = RandomForestClassifier(n_estimators=100, max_depth=3)
model.fit(X_train, y_train)

# Log the model to MLFlow
mlflow.sklearn.log_model(model, "model")

# Optionally log parameters or metrics
mlflow.log_param("n_estimators", 100)
mlflow.log_param("max_depth", 3)
mlflow.log_metric("accuracy", model.score(X_test, y_test))
# Register the model in the MLFlow Model Registry
result = mlflow.register_model("runs:/<run_id>/model", "RandomForestIrisModel")

# Load the model from the registry
model_uri = "models:/RandomForestIrisModel/1" # Replace with the correct version number
model = mlflow.sklearn.load_model(model_uri)

# Use the loaded model for prediction
predictions = model.predict(X_test)

En suivant ces étapes, vous pouvez facilement suivre et gérer vos modèles de machine learning avec MLFlow. Les principales opérations incluent :

  • Journalisation des modèles avec mlflow.sklearn.log_model() (ou l'intégration MLFlow appropriée pour d'autres frameworks).
  • Suivi des paramètres et des métriques pour surveiller les performances des modèles.
  • Enregistrement des modèles pour le contrôle de version et une meilleure gestion des modèles dans le registre des modèles.

MLFlow vous aide à rationaliser et à automatiser la gestion des modèles de machine learning, facilitant ainsi le suivi des expériences, la reproduction des résultats et le partage des modèles entre les équipes.

Réferences

Offical documentation


Using API

Enfin, vous pouvez le faire via curl

curl -XPOST -u admin:password
-d '{"username": "username", "password": "password"}'
"https://<mlflow_external_url>/api/2.0/mlflow/users/create"

Add new permissions

De même, vous pouvez trouver le reste de la documentation de l'API pour ajouter ou modifier des autorisations singulières sur la documentation officielle de MLFlow.

Usage

Track and push to registry

Comme expliqué précédemment, cela permet de transmettre des artefacts et des métadonnées à MLFlow concernant une session d'entraînement spécifique. Les artefacts sont stockés dans S3. Les métadonnées, les paramètres et les informations générales sur les exécutions et les expériences sont stockés dans PostgreSQL.

Concepts

  • Run : Le suivi MLFlow s'articule autour du concept d'exécutions, qui sont des exécutions d'un code de science des données, par exemple une seule exécution de « python train.py ». Chaque exécution enregistre des métadonnées (informations diverses sur l'exécution, telles que les métriques, les paramètres, les heures de début et de fin) et des artefacts (fichiers de sortie de l'exécution, tels que les pondérations du modèle, les images, etc.).

  • Experiment : Cela regroupe les exécutions d'une tâche spécifique. Vous pouvez créer une expérience via la CLI, l'API ou l'interface utilisateur. L'API et l'interface utilisateur MLFlow vous permettent également de créer et de rechercher des expériences.

Logging a Run

Afin d'enregistrer une exécution MLFlow, voici un exemple Python simple :

mlflow_train.py
import os
import mlflow

# you need to set environment variables, it is not recommended to do them via python like this example

os.environ["MLFLOW_TRACKING_URI"] = "http://mlflow.kosmos-data:5000"

# s3 env vars to be able to push artifacts
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://minio.kosmos-s3"
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"

# for automatic system metrics logging use this, you need psutil for more metrics, and pynvml for GPU metrics
os.environ['MLFLOW_ENABLE_SYSTEM_METRICS_LOGGING']= 'true'

# Creates the experiment if it doesn't exist
mlflow.set_experiment("Experiment Name")

with mlflow.start_run() as run:
# you can let mlflow decide which metrics to compute and to push including parameters and models
# this is supported only for popular libraries such as Scikit-learn, xgboost, pytorch, keras or spark
# you have to set it before starting your training session though.
mlflow.autolog()

# you can also combine autologging with manual logging

# add tags

tags = {
"mlflow.note.content": "This will change the description.\n And it *supports* _markdown_\n",
"framework": "the one used",
"base_model": "huggingface/author/name"
}

mlflow.set_tags(tags)
# you can store a single value parameter
mlflow.log_param("lr", 0.01)

# insert your ml code which normally should include logging parameters at multiple steps, not necessary every one.
model = ...

# you can also push lists to display graphs on the UI, x axis will be steps or time
mlflow.log_param("val_loss", [0.0001, 0.0002, 0.0003,0.0004])

# you can also push to the same key which creates a list
mlflow.log_param("train_loss", 0.0001)
mlflow.log_param("train_loss", 0.0003)
mlflow.log_param("train_loss", 0.0004)

# log model as an artifact to this run,
# make sure to use the right backends in this example we are using pytorch
# you can have a lot of other backends such as keras sklearn tensorflow spacy spark pyfunc etc
mlflow.pytorch.log_model(
model,
"pytorch_model",
model_signature, # you have to prepare a model signature for this, but it isn't required.
)

# push the model to the registry
registered_model = mlflow.register_model(
model_uri=f"runs:/{run.info.run_id}/model",
name="pytorch_model"
)
# add tags to the registered model
client = mlflow.tracking.MlflowClient()
for k,v in tags.items():
client.set_model_version_tag(registered_model.name, registered_model.version, k, v)


print(f"Registered model {registered_model.name}:{registered_model.version}")
print(f"Finished run ID:{run.info.run_id}")

Pull model from registry

Dans cet exemple, nous allons extraire un modèle PyTorch que nous avons poussé précédemment et exécuter l'inférence.

mlflow_pull.py
import mlflow

model_name = "pytorch_model"
model_version = 1
model = mlflow.pytorch.load_model(f"models:/{model_name}/{model_version}")

text = """
Cristiano Ronaldo dos Santos Aveiro (Portuguese pronunciation: [kɾiʃˈtjɐnu ʁɔˈnaldu]; born 5 February 1985) is a Portuguese professional footballer who plays as a forward for and captains both Saudi Pro League club Al Nassr and the Portugal national team. Widely regarded as one of the greatest players of all time, Ronaldo has won five Ballon d'Or awards,[note 3] a record three UEFA Men's Player of the Year Awards, and four European Golden Shoes, the most by a European player. He has won 33 trophies in his career, including seven league titles, five UEFA Champions Leagues, the UEFA European Championship and the UEFA Nations League. Ronaldo holds the records for most appearances (183), goals (140) and assists (42) in the Champions League, goals in the European Championship (14), international goals (128) and international appearances (205). He is one of the few players to have made over 1,200 professional career appearances, the most by an outfield player, and has scored over 850 official senior career goals for club and country, making him the top goalscorer of all time.
"""

labels = ["person", "award", "date", "competitions", "teams"]

entities = model.predict_entities(text, labels)

for entity in entities:
print(f"{entity['text']} => {entity['label']}")

Model Validation

Validating a Masked Language Model with Label Studio Data

La validation du modèle consiste à vérifier les performances de votre modèle entraîné sur des données inédites.

Il s'agit d'une étape cruciale avant le déploiement de votre modèle en production, car elle garantit son bon fonctionnement et la conformité aux attentes en matière de performances.

Dans ce guide, nous allons valider un Masked Language Model (MLM) à l'aide de données exportées depuis Label Studio.


What is Model Validation?

La validation du modèle répond à des questions telles que :

Mon modèle effectue-t-il des prédictions correctes sur de nouvelles données ?La précision est-elle acceptable pour le déploiement ?Où mon modèle commet-il des erreurs ?

La validation utilise un jeu de données distinct (non utilisé lors de l'entraînement) pour mesurer les performances du modèle de manière réaliste.


Prerequisites

Avant de démarrer, vous avez besoin :

  • D'un MLM entrainé et sauvegardé dans MLFlow ( model et tokenizer)
  • Un jeu de donnée annoté manuellement stocké dans label Studio

Preparing your Label Studio export

Assurez-vous que votre projet de studio d'étiquettes dispose d'une sortie S3 configurée en suivant la procédure

Run the script

Le script suivant suit différentes étapes, chaque étape peut être une cellule de notebook Jupyter si vous expérimentez.

Import libraries and define constants
import json
import mlflow
from transformers import AutoTokenizer, pipeline

# =====================================
# 1) Paths and setup
# =====================================

# MLFlow model name and version
MODEL_NAME = "my_mlm_model" # <-- change to your registered model name
MODEL_VERSION = "1" # <-- change to the version you want to validate

# Path to Label Studio export
VALIDATION_EXPORT = "validation_exported.json"

# Batch size for prediction
BATCH_SIZE = 32
Charger le modèle et le tokenizer depuis MLFlow
# Load model and tokenizer from MLFlow registry
model_uri = f"models:/{MODEL_NAME}/{MODEL_VERSION}"
model_pipeline = mlflow.transformers.load_model(model_uri)

# The MLFlow transformers flavor returns a pipeline dict:
# model_pipeline["model"], model_pipeline["tokenizer"]
model = model_pipeline["model"]
tokenizer = model_pipeline["tokenizer"]

# Create fill-mask pipeline
fill_mask = pipeline("fill-mask", model=model, tokenizer=tokenizer, device=0) # device=0 for GPU
Charger les annotations depuis KEDS
# =====================================
# Download data from EDS using KEDS
# =====================================
from keds.s3 import connection
import os
LABEL_STUDIO_SRC="labelstudio/annotations" # bucket and relative path inside the bucket

def download_from_keds(src, dest=''):
"""
Download files from an S3 path to a local directory.

:param src: str, The S3 path (bucket and prefix) to source files from (e.g., 'mybucket/models').
:param dest: str, The local folder to download files to (default is 'model').
"""

# Extract bucket name and prefix from src
bucket_name = src.split('/')[0]
prefix = src.split('/', 1)[1]

client = connection(bucket_name)
# Ensure the destination folder exists
os.makedirs(dest, exist_ok=True)

if len(dest) == 0:
dest = prefix
# List the objects in the specified S3 path
objects = client.list_objects(bucket_name, prefix=prefix, recursive=True)

# Download each file
for obj in objects:
# Skip folders
if obj.object_name.endswith('/'):
continue
# Print the object name
print(f"Downloading {obj.object_name}")
# Skip already downloaded files
local_file_path = os.path.join(dest, os.path.basename(obj.object_name))
if not os.path.exists(local_file_path):
client.fget_object(bucket_name, obj.object_name, local_file_path)
print(f"Downloaded {obj.object_name} to {local_file_path}")
else:
print(f"File {obj.object_name} already exists, skipping.")

download_from_keds(LABEL_STUDIO_SRC, "annotations")
Charger les données de validation depuis les exports Label Studio
with open(VALIDATION_EXPORT) as f:
validation_tasks = json.load(f)

validation_samples = []
masked_texts = []

for task in validation_tasks:
text = task["data"]["text"]
expected = task["annotations"][0]["result"][0]["value"]["text"][0]
masked_text = text.replace("[MASK]", tokenizer.mask_token)

validation_samples.append({"text": text, "expected": expected})
masked_texts.append(masked_text)

print(f"Loaded {len(validation_samples)} validation samples.")

Jouer les prédictions en batch
predictions_all = []

for i in range(0, len(masked_texts), BATCH_SIZE):
batch = masked_texts[i:i+BATCH_SIZE]
batch_predictions = fill_mask(batch)
predictions_all.extend(batch_predictions)

print(f"Generated predictions for {len(predictions_all)} samples.")

Comparer les prédictions aux réponses attendues
total = len(validation_samples)
correct = 0
mismatches = []

for idx, sample in enumerate(validation_samples):
predictions = predictions_all[idx]
top_pred = predictions[0]["token_str"].strip()
expected = sample["expected"].strip()

if top_pred.lower() == expected.lower():
correct += 1
else:
mismatches.append({
"input": sample["text"],
"expected": expected,
"predicted": top_pred
})

accuracy = correct / total * 100 if total > 0 else 0
print(f"Validation accuracy: {accuracy:.2f}% ({correct}/{total} correct)")

Tracer les prédiction incorrectes
if mismatches:
print("--- Mismatched examples ---")
for mm in mismatches:
print(f"Input: {mm['input']}")
print(f"Expected: {mm['expected']}")
print(f"Predicted: {mm['predicted']}")
print()
else:
print("All predictions matched the expected outputs!")

Examine the Validation

Le script donnera une sortie similaire à celle-ci :

Validation accuracy: 92.50% (185/200 correct)

--- Mismatched examples ---
Input: The sun [MASK] in the west.
Expected: sets
Predicted: rises
  • Faible précision ? Vérifiez la qualité de vos données d'entraînement, modifiez les paramètres d'entraînement et augmentez les périodes d'entraînement.
  • Erreurs récurrentes ? Ajoutez des exemples similaires à votre ensemble d'entraînement.
  • Envie de vitesse ? Optimisez vos inférences en exploitant votre GPU et les bibliothèques Python adéquates.

Autre exemple

Voici un autre exemple qui part de la récupération des données et des annotations pour entrainer un modèle BERT, le tester et le charger dans une instance de mlflow

Cet exemple nécessite d'installer les libraries python mlflow==3.2.0 datasets==4.0.0 tiktoken==0.11.0 accelerate==1.10.1 transformers==4.55.2 et keds dans votre environnement Jupyter

import os
import json
import time
from datetime import timedelta

import tiktoken
import mlflow
from huggingface_hub import ModelCardData, ModelCard
from transformers import (
AutoTokenizer,
AutoModelForMaskedLM,
Trainer,
TrainingArguments,
DataCollatorForLanguageModeling,
TrainerCallback,
pipeline
)
from datasets import Dataset

# =====================================
# Récupération des artefacts
# =====================================
from keds.s3 import connection
import os
# fichiers du modèle
MODEL_SRC="ivvqmlopsmodelbas/model"
# sortie Label Studio
LABEL_STUDIO_SRC="ivvqmlopsmodelbas/label_output"

def download_s3_files(src, dest=''):
"""
Télécharge des fichiers depuis un chemin S3 vers un répertoire local.

:param src: str, Chemin S3 (bucket et préfixe) d’où récupérer les fichiers (ex. 'mybucket/models').
:param dest: str, Dossier local de destination (par défaut 'model').
"""

# S’assure que le dossier de destination existe
os.makedirs(dest, exist_ok=True)

# Extrait le nom du bucket et le préfixe depuis src
bucket_name = src.split('/')[0]
client = connection(bucket_name)
prefix = src.split('/', 1)[1]
if len(dest) == 0:
dest = prefix
# Liste les objets dans le chemin S3 spécifié
objects = client.list_objects(bucket_name, prefix=prefix, recursive=True)

# Télécharge chaque fichier
for obj in objects:
# Ignore les dossiers
if obj.object_name.endswith('/'):
continue
# Affiche le nom de l’objet
print(f"Télécharger {obj.object_name}")
# Ignore les fichiers déjà téléchargés
local_file_path = os.path.join(dest, os.path.basename(obj.object_name))
if not os.path.exists(local_file_path):
client.fget_object(bucket_name, obj.object_name, local_file_path)
print(f"Télécharger {obj.object_name} vers {local_file_path}")
else:
print(f"Fichier {obj.object_name} existe déjà, skipping.")

download_s3_files(MODEL_SRC, "model")
download_s3_files(LABEL_STUDIO_SRC, "label_output")


# =====================================
# 1) Chargement et prétraitement des données
# =====================================
# instance MLflow
INSTANCE_NAMESPACE="ivvqmlflow"
MLFLOW_URL=f"http://mlflow.{INSTANCE_NAMESPACE}:5000"
MASK_TOKEN = "[MASK]"
MODEL_PATH = "model"
LABEL_STUDIO_EXPORT = "label_output" # Répertoire contenant plusieurs fichiers JSON
MAX_STEPS = 5
# Chargement du tokenizer
tokenizer = AutoTokenizer.from_pretrained(
MODEL_PATH,
use_fast=False,
local_files_only=True
)

def load_and_prepare_data(tokenizer):
"""Charge l’export Label Studio (fichier ou dossier) et le convertit en Dataset HF pour le MLM."""
data_rows = []

if os.path.isdir(LABEL_STUDIO_EXPORT):
# --- Cas 1 : Dossier de fichiers JSON (nouveau format) ---
for fname in os.listdir(LABEL_STUDIO_EXPORT):
fpath = os.path.join(LABEL_STUDIO_EXPORT, fname)
with open(fpath, "r") as f:
task = json.load(f)
text = task["task"]["data"]["text"]
masked_text = text.replace("[MASK]", tokenizer.mask_token)
data_rows.append({"text": masked_text})

else:
# --- Cas 2 : Fichier JSON unique contenant une liste de tâches (ancien format) ---
with open(LABEL_STUDIO_EXPORT, "r") as f:
tasks = json.load(f)
for task in tasks:
text = task["data"]["text"]
masked_text = text.replace("[MASK]", tokenizer.mask_token)
data_rows.append({"text": masked_text})

# Conversion en Dataset HF
dataset = Dataset.from_list(data_rows)

# Tokenisation
def tokenize_function(examples):
return tokenizer(
examples["text"],
padding="max_length",
truncation=True,
max_length=128,
)

tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset = tokenized_dataset.remove_columns(["text"])
return tokenized_dataset.train_test_split(test_size=0.2, seed=42)

tokenized_datasets = load_and_prepare_data(tokenizer)

# =====================================
# 2) Modèle et configuration de l’entraînement
# =====================================
# Chargement du modèle
model = AutoModelForMaskedLM.from_pretrained(
MODEL_PATH,
local_files_only=True
)

# Data collator pour le MLM
data_collator = DataCollatorForLanguageModeling(
tokenizer=tokenizer,
mlm=True,
mlm_probability=0.15
)

# Arguments d’entraînement
training_args = TrainingArguments(
output_dir="./target_model",
eval_strategy="steps",
learning_rate=5e-5,
eval_steps=1,
save_strategy="no",
logging_strategy="steps",
logging_steps=100,
per_device_train_batch_size=32,
num_train_epochs=20,
max_steps=MAX_STEPS,
weight_decay=0.01,
)

# Évite les conditions de course liées aux tokenizers
os.environ['TOKENIZERS_PARALLELISM'] = 'false'

# =====================================
# 3) Callback MLflow pour journaliser les métriques à chaque étape
# =====================================

class MLflowLoggingCallback(TrainerCallback):
"""Journalise les métriques d’entraînement et d’évaluation dans MLflow à chaque étape."""
def on_log(self, args, state, control, logs=None, **kwargs):
logs = logs or {}
step = state.global_step
if 'eval_loss' in logs:
mlflow.log_metric("loss", logs['eval_loss'], step=step)
elif 'train_loss' in logs:
mlflow.log_metric("loss", logs['train_loss'], step=step)
if 'eval_steps_per_second' in logs:
mlflow.log_metric("steps_per_second", logs['eval_steps_per_second'], step=step)
elif 'train_steps_per_second' in logs:
mlflow.log_metric("steps_per_second", logs['train_steps_per_second'], step=step)

# =====================================
# 4) Entraînement avec suivi MLflow
# =====================================

# Configuration de l’expérience MLflow
mlflow.set_tracking_uri(MLFLOW_URL)
mlflow.set_experiment("test1")

model_description = "Modèle ModernBERT de type Fill-Mask affiné avec des annotations Label Studio"

# Création de la model card
card_data = ModelCardData(
language="en",
license="apache-2.0",
library_name="transformers",
base_model="bert",
model_name="bert_fill_mask",
description=model_description
)

card = ModelCard.from_template(
card_data,
template_str="""
---
language: {{ language }}
license: {{ license }}
library_name: {{ library_name }}
base_model: {{ base_model }}
model_name: {{ model_name }}
---

# {{ model_name }}

{{ description }}

## Intended Use
Fill-mask inference

## Training Data
Label Studio annotations

## Limitations
Performance depends on annotation quality.
"""
)


with mlflow.start_run() as run:
run_id = mlflow.active_run().info.run_id

# Journalisation des hyperparamètres
mlflow.log_params({
'batch_size': training_args.per_device_train_batch_size,
'num_epochs': training_args.num_train_epochs,
'eval_steps': training_args.eval_steps
})

trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_datasets["train"],
eval_dataset=tokenized_datasets["test"],
processing_class=tokenizer,
data_collator=data_collator,
callbacks=[MLflowLoggingCallback]
)

# Entraînement et mesure du temps
start = time.perf_counter()
trainer.train()
end = time.perf_counter()
duration = timedelta(seconds=int(end - start))
mlflow.log_param("duration", str(duration))

# Sauvegarde du modèle final et du tokenizer
model.save_pretrained("target_model")
tokenizer.save_pretrained("target_model")

input_example = "The capital of France is [MASK]."

logged_model = mlflow.transformers.log_model(
transformers_model="target_model",
tokenizer=tokenizer,
name="target_model",
task="fill-mask",
model_card=card,
input_example=[input_example],
pip_requirements=[],
extra_pip_requirements=None,
tags={
"duration": duration,
"task": "token-classification",
**{k:v for k,v in vars(card.data).items() if v is not None} # récupération des tags depuis la model card
},
)

# =====================================
# 5) Exemple d’inférence
# =====================================

fill_mask = pipeline(
"fill-mask",
model=model,
tokenizer=tokenizer
)

test_examples = [
"The capital of France is [MASK].",
"Paris is the [MASK] of France.",
"The bicycle is [MASK]-powered.",
"The car is [MASK]-powered.",
"The violin is an [MASK].",
"Mars is closest to [MASK]."
]

print("\n--- Test Inference ---\n")
for example in test_examples:
results = fill_mask(example.replace("[MASK]", tokenizer.mask_token))
top_prediction = results[0]["token_str"]
print(f"Entrée: {example}")
print(f"Prédit: {top_prediction}\n")

# =====================================
# 6) Publication du nouveau modèle dans le registre
# =====================================

# Publication dans le registre
registered_model_name = "test_fillmask"
client = mlflow.MlflowClient()
if not client.search_registered_models(f"name = '{registered_model_name}'"):
client.create_registered_model(registered_model_name)

registered_model = client.create_model_version(
name=registered_model_name,
source=f"{logged_model.artifact_path}/model",
description=model_description,
run_id=logged_model.run_id,
tags=logged_model.tags
)

print(f"Modèle publié dans le registre: mlflow-{INSTANCE_NAMESPACE}:/{registered_model.name}/{registered_model.version}")