Note
Go to the end to download the full example code.
12.1.10.8.17. Observe parallel functions#
This demo shows how to use a customized progressObserver to observer and possibly cancel multiple ‘complex’ function calls, that are executed in parallel in four different background threads.
Each single function call has its own progressObserver (member observer of class FunctionCall). Additionally, there is one major progressObserver, that tracks the total state of all coomplex functions call (each lasting for 10 seconds).
In this demo script, we do not only use the pre-defined possibilities of
the class itom.progressObserver
to show the current progress value and
text in a given itom.uiItem
of the GUI, but we use the
itom.progressObserver.connect
method to connect different callable python
methods to signals of the progressObserver such that customized actions
can be done.
The overall observer is set to a range between 0 and 100 (percent). Every of the four parallel function calls can add up to 25% of the total progress. Once the overall progress reaches 100%, the call is considered to be finished.
Using the cancellation features of each progressObserver, this demo further shows, that it is both possible to only cancel single function calls or the entire call to all (currently running) sub-functions.
This demo uses the Thread class of python. Since python does not allow real concurrent calls within python itself, we do not have to handle race conditions.
Hint: Algorithm calls of itom algorithms will release the Python GIL during the entire time of the function call, therefore other Python thread can work in the meantime.
from itomUi import ItomUi
from itom import progressObserver
from threading import Thread
from time import sleep
class FunctionCall:
"""This class wraps GUI items, the thread and observer of one function call."""
def __init__(self, label, progress, abortButton, overallObserver):
"""Constructor."""
self.label = label
self.progress = progress
self.abortBtn = abortButton
self.thread = None
# create the local observer for one function call
self.observer = progressObserver(
progressBar=self.progress, progressMinimum=0, progressMaximum=100
)
# pass the overall observer
self.overallObserver = overallObserver
# make several connections to signals of some objects with methods of this class
self.abortBtn.connect("clicked()", self.on_abortButton_clicked)
self.observer.connect(
"progressValueChanged(int)", self.on_progressValue_changed
)
self.observer.connect(
"progressTextChanged(QString)", self.on_progressText_changed
)
self.observer.connect(
"cancellationRequested()", self.on_observer_cancellationRequested
)
# True if the function is currently executed
self.inProgress = False
# Last reported progress value to the overallObserver [0, 25] in percent.
self._lastReportedProgressValue = 0
def reset(self):
"""Resets the relevant GUI items to an idle state."""
self.label["text"] = "-"
self.progress["value"] = 0
self.progress["enabled"] = False
self.abortBtn["enabled"] = False
def on_abortButton_clicked(self):
"""Callback if the abort button of this function is clicked."""
if self.observer:
self.abortBtn["enabled"] = False
# force the observer to cancel the running algorithm. The algorithm
# has to regularly check for this request and terminate the algorithm
# (with an exception set) as soon as possible.
self.observer.requestCancellation()
def start(self):
"""Start the complex function in a Python thread by executing self.run."""
self.thread = Thread(target=self.run)
self.thread.start()
def cancel(self):
"""Method to request a cancellation of the algorithm call as public interface.
This method is usually called if the global abort button is clicked."""
self.observer.requestCancellation()
def run(self):
"""Run method, executed in a thread.
This method mainly starts the itom algorithm ``demoCancellationFunction``
and passes the local observer to this function. If this observer should
be requested to cancel, the algorithm will return with a RuntimeError.
This exception is handled. At the end, the contribution to the global
progress of this function is set to the maximum of 25% (Even in the case
of a cancellation).
"""
self.observer.reset()
self.progress["value"] = 0
self.progress["enabled"] = True
self.abortBtn["enabled"] = True
try:
self.inProgress = True
filter("demoCancellationFunction", _observer=self.observer)
# self.on_progressValue_changed(100)
except RuntimeError:
# cancellation
pass
finally:
# done or cancelled: report a full progress of 25%
self.overallObserver.progressValue += max(
0, 25 - self._lastReportedProgressValue
)
self._lastReportedProgressValue = 25
self.abortBtn["enabled"] = False
self.inProgress = False
sleep(1)
self.reset()
def on_progressValue_changed(self, value):
"""Callback if the local observer reports a new progress value."""
if self.inProgress:
if value < 100:
self.label["text"] = "%i/100" % value
else:
self.label["text"] = "done"
self.overallObserver.progressValue += max(
0, (value // 4) - self._lastReportedProgressValue
)
self._lastReportedProgressValue = value // 4
def on_progressText_changed(self, text):
"""Callback if the local observer reports a new progress text.
Hint: it makes no real sense to change the toolTip. It is just an example."""
self.label["toolTip"] = text
def on_observer_cancellationRequested(self):
"""Callback if a cancellation has been requested to the local observer."""
if self.inProgress:
self.label["text"] = "cancelled"
self.progress["value"] = 0
class DemoObserver(ItomUi):
"""Main GUI class that provides functionality to run for complex algorithms in parallel."""
def __init__(self):
"""Constructor."""
ItomUi.__init__(self, "observedParallelFunctions.ui")
self.sets = [] # sets of widgets in the GUI for each parallel function execution
# this overallObserver gives 25% to each of the 4 parallel function calls.
self.overallObserver = progressObserver(
progressBar=self.gui.progressAll,
progressMinimum=0,
progressMaximum=100,
)
self.overallObserver.connect(
"progressValueChanged(int)", self.overallProgressChanged
)
# Initialization of four FunctionCall objects for four parallel complex
# function calls.
for idx in range(1, 5):
self.sets.append(
FunctionCall(
self.gui.getChild("lblRun%i" % idx),
self.gui.getChild("progressRun%i" % idx),
self.gui.getChild("btnAbortRun%i" % idx),
self.overallObserver,
)
)
# reset and hide all besides the start button
for s in self.sets:
s.reset()
self.gui.btnAbort["enabled"] = False
self.gui.progressAll["visible"] = False
@ItomUi.autoslot("")
def on_btnStart_clicked(self):
"""Auto-connected slot, called if the start button is clicked."""
self.gui.btnAbort["enabled"] = True
self.gui.btnStart["enabled"] = False
self.gui.progressAll["visible"] = True
# resets the overall
self.overallObserver.reset()
# start the 4 threads with a short delay
for s in self.sets:
s.start()
sleep(0.1)
@ItomUi.autoslot("")
def on_btnAbort_clicked(self):
"""Informs the algorithm call to interrupt as soon as possible."""
# cancels all four function calls
for s in self.sets:
s.cancel()
self.gui.btnAbort["enabled"] = False
self.gui.btnStart["enabled"] = True
self.gui.progressAll["visible"] = False
def overallProgressChanged(self, value):
"""Callback if the progressValueChanged signal of the overall progressObserver is emitted."""
if value >= 100:
# all done
self.gui.btnAbort["enabled"] = False
self.gui.btnStart["enabled"] = True
self.gui.progressAll["visible"] = False
if __name__ == "__main__":
demoObserverGui = DemoObserver()
demoObserverGui.show()
Total running time of the script: (0 minutes 0.081 seconds)