0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

怎么使用Azure.ResourceManager.DataFactory来获取一个既有的DataFactory

Posted at
using System;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.ResourceManager;
using Azure.ResourceManager.DataFactory;
using Azure.ResourceManager.DataFactory.Models;

namespace DataFactoryExample
{
    /// <summary>
    /// Console sample: looks up an existing Azure Data Factory, starts one of its
    /// pipelines, and polls the pipeline run until it is no longer active.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the SDK member names used here (GetDataFactoryPipelineAsync,
    /// CreateRunAsync, GetPipelineRunAsync) follow the Azure.ResourceManager.DataFactory
    /// 1.x API surface; confirm them against the installed package version.
    /// </remarks>
    class Program
    {
        /// <summary>Delay between two consecutive status polls.</summary>
        private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(5);

        /// <summary>
        /// Entry point: resolves the factory, starts the pipeline and monitors the run.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            // Identifiers of the target subscription, resource group, factory and pipeline.
            string subscriptionId = "<Your Subscription ID>";
            string resourceGroupName = "<Your Resource Group Name>";
            string dataFactoryName = "<Your Data Factory Name>";
            string pipelineName = "<Your Pipeline Name>";

            // Create the Azure Resource Manager client.
            var armClient = new ArmClient(new DefaultAzureCredential());

            // Fetch the existing Data Factory resource.
            var dataFactory = await GetExistingDataFactoryAsync(armClient, subscriptionId, resourceGroupName, dataFactoryName);

            // Start the pipeline.
            var runId = await StartPipelineAsync(dataFactory, pipelineName);

            // Monitor the pipeline run status.
            await MonitorPipelineRunAsync(dataFactory, runId);
        }

        /// <summary>
        /// Retrieves an existing Data Factory from the given resource group.
        /// </summary>
        /// <param name="armClient">Authenticated Azure Resource Manager client.</param>
        /// <param name="subscriptionId">ID of the subscription that owns the resource group.</param>
        /// <param name="resourceGroupName">Name of the resource group containing the factory.</param>
        /// <param name="dataFactoryName">Name of the Data Factory to fetch.</param>
        /// <returns>The Data Factory resource returned by the service.</returns>
        static async Task<DataFactoryResource> GetExistingDataFactoryAsync(ArmClient armClient, string subscriptionId, string resourceGroupName, string dataFactoryName)
        {
            var resourceGroupId = new Azure.Core.ResourceIdentifier($"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}");
            var resourceGroup = armClient.GetResourceGroupResource(resourceGroupId);

            var factories = resourceGroup.GetDataFactories();
            var response = await factories.GetAsync(dataFactoryName);
            return response.Value;
        }

        /// <summary>
        /// Starts a new run of the named pipeline and prints the run ID.
        /// </summary>
        /// <param name="dataFactory">Data Factory that contains the pipeline.</param>
        /// <param name="pipelineName">Name of the pipeline to run.</param>
        /// <returns>The ID of the newly created pipeline run, as a string.</returns>
        static async Task<string> StartPipelineAsync(DataFactoryResource dataFactory, string pipelineName)
        {
            // Starting a run is an operation on the pipeline resource, so fetch it first.
            var pipeline = await dataFactory.GetDataFactoryPipelineAsync(pipelineName);
            var response = await pipeline.Value.CreateRunAsync();

            // The service returns the run ID as a non-string value; normalise it once
            // so the logged value and the returned value are guaranteed to match.
            string runId = response.Value.RunId.ToString();
            Console.WriteLine($"Pipeline run started with run ID: {runId}");
            return runId;
        }

        /// <summary>
        /// Polls the pipeline run until its status is no longer an active one,
        /// printing the status on every poll and once more when polling stops.
        /// </summary>
        /// <param name="dataFactory">Data Factory that owns the pipeline run.</param>
        /// <param name="runId">ID of the pipeline run to monitor.</param>
        static async Task MonitorPipelineRunAsync(DataFactoryResource dataFactory, string runId)
        {
            string status;

            while (true)
            {
                var run = await dataFactory.GetPipelineRunAsync(runId);
                status = run.Value.Status;
                Console.WriteLine($"Pipeline run status: {status}");

                if (!IsRunActive(status))
                {
                    break;
                }

                // Only wait when another poll is actually needed.
                await Task.Delay(PollInterval);
            }

            Console.WriteLine($"Pipeline run completed with status: {status}");
        }

        /// <summary>
        /// Tells whether a pipeline run status means the run has not finished yet.
        /// </summary>
        /// <param name="status">Status string reported for the pipeline run.</param>
        /// <returns>
        /// <c>true</c> for "Queued", "InProgress" and "Canceling"; otherwise <c>false</c>.
        /// </returns>
        static bool IsRunActive(string status)
        {
            // A freshly created run is typically "Queued" before it becomes "InProgress";
            // treating only "InProgress" as active would end monitoring prematurely.
            return string.Equals(status, "Queued", StringComparison.Ordinal)
                || string.Equals(status, "InProgress", StringComparison.Ordinal)
                || string.Equals(status, "Canceling", StringComparison.Ordinal);
        }
    }
}
using System;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.ResourceManager;
using Azure.ResourceManager.DataFactory;
using Azure.ResourceManager.DataFactory.Models;

namespace DataFactoryExample
{
    /// <summary>
    /// Console sample: looks up an existing Azure Data Factory, starts one of its
    /// pipelines, polls the pipeline run until it is no longer active, and then
    /// prints the activity runs that belong to that pipeline run.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the SDK member names used here (GetDataFactoryPipelineAsync,
    /// CreateRunAsync, GetPipelineRunAsync, GetActivityRunAsync, RunFilterContent,
    /// StartOn/EndOn) follow the Azure.ResourceManager.DataFactory 1.x API surface;
    /// confirm them against the installed package version.
    /// </remarks>
    class Program
    {
        /// <summary>Delay between two consecutive status polls.</summary>
        private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(5);

        /// <summary>
        /// How far back from "now" the activity-run query looks. Activity runs whose
        /// last update is older than this window are not returned, so widen it for
        /// pipelines that can run longer than one hour.
        /// </summary>
        private static readonly TimeSpan ActivityQueryLookback = TimeSpan.FromHours(1);

        /// <summary>
        /// Entry point: resolves the factory, starts the pipeline, monitors the run
        /// and prints the activity results.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            // Identifiers of the target subscription, resource group, factory and pipeline.
            string subscriptionId = "<Your Subscription ID>";
            string resourceGroupName = "<Your Resource Group Name>";
            string dataFactoryName = "<Your Data Factory Name>";
            string pipelineName = "<Your Pipeline Name>";

            // Create the Azure Resource Manager client.
            var armClient = new ArmClient(new DefaultAzureCredential());

            // Fetch the existing Data Factory resource.
            var dataFactory = await GetExistingDataFactoryAsync(armClient, subscriptionId, resourceGroupName, dataFactoryName);

            // Start the pipeline.
            var runId = await StartPipelineAsync(dataFactory, pipelineName);

            // Monitor the pipeline run and print the activity run results.
            await MonitorPipelineRunAsync(dataFactory, runId);
        }

        /// <summary>
        /// Retrieves an existing Data Factory from the given resource group.
        /// </summary>
        /// <param name="armClient">Authenticated Azure Resource Manager client.</param>
        /// <param name="subscriptionId">ID of the subscription that owns the resource group.</param>
        /// <param name="resourceGroupName">Name of the resource group containing the factory.</param>
        /// <param name="dataFactoryName">Name of the Data Factory to fetch.</param>
        /// <returns>The Data Factory resource returned by the service.</returns>
        static async Task<DataFactoryResource> GetExistingDataFactoryAsync(ArmClient armClient, string subscriptionId, string resourceGroupName, string dataFactoryName)
        {
            var resourceGroupId = new Azure.Core.ResourceIdentifier($"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}");
            var resourceGroup = armClient.GetResourceGroupResource(resourceGroupId);

            var factories = resourceGroup.GetDataFactories();
            var response = await factories.GetAsync(dataFactoryName);
            return response.Value;
        }

        /// <summary>
        /// Starts a new run of the named pipeline and prints the run ID.
        /// </summary>
        /// <param name="dataFactory">Data Factory that contains the pipeline.</param>
        /// <param name="pipelineName">Name of the pipeline to run.</param>
        /// <returns>The ID of the newly created pipeline run, as a string.</returns>
        static async Task<string> StartPipelineAsync(DataFactoryResource dataFactory, string pipelineName)
        {
            // Starting a run is an operation on the pipeline resource, so fetch it first.
            var pipeline = await dataFactory.GetDataFactoryPipelineAsync(pipelineName);
            var response = await pipeline.Value.CreateRunAsync();

            // The service returns the run ID as a non-string value; normalise it once
            // so the logged value and the returned value are guaranteed to match.
            string runId = response.Value.RunId.ToString();
            Console.WriteLine($"Pipeline run started with run ID: {runId}");
            return runId;
        }

        /// <summary>
        /// Polls the pipeline run until its status is no longer an active one,
        /// printing the status on every poll, then prints the run's activity results.
        /// </summary>
        /// <param name="dataFactory">Data Factory that owns the pipeline run.</param>
        /// <param name="runId">ID of the pipeline run to monitor.</param>
        static async Task MonitorPipelineRunAsync(DataFactoryResource dataFactory, string runId)
        {
            string status;

            while (true)
            {
                var run = await dataFactory.GetPipelineRunAsync(runId);
                status = run.Value.Status;
                Console.WriteLine($"Pipeline run status: {status}");

                if (!IsRunActive(status))
                {
                    break;
                }

                // Only wait when another poll is actually needed.
                await Task.Delay(PollInterval);
            }

            Console.WriteLine($"Pipeline run completed with status: {status}");

            // Fetch the activity run results of the pipeline run.
            await GetActivityRunsAsync(dataFactory, runId);
        }

        /// <summary>
        /// Tells whether a pipeline run status means the run has not finished yet.
        /// </summary>
        /// <param name="status">Status string reported for the pipeline run.</param>
        /// <returns>
        /// <c>true</c> for "Queued", "InProgress" and "Canceling"; otherwise <c>false</c>.
        /// </returns>
        static bool IsRunActive(string status)
        {
            // A freshly created run is typically "Queued" before it becomes "InProgress";
            // treating only "InProgress" as active would end monitoring prematurely.
            return string.Equals(status, "Queued", StringComparison.Ordinal)
                || string.Equals(status, "InProgress", StringComparison.Ordinal)
                || string.Equals(status, "Canceling", StringComparison.Ordinal);
        }

        /// <summary>
        /// Queries the activity runs of one pipeline run and prints a line per activity
        /// with its name, status, start/end time, output and error.
        /// </summary>
        /// <param name="dataFactory">Data Factory that owns the pipeline run.</param>
        /// <param name="runId">ID of the pipeline run whose activities are listed.</param>
        static async Task GetActivityRunsAsync(DataFactoryResource dataFactory, string runId)
        {
            // Capture "now" once so both ends of the window derive from the same instant.
            var now = DateTimeOffset.UtcNow;
            var filter = new RunFilterContent(now - ActivityQueryLookback, now);

            // The pipeline run ID is passed to the query directly, so no extra
            // run-ID filter entry is required; the result is streamed page by page.
            await foreach (var activityRun in dataFactory.GetActivityRunAsync(runId, filter))
            {
                Console.WriteLine($"Activity: {activityRun.ActivityName}, Status: {activityRun.Status}, Start: {activityRun.StartOn}, End: {activityRun.EndOn}, Output: {activityRun.Output}, Error: {activityRun.Error}");
            }
        }
    }
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do by signing up
0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?