Sample Application Code
To build the application (the code that processes data to comply with application objectives), you must edit the file:
Python: $HOME/restapi_sample_interface/app.py
C#: $HOME/edgesdkexamplecsharp/App.cs
The app.py (using Python as an example) application file looks like this:
#
# Python client interface sample for HCC2 Rest API Server
# Sample app
#
from datetime import datetime
import logging
import time
import queue
from apiclient import APIClient
from classes.api_classes import TvqtDataPoint
from classes.enums import quality_enum
from classes.log_control import LogControl
from classes.webhook import WebHook
from config.appconfig import AppConfig
from classes.heartbeat import HeartBeat
from config.varsdict import Var
from lib.miscfuncs import text_to_log_level
from lib.webhookfuncs import dequeue
#
# Get configuration
#
appcfg = AppConfig()
#
# Setup logger
#
logging.basicConfig( level=appcfg.log.level,
format=appcfg.log.format,
datefmt=appcfg.log.date_format)
logger = logging.getLogger(appcfg.app.name)
logger.setLevel (text_to_log_level(appcfg.log.level))
if appcfg.log.log_to_file == True:
fh = logging.FileHandler(appcfg.log.log_file, mode='w')
fh.setLevel(logging.getLevelName(appcfg.log.level))
fmt = logging.Formatter(appcfg.log.format)
fh.setFormatter(fmt)
logger.addHandler(fh)
logging.getLogger().propagate = False
logging.addLevelName(logging.CRITICAL, "critical")
logging.addLevelName(logging.ERROR, "error")
logging.addLevelName(logging.WARNING, "warning")
logging.addLevelName(logging.INFO, "info")
logging.addLevelName(logging.DEBUG, "debug")
###############################################################################################
#
# 1. Create APIClient object
#
reload_required = False
client = APIClient(app_name=appcfg.app.name)
###############################################################################################
hbq = queue.Queue()
#
# Set Application Heartbeat (required by Unity Edge)
#
hb = HeartBeat(logger, client, hbq, appcfg.misc.hearbeat_initial_state, appcfg.misc.heartbeat_period)
log_control = LogControl(logger=logger, retry_period=appcfg.misc.retry_period, max_retries=appcfg.misc.error_retries, heartbeat_obj=hb, client_name=client.app_name)
################################################################################################
#
# Get vars configuration data (optional)
#
if appcfg.app.vars_enabled == True:
try:
with open(appcfg.app.var_config_path) as json_file:
v = Var().from_json(json_file.read())
client.vars_dict.load(v)
except Exception as e:
logger.error(f"Error trying to read variable configuration file. Error: {e}. PROCESS ABORTED.")
exit(-1)
###############################################################################################
#
# Initialize webhook (optional)
#
if appcfg.app.webhook_enabled == True:
whq = queue.Queue()
wh = WebHook(logger=logger, queue=whq, config=appcfg)
############################################################################################################
#
# OUTER LOOP
# 2. Connect with HCC2 REST API Server
#
log_control.reset_retries()
while True:
log_control.check_retries()
status = client.connect()
logger.info(f"Connecting with API at URL: {client.cfg.api_url}")
if status == False:
logger.error(f"Connect - Error trying to connect to API at URL: {client.cfg.api_url}. Retrying.")
time.sleep(appcfg.misc.retry_period)
continue
logger.debug(f"Connect - Application {appcfg.app.name} is connected with API")
###############################################################################################
#
# 3. Register App using existing stored tarball (TEST)
#
try:
response = client.registerApp(tarfile_path=appcfg.app.tarfile_path, is_complex_provisioned=appcfg.app.complex_provisioned)
logger.info(f"Application {client.app_name} correcty registered to API using tar.gz file: {appcfg.app.tarfile_path}")
except Exception as e:
logger.error(f"registerApp - Error trying to register application. Config file: \"{appcfg.app.tarfile_path}\". Error: {e}. Retrying.")
time.sleep(appcfg.misc.retry_period)
continue
###############################################################################################
#
# 4. Start the heartbeat as an independent thread
#
try:
hb.start()
logger.info(f"Heartbeat thread has been fired successfully. ")
except Exception as e:
logger.error(f"heartbeat - Error trying to start heartbeat thread. Error: {e}. Retrying.")
time.sleep(appcfg.misc.retry_period)
continue
#
logger.debug (f"Wait {appcfg.misc.provision_time} for configuration to settle down....")
time.sleep(appcfg.misc.provision_time)
###############################################################################################
#
# 5. Check if a new deployment was done just after app registering
#
while True:
try:
response = client.checkProvisioningStatus()
logger.debug (f"checkProvisioningStatus responded ok")
except Exception as e:
logger.error(f"checkProvisioningStatus - Error trying to check provision status. Error: {e}. Retrying.")
time.sleep(appcfg.misc.retry_period)
continue
if response.hasNewConfig == True:
logger.info(f"checkProvisioningStatus -> New configuration found! ")
break
time.sleep(appcfg.misc.retry_period)
###############################################################################################
#
# 6. Set provisioning valid true
#
try:
response = client.validateProvision(valid=True)
logger.debug (f"validateProvision responded ok")
except Exception as e:
logger.error(f"validateProvision -Error trying to validate provision. Error: {e}. Retrying.")
time.sleep(appcfg.misc.retry_period)
continue
###############################################################################################
#
# 7. Change heartbeat to isUp=true
#
hb.change_state(True)
#
if appcfg.app.webhook_enabled == True:
#
# Initialize subscribed topics
#
subscribed = dict()
#
# 8. Delete all subscriptions
#
try:
status = client.deleteAllSubscriptions(client.app_name)
logger.debug (f"DeleteAllSubscriptions - completed succesfully.")
if status == False:
logger.warning(f"DeleteAllSubscriptions - no susbcriptions were found.")
except Exception as e:
logger.error(f"DeleteAllSubscriptions - Error trying to subscribe. Check parameters and configuration. Error: {e}. Try again.")
time.sleep(appcfg.misc.retry_period)
continue
#
# 9a. Subscribe to one topic using SimpleSubscribe
#
topic1 = "liveValue.state.this.io.0.general.upTime."
subscribed[topic1] = {}
callback_url = client.cfg.api_callback_url + "/" + appcfg.wh.simple_message.command
try:
status = client.simpleSubscribe(client.app_name, topic1, callback_url, True)
logger.debug (f"SimpleSubscribe for topic {topic1} on url {callback_url} completed succesfully.")
except Exception as e:
logger.error(f"SimpleSubscribe - Error trying to subscribe. Check parameters and configuration. Error: {e}. Try again.")
time.sleep(appcfg.misc.retry_period)
continue
#
# 9b. Subscribe to other topics using setOfMessagesSubscribe
#
topic_list = ["liveValue.diagnostics.this.io.0.temperature.cpu.",
"liveValue.diagnostics.this.io.0.rail.voltage.v1p2."
]
subscribed[topic_list[0]] = {}
subscribed[topic_list[1]] = {}
callback_url = client.cfg.api_callback_url + "/" + appcfg.wh.set_of_messages.command
try:
status = client.setOfMessagesSubscribe(client.app_name, topic_list, callback_url, True)
logger.debug (f"SetOfMessagesSubscribe for topic List {topic_list} on url {callback_url} completed succesfully.")
except Exception as e:
logger.error(f"SetOfMessagesSubscribe - Error trying to subscribe. Check parameters and configuration. Error: {e}. Try again.")
time.sleep(appcfg.misc.retry_period)
continue
#
# 9c. Subscribe to other topics using advanced Message
#
topic_list = ["liveValue.diagnostics.this.core.0.diskUsage|.",
"liveValue.diagnostics.this.io.0.rail.voltage.v3p3."
]
subscribed[topic_list[0]] = {}
subscribed[topic_list[1]] = {}
callback_url = client.cfg.api_callback_url + "/" + appcfg.wh.advanced_messages.command
try:
status = client.advancedMessagesSubscribe(client.app_name, topic_list, callback_url)
logger.debug (f"AdvancedMessagesSubscribe for topic List {topic_list} on url {callback_url} completed succesfully.")
except Exception as e:
logger.error(f"AdvancedMessagesSubscribe - Error trying to subscribe. Check parameters and configuration. Error: {e}. Try again.")
time.sleep(appcfg.misc.retry_period)
continue
###############################################################################################
#
# 10. Start the webhook thread
#
while True:
log_control.check_retries()
try:
wh.start()
logger.debug(f"Web hook thread has been fired successfully. ")
break
except Exception as e:
logger.error(f"webhook manager - Error trying to start webhook thread for \"{client.app_name}\". Error: {e}. Retrying.")
time.sleep(appcfg.misc.retry_period)
continue
###############################################################################################
#
# Store datetime of first run
#
tvqt_datapoint_list = [
TvqtDataPoint(topic ="liveValue.production.this.courseApp.0.firstruntime.",
value = datetime.now().strftime("%Y-%m-%d %H:%M:%S"), quality = quality_enum.OK, timeStamp = datetime.now()),
]
try:
status = client.messageWrite(tvqt_datapoint_list)
logger.debug (f"messageWrite - Writing: {tvqt_datapoint_list}. status: {status}")
except Exception as e:
logger.error(f"messageWrite - Error trying to write tags. Check topic spelling. Error: {e}. Try again.")
time.sleep(appcfg.misc.retry_period)
continue
#
# End of road - all configuration good!
break
###############################################################################################
#
# 11. INNER LOOP - Here comes the app business logic
#
run_counter = 1
cpu_usage = 0
memory_usage = 0
temperature = 0
log_control.reset_retries()
while True:
#################################################################################################
#
# 12a. Read configuration parameters:
#
try:
value_array = client.messageRead(
[
"liveValue.postvalidConfig.this.courseApp.0.configrunningperiod.",
"liveValue.postvalidConfig.this.courseApp.0.maxminrestartperiod."
]
)
for val in value_array:
logger.debug(f"topic: {val.topic}, value: {val.value}, type: {type(val.value)}, quality: {val.quality}, timeStamp: {val.timeStamp}")
except Exception as e:
logger.error(f"messageRead - Error trying to read topics. Check topics spelling. Error: {e}. Try Again.")
time.sleep(appcfg.misc.retry_period)
continue
period = int(value_array[0].value)
if period < 1:
period = 1
if period > 60:
period = 60
restart_period = int(value_array[1].value)
if period < 1:
period = 1
if period > 60:
period = 60
#
# 12b. Read values
#
try:
value_array = client.messageRead(
[
"liveValue.diagnostics.this.io.0.temperature.cpu."
]
)
for val in value_array:
logger.debug(f"topic: {val.topic}, value: {val.value}, type: {type(val.value)}, quality: {val.quality}, timeStamp: {val.timeStamp}")
temperature = value_array[0].value
except Exception as e:
logger.error(f"messageRead - Error trying to read topics. Check topic spelling. Error: {e}. Try Again.")
time.sleep(appcfg.misc.retry_period)
continue
#########################################################################################################################3
#
# 12c. Read configuration parameters (using Read Advanced)
#
try:
response_array = client.messageReadAdvanced(
[
"liveValue.diagnostics.this.core.0.cpuUsage|.",
"liveValue.diagnostics.this.core.0.memoryUsage|."
]
)
if len(response_array) == 0:
raise Exception ("One or more topics do not exist. Check topic string.")
for resp in response_array:
for dp in resp.datapoints:
logger.debug(f"topic: {resp.topic}, quality: {dp.quality}")
for i in range(len(dp.values)):
if dp.dataPointName == "total.":
cpu_usage = dp.values[i]
break
elif dp.dataPointName == "memoryUsed.":
memory_usage = dp.values[i]
break
logger.debug(f"item: {i+1}, datapoint_name: {dp.dataPointName}, value: {dp.values[i]}, type: {type(dp.values[i])} timeStamp: {dp.timeStamps[i]}")
except Exception as e:
logger.error(f"messageReadAdvanced - Error trying to read tags. Check topic spelling. Error: {e}. Try Again.")
time.sleep(appcfg.misc.retry_period)
continue
#
###################################################################################################################################
#
# 13. THE CODE OF YOUR APPLICATION GOES HERE:
# THIS IS AN EXAMPLE
#
ts = datetime.now()
if ts.minute % restart_period == 0 and ts.second == 0:
run_counter = 1
if run_counter == 1:
cpu_usage_max = cpu_usage
cpu_usage_min = cpu_usage
memory_usage_max = memory_usage
memory_usage_min = memory_usage
else:
if cpu_usage > cpu_usage_max:
cpu_usage_max = cpu_usage
elif cpu_usage < cpu_usage_min:
cpu_usage_min = cpu_usage
if memory_usage > memory_usage_max:
memory_usage_max = memory_usage
elif memory_usage < memory_usage_min:
memory_usage_min = memory_usage
#
# YOUR APPLICATION CODE ENDS HERE
###################################################################################################################################
#
# 14. Write back your app results
#
tvqt_datapoint_list = [
TvqtDataPoint(topic ="liveValue.production.this.courseApp.0.runcounter.",
value = run_counter, quality = quality_enum.OK, timeStamp = datetime.now()),
TvqtDataPoint(topic ="liveValue.production.this.courseApp.0.lastruntime.",
value = datetime.now().strftime("%Y-%m-%d %H:%M:%S"), quality = quality_enum.OK, timeStamp = datetime.now()),
TvqtDataPoint(topic ="liveValue.production.this.courseApp.0.cpuusagecurrent.",
value = cpu_usage, quality = quality_enum.OK, timeStamp = datetime.now()),
TvqtDataPoint(topic ="liveValue.production.this.courseApp.0.cpuusagemax.",
value = cpu_usage_max, quality = quality_enum.OK, timeStamp = datetime.now()),
TvqtDataPoint(topic ="liveValue.production.this.courseApp.0.cpuusagemin.",
value = cpu_usage_min, quality = quality_enum.OK, timeStamp = datetime.now()),
TvqtDataPoint(topic ="liveValue.production.this.courseApp.0.memoryusagecurrent.",
value = memory_usage, quality = quality_enum.OK, timeStamp = datetime.now()),
TvqtDataPoint(topic ="liveValue.production.this.courseApp.0.memoryusagemax.",
value = memory_usage_max, quality = quality_enum.OK, timeStamp = datetime.now()),
TvqtDataPoint(topic ="liveValue.production.this.courseApp.0.memoryusagemin.",
value = memory_usage_min, quality = quality_enum.OK, timeStamp = datetime.now()),
TvqtDataPoint(topic ="liveValue.production.this.courseApp.0.temperature.",
value = temperature, quality = quality_enum.OK, timeStamp = datetime.now())
]
try:
status = client.messageWrite(tvqt_datapoint_list)
logger.debug (f"messageWrite - Writing: {tvqt_datapoint_list}. status: {status}")
except Exception as e:
logger.error(f"messageWrite - Error trying to write tags. Check topic spelling. Error: {e}. Try again.")
time.sleep(appcfg.misc.retry_period)
continue
#
# 15. If webhook is enabled, get data from webhook queue
#
if appcfg.app.webhook_enabled == True:
#
# Dequeue async messages coming from webhook (if enabled)
#
try:
payloads = dequeue(whq)
except Exception as e:
logger.error(f"Webhook message dequeue - Error trying to dequeue messages from webhook. Error: {e}.")
if len(payloads) > 0:
for pl in payloads:
if pl.topic in subscribed:
subscribed[pl.topic]=pl
else:
logger.warning (f"Topic {pl.topic} is not subscribed by this application. Check configuration.")
continue
#
# It is up to you how to decode and use the data you are acquiring using Webhooks.
# Your data is in "pl" object. pl is an object instance of class MessageOutboundInterchange (api_classes.py).
# Look there for details.
#
run_counter += 1
time.sleep(period)
thread.join()