支持HW团队,就支付宝领取下面的红包吧!(2018年3月31前,就几毛,也会几块,可以和其他红包叠加使用),你领取消费,HW有奖励。红包使用无条件限制,有条件请注意是不是有病毒。

小伙伴们,给大家发红包喽!人人可领,领完就能用。祝大家领取的红包金额大大大!#吱口令#长按复制此消息,打开支付宝就能领取!er1OEj73Uj

登入 注册 | 验证
| 搜索

博主:初学MPEG

初学MPEG 本博客-采用Python的web框架Django与Mysql数据库,致力于对Python、Django的了解 与研究
Django技术QQ群:XXXXXXX
Python技术QQ群:XXXXXXX

分类

关键字

本站最新博文

友情链接  

[转]BOOST 线程完全攻略 - 扩展 - 事务线程

类别:STL、Boost 状态:游客可见,可回,自己可关联(良好) 阅读:4035 评论:0 时间:四月 12, 2012, 9:56 a.m.
关键字: boost Thread

 来源:http://blog.csdn.net/iamnieo/article/details/2910595

什么叫事务线程

举个例子:
我们写一个IM客户端的登录子线程,则该子线程会有这么几个事务要处理
No.1 TCP Socket物理连接
No.2 逻辑登录
No.3 好友在线查询
No.4 状态更新
我们通常的代码写法是
void ThreadLogin()
{
  try
  {
     if(fail(物理连接))
        throw;
     if(fail(登录))
        throw;
     if(fail(查询好友))
        throw;
     if(fail(更新))
        throw;
  }
  catch(exception)
  {
  }
}
串行的逻辑用串行的代码写,不太好看,况且中途如果主线程发出取消指令,还不好处理。
这里扩展的thread类,就是来解决这个问题的,他会提供给程序员一种事件处理的模式
class threadLogin
{
void onEventConnect()
{
  物理连接
}
void onEventLogin()
{
 登录
}
void onEventQuery()
{
查询
}
void onEventUpdate()
{
更新
}
}
源码如下
// thread.hpp : controlled_module_ex类的扩展
// 增强线程事务处理能力
#pragma once
#include "controlled_module_ex.hpp"
  class thread: public controlled_module_ex
  {
    protected:
        static const int NONE = -1;
        static const int WAITING =-2;
        static const int DONE =-3;
        static const int FAILED =-4;
    protected:
        struct process
        {
            int level;
            int status;
            int sequence;
            int trycount;
            int tryindex;
            std::string lasterror;
            double timeout;
            bool bTimeout;
        };
        process m_process;
        controlled_timer m_timer_process;
        int m_process_begin,m_process_end;
        double m_timeout_default;
    public:
        void startprocess(int process_begin,int process_end,double timeout_default=1.0,int cycle=1000)
        {
            m_process_begin = process_begin;
            m_process_end = process_end;
            m_timeout_default = timeout_default;
            m_process.level = m_process_begin;
            m_process.tryindex = 0;
            this->postmessage(BM_RING_PROCESS);
            m_timer_process.starttimer(cycle,this);
        }
        void tryagain()
        {
            if(this->m_process.level==thread::NONE)
                return;
            this->m_process.tryindex++;
            if(this->m_process.trycount>0 && this->m_process.tryindex>=this->m_process.trycount)
            {
                this->fail();
            }
            else
                this->postmessage(BM_RING_PROCESS);
        }
        void next()
        {
            if(this->m_process.level==thread::NONE)
                return;
            if(this->m_process.level>=this->m_process_end)
            {
                this->m_timer_process.stoptimer();
                this->postmessage(BM_RING_PROCESSEND);
            }
            else
            {
                this->m_process.tryindex = 0;
                this->m_process.level++;
                this->m_process.bTimeout = false;
                this->postmessage(BM_RING_PROCESS);
            }
        }
        void fail()
        {
            m_process.level = thread::NONE;
            this->m_timer_process.stoptimer();
            this->postmessage(BM_RING_PROCESSFAIL);
        }
        virtual void on_safestart()
        {
            m_process.level = thread::NONE;
            m_process.status = thread::NONE;
            m_process_begin = m_process_end = thread::NONE;
            controlled_module_ex::on_safestart();
        }
        virtual void on_safestop()
        {
            m_timer_process.stoptimer();
            controlled_module_ex::on_safestop();
        }
        virtual void message(const _command & cmd)
        {
            controlled_module_ex::message(cmd);
            if(cmd.nCmd==BM_RING_PROCESS)
            {
                this->on_process();
            }
            if(cmd.nCmd==BM_RING_PROCESSEND)
            {
                this->m_process.level = thread::NONE;
                this->on_process_end();
            }
            if(cmd.nCmd==BM_RING_PROCESSFAIL)
            {
                this->m_process.level = thread::NONE;
                this->on_process_fail();
            }
        }
        virtual void on_timer(const controlled_timer * p)
        {
            if(p==this->m_timer_process)
            {
                if(this->m_process.level!=thread::NONE)
                {
                    if(this->m_process.level>=this->m_process_begin && this->m_process.level<=this->m_process_end)
                    {
                        if(this->m_process.status==thread::NONE)
                        {
                            this->m_process.level = this->m_process_begin;
                            m_process.tryindex = 0;
                            on_process();
                        }
                        else if(this->m_process.status==thread::WAITING)
                        {
                            if(this->m_process.timeout>0)
                            {
                                time_t cur;
                                time(&cur);
                                if(difftime(cur,(time_t)this->m_process.sequence)>this->m_process.timeout)
                                {
                                    this->m_process.bTimeout = true;
                                    this->tryagain();
                                }
                            }
                        }
                        else if(this->m_process.status==thread::FAILED)
                        {
                            this->tryagain();
                        }
                        else if(this->m_process.status==thread::DONE)
                        {
                            this->m_process.level++;
                            m_process.tryindex = 0;
                            this->on_process();
                        }
                    }
                }
            }
        }
        virtual void on_process()
        {
            time((time_t*)&m_process.sequence);
            m_process.timeout = m_timeout_default;
            m_process.status = thread::WAITING;
            m_process.trycount = -1;
        }
        virtual void on_process_end(){}
        virtual void on_process_fail(){}
        int get_sequence(){return m_process.sequence;}
        void put_timeout(double v){m_process.timeout = v;}
        void put_trycount(int v){m_process.trycount = v;}
        int get_level(){return m_process.level;}
        void put_level(int v){m_process.level=v;}
        std::string get_lasterror(){return m_process.lasterror;}
        void put_lasterror(std::string v){m_process.lasterror=v;}
        __declspec(property(put=put_trycount)) int trycount;
        __declspec(property(put=put_timeout)) double timeout;
        __declspec(property(get=get_level,put=put_level)) int level;
        __declspec(property(get=get_sequence)) int sequence;
        __declspec(property(get=get_lasterror,put=put_lasterror)) std::string lasterror;
  };
虚拟函数thread::on_process()处理各种事务事件
虚拟函数thread::on_process_end()是所有事务处理完毕事件
虚拟函数thread::on_process_fail()是事务处理出现错误,这时所有事务被取消,线程终止
这里给一个简单的范例,
总共线程要完成3件事务,其中第二个事务要求用户确认是否继续
#define PROCESS_1   1
#define PROCESS_2   2
#define PROCESS_3   3
class thdex: public thread
{
public:
    virtual void on_process()
    {
        thread::on_process();
        if(this->level==PROCESS_1)
        {
            cout << "work on process 1..." << endl;
            Sleep(100);
            cout << "process 1 done." << endl;
            this->next();
        }
        else if(this->level==PROCESS_2)
        {
            cout << "work on process 2..." << endl;
            this->timeout = -1;
            if(IDNO==::MessageBox(0,"are your want continue?","ask",MB_ICONQUESTION|MB_YESNO))
            {
                this->lasterror = "canceled by user";
                this->fail();
            }
            else
            {
                Sleep(100);
                cout << "process 2 done." << endl;
                this->next();
            }
        }
        else if(this->level==PROCESS_3)
        {
            cout << "work on process 3..." << endl;
            Sleep(100);
            cout << "process 3 done." << endl;
            this->next();
        }
    }
    virtual void on_process_fail()
    {
        cout << this->lasterror << endl;
    }
    virtual void on_process_end()
    {
        cout << "all process done." << endl;
    }
};
int _tmain(int argc, _TCHAR* argv[])
{
    thdex t;
    t.safestart();
    t.startprocess(PROCESS_1,PROCESS_3);
    char buf[10];
    gets_s(buf,sizeof buf);
    t.safestop();
    return 0;
}
thread事务还支持超时设定和重试次数设定,这里就不做介绍,读者可以自己研究代码。
操作:

Please Login (or Sign Up) to leave a comment