"""
@module de.stschnell
@version 0.4.0
@runtime python:3.10
@memoryLimit 128000000
@inputType in_userName {string}
@inputType in_password {string}
@outputType Properties
"""
import base64
import json
import ssl
import urllib.request
def request(
url,
user = None,
password = None,
bearerToken = None,
method = "GET",
body = {},
contentType = "application/json;charset=utf-8",
accept = "application/json"
):
""" Executes a REST request
@param {string} url - URL to execute the request
@param {string} user
@param {string} password
@param {string} bearerToken
@param {string} method - Method of request, e.g. GET, POST, etc
@param {dictionary} body - Body of request
@param {string} contentType - MIME type of the body for the request
@param {string} accept - MIME type of content expect/prefer as response
@returns {dictionary or bytes}
"""
returnValue = {}
try:
request = urllib.request.Request(
url = url,
method = method,
data = bytes(json.dumps(body).encode("utf-8"))
)
if user and password:
authorization = base64.b64encode(
bytes(user + ":" + password, "UTF-8")
).decode("UTF-8")
request.add_header(
"Authorization", "Basic " + authorization
)
if bearerToken:
request.add_header(
"Authorization", "Bearer " + bearerToken
)
request.add_header(
"Content-Type", contentType
)
request.add_header(
"Accept", accept
)
response = urllib.request.urlopen(
request,
context = ssl._create_unverified_context()
)
if response.getcode() == 200 or response.getcode() == 202:
if "json" in accept:
returnValue = json.loads(response.read())
else:
returnValue = response.read()
except Exception as err:
raise Exception(f"An error occurred at request - {err}") \
from err
return returnValue
def getConfigurationVariableValue(
vcoUrl,
bearerToken,
configurationFolder,
configurationName,
configurationVariableName,
configurationVariableType
):
""" Gets a value of a variable from a configuration
@param {string} vcoUrl - URL of Aria orchestrator
@param {string} bearerToken
@param {string} configurationFolder - Path of the configuration
@param {string} configurationName - Name of the configuration
@param {string} configurationVariableName - Name of the variable
@param {string} configurationVariableType - Type of the variable
"""
returnValue = None
try:
configurations = request(
url = vcoUrl + "/api/configurations",
bearerToken = bearerToken
)
found = False
for configuration in configurations["link"]:
id = None
for attribute in configuration["attributes"]:
if attribute["name"] == "name" and \
attribute["value"] == configurationName:
found = True
if attribute["name"] == "id":
id = attribute["value"]
if found == True and id != None:
categoryId = request(
url = vcoUrl + "/api/configurations/" + id,
bearerToken = bearerToken
)["category-id"]
categoryPath = request(
url = vcoUrl + "/api/categories/" + categoryId,
bearerToken = bearerToken
)["path"]
if categoryPath != configurationFolder:
found = False
if found:
break
if id != None:
config = request(
url = vcoUrl + "/api/configurations/" + id,
bearerToken = bearerToken
)
for attribute in config["attributes"]:
if attribute["name"] == configurationVariableName:
returnValue = \
attribute["value"][configurationVariableType.lower()]
break
except Exception as err:
raise ValueError(f"An error occurred at get variable value - {err}") \
from err
return returnValue
def handler(context, inputs):
""" Aria Automation standard handler, the main function.
"""
vcoUrl = context["vcoUrl"]
bearerToken = context["getToken"]()
# @example
configurationFolder = "Configurations"
configurationName = "Host Security Hardening Options"
configurationVariableName = "authSource"
configurationVariableType = "string"
# In the HOL, this delivers local.
# Types are: string, boolean, number, date, secure-string, array,
# sdk-object, properties, encrypted-string, mime-attachment and
# regex.
# Hint: Do not use a leading slash for the folder variable.
output = {}
try:
output = getConfigurationVariableValue(
vcoUrl,
bearerToken,
configurationFolder,
configurationName,
configurationVariableName,
configurationVariableType
)
outputs = {
"status": "done",
"error": None,
"result": output
}
except Exception as err:
outputs = {
"status": "incomplete",
"error": repr(err),
"result": output
}
return outputs
|