# Modern Data, AI, & ML <!-- .slide: data-background-image="/images/presentations/ppt-background.png" --> <p> <i class="fa-solid fa-bullhorn primary-text"></i> Adam Lieberman </p> <img class="power-on" src="/images/presentations/Power_On_RGB.png"/> --- <!-- Speaker Details --> <div class="container"> <div class="row speaker-details"> <div class="col-5"> <div class="shape-container"> <div class="shape"> <div id="speaker" class="rounded-circle speaker-image neon" style="background: url('/images/presentations/data_ml/speaker_adam_lieberman.png') no-repeat;"></div> </div> </div> </div> <div class="col"> <h2>Adam Lieberman</h2> <h4>Distinguished Engineer, Head of AI & ML</h4> <ul class="fancy-list"> <li>4.5 Years @ Finastra</li> <li>Helping strategize, architect, and develop modern machine learning practices at Finastra</li> <li>Leading the innovation lab to break barriers through cutting-edge technology</li> </ul> <p>Python, Scala, SQL, Julia, R <u>+ always looking to grow</u></p> </div> </div> </div> --- <!-- Agenda --> # Agenda <ul class="fancy-list"> <li>Introduction to Databricks</li> <li>Introduction to Machine Learning</li> <li>Introduction to Data Engineering</li> <li>HMDA Data</li> <li><b><i>Skills Lab: Data, AI, & ML</i></b> <ul> <li>What we are Building</li> <li>Project Structure & Setup</li> <li>Delta Live Table Data Pipeline</li> <li>Feature Generation & ML Feature Store</li> <li>Hyperparameter Tuning, Model Training Pipeline, & MLFlow</li> <li>Model Selection & Model Registry </li> </ul> </li> </ul> Note: In this session we will cover 5 core topics. First, we will provide an overview of our Databricks environment along with its tools and features that we use for production-grade data engineering and machine learning model development. Next, we will have a quick primer on data engineering and machine learning, covering some core principles that will be beneficial for our session. We will then discuss a dataset from the Consumer Financial Protection Bureau called the Home Mortgage Disclosure Act (HMDA) dataset. We will then end with a walkthrough of our skills lab, where we will develop an end-to-end modernized data engineering and machine learning pipeline to deploy a production-grade machine learning model. Let's jump right in. --- # Introduction to Databricks <!-- .slide: data-background-image="/images/presentations/ppt-background.png" --> ___ # What is Databricks? <!-- .slide: class="multi-columns wide" --> <div class="container"> <p>The Databricks platform provides a unified set of tools for building, deploying, sharing, and maintaining enterprise-grade data solutions at scale.<br/> <div class="row"> <div class="col-5 fragment" data-fragment-index="1"> <ul class="fancy-list larger"> <h4>Uses:</h4> <li>Data Engineering</li> <li>Machine Learning Model Development</li> <li>Data Driven Analysis</li> <li>Process Automation</li> <li>Dashboarding</li> <li>+ More</li> </ul> </div> <div class="col fragment text-align-left" data-fragment-index="2"> At Finastra, our data science environment is built on top of Databricks in tandem with Microsoft Azure to provide our engineers a seamless data experience. <br/> <br/> <div class="row"> <center><div class="col col-12"> <img src="/images/presentations/data_ml/databricks_home.png"/> </div></center> </div> </div> </div> </div> Note: The Databricks platform provides a unified set of tools for building, deploying, sharing, and maintaining enterprise-grade data solutions at scale.
From data engineering, to data-driven analysis, to dashboarding, to machine learning model development, Databricks provides a suite of environments integrated tightly into many different cloud providers, bundled with security and governance, to help bring solutions to life. At Finastra, our data science environment is built on top of Databricks in tandem with Microsoft Azure to provide our developers a seamless end-to-end data experience. Databricks has many tools and features that make it a great platform, and in the coming minutes we will explore a small sliver of the tools we will leverage for our data engineering and machine learning session. ___ # Why Databricks <!-- .slide: class="multi-columns wide" --> <div class="container"> <p>Databricks is a holistic data environment</p><br/> <div class="row"> <div class="col fragment" data-fragment-index="1"> <h2>Analytics Engine</h2> <ul class="fancy-list"> <li>Production Grade Machine Learning Model Serving</li> <li>Data Quality Workflows</li> <li>Data Visualization & Exploration</li> <li>Scalable Clusters</li> <li>Wide Variety of Scripting Languages</li> </ul> </div> <div class="col fragment" data-fragment-index="2"> <h2>Lakehouse Architecture</h2> <ul class="fancy-list"> <li>Governance, Security, & Compliance</li> <li>Data Discovery & Collaboration</li> <li>Best of both data lakes and data warehousing</li> </ul> </div> <div class="col fragment" data-fragment-index="3"> <h2>Continuous Development</h2> <ul class="fancy-list"> <li>Robust Roadmaps</li> <li>Private Preview of Upcoming Features</li> <li>Innovation Focus</li> </ul> </div> </div> </div> Note: So, why Databricks? Databricks is a holistic data environment. Firstly, it's a fantastic analytics engine. With scalable clusters and virtual machines we have the ability to scale the compute we need for the tasks at hand, such as production-grade ML, data quality workflows, data visualization and exploration, and more, across a wide variety of languages and frameworks such as Spark, Scala, Python, R, SQL, and more. Databricks also supports the lakehouse architecture, which handles data cataloging, discovery, collaboration, governance, security, and compliance, bridging the best of both data lakes and data warehousing. Finally, we love Databricks for its continuous innovation and development of data engineering and machine learning features that improve the developer experience when constructing our data-driven workflows and pipelines. ___ # Workspace Options <div class="container"> <p>Databricks provides 3 main workspaces:</p> <div class="row"> <div class="col"> <ul class="fancy-list"> <li>Data Science & Engineering</li> <li>Machine Learning</li> <li>SQL</li> </ul> </div> <div class="col"> <h5>Machine Learning Workspace:</h5> <center><div class="col col-12"> <img src="/images/presentations/data_ml/databricks_ml_workspace.png"/> </div></center> </div> </div> </div> Note: Databricks provides 3 main workspace options, all with their own special tools. The data science and engineering workspace is the classic Databricks environment for collaboration among data scientists, data engineers, and data analysts. It also forms the backbone of the Databricks Machine Learning environment. Databricks Machine Learning is an integrated end-to-end machine learning environment incorporating managed services for experiment tracking, model training, feature development and management, and feature and model serving. Databricks SQL describes the enterprise data warehouse built into the Databricks Lakehouse Platform.
The core offering of Databricks SQL is optimized compute called a SQL warehouse. Databricks provides a collection of UI tools known as the SQL persona to compose and execute SQL queries, visualizations, and dashboards. SQL warehouses provide general compute for SQL queries executed from many environments, including third party BI and visualization tools. Databricks SQL also provides a robust API. In our lab, we will most heavily leverage the Databricks Machine Learning workspace to register a production-grade machine learning model. ___ # Notebooks <div class="container"> <p>Notebooks are the primary tool for creating data science and machine learning workflows in collaboration with colleagues.<br/><br> They provide real-time coauthoring in multiple languages, automatic versioning, and built-in data visualizations. <center> <div class="col col-6"> <img src="/images/presentations/data_ml/databricks_notebook.png"/></center> </div> </div> Note: Notebooks are a common tool in data science and machine learning for developing code and presenting results. In Azure Databricks, notebooks are the primary tool for creating data science and machine learning workflows and collaborating with colleagues. Databricks notebooks provide real-time coauthoring in multiple languages, automatic versioning, and built-in data visualizations. With Azure Databricks notebooks, you can develop code in multiple languages such as Python, SQL, R, and more. You can customize your environment with libraries of your choice, create scheduled jobs and runs across multiple notebooks, export notebooks to HTML or IPython notebook formats, build and share dashboards, and more. ___ # Repos <div class="container"> <div class="row"> <div class="col text-align-left"> Databricks Repos is a visual Git client supporting operations such as cloning repos, committing, pushing, pulling, managing branches, and visual diff comparisons. <div class="col col-10"> <center><img src="/images/presentations/data_ml/databricks_repos.png"/></center> </div> </div> <div class="col text-align-left"> Repos allows you to develop code in notebooks while following engineering code development best practices, using Git for version control, collaboration, and CI/CD. <div class="col col-10"> <center><img src="/images/presentations/data_ml/databricks_repos2.png"/></center> </div> </div> </div> </div> Note: Databricks Repos is a visual Git client in Databricks. It supports common Git operations such as cloning a repository, committing and pushing, pulling, branch management, and visual comparison of diffs when committing. Within Repos you can develop code in notebooks or other files and follow data science and engineering code development best practices, using Git for version control, collaboration, and CI/CD. You will find all the code for the TechCon skills lab in a repository. ___ # Delta Tables <div class="container"> <div class="row"> <p>A Databricks Delta Table records version changes and modifications made to a table in Delta Lake. In addition to storing data in a row and column format, a Delta Table maintains transaction metadata that enables ACID transactions and time travel over the data.</p> </div> </div> Note: By default, all the tables that are created in Databricks are Delta tables.
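One convenient consequence is time travel: you can query an earlier snapshot of a Delta table. Here is a minimal, hedged sketch using the lab's raw loan table (any Delta table works the same way):

```python
# Inspect the table's change history (version, timestamp, operation, ...)
spark.sql("DESCRIBE HISTORY hive_metastore.raw_imported_mortgage_data.loan").show()

# Time travel: query the table as it looked at an earlier version
df_v0 = spark.sql(
    "SELECT * FROM hive_metastore.raw_imported_mortgage_data.loan VERSION AS OF 0"
)
```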
Delta Lake uses a transaction log to store the history of changes to your data. With this feature, you can access historical versions of the data as it changes over time, travel back to a previous snapshot of the Delta table, and support auditing, logging, and data tracking. ___ # Feature Store <div class="container"> <p>A centralized repository that enables data scientists to find and share features and also ensures that the same code used to compute the feature values is used for model training and inference.</p> <div class="container"> <center><img src="/images/presentations/data_ml/feature_store.png" width="700" /></center> <div class="row"> <div class="col fragment" data-fragment-index="1"> <h4>Reusable Assets</h4> <p style="font-size: 18">Data scientists, analysts and ML engineers can search for features based on the consumed raw data and either use features directly or fork existing features. They can browse features, their definitions, source data and consuming scripts/models.</p> </div> <div class="col fragment" data-fragment-index="2"> <h4>Consistency</h4> <p style="font-size: 18">When you use features from Feature Store to train a model, the model is packaged with feature metadata. When you use the model for batch scoring or online inference, it automatically retrieves features from Feature Store. The caller does not need to know about them or include logic to look up or join features to score new data.</p> </div> <div class="col fragment" data-fragment-index="3"> <h4>Lineage</h4> <p style="font-size: 18">Data sources used to create the feature table are saved and accessible to see full lineage. For each feature in a feature table, you can also access the models, notebooks, jobs, and endpoints that use the feature.</p> </div> </div> </div> </div> Note: The Databricks Feature Store is a centralized repository that allows interoperability between the data scientists creating features and the applications and machine learning models that consume them. It is essentially a layer over a delta table, with a UI and set of APIs, that allows data scientists to create reusable assets from their features, enforces consistency between feature generation and model inference, and provides lineage to understand by whom and for what each feature is being used.
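To make this concrete, here is a minimal sketch of registering a feature table with the Feature Store Python API. The table name and DataFrame are illustrative, not the lab's exact code; we build the real feature tables later in the lab:

```python
from databricks.feature_store import FeatureStoreClient

fs = FeatureStoreClient()

# df_features: a Spark DataFrame of engineered features keyed by loanId (illustrative)
fs.create_table(
    name="feature_store.loan_features",  # hypothetical database.table name
    primary_keys=["loanId"],
    df=df_features,
    description="Engineered loan-level features for the approval model",
)
```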
___ # MLFlow Experiment Tracking <div class="container"> <p>The MLflow tracking component lets you log source properties, parameters, metrics, tags, and artifacts related to training a machine learning model.</p><br/> <div class="row"> <div class="col-6" data-fragment-index="1"> <img src="/images/presentations/data_ml/mlflow_experiment_tracking.png" width="600" /> <img src="/images/presentations/data_ml/mlflow_experiment_tracking2.png" width="600" /> </div> <div class="col-6" data-fragment-index="2"> <h4>Logging</h4> <p style="font-size: 18">Run experiments with any ML library, framework, or language and keep track of key metrics, parameters, code, models, and other artifacts for secure and easy sharing and comparison.</p> <h4>Python SDK</h4> <p style="font-size: 18">Leverage the Python SDK in your notebook workflows to sort and filter experiments to find the optimal trained models for your task at hand.</p> <h4>Visual Comparisons</h4> <p style="font-size:18">Compare runs and identify patterns between parameters and metrics to help you pin down the best model.</p> </div> </div> </div> Note: MLFlow Experiment Tracking is a UI and suite of APIs that lets you log parameters, properties, metrics, tags, artifacts, and more as you train a wide variety of machine learning models. It keeps you organized and helps you identify your best experiment as you develop and train your production-ready machine learning model.
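As a hedged sketch of what this logging can look like in a notebook (the run name, parameters, and metric values here are illustrative, not the lab's exact code):

```python
import mlflow

# Log one training run: its parameters, metrics, and (optionally) the model artifact
with mlflow.start_run(run_name="xgb-approval-model"):
    mlflow.log_param("max_depth", 6)
    mlflow.log_param("learning_rate", 0.1)
    mlflow.log_metric("val_auc", 0.91)  # illustrative validation metric
    # mlflow.xgboost.log_model(model, "model")  # log the fitted model as an artifact
```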
___ # MLFlow Model Registry <div class="container"> <p>A centralized model repository and set of APIs that enable you to manage the full lifecycle of MLflow Models.</p> <center><img src="/images/presentations/data_ml/mlflow_model_registry.png" width="1100"/></center> </div> --- # Introduction to Machine Learning <!-- .slide: data-background-image="/images/presentations/ppt-background.png" --> ___ # What is Machine Learning? <!-- .slide: class="multi-columns wide" --> <div class="container"> <p>Machine learning is a branch of artificial intelligence (AI) and computer science which focuses on the use of data and algorithms to imitate the way that humans learn, gradually improving performance.<br/> Essentially, it is the science of getting computers to act without being explicitly programmed. <center><img src="/images/presentations/data_ml/ai_ml_dl.jpg" width="700"/></center> </div> Note: So, what's the difference between Artificial Intelligence, Machine Learning, and Deep Learning? AI is a broad field that tries to mimic human behavior either by being explicitly or implicitly programmed. Machine Learning is a subfield of AI that leverages data and a learning algorithm to solve a task and improve its performance with increased experience. Deep Learning is a subfield of Machine Learning that uses an architecture called a neural network, inspired by how the brain works, to solve tasks and problems. ___ # Types of Machine Learning <div class="container"> <p>In Machine Learning there are 3 core types of learning:</p> <center><img src="/images/presentations/data_ml/types_of_ml.webp" width="700"/></center> </div> Note: In machine learning there are 3 core types of learning: supervised, unsupervised, and reinforcement learning. Supervised learning uses labeled data to predict an outcome or the future. Unsupervised learning uses data with no labels/answers and tries to find structure within the data. Reinforcement learning looks at a specific set of actions, rewarding desired behaviors and penalizing undesired ones. Here, the system tries to maximize reward in a particular situation. ___ # Machine Learning Problems <div class="container"> <p>With regard to supervised and unsupervised learning, there are a few different problems/tasks that we can solve:</p> <center><img src="/images/presentations/data_ml/ml_problems.png" width="700"/></center> </div> ___ # Feature Engineering <div class="container"> The process of using domain knowledge to select and transform the most relevant variables from raw data when creating a machine learning model. The goal of feature engineering is to improve the performance of machine learning (ML) algorithms.<br/><br/> <center><img src="/images/presentations/data_ml/feature_engineering.png" width="700"/></center> </div> Note: Feature generation is an integral part of the machine learning life cycle. It's the process of taking raw data and leveraging domain experience to transform our data into descriptions that help our model learn better. In the example below, our raw transactions data has a user_home_country. One possible feature might be whether the country is foreign to the United States or not. Feature generation is an arduous process in machine learning and is very experimental in nature. ___ # The Machine Learning Process <div class="container"> <center><img src="/images/presentations/data_ml/ml_process.png" width="1200"/></center> <div class="row"> <div class="col"> <b>Data Preparation</b> <p>Preparing raw data for modeling</p> </div> <div class="col"> <b>Exploratory Data Analysis</b> <p>Understanding the available data</p> </div> <div class="col"> <b>Feature Engineering</b> <p>Creating data descriptions for modeling</p> </div> <div class="col"> <b>Model Training</b> <p>Training the ML model</p> </div> <div class="col"> <b>Model Validation</b> <p>Validating model performance</p> </div> <div class="col"> <b>Deployment</b> <p>Making model available for consumption</p> </div> <div class="col"> <b>Monitoring</b> <p>Keeping an eye on our model</p> </div> </div> </div> Note: Data preparation - Prior to any data science or ML work lies the data engineering needed to prepare production data and make it available for consumption. This data may be referred to as "raw data," and in later steps, data scientists will extract features and labels from the raw data. Exploratory data analysis (EDA) - Analysis is conducted by data scientists to assess statistical properties of the data available, and determine if they address the business question. This requires frequent communication and iteration with business stakeholders. Feature engineering - Data scientists clean data and apply business logic and specialized transformations to engineer features for model training. These data, or features, are split into training, testing, and validation sets. Model training - Data scientists explore multiple algorithms and hyperparameter configurations using the prepared data, and a best-performing model is determined according to predefined evaluation metric(s). Model validation - Prior to deployment a selected model is subjected to a validation step to ensure that it exceeds some baseline level of performance, in addition to meeting any other technical, business or regulatory requirements. This necessitates collaboration between data scientists, business stakeholders and ML engineers. Deployment - ML engineers will deploy a validated model via batch, streaming or online serving, depending on the requirements of the use case.
Monitoring - ML engineers will monitor deployed models for signs of performance degradation or errors. Data scientists will often be involved in early monitoring phases to ensure that new models perform as expected after deployment. This will inform if and when the deployed model should be updated by returning to earlier stages in the workflow. ___ # XGBoost <div class="container"> <p>XGBoost works by building an ensemble of decision trees, where each tree is trained to correct the mistakes of the previous tree. The training process starts with a single decision tree and continues by adding trees one at a time. The new trees are added in a way that minimizes the error of the ensemble.</p> <center><img src="/images/presentations/data_ml/xgboost.png" width="700"/></center> </div> Note: XGBoost (eXtreme Gradient Boosting) is an open-source implementation of gradient boosting developed by Tianqi Chen. It is designed to be highly efficient and scalable, and can be used for a variety of machine learning tasks, including classification and regression. XGBoost works by building an ensemble of decision trees, where each tree is trained to correct the mistakes of the previous tree. The training process starts with a single decision tree and continues by adding trees one at a time. The new trees are added in a way that minimizes the error of the ensemble. XGBoost also includes a number of techniques that make it more efficient than traditional gradient boosting, such as regularization, sparsity-aware splitting, and column sub-sampling. Additionally, XGBoost supports parallel and distributed computing, which allows it to handle large datasets with ease. ___ # Hyperparameter Tuning <div class="container"> <p>A hyperparameter is a parameter that is set before the learning process begins. These parameters are tunable and can directly affect how well a model trains; examples include a tree's maximum depth and a model's learning rate. To tune hyperparameters, we can use the following strategies:</p> <div class="row"> <div class="col"> <b>Grid Search</b> <p style="font-size: 18">Exhaustively search a predefined set of hyperparameter values and use the best-performing value(s)</p> </div> <div class="col"> <b>Random Search</b> <p style="font-size: 18">Similar to grid search, but replaces the exhaustive search with random sampling. This can outperform grid search when only a small number of hyperparameters are needed to actually optimize the algorithm.</p> </div> <div class="col"> <b>Bayesian Optimization</b> <p style="font-size: 18">Builds a probabilistic model of the function mapping from hyperparameter values to the target metric evaluated on a validation set. </p> </div> <div class="col"> <b>Gradient-Based Optimization</b> <p style="font-size: 18">Compute the gradient with respect to the hyperparameters and then optimize them using gradient descent</p> </div> <div class="col"> <b>Evolutionary Optimization</b> <p style="font-size: 18">Use evolutionary algorithms to search the space of possible hyperparameters</p> </div> </div> </div>
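As a preview of the lab's tuning step, one common way to run a Bayesian-style search on Databricks is Hyperopt. The sketch below is illustrative only: the search space, objective, and `train_and_score` helper are assumptions, not the lab's exact code:

```python
from hyperopt import fmin, tpe, hp, STATUS_OK

# Illustrative search space for an XGBoost-style model
search_space = {
    "max_depth": hp.quniform("max_depth", 3, 10, 1),
    "learning_rate": hp.loguniform("learning_rate", -5, 0),
}

def objective(params):
    # Train a candidate model with `params` and score it on a validation set
    loss = train_and_score(params)  # hypothetical helper; returns a loss to minimize
    return {"loss": loss, "status": STATUS_OK}

# TPE builds a probabilistic model of loss vs. hyperparameters across trials
best = fmin(fn=objective, space=search_space, algo=tpe.suggest, max_evals=50)
```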
--- # Introduction to Data Engineering <!-- .slide: data-background-image="/images/presentations/ppt-background.png" --> ___ # What is Data Engineering? <div class="container"> <p>Data Engineers are responsible for building data pipelines to process, organize and persist data sets for data science, machine learning, and other downstream applications.</p> <center><img src="/images/presentations/data_ml/data_engineering.png" width="700"/></center> </div> ___ # ETL <div class="container"> <p style="font-size: 28">Extract, Transform, and Load (ETL) is a process data engineers use to extract data from different sources, transform the data into a trusted and usable resource, and load that data into systems end-users can access for downstream business problems and use cases. The 3 steps are as follows:</p> <div class="row"> <div class="col"> <b style="font-size: 24">Extract</b> <p style="font-size: 18">Before data can be moved to a new destination, it must first be extracted from its original source</p> </div> <div class="col"> <b style="font-size: 24">Transform</b> <p style="font-size: 18">Transformation improves data integrity with sub-steps such as standardization, cleansing, deduplication, sorting, filtering, applying business logic, and more to ensure that raw data arrives at its new destination fully compatible and ready to use.</p> </div> <div class="col"> <b style="font-size: 24">Load</b> <p style="font-size: 18">Load the transformed data into a new destination.</p> </div> </div> </div> ___ # ETL Challenges <div class="container"> <ul class="fancy-list"> <li>Building custom ETL pipelines can be slow and difficult</li> <li>Pipelines consist of complex and custom code</li> <li>Limited Reusability</li> <li>Managing Data Quality is difficult</li> <li>Wide Variety of Scripting Languages</li> <li>Many custom quality and validation checks at intermittent pipeline steps</li> </ul> </div> Note: Developing pipelines that ensure data reliability and quality can be a slow and difficult task. These pipelines are underpinned with complex code and limited reusability. Also, a pipeline built in one environment cannot be used in another, even if the underlying code is very similar, meaning data engineers are often the bottleneck, tasked with reinventing the wheel every time. Beyond pipeline development, managing data quality in increasingly complex pipeline architectures is difficult. Bad data is often allowed to flow through a pipeline undetected, devaluing the entire data set. To maintain quality and ensure reliable insights, data engineers are required to write extensive custom code to implement quality checks and validation at every step of the pipeline.
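For instance, without a framework, each pipeline step needs hand-rolled checks along these lines (a minimal sketch; the table name and rules are illustrative):

```python
from pyspark.sql import functions as F

# A hand-rolled data quality check of the kind DLT expectations will replace
df = spark.read.table("silver.loans_clean")  # hypothetical table
bad_rows = df.filter(F.col("loanId").isNull() | (F.col("loanAmount") <= 0)).count()
if bad_rows > 0:
    raise ValueError(f"Data quality check failed: {bad_rows} invalid rows")
```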
___ # Data Quality Paradigms <div class="container"> <p>When we speak of data quality, we often refer to a standard called Bronze/Silver/Gold and Purposed/Non-Purposed: <br/> <div class="row"> <div class="col"> <b style="font-size: 24">Bronze</b> <p style="font-size: 18">Raw ingested data tables and history</p> </div> <div class="col"> <b style="font-size: 24">Silver</b> <p style="font-size: 18">Filtered, Cleaned, Joined, and Augmented Datasets</p> </div> <div class="col"> <b style="font-size: 24">Gold</b> <p style="font-size: 18">Business-level aggregated data for specified purposes</p> </div> </div> <br/><br/> <p>Furthermore, we can specify datasets as non-purposed or purposed:</p> <div class="row"> <div class="col"> <b style="font-size: 24">Non-Purposed</b> <p style="font-size: 18">Silver-quality, lightly preprocessed data for generic purposes</p> </div> <div class="col"> <b style="font-size: 24">Purposed</b> <p style="font-size: 18">Gold-quality, business-purposed datasets for specific tasks, applications, and problems</p> </div> </div> </div> ___ # Delta Live Tables <div class="container"> <p>Delta Live Tables (DLT) make it easy to build and manage reliable data pipelines that deliver high quality data on Delta Lake.</p> <p>DLT helps data engineering teams simplify ETL development and management with declarative pipeline development, automatic testing, and deep visibility for monitoring and recovery.</p> <center><img src="/images/presentations/data_ml/dlt_pipeline.png" width="750"/></center> </div> --- # HMDA Data <!-- .slide: data-background-image="/images/presentations/ppt-background.png" --> ___ # What is the HMDA dataset? <div class="section"> <p>The Home Mortgage Disclosure Act (HMDA) requires many financial institutions to maintain, report, and publicly disclose loan-level information about mortgages. This data helps to:</p> <div class="row"> <div class="col"> <ul class="fancy-list"> <li>Show whether lenders are serving the housing needs of their communities</li> <li>Provide data for public policy decision making</li> <li>Shed light on potentially discriminatory lending patterns</li> <li>Allow the public to audit at will</li> <li>Identify mortgage lending trends</li> </ul> </div> <div class="col"> <img src="/images/presentations/data_ml/hmda_data_map.png" width="500"/> </div> </div> </div> ___ # Key Data Points <div class="section"> <p>The HMDA dataset contains many different fields. Below are some key fields that we will find in our Lab Session Datasets:</p> <ul class="fancy-list"> <li>Action Taken (Lending Decision)</li> <li>Loan Type</li> <li>Loan Purpose</li> <li>Ethnicity</li> <li>Dwelling Category</li> <li>Loan Product</li> <li>State Code</li> <li>+ More</li> </ul> </div> ___ # Lab Session Data <div class="section"> <p>In our Lab, we will leverage a series of datasets based on HMDA data.
Specifically, we will have 6 key tables: <br/> <div class="row"> <div class="col"> <b style="font-size: 24">Demographic</b> <p style="font-size: 18">Demographic information for the applicant and their loan, such as their state and county</p> </div> <div class="col"> <b style="font-size: 24">Sensitive</b> <p style="font-size: 18">Sensitive information about the borrower, such as their race and ethnicity</p> </div> <div class="col"> <b style="font-size: 24">Loan</b> <p style="font-size: 18">Loan-level information such as loan amount, loan type, interest rate, and more</p> </div> <div class="col"> <b style="font-size: 24">Decision</b> <p style="font-size: 18">The financial institution's decision on the loan application.</p> </div> </div> <br/><br/> <div class="row"> <div class="col"> <b style="font-size: 24">Code Mappings</b> <p style="font-size: 18">A pre-configured mappings file that maps code values in the above tables to their description counterparts.</p> </div> <div class="col"> <b style="font-size: 24">Census Regions</b> <p style="font-size: 18">A mapping file that maps state codes to their regions and territories.</p> </div> </div> </div> --- # Skills Lab: Data, AI, & ML <!-- .slide: data-background-image="/images/presentations/ppt-background.png" --> ___ # What We Are Building <div class="section"> <p>In this lab we will cover 4 core topics:</p> <br/><br/> <div class="row"> <div class="col"> <b style="font-size: 24">DLT Data Pipeline</b> <p style="font-size: 18">Create a Delta Live Table pipeline that transforms our raw data into gold-quality and purposed datasets</p> </div> <div class="col"> <b style="font-size: 24">Feature Generation Pipeline</b> <p style="font-size: 18">Create features for our ML model and log them to the Databricks Feature Store</p> </div> </div> <br/><br/> <div class="row"> <div class="col"> <b style="font-size: 24">ML Model Pipeline </b> <p style="font-size: 18">Develop an ML model pipeline with hyperparameter tuning and experiment logging to MLFlow</p> </div> <div class="col"> <b style="font-size: 24">ML Model Registering </b> <p style="font-size: 18">Find our best-performing model and register it to the Model Registry</p> </div> </div> </div> ___ # Setup 1. Navigate to https://adb-2285038586334689.9.azuredatabricks.net/ 2. Click on Repos on the left-hand sidebar 3. Find your pre-provisioned repository > **Naming Convention:** <br>`{your-email}-techcon` (registered @finastra, @dh) ___ # Project Structure ``` techcon │ README.md │ └───dataPipeline │ │ data_pipeline │ └───approvalModel │ │ feature_generation │ │ model_training │ │ model_registering │ │ model_inference │ └───dataDashboard │ │ data_dashboard │ └───doc │ <image_files.png> ``` | Directory | Description | | ------ | ------ | | dataPipeline | Scripts to clean and transform data for our machine learning model | | approvalModel | Scripts to develop our machine learning model pipeline | | dataDashboard | Scripts to develop a simple Databricks dashboard using Databricks visualizations | ___ # DLT Data Pipeline <!-- .slide: data-background-image="/images/presentations/ppt-subsection-background.png" --> ___ # Setup We are now going to move to our data pipeline script.
We will now be working out of the following notebook: ``` techcon │ │ └───dataPipeline │ data_pipeline ``` ___ # Imports To start, we will import the following libraries in our data_pipeline notebook: ```python [] import dlt from pyspark.sql.functions import * from pyspark.sql.types import * import pyspark.pandas as ps import numpy as np ``` ___ # Helper Functions To clean up our raw tables we will leverage a mappings file that converts codes such as 1, 2, 3 to string values such as Loan originated, Application approved but not accepted, and Application denied by financial institution. We call this function many times in our pipeline for various tables and fields: ```python [12-19, 22-23, 26-27] def map_codes(ps_data,ps_mappings): ''' A function to map HMDA codes to their string values Parameters: ps_data = a pandas-spark dataframe of the data you wish to re-map ps_mappings = a pandas-spark dataframe of the mappings file Returns: ps_data = a spark dataframe ''' #Iterate over the columns in the dataset and map them with the code file using the .replace function map_cols = ps_data.columns.tolist() for i in map_cols: try: grp = ps_mappings.groupby("field").get_group(i) code_map = grp.set_index("code")["value"].to_dict() ps_data[i] = ps_data[i].replace(code_map) except Exception: continue #Replace additional codes that signify None na_repl = ["1111", "8888", "9999", "NA", "Exempt"] ps_data.replace(na_repl, None, inplace=True) #Return data as a spark dataframe ps_data = ps_data.to_spark() return ps_data ``` ___ # Load Raw Bronze Temp Tables Next, we load our 4 core raw tables as temporary tables. Temporary tables do not publish metadata for the table: ```python[1-8|11-18|21-28|31-38] #Load the loan table @dlt.table( name="bronze_loan", comment="Raw data from loan table", temporary=True ) def bronze_loan(): return spark.read.table("hive_metastore.raw_imported_mortgage_data.loan") #Load the decision table @dlt.table( name="bronze_decision", comment="Raw data from decision table", temporary=True ) def bronze_decision(): return spark.read.table("hive_metastore.raw_imported_mortgage_data.decision") #Load the demographic table @dlt.table( name="bronze_demographic", comment="Raw data from demographic table", temporary=True ) def bronze_demographic(): return spark.read.table("hive_metastore.raw_imported_mortgage_data.demographic") #Load the sensitive table @dlt.table( name="bronze_sensitive", comment="Raw data from sensitive table", temporary=True ) def bronze_sensitive(): return spark.read.table("hive_metastore.raw_imported_mortgage_data.sensitive") ``` ___ # Load Gold Support Tables Let's now load the gold census regions and code mappings tables. Here, we will specify their quality as gold in the table properties: ```python[1-9|12-20] #Load the census regions table @dlt.table( name="gold_census_regions", path = "gold_census_regions", comment = "Census region mappings", table_properties = {"quality": "gold"} ) def gold_census_regions(): return spark.read.table("hive_metastore.raw_imported_mortgage_data.census_regions") #Load the code mappings table @dlt.table( name="gold_code_mappings", path = "gold_code_mappings", comment = "Data Code Mappings", table_properties = {"quality": "gold"} ) def gold_code_mappings(): return spark.read.table("hive_metastore.raw_imported_mortgage_data.code_mappings") ``` ___ # Joining Data Let's practice joining a few datasets.
Below we will create one applicant demographics dataset that consists of the demographic and sensitive tables joined together, and one loan details dataset that consists of the loan and decision tables joined together. We will then take these 2 joined tables and create one bronze master table called loans that has all data joined together: ```python[1-14|16-30|32-46] #Create the bronze quality applicant demographics table consisting of demographic and sensitive information joined on loanId @dlt.table( name="bronze_applicant_demographics", path = "bronze_applicant_demographics", comment = "Joined demographic source data and sensitive source data", table_properties = {"quality": "bronze"} ) def bronze_applicant_demographics(): #Load demographic and sensitive DLTs demographic_source = dlt.read("bronze_demographic") sensitive_source = dlt.read("bronze_sensitive") #Join on loanId and return spark dataframe return demographic_source.join(sensitive_source, "loanId") #Create the bronze quality loan details table consisting of loan and decision information joined on loanId @dlt.table( name="bronze_loan_details", path = "bronze_loan_details", comment = "Joined loan source data and decision source data", table_properties = {"quality": "bronze"} ) def bronze_loan_details(): #Load loan and decision DLTs loan_source = dlt.read("bronze_loan") decision_source = dlt.read("bronze_decision") #Join on loanId and remove duplicate columns (lei, activityYear, loanId) and return spark dataframe loan_joined = loan_source.join(decision_source, loan_source.loanId == decision_source.loanId, "inner").drop(loan_source.lei).drop(loan_source.activityYear).drop(loan_source.loanId) return loan_joined #Create the bronze quality loans table consisting of the above loan_details and applicant_demographics joined tables @dlt.table( name="bronze_loans", path = "bronze_loans", comment = "Fully joined loan records and demographics from loan, demographic, sensitive, and decision raw tables", table_properties = {"quality": "bronze"} ) def all_joined(): #Load above joined DLTs loan_source = dlt.read("bronze_loan_details") demographic_source = dlt.read("bronze_applicant_demographics") #Join tables together on loanId and return spark dataframe all_joined = loan_source.join(demographic_source, loan_source.loanId == demographic_source.loanId, "inner").drop(loan_source.loanId) return all_joined ``` ___ # Silver Decision Table <p style="font-size: 20">Now that we understand how we can join tables together, let's start transforming and cleaning our decision data to create a silver-quality decision table.
We will:</p> <ul> <li style="font-size: 16">Set a defined table schema</li> <li style="font-size: 16">Create an expectation that records have an activity year later than 2019</li> <li style="font-size: 16">Drop some unwanted columns</li> <li style="font-size: 16">Map our codes</li> <li style="font-size: 16">Convert some data types</li> </ul> ```python[1-6|7-23] @dlt.table( name = "silver_decision", path = "silver_decision", comment = "Silver Quality Decision Table", table_properties = {"quality": "silver"} ) #Define our expectation for 2020-and-beyond records @dlt.expect("valid_year","activityYear > 2019") def silver_decision(): #Load bronze decision DLT silver_decision = dlt.read("bronze_decision") #Drop Unwanted Columns - denialReason2, denialReason3, denialReason4 droplist = ['denialReason2', 'denialReason3', 'denialReason4'] silver_decision = silver_decision.drop(*droplist) #Map Codes Using the map_codes helper function df_mappings = dlt.read("gold_code_mappings") silver_decision = map_codes(silver_decision.to_pandas_on_spark(), df_mappings.to_pandas_on_spark()) #Type Conversions silver_decision = silver_decision.withColumn("activityYear", col("activityYear").cast(IntegerType())) return silver_decision ``` ___ # Silver Loan Table <div class="row"> <div class="col"> <p>Let's now create a silver version of our loan table. Here we will:</p> <ul> <li>Specify the defined table schema</li> <li>Create 3 expectations to monitor data quality on loanAmount, interestRate, and propertyValue</li> <li>Drop unwanted columns</li> <li>Standardize data in the DTI column</li> <li>Correct the data scale of the income field</li> <li>Map codes to their descriptions</li> <li>Convert some data types</li> </ul> </div> <div class="col"> ```python[1-41|51-54|62-65|67-80|85-86|88-90] #Specify the Silver Loan Table Schema silver_loan_schema = StructType( [ StructField("lei", StringType(), True), StructField("loanId", StringType(), True), StructField("activityYear", IntegerType(), True), StructField("conformingLoanLimit", StringType(), True), StructField("derivedLoanProductType", StringType(), True), StructField("derivedDwellingCategory", StringType(), True), StructField("purchaserType", StringType(), True), StructField("loanType", StringType(), True), StructField("loanPurpose", StringType(), True), StructField("lienStatus", StringType(), True), StructField("reverseMortgage", StringType(), True), StructField("openEndLineOfCredit", StringType(), True), StructField("businessOrCommercialPurpose", StringType(), True), StructField("loanAmount", IntegerType(), True), StructField("loanToValueRatio", FloatType(), True), StructField("interestRate", FloatType(), True), StructField("rateSpread", FloatType(), True), StructField("hoepaStatus", StringType(), True), StructField("totalLoanCosts", FloatType(), True), StructField("originationCharges", FloatType(), True), StructField("discountPoints", FloatType(), True), StructField("lenderCredits", FloatType(), True), StructField("loanTerm", IntegerType(), True), StructField("balloonPayment", StringType(), True), StructField("propertyValue", IntegerType(), True), StructField("constructionMethod", StringType(), True), StructField("occupancyType", StringType(), True), StructField("manufacturedHomeSecuredPropertyType", StringType(), True), StructField("manufacturedHomeLandPropertyInterest", StringType(), True), StructField("totalUnits", IntegerType(), True), StructField("income", FloatType(), True), StructField("debtToIncomeRatio", StringType(), True), StructField("applicantCreditScoreType",
StringType(), True), StructField("coApplicantCreditScoreType", StringType(), True), StructField("submissionOfApplication", StringType(), True), StructField("initiallyPayableToInstitution", StringType(), True) ] ) #Create the silver quality loan table @dlt.table( name = "silver_loan", path = "silver_loan", comment = "Silver Quality Loan Table", table_properties = {"quality": "silver"}, schema = silver_loan_schema ) #Define our Expectations for loanAmount, interestRate, and propertyValue @dlt.expect_or_drop("valid_loanAmount", "(loanAmount <= 10000000 AND loanAmount > 0) AND loanAmount IS NOT NULL") @dlt.expect_or_drop("valid_interestRate", "interestRate > 0 OR interestRate IS NULL") #Need OR IS NULL because denied loans do not get an interest rate @dlt.expect_or_drop("valid_propertyValue", "propertyValue > 0 OR propertyValue IS NULL") #https://www.ffiec.gov/hmda/pdf/2021Guide.pdf page 31 for NULL logic def silver_loan(): #Load our bronze_loan DLT silver_loan = dlt.read("bronze_loan") silver_loan.schema['debtToIncomeRatio'].nullable = True silver_loan = silver_loan.withColumn("debtToIncomeRatio", col("debtToIncomeRatio").cast(StringType())) #Drop Unwanted Columns droplist = ["aus1","aus2","aus3","aus4","aus5","introRatePeriod","prepaymentPenaltyTerm","totalPointsAndFees", "negativeAmortization", "interestOnlyPayment", "otherNonamortizingFeatures", "multifamilyAffordableUnits"] silver_loan = silver_loan.drop(*droplist) #Rebucket Debt to Income Using the Inner Function For Dirty Unbucketed Values def dti_bucket_correction(val): try: x = float(val) if x >= 36 and x < 50: return '36%-<50%' else: return "NA" except Exception: return val # create a UDF to transform data using PySpark syntax udf_dti_bucket_correction = udf(lambda _ : dti_bucket_correction(_), StringType()) silver_loan = silver_loan.withColumn("debtToIncomeRatio", udf_dti_bucket_correction(col("debtToIncomeRatio"))) # let's transform to pandas to use some different functions silver_loan = silver_loan.to_pandas_on_spark() #Correct Income Scale silver_loan["income"] = silver_loan["income"].astype(float) * 1000.0 #Map Codes Using the map_codes helper function df_mappings = dlt.read("gold_code_mappings") silver_loan = map_codes(silver_loan, df_mappings.to_pandas_on_spark()) #Cast Column Types silver_loan = ( silver_loan.withColumn("activityYear", col("activityYear").cast(IntegerType())).withColumn("loanAmount", col("loanAmount").cast(IntegerType())) .withColumn("loanTerm", col("loanTerm").cast(IntegerType())).withColumn("propertyValue", col("propertyValue").cast(IntegerType())) .withColumn("totalUnits", col("totalUnits").cast(IntegerType())).withColumn("loanToValueRatio", col("loanToValueRatio").cast(FloatType())) .withColumn("interestRate", col("interestRate").cast(FloatType())).withColumn("rateSpread", col("rateSpread").cast(FloatType())) .withColumn("totalLoanCosts", col("totalLoanCosts").cast(FloatType())).withColumn("originationCharges", col("originationCharges").cast(FloatType())) .withColumn("discountPoints", col("discountPoints").cast(FloatType())).withColumn("lenderCredits", col("lenderCredits").cast(FloatType())) .withColumn("income", col("income").cast(FloatType())) .withColumn("debtToIncomeRatio",col("debtToIncomeRatio").cast(StringType())) ) return silver_loan ``` </div> ___ # Silver Sensitive Table Let's now create our silver-quality sensitive table.
Here, we will use the "@dlt.expect_all" decorator to specify multiple quality expectations: ```python[27-34,42-43] #Set Schema silver_sensitive_schema = StructType( [ StructField("loanId", StringType(), True), StructField("derivedEthnicity", StringType(), True), StructField("derivedRace", StringType(), True), StructField("derivedSex", StringType(), True), StructField("applicantEthnicity1", StringType(), True), StructField("coApplicantEthnicity1", StringType(), True), StructField("applicantEthnicityObserved", StringType(), True), StructField("coApplicantEthnicityObserved", StringType(), True), StructField("applicantRace1", StringType(), True), StructField("coApplicantRace1", StringType(), True), StructField("applicantRaceObserved", StringType(), True), StructField("coApplicantRaceObserved", StringType(), True), StructField("applicantSex", StringType(), True), StructField("coApplicantSex", StringType(), True), StructField("applicantSexObserved", StringType(), True), StructField("coApplicantSexObserved", StringType(), True), StructField("applicantAge", StringType(), True), StructField("coApplicantAge", StringType(), True), StructField("applicantAgeAbove62", StringType(), True), StructField("coApplicantAgeAbove62", StringType(), True) ] ) #Define a Dictionary of Expectations silver_sensitive_expectations = { "valid_applicantEthnicity": "applicantEthnicity1 IS NOT NULL", "valid_applicantRace": "applicantRace1 IS NOT NULL", "valid_applicantSex": "applicantSex IS NOT NULL", "valid_applicantAge": "applicantAge IS NOT NULL", "valid_loanId": "loanId IS NOT NULL" } @dlt.table( name = "silver_sensitive", path = "silver_sensitive", comment = "Silver Quality Sensitive Table", table_properties = {"quality": "silver"}, schema = silver_sensitive_schema ) #Handle all expectations at once instead of multiple expect decorators @dlt.expect_all(silver_sensitive_expectations) def silver_sensitive(): #Load Bronze Sensitive Table silver_sensitive = dlt.read("bronze_sensitive") #Drop Unwanted Columns droplist = ["applicantEthnicity2", "applicantEthnicity3", "applicantEthnicity4","applicantEthnicity5", "coApplicantEthnicity2", "coApplicantEthnicity3", "coApplicantEthnicity4", "coApplicantEthnicity5", "applicantRace2", "applicantRace3", "applicantRace4", "applicantRace5", "coApplicantRace2", "coApplicantRace3", "coApplicantRace4", "coApplicantRace5" ] silver_sensitive = silver_sensitive.drop(*droplist) #Map Codes Using the map_codes helper function df_mappings = dlt.read("gold_code_mappings") silver_sensitive = map_codes(silver_sensitive.to_pandas_on_spark(), df_mappings.to_pandas_on_spark()) return silver_sensitive ``` ___ # Silver Demographic Table We will now create our silver demographic table.
Here, we will leverage "@dlt.expect_all_or_drop" to drop failed records from the target dataset: ```python[1-7,15-16] #Set dictionary of expectations we wish to handle silver_demographic_expectations = { "valid_loanId": "loanId IS NOT NULL", "valid_stateCode": "stateCode IS NOT NULL", "valid_tractMinorityPopulationPercent": "tractMinorityPopulationPercent >= 0", "valid_ffiecMsaMdMedianFamilyIncome": "ffiecMsaMdMedianFamilyIncome >= 0", } @dlt.table( name = "silver_demographic", path = "silver_demographic", comment = "Silver Quality Demographic Table", table_properties = {"quality": "silver"} ) #Handle all expectations at once instead of multiple expect_or_drop decorators @dlt.expect_all_or_drop(silver_demographic_expectations) def silver_demographic(): silver_demographic = dlt.read("bronze_demographic") gold_state_regions = dlt.read("gold_census_regions") #Join Census Regions to Demographic Data silver_demographic = silver_demographic.join(gold_state_regions,silver_demographic["stateCode"] == gold_state_regions["State_Code"]).drop("State_Code") #Cast Data to Correct Type silver_demographic = ( silver_demographic.withColumn("loanId", col("loanId").cast(StringType())).withColumn("derivedMsaMd", col("derivedMsaMd").cast(IntegerType())) .withColumn("stateCode", col("stateCode").cast(StringType())).withColumn("countyCode", col("countyCode").cast(StringType())) .withColumn("censusTract", col("censusTract").cast(StringType())).withColumn("tractMinorityPopulationPercent", col("tractMinorityPopulationPercent").cast(FloatType())) .withColumn("ffiecMsaMdMedianFamilyIncome", col("ffiecMsaMdMedianFamilyIncome").cast(IntegerType())) .withColumn("tractToMsaIncomePercentage", col("tractToMsaIncomePercentage").cast(IntegerType())) .withColumn("tractOwnerOccupiedUnits", col("tractOwnerOccupiedUnits").cast(IntegerType())) .withColumn("tractOneToFourFamilyHomes", col("tractOneToFourFamilyHomes").cast(IntegerType())) .withColumn("tractMedianAgeOfHousingUnits", col("tractMedianAgeOfHousingUnits").cast(IntegerType())) .withColumn("State", col("State").cast(StringType())) .withColumn("Region", col("Region").cast(StringType())) .withColumn("Division", col("Division").cast(StringType())) ) #Return spark dataframe return silver_demographic ``` ___ # Silver Loans Table Let's now join our two silver tables into one master silver table containing all of our individual silver table data. We will use this to derive our gold purposed datasets: ```python[] @dlt.table( name = "silver_loans", path = "silver_loans", comment = "Silver Quality All Loans Table Consisting Of Joined silver_applicant_demographic and silver_loan_details table", table_properties = {"quality": "silver"} ) #Ensure we have a valid loanId as an expectation @dlt.expect_or_drop("valid_loanId", "loanId IS NOT NULL") def silver_loans(): #Load the applicant_demographics and loan_details silver DLTs silver_applicant_demographics = dlt.read("silver_applicant_demographics") silver_loan_information = dlt.read("silver_loan_details") #Join tables together on loanId silver_loans = silver_loan_information.join(silver_applicant_demographics, "loanId") #Return spark dataframe return silver_loans ``` ___ # Gold Approval Model Table <div class="row"> <div class="col"> Let's now create our gold approval model table that will contain only the fields we need for our approval prediction machine learning model.
Here we will: <ul> <li>Set 19 core expectations to ensure quality of our table</li> <li>Keep only selected columns</li> <li>Convert DTI to a numerical form</li> <li>Remove Nulls</li> </ul> </div> <div class="col"> ```python[1-22|33-37|43-45|47-48] #Set Core Expectations approval_model_expectations = {"valid_loanId": "loanId IS NOT NULL", "valid_Region": "Region IS NOT NULL", "valid_loanPurpose": "loanPurpose IS NOT NULL", "valid_occupancyType": "occupancyType is NOT NULL", "valid_conformingLoanLimit": "conformingLoanLimit is NOT NULL", "valid_derivedDwellingCategory": "derivedDwellingCategory is NOT NULL", "valid_lienStatus": "lienStatus is NOT NULL", "valid_businessOrCommercialPurpose": "businessOrCommercialPurpose is NOT NULL", "valid_loanAmount": "loanAmount > 0", "valid_loanToValueRatio": "loanToValueRatio > 0", "valid_loanTerm": "loanTerm > 0", "valid_propertyValue": "propertyValue > 0", "valid_income": "income > 0", "valid_debtToIncomeRatio": "debtToIncomeRatio > 0", "valid_ffiecMsaMdMedianFamilyIncome": "ffiecMsaMdMedianFamilyIncome > 0", "valid_applicantSex": "applicantSex IS NOT NULL", "valid_applicantRace1": "applicantRace1 IS NOT NULL", "valid_applicantEthnicity1": "applicantEthnicity1 IS NOT NULL", "valid_actionTaken": "actionTaken IS NOT NULL", } @dlt.table( name = "gold_approval_model", path = "gold_approval_model", comment = "Gold Quality Approval Model Table Consisting Of Data Needed for Approval Model", table_properties = {"quality": "gold"} ) @dlt.expect_all(approval_model_expectations) def gold_approval_model(): #Select the columns we want for our model (Note: This is coupled with EDA and domain expertise ) df_model = dlt.read("silver_loans").to_pandas_on_spark() keep_cols = ["loanId","Region","derivedLoanProductType","loanType","loanPurpose","occupancyType", "conformingLoanLimit","derivedDwellingCategory","lienStatus", "businessOrCommercialPurpose","loanAmount", "loanToValueRatio","loanTerm","propertyValue","income","debtToIncomeRatio", "ffiecMsaMdMedianFamilyIncome", "applicantSex","applicantRace1","applicantEthnicity1", "actionTaken"] df_model = df_model[keep_cols] #Re-Map our Target Variables (We want only approved/denied loans for our binary classifier) df_model = df_model[df_model.actionTaken.isin(["Loan originated", "Application approved but not accepted", "Application denied"])] df_model["actionTaken"] = df_model.actionTaken.map({"Loan originated":"Approved", "Application approved but not accepted": "Approved", "Application denied": "Denied"}) #Adjust DTI bands to be numerical dti_replace = {"Exempt": None, "<20%": 0, "20%-<30%": 20,"30%-<36%": 30,"36%-<50%":36, "50%-60%": 50, ">60%": 60} df_model["debtToIncomeRatio"] = (df_model["debtToIncomeRatio"].replace(dti_replace).astype(float)) #Remove Any Nulls df_model.dropna(inplace=True) #Return Spark DataFrame df_model = df_model.to_spark() #Cast Data Types df_model = ( df_model .withColumn("loanTerm",col("loanTerm").cast(IntegerType())) .withColumn("loanToValueRatio", col("loanToValueRatio").cast(FloatType())) ) return df_model ``` </div> </div> ___ # Gold BI Dashboard Table We will additionally create 3 gold purposed business tables that can be used to build a visual Databricks dashboard (reader exercise): ```python[] #Set Schema Definition for Value field in KPI table kpi_schema = StructType([ StructField("metric", StringType(), True), StructField("value", DoubleType(), True)] ) @dlt.table( name = "gold_kpi_information", path = "gold_kpi_information", comment = "Gold standard table consisting of KPI information for dashboarding", table_properties = {"quality": "gold"}, schema = kpi_schema ) @dlt.expect_or_drop("valid_metrics", "value >= 0") def gold_kpi_information(): #Load Silver Quality Loans Table and convert to pandas-on-spark df_loans = dlt.read("silver_loans").to_pandas_on_spark() #Income KPIs average_income = df_loans["income"].mean() std_income = df_loans["income"].std() median_income = df_loans["income"].median() #loanAmount KPIs average_loanAmount = df_loans["loanAmount"].mean() std_loanAmount = df_loans["loanAmount"].std() median_loanAmount = df_loans["loanAmount"].median() #Submission KPIs numLoans = df_loans.shape[0] num_loansApproved = df_loans[(df_loans.actionTaken == "Loan originated") | (df_loans.actionTaken == "Application approved but not accepted")].shape[0]
approved but not accepted")].shape[0] num_loansDenied = df_loans[df_loans.actionTaken == "Application denied"].shape[0] #Create and return a Spark DataFrame cols = ["average_income", "std_income", "median_income", "average_loan_amount", "std_loan_amount", "median_loan_amount", "num_submissions", "num_submissions_approved","num_submissions_denied"] vals = [average_income,std_income,median_income,average_loanAmount, std_loanAmount,median_loanAmount,numLoans, num_loansApproved, num_loansDenied] df_rows = list(zip(cols,vals)) df_kpis = ps.DataFrame(df_rows,columns=["metric","value"]).to_spark() return df_kpis #Set Schema Definition gender_schema = StructType([ StructField("metric", StringType(), True), StructField("value", FloatType(), True)] ) @dlt.table( name = "gold_gender_information", path = "gold_gender_information", comment = "Gold standard table consisting of Gender information for dashboarding", table_properties = {"quality": "gold"}, schema = gender_schema ) @dlt.expect_or_drop("valid_metrics", "value >= 0") def gold_gender_information(): #Load Silver Loans Data df_fair = dlt.read("silver_loans").to_pandas_on_spark() #Male Metrics: total_male_submissions, num_approved_male_submissions, num_denied_male_submissions df_male = df_fair[df_fair.applicantSex == "Male"] num_male = float(df_male.shape[0]) num_male_approved = float(df_male[(df_male.actionTaken == "Loan originated") | (df_male.actionTaken == "Application approved but not accepted")].shape[0]) num_male_denied = float(df_male[df_male.actionTaken == "Application denied"].shape[0]) #Female Metrics: total_female_submissions, num_approved_female_submissions, num_fedenied_male_submissions df_female = df_fair[df_fair.applicantSex == "Female"] num_female = float(df_female.shape[0]) num_female_approved = float(df_female[(df_female.actionTaken == "Loan originated") | (df_female.actionTaken == "Application approved but not accepted")].shape[0]) num_female_denied = float(df_female[df_female.actionTaken == "Application denied"].shape[0]) #Gender AIR Ratio: pr(Y=1 | D=Unprivlidged)/ pr(Y=1|D=privileged) gender_air = float(np.divide(np.divide(num_female_approved, num_female) , np.divide(num_male_approved, num_male))) #Create and Return Spark DataFrame cols = ["total_male_submissions", "num_approved_male_submissions", "num_denied_male_submissions", "total_female_submissions", "num_approved_female_submissions", "num_denied_female_submissions", "gender_air"] vals = [num_male, num_male_approved, num_male_denied, num_female, num_female_approved, num_female_denied, gender_air] df_rows = list(zip(cols,vals)) df_gender = ps.DataFrame(df_rows,columns=["metric","value"]).to_spark() df_gender = df_gender.withColumn("value", col("value").cast(FloatType())) return df_gender #Set Schema Definition state_info_schema = StructType([ StructField("stateCode", StringType(), True), StructField("averageLoanAmount", DoubleType(), True), StructField("numSubmissions", LongType(), True), StructField("averageIncome", DoubleType(), True) ]) state_expectations = {"valid_avgLoanAmount": "averageLoanAmount > 0", "valid_numSubmissions": "numSubmissions > 0", "valid_averageIncome": "averageIncome > 0" } @dlt.table( name = "gold_state_information", path = "gold_state_information", comment = "Gold standard table consisting of State information for dashboarding", table_properties = {"quality": "gold"}, schema = state_info_schema ) @dlt.expect_all(state_expectations) def gold_state_information(): #Load Silver Quality Loans Data df_all = 
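
Notice that the tables above enforce quality differently: `@dlt.expect_all` on the approval model table only records violations, while `@dlt.expect_or_drop` on the KPI table removes offending rows. As a hedged aside (a sketch, not part of the lab), the same expectations dictionary can be reused under any of DLT's three enforcement modes:

```python
#Sketch: DLT's three enforcement modes for the same expectations dictionary
#@dlt.expect_all(...)         -> log violations as metrics, keep the rows (used above)
#@dlt.expect_all_or_drop(...) -> drop any row that violates an expectation
#@dlt.expect_all_or_fail(...) -> abort the pipeline update on any violation
import dlt  #Available inside Delta Live Tables pipelines

#Hypothetical stricter variant of the gold table, for illustration only
@dlt.table(name="gold_approval_model_strict", comment="Rows violating expectations are dropped")
@dlt.expect_all_or_drop(approval_model_expectations)
def gold_approval_model_strict():
    return dlt.read("gold_approval_model")
```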
___
# Pipeline Execution
<p style="font-size: 24">Now that we have written all of the code in our dlt_pipeline notebook, let's execute the DLT pipeline workflow. To do so:</p>

1. Make sure you are in the Data Science & Engineering workspace (check the top of the sidebar for options)
2. Click Workflows in the sidebar
3. Select Delta Live Tables at the top
4. Click `{your-email} - Data Pipeline`
5. Click Start at the top right and wait 5-10 minutes for the pipeline to execute and complete
6. Upon successful completion you will see the following graph generated:

<center><img src="/images/presentations/data_ml/dlt_pipeline_workflow.png" width="700"/></center>
___
# Checking Failures
Click on any table to see the pipeline's data quality details, such as the number of failed expectations:
<center><img src="/images/presentations/data_ml/dlt_failures.png" width="1200"/></center>
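
If you prefer code to clicking through the UI, the same data quality metrics can be pulled from the DLT event log. A minimal sketch, assuming the pipeline storage root is the `myuser.pipelines.location` Spark conf used elsewhere in this lab and that the event log sits at its default `system/events` path:

```python
#Sketch: query the DLT event log for expectation pass/fail details
base = spark.conf.get("myuser.pipelines.location")  #Assumed storage root for the pipeline
df_events = spark.read.format("delta").load(f"{base}system/events")
df_events.createOrReplaceTempView("dlt_events")

display(spark.sql("""
  SELECT timestamp,
         details:flow_progress.data_quality.expectations AS expectations
  FROM dlt_events
  WHERE details:flow_progress.data_quality.expectations IS NOT NULL
  ORDER BY timestamp DESC
"""))
```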
___
# Feature Generation <!-- .slide: data-background-image="/images/presentations/ppt-subsection-background.png" -->
___
# Setup
We are now going to move to our feature generation script, working out of the following notebook:

```
techcon
│
└───approvalModel
    │   feature_generation
```

___
# Imports
To start our feature generation we will import the following libraries:

```python[]
from databricks import feature_store
import pyspark.pandas as ps
import pyspark.sql.functions as F
```

___
# Load Gold Table Data
Let's now load the gold approval model table that we constructed in our DLT pipeline:

```python[]
def get_table(tableName):
    '''
    * Description: A helper function to load a Spark DataFrame from a table in our Delta Live Table pipeline
    * Input: (tableName) a string representing the name of a table from our Delta Live Table pipeline
    * Return: A Spark DataFrame
    '''
    base = spark.conf.get("myuser.pipelines.location")
    path = f"{base}tables/{tableName}"
    return spark.read.format("delta").load(path)

#Load the gold approval model data and convert it to a pandas on Spark DataFrame
df = get_table("gold_approval_model").to_pandas_on_spark()
```

___
# Feature Store Datasets
<div class="section">
<div class="row">
<div class="col">
<p>Let's now create 3 different datasets that we will log to our Feature Store and leverage for model training:</p>
<ul>
<li>approval_model.targets - The target variable we are trying to predict</li>
<li>approval_model.features - Our model's base set of features</li>
<li>approval_model.sensitive - Sensitive fields to assess the fairness of our model's predictions during training</li>
</ul>
</div>
<div class="col">

```python[1-5|7-10|52]
#Create the target dataset
#actionTaken is our target/label variable (0 - Denied, 1 - Approved)
target = ["actionTaken"]
df_target = df[["loanId"]+target]
df_target["actionTaken"] = df_target.actionTaken.eq('Approved').astype(int)

#Create the sensitive dataset
#We will use this to log a fairness chart in the model training script
sensitive_attributes = ["applicantSex","applicantRace1","applicantEthnicity1"]
df_sensitive = df[["loanId"]+sensitive_attributes]

#Create the core features dataset
def createFeatures(df, one_hot_cols, binary_cols, numerical_cols):
    '''
    * Description: A helper function that creates the core features for our machine learning approval model
    * Input: df - A pandas on Spark DataFrame from our gold_approval_model table
             one_hot_cols - A list of strings representing the columns to one-hot encode
             binary_cols - A list of strings representing the columns to convert to boolean flags (0/1)
             numerical_cols - A list of strings representing the numerical columns
    * Return: df_features - A pandas on Spark DataFrame consisting of our generated features
    '''
    #Numerical dataframe
    df_num = df[numerical_cols]

    #One-hot encode the features
    df_one_hot = ps.get_dummies(df[one_hot_cols]).astype(int)

    #Change the features with 2 options to binary flags (1/0)
    df_binary = df[binary_cols]
    df_binary["conformingLoanLimit"] = df_binary.conformingLoanLimit.eq('Conforming').astype(int)
    df_binary["derivedDwellingCategory"] = df_binary.derivedDwellingCategory.eq('Single Family (1-4 Units):Site-Built').astype(int)
    df_binary["lienStatus"] = df_binary.lienStatus.eq('First Lien').astype(int)
    df_binary["businessOrCommercialPurpose"] = df_binary.businessOrCommercialPurpose.eq('Yes').astype(int)

    #Join all the individual features together
    df_features = ps.concat([df_num, df_binary, df_one_hot], axis=1)

    #Add loanId as the primary identifier
    df_features["loanId"] = df["loanId"]

    #Clean up and remap column names for the Feature Store (all ":", " ", "-" replaced with "_")
    cols = df_features.columns.tolist()
    new_cols = [i.replace(":","_").replace(" ","_").replace("-","_") for i in cols]
    df_features = df_features.rename(columns=dict(zip(cols, new_cols)))
    return df_features

#Set the parameters for our createFeatures function and return the DataFrame of features
one_hot_cols = ["Region","derivedLoanProductType","loanType","loanPurpose","occupancyType"]
binary_cols = ["conformingLoanLimit","derivedDwellingCategory","lienStatus","businessOrCommercialPurpose"]
numerical_cols = ["loanAmount","loanToValueRatio","loanTerm","propertyValue","income","debtToIncomeRatio","ffiecMsaMdMedianFamilyIncome"]
df_features = createFeatures(df, one_hot_cols, binary_cols, numerical_cols)
```

</div>
</div>
</div>
___
# Create Feature Store Database
Let's now create a database that our Feature Store will utilize:

```python[]
dbutils.widgets.text("run_as_shared", "0")

def get_database_name():
    run_as_shared = dbutils.widgets.get("run_as_shared")
    if run_as_shared == "1":
        db_name = "shared_approval_model"
    else:
        user_name_raw = spark.sql('select current_user() as user').collect()[0]['user']
        user_name = user_name_raw.split('@')[0].replace('.', '_')
        db_name = f"{user_name}_approval_model"
    return db_name
```

```python[]
db_name = get_database_name()
```

```python[]
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
```

___
# Create Feature Tables
<div class="section">
<div class="row">
<div class="col">
<p style="font-size: 24">Create Feature Tables & Save Features:</p>

```python[]
#Instantiate the Feature Store client object
fs = feature_store.FeatureStoreClient()
spark.conf.set("spark.sql.shuffle.partitions", "5")

#Create the features table
try:
    fs.create_table(
        name=f"{db_name}.features",
        primary_keys=["loanId"],
        df=df_features.to_spark(),
        description="Loan Level Features for the Loan Approval Model"
    )
except:
    print("Features Table Exists")

#Create the targets table
try:
    fs.create_table(
        name=f"{db_name}.targets",
        primary_keys=["loanId"],
        df=df_target.to_spark(),
        description="Approval/Denial Targets for the Loan Approval Model"
    )
except:
    print("Targets Table Exists")

#Create the sensitive attributes table
try:
    fs.create_table(
        name=f"{db_name}.sensitive",
        primary_keys=["loanId"],
        df=df_sensitive.to_spark(),
        description="Sensitive Attributes for Assessing Fairness of the Loan Approval Model"
    )
except:
    print("Sensitive Attributes Table Exists")
```

</div>
<div class="col">
<p style="font-size: 24">Save Features:</p>

```python[]
#Save features
fs.write_table(
    name=f"{db_name}.features",
    df=df_features.to_spark(),
    mode="overwrite"
)

#Save targets
fs.write_table(
    name=f"{db_name}.targets",
    df=df_target.to_spark(),
    mode="overwrite"
)

#Save sensitive attributes
fs.write_table(
    name=f"{db_name}.sensitive",
    df=df_sensitive.to_spark(),
    mode="overwrite"
)
```

</div>
</div>
</div>
___
# View Feature Store Tables
Now that we have created our feature tables and populated them with data, we can visually inspect the Feature Store (once we run our ML workflow or when the individual feature_generation notebook is executed):

1. Make sure you are in the Machine Learning workspace
2. In the sidebar click Feature Store
3. Click any of the following: `{your-email}_approval_model.features`, `{your-email}_approval_model.sensitive`, `{your-email}_approval_model.targets`

<center><img src="/images/presentations/data_ml/feature_store_ui.png" width="700"/></center>
___
# Model Training <!-- .slide: data-background-image="/images/presentations/ppt-subsection-background.png" -->
___
# Setup
We are now going to move to our model training script, working out of the following notebook:

```
techcon
│
└───approvalModel
    │   model_training
```

___
# Imports
Let's import the following libraries:

```python[]
#Generic imports
import time

#Machine learning imports - model architectures, model validation, hyperparameter tuning
from sklearn.model_selection import train_test_split, RandomizedSearchCV, KFold
from sklearn import metrics
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_sample_weight
import xgboost as xgb
from xgboost import XGBClassifier
from xgboost import plot_importance
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
from hyperopt.pyll import scope

#Data visualization imports
import matplotlib.pyplot as plt
import seaborn as sns

#Feature Store imports - look up and access features
from databricks.feature_store import FeatureLookup, FeatureStoreClient

#MLflow imports - model experiment tracking, deployment
import mlflow
from mlflow.pyfunc import PythonModel, PythonModelContext
from mlflow.models import infer_signature
```

___
# Load Feature Store Tables
Let's now load our data from the Feature Store and create our model training set:

```python[17-42]
dbutils.widgets.text("run_as_shared", "0")

def get_database_name():
    run_as_shared = dbutils.widgets.get("run_as_shared")
    if run_as_shared == "1":
        db_name = "shared_approval_model"
    else:
        user_name_raw = spark.sql('select current_user() as user').collect()[0]['user']
        user_name = user_name_raw.split('@')[0].replace('.', '_')
        db_name = f"{user_name}_approval_model"
    return db_name

db_name = get_database_name()

#Grab specific (or all) features from the Feature Store and join them to existing features
#Feature table lookups for core features and sensitive attributes (we only want gender, so we specify applicantSex)
feature_lookups = [
    FeatureLookup(
        table_name = f'{db_name}.features',
        lookup_key = 'loanId',
    ),
    FeatureLookup(
        table_name = f'{db_name}.sensitive',
        feature_names = ['applicantSex'],
        lookup_key = 'loanId'
    )
]

#Instantiate the Feature Store client
fs = FeatureStoreClient()

#Create the Feature Store training set by joining the looked-up features onto the targets dataset
training_set = fs.create_training_set(
    df = spark.sql(f"SELECT * FROM {db_name}.targets"),
    feature_lookups = feature_lookups,
    label = 'actionTaken'
    #exclude_columns = ['applicantSex', 'applicantRace1','applicantEthnicity1'] #Note: you can also exclude columns
)

#Convert the dataset to pandas
training_df = training_set.load_df().toPandas()
```

___
# Train/Test Split
In order to properly validate our model, we will create a separate training set to train our model and a held-out test set to validate our model's performance:

```python[]
#Split features and targets/labels
df_features = training_df.drop(["actionTaken"], axis=1)
df_targets = training_df.actionTaken

#Create the train/test split (80% train, 20% test)
X_train, X_test, y_train, y_test = train_test_split(df_features, df_targets, test_size=0.2, random_state=42)

#Split out sensitive attributes for fairness analysis in model training
s_train = X_train[["loanId","applicantSex"]]
s_test = X_test[["loanId","applicantSex"]]
X_train.drop(["loanId","applicantSex"], axis=1, inplace=True)
X_test.drop(["loanId","applicantSex"], axis=1, inplace=True)
```

___
# Label Balancing
With classification problems in machine learning, we want to make sure the samples corresponding to our labels are balanced. Target imbalance can skew learning and cause evaluation metrics to be misleading. Below, we compute sample weights for our model based on our label distribution:
<br><br>

```python[]
#Compute the sample weights for our model based on a balanced split between approvals and denials
sample_weights = compute_sample_weight(class_weight='balanced', y=y_train)
```
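
To see what `'balanced'` weighting actually does, here is a toy illustration (hypothetical labels, not the lab data): each class ends up contributing the same total weight, so the minority class is up-weighted.

```python
import numpy as np
from sklearn.utils.class_weight import compute_sample_weight

#Toy labels with a 3:1 imbalance (three approvals, one denial)
y_toy = np.array([1, 1, 1, 0])
print(compute_sample_weight(class_weight='balanced', y=y_toy))
#[0.667 0.667 0.667 2.0] - per-sample weight = n_samples / (n_classes * class_count)
```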
___
# Model Wrapping
<div class="section">
<div class="row">
<div class="col" style="font-size: 20 !important">
<p>When we deploy a model, the POST request must already contain the generated features in order to predict against the model. Often we do not have access to the features, only the underlying data that the features are created from. In order to perform feature generation on samples sent to the model and modify requests, we create a ModelWrapper class. At a high level, the ModelWrapper allows us to customize our model. Specifically we will:</p>
<ul>
<li>Create a feature generation function (format_request_features) - take in data from the request and generate the features used in the model's predict function</li>
<li>Override the predict function - call the feature generation function, return the probability of approval, and format the response as a list</li>
</ul><br><br><br>
<b>Note: This is an advanced concept necessary for model REST endpoint deployment. Please feel free to skip this section.</b>
</div>
<div class="col">

```python[]
class ModelWrapper(PythonModel):
    def __init__(self, model):
        self._model = model

    def format_request_features(self, data, region_map):
        '''
        Description: Creates the features from raw POST data sent to the model endpoint
        Input: data - A pandas dataframe (MLflow automatically takes the JSON and creates a dataframe from it)
               region_map - A dictionary of state to region mappings that we log during model training
        Return: A pandas dataframe of generated features
        '''
        #We have to import all libraries within functions for ModelWrapper
        import pandas as pd
        import numpy as np

        #Ensure we are only using the correct columns for feature generation
        feature_columns = ["businessOrCommercialPurpose", "conformingLoanLimit", "debtToIncomeRatio",
                           "derivedDwellingCategory", "derivedLoanProductType", "ffiecMsaMdMedianFamilyIncome",
                           "income", "lienStatus", "loanAmount", "loanPurpose", "loanTerm", "loanToValueRatio",
                           "loanType", "occupancyType", "propertyValue", "stateCode"]
        df = data[feature_columns]

        #Create the debtToIncomeRatio feature (correct and bucket it)
        def dti_bucket_correction(val):
            try:
                x = float(val)
                if x >= 36 and x < 50:
                    return '36%-<50%'
                else:
                    return "NA"
            except:
                return val
        df["debtToIncomeRatio"] = df["debtToIncomeRatio"].apply(lambda i: dti_bucket_correction(i))
        dti_replace = {"Exempt": None, "<20%": 0, "20%-<30%": 20, "30%-<36%": 30, "36%-<50%": 36, "50%-60%": 50, ">60%": 60}
        df["debtToIncomeRatio"] = df["debtToIncomeRatio"].replace(dti_replace).astype(float)

        #businessOrCommercialPurpose binary feature (1 - Yes, 2 - No)
        df["businessOrCommercialPurpose"] = df.businessOrCommercialPurpose.eq('1').astype(int)
        df["conformingLoanLimit"] = df.conformingLoanLimit.eq('C').astype(int)
        df["derivedDwellingCategory"] = df.derivedDwellingCategory.eq('Single Family (1-4 Units):Site-Built').astype(int)
        df["lienStatus"] = df.lienStatus.eq('1').astype(int) #1 - First Lien

        #Derived loan product type feature
        lpt_cats = ['Conventional:First Lien', 'FHA:First Lien', 'VA:First Lien', 'FSA/RHS:First Lien', 'Conventional:Subordinate Lien']
        dummies_loan_product_type = pd.get_dummies(df["derivedLoanProductType"], prefix="derivedLoanProductType")
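        #-- Hedged aside (not part of the lab code): once the class is defined,
        #-- the feature generation can be smoke-tested locally on one hypothetical
        #-- raw record, with no trained model required, e.g.:
        #--   sample = pd.DataFrame([{"businessOrCommercialPurpose": "2",
        #--       "conformingLoanLimit": "C", "debtToIncomeRatio": "36%-<50%",
        #--       "derivedDwellingCategory": "Single Family (1-4 Units):Site-Built",
        #--       "derivedLoanProductType": "Conventional:First Lien",
        #--       "ffiecMsaMdMedianFamilyIncome": 74000, "income": 85,
        #--       "lienStatus": "1", "loanAmount": 255000, "loanPurpose": "1",
        #--       "loanTerm": 360, "loanToValueRatio": 80.0, "loanType": "1",
        #--       "occupancyType": "1", "propertyValue": 315000, "stateCode": "GA"}])
        #--   ModelWrapper(model=None).format_request_features(sample, region_map)
        #-- All field values above are illustrative HMDA-style codes; the result
        #-- should be one row with the 30 columns of input_schema below.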
        dummies_loan_product_type = dummies_loan_product_type.T.reindex(["derivedLoanProductType_" + i for i in lpt_cats]).T.fillna(0).astype(int)
        df.drop("derivedLoanProductType", axis=1, inplace=True)
        df = pd.concat([df, dummies_loan_product_type], axis=1)

        #Loan purpose feature ({"1": "Home purchase", "31": "Refinancing", "32": "Cash out refinancing"})
        lp_map = {"1": "Home purchase", "31": "Refinancing", "32": "Cash out refinancing"}
        df["loanPurpose"] = df["loanPurpose"].replace(lp_map)
        lp_cats = ["Home purchase", "Refinancing", "Cash out refinancing"]
        dummies_lp = pd.get_dummies(df["loanPurpose"], prefix="loanPurpose")
        dummies_lp = dummies_lp.T.reindex(["loanPurpose_" + i for i in lp_cats]).T.fillna(0).astype(int)
        df.drop("loanPurpose", axis=1, inplace=True)
        df = pd.concat([df, dummies_lp], axis=1)

        #Loan type feature ({"1": "Conventional", "2": "FHA", "4": "USDA", "3": "VA"})
        lt_map = {"1": "Conventional", "2": "FHA", "4": "USDA", "3": "VA"}
        df["loanType"] = df["loanType"].replace(lt_map)
        lt_cats = ["Conventional", "FHA", "USDA", "VA"]
        dummies_lt = pd.get_dummies(df["loanType"], prefix="loanType")
        dummies_lt = dummies_lt.T.reindex(["loanType_" + i for i in lt_cats]).T.fillna(0).astype(int)
        df.drop("loanType", axis=1, inplace=True)
        df = pd.concat([df, dummies_lt], axis=1)

        #Occupancy type feature ({"3": "Investment property", "1": "Principal residence", "2": "Second residence"})
        ot_map = {"3": "Investment property", "1": "Principal residence", "2": "Second residence"}
        df["occupancyType"] = df["occupancyType"].replace(ot_map)
        ot_cats = ["Investment property", "Principal residence", "Second residence"]
        dummies_ot = pd.get_dummies(df["occupancyType"], prefix="occupancyType")
        dummies_ot = dummies_ot.T.reindex(["occupancyType_" + i for i in ot_cats]).T.fillna(0).astype(int)
        df.drop("occupancyType", axis=1, inplace=True)
        df = pd.concat([df, dummies_ot], axis=1)

        #Region feature
        df["Region"] = df["stateCode"].replace(region_map)
        df.drop("stateCode", axis=1, inplace=True)
        region_cats = ["South", "West", "Midwest", "Northeast"]
        dummies_region = pd.get_dummies(df["Region"], prefix="Region")
        dummies_region = dummies_region.T.reindex(["Region_" + i for i in region_cats]).T.fillna(0).astype(int)
        df.drop("Region", axis=1, inplace=True)
        df = pd.concat([df, dummies_region], axis=1)

        #Clean up column names for the ML model (replace ":", " ", "-" with "_")
        cols = df.columns.tolist()
        new_cols = [i.replace(":","_").replace(" ","_").replace("-","_") for i in cols]
        df = df.rename(columns=dict(zip(cols, new_cols)))

        #Reorder features for the ML model
        input_schema = ["businessOrCommercialPurpose", "conformingLoanLimit", "debtToIncomeRatio",
                        "derivedDwellingCategory", "derivedLoanProductType_Conventional_First_Lien",
                        "derivedLoanProductType_Conventional_Subordinate_Lien", "derivedLoanProductType_FHA_First_Lien",
                        "derivedLoanProductType_FSA/RHS_First_Lien", "derivedLoanProductType_VA_First_Lien",
                        "ffiecMsaMdMedianFamilyIncome", "income", "lienStatus", "loanAmount",
                        "loanPurpose_Cash_out_refinancing", "loanPurpose_Home_purchase", "loanPurpose_Refinancing",
                        "loanTerm", "loanToValueRatio", "loanType_Conventional", "loanType_FHA", "loanType_USDA",
                        "loanType_VA", "occupancyType_Investment_property", "occupancyType_Principal_residence",
                        "occupancyType_Second_residence", "propertyValue", "Region_Midwest", "Region_Northeast",
                        "Region_South", "Region_West"]
        df = df[input_schema]

        #Handle MLflow 32-bit precision types for numerical features
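        #-- Note on a design choice in the blocks above: pd.get_dummies only
        #-- emits columns for categories present in the request, so each dummy
        #-- frame is reindexed against the full category list (lpt_cats, lp_cats,
        #-- lt_cats, ot_cats, region_cats) and missing columns are filled with 0.
        #-- This keeps single-record requests aligned with the fixed input_schema.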
        df["ffiecMsaMdMedianFamilyIncome"] = df["ffiecMsaMdMedianFamilyIncome"].astype(np.int32)
        df["income"] = df["income"].astype(np.int32)
        df["loanAmount"] = df["loanAmount"].astype(np.int32)
        df["loanTerm"] = df["loanTerm"].astype(np.int32)
        df["loanToValueRatio"] = df["loanToValueRatio"].astype(np.float32)
        df["propertyValue"] = df["propertyValue"].astype(np.int32)
        return df

    def predict(self, context: PythonModelContext, data):
        '''
        Description: Override the predict function to generate features, predict the probability of approval,
                     and return a single list of probabilities in the response
        Input: context - A PythonModelContext (the model)
               data - A pandas dataframe of request data
        Return: A list of probabilities
        '''
        import numpy as np

        #Load the region mapping dictionary that is saved as an artifact in model training
        region_map = mlflow.artifacts.load_dict(context.artifacts["census_regions"])

        #Generate features
        data = self.format_request_features(data, region_map)

        #Get the predicted approval probabilities and round them to 2 decimals
        preds = self._model.predict_proba(data)[:,1]
        preds = np.round(preds.astype(float), 2)

        #Modify the response to a list of probabilities
        preds_resp = preds.tolist()
        return preds_resp

    def predict_batch(self, data):
        pass

#We log the region_map as an artifact for feature generation, but first we need to obtain it
def get_table(tableName):
    base = spark.conf.get("myuser.pipelines.location")
    path = f"{base}tables/{tableName}"
    return spark.read.format("delta").load(path)

df_regions = get_table("gold_census_regions")[["State_Code","Region"]].toPandas()
region_map = dict(zip(df_regions.State_Code, df_regions.Region))
```

</div>
</div>
</div>
___
# Model Experiment Training & Hyperparameter Tuning
<div class="section">
<div class="row">
<div class="col" style="font-size: 20 !important">
<p>Let's now create a function that will train an XGBoost model and log experiments to our MLflow experiment workspace. We will:</p>
<ul>
<li>Define a hyperparameter search space</li>
<li>Train an XGBoost model</li>
<li>Calculate the following performance metrics: accuracy score, f1 score, balanced accuracy score</li>
<li>Create a gender fairness predictions chart</li>
<li>Run 100 experiments with different parameters proposed by our Bayesian hyperparameter tuning</li>
</ul>
</div>
<div class="col">

```python[1-9|24-26|35-41|43-51|63-73]
#Define the search space over the tunable parameters
search_space = {
    'learning_rate': hp.loguniform('learning_rate', -5, 0),
    'max_depth': scope.int(hp.uniform('max_depth', 1, 25)),
    'subsample': hp.uniform('subsample', 0.5, 1),
    'colsample_bytree': hp.uniform('colsample_bytree', 0.5, 1),
    'gamma': hp.loguniform('gamma', 0, 10),
    'objective': 'binary:logistic',
}

def train_model(params):
    #With MLflow autologging, hyperparameters and the trained model are automatically logged to MLflow
    mlflow.xgboost.autolog(silent=True, log_models=False)

    #However, we can log additional information by using an MLflow tracking context manager
    with mlflow.start_run(nested=True):
        runid = mlflow.active_run().info.run_id

        #Record the run time
        start_time = time.time()

        #Train the model
        model = XGBClassifier(**params)
        model.fit(X_train, y_train, sample_weight=sample_weights)

        #Test predictions for validation
        predictions_test = model.predict(X_test)

        #Log the run time
        run_time = time.time() - start_time
        mlflow.log_metric('runtime', run_time)

        #Performance metrics that we log to MLflow (f1 score, balanced accuracy score, accuracy score)
        accuracy_score = metrics.accuracy_score(y_test, predictions_test)
        f1_score = metrics.f1_score(y_test, predictions_test)
        balanced_accuracy_score = metrics.balanced_accuracy_score(y_test, predictions_test)
        mlflow.log_metric("accuracy_score", accuracy_score)
        mlflow.log_metric("f1_score", f1_score)
        mlflow.log_metric("balanced_accuracy_score", balanced_accuracy_score)

        #Log a fairness chart artifact for male/female predictions on the test set
        male = X_test[s_test.applicantSex == "Male"]
        female = X_test[s_test.applicantSex == "Female"]
        male_preds = model.predict_proba(male)
        female_preds = model.predict_proba(female)
        sns.set(style="white", palette="muted", color_codes=True, context="talk")
        fig = sns.distplot(male_preds[:,1], hist=False, kde_kws={'shade': True,}, label='{}'.format("Male Approval"), color="blue")
        fig = sns.distplot(female_preds[:,1], hist=False, kde_kws={'shade': True,}, label='{}'.format("Female Approval"), color="red")
        mlflow.log_figure(fig.figure, 'gender_fairness_plot.png')
        fig.clear()

        #Log the census region dictionary artifact
        mlflow.log_dict(region_map, "region_map.json")

        #Log our model with the ModelWrapper and link it to the dictionary artifact for feature generation
        mlflow.pyfunc.log_model("model", python_model=ModelWrapper(model), artifacts={"census_regions": "runs:/{}/region_map.json".format(runid)})

        #Set the loss to -1*f1_score so fmin maximizes the f1 score (minimizing the negative == maximizing)
        return {'status': STATUS_OK, 'loss': -f1_score, 'params': model.get_params()}

#Set the experiment
mlflow.set_experiment(spark.conf.get("myuser.pipelines.mlflow"))

#Start the hyperparameter tuning (note: for faster experimentation reduce max_evals from 100 to a smaller number like 10)
with mlflow.start_run(run_name='approvalPrediction'):
    best_params = fmin(
        fn=train_model,
        space=search_space,
        algo=tpe.suggest,
        max_evals=100,
    )
```

</div>
</div>
</div>
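
The 100 trials above run sequentially on the driver. As a hedged variant (a sketch using the `SparkTrials` class we already imported; the parallelism value is a hypothetical one you would tune to your cluster size), hyperopt can distribute trials across the cluster:

```python
#Sketch: distribute the tuning trials across the cluster with SparkTrials
from hyperopt import SparkTrials

with mlflow.start_run(run_name='approvalPrediction_parallel'):
    best_params = fmin(
        fn=train_model,
        space=search_space,
        algo=tpe.suggest,
        max_evals=100,
        trials=SparkTrials(parallelism=8),  #Hypothetical parallelism; match to your cluster
    )
```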
___
# Viewing Experiments
1. Make sure you are in the Machine Learning workspace
2. Click Experiments in the sidebar
3. Click `{your-email}` to view experiments

<center><img src="/images/presentations/data_ml/model_training_experiments.png" width="1200"/></center>
___
# Model Registering <!-- .slide: data-background-image="/images/presentations/ppt-subsection-background.png" -->
___
# Setup
We are now going to move to our model registering script, working out of the following notebook:

```
techcon
│
└───approvalModel
    │   model_registering
```

___
# Imports
Let's import the following libraries:<br><br>

```python[]
import mlflow
from mlflow.tracking.client import MlflowClient
```

___
# Finding Best Experiment Run
Let's now search through all 100 of our ML model experiment runs and find the best trained model according to the balanced accuracy score:

```python[]
#Set the MLflow experiment directory
mlflow.set_experiment(spark.conf.get("myuser.pipelines.mlflow"))

#Search runs
df_experiment = mlflow.search_runs()

#Find the best run based on the maximum (best) performance metric we logged (balanced accuracy score)
run_id = df_experiment.loc[df_experiment['metrics.balanced_accuracy_score_unknown_dataset'].idxmax()]['run_id']

#Register the best performing model to the Model Registry - techCon_approval_model will be the registered name
artifact_path = "model"
model_name = "techCon_approval_model"
model_uri = "runs:/{run_id}/{artifact_path}".format(run_id=run_id, artifact_path=artifact_path)
model_details = mlflow.register_model(model_uri=model_uri, name=model_name)

#Update the model's description
client = MlflowClient()
client.update_registered_model(
    name=model_details.name,
    description="TechCon Home Mortgage Approval Prediction Model."
)
```

___
# Model State Transitioning
Let's now promote our model to the production stage:

```python[]
#When you are happy with the model you can transition its stage to production
client.transition_model_version_stage(
    name=model_details.name,
    version=model_details.version,
    stage='production',
)

model_version_details = client.get_model_version(
    name=model_details.name,
    version=model_details.version,
)
print("The current model stage is: '{stage}'".format(stage=model_version_details.current_stage))

latest_version_info = client.get_latest_versions(model_name, stages=["production"])
latest_production_version = latest_version_info[0].version
print("The latest production version of the model '%s' is '%s'." % (model_name, latest_production_version))
```

___
# Viewing Registered Model
1. Make sure you are in the Machine Learning workspace
2. Click Models in the sidebar
3. Click techCon_approval_model

<center><img src="/images/presentations/data_ml/registered_model.png" width="1200"/></center>
<b>Note: Your model should say Version 1 and increment each time you run the model pipeline and deploy a better model</b>
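
Once a version is in the production stage, any notebook can load it back by its stage-based URI. A minimal sketch (using the registered name from above; `df_raw` is a hypothetical pandas DataFrame of raw request-style records):

```python
#Sketch: load the production-stage model as a pyfunc and score raw records
import mlflow.pyfunc

prod_model = mlflow.pyfunc.load_model("models:/techCon_approval_model/production")
#probs = prod_model.predict(df_raw)  #df_raw: hypothetical raw-field DataFrame; returns a list of approval probabilities
```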
___
# Model Task Orchestration <!-- .slide: data-background-image="/images/presentations/ppt-subsection-background.png" -->
___
# Running the ML Pipeline
We have now created notebooks for feature generation, model training, and model registering, and assembled them sequentially into an ML workflow task that we will now run.

1. Make sure you are in the Machine Learning workspace
2. Click Workflows
3. Click `{your-email}-Model Generation Pipeline`
4. Click `Run now` at the top right

<center><img src="/images/presentations/data_ml/ml_task_orchestration.png" width="600"/></center>
___
# Model Serving <!-- .slide: data-background-image="/images/presentations/ppt-subsection-background.png" -->
___
# Creating an Endpoint
<div class="section">
<div class="row">
<div class="col">
<p>Now that we have a model registered in production, we can serve the model and create an endpoint:</p>
<ul>
<li>Make sure you are in the Machine Learning workspace</li>
<li>Click Models</li>
<li>Find your production model</li>
<li>Click Serving in the top navigation bar</li>
<li>Click Use model for inference at the top right and wait a few minutes</li>
</ul>
</div>
<div class="col">
<p>When your model is served you should see the following:</p>
<center><img src="/images/presentations/data_ml/model_endpoint_creation.png" width="800"/></center>
</div>
</div>
</div>
___
# Using the Endpoint
Now that we have a model registered in production and served, we can send the model data for inference right from the UI in the browser, send a cURL request, or get predictions from the model through Python. Let's send a request with the following body structure:
<center><img src="/images/presentations/data_ml/model_serving.png" width="1000"/></center>
---
# The End <!-- .slide: data-background-image="/images/presentations/ppt-background.png" -->
___
# Contact & Resources
For any questions please reach out to adam.lieberman@finastra.com

| Topic | Resource |
| ------ | ------ |
| Databricks | https://docs.databricks.com/introduction/index.html |
| MLFlow | https://docs.databricks.com/mlflow/index.html |
| Feature Store | https://docs.databricks.com/machine-learning/feature-store/index.html |
| Delta Live Tables | https://learn.microsoft.com/en-us/azure/databricks/workflows/delta-live-tables/ |
| Model Serving | https://learn.microsoft.com/en-us/azure/databricks/mlflow/model-serving |
| Machine Learning | https://www.deeplearningbook.org/contents/ml.html |
| Data Engineering | https://medium.com/@rchang/a-beginners-guide-to-data-engineering-part-i-4227c5c457d7 |