Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Linq;
- using System.Threading.Tasks;
- using Microsoft.Azure.WebJobs;
- using Microsoft.Extensions.Logging;
- using Microsoft.WindowsAzure.Storage.Queue;
- using Newtonsoft.Json;
- using ObjectDetection.Models;
- using ObjectDetection.Services;
namespace ObjectDetection.Functions
{
    /// <summary>
    /// Queue-triggered function that processes initial upload requests. Images flagged for
    /// resizing are forwarded to the resize queue; the remaining images are handed to the
    /// inference client as classification or instance-segmentation requests.
    /// </summary>
    public class ImageQueueTrigger : BaseFunc
    {
        // Queue that receives the images which still have to be resized.
        private const string ResizeQueueName = "image-resize-items";

        private readonly IInferenceClient _inferenceClient;
        private readonly IAzureStorageClient _azureStorageClient;
        private readonly IAuthorizationHandler _authHandler;

        /// <summary>
        /// Creates the function instance with its collaborators.
        /// </summary>
        /// <param name="inferenceClient">Client used to request classification/segmentation results.</param>
        /// <param name="azureStorageClient">Client used to enqueue messages and build blob URIs with SAS tokens.</param>
        /// <param name="logger">Log dispatcher passed to the base function.</param>
        /// <param name="authHandler">Handler used to fetch the client-credentials token.</param>
        public ImageQueueTrigger(
            IInferenceClient inferenceClient,
            IAzureStorageClient azureStorageClient,
            ILogDispatcher logger,
            IAuthorizationHandler authHandler) : base(logger, "ImageQueueTriggerFunc")
        {
            _inferenceClient = inferenceClient;
            _azureStorageClient = azureStorageClient;
            _authHandler = authHandler;
        }

        /// <summary>
        /// Handles one message from the "image-batch-items" queue: splits its images into
        /// "needs resize" and "ready" groups, enqueues the former on the resize queue and
        /// requests inference for the latter.
        /// </summary>
        /// <param name="inferenceQueueMessage">The dequeued batch request.</param>
        /// <returns>A task that completes once the message is processed and the logs are dispatched.</returns>
        /// <remarks>
        /// NOTE(review): failures are logged and NOT rethrown, so the invocation completes
        /// normally even when processing failed; presumably this means the message is never
        /// retried or moved to a poison queue - confirm that this is intended.
        /// </remarks>
        [FunctionName("ImageQueueTrigger")]
        public async Task Run(
            [QueueTrigger("image-batch-items", Connection = "AzureWebJobsStorage")] InferenceQueueMessage inferenceQueueMessage)
        {
            try
            {
                var jwt = await _authHandler.GetClientCredentialsToken(Logger.Scope());

                // The token is a bearer credential: record that it was fetched, never its value.
                Logger.LogInformation("Fetched Service AuthToken");

                var resizeFiles = inferenceQueueMessage.ImageMetas.Where(meta => meta.Resize).ToList();
                if (resizeFiles.Count > 0)
                {
                    var resizeQueueMessage = new InferenceBackendQueueMessage(inferenceQueueMessage)
                    {
                        Jwt = jwt,
                        ActivityId = ActivityId,
                        ImageMetas = resizeFiles,
                    };

                    var serializedResizeMessage = JsonConvert.SerializeObject(resizeQueueMessage);
                    await _azureStorageClient.AddQueueMessage(
                        ResizeQueueName,
                        new CloudQueueMessage(serializedResizeMessage));
                }

                var readyFiles = inferenceQueueMessage.ImageMetas.Where(meta => !meta.Resize).ToList();
                if (readyFiles.Count > 0)
                {
                    Logger.LogInformation("Fetching Blob Uris With Sas Token");

                    // Start every SAS lookup first, then await them together without blocking
                    // the thread. Task.WhenAll returns the results in the order of the input tasks.
                    var sasUriTasks = new Task<string>[readyFiles.Count];
                    for (var i = 0; i < readyFiles.Count; i++)
                    {
                        sasUriTasks[i] = _azureStorageClient.GetBlobUriWithSasToken(readyFiles[i].FileId);
                    }

                    string[] sasUris = await Task.WhenAll(sasUriTasks);

                    var inferenceBackendQueueMessage = new InferenceBackendQueueMessage(inferenceQueueMessage)
                    {
                        Jwt = jwt,
                        ActivityId = ActivityId,
                        BlobUris = sasUris,
                        ImageMetas = readyFiles,
                    };

                    if (!CvModelDefs.Models.TryGetValue(inferenceBackendQueueMessage.ModelName, out CvModelType type))
                    {
                        throw new InvalidOperationException("Error getting Model Type");
                    }

                    // NOTE(review): a model type that is neither classification nor instance
                    // segmentation falls through both checks and nothing is queued.
                    if (type.Value == CvModelType.Classification.Value)
                    {
                        Logger.LogInformation("Queueing Classification Requests For Python Function");
                        if (!await _inferenceClient.RequestClassificationResults(inferenceBackendQueueMessage))
                        {
                            throw new InvalidOperationException("Error Queueing Classicifation Request");
                        }

                        Logger.LogInformation("Queued Classification Request");
                    }

                    if (type.Value == CvModelType.InstanceSegmentation.Value)
                    {
                        Logger.LogInformation("Queueing Segmentation Requests For Python Function");
                        if (!await _inferenceClient.RequestSegmentationResults(inferenceBackendQueueMessage))
                        {
                            throw new InvalidOperationException("Error Queueing Segmentation Request");
                        }

                        Logger.LogInformation("Queued Segmentation Request");
                    }
                }
            }
            catch (Exception e)
            {
                // Deliberately swallowed after logging (see the remarks on this method).
                Logger.LogError($"Error Processing Blob or Queueing Inference Request: {e}");
            }
            finally
            {
                // Dispatch the collected logs even if the catch block itself fails.
                await Logger.DispatchLogs();
            }
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement