"""
Aria Automation handler emulation for Python with the use of the bearer
token.
Hint: All inputs are prepared for the HOL.
@author Stefan Schnell <mail@stefan-schnell.de>
@license MIT
@version 0.3.0
"""
import getpass
import ssl
import urllib.request
# Begin-----------------------------------------------------------------
import json
def handler(context: dict, inputs: dict) -> dict:
""" Standard Aria Automation handler function.
"""
result: str = "Hello World"
outputs: dict = {}
try:
vcoUrl: str = context["vcoUrl"]
bearerToken: str = context['getToken']()
jsonOut: str = json.dumps(inputs, separators=(',', ':'))
print("Inputs were {0}".format(jsonOut))
outputs = {
"status": "done",
"error": None,
"result": result
}
except Exception as err:
outputs = {
"status": "incomplete",
"error": repr(err),
"result": result
}
return outputs
# End-------------------------------------------------------------------
def getBearerToken(url: str, userName: str, userPassword: str) -> str:
""" Delivers the bearer token.
"""
try:
data: str = str(json.dumps(
{ "username": userName, "password": userPassword }
)).encode("utf-8")
# Gets refresh token
requestRefreshToken: urllib.request.Request = \
urllib.request.Request(
url = url + "/csp/gateway/am/api/login?access_token",
method = "POST",
data = data
)
requestRefreshToken.add_header(
"Content-Type", "application/json"
)
responseRefreshToken: http.client.HTTPResponse = \
urllib.request.urlopen(
requestRefreshToken,
context = ssl._create_unverified_context()
)
if responseRefreshToken.getcode() == 200:
refreshToken: dict = json.loads(responseRefreshToken.read())
data: str = str(json.dumps(
{ "refreshToken": refreshToken["refresh_token"] }
)).encode("utf-8")
# Gets bearer token
requestBearerToken: urllib.request.Request = \
urllib.request.Request(
url = url + "/iaas/api/login",
method = "POST",
data = data
)
requestBearerToken.add_header(
"Content-Type", "application/json"
)
responseBearerToken: http.client.HTTPResponse = \
urllib.request.urlopen(
requestBearerToken,
context = ssl._create_unverified_context()
)
if responseBearerToken.getcode() == 200:
bearerToken: dict = json.loads(responseBearerToken.read())
return bearerToken["token"]
except Exception as err:
raise Exception("An error occurred at receiving Bearer token") \
from err
def getToken() -> str:
""" Delivers the bearer token at the handler context parameter.
"""
userName: str = input("User name [holadmin@vcf.holo.lab]: ").strip() \
or "holadmin@vcf.holo.lab"
userPassword: str = getpass.getpass(prompt = "Password [VMware123!]: ") \
or "VMware123!"
return getBearerToken(url, userName, userPassword)
def main():
""" Main function
"""
global url
url = input("URL [https://rainpole.auto.vcf.sddc.lab]: ").strip() \
or "https://rainpole.auto.vcf.sddc.lab"
context: dict = {
"executionId": "a92e5e2b-7e8e-4c83-ad16-34f4c4f68528",
"returnType": "Properties",
"vcoUrl": url + "/vco",
"getToken": getToken
}
inputs: dict = {
}
outputs: dict = handler(context, inputs)
print(outputs["status"])
print(outputs["error"])
print(outputs["result"])
# Main
if __name__ == "__main__" :
main()
|