Accueil » Paramétrer les Pipeline Azure Synapse Analytics sans ouvrir Synapse !

Paramétrer les Pipeline Azure Synapse Analytics sans ouvrir Synapse !

Paramétrer les Pipeline Azure Synapse Analytics sans ouvrir Synapse !

Cet article était loin dans les cartons, mais l’article Alternative pipeline parametrization for Azure Synapse Analytics a motivé la priorisation de cet article. Merci @paul_eng pour le partage et la motivation d’écrire celui-ci !

Introduction

Que se soit lors de l’écriture du code d’un site web, d’un script ou de projet data, il est toujours important de rendre paramétrable l’exécution du code que l’on écrit. La majeur parti des langages, framework, IDE ou solution packagée utilisée en informatique proposent tous leur propre solution pour traiter ce sujet avec plus ou moins d’efficacité et d’élégance.

L’utilisation de paramètres permet de gérer certaines particularités dans le traitement général, mais reviens maintenant le problème de comment définir et initialiser ces paramètres sans avoir à modifier quoi que ce soit dans le code ou les configurations internes de chaque composant.

Je n’ai pas trouvé dans Synapse un moyen simple et intégré pour gérer les paramètres à l’extérieur de Synapse.

Dans notre équipe nous avons commencé à travailler avec des fichiers de configurations déposés dans le Lake, mais très vite leur manipulation s’est avérée délicate et leur multiplication n’a pas aidé à rendre le tout plus facile d’usage.

Il nous fallait donc trouver quelque chose de plus « User Friendly », que nous maitrisions, qui soit centralisé et extérieur à nos différents codes. Dans le monde de la « Data », nous parlons tous couramment le SQL (à minima SELECT/ INSERT / UPDATE) et donc la Base de données de paramètre nous est apparut comme une évidence. De plus grâce au cloud il est possible de monter une petite base SQL en quelques cliques pour quelques euros par mois !

Le sujet est maintenant amené, nous devons maintenant définir ce que nous voulons paramétrer et pourquoi !

Cas d’usage – Ingérer un sous-ensemble de données

Lors de l’alimentation d’un Datalake, la première étape est d’ingérer les données depuis des données sources. Ces actions répétées sur l’ensemble des sources de données sont répétitives et il existe donc de nombreuses façons d’automatiser ce processus. Cependant, dans de nombreux cas, cette automatisation rend homogène la façon d’ingérer les données :

  • Ingestion de TOUTES les tables d’une base de données
  • Ingestion de TOUS les fichiers d’un répertoire

Imaginons maintenant que dans notre base de donnée source il y ait de nombreuses tables temporaires, de travail ou simplement non utilisé dans les projets actuels. Si l’on peut se dire qu’il est plus « simple » d’ingérer l’intégralité et de trier ensuite, il existe certains cas où ce n’est pas forcément pertinent.

Imaginons que nous travaillions pour WidWorldImporters et que j’ai pour mission de présenter un tableau de bord avec uniquement un suivi des articles facturés.

Je pourrais effectivement ingérer l’intégralité de ma base WWI mais honnêtement seulement 3 tables m’intéressent alors pourquoi ingérer l’intégralité de ma base qui comporte de nombreuses tables non utilisées pour notre usage ?

Liste des tables de la base WWI

Le débat n’est pas pourquoi on voudrait faire cela, mais comment mettre en place un pipeline d’ingestion qui nous permette de paramétrer les données/tables que nous souhaitons à l’extérieur de nos pipelines Synapse.

Solution

Une des solutions (en tout cas celle retenue ici) est d’utiliser une table de paramétrage dans une base de données dédiée. Le pipeline aura donc pour processus d’aller dans un premier temps chercher et initialiser les paramètres dans la table dédiée et ensuite d’exécuter ces activités ou sous pipeline.

Principe général du pipeline

Les mains dedans !

Les ressources utilisées dans cette expérience sont:

  • Un workspace Synapse (la base)
  • Une base de donnée accessible depuis le workspace (l’objet de l’expérience)
  • Un Azure data lake gen 2 (ou tout autre « répertoire de dépôt »)
  • La base de données SQL WideWorldImporters accessible depuis le workspace synapse disponible sur le github des samples Microsoft : WideWorldImporters-Full.bak (github.com)

Vue d’ensemble

La solution consiste en seulement trois activités

  • Lookup – pour la lecture du paramétrage
  • ForEach – pour l’exécution « pour chaque ligne de paramétrage »
  • CopyData – pour l’exécution de l’ingestion
Vue d'ensemble du pipeline montrant les 3 activités utilisés

Ce pipeline est volontairement « simple » pour présenter uniquement l’utilisation de la base de données de paramétrage.

La base de données (surtout la table !)

La base de données est très simple et comprend dans notre cas uniquement trois colonnes:

  • StagingDirectoryName (le répertoire de destination de notre extraction dans le lake)
  • FileName (le nom du fichier qui sera écrit dans le répertoire précédent)
  • SourceQuery (la requête d’extraction de données)

Voici le DDL utilisé pour créer la table et les valeurs utilisées dans l’expérience :

USE [SynapseTraining]
GO

-- Assure we have a proper schema
IF NOT EXISTS ( SELECT 1 FROM sys.schemas WHERE name = N'param' )
    EXEC('CREATE SCHEMA [param]');
GO

-- Droppring and creating the IngestionParam table
IF EXISTS ( select 1 from INFORMATION_SCHEMA.TABLES
            where TABLE_SCHEMA = 'param' and TABLE_NAME = 'IngestionParam' )
    DROP TABLE [param].[IngestionParam]
GO

CREATE TABLE [param].[IngestionParam](
    [StagingDirectoryName] varchar(200),
    [FileName] varchar(100),
    [SourceQuery] varchar(500)
)
GO

-- Inserting sample values
INSERT INTO [param].[IngestionParam]([StagingDirectoryName], [FileName], [SourceQuery])
    VALUES ('staging/Invoices/','Invoices.parquet','SELECT * FROM Sales.Invoices')


/*
Inserting more sample value to demonstrate the ease of usability

INSERT INTO [param].[IngestionParam]([StagingDirectoryName], [FileName], [SourceQuery])
    VALUES ('staging/InvoiceLines/','InvoiceLines.parquet','SELECT * FROM Sales.InvoiceLines')
INSERT INTO [param].[IngestionParam]([StagingDirectoryName], [FileName], [SourceQuery])
    VALUES ('staging/StockItems/','StockItems.parquet','SELECT * FROM Warehouse.StockItems')
*/
GO

Une fois exécutée, notre base de paramétrage est prête :

Résultat de l’execution du script SQL

Le pipeline (et autres objets nécessaires)

Lookup

La première étape est donc de lire notre table de paramétrage et pour ce faire, on utilise une activité Lookup. Celle-ci utilise en dataset source la connexion à notre base de données de paramétrage et une requête très simple de lecture de notre table de paramétrage.

Configuration du lookup

La prévisualisation des données extraites du Lookup doit nous remonter la ligne de donnée provenant de notre base de paramétrage.

Prévisualisation des données du Lookup

ForEach

Maintenant, ce que nous souhaitons faire c’est « exécuter la suite » pour chaque ligne de paramétrage inséré dans notre base et donc renvoyé par le Lookup. C’est l’activité ForEach qui nous le permet en prenant en entrée chaque ligne renvoyée par le Lookup précédent. C’est dans l’onglet « Settings » que cela se passe en précisant les « Items ». Pour se faire, il faut ajouter du contenu dynamique et ajouter le « value array » de l’activité précédente.

Paramétrage des Items du ForEach

Suivant le besoin il est possible d’utiliser les autres paramètres du ForEach pour faire un traitement séquentiel ou en batch, mais ce n’est pas nécessaire pour notre expérience.

Configuration du ForEach

Copy data

Maintenant que notre pipeline est prêt pour exécuter une activité pour chaque ligne de paramétrage, nous sommes prêts pour enfin exécuter notre copie ! Pour se faire, on ajoute une activité à l’intérieur du ForEach.

Ajout d’une activité dans le ForEach

Pour notre usage, nous utilisons l’activité Copy data et nous avons deux choses à configurer :

  • Source – pour configurer « Ce qui va être extrait »
  • Sink – pour configurer ou et comment on stocke les données précédemment extraites.

A ce stade de notre développement, il faut se dire que grâce à notre Lookup et au ForEach, nous avons dans les paramètres de notre activité l’ensemble des valeurs (colones) d’une ligne de notre table de paramétrage dans la « variable » @item() ». Par exemple :

Exemple de valeurs disponible pour nos activités

Au niveau de la source, une fois le dataset source connectée à notre base WWI configurée, nous avons uniquement à configurer le fait d’utiliser une requête que l’on récupère de notre variable @item() auquel on demande la valeur « SourceQuery » de notre table de paramétrage. C’est ce qui sera envoyé sur notre serveur pour extraction de données.

Configuration de la source du Copy data

Du côté de la destination, nous devons maintenant préciser de manière dynamique à quel endroit déposer notre fichier. Pour ce faire, nous avons les deux valeurs de notre table « StagingDirectoryName » pour le répertoire et « FileName » pour le nom de fichier.

La subtilité à ce moment est d’avoir un dataset de destination paramétrable. Pour ce faire, dans un dataset de fichier, il suffit de configurer le « File path » avec des paramètres et ce dataset deviendra « générique » pour l’ensemble des fichiers que l’on veut écrire dans notre lake.

Création des paramètres dans un dataset
Utilisation des paramètres pour le File path d’un dataset

(Mes expériences utilisent principalement des fichiers parquets et si vous vous demandez pourquoi je vous invite à regarder cette petite vidéo : Synapse Espresso: CSV vs. Parquet? – YouTube)

Dans la destination du Copy data, il nous suffit maintenant d’attribuer les valeurs de notre « @item() » (provenant du ForEach) aux paramètres de notre dataset générique :

Configuration de la cible du Copy data

Première exécution

Notre pipeline est maintenant prêt. Une validation ou un publish est toujours de bon augure pour vérifier que nous n’avons rien oublié de critique.

A ce stade, nous avons donc une seule ligne dans notre table de paramétrage qui nous propose l’extraction de la table « Sales.Invoices » à aller déposer dans le répertoire « staging/Invoices/ » dans le fichier « Invoices.parquet ». Pour l’exemple, je démarrerais avec un datalake vide :

Illustration du datalake vide

L’execution du pipeline nous permet d’observer l’execution de l’ensemble de nos activités :

Illustration de l’execution du pipeline

Et nous pouvons vérifier que notre datalake contient maintenant nos données dans un fichier parquet :

Illustration du datalake alimenté

Tout ça … pour ça ?

Il est vrai que pour l’instant, nous nous sommes un peu compliqué la vie et que pour ingérer une table dans notre datalake on aurait pu utiliser directement un Copy data.

Mais rappelons-nous du cas d’usage ! nous avons une base de données avec « beaucoup » de table, et nous voulons pouvoir en ingérer que quelques-unes sans avoir rien à toucher dans Synapse.

C’est maintenant que notre base de données de paramétrage va rentrer en action pour notre plus grand plaisir. Il me suffit d’exécuter une requête SQL (ou un script) pour changer le paramétrage de mon pipeline et finalement ingérer deux tables supplémentaires. Je vais donc exécuter les 2 requêtes précédemment en commentaire dans le script de démarrage pour obtenir le résultat suivant :

Illustration des 3 lignes de paramétrage dans notre table

Sans aucune modification dans Synape, une nouvelle exécution du pipeline nous permet d’ingérer maintenant les trois tables présentes dans la table de paramétrage. On remarque que le Copy data est cette fois exécutée trois fois :

Illustration de l’execution du pipeline avec trois fois le Copy data

Nous avons maintenant dans notre datalake les trois fichiers répartis dans trois répertoires différents :

Datalake alimenté avec trois répertoires

Et quoi d’autre ?

Cette construction nous a permis de paramétrer notre pipeline et d’ajouter facilement de nouvelles tables à ingérer tous en étant capable de garder notre lake organisé. Nous n’avons à l’instant que 3 paramètres dans notre table, mais en jouant un peu avec, on est capable de travailler sur d’autres cas d’usages.

Envie d’investiguer notre extraction sans « casser » nos fichiers existants ni notre arborescence ? On pourrait lancer une extraction dans un répertoire spécifique ? Pour ce faire, un simple update de notre table de paramétrage est nécessaire :

UPDATE [param].[IngestionParam]
SET [StagingDirectoryName] = ‘staging/Investigation/’

Oui, un UPDATE sans WHERE c’est moche et ne doit probablement pas être fait en production mais on est là pour l’expérience !

Nous sommes maintenant prêts à investiguer :

Illustration du datalake avec les fichiers générés dans le répertoire « Investigation »

sans avoir cassé notre arborescence :

Illustration du datalake avec la sauvegarde de l’arborescence d’origine

Conclusion

Dans cet article nous avons vu comment paramétrer et agir sur nos pipelines depuis l’extérieur de Synapse. Cette méthode n’est pas standard et de ce fait, il est nécessaire de s’interroger consciencieusement pour savoir si elle répond à votre besoin sans rajouter de complexité et de coûts superflus.

Pour aller plus loin

L’expérience présentée ici a été définie pour présenter « comment » paramétrer un pipeline depuis une base de données. Cependant il est évidemment possible d’aller encore plus loin dans la paramétrisation de notre pipeline. Quelques exemples :

  • Pouvoir extraire des données depuis plusieurs bases différentes hébergées sur plusieurs serveurs différents
  • Paramétrer une extraction en « Delta » (et loguer en base les différentes extractions et leurs paramètres)
  • Définir quelle table doit être extraite à quelle fréquence
  • Activer/Désactiver l’extraction d’une table par un Flag


Une réponse à “Paramétrer les Pipeline Azure Synapse Analytics sans ouvrir Synapse !”

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *