This is the fourth post in a series on Azure Data Factory Custom Activity Development.
Pipeline Execution
Executing Pipeline Activities basically requires resetting the status of prior Data Slices so that they can re-execute. We can do this in the “Manage and Monitor” Data Factory dashboard provided by Azure, or relatively simply using PowerShell. What is missing is the ability to do this from the comfort of your own Visual Studio environment.
When writing Unit and Integration Tests it is most beneficial to be able to do this within the IDE, using frameworks such as VS Test or NUnit to assist with our test logic and execution. This follows proven practice with other types of development and allows easy automation of testing for CI/CD activities. So having made the case, how can we easily go about running our Pipeline Data Slices? Well, the ADF API isn’t that intimidating, and although there are some caveats to be aware of, it is relatively easy to create a class or two to assist with executing ADF Activities from .NET test code. We are of course assuming that our testing takes place in a Data Factory environment that is purposed for testing, so we are free to update the Pipeline as required.
PipelineExecutor
The first class we need is one that will execute our Pipeline Activity Data Slices. It will need access to various properties of the Pipeline itself, plus methods and properties to carry out the desired Data Slice operations. Hence our class with the relatively fitting name of PipelineExecutor. The PipelineGetResponse class within the Microsoft.Azure.Management.DataFactories.Models namespace allows us to request a Pipeline object’s details from our Data Factory. We’ll create a private field for this and populate it when constructing our PipelineExecutor class.
public class PipelineExecutor
{
    #region Private Fields

    private string dataFactoryName;
    private DataFactoryManagementClient dfClient;
    private DateTime maxDateTime = DateTime.MaxValue;
    private DateTime minDateTime = DateTime.MinValue;
    private PipelineGetResponse pipelineGR;
    private string pipelineName;
    private string resourceGroupName;

    #endregion Private Fields

    #region Public Constructors

    /// <summary>
    /// Initializes a new instance of the <see cref="PipelineExecutor"/> class.
    /// </summary>
    /// <param name="adfApplicationId">The adf application identifier.</param>
    /// <param name="adfApplicationSecret">The adf application secret.</param>
    /// <param name="domain">The domain.</param>
    /// <param name="loginWindowsPrefix">The login windows prefix.</param>
    /// <param name="managementWindowsDnsPrefix">The management windows DNS prefix.</param>
    /// <param name="subscriptionId">The subscription identifier.</param>
    /// <param name="resourceGroupName">Name of the resource group.</param>
    /// <param name="dataFactoryName">Name of the data factory.</param>
    /// <param name="pipelineName">Name of the pipeline.</param>
    public PipelineExecutor(string adfApplicationId, string adfApplicationSecret, string domain,
        string loginWindowsPrefix, string managementWindowsDnsPrefix, string subscriptionId,
        string resourceGroupName, string dataFactoryName, string pipelineName)
    {
        this.resourceGroupName = resourceGroupName;
        this.dataFactoryName = dataFactoryName;
        this.pipelineName = pipelineName;

        ClientCredentialProvider ccp = new ClientCredentialProvider(adfApplicationId, adfApplicationSecret);
        dfClient = new DataFactoryManagementClient(
            ccp.GetTokenCloudCredentials(domain, loginWindowsPrefix, managementWindowsDnsPrefix, subscriptionId));

        LoadPipeline();
    }

    #endregion Public Constructors

    /// <summary>
    /// Loads the pipeline using a get request.
    /// </summary>
    public void LoadPipeline()
    {
        this.pipelineGR = dfClient.Pipelines.Get(resourceGroupName, dataFactoryName, pipelineName);
        if (this.pipelineGR == null)
            throw new Exception(string.Format(
                "Pipeline {0} not found in Data Factory {1} within Resource Group {2}",
                pipelineName, dataFactoryName, resourceGroupName));
    }

    // Further members of the class are shown in the listings that follow.
}
The PipelineGetResponse.Pipeline property is the actual ADF Pipeline object, so we can apply the Decorator pattern to simply reference this and any other relevant properties within our code whenever required. For example, we create a simple read/write property End, which is used for getting and setting the Pipeline End, as below:
/// <summary>
/// Gets or sets the pipeline end.
/// </summary>
/// <value>
/// The end.
/// </value>
public DateTime? End
{
    get { return this.pipelineGR.Pipeline.Properties.End; }
    set
    {
        if (this.pipelineGR.Pipeline.Properties.End != value)
        {
            this.pipelineGR.Pipeline.Properties.End = value;
            IsDirty = true;
        }
    }
}
Notice that we are setting an IsDirty property when updating the property. This can then be used to determine whether we need to save any changes to the Pipeline back to the Data Factory when we are actually ready to run our Data Slice(s).
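The IsDirty property itself isn't listed in this post, so here is a minimal, self-contained sketch of how that kind of change tracking might look. FakePipelineProperties, DirtyTrackingWrapper and SaveIfDirty are hypothetical names used purely for illustration; they stand in for the real ADF Pipeline object and the PipelineExecutor, and are not part of the Data Factory SDK.

```csharp
using System;

// Hypothetical stand-in for the ADF pipeline properties object (illustration only).
public class FakePipelineProperties
{
    public DateTime? Start { get; set; }
    public DateTime? End { get; set; }
}

// Hypothetical wrapper showing the change-tracking idea used by PipelineExecutor.
public class DirtyTrackingWrapper
{
    private readonly FakePipelineProperties properties;

    public DirtyTrackingWrapper(FakePipelineProperties properties)
    {
        this.properties = properties;
    }

    // True once a wrapped property has actually changed since the last save.
    public bool IsDirty { get; private set; }

    public DateTime? End
    {
        get { return properties.End; }
        set
        {
            // Only flag a change when the value really differs.
            if (properties.End != value)
            {
                properties.End = value;
                IsDirty = true;
            }
        }
    }

    // Runs the supplied save action only if something changed, then clears the flag.
    // Returns true when a save was performed.
    public bool SaveIfDirty(Action<FakePipelineProperties> save)
    {
        if (!IsDirty) return false;
        save(properties);
        IsDirty = false;
        return true;
    }
}
```

Clearing the flag after a successful save is a design choice in this sketch: it avoids a second, redundant update call if the save is requested again without further changes.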
We have properties for the major Pipeline attributes, such as a List of Activities and Data Sets, whether the Pipeline is Paused, and various others. These are used within a number of helper methods that allow for easier running of our Activity Data Slices.
As previously mentioned, the method for executing an Activity Data Slice is to set its status accordingly. We also need to check that the Pipeline is not currently paused, and if it is, resume it. For these purposes there are a number of methods that do similar things with regard to Data Slices, such as setting the status of the last Data Slice prior to a specified datetime, for all the Data Sets within the Pipeline, as below:
/// <summary>
/// Sets the state of the pipeline activities last data slice prior to the date specified.
/// Updates the pipeline start and end if data slices fall outside of this range.
/// </summary>
/// <param name="priorTo">The date prior to which the last data slice will be updated.</param>
/// <param name="sliceState">State of the slice.</param>
/// <param name="updateType">Type of the update.</param>
public void SetPipelineActivitiesPriorDataSliceState(DateTime priorTo, string sliceState = "Waiting", string updateType = "UpstreamInPipeline")
{
    Dictionary<string, DataSlice> datasetDataSlices = GetPipelineActivitiesLastDataSlice(priorTo);

    // The earliest start and latest end for the data slices to be reset.
    DateTime earliestStart = datasetDataSlices.OrderBy(ds => ds.Value.Start).First().Value.Start;
    DateTime latestEnd = datasetDataSlices.OrderByDescending(ds => ds.Value.End).First().Value.End;

    // If the pipeline start and end values do not cover this timespan, the pipeline schedule
    // needs to be updated to use the expanded time period, else the data slice updates will fail.
    if (this.Start > earliestStart)
    {
        this.Start = earliestStart;
    }
    if (this.End < latestEnd)
    {
        this.End = latestEnd;
    }
    SavePipeline();

    foreach (string datasetName in datasetDataSlices.Keys)
    {
        DataSlice ds = datasetDataSlices[datasetName];
        SetDataSetDataSliceState(datasetName, ds.Start, ds.End, sliceState, updateType);
    }
}

/// <summary>
/// Saves the pipeline.
/// </summary>
public void SavePipeline()
{
    // Update the pipeline with the amended properties.
    if (IsDirty)
        dfClient.Pipelines.CreateOrUpdate(resourceGroupName, dataFactoryName,
            new PipelineCreateOrUpdateParameters() { Pipeline = this.pipelineGR.Pipeline });
}
Pipeline Data Slice Range Issues…
The first gotcha you may encounter when resetting Activity Data Slices is that the slices may fall outside the Pipeline Start and/or End times. If you try to run a Pipeline in this state you will get an exception. Hence the need to check the earliest and latest Data Slice times across all our activities and amend the Start and End properties of our Pipeline (wrapped up inside our PipelineExecutor.Start and End properties). Nothing too taxing here and we’re back in the game.
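To make the range check concrete, here is a small standalone sketch of the same idea with the ADF types stripped away. SliceRange and Expand are hypothetical names for illustration, and slices are represented as plain (start, end) tuples rather than the SDK's DataSlice class.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (illustration only): widens a pipeline window to cover a set of slices.
public static class SliceRange
{
    // Returns the pipeline start/end, expanded where necessary so that every slice fits inside.
    // Each slice is a tuple of (start, end).
    public static Tuple<DateTime, DateTime> Expand(
        DateTime pipelineStart,
        DateTime pipelineEnd,
        IEnumerable<Tuple<DateTime, DateTime>> slices)
    {
        List<Tuple<DateTime, DateTime>> list = slices.ToList();

        // With no slices there is nothing to cover, so the window is left as it was.
        if (list.Count == 0)
            return Tuple.Create(pipelineStart, pipelineEnd);

        DateTime earliestStart = list.Min(s => s.Item1);
        DateTime latestEnd = list.Max(s => s.Item2);

        return Tuple.Create(
            pipelineStart > earliestStart ? earliestStart : pipelineStart,
            pipelineEnd < latestEnd ? latestEnd : pipelineEnd);
    }
}
```

Unlike a bare First() call on an ordered sequence, this sketch returns the window unchanged when there are no slices at all, which may be worth guarding against in a pipeline whose Data Sets have not yet produced any.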
For completeness I’ll include the SetDataSetDataSliceState method called above so you can see what is required to actually set the Data Slice statuses using the Data Factory API, via the Microsoft.Azure.Management.DataFactories.DataFactoryManagementClient class.
/// <summary>
/// Sets the state of all data set data slices that fall within the date range specified.
/// </summary>
/// <param name="datasetName">Name of the dataset.</param>
/// <param name="sliceStart">The slice start.</param>
/// <param name="sliceEnd">The slice end.</param>
/// <param name="sliceState">State of the slice.</param>
/// <param name="updateType">Type of the update.</param>
/// <exception cref="ArgumentException">Thrown when sliceState or updateType is not a permitted value.</exception>
public void SetDataSetDataSliceState(string datasetName, DateTime sliceStart, DateTime sliceEnd, string sliceState = "Waiting", string updateType = "UpstreamInPipeline")
{
    DataSliceState sliceStatusResult;
    if (!Enum.TryParse<DataSliceState>(sliceState, out sliceStatusResult))
        throw new ArgumentException(string.Format(
            "The value {0} for sliceState is invalid. Valid values are {1}.",
            sliceState, string.Join(", ", Enum.GetNames(typeof(DataSliceState)))));

    DataSliceUpdateType updateTypeResult;
    if (!Enum.TryParse<DataSliceUpdateType>(updateType, out updateTypeResult))
        throw new ArgumentException(string.Format(
            "The value {0} for updateType is invalid. Valid values are {1}.",
            updateType, string.Join(", ", Enum.GetNames(typeof(DataSliceUpdateType)))));

    DataSliceSetStatusParameters dsssParams = new DataSliceSetStatusParameters()
    {
        DataSliceRangeStartTime = sliceStart.ConvertToISO8601DateTimeString(),
        DataSliceRangeEndTime = sliceEnd.ConvertToISO8601DateTimeString(),
        SliceState = sliceState,
        UpdateType = updateType
    };

    dfClient.DataSlices.SetStatus(resourceGroupName, dataFactoryName, datasetName, dsssParams);
}
You’ll see there are a couple of annoyances we need to code for, such as having to parse the sliceState and updateType parameters against some Enums of valid values that we’ve had to create to ensure only permitted values for these, and having to call ConvertToISO8601DateTimeString() on our slice start and end times. No biggie though.
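Neither the Enums nor the ConvertToISO8601DateTimeString() extension are listed in this post, so the following is only a sketch of how they might be written. The enum members, the ParseOrThrow helper and the exact date format string are all assumptions made for illustration; check the values your Data Factory actually accepts before relying on them.

```csharp
using System;
using System.Globalization;

// Illustrative enum only: the members here are assumptions, not a definitive list.
public enum ExampleUpdateType
{
    Individual,
    UpstreamInPipeline
}

// Hypothetical helpers (illustration only).
public static class SliceParameterHelpers
{
    // One possible implementation of the extension used above: UTC, second precision, 'Z' suffix.
    public static string ConvertToISO8601DateTimeString(this DateTime value)
    {
        return value.ToUniversalTime()
                    .ToString("yyyy-MM-dd'T'HH:mm:ss'Z'", CultureInfo.InvariantCulture);
    }

    // Parses a string against an enum and throws a descriptive exception if it is not a named member.
    public static TEnum ParseOrThrow<TEnum>(string value, string parameterName) where TEnum : struct
    {
        TEnum result;
        // Enum.TryParse also accepts numeric strings such as "99", so IsDefined is checked as well.
        if (!Enum.TryParse<TEnum>(value, out result) || !Enum.IsDefined(typeof(TEnum), result))
        {
            throw new ArgumentException(string.Format(
                "The value {0} for {1} is invalid. Valid values are {2}.",
                value, parameterName, string.Join(", ", Enum.GetNames(typeof(TEnum)))),
                parameterName);
        }
        return result;
    }
}
```

Pulling the parse-and-throw logic into one generic helper keeps the error message in step with the parameter being validated, which is easy to get wrong when the check is copied and pasted for each parameter.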
Now that we have a class that gives us some ease of running our Pipeline Slices in a flexible manner (I’ve left out other methods for brevity) we can move on to using these within the various test methods of the Test Project classes we will be using.
A simple Base Testing Class
In the case of the project in question, we will be writing a lot of similar Tests that simply check row counts on Hive destination objects once a Data Factory Pipeline has executed. To make writing these easier, and remembering our DRY principle mentioned back in Part 2 of this series, we can encapsulate the functionality to do this in a base class method, and derive our row counting Test classes from this.
public class TestBase
{
    #region Protected Fields

    protected string adfApplicationId = Properties.Settings.Default.ADFApplicationId;
    protected string adlsAccountName = Properties.Settings.Default.ADLSAccountName;
    protected string adlsRootDirPath = Properties.Settings.Default.ADLSRootDirPath;
    protected string domain = Properties.Settings.Default.Domain;
    protected string loginWindowsPrefix = Properties.Settings.Default.LoginWindowsDnsPrefix;
    protected string managementWindowsDnsPrefix = Properties.Settings.Default.ManagementWindowsDnsPrefix;
    protected string clusterName = Properties.Settings.Default.HDIClusterName;
    protected string clusterUserName = Properties.Settings.Default.HDIClusterUserName;
    protected string outputSubDir = Properties.Settings.Default.HDIClusterJobOutputSubDir;
    protected string dataFactory = Properties.Settings.Default.DataFactory;
    protected string resourceGroup = Properties.Settings.Default.ResourceGroup;
    protected string subscriptionId = Properties.Settings.Default.SubscriptionId;
    protected string tenantId = Properties.Settings.Default.TenantId;

    // Note: adfApplicationSecret and clusterPassword are used below and must also be declared;
    // how they are sourced is not shown in this listing.

    #endregion Protected Fields

    #region Public Methods

    //[TestMethod]
    public void ExecuteRowCountTest(string pipelineName, long expected, string databaseName, string objectName)
    {
        string rowCountStatement = "select count(*) as CountAll from {0}.{1};";

        // Basic check for sql injection.
        if (databaseName.Contains(";") || objectName.Contains(";"))
            throw new ArgumentException(string.Format(
                "The parameters submitted contain potentially malicious values. databaseName : {0}, objectName : {1}. This may be an attempt at sql injection",
                databaseName, objectName));

        // Reset the most recent data slices and wait for the pipeline to finish.
        PipelineExecutor plr = new PipelineExecutor(adfApplicationId, adfApplicationSecret, domain,
            loginWindowsPrefix, managementWindowsDnsPrefix, subscriptionId, resourceGroup, dataFactory, pipelineName);
        plr.SetPipelineActivitiesPriorDataSliceState(DateTime.Now);
        plr.AwaitPipelineCompletion().Wait();

        // Run the row count against the Hive destination object and compare with the expected value.
        string commandText = string.Format(rowCountStatement, databaseName, objectName);
        IStorageAccess storageAccess = new HiveDataLakeStoreStorageAccess(tenantId, adfApplicationId,
            adfApplicationSecret, adlsAccountName, adlsRootDirPath);
        HiveDataLakeStoreJobExecutor executor = new HiveDataLakeStoreJobExecutor(clusterName, clusterUserName,
            clusterPassword, outputSubDir, storageAccess);
        long actual = Convert.ToInt64(executor.ExecuteScalar(commandText));

        Assert.AreEqual(expected, actual);
    }

    #endregion Public Methods
}
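The base class relies on AwaitPipelineCompletion(), which is one of the methods left out of this post. As a rough idea of the shape such a method could take, here is a generic polling sketch. CompletionPoller and AwaitCompletionAsync are hypothetical names, and the isComplete delegate stands in for whatever check against the Data Factory you would actually perform (for example, inspecting the state of the relevant Data Slices); none of this is the real implementation.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (illustration only): polls a condition until it is met or a timeout elapses.
public static class CompletionPoller
{
    // Returns true if isComplete reported completion before the timeout, otherwise false.
    public static async Task<bool> AwaitCompletionAsync(
        Func<bool> isComplete,
        TimeSpan pollInterval,
        TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;

        while (DateTime.UtcNow < deadline)
        {
            if (isComplete())
                return true;

            // Wait before asking again, to avoid hammering the service.
            await Task.Delay(pollInterval);
        }

        // One final check once the deadline has passed.
        return isComplete();
    }
}
```

Including a timeout means a test fails with a clear result rather than hanging indefinitely if the Pipeline never completes, which matters once these tests run unattended as part of CI/CD.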
The destination row count methods in our test project classes can then be simplified to something like the example below:
[TestClass]
public class MyPipelineName : TestBase
{
    #region Private Fields

    private string pipelineName = "MyPipelineName";

    #endregion Private Fields

    #region Public Methods

    //todo centralise the storage of these
    [TestMethod]
    public void DestinationRowCountTest()
    {
        string databaseName = "UniversalStaging";
        string objectName = "extBrandPlanning";
        long expected = 10;
        base.ExecuteRowCountTest(pipelineName, expected, databaseName, objectName);
    }

    #endregion Public Methods
}
Going Further
For tests involving dependent objects we can use mocking frameworks such as NSubstitute, JustMock or Moq to create these, and define expected behaviours for the mocked objects, thereby making the writing and asserting of our test conditions all very much in line with proven Test Driven Development (TDD) practices. I won’t go into this here as there are plenty of well-grounded resources out there on these subjects.
Up Next…
In the final instalment of the series we remove some of the referenced library limitations inherent in the ADF execution environment. Y’all come back soon now for Part 5: Using Cross Application Domains in ADF…