Data Transformation & Loading
Objectives
- Create JSON Query Path to extract data
- Create SQL Tables to hold transformed data
- Create External Data Source to connect database to Blob Store
- Create Stored Procedure to Transform and Load data into SQL Tables
Transformation
I’m almost at a stage where I can start fetching the JSON Documents from the Blob Storage container into SQLDB, where I’ll parse and load the data into SQL Tables.
I won’t rehash content that is already well covered; the use of JSON data in SQL Server is well documented here.
I’m particularly interested in using SQL Server’s built-in functions to “Transform arrays of JSON objects into table format”. For this I’ll make use of the OPENJSON function, which is documented here.
To take full advantage of the OPENJSON function, I’ll specify the schema using the WITH clause. Failing to do so means I’d be limited to the three default columns: “key”, “value” and “type”. Using the WITH clause, I can specify an arbitrary number of columns along with their data types and the JSON path that returns each value.
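As a quick illustration of the difference (the JSON fragment below is made up, but mirrors the attribute names used throughout this post):

DECLARE @json nvarchar(max) = N'{"result": {"items": [{"transactionId": "ABC-123", "pricePaid": 250000}]}}';

-- Without a schema, OPENJSON returns the default key/value/type columns.
SELECT [key], [value], [type]
FROM OPENJSON(@json, '$.result.items[0]');

-- With a WITH clause, JSON paths are mapped to typed columns.
SELECT [transactionId], [pricePaid]
FROM OPENJSON(@json, '$.result.items')
WITH (
    [transactionId] nvarchar(100) '$.transactionId',
    [pricePaid]     int           '$.pricePaid'
);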
I’ll be replicating the PPD Data Model structure within SQLDB using the below JSON hierarchy structure.

| Data Model Entity | Data Model Attribute | Query Parameter |
|---|---|---|
| EstateType | estateType | $.result.items.estateType.label._value |
| Transaction | transactionId | $.result.items.transactionId |
| PropertyType | propertyType | $.result.items.propertyType.label._value |
| RecordStatus | recordStatus | $.result.items.recordStatus.label._value |
| TransactionCategory | transactionCategory | $.result.items.transactionCategory.label._value |
| TransactionRecord | newBuild | $.result.items.newBuild |
| TransactionRecord | pricePaid | $.result.items.pricePaid |
| TransactionRecord | transactionDate | $.result.items.transactionDate |
| Address | county | $.result.items.propertyAddress.county |
| Address | district | $.result.items.propertyAddress.district |
| Address | locality | $.result.items.propertyAddress.locality |
| Address | paon | $.result.items.propertyAddress.paon |
| Address | postcode | $.result.items.propertyAddress.postcode |
| Address | saon | $.result.items.propertyAddress.saon |
| Address | street | $.result.items.propertyAddress.street |
| Address | town | $.result.items.propertyAddress.town |
Using JSON Editor Online, I can inspect the JSON Document. Highlighted in yellow is the path taken to arrive at the value for the “estateType”.
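For readers without the document to hand, here is an abbreviated, illustrative fragment of a single item, reconstructed from the query paths in the table above (all values are made up):

{
  "result": {
    "items": [
      {
        "transactionId": "ABC-123",
        "estateType": {
          "label": [ { "_value": "Freehold" } ]
        },
        "newBuild": true,
        "pricePaid": 250000,
        "transactionDate": "2020-01-31",
        "propertyAddress": {
          "county": "GREATER LONDON",
          "postcode": "SW1A 1AA",
          "street": "EXAMPLE STREET",
          "town": "LONDON"
        }
      }
    ]
  }
}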
Since I have the freedom to define the schema, I’ll do so ensuring that each Entity will have a “transactionId”, which will allow me to tie the tables back together.
Data Loading
Before I can begin loading data, I’ll need to connect to Azure SQLDB using Management Studio.
I’ll begin by creating the tables that will hold the parsed JSON Document.
-- ===================================================
-- ===============EstateType Table====================
-- ===================================================
CREATE TABLE [dbo].[EstateType](
    [transactionId] nvarchar(50) NOT NULL,
    [estateType] nvarchar(100) NULL,
    CONSTRAINT [pktransactionIdEstateType] PRIMARY KEY CLUSTERED ([transactionId] ASC)
);

-- ===================================================
-- ==============Transaction Table====================
-- ===================================================
CREATE TABLE [dbo].[Transaction](
    [transactionId] nvarchar(50) NOT NULL,
    CONSTRAINT [pktransactionIdTransaction] PRIMARY KEY CLUSTERED ([transactionId] ASC)
);

-- ===================================================
-- ===============PropertyType Table==================
-- ===================================================
CREATE TABLE [dbo].[PropertyType](
    [transactionId] nvarchar(50) NOT NULL,
    [propertyType] nvarchar(100) NULL,
    CONSTRAINT [pktransactionIdPropertyType] PRIMARY KEY CLUSTERED ([transactionId] ASC)
);

-- ===================================================
-- ==============RecordStatus Table===================
-- ===================================================
CREATE TABLE [dbo].[RecordStatus](
    [transactionId] nvarchar(50) NOT NULL,
    [recordStatus] nvarchar(100) NULL,
    CONSTRAINT [pktransactionIdRecordStatus] PRIMARY KEY CLUSTERED ([transactionId] ASC)
);

-- ===================================================
-- ===============TransactionCategory Table===========
-- ===================================================
CREATE TABLE [dbo].[TransactionCategory](
    [transactionId] nvarchar(50) NOT NULL,
    [transactionCategory] nvarchar(100) NULL,
    CONSTRAINT [pktransactionIdTransactionCategory] PRIMARY KEY CLUSTERED ([transactionId] ASC)
);

-- ===================================================
-- ==============TransactionRecord Table==============
-- ===================================================
CREATE TABLE [dbo].[TransactionRecord](
    [transactionId] nvarchar(50) NOT NULL,
    [newBuild] nvarchar(10) NULL,
    [pricePaid] int NULL,
    [transactionDate] nvarchar(100) NULL,
    CONSTRAINT [pktransactionIdTransactionRecord] PRIMARY KEY CLUSTERED ([transactionId] ASC)
);

-- ===================================================
-- ==============Address Table========================
-- ===================================================
CREATE TABLE [dbo].[Address](
    [transactionId] nvarchar(50) NOT NULL,
    [county] nvarchar(100) NULL,
    [district] nvarchar(100) NULL,
    [locality] nvarchar(100) NULL,
    [paon] nvarchar(100) NULL,
    [postcode] nvarchar(100) NULL,
    [saon] nvarchar(100) NULL,
    [street] nvarchar(100) NULL,
    [town] nvarchar(100) NULL,
    CONSTRAINT [pktransactionIdAddress] PRIMARY KEY CLUSTERED ([transactionId] ASC)
);

-- ===================================================
-- ===============BlobFileName Table==================
-- ===================================================
CREATE TABLE [dbo].[BlobFileName](
    [blobFileName] nvarchar(50) NOT NULL
);
The DDL script above will produce the following tables.
You’ll have noticed that there is a table named “BlobFileName”. This table holds all the file names for the Blobs stored within the Blob container. I’ll need this list in the Stored Procedure as it will allow me to iterate through each file.
Given that I’ve got a small number of Blobs (6), I’ll manually insert each file name into the table. If I had thousands of files, as I did on a client project, I would create an additional Logic App that would simply read all the file names in the Blob container and insert each file name into the “BlobFileName” table.
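For the manual route, a one-off INSERT does the job (the file names below are placeholders for the six Blobs in my container):

INSERT INTO [dbo].[BlobFileName] ([blobFileName])
VALUES ('ppd-response-1.json'),
       ('ppd-response-2.json'),
       ('ppd-response-3.json'); -- and so on for the remaining files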
Here’s how I would set up the Logic App alternative:
- Under List blobs – point the Logic App to Blob container
- Under Initialize variable – declare an array variable
- Under For each Loop:
  - In “Select an output from previous step” – select “Value” (the Blob metadata collection), which gives us access to the Blob name among other things.
  - Add an Action – select “Append to array variable”, which allows you to select “Name”, which holds the name of the file/folder.
  - Add an Action – select “Insert row”, which requires you to connect to Azure SQLDB and select the table that will receive the name of the Blob file.
Continuing with the data load, I now need to create an External Data Source that connects to the Blob Storage container. This will allow me to use the OPENROWSET function, which enables data from a file to be read and returned as a rowset.
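In isolation, a call looks something like this (the file name is a placeholder; the External Data Source is created below):

SELECT BulkColumn
FROM OPENROWSET(BULK 'ppd-response-1.json', DATA_SOURCE = 'pricePaidDateset', SINGLE_CLOB) AS j;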
Please note that you’ll need to “Create a database master key if one does not already exist, using your own password. This key is used to encrypt the credential secret […]”.
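As a sketch, the setup looks like this; the credential name [blobCredential], the SAS secret and the storage URL are placeholders to swap for your own, while the data source name matches what the Stored Procedure below expects:

-- Create a database master key if one does not already exist (encrypts the credential secret).
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<strong password here>';

-- Store the Shared Access Signature for the Blob container.
CREATE DATABASE SCOPED CREDENTIAL [blobCredential]
WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
     SECRET = '<SAS token, without the leading question mark>';

-- Point the External Data Source at the Blob container.
CREATE EXTERNAL DATA SOURCE [pricePaidDateset]
WITH (
    TYPE = BLOB_STORAGE,
    LOCATION = 'https://<storage account>.blob.core.windows.net/<container>',
    CREDENTIAL = [blobCredential]
);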
As you can see, I was able to successfully create the resource as shown below.
You can run the following queries to confirm this step.
-- External Data Source
SELECT * FROM [sys].[external_data_sources]

-- Database scoped credentials
SELECT * FROM [sys].[database_scoped_credentials]
I then ran the below Stored Procedure to perform the extract, transformation and load.
CREATE PROCEDURE blogDemoSPROC
AS
-- ==============================================================================================================
-- Author:      Tino Zishiri
-- Description: Performs an extract, transform and load of data from a blob store. The script uses a cursor to
--              loop through the JSON files as specified in the BlobFileName table.
-- ==============================================================================================================
BEGIN
    SET NOCOUNT ON

    DECLARE @filename varchar(100)
    DECLARE @sqlCommand nvarchar(1000);
    DECLARE @data nvarchar(max);

    DECLARE curs_files CURSOR FOR
        SELECT [BlobFileName]
        FROM [dbo].[BlobFileName]
        WHERE [BlobFileName] IS NOT NULL

    OPEN curs_files
    FETCH NEXT FROM curs_files INTO @filename

    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        -- Read the whole JSON file into a single nvarchar(max) value.
        SET @sqlCommand = 'SELECT @dataTemp = BulkColumn FROM OPENROWSET(BULK ''' + @filename + ''', DATA_SOURCE = ''pricePaidDateset'', SINGLE_CLOB) as j';

        PRINT @sqlCommand;

        BEGIN TRY
            EXEC sp_executesql @sqlCommand, N'@dataTemp NVARCHAR(MAX) OUTPUT', @dataTemp = @data OUTPUT

            -- ===================================================
            -- ===============EstateType Table====================
            -- ===================================================
            INSERT INTO [dbo].[EstateType]
            SELECT [transactionId]
                 , [estateType]
            FROM OPENJSON(@data, '$."result"')
            WITH (
                [items] nvarchar(max) as json
            ) AS B
            cross apply openjson(B.[items])
            WITH (
                [transactionId] nvarchar(100) '$."transactionId"' ,    -- transactionId will be attached across all entities
                [label] nvarchar(max) '$."estateType"."label"' as json -- whenever there is an array we use outer apply to specify the child element(s)
            ) A
            outer apply openjson(A.[label])
            with (
                [estateType] nvarchar(100) '$."_value"'
            );

            -- ===================================================
            -- ==============Transaction Table====================
            -- ===================================================
            INSERT INTO [dbo].[Transaction]
            SELECT [transactionId]
            FROM OPENJSON(@data, '$."result"')
            WITH (
                [items] nvarchar(max) as json
            ) AS B
            cross apply openjson(B.[items])
            WITH (
                [transactionId] nvarchar(100) '$."transactionId"' -- transactionId will be attached across all entities
            ) A;

            -- ===================================================
            -- ===============PropertyType Table==================
            -- ===================================================
            INSERT INTO [dbo].[PropertyType]
            SELECT [transactionId]
                 , [propertyType]
            FROM OPENJSON(@data, '$."result"')
            WITH (
                [items] nvarchar(max) as json
            ) AS B
            cross apply openjson(B.[items])
            WITH (
                [transactionId] nvarchar(100) '$."transactionId"' ,
                [label] nvarchar(max) '$."propertyType"."label"' as json
            ) A
            outer apply openjson(A.[label])
            with (
                [propertyType] nvarchar(100) '$."_value"'
            );

            -- ===================================================
            -- ===============RecordStatus Table==================
            -- ===================================================
            INSERT INTO [dbo].[RecordStatus]
            SELECT [transactionId]
                 , [recordStatus]
            FROM OPENJSON(@data, '$."result"')
            WITH (
                [items] nvarchar(max) as json
            ) AS B
            cross apply openjson(B.[items])
            WITH (
                [transactionId] nvarchar(100) '$."transactionId"' ,
                [label] nvarchar(max) '$."recordStatus"."label"' as json
            ) A
            outer apply openjson(A.[label])
            with (
                [recordStatus] nvarchar(100) '$."_value"'
            );

            -- ===================================================
            -- ===============TransactionCategory Table===========
            -- ===================================================
            INSERT INTO [dbo].[TransactionCategory]
            SELECT [transactionId]
                 , [transactionCategory]
            FROM OPENJSON(@data, '$."result"')
            WITH (
                [items] nvarchar(max) as json
            ) AS B
            cross apply openjson(B.[items])
            WITH (
                [transactionId] nvarchar(100) '$."transactionId"' ,
                [label] nvarchar(max) '$."transactionCategory"."label"' as json
            ) A
            outer apply openjson(A.[label])
            with (
                [transactionCategory] nvarchar(100) '$."_value"'
            );

            -- ===================================================
            -- ===============TransactionRecord Table=============
            -- ===================================================
            INSERT INTO [dbo].[TransactionRecord]
            SELECT [transactionId]
                 , [newBuild]
                 , [pricePaid]
                 , [transactionDate]
            FROM OPENJSON(@data, '$."result"')
            WITH (
                [items] nvarchar(max) as json
            ) AS B
            cross apply openjson(B.[items])
            WITH (
                [transactionId] nvarchar(100) '$."transactionId"' ,
                [newBuild] nvarchar(10) '$."newBuild"' ,
                [pricePaid] int '$."pricePaid"' ,
                [transactionDate] nvarchar(100) '$."transactionDate"'
            );

            -- ===================================================
            -- ===============Address Table=======================
            -- ===================================================
            INSERT INTO [dbo].[Address]
            SELECT [transactionId]
                 , [county]
                 , [district]
                 , [locality]
                 , [paon]
                 , [postcode]
                 , [saon]
                 , [street]
                 , [town]
            FROM OPENJSON(@data, '$."result"')
            WITH (
                [items] nvarchar(max) as json
            ) AS B
            cross apply openjson(B.[items])
            WITH (
                [transactionId] nvarchar(100) '$."transactionId"' ,
                [propertyAddress] nvarchar(max) '$."propertyAddress"' as json
            ) A
            outer apply openjson(A.[propertyAddress])
            with (
                [county] nvarchar(100) '$."county"' ,
                [district] nvarchar(100) '$."district"' ,
                [locality] nvarchar(100) '$."locality"' ,
                [paon] nvarchar(100) '$."paon"' ,
                [postcode] nvarchar(100) '$."postcode"' ,
                [saon] nvarchar(100) '$."saon"' ,
                [street] nvarchar(100) '$."street"' ,
                [town] nvarchar(100) '$."town"'
            );
        END TRY
        BEGIN CATCH
            PRINT @filename + ' not loaded.'
        END CATCH

        FETCH NEXT FROM curs_files INTO @filename
    END

    CLOSE curs_files
    DEALLOCATE curs_files
END
GO
Ensure the “data source” name in the script matches the name you gave the External Data Source you created earlier.
Pay close attention to how the script parses the hierarchies. You may refer back to the JSON hierarchy structure table to understand how the values are being extracted.
Once you run the Stored Procedure, you should be able to tie the tables together. I’ve provided the diagram below to illustrate how this looks once in Azure SQLDB.
PricePaidDataset SQL Diagram
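As a quick sanity check (a sketch; the column list is trimmed for brevity), the entities can be stitched back into a flat view on transactionId:

SELECT t.[transactionId]
     , e.[estateType]
     , p.[propertyType]
     , r.[pricePaid]
     , r.[transactionDate]
     , a.[postcode]
     , a.[town]
FROM [dbo].[Transaction] t
LEFT JOIN [dbo].[EstateType] e ON e.[transactionId] = t.[transactionId]
LEFT JOIN [dbo].[PropertyType] p ON p.[transactionId] = t.[transactionId]
LEFT JOIN [dbo].[TransactionRecord] r ON r.[transactionId] = t.[transactionId]
LEFT JOIN [dbo].[Address] a ON a.[transactionId] = t.[transactionId];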
Summary
I’ve now come to the end of the 3-part blog series in which I’ve shown how we can Extract, Transform and Load API responses into a relational database using Azure. If you’ve followed along, you should have a data model within your relational database that closely mirrors the data model in the LandRegistry API Documentation.
A few things to keep in mind, particularly for data-intensive operations. The Stored Procedure’s performance can be enhanced by upgrading the database from the Basic to the Premium tier when you run it; you can always revert to Basic after the run to keep costs under control. Note that this works for compute; storage, however, cannot be scaled down as easily.
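For instance, a sketch using a hypothetical database name; the exact editions and service objectives available depend on your subscription:

-- Scale up before running the Stored Procedure...
ALTER DATABASE [PricePaidDB] MODIFY (EDITION = 'Premium', SERVICE_OBJECTIVE = 'P1');

-- ...and scale back down afterwards to keep costs under control.
ALTER DATABASE [PricePaidDB] MODIFY (EDITION = 'Basic', SERVICE_OBJECTIVE = 'Basic');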
It’s also worth bearing in mind that, when creating the External Data Source, secrets have an expiration date attached to them. If your secret expires, you can use the following statement to alter the Database scoped credential; otherwise, the external data source will fail to connect to the Blob store.
ALTER DATABASE SCOPED CREDENTIAL [Name of the database_scoped_credentials]
WITH IDENTITY = 'Value of the credential_identity column',
     SECRET = 'Enter Secret here';
You can view my previous blogs: Extract, Transform & Load REST API responses in Azure SQLDB part 1 & Extract, Transform & Load REST API responses in Azure SQLDB part 2.
The data geeks at Telefónica Tech and I love helping customers make sense of their data. If this describes you, feel free to get in touch and our awesome sales team will be happy to direct your query.