Friday, April 24, 2009

Qt 的线程和线程通信

传统的多线程程序都是通过 POSIX 的线程库 pthread 实现的,而 Qt 为用户提供的线程就是对 pthread 的再次封装,这个封装的类就是 QThread 直接继承 QObject 而来。

class Q_CORE_EXPORT QThread : public QObject
{
public:
    static Qt::HANDLE currentThreadId();
    static QThread *currentThread();
    static int idealThreadCount();
    static void yieldCurrentThread();

    explicit QThread(QObject *parent = 0);
    ~QThread();

    enum Priority {
        IdlePriority,

        LowestPriority,
        LowPriority,
        NormalPriority,
        HighPriority,
        HighestPriority,

        TimeCriticalPriority,

        InheritPriority
    };

    void setPriority(Priority priority);
    Priority priority() const;

    bool isFinished() const;
    bool isRunning() const;

    void setStackSize(uint stackSize);
    uint stackSize() const;

    void exit(int retcode = 0);

public Q_SLOTS:
    void start(Priority = InheritPriority);
    void terminate();
    void quit();

public:
    // default argument causes thread to block indefinately
    bool wait(unsigned long time = ULONG_MAX);

Q_SIGNALS:
    void started();
    void finished();
    void terminated();

protected:
    virtual void run();
    int exec();

    static void setTerminationEnabled(bool enabled = true);

    static void sleep(unsigned long);
    static void msleep(unsigned long);
    static void usleep(unsigned long);

#ifdef QT3_SUPPORT
public:
    inline QT3_SUPPORT bool finished() const { return isFinished(); }
    inline QT3_SUPPORT bool running() const { return isRunning(); }
#endif

protected:
    QThread(QThreadPrivate &dd, QObject *parent = 0);

private:
    Q_OBJECT
    Q_DECLARE_PRIVATE(QThread)

    static void initialize();
    static void cleanup();

    friend class QCoreApplication;
    friend class QThreadData;
};
编程人员主要通过继承 QThread 类,并自己重新实现 QThread::run(),我们来看该类在哪里留下的“钩子”,这个原始的 run() 做了什么呢?
void QThread::run()
{
    (void) exec();
}

int QThread::exec()
{
    Q_D(QThread);
    d->mutex.lock();
    d->data->quitNow = false;
    QEventLoop eventLoop;
    d->mutex.unlock();
    int returnCode = eventLoop.exec();
    return returnCode;
}
可见默认情况就是进入一个事件循环。

所谓的 event loop 是每一个 thread 都需要进行的任务,这个过程将处理外部环境发给自己的 event,Qt 将各种 event 进行了包装,用 QEvent 类进行了封装,不同的事件就是 QEvent 的子类,很多类都具有处理 event 的能力,比如 QWidget,下面是 QWidget 实现的事件,

protected:
    // Event handlers
    bool event(QEvent *);
    virtual void mousePressEvent(QMouseEvent *);
    virtual void mouseReleaseEvent(QMouseEvent *);
    virtual void mouseDoubleClickEvent(QMouseEvent *);
    virtual void mouseMoveEvent(QMouseEvent *);
#ifndef QT_NO_WHEELEVENT
    virtual void wheelEvent(QWheelEvent *);
#endif
    virtual void keyPressEvent(QKeyEvent *);
    virtual void keyReleaseEvent(QKeyEvent *);
    virtual void focusInEvent(QFocusEvent *);
    virtual void focusOutEvent(QFocusEvent *);
    virtual void enterEvent(QEvent *);
    virtual void leaveEvent(QEvent *);
    virtual void paintEvent(QPaintEvent *);
    virtual void moveEvent(QMoveEvent *);
    virtual void resizeEvent(QResizeEvent *);
    virtual void closeEvent(QCloseEvent *);
#ifndef QT_NO_CONTEXTMENU
    virtual void contextMenuEvent(QContextMenuEvent *);
#endif
#ifndef QT_NO_TABLETEVENT
    virtual void tabletEvent(QTabletEvent *);
#endif
#ifndef QT_NO_ACTION
    virtual void actionEvent(QActionEvent *);
#endif

#ifndef QT_NO_DRAGANDDROP
    virtual void dragEnterEvent(QDragEnterEvent *);
    virtual void dragMoveEvent(QDragMoveEvent *);
    virtual void dragLeaveEvent(QDragLeaveEvent *);
    virtual void dropEvent(QDropEvent *);
#endif

    virtual void showEvent(QShowEvent *);
    virtual void hideEvent(QHideEvent *);

#if defined(Q_WS_MAC)
    virtual bool macEvent(EventHandlerCallRef, EventRef);
#endif
#if defined(Q_WS_WIN)
    virtual bool winEvent(MSG *message, long *result);
#endif
#if defined(Q_WS_X11)
    virtual bool x11Event(XEvent *);
#endif
#if defined(Q_WS_QWS)
    virtual bool qwsEvent(QWSEvent *);
#endif

    // Misc. protected functions
    virtual void changeEvent(QEvent *);
这里看起来很复杂,主要两种,一个是 QWidget::event() 本身,另一个是各种虚函数 *Event 用于处理不同的 event,其实是一种 event handler,我们通过继承 QWidget 实现不同的 event handler 就可以获得不同的 component 了,比如 QAbstractButton 需要重写 paintEvent,使得该区域没有被按下的时候凸起,而按下后变成凹下的样子,另外,在 QWidget::event() 的实现中我们看见如下实现,
switch (event->type()) {
case QEvent::MouseMove:
  mouseMoveEvent((QMouseEvent*)event);
  break;
// ...
}
这就是下“钩子”的地方,Qt 通过调用 event() 方法,响应向一个 QObject 发送一个需要响应的事件(调用者利用 QApplication::sendEvent() 方法发送),event() 处理后返回 true,否则返回 false,event() 函数进一步利用 virtual function *Event() 函数分类处理该事件,因此要改变一个 QWidget 响应某事件就需要 override 这些事件的 event handler。而为了让用户更好的处理这些事件而不干预内部的实现,在 event handler 里面会 emit signal,比如
void QAbstractButton::mousePressEvent(QMouseEvent *e)
{
    Q_D(QAbstractButton);
    if (e->button() != Qt::LeftButton) {
        e->ignore();
        return;
    }
    if (hitButton(e->pos())) {
        setDown(true);
        repaint(); //flush paint event before invoking potentially expensive operation
        QApplication::flush();
        d->emitPressed();
        e->accept();
    } else {
        e->ignore();
    }
}
这样,用户就可以利用这个 signal 而不是事件本身来通知别的 component 了。如果是需要通过 QApplication::sendEvent() 那么必须重新 implement 对应的 event handler,也就是必须继承该 class,这是非常繁琐的(每个控件都是库里面的衍生体 -,-b)。

我们接着看 QThread 如何调用 run(),这部分到不是在 [qt]/src/corelib/qthread.cpp 里定义的,是在 qthread_unix.cpp 里面,这相当于是跟实现的 OS 相关的代码,因此分成了几个文件,QThread::start() 调用了 run(),

int code =
        pthread_create(&d->thread_id, &attr, QThreadPrivate::start, this);
void *QThreadPrivate::start(void *arg)
{
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
    pthread_cleanup_push(QThreadPrivate::finish, arg);

    QThread *thr = reinterpret_cast<QThread *>(arg);
    QThreadData *data = QThreadData::get2(thr);

    pthread_once(¤t_thread_data_once, create_current_thread_data_key);
    pthread_setspecific(current_thread_data_key, data);

    data->ref();
    data->quitNow = false;

    // ### TODO: allow the user to create a custom event dispatcher
    if (QCoreApplication::instance())
        createEventDispatcher(data);

    emit thr->started();
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_testcancel();
    thr->run();

    pthread_cleanup_pop(1);
    return 0;
}
这个程序首先把 QThreadPrivate::finish() 压栈(等线程结束后清理),设置好线程特性,然后建立 event dispatcher,触发 QThread::started() 信号,然后就调用 QThread::run() 了。最后等待该部分结束后,就会通过前面压栈的 finish() 完成清理工作。

因此尽管 QThread 这个对象在主线程中创建,但是实际上 QThread::run() 是在新的线程里面运行的,这里涉及到很重要的问题就是 reentrant 和 thread-safe。一个函数/类称为 reentrant 如果可以同时被若干线程调用,一般来说就是说他不会改变内部所依赖的数据,有的函数通过一个 static 成员计算结果,而该成员在计算中会被反复修改,因此一个线程调用该函数后,另一个线程就不可以继续调用。所谓的 thread-safe 就是说,某函数/类尽管需要访问 shared data,但是通过串行化访问,保证在不同线程调用时使得每个操作都是安全的。为了保证这点,Qt 提供了我们 QMutex、QReadWriteLock、QSemaphore 和 QWaitCondition 这些用于保证访问的 thread-safety 和同步我们的线程。

所谓 mutex(mutual exclusive)就是保证访问是 serialized 的,创建一个 QMutex 对象,所有合法的请求应该都遵守首先使用 QMutexLocker 将该 mutex 锁定,然后才能访问数据的原则,这时因为 QMutex 只能被 lock 一次,后续请求的线程将被挂起直到前面锁定的线程解除。可是 QMutex 无法区分读、写的不同,因为往往读操作可以多线程并行,而写操作必须串行化,这可以用 QReadWriteLock。另外 QMetaphore 是对一批资源的控制,比如 Qt 提供的例程,

#include <QtCore>

#include <stdio.h>
#include <stdlib.h>

const int DataSize = 100000;
const int BufferSize = 8192;
char buffer[BufferSize];

QSemaphore freeBytes(BufferSize);
QSemaphore usedBytes;

class Producer : public QThread
{
public:
  void run();
};

void Producer::run()
{
  qsrand(QTime(0,0,0).secsTo(QTime::currentTime()));
  for (int i = 0; i < DataSize; ++i) {
    freeBytes.acquire();
    buffer[i % BufferSize] = "ACGT"[(int)qrand() % 4];
    usedBytes.release();
  }
}

class Consumer : public QThread
{
public:
  void run();
};

void Consumer::run()
{
  for (int i = 0; i < DataSize; ++i) {
    usedBytes.acquire();
    fprintf(stderr, "%c", buffer[i % BufferSize]);
    freeBytes.release();
  }
  fprintf(stderr, "\n");
}

int main(int argc, char *argv[])
{
  QCoreApplication app(argc, argv);
  Producer producer;
  Consumer consumer;
  producer.start();
  consumer.start();
  producer.wait();
  consumer.wait();
  return 0;
}
有人可能会抱怨,那个 fprintf() 会不会不是 atomic 的呢?
The POSIX standard requires that by default the stream operations are atomic. I.e., issuing two stream operations for the same stream in two threads at the same time will cause the operations to be executed as if they were issued sequentially. The buffer operations performed while reading or writing are protected from other uses of the same stream. To do this each stream has an internal lock object which has to be (implicitly) acquired before any work can be done.
But there are situations where this is not enough and there are also situations where this is not wanted. The implicit locking is not enough if the program requires more than one stream function call to happen atomically. One example would be if an output line a program wants to generate is created by several function calls. The functions by themselves would ensure only atomicity of their own operation, but not atomicity over all the function calls. For this it is necessary to perform the stream locking in the application code.
而 QWaitCondition 主要用于多个线程的同步,比如创建一个该对象,几个线程调用 QWaitCondition::wait( &mutex ),另外的通过 QWaitCondition::wakeAll() 或者 QWaitCondition::wakeOne() 唤醒。

最后我们谈一下 signal/slot 机制和多线程的关系。尽管 QObject 保证是 reentrant 的(QWidget 一般不是的),但是我们编程的时候要注意,一个 QObject 的的子对象(通过 QObject::QObject( QObject *parent) 构造)必须在同一个线程里面产生;另外,对事件驱动的如 QTimer 和 network 一些类,启动的代码必须在同一个线程里(这意味着如果在一个QThread 子类里面创建一个 QTimer,其实是不能在 QThread::run() 里面启动该 timer,因为创建 QThread 对象是在另外一个线程里面,只有 QThread::run() 的那部分在新的线程里面执行);在一个线程里面创建的对象必须在线程被释放 QThread 前释放。

一个 QObject 创建后存在于一个线程中,我们可以用 QObject::moveToThread() 把该对象“置于”另外的线程,但是该 QObject 不能有 parent。这时候从一个线程调用另外一个线程的方法可以认为不是 thread safe 的,因为可能该对象正在处理事件,而比如调用析构函数就会产生严重问题(因此最好调用 QObject::deleteLater() 函数),这也就是我们需要注意信号发送的方式的问题,QObject::connect() 允许我们使用所谓 queued connection。低层一点的 QCoreApplication::sendEvent() 其实也应该在这种情况被替换为 QCoreApplication::postEvent()。这样避免在本线程重复调用某个 QObject 的代码,产生 thread safety 的问题。

我们将在下面研究 QApplication 和 QCoreApplication 类,分析 sendEvent()、postEvent() 以及 Qt 如何实现的 queued connection 的。

No comments: