Tags: merge, collections, azure-sql-database, azure-ai-search

How do I merge two Azure SQL data sources?


I have two tables with a one-to-many (1..*) relationship:

Table 'Product'

Id Name
1 Product 1
2 Product 2

Table 'ProductVersion'

Id ProductId Published
10 1 1
11 1 0

Now I want to merge these two data sources in Azure Search into a single index whose documents look like this:

{
    "Id": "1",
    "Name": "Product 1",
    "Versions": [
        {
            "Id": 10,
            "ProductId": 1,
            "Published": true
        },
        {
            "Id": 11,
            "ProductId": 1,
            "Published": false
        }        
    ]
}

My attempt:

Create index:

// Derives the index schema from the Product model and creates the index under the given name.
async Task CreateIndexAsync( string indexName, SearchIndexClient indexClient )
{
    // FieldBuilder turns the attributed properties of Product into search field definitions.
    var fields = new FieldBuilder().Build( typeof( Product ) );

    await indexClient.CreateIndexAsync( new SearchIndex( indexName, fields ) );
}

Create and run indexer for Product data source:

// Fetch the data source connection the indexer will read from.
// The explicit type is required: it unwraps the service response into the model.
SearchIndexerDataSourceConnection dataSource =
    await indexerClient.GetDataSourceConnectionAsync( dataSourceName );

Console.WriteLine( "Creating SQL indexer...\n" );

// Indexer definition: data source -> target index, scheduled to run once a day.
var dbIndexer = new SearchIndexer( "lq-product-indexer", dataSource.Name, indexName );
dbIndexer.Schedule = new IndexingSchedule( TimeSpan.FromDays( 1 ) );

try
{
    // Reset the indexer if it already exists; the lookup throws a 404 when it does not.
    await indexerClient.GetIndexerAsync( dbIndexer.Name );
    await indexerClient.ResetIndexerAsync( dbIndexer.Name );
}
catch ( RequestFailedException notFound ) when ( notFound.Status == 404 )
{
    // The indexer does not exist yet, so there is nothing to reset.
}

await indexerClient.CreateOrUpdateIndexerAsync( dbIndexer );

Console.WriteLine( "Running SQL indexer...\n" );

try
{
    await indexerClient.RunIndexerAsync( dbIndexer.Name );
}
catch ( RequestFailedException throttled ) when ( throttled.Status == 429 )
{
    // Only a 429 response is reported here; any other failure propagates.
    Console.WriteLine( "Failed to run indexer: {0}", throttled.Message );
}

Creating and running the indexer for ProductVersion is basically the same, but with a field mapping:

// Route the ProductId source column to the index field named "Id".
var productIdMapping = new FieldMapping( "ProductId" ) { TargetFieldName = "Id" };
dbIndexer.FieldMappings.Add( productIdMapping );

And here's my model:

/// <summary>
/// Search document for a product, including its nested versions.
/// </summary>
public record Product
{
    /// <summary>Document key of the index.</summary>
    [SearchableField( IsKey = true )]
    public string Id { get; set; } = string.Empty;

    /// <summary>Display name of the product; full-text searchable.</summary>
    [SearchableField]
    public string Name { get; set; } = string.Empty;

    /// <summary>Versions that belong to this product.</summary>
    // No field attribute here: SearchableField describes string fields, while a
    // collection of a nested type is a complex field whose sub-fields come from
    // the attributes declared on ProductVersion itself.
    public ProductVersion[] Versions { get; set; } = [];
}

/// <summary>
/// A single version entry nested inside a product document.
/// </summary>
public record ProductVersion
{
    /// <summary>Identifier of the version; searchable and filterable.</summary>
    [SearchableField( IsFilterable = true )] public string Id { get; set; } = "";

    /// <summary>Identifier of the owning product; searchable and filterable.</summary>
    [SearchableField( IsFilterable = true )] public string ProductId { get; set; } = "";

    /// <summary>Published flag of the version; filterable.</summary>
    [SimpleField( IsFilterable = true )] public bool Published { get; set; }
}

No matter what I try, the Versions array always ends up empty in the index.

UPDATE

Solution based on Balaji's comment:

CREATE VIEW [dbo].[vw_ASProduct]
AS
-- One row per published 'PKA' market product. The related texts, images and
-- documents of the same market/version are attached as JSON text columns.
SELECT
    mp.*,
    (
        SELECT txt.*
        FROM dbo.vw_ProductMarketData AS txt
        WHERE txt.MarketId = mp.MarketId
          AND txt.VersionId = mp.VersionId
        FOR JSON AUTO
    ) AS Texts,
    (
        SELECT img.*
        FROM dbo.vw_ProductMarketDataImage AS img
        WHERE img.MarketId = mp.MarketId
          AND img.VersionId = mp.VersionId
        FOR JSON AUTO
    ) AS Images,
    (
        SELECT doc.*
        FROM dbo.vw_ProductMarketDataDocument AS doc
        WHERE doc.MarketId = mp.MarketId
          AND doc.VersionId = mp.VersionId
        FOR JSON AUTO
    ) AS Documents
FROM dbo.vw_MarketProduct AS mp
WHERE mp.VersionName = 'PKA'
  AND mp.Published = 1
GO

Solution

  • How do I merge two Azure SQL data sources?

    As I mentioned in the comments, I tried this in my environment using a SQL view, and the data was merged successfully, as you can see in the output below.

    View in SQL:

    -- Published 'PKA' market products, each with its texts, images and documents
    -- of the same market/version attached as JSON text columns.
    CREATE VIEW [dbo].[vw_ASProduct] AS
    SELECT
        mp.Id AS ProductId,
        mp.VersionName,
        mp.Published,
        (
            SELECT txt.*
            FROM dbo.vw_ProductMarketData AS txt
            WHERE txt.MarketId = mp.MarketId
              AND txt.VersionId = mp.VersionId
            FOR JSON AUTO
        ) AS Texts,
        (
            SELECT img.*
            FROM dbo.vw_ProductMarketDataImage AS img
            WHERE img.MarketId = mp.MarketId
              AND img.VersionId = mp.VersionId
            FOR JSON AUTO
        ) AS Images,
        (
            SELECT doc.*
            FROM dbo.vw_ProductMarketDataDocument AS doc
            WHERE doc.MarketId = mp.MarketId
              AND doc.VersionId = mp.VersionId
            FOR JSON AUTO
        ) AS Documents
    FROM dbo.vw_MarketProduct AS mp
    WHERE mp.VersionName = 'PKA'
      AND mp.Published = 1;
    
    /// <summary>
    /// Console sample that indexes the <c>vw_ASProduct</c> SQL view into a search index:
    /// it creates the data source connection, the index and the indexer, then requests an indexer run.
    /// </summary>
    class Program
    {
        /// <summary>Entry point: builds the two service clients and runs the three setup steps in order.</summary>
        static async Task Main(string[] args)
        {
            // Placeholders only. Load real values from configuration or a secret store, never from source.
            string searchServiceEndpoint = "https://<searchService>.search.windows.net";
            string searchServiceApiKey = "<key>";
            string sqlConnectionString = "Server=tcp:<server>.database.windows.net;Database=<db>;User ID=<userId>;Password=<pwd>;Trusted_Connection=False;Encrypt=True;";

            string dataSourceName = "product-data-source";
            string indexName = "product-index";
            string indexerName = "product-indexer";
            string sqlViewName = "vw_ASProduct";

            // Both clients talk to the same service, so they share one endpoint and one credential.
            var endpoint = new Uri(searchServiceEndpoint);
            var credential = new AzureKeyCredential(searchServiceApiKey);

            var indexerClient = new SearchIndexerClient(endpoint, credential);
            var indexClient = new SearchIndexClient(endpoint, credential);

            await CreateDataSourceAsync(indexerClient, dataSourceName, sqlConnectionString, sqlViewName);
            await CreateIndexAsync(indexClient, indexName);
            await CreateIndexerAsync(indexerClient, dataSourceName, indexName, indexerName);
        }

        /// <summary>Creates or updates an Azure SQL data source connection that reads from a table or view.</summary>
        /// <param name="indexerClient">Client used to manage data sources and indexers.</param>
        /// <param name="dataSourceName">Name of the data source connection.</param>
        /// <param name="connectionString">Connection string of the Azure SQL database.</param>
        /// <param name="tableName">Name of the table or view to index.</param>
        static async Task CreateDataSourceAsync(SearchIndexerClient indexerClient, string dataSourceName, string connectionString, string tableName)
        {
            Console.WriteLine("Creating Data Source...");

            var dataSource = new SearchIndexerDataSourceConnection(
                name: dataSourceName,
                type: SearchIndexerDataSourceType.AzureSql,
                connectionString: connectionString,
                container: new SearchIndexerDataContainer(tableName)
            );

            await indexerClient.CreateOrUpdateDataSourceConnectionAsync(dataSource);
            Console.WriteLine("Data Source Created Successfully!");
        }

        /// <summary>Creates or updates the product index, including its complex collection fields.</summary>
        /// <param name="indexClient">Client used to manage indexes.</param>
        /// <param name="indexName">Name of the index.</param>
        static async Task CreateIndexAsync(SearchIndexClient indexClient, string indexName)
        {
            Console.WriteLine("Creating Index...");

            var index = new SearchIndex(indexName)
            {
                Fields =
                {
                    new SimpleField("Id", SearchFieldDataType.String) { IsKey = true, IsFilterable = true },
                    new SearchableField("Name") { IsFilterable = true, IsSortable = true },

                    // NOTE(review): these two fields are only populated if the indexed view exposes
                    // matching Name / Versions columns — confirm against the view definition.
                    new ComplexField("Versions", collection: true)
                    {
                        Fields =
                        {
                            new SimpleField("Id", SearchFieldDataType.Int32) { IsFilterable = true },
                            new SimpleField("ProductId", SearchFieldDataType.Int32) { IsFilterable = true },
                            new SimpleField("Published", SearchFieldDataType.Boolean) { IsFilterable = true }
                        }
                    },

                    // A complex field needs declared sub-fields to hold any data. The sub-fields below
                    // mirror the sample documents; NOTE(review): names and types must match the columns
                    // returned by the JSON subqueries of the view — adjust to the real schema.
                    new ComplexField("Texts", collection: true)
                    {
                        Fields =
                        {
                            new SimpleField("MarketId", SearchFieldDataType.Int32) { IsFilterable = true },
                            new SimpleField("VersionId", SearchFieldDataType.Int32) { IsFilterable = true },
                            new SearchableField("Text")
                        }
                    },
                    new ComplexField("Images", collection: true)
                    {
                        Fields =
                        {
                            new SimpleField("MarketId", SearchFieldDataType.Int32) { IsFilterable = true },
                            new SimpleField("VersionId", SearchFieldDataType.Int32) { IsFilterable = true },
                            new SimpleField("ImageUrl", SearchFieldDataType.String)
                        }
                    },
                    new ComplexField("Documents", collection: true)
                    {
                        Fields =
                        {
                            new SimpleField("MarketId", SearchFieldDataType.Int32) { IsFilterable = true },
                            new SimpleField("VersionId", SearchFieldDataType.Int32) { IsFilterable = true },
                            new SimpleField("DocumentUrl", SearchFieldDataType.String)
                        }
                    }
                }
            };

            await indexClient.CreateOrUpdateIndexAsync(index);
            Console.WriteLine("Index Created Successfully!");
        }

        /// <summary>Creates or updates the daily-scheduled indexer and requests an immediate run.</summary>
        /// <param name="indexerClient">Client used to manage data sources and indexers.</param>
        /// <param name="dataSourceName">Name of the data source connection to read from.</param>
        /// <param name="indexName">Name of the target index.</param>
        /// <param name="indexerName">Name of the indexer.</param>
        static async Task CreateIndexerAsync(SearchIndexerClient indexerClient, string dataSourceName, string indexName, string indexerName)
        {
            Console.WriteLine("Creating Indexer...");

            var indexer = new SearchIndexer(indexerName, dataSourceName, indexName)
            {
                Schedule = new IndexingSchedule(TimeSpan.FromDays(1)),

                FieldMappings =
                {
                    // The view exposes the key as ProductId; the index key field is Id.
                    new FieldMapping("ProductId") { TargetFieldName = "Id" }

                    // Texts, Images and Documents need no mapping: the source columns and the index
                    // fields share the same names, and the FOR JSON output of the view is matched to
                    // the complex collection fields by name. ("jsonParse" is not a documented
                    // mapping function, so it is not used here.)
                }
            };

            await indexerClient.CreateOrUpdateIndexerAsync(indexer);
            Console.WriteLine("Indexer Created Successfully!");

            try
            {
                await indexerClient.RunIndexerAsync(indexerName);
                Console.WriteLine("Indexer Running!");
            }
            catch (RequestFailedException ex) when (ex.Status is 409 or 429)
            {
                // A newly created indexer starts its first run on its own, so an on-demand run can be
                // rejected while that run is in progress (409) or when the service throttles (429).
                Console.WriteLine($"Indexer run was not started: {ex.Message}");
            }
        }
    }
    

    Output:

    Creating Data Source...
    Data Source Created Successfully!
    
    Creating Index...
    Index Created Successfully!
    
    Creating Indexer...
    Indexer Created Successfully!
    
    Indexer Running!
    

    Below is the resulting data in the Azure Cognitive Search index:

    [
      {
        "Id": "1",
        "Name": "Product 1",
        "Versions": [
          {
            "Id": 10,
            "ProductId": 1,
            "Published": true
          },
          {
            "Id": 11,
            "ProductId": 1,
            "Published": false
          }
        ],
        "Texts": [
          { "MarketId": 1, "VersionId": 10, "Text": "Some Text" }
        ],
        "Images": [
          { "MarketId": 1, "VersionId": 10, "ImageUrl": "http://..." }
        ],
        "Documents": [
          { "MarketId": 1, "VersionId": 10, "DocumentUrl": "http://..." }
        ]
      },
      {
        "Id": "2",
        "Name": "Product 2",
        "Versions": [],
        "Texts": [],
        "Images": [],
        "Documents": []
      }
    ]