Biml XVII – Building Extraction ETL from Metadata (Database Definitions) (Part 2)


This post is the second (of 2) on using Biml to create a content-driven extraction layer. The first post can be found at https://paultebraak.wordpress.com/2015/01/11/biml-xiii-building-extraction-etl-from-metadata-database-definitions/

Introduction

Our previous posts on extraction (see here and here) looked at using Biml (and Biml script) to create a package driven by metadata that was defined in Biml. We introduced the concept of modularisation and showed how a group of Biml files could be compiled sequentially to separate out the logic of solution components.

Including asset definitions in Biml sounds like a good idea: you can have objects defined as structures in code, so the entire solution is defined and managed within a single set of project files. However, one issue with this approach (and the problem relates to the SSDT or BIDS representation of Biml) is that the object definitions are only available for package generation; you can't visualise and edit the tables as you can in MIST (a Biml tool from Varigence).

As an alternative to this approach, we can define our ETL as content in a relational model and then use Biml script to build a package based on that content. Note that we are not talking about a data-driven modelling approach here. A data-driven approach builds the ETL and target schema based on the design of a source system; this approach simply uses database content to define how data is moved. The benefits are that there can be any number of interfaces to the model, and packages are built quickly and consistently. We have already defined the target environment and now want some managed way of creating the package(s) to populate it.
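For context, the metadata schema behind this model isn't shown in the post, but the column names the script consumes later (extraction_id, connection_source, entity_schema_source and so on) imply a shape something like the following sketch. Only the proc name (dbo.staging_extractions) and its output columns come from the post; the underlying table is an assumption:

```sql
-- Hypothetical sketch only: dbo.extractions is assumed; the proc name
-- and output columns are taken from the script later in the post.
Create Table dbo.extractions
(
	extraction_id            int identity(1,1) primary key
,	connection_source        varchar(128)  -- Biml connection name for the source
,	entity_schema_source     varchar(128)
,	entity_definition_source varchar(128)  -- source table / view
,	connection_tgt           varchar(128)  -- Biml connection name for the target
,	entity_schema_tgt        varchar(128)
,	entity_definition_tgt    varchar(128)  -- target (staging) table
);
go

Create Procedure dbo.staging_extractions
As
Select	extraction_id
,		connection_source, entity_schema_source, entity_definition_source
,		connection_tgt, entity_schema_tgt, entity_definition_tgt
From	dbo.extractions;
```

Each row of this result set then becomes one unit of work (a truncate plus a data flow) in the generated package.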

Package Biml

Now that we've got an idea about how to use iteration in C# code and embed it in Biml, we can look at how to define the package that moves the data. We'll treat this as a two-step process: first, create the Biml (and script) to iterate over each extraction (the output of the stored proc staging_extractions) and create a container with a truncate task and an empty data flow (as below). Then, in the second step, we can populate the data flow with transforms.

Our proc (staging_extractions) returns a list of extraction objects. All we have to do is iterate over those and create the containers and their tasks. This is not dissimilar to the way we created connections – it's just that the Biml contained in the foreach loop is a little longer. We fully qualify names with the extraction_id in order to uniquely identify each task.

<#@ import namespace="System.Data" #>
<#@ import namespace="System.Data.SqlClient" #>
<#@ template language="C#" tier="2" #>
<#
	string _con = @"Data Source=.\SQL2012;Persist Security Info=true;Integrated Security=SSPI;Initial Catalog=stage";
	DataTable _extract_table = new DataTable();
	SqlDataAdapter _con_da = new SqlDataAdapter(" exec dbo.staging_extractions", _con);
    _con_da.Fill(_extract_table);	
#>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">

	<Packages>
		<Package Name="002_Extractions" ConstraintMode="Parallel">
			<Tasks>
				<# foreach (DataRow _extraction in _extract_table.Rows){ #>
				
				<Container Name="<#= _extraction["extraction_id"] #>_<#= _extraction["entity_definition_tgt"] #>" ConstraintMode="Linear">
					
					<Tasks>
						<ExecuteSQL Name="truncate_<#=  _extraction["extraction_id"] #>_<#= _extraction["entity_definition_tgt"] #>" ConnectionName="<#=  _extraction["connection_tgt"] #>" >
							<DirectInput>truncate table <#= _extraction["entity_schema_tgt"] #>.<#= _extraction["entity_definition_tgt"] #>;</DirectInput>
						</ExecuteSQL>
						<Dataflow Name="extract_<#=  _extraction["extraction_id"] #>_<#= _extraction["entity_definition_tgt"] #>">
						</Dataflow>
                    </Tasks>
					
                </Container>
				<#} #>
				
            </Tasks>
        </Package>
    </Packages>
    
</Biml>

I think there is also an interesting use of constraint declaration that's worth discussing. Notice that the package definition specifies parallel execution? This ensures that the containers can execute in parallel. However, for each container I've specified its ConstraintMode as Linear, which means that its tasks will have dependencies in the order that they are created. This is a neat trick: we enjoy the benefits of parallel execution across extractions but enforce constraints within each extraction without the added burden of coding them.

Now that we've populated the control flow, we just need to populate the data flow with transformations (get the source data, append the date, define the destination). For now we just add the tasks and look at the column mapping later.

All we have to do is replace the data flow task (above) with the following snippet. You'll notice that the script uses data from the current _extraction row. The framework simply adds the data source, then the derived column and the destination, with names (and tables) that relate to the extraction row currently in iteration.


<Dataflow Name="extract_<#= _extraction["extraction_id"] #>_<#= _extraction["entity_definition_tgt"] #>">

	<Transformations>

		<OleDbSource 
		Name="src_<#= _extraction["extraction_id"] + "_" + _extraction["entity_schema_source"] + "_" + _extraction["entity_definition_source"] #>" 
		ConnectionName="<#= _extraction["connection_source"] #>">
			<DirectInput>
				Select * from <#= _extraction["entity_schema_source"] #>.<#= _extraction["entity_definition_source"] #>;
			</DirectInput>
		</OleDbSource>

		<DerivedColumns Name="append_dt">
				<Columns>
					<Column Name="append_dt" DataType="DateTime">@[System::StartTime]</Column>
				</Columns>
		</DerivedColumns>
		
		<OleDbDestination 
			Name="tgt_<#= _extraction["extraction_id"]+ "_" + _extraction["entity_schema_tgt"] + "_" + _extraction["entity_definition_tgt"] #>" 
			ConnectionName="<#= _extraction["connection_tgt"] #>">
				<ExternalTableOutput Table="<#= _extraction["entity_schema_tgt"] #>.<#= _extraction["entity_definition_tgt"] #>"></ExternalTableOutput>
		</OleDbDestination>
	
	</Transformations>

</Dataflow>


Now we turn our attention to column mapping in the destination. If we generate the package and look at the mappings for a task, you'll note that the columns are not mapped. Recall from our post on destinations (here) that we can specify source-to-target column mappings with the following sample.


<OleDbDestination Name="some_name" ConnectionName="some_connection">
	<ExternalTableOutput Table="target_table"></ExternalTableOutput>
	<Columns>
		<Column SourceColumn="ProductAlternateKey" TargetColumn="product_id" />
	</Columns>
</OleDbDestination>

Since we are already familiar with the idea of iteration, all we have to do is get a table of mappings for the current extraction and populate the column references. Remember that if we execute staging_column_mappings 1 (where 1 refers to the current extraction_id), we get back one row per source/target column pair for that extraction.
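The result set itself isn't reproduced here, but given the columns the script reads (column_source and column_tgt) and the variant a commenter posted below, the proc might look something like this sketch (the dbo.column_mappings table is an assumption):

```sql
-- Hypothetical sketch: the proc name and output columns are from the post;
-- the underlying dbo.column_mappings table is assumed.
Create Procedure dbo.staging_column_mappings @extraction_id int
As
Select	column_source
,		column_tgt
From	dbo.column_mappings
Where	extraction_id = @extraction_id;
```

Executing exec dbo.staging_column_mappings 1 would then return one row per mapped column for extraction 1.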

Now, all we have to do is use script to add a Columns tag to our OleDbDestination. This can be done by adding the following snippet.


<Columns>
	<#
		DataTable _col_maps = new DataTable();
		string _col_sql = "exec [dbo].[staging_column_mappings] " + _extraction["extraction_id"];
		SqlDataAdapter _col_da = new SqlDataAdapter(_col_sql, _con);
		_col_da.Fill(_col_maps);
		foreach (DataRow _map in _col_maps.Rows){ #>
		<Column SourceColumn="<#= _map["column_source"] #>" TargetColumn="<#= _map["column_tgt"] #>" />	
    <# }  #>	
</Columns>

Perhaps what's not so apparent (it may be more so when you examine the entire script below) is that the generation of the column list depends on the extraction_id of the current row in the extraction iterator. This is really just a nested loop where the second (inner) loop is dependent on the outer loop. The pseudo-code for this is shown below:


Foreach (record _r in [dbo].[staging_extractions])
{
	.. do some stuff
	Foreach (record _map in [dbo].[staging_column_mappings](_r.extraction_id))
	{
		Define the mappings
	}
}
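In relational terms, the nested loop simply walks a parent/child join between the two metadata sets. Assuming hypothetical tables dbo.extractions and dbo.column_mappings sit behind the two stored procs (both table names are assumptions), the flattened equivalent is a single join:

```sql
-- Hypothetical: both table names are assumptions standing in for
-- whatever sits behind the two stored procs.
Select	e.extraction_id
,		e.entity_definition_tgt
,		m.column_source
,		m.column_tgt
From	dbo.extractions e
Join	dbo.column_mappings m
	on	m.extraction_id = e.extraction_id
Order By e.extraction_id;
```

The outer Biml loop corresponds to the distinct extraction_id values; the inner loop corresponds to the rows that join to each of them.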

For completeness, here is the entire file for package creation.


<#@ import namespace="System.Data" #>
<#@ import namespace="System.Data.SqlClient" #>
<#@ template language="C#" tier="2" #>
<#
	string _con = @"Data Source=.\SQL2012;Persist Security Info=true;Integrated Security=SSPI;Initial Catalog=stage";
	DataTable _extract_table = new DataTable();
	SqlDataAdapter _con_da = new SqlDataAdapter(" exec dbo.staging_extractions", _con);
    _con_da.Fill(_extract_table);	
#>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">

	<Packages>
		<Package Name="002_Extractions" ConstraintMode="Parallel" ProtectionLevel="EncryptSensitiveWithUserKey">
			<Tasks>
				<# foreach (DataRow _extraction in _extract_table.Rows){ #>
				
				<Container Name="<#= _extraction["extraction_id"] #>_<#= _extraction["entity_definition_tgt"] #>" ConstraintMode="Linear">
					
					<Tasks>
						<ExecuteSQL Name="truncate_<#=  _extraction["extraction_id"] #>_<#= _extraction["entity_definition_tgt"] #>" ConnectionName="<#=  _extraction["connection_tgt"] #>" >
							<DirectInput>truncate table <#= _extraction["entity_schema_tgt"] #>.<#= _extraction["entity_definition_tgt"] #>;</DirectInput>
						</ExecuteSQL>
						<Dataflow Name="extract_<#= _extraction["extraction_id"] #>_<#= _extraction["entity_definition_tgt"] #>">
							<Transformations>
								<OleDbSource Name="src_<#= _extraction["extraction_id"] + "_" + _extraction["entity_schema_source"] + "_" + _extraction["entity_definition_source"] #>" 
								ConnectionName="<#= _extraction["connection_source"] #>">
									<DirectInput>
										Select * from <#= _extraction["entity_schema_source"] #>.<#= _extraction["entity_definition_source"] #>;
                                    </DirectInput>
                                </OleDbSource>
								<DerivedColumns Name="append_dt">
									<Columns>
										<Column Name="append_dt" DataType="DateTime">@[System::StartTime]</Column>
                                    </Columns>
                                </DerivedColumns>
								<OleDbDestination Name="tgt_<#= _extraction["extraction_id"]+ "_" + _extraction["entity_schema_tgt"] + "_" + _extraction["entity_definition_tgt"] #>" 
								ConnectionName="<#= _extraction["connection_tgt"] #>">
									<ExternalTableOutput Table="<#= _extraction["entity_schema_tgt"] #>.<#= _extraction["entity_definition_tgt"] #>"></ExternalTableOutput>
									<Columns>
										<#
											DataTable _col_maps = new DataTable();
											string _col_sql = "exec [dbo].[staging_column_mappings] " + _extraction["extraction_id"];
											SqlDataAdapter _col_da = new SqlDataAdapter(_col_sql, _con);
											_col_da.Fill(_col_maps);
											foreach (DataRow _map in _col_maps.Rows){ #>
											<Column SourceColumn="<#= _map["column_source"] #>" TargetColumn="<#= _map["column_tgt"] #>" />	
                                        <# }  #>	
                                    </Columns>
								</OleDbDestination>
                            </Transformations>
						</Dataflow>
                    </Tasks>
					
                </Container>
				<#} #>
				
            </Tasks>
        </Package>
    </Packages>
    
</Biml>

The entire code may be a little much to take in all at once, and you could suggest that such an approach is quite easy to create by hand. True – for three 'tasks', dragging and dropping may be easier than setting up this solution. Remember, though, that the goal of these posts is to build packages based on content. Adding tasks is as simple as adding records to our extraction table and creating additional mappings (although the mappings are only needed to the extent that columns must be mapped).


11 thoughts on “Biml XVII – Building Extraction ETL from Metadata (Database Definitions) (Part 2)”

  1. Pingback: Biml XIII – Building Extraction ETL from Metadata (Database Definitions) (Part I) | Paul te Braak

  2. Pingback: BIML XIX – Creating a Staging Area using Biml | Paul te Braak

  3. Add this to the execute SQL Task to truncate the tables:

    <ExecuteSQL Name="truncate_<#= _extraction["extraction_id"] #>_<#= _extraction["entity_definition_tgt"] #>" ConnectionName="<#= _extraction["connection_tgt"] #>">
    	<DirectInput>Truncate Table <#= _extraction["entity_schema_tgt"] #>.<#= _extraction["entity_definition_tgt"] #>;</DirectInput>
    </ExecuteSQL>

    Add this procedure. Note that I have changed some names:

    CREATE Procedure [dbo].[spStagingColumnMapping] @ExtractionId int
    As
    Select ColumnSource, ColumnTarget, ExtractionId
    From dbo.ColumnMappings
    Where ExtractionId = @ExtractionId

      • Great stuff! I love using Biml to automate SSIS packages. Paul, how do you create a table for selected columns from the source table, if I want control over which columns are selected via Excel or a metadata table?

      • Hi Beverly,
        There are a couple of ways to do it. For me, the easiest method is to have an external database with metadata that people can interact with. This is very similar to the information_schema catalogs in SQL Server.
        Then I have a group of classes which interact with the database. You don't really need those; I just find it a lot easier to separate the code from the Biml script.
        Then all I have to do is incorporate that into the Biml script. For example, I might use something like

        _tables = _meta_connection.GetTables(_t => _t.Schema == "something" && _t.IsExtracted);
        foreach (Table _t in _tables.Data)
        {
        }

        I hope that makes sense?

  4. Pingback: BIML - Stage pro DWH snadno, rychle a automaticky - BI Notes
