Using QTimers to simulate asynchronous events

2 minutes read

So you write a new module that that exhibits some asynchronous behaviour. It could for example be a driver for a motor controller that has functions that trigger an action that will only complete seconds later and notify the system via an Event or a QtSignal.

A naive approach could use a QTimer which will execute a callback function that sets the event and dispatches a signal once the timer span runs out.

class AxisController(QObject):
  positionReachedSignal = pyqtSignal()
  def __init__(self):
    QObject.__init__(self)
    self.positionReachedEvent = threading.Event()
  
  def moveToPosition(self, pos):
    # Return immediately and dispatch event later
    self.positionReachedEvent.clear()
    QtCore.QTimer.singleShot(pos*1000, self.positionReached)
  
  @pyqtSlot()
  def positionReached(self):
    self.positionReachedEvent.set()
    self.positionReachedSignal.emit()

This approach will work fine as long as the moveToPosition(…) function is called from the GUI thread. But it fails (the callback function will not be triggered) once it is called from another Thread context, which is bound to happen in most use-cases with a reasonable complexity.

The important information that explains this behavior can be found in the QTimer Documentation:

In multithreaded applications, you can use QTimer in any thread that has an event loop. To start an event loop from a non-GUI thread, use QThread.exec(). Qt uses the timer’s thread affinity to determine which thread will emit the timeout() signal. Because of this, you must start and stop the timer in its thread; it is not possible to start a timer from another thread.

The solution is to have the timer running in its own thread which has an event loop and use the existing multithreading synchronization primitives to communicate to other threads.

class AxisController(QObject):
  positionReachedSignal = pyqtSignal()
  def __init__(self):
    QObject.__init__(self)
    self.timerThread = QThread(self)
    self.timerThread.started.connect(self._startTimer)
    
    self.timer = QtCore.QTimer(self)
    self.timer.setSingleShot(True)
    self.timer.timeout.connect(self.positionReached)
    self.timer.moveToThread(self.timerThread)
    
    self.pos = 0
  
  def moveToPosition(self, pos):
    self.pos = pos
    self.positionReachedEvent.clear()
    self.timerThread.start()
  
  @pyqtSlot()
  def _startTimer(self):
    self.timer.start(self.pos*1000)
  
  @pyqtSlot()
  def positionReached(self):
    self.timerThread.quit()
    self.positionReachedEvent.set()
    self.positionReachedSignal.emit()

A QThread object is added to the AxisController which will exclusively run the QTimer and provide an event loop for it. The documentation states that it is not possible to start a timer from another thread. But it is possible to start a thread from another thread - so we use the QThread’s started signal to launch the timer from the context of the timerThread self.timerThread.started.connect(self._startTimer) and start the thread in the user oriented moveToPosition(..) function.

The timer’s timeout signal is connected to the positionReached(..) slot. In this callback function the timer thread’s event loop has to be terminated using the quit() function so that the timer can be started again in the future.

Updated: