Source code for neograd.autograd.utils

import numpy as np
from .graph import Graph
from itertools import zip_longest


def process_data(data):
    '''Validates data and converts it to a float ndarray for storage in Tensor

    Supported types for data - [int, float, list, np.ndarray]. The check is an
    exact type match (not isinstance), so subclasses of the supported types,
    for example bool, are rejected.
    Elements in data should be float or be typecastable to float

    Args:
        data (int or float or list or np.ndarray): Data to be processed

    Returns:
        np.ndarray of dtype float containing the processed data

    Raises:
        TypeError: If data or its elements aren't typecastable to float
        TypeError: If data is not instance of supported types
    '''
    supported_types = (int, float, list, np.ndarray)
    if type(data) not in supported_types:
        raise TypeError(f"Expected data of types {supported_types} instead got {type(data)}")

    try:
        # Array construction is kept inside the try as well: NumPy versions
        # that reject ragged nested lists raise ValueError from np.array
        # itself, and that must surface as the documented TypeError too.
        if not isinstance(data, np.ndarray):
            data = np.array(data)
        return data.astype(float)
    except ValueError as err:
        raise TypeError("Elements of data should be of type float or be typecastable to float") from err
def unbroadcast_data(data, orig_data_shape, broadcasted_shape):
    '''Unbroadcasts the data to its original shape

    If data (a np object) is broadcasted during an operation, then it is
    unbroadcasted here: every axis along which it was broadcasted is summed.
    Summing removes the reduced axes, so axes that had length 1 in the
    original shape are restored afterwards to give back orig_data_shape.
    If broadcasted_shape is None, then the data is returned as is.

    https://numpy.org/doc/stable/user/basics.broadcasting.html

    Args:
        data (np.ndarray): Data to be unbroadcasted
        orig_data_shape (tuple): Original shape of data before broadcasting
        broadcasted_shape (tuple): Shape to which data has been broadcasted to

    Returns:
        Data that is unbroadcasted
    '''
    if broadcasted_shape is None:
        return data

    # Broadcasting aligns shapes from the trailing dimension, so the shapes
    # are paired back to front; a missing leading dimension is paired with
    # None and therefore always counts as broadcasted.
    ndim = max(len(broadcasted_shape), len(orig_data_shape))
    aligned_dims = zip_longest(
        reversed(broadcasted_shape), reversed(orig_data_shape), fillvalue=None
    )
    axes_to_be_summed = tuple(sorted(
        ndim - 1 - offset
        for offset, (dim_broadcasted, dim_orig) in enumerate(aligned_dims)
        if dim_broadcasted != dim_orig
    ))

    unbroadcasted_data = np.sum(data, axis=axes_to_be_summed)

    # np.sum drops the summed axes, including those that were of length 1 in
    # the original (e.g. (3, 1) broadcast to (3, 4) sums to (3,)). Restore
    # the original shape, but only when the element count agrees so that
    # inputs which were not a valid broadcast are returned as before.
    if (np.shape(unbroadcasted_data) != tuple(orig_data_shape)
            and np.size(unbroadcasted_data) == int(np.prod(orig_data_shape))):
        unbroadcasted_data = np.reshape(unbroadcasted_data, orig_data_shape)
    return unbroadcasted_data
def get_graph():
    '''Fetches the graph that is currently in use

    Graph.graph takes precedence when it is set; when it is None the
    global graph _NG_GRAPH is used instead

    Returns:
        Graph object that is currently used
    '''
    active_graph = Graph.graph
    if active_graph is not None:
        return active_graph

    # Imported only when needed, at call time rather than at module import
    # (presumably to avoid a circular import with the package root - confirm)
    from .. import _NG_GRAPH
    return _NG_GRAPH
class new_graph:
    '''Creates a Graph object

    Context Manager to create a new graph if required anywhere and under
    the circumstances where it shouldn't interfere with the global _NG_GRAPH

    After entering, a freshly created Graph object is set in Graph.graph and
    is also returned, so it can be bound with "with new_graph() as graph".
    After exiting, Graph.graph is set back to whatever it was before entering.
    When not nested that is None, which implies that global _NG_GRAPH will be
    used; when nested inside another new_graph, the outer graph becomes
    active again instead of being discarded.
    '''

    def __enter__(self):
        # Remember the graph that was active so that it can be restored
        self._previous_graph = Graph.graph
        Graph.graph = Graph()
        return Graph.graph

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Falls back to None if __exit__ is invoked without a prior __enter__
        Graph.graph = getattr(self, "_previous_graph", None)
class no_track:
    '''Prevents tracking of Tensors

    Context Manager to prevent creation of a backward graph, when gradient
    calculation is not required, for ex when testing a model after training
    it, you don't need any backward pass

    On entering, graph.track is set to False to indicate no tracking and on
    exiting, it is set back to the value it had on entering. When not nested
    that is the graph's regular setting; when nested inside another no_track,
    tracking stays disabled until the outermost block exits.

    Attributes:
        graph (Graph): The graph that was in use when the object was created
    '''

    def __init__(self):
        self.graph = get_graph()
        # Value restored by __exit__ if it is invoked without __enter__
        self._previous_track = True

    def __enter__(self):
        # Remember the current setting instead of assuming it was True
        self._previous_track = getattr(self.graph, "track", True)
        self.graph.track = False

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.graph.track = self._previous_track
def _evaluate_grad_check(analytical_grads, calculated_grads, epsilon, print_vals):
    '''Evaluates the gradient check and indicates whether it has passed or not

    Calculates the relative distance between the analytical and calculated
    gradients, norm(a - c) / (norm(a) + norm(c)), and if it is less than
    epsilon, then it has passed else failed

    Args:
        analytical_grads (list or np.ndarray of int or float): Gradients that
            are calculated analytically by wiggling the parameters
        calculated_grads (list or np.ndarray of int or float): Gradients that
            are calculated through backpropagation
        epsilon (float): Threshold below which the distance counts as passed
        print_vals (bool): True if distance and verdict needs to be printed

    Returns:
        Distance between analytical and calculated gradients
    '''
    # Accept plain lists as well as arrays; subtraction needs ndarrays
    analytical = np.asarray(analytical_grads, dtype=float)
    calculated = np.asarray(calculated_grads, dtype=float)

    difference_norm = np.linalg.norm(analytical - calculated)
    combined_norm = np.linalg.norm(analytical) + np.linalg.norm(calculated)

    # When both gradients are entirely zero the denominator is zero and the
    # division would give nan, reporting a failure for identical gradients.
    # In that case the difference norm (which is zero) is the distance.
    dist = difference_norm / combined_norm if combined_norm > 0 else difference_norm

    if print_vals:
        print("Gradient Check Distance:", dist)
        if dist < epsilon:
            print("Gradient Check PASSED")
        else:
            print("Gradient Check FAILED")
    return dist
def _wiggle_params(analytical_grads, calculated_grads, params, get_loss, epsilon):
    '''Changes the params value by epsilon and calculates the analytical gradient

    For each element of every param that requires grad, the loss is
    calculated once with epsilon added to the element and once with epsilon
    subtracted from it. The central difference (loss1 - loss2) / (2*epsilon)
    is appended to analytical_grads and the matching element of param.grad
    is appended to calculated_grads. Both lists are modified in place.

    Args:
        analytical_grads (list of int or float): Gradients that are calculated
            analytically by wiggling the parameters
        calculated_grads (list of int or float): Gradients that are calculated
            through backpropagation
        params (list of Tensor): All params that need to be wiggled
        get_loss: function that is used to calculate the loss
        epsilon (float): The amount by which params need to be wiggled
    '''
    for param in params:
        if not param.requires_grad:
            continue
        if not isinstance(param.grad, np.ndarray):
            param.grad = np.array(param.grad)

        for idx in np.ndindex(param.shape):
            # Keep the exact original value: adding and subtracting epsilon
            # in sequence does not always round back to it in floating point
            original_value = param.data[idx]
            try:
                with no_track():
                    param.data[idx] = original_value + epsilon  # PLUS
                    loss1 = get_loss()
                    param.data[idx] = original_value - epsilon  # MINUS
                    loss2 = get_loss()
            finally:
                # ORIGINAL, restored even if get_loss raises
                param.data[idx] = original_value
            calculated_grads.append(param.grad[idx])
            analytical_grads.append((loss1.data - loss2.data) / (2 * epsilon))

        # Cleared only after every element of param.grad has been read
        param.zero_grad()  # to prevent any side effects
def grad_check(model, inputs, targets, loss_fn, epsilon=1e-7, print_vals=True):
    '''Performs Gradient Check

    Verifies that backprop is calculating the right gradients by comparing
    them against numerically estimated gradients for all the parameters in
    the model. If the distance between the two is less than epsilon, then
    the gradients are proper, if not there is an issue

    Args:
        model (Model): The Neural Network to be evaluated
        inputs (Tensor): Input data(No need for complete data, only sample enough)
        targets (Tensor): Targets
        loss_fn (Loss): Loss Function
        epsilon (float): The amount by which params need to be wiggled
            Defaults to 1e-7
        print_vals (bool): True if distance and verdict needs to be printed

    Returns:
        Distance between analytical and calculated gradients
    '''
    params = model.parameters()
    for param in params:
        param.zero_grad()

    def get_loss():
        # Forward pass followed by the loss on the given targets
        return loss_fn(model(inputs), targets)

    numerical_grads = []
    backprop_grads = []
    with new_graph():
        # Backward pass fills param.grad, which is then compared element by
        # element against the numerical estimates
        get_loss().backward()
        _wiggle_params(numerical_grads, backprop_grads, params, get_loss, epsilon)

    return _evaluate_grad_check(
        np.array(numerical_grads),
        np.array(backprop_grads),
        epsilon,
        print_vals,
    )
def fn_grad_check(fn, inputs, params, targets=None, loss_fn=None, epsilon=1e-7, print_vals=True, **kwargs):
    '''Performs Gradient Check for a function

    Same check as for a complete model, but applied to a single function.
    Only the params that are passed in are gradient checked

    Args:
        fn: Function to be gradient checked
        inputs (list of Tensor): inputs to the function
        params (list of Tensor): the params whose data can be wiggled to get
            the gradients
        targets (Tensor): targets of the function. When None, a Tensor of
            ones with the shape of the function's output is used
        loss_fn (Loss): loss_fn to evaluate the function. When None, MSE is
            used
        epsilon (float): The amount by which params need to be wiggled
            Defaults to 1e-7
        print_vals (bool): True if distance and verdict needs to be printed
        **kwargs: Any kwargs to be passed to fn

    Returns:
        Distance between analytical and calculated gradients
    '''
    if loss_fn is None:
        from ..nn.loss import MSE
        loss_fn = MSE()

    for param in params:
        param.zero_grad()

    def get_loss():
        outputs = fn(*inputs, **kwargs)
        if targets is None:
            # Default targets can only be built once the output shape is known
            from .tensor import Tensor
            expected = Tensor(np.ones(outputs.shape))
        else:
            expected = targets
        return loss_fn(outputs, expected)

    numerical_grads = []
    backprop_grads = []
    with new_graph():
        get_loss().backward()
        _wiggle_params(numerical_grads, backprop_grads, params, get_loss, epsilon)

    return _evaluate_grad_check(
        np.array(numerical_grads),
        np.array(backprop_grads),
        epsilon,
        print_vals,
    )