"""
@module de.stschnell
@author Stefan Schnell <mail@stefan-schnell.de>
@license MIT
@version 0.2.0
@runtime python:3.11
@inputType in_userName {string}
@inputType in_password {SecureString}
@outputType Properties
"""
import json
from util.http import Http
http = Http()
def getResource(
vcoUrl: str,
bearerToken: str,
resourceFolder: str,
resourceName: str,
mimeType: str
) -> dict | bytes:
""" Gets a resource
@param {string} vcoUrl - URL of Aria orchestrator
@param {string} bearerToken
@param {string} resourceFolder - Path of the resource
@param {string} resourceName - Name of the resource
@param {string} mimeType - MIME type of the resource
@returns {dict or bytes}
"""
returnValue: dict | bytes | None = None
try:
resources: dict = http.request(
url = vcoUrl + "/api/resources",
bearerToken = bearerToken
)
found: bool = False
for resource in resources["link"]:
id: str | None = None
for attribute in resource["attributes"]:
if attribute["name"] == "name" and \
attribute["value"] == resourceName:
found = True
if attribute["name"] == "id":
id = attribute["value"]
if found == True and id != None:
categoryId: str = http.request(
url = vcoUrl + "/api/resources/" + id,
bearerToken = bearerToken,
accept = "application/" + \
"vnd.o11n.resource.metadata+json;charset=UTF-8"
)["category-id"]
categoryPath: str = http.request(
url = vcoUrl + "/api/categories/" + categoryId,
bearerToken = bearerToken
)["path"]
if categoryPath != resourceFolder:
found = False
if found:
break
if id != None:
returnValue = http.request(
url = vcoUrl + "/api/resources/" + id,
bearerToken = bearerToken,
accept = mimeType
)
except Exception as err:
raise ValueError(f"An error occurred at get resource id - {err}") \
from err
return returnValue
def handler(context: dict, inputs: dict) -> dict:
""" Aria Automation standard handler, the main function.
"""
vcoUrl: str = context["vcoUrl"]
bearerToken: str = context["getToken"]()
resourceFolder: str = "Library/VC/Configuration"
resourceName: str = "b56969bb-eef7-459c-aad1-13d23ece2f97"
mimeType: str = "text/xml"
# resourceFolder: str = "Library/VC/Configuration"
# resourceName: str = "properties.json"
# mimeType: str = "application/json"
outputs: dict = {}
try:
output: dict | bytes = getResource(
vcoUrl,
bearerToken,
resourceFolder,
resourceName,
mimeType
)
if not isinstance(output, dict):
output = output.decode("utf-8")
outputs = {
"status": "done",
"error": None,
"result": output
}
except Exception as err:
outputs = {
"status": "incomplete",
"error": repr(err),
"result": None
}
return outputs
|