using System;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.ResourceManager;
using Azure.ResourceManager.DataFactory;
using Azure.ResourceManager.DataFactory.Models;
namespace DataFactoryExample
{
/// <summary>
/// Console sample that looks up an existing Data Factory, starts one of its
/// pipelines, and polls the resulting run until it is no longer active.
/// </summary>
/// <remarks>
/// NOTE(review): the SDK member names used below (InvokeAsync, GetPipelineRuns,
/// PipelineRun) are kept exactly as found; confirm them against the installed
/// Azure.ResourceManager.DataFactory package version.
/// </remarks>
class Program
{
    /// <summary>Pause between two consecutive status polls.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Entry point: resolves the factory, triggers the pipeline, and waits for the run to finish.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        // Subscription and Data Factory coordinates; replace the placeholders before running.
        string subscriptionId = "<Your Subscription ID>";
        string resourceGroupName = "<Your Resource Group Name>";
        string dataFactoryName = "<Your Data Factory Name>";
        string pipelineName = "<Your Pipeline Name>";

        // Create the Azure Resource Manager client.
        var armClient = new ArmClient(new DefaultAzureCredential());

        // Fetch the Data Factory resource.
        var dataFactory = await GetExistingDataFactoryAsync(armClient, subscriptionId, resourceGroupName, dataFactoryName);

        // Start the pipeline.
        var runId = await StartPipelineAsync(dataFactory, pipelineName);

        // Watch the pipeline run status.
        await MonitorPipelineRunAsync(dataFactory, runId);
    }

    /// <summary>
    /// Retrieves an existing Data Factory from the given resource group.
    /// </summary>
    /// <param name="armClient">Authenticated Resource Manager client.</param>
    /// <param name="subscriptionId">Subscription that owns the resource group.</param>
    /// <param name="resourceGroupName">Resource group containing the factory.</param>
    /// <param name="dataFactoryName">Name of the factory to fetch.</param>
    /// <returns>The factory resource returned by the service.</returns>
    static async Task<DataFactoryResource> GetExistingDataFactoryAsync(ArmClient armClient, string subscriptionId, string resourceGroupName, string dataFactoryName)
    {
        var resourceGroupId = new Azure.Core.ResourceIdentifier($"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}");
        var resourceGroup = armClient.GetResourceGroupResource(resourceGroupId);
        var factories = resourceGroup.GetDataFactories();
        var factoryResponse = await factories.GetAsync(dataFactoryName);
        return factoryResponse.Value;
    }

    /// <summary>
    /// Triggers a run of the named pipeline and prints the new run ID.
    /// </summary>
    /// <param name="dataFactory">Factory that owns the pipeline.</param>
    /// <param name="pipelineName">Name of the pipeline to start.</param>
    /// <returns>The run ID reported by the service.</returns>
    static async Task<string> StartPipelineAsync(DataFactoryResource dataFactory, string pipelineName)
    {
        var response = await dataFactory.GetDataFactoryPipeline(pipelineName).InvokeAsync();
        Console.WriteLine($"Pipeline run started with run ID: {response.Value.RunId}");
        return response.Value.RunId;
    }

    /// <summary>
    /// Polls the pipeline run every <see cref="PollInterval"/> and prints each observed
    /// status until the run reports a status that is not one of the active ones.
    /// </summary>
    /// <param name="dataFactory">Factory that owns the run.</param>
    /// <param name="runId">Identifier of the run to watch.</param>
    static async Task MonitorPipelineRunAsync(DataFactoryResource dataFactory, string runId)
    {
        var pipelineRuns = dataFactory.GetPipelineRuns();
        PipelineRun pipelineRun;
        while (true)
        {
            pipelineRun = await pipelineRuns.GetAsync(runId);
            Console.WriteLine($"Pipeline run status: {pipelineRun.Status}");

            // Azure Data Factory documents Queued, InProgress and Canceling as non-final
            // run statuses. Checking only "InProgress" would report a still-queued run
            // as completed after the very first poll.
            bool stillActive = pipelineRun.Status == "Queued"
                || pipelineRun.Status == "InProgress"
                || pipelineRun.Status == "Canceling";
            if (!stillActive)
            {
                break;
            }

            // Wait only when another poll will follow; no delay after the final status.
            await Task.Delay(PollInterval);
        }

        Console.WriteLine($"Pipeline run completed with status: {pipelineRun.Status}");
    }
}
}
using System;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.ResourceManager;
using Azure.ResourceManager.DataFactory;
using Azure.ResourceManager.DataFactory.Models;
namespace DataFactoryExample
{
/// <summary>
/// Console sample that looks up an existing Data Factory, starts one of its
/// pipelines, polls the run until it is no longer active, and then prints the
/// activity runs that belong to it.
/// </summary>
/// <remarks>
/// NOTE(review): the SDK member names used below (InvokeAsync, GetPipelineRuns,
/// PipelineRun, RunFilterParameters, RunQueryFilterOperand.PipelineRunId) are kept
/// exactly as found; confirm them against the installed
/// Azure.ResourceManager.DataFactory package version.
/// NOTE(review): another <c>Program</c> class in the same namespace appears earlier
/// in this file; confirm the two versions are meant to live in separate files.
/// </remarks>
class Program
{
    /// <summary>Pause between two consecutive status polls.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(5);

    /// <summary>
    /// How far back from "now" the activity-run query window reaches.
    /// NOTE(review): activity records older than this are presumably excluded by the
    /// service, so pipelines running longer than one hour may need a larger value — confirm.
    /// </summary>
    private static readonly TimeSpan ActivityRunLookback = TimeSpan.FromHours(1);

    /// <summary>
    /// Entry point: resolves the factory, triggers the pipeline, waits for the run to
    /// finish, and prints its activity results.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        // Subscription and Data Factory coordinates; replace the placeholders before running.
        string subscriptionId = "<Your Subscription ID>";
        string resourceGroupName = "<Your Resource Group Name>";
        string dataFactoryName = "<Your Data Factory Name>";
        string pipelineName = "<Your Pipeline Name>";

        // Create the Azure Resource Manager client.
        var armClient = new ArmClient(new DefaultAzureCredential());

        // Fetch the Data Factory resource.
        var dataFactory = await GetExistingDataFactoryAsync(armClient, subscriptionId, resourceGroupName, dataFactoryName);

        // Start the pipeline.
        var runId = await StartPipelineAsync(dataFactory, pipelineName);

        // Watch the pipeline run status and fetch the activity run results.
        await MonitorPipelineRunAsync(dataFactory, runId);
    }

    /// <summary>
    /// Retrieves an existing Data Factory from the given resource group.
    /// </summary>
    /// <param name="armClient">Authenticated Resource Manager client.</param>
    /// <param name="subscriptionId">Subscription that owns the resource group.</param>
    /// <param name="resourceGroupName">Resource group containing the factory.</param>
    /// <param name="dataFactoryName">Name of the factory to fetch.</param>
    /// <returns>The factory resource returned by the service.</returns>
    static async Task<DataFactoryResource> GetExistingDataFactoryAsync(ArmClient armClient, string subscriptionId, string resourceGroupName, string dataFactoryName)
    {
        var resourceGroupId = new Azure.Core.ResourceIdentifier($"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}");
        var resourceGroup = armClient.GetResourceGroupResource(resourceGroupId);
        var factories = resourceGroup.GetDataFactories();
        var factoryResponse = await factories.GetAsync(dataFactoryName);
        return factoryResponse.Value;
    }

    /// <summary>
    /// Triggers a run of the named pipeline and prints the new run ID.
    /// </summary>
    /// <param name="dataFactory">Factory that owns the pipeline.</param>
    /// <param name="pipelineName">Name of the pipeline to start.</param>
    /// <returns>The run ID reported by the service.</returns>
    static async Task<string> StartPipelineAsync(DataFactoryResource dataFactory, string pipelineName)
    {
        var response = await dataFactory.GetDataFactoryPipeline(pipelineName).InvokeAsync();
        Console.WriteLine($"Pipeline run started with run ID: {response.Value.RunId}");
        return response.Value.RunId;
    }

    /// <summary>
    /// Polls the pipeline run every <see cref="PollInterval"/> and prints each observed
    /// status until the run reports a status that is not one of the active ones, then
    /// prints the run's activity results.
    /// </summary>
    /// <param name="dataFactory">Factory that owns the run.</param>
    /// <param name="runId">Identifier of the run to watch.</param>
    static async Task MonitorPipelineRunAsync(DataFactoryResource dataFactory, string runId)
    {
        var pipelineRuns = dataFactory.GetPipelineRuns();
        PipelineRun pipelineRun;
        while (true)
        {
            pipelineRun = await pipelineRuns.GetAsync(runId);
            Console.WriteLine($"Pipeline run status: {pipelineRun.Status}");

            // Azure Data Factory documents Queued, InProgress and Canceling as non-final
            // run statuses. Checking only "InProgress" would report a still-queued run
            // as completed after the very first poll.
            bool stillActive = pipelineRun.Status == "Queued"
                || pipelineRun.Status == "InProgress"
                || pipelineRun.Status == "Canceling";
            if (!stillActive)
            {
                break;
            }

            // Wait only when another poll will follow; no delay after the final status.
            await Task.Delay(PollInterval);
        }

        Console.WriteLine($"Pipeline run completed with status: {pipelineRun.Status}");

        // Fetch the pipeline's activity run results.
        await GetActivityRunsAsync(dataFactory, runId);
    }

    /// <summary>
    /// Queries the activity runs of one pipeline run and prints a summary line for each.
    /// </summary>
    /// <param name="dataFactory">Factory that owns the run.</param>
    /// <param name="runId">Identifier of the pipeline run whose activities are listed.</param>
    static async Task GetActivityRunsAsync(DataFactoryResource dataFactory, string runId)
    {
        var pipelineRuns = dataFactory.GetPipelineRuns();

        // Read the clock once so both window bounds derive from the same instant.
        DateTime windowEnd = DateTime.UtcNow;
        DateTime windowStart = windowEnd - ActivityRunLookback;

        // List<T> is fully qualified because this file's using directives do not
        // include System.Collections.Generic.
        var filter = new RunFilterParameters(windowStart, windowEnd)
        {
            Filters = new System.Collections.Generic.List<RunQueryFilter>
            {
                new RunQueryFilter(
                    RunQueryFilterOperand.PipelineRunId,
                    RunQueryFilterOperator.Equals,
                    new System.Collections.Generic.List<string> { runId })
            }
        };

        var activityRuns = await pipelineRuns.GetActivityRunsAsync(filter);
        await foreach (var activityRun in activityRuns)
        {
            Console.WriteLine($"Activity: {activityRun.ActivityName}, Status: {activityRun.Status}, Start: {activityRun.Start}, End: {activityRun.End}, Output: {activityRun.Output}, Error: {activityRun.Error}");
        }
    }
}
}