支持HW团队,就支付宝领取下面的红包吧!(2018年3月31前,就几毛,也会几块,可以和其他红包叠加使用),你领取消费,HW有奖励。红包使用无条件限制,有条件请注意是不是有病毒。

小伙伴们,给大家发红包喽!人人可领,领完就能用。祝大家领取的红包金额大大大!#吱口令#长按复制此消息,打开支付宝就能领取!er1OEj73Uj

登入 注册 | 验证
| 搜索

博主:初学MPEG

初学MPEG 本博客-采用Python的web框架Django与Mysql数据库,致力于对Python、Django的了解 与研究
Django技术QQ群:XXXXXXX
Python技术QQ群:XXXXXXX

分类

关键字

本站最新博文

友情链接  

[转]BOOST 线程完全攻略 - 扩展 - 线程消息通讯

类别:STL、Boost 状态:游客可见,可回,自己可关联(良好) 阅读:4800 评论:0 时间:四月 12, 2012, 9:55 a.m.
关键字: boost Thread

来源:http://blog.csdn.net/iamnieo/article/details/2909236

// controlled_module_ex.hpp : controlled_module类的扩展
// 增强线程之间消息通讯
// 增加线程安全启动和安全关闭功能
// 增加定时器功能
 
#pragma once
#include <boost/shared_ptr.hpp>
#include <boost/any.hpp>
#include "controlled_module.hpp"
 
struct _command
{
    typedef boost::shared_ptr<_command> CCmdPtr;
    unsigned int nCmd;
    boost::any anyParam;
};
struct _wait_command
{       
    boost::any par;
    unsigned int command;
    void * event;
    boost::shared_ptr<boost::any> resp;
};
class controlled_module_ex;
struct _notify
{
    controlled_module_ex * sender;
    int id;
    boost::any par;
};
#define BM_RESERVE 1000
#define BM_RING_START BM_RESERVE+1
#define BM_RING_STOP BM_RESERVE+2
#define BM_RING_SETTIME  BM_RESERVE+3
#define BM_RING_SETPARENT BM_RESERVE+4
#define BM_RING_CYCLE BM_RESERVE+5
#define BM_RING_PROCESS BM_RESERVE+6
#define BM_RING_PROCESSEND BM_RESERVE+7
#define BM_RING_PROCESSFAIL BM_RESERVE+8
#define BM_TIMER    BM_RESERVE+9
#define BM_COMMAND  BM_RESERVE+10
#define BM_NOTIFY   BM_RESERVE+11
 
#define BM_USER 9000
class controlled_timer;
class controlled_module_ex: public controlled_module
{
public:
    controlled_module_ex()
    {
        m_safe = false;
    }
    ~controlled_module_ex()
    {
        safestop();
    }
public:
    template<typename T>
    bool postmessage(unsigned int nCmd, const boost::shared_ptr<T>& p)
    {
        if(this==0||!m_safe)return false;
        boost::mutex::scoped_lock lock(m_mutex_command);
        _command::CCmdPtr cmd(new _command);
        cmd->nCmd = nCmd;
        cmd->anyParam = p;
        m_list_command.push_back(cmd);
        return true;
    }
    boost::any execute(unsigned int command,boost::any par,int timeout=-1)
    {
        boost::shared_ptr<_wait_command> shared(new _wait_command);
        _wait_command & cmd = *shared;
        cmd.command = command;
        cmd.event = (void *)CreateEvent(0,FALSE,FALSE,0);
        cmd.par = par;
        cmd.resp = boost::shared_ptr<boost::any>(new boost::any);
        if(this->postmessage(BM_COMMAND,shared))
        {
            DWORD dw = WaitForSingleObject(cmd.event,timeout);
            CloseHandle(cmd.event);
            if(dw!=WAIT_OBJECT_0)
                return boost::any();
            else
                return *cmd.resp;
        }
        else
        {
            CloseHandle(cmd.event);
            return boost::any();
        }
    }
    void notify(_notify p)
    {
        this->postmessage(BM_NOTIFY,p);
    }
    bool postmessage(unsigned int nCmd,boost::any p)
    {
        if(this==0||!m_safe)
            return false;
        boost::mutex::scoped_lock lock(m_mutex_command);
        _command::CCmdPtr cmd(new _command);
        cmd->nCmd = nCmd;
        cmd->anyParam = p;
        m_list_command.push_back(cmd);
        return true;
    }
    bool postmessage(unsigned int nCmd)
    {
        if(this==0||!m_safe)
            return false;
        boost::mutex::scoped_lock lock(m_mutex_command);
        _command::CCmdPtr cmd(new _command);
        cmd->nCmd = nCmd;
        cmd->anyParam = 0;
        m_list_command.push_back(cmd);
        return true;
    }
    virtual bool work()
    {
        if(!getmessage())
            return false;
        else
        {
            Sleep(this->m_sleeptime);
            return true;
        }
    }
    virtual void message(const _command & cmd)
    {
        if(cmd.nCmd==BM_RING_START)
        {
            this->on_safestart();
        }
        else if(cmd.nCmd==BM_RING_STOP)
        {
            this->on_safestop();
        }
        else if(cmd.nCmd==BM_TIMER)
        {
            this->on_timer(boost::any_cast<controlled_timer*>(cmd.anyParam));
        }
        else if(cmd.nCmd==BM_COMMAND)
        {
            boost::shared_ptr<_wait_command> shared = boost::any_cast< boost::shared_ptr<_wait_command> >(cmd.anyParam);
            _wait_command & cmd = *shared;
            *cmd.resp = this->on_command(cmd.command,cmd.par);
            SetEvent((HANDLE)cmd.event);
        }
        else if(cmd.nCmd==BM_NOTIFY)
        {
            try
            {
                _notify par = boost::any_cast<_notify>(cmd.anyParam);
                this->on_notify(par);
            }
            catch(boost::bad_any_cast)
            {
            }
        }
    }
    virtual void release()
    {
        boost::mutex::scoped_lock lock(m_mutex_command);
        m_list_command.clear();
    }
    void safestart()
    {
        if(!islive())
            start();
        m_safe = true;
        m_safestart_event = (void*)CreateEvent(NULL,FALSE,FALSE,0);
        postmessage(BM_RING_START);
        ::WaitForSingleObject((HANDLE)m_safestart_event,INFINITE);
        CloseHandle(m_safestart_event);
    }
    void safestop()
    {
    if(this->islive())
    {
        m_safe = false;
        m_safestop_event = (void*)CreateEvent(NULL,FALSE,FALSE,0);
        {
            boost::mutex::scoped_lock lock(m_mutex_command);
            _command::CCmdPtr cmd(new _command);
            cmd->nCmd = BM_RING_STOP;
            cmd->anyParam = 0;
            m_list_command.push_back(cmd);
        }
        DWORD dw = ::WaitForSingleObject((HANDLE)m_safestop_event,3*1000);
        if(WAIT_OBJECT_0!=dw)
        {
        }
        CloseHandle(m_safestop_event);
        stop();
    }
    }
    virtual void on_timer(const controlled_timer * p){}
    virtual void on_safestart()
    {
        SetEvent(m_safestart_event);
    }
    virtual void on_safestop()
    {
        SetEvent(m_safestop_event);
    }
    virtual void on_notify(const _notify & p)
    {
    }
protected:
    virtual boost::any on_command(const unsigned int command,const boost::any par)
    {
        return boost::any();
    }
    bool getmessage()
    {
        std::list<_command::CCmdPtr> cache;
        {
            boost::mutex::scoped_lock lock(m_mutex_command);
            while(!m_list_command.empty())
            {
                _command::CCmdPtr p = m_list_command.front();
                m_list_command.pop_front();
                cache.push_back(p);
            }
        }
        _command::CCmdPtr stop_command;
        std::list<_command::CCmdPtr>::iterator item;
        for(item = cache.begin();item!=cache.end();item++)
        {
            if((*(*item)).nCmd==BM_RING_STOP)
            {
                stop_command = *item;               
                break;
            }
        }
        if(stop_command.get()==0)
        {
            while(!cache.empty())
            {
                _command::CCmdPtr p = cache.front();
                cache.pop_front();
                try
                {
                    if((*p).nCmd!=BM_RING_START)
                    {
                        if(!this->m_safe)
                            continue;
                    }
                    this->message(*p);
                }
                catch(boost::bad_any_cast &)
                {
                }
            }
            return true;
        }
        else
        {
            cache.clear();
            this->message(*stop_command);
            return false;
        }
    }
private:
    void*   m_safestart_event;
    void* m_safestop_event;
    bool m_safe;//在多线程,尤其牵涉到线程之间有类似socket级别关联时,当父线程safestop以后有可能会收到其他线程的postmessage,这时会引起线程死锁,这个m_safe就是解决这个问题的,当safestop以后不再接收新消息处理
    boost::mutex m_mutex_command;
    std::list<_command::CCmdPtr> m_list_command;
};
class controlled_timer: public controlled_module_ex
{
public:
  controlled_timer()
  {
    this->m_time = 0;
    this->m_parent = 0;
    this->m_step = 0;
  }
  ~controlled_timer(){
  }
protected:
  controlled_module_ex* m_parent;
  int m_time;
  int m_step;
public:
  void starttimer(int time,controlled_module_ex* parent)
  {
    this->safestart();
    this->postmessage(BM_RING_SETPARENT,parent);
    this->postmessage(BM_RING_SETTIME,time);
  }
  void stoptimer()
  {
    this->safestop();
  }
public:
  virtual void on_safestop()
  {
    m_time = 0;
    controlled_module_ex::on_safestop();
  }
  virtual void message(const _command & cmd)
  {
    controlled_module_ex::message(cmd);
    if(cmd.nCmd==BM_RING_SETTIME)
    {
        int time = boost::any_cast<int>(cmd.anyParam);
        this->m_time = time/this->m_sleeptime;
        this->postmessage(BM_RING_CYCLE);
    }
    else if(cmd.nCmd==BM_RING_SETPARENT)
    {
        this->m_parent  = boost::any_cast<controlled_module_ex*>(cmd.anyParam);
    }   
    else if(cmd.nCmd==BM_RING_CYCLE)
    {
        if(m_time>0)
        {
            if(m_step>m_time)
            {
                m_parent->postmessage(BM_TIMER,this);
                m_step=0;
            }
            m_step++;
        }
        this->postmessage(BM_RING_CYCLE);
    } 
  }
};

1.向线程PostMessage

  函数controlled_module_ex::postmessage完成消息推送。
  虚拟函数controlled_module_ex::message(const _command & cmd)实现消息接收。
#include "controlled_module_ex.hpp"
 
 
class thdex: public controlled_module_ex
{
protected:
    virtual void message(const _command & cmd)
    {
        controlled_module_ex::message(cmd);
        if(cmd.nCmd==BM_USER+1)
        {
            cout << "get message" << endl;
        }
    }
};
 
int _tmain(int argc, _TCHAR* argv[])
{
    thdex t;
    t.safestart();
    t.postmessage(BM_USER+1);
    char buf[10];
    gets_s(buf,sizeof buf);
    t.safestop();
    return 0;
}
 
  2.向线程PostMessage,并携带简单对象参数
 我们都知道常规的PostMessage要传参,如果是整型参数,还可以用强制转换,但如果是其他类型,例如字符串,我们就必须创建一个字符串缓冲,把缓冲指针作为参数传过去,线程还不能忘记删除,否则导致内存泄漏,自定义结构也是一样的操作,如果想尝试传递一个CString对象,是不可能的。
  幸运的是boost提供了boost::any来抽象任何对象类型,controlled_module_ex的消息传递都是基于boost::any来完成,程序员可以由此写出干净而且内存安全的代码。
#include "controlled_module_ex.hpp"
 
struct mystruct
{
    string a;
    int b;
};
class thdex: public controlled_module_ex
{
protected:
    virtual void message(const _command & cmd)
    {
        controlled_module_ex::message(cmd);
        if(cmd.nCmd==BM_USER+1)
        {
            cout << "get integer:" << boost::any_cast<int>(cmd.anyParam) << endl;
        }
        if(cmd.nCmd==BM_USER+2)
        {
            cout << "get string:" << boost::any_cast<string>(cmd.anyParam) << endl;
        }
        if(cmd.nCmd==BM_USER+3)
        {
            mystruct par = boost::any_cast<mystruct>(cmd.anyParam);
            cout << "get mystruct:" << par.a << "," << par.b << endl;
        }
    }
};
 
int _tmain(int argc, _TCHAR* argv[])
{
    thdex t;
    t.safestart();
    t.postmessage(BM_USER+1,123);
    t.postmessage(BM_USER+2,string("hello world"));
    mystruct par;
    par.a = "hello world";
    par.b = 123;
    t.postmessage(BM_USER+3,par);
 
    char buf[10];
    gets_s(buf,sizeof buf);
    t.safestop();
    return 0;
}
 
3.向线程PostMessage,并传递内存块参数
  假如我们书写了一个录音子线程,如何将录制的语音数据传递给其他线程呢,常规做法是创建一个缓冲,将语音数据填充进去,然后将缓冲地址作为参数传递,这种做法要求接收线程不能忘记删除,否则会导致内存泄漏。
  幸运的是boost提供了智能指针,可以类似java,c#的智能内存回收一样来管理内存分配,我们如果使用这个对象来作为参数传递,就可以完美的防范内存泄漏行为,就算子线程没有处理,别担心,内存它会自动回收的。
#include "controlled_module_ex.hpp"
 
struct mystruct
{
    boost::shared_ptr<char> data;
    int datalen;
};
class thdex: public controlled_module_ex
{
protected:
    virtual void message(const _command & cmd)
    {
        controlled_module_ex::message(cmd);
        if(cmd.nCmd==BM_USER+1)
        {
            cout << "get sharedptr" << endl; //仅仅得到数据,得不到数据长度
        }
        if(cmd.nCmd==BM_USER+2)
        {
            mystruct par = boost::any_cast<mystruct>(cmd.anyParam);
            cout << "get sharedptr len:" << par.datalen << endl;
        }
    }
};
 
int _tmain(int argc, _TCHAR* argv[])
{
    thdex t;
    t.safestart();
    t.postmessage(BM_USER+1,boost::shared_ptr<char>(new char[1000]));
    mystruct par;
    par.datalen = 1000;
    par.data = boost::shared_ptr<char>(new char[par.datalen]);
    t.postmessage(BM_USER+2,par);
 
    char buf[10];
    gets_s(buf,sizeof buf);
    t.safestop();
    return 0;
}
 
3.向线程SendMessage
  函数controlled_module_ex::execute完成这个工作
  虚拟函数controlled_module_ex::on_command(const unsigned int command,const boost::any par)响应消息处理
#include "controlled_module_ex.hpp"
 
class thdex: public controlled_module_ex
{
protected:
    boost::any on_command(const unsigned int command,const boost::any par)
    {
        if(command==1)
        {
            cout << "on command" << endl;
            return 0;
        }
        if(command==2)
        {
            cout << "on command,par:" << boost::any_cast<string>(par) << endl;
            return 0;
        }
        if(command==3)
        {
            return true;
        }
        else
            return controlled_module_ex::on_command(command,par);
    }
};
 
int _tmain(int argc, _TCHAR* argv[])
{
    thdex t;
    t.safestart();
    t.execute(1,0);//等待子线程处理完成
    t.execute(2,string("hello world"));//带参数 等待子线程完成
    bool rs = boost::any_cast<bool>(t.execute(3,0));//等待子线程处理完成,并取得返回值
    cout << "get thread result:" << rs << endl;
    boost::any timeout = t.execute(4,0,1000);//等待子线程处理,超时1秒
    if(timeout.empty())
        cout << "timeout " << endl;
 
    char buf[10];
    gets_s(buf,sizeof buf);
    t.safestop();
    return 0;
}
4.定时器
  类似于CWnd::OnTimer,controlled_module_ex也提供一个虚拟函数virtual void on_timer(const controlled_timer * p);来处理定时
#include "controlled_module_ex.hpp"
 
class thdex: public controlled_module_ex
{
protected:
    virtual void on_timer(const controlled_timer *p)
    {
        cout << "ontimer" << endl;
    }
};
 
int _tmain(int argc, _TCHAR* argv[])
{
    thdex t;
    controlled_timer timer;
    t.safestart();
    timer.starttimer(1000,&t);
 
    char buf[10];
    gets_s(buf,sizeof buf);
    t.safestop();
    return 0;
}
 
操作:

Please Login (or Sign Up) to leave a comment