博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
libjingle : sessionmanagertask 分析
阅读量:6277 次
发布时间:2019-06-22

本文共 8632 字,大约阅读时间需要 28 分钟。

hot3.png

     sessionmanagertask模块主要负责创建会话通道(session),在session中能创建自己的数据通道。
    
    在构造SessionManagerTask对象时,我们必须要实现XmppTaskParentInterface特例。以下为XmppTaskParentInterface抽象接口说明:
    class XmppTaskParentInterface : public talk_base::Task {
 public:
  explicit XmppTaskParentInterface(talk_base::TaskParent* parent)
      : Task(parent) {
  }
  virtual ~XmppTaskParentInterface() {}
  virtual XmppClientInterface* GetClient() = 0;
  DISALLOW_EVIL_CONSTRUCTORS(XmppTaskParentInterface);
};
这个类之提供了XmppClientInterface* GetClient()接口。
以下为XmppClientInterface抽象接口说明:
class XmppClientInterface {
 public:
  XmppClientInterface();
  virtual ~XmppClientInterface();
  virtual XmppEngine::State GetState() const = 0;
  virtual const Jid& jid() const = 0;
  virtual std::string NextId() = 0;
  virtual XmppReturnStatus SendStanza(const XmlElement* stanza) = 0;
  virtual XmppReturnStatus SendStanzaError(const XmlElement* original_stanza,
                                           XmppStanzaError error_code,
                                           const std::string& message) = 0;
  virtual void AddXmppTask(XmppTask* task, XmppEngine::HandlerLevel level) = 0;
  virtual void RemoveXmppTask(XmppTask* task) = 0;
  sigslot::signal0<> SignalDisconnected;
  DISALLOW_EVIL_CONSTRUCTORS(XmppClientInterface);
};
其中AddXmppTask是在SessionManagerTask构造的时候会将自身的指针会通过此接口的实现保存在一个容器中,当socket收到消息的时候会通过容器中的指针将消息返回给SessionManagerTask处理。这个在后面会有说明。
笔者重构了XmppEngine的实现,即libjingle实现的XmppEngineImpl的特例,为了适应笔者自己的项目,我实现了自己特例,以下为我的部分代码和相关代码说明:
设置输出处理器,这样的好处是应用层可以定义具体的实现
XmppReturnStatus eyouim_XmppEngineImpl::SetOutputHandler(XmppOutputHandler *pxoh)
{
m_pXmppOutputHandler = pxoh;
return XMPP_RETURN_OK;
}
数据发送,通过XmppOutputHandler :: WriteOutput接口,此接口的具体实现由应用层实现
XmppReturnStatus eyouim_XmppEngineImpl::SendStanza(const XmlElement * pelStanza)
{
if (NULL == m_pXmppOutputHandler)
return XMPP_RETURN_BADARGUMENT;
m_pXmppOutputHandler->WriteOutput(pelStanza->Str().c_str(),pelStanza->Str().size());
return XMPP_RETURN_OK;
}
         添加数据处理器,SessionManagerTask在构造的时候会将指针通过此接口保存在 stanza_handlers_ 中
XmppReturnStatus eyouim_XmppEngineImpl::AddStanzaHandler(XmppStanzaHandler* handler, HandlerLevel level /*= HL_PEEK*/)
{
stanza_handlers_[level]->push_back(handler);
return XMPP_RETURN_OK;
}
处理socket接收到的数据,会遍历stanza_handlers_ 容器中的处理器,如果匹配就将数据给相关的处理处理
XmppReturnStatus eyouim_XmppEngineImpl::HandleInput(const char * bytes, size_t len)
{
buzz::XmlBuilder builder;
buzz::XmlParser::ParseXml(&builder, bytes);
buzz::XmlElement *stanza = builder.BuiltElement();
for (size_t i = 0; i < stanza_handlers_[HL_PEEK]->size(); i += 1) {
(*stanza_handlers_[HL_PEEK])[i]->HandleStanza(stanza);
}
// give other handlers a shot in precedence order, stopping after handled
for (int level = HL_SINGLE; level <= HL_ALL; level += 1) {
for (size_t i = 0; i < stanza_handlers_[level]->size(); i += 1) {
if ((*stanza_handlers_[level])[i]->HandleStanza(stanza))
return XMPP_RETURN_OK;
}
}
// If nobody wants to handle a stanza then send back an error.
// Only do this for IQ stanzas as messages should probably just be dropped
// and presence stanzas should certainly be dropped.
std::string type = stanza->Attr(QN_TYPE);
if (stanza->Name() == QN_IQ &&
!(type == "error" || type == "result")) {
SendStanzaError(stanza, XSE_FEATURE_NOT_IMPLEMENTED, STR_EMPTY);
return XMPP_RETURN_UNEXPECTED;
}
return XMPP_RETURN_OK;
}
       移除相关的处理器
XmppReturnStatus eyouim_XmppEngineImpl::RemoveStanzaHandler(XmppStanzaHandler* handler)
{
bool found = false;
for (int level = 0; level < HL_COUNT; level += 1) {
StanzaHandlerVector::iterator new_end =
std::remove(stanza_handlers_[level]->begin(),
stanza_handlers_[level]->end(),
handler);
if (new_end != stanza_handlers_[level]->end()) {
stanza_handlers_[level]->erase(new_end, stanza_handlers_[level]->end());
found = true;
}
}
if (!found)
return XMPP_RETURN_BADARGUMENT;
return XMPP_RETURN_OK;
}
    笔者同样也实现了 XmppTaskParentInterface 和 XmppClientInterface 特例,以下为部分代码和说明:
    XmppClient.h头文件
    class XmppClient : public buzz::XmppTaskParentInterface
, public buzz::XmppClientInterface
, public sigslot::has_slots<>
{
public:
XmppClient(talk_base::TaskParent * parent);
~XmppClient();
                //设置输出处理器,这样的好处是应用层可以定义具体的实现
void set_XmppOutputHandler(buzz::XmppOutputHandler *pXmppOutputHandler)
{
m_engine->SetOutputHandler(pXmppOutputHandler);
}
                
                //处理socket接收到的数据
XmppReturnStatus HandleInput(const char * bytes, size_t len)
{
return m_engine->HandleInput(bytes,len);
}
XmppReturnStatus Disconnect();
//XmppTaskParentInterface
virtual XmppClientInterface* GetClient();
//XmppClientInterface
virtual buzz::XmppEngine::State GetState() const;
virtual const buzz::Jid& jid() const;
virtual std::string NextId();
virtual XmppReturnStatus SendStanza(const buzz::XmlElement* stanza);
virtual XmppReturnStatus SendStanzaError(const buzz::XmlElement* original_stanza,
XmppStanzaError error_code,
const std::string& message);
virtual void AddXmppTask(buzz::XmppTask* task, buzz::XmppEngine::HandlerLevel level);
virtual void RemoveXmppTask(buzz::XmppTask* task);
//task
virtual int ProcessStart();
protected:
private:
talk_base::scoped_ptr<XmppEngine> m_engine;
};
  
    XmppClient.cpp实现
    XmppClient::XmppClient(talk_base::TaskParent * parent)
:XmppTaskParentInterface(parent)
{
m_engine.reset(buzz::XmppEngine::Create());    //这里实际上创建的是eyouim_XmppEngineImpl
}
XmppClient::~XmppClient()
{
}
XmppClientInterface* XmppClient::GetClient()
{
return this;
}
XmppReturnStatus XmppClient::Disconnect()
{
Abort();
return XMPP_RETURN_OK;
}
//XmppClientInterface
XmppEngine::State XmppClient::GetState() const
{
return m_engine->GetState();
}
const buzz::Jid& XmppClient::jid() const
{
return m_engine->FullJid();
}
std::string XmppClient::NextId()
{
return m_engine->NextId();
}
XmppReturnStatus XmppClient::SendStanza(const buzz::XmlElement* stanza)
{
return m_engine->SendStanza(stanza);
}
XmppReturnStatus XmppClient::SendStanzaError(const buzz::XmlElement* original_stanza,
XmppStanzaError error_code,
const std::string& message)
{
return m_engine->SendStanzaError(original_stanza,error_code,message);
}
        
        
 添加数据处理器
void XmppClient::AddXmppTask(buzz::XmppTask* task, buzz::XmppEngine::HandlerLevel level)
{
m_engine->AddStanzaHandler(task,level);
}
void XmppClient::RemoveXmppTask(buzz::XmppTask* task)
{
m_engine->RemoveStanzaHandler(task);
}
int XmppClient::ProcessStart()
{
LOG(LS_INFO) << __FUNCTION__;
return 0;
}
    在构造XmppClient对象的时候我们需要一个任务执行管理对象,以下为笔者定义的:
    XmppPump.h
    class XmppPump : public talk_base::MessageHandler,public talk_base::TaskRunner
{
public:
XmppPump(talk_base::Thread *pXmppThread = NULL);
~XmppPump();
eyouim::XmppClient *client()
{
return m_pXmppClient;
}
//MessageHandeler
virtual void OnMessage(talk_base::Message* msg);
//TaskRunner
virtual void WakeTasks();
virtual int64 CurrentTime();
protected:
private:
eyouim::XmppClient *m_pXmppClient;
talk_base::Thread *m_pXmppThread;
};
    XmppPump.cpp
     XmppPump::XmppPump(talk_base::Thread *pXmppThread /*= NULL*/)
:m_pXmppClient(new eyouim::XmppClient(this)) NOTE: deleted by TaskRunner
,m_pXmppThread((pXmppThread != NULL) ? pXmppThread : talk_base::Thread::Current())
{
}
XmppPump::~XmppPump()
{
if (!AllChildrenDone())
m_pXmppClient->Disconnect();
}
void XmppPump::OnMessage(talk_base::Message* msg)
{
RunTasks();
}
void XmppPump::WakeTasks()
{
//talk_base::Thread::Current()->Post(this);
m_pXmppThread->Post(this);
}
int64 XmppPump::CurrentTime()
{
return (int64)talk_base::Time();
}
至此,相关的类都已经实现完,下面我们调用相关的代码:
       XmppPump *m_pXmppPump = new XmppPump(NULL);
        cricket::SessionManagerTask * receiver = NULL;
        receiver = new cricket::SessionManagerTask(m_pXmppPump->client(), session_manager); //deleted by TaskRunner
receiver->EnableOutgoingMessages();
receiver->Start();
        receiver在启动时候,会调用XmppPump::WakeTasks()接口,因此TaskRunner任务管理会运行起来。
        在TaskRunner运行过程中会执行每个任务ProcessStart过程,在SessionManagerTask中会执行SessionManagerTask:: ProcessStart,负责处里跟两端交互session消息的数据包。
       一端信号发送,通过如下流程发送给对方:
        SessionManagerTask::OnOutgoingMessage 
-> SessionSendTask::ProcessStart() 
->  XmppClient::SendStanza 
-> eyouim_XmppEngineImpl::SendStanza 
-> XmppOutputHandler ::WriteOut
      一端信号接收,接收流程如下:
        XmppClient:: HandleInput -> eyouim_XmppEngineImpl::HandleInput -> SessionManagerTask:: ProcessStart或是SessionSendTask::ProcessResponse -> SessionManagerTask::OnIncomingMessage或是 SessionManagerTask ::OnIncomingResponse
    其中SessionManagerTask:: ProcessStar处理跟Session有关的信号,SessionSendTask::ProcessResponse处理对方发过来的Ack
    数据为何会驱动到这两个接口?原因是因为在创建SessionManagerTask和SessionSendTask实例的时候,会将他们的指针通过XmppClient:: AddXmppTask添加到eyouim_XmppEngineImpl容器中。而在接收到数据的时候会通过eyouim_XmppEngineImpl::HandleInput接口处理数据,处理完数据后驱动SessionManagerTask或SessionSendTask的HandleStanza处理过程运行,而HandleStanza过程将继续驱动SessionManagerTask:: ProcessSta或SessionSendTask::ProcessResponse的运行。
    因此所有的信号交互都在SessionManagerTask中处理,如果一切顺利成功,P2P将会建立成功。
    祝你好运!!!
    

转载于:https://my.oschina.net/shaxunyeman/blog/115320

你可能感兴趣的文章
Android逆向工程 实践篇
查看>>
[LeetCode] Pacific Atlantic Water Flow
查看>>
adb 常用命令
查看>>
[vscode]快速更新package.json里的依赖版本
查看>>
Windows中MongoDB之简单安装(1)
查看>>
搭建Hexo博客进阶篇---主题自定义(三)
查看>>
【Mysql中间件】Mycat安装部署+读写分离
查看>>
这3家在线旅行公司是如何通过转化优化提高订单量的
查看>>
RocketMq使用过程的那些小事
查看>>
Autodesk Forge 学习简谈 - 4
查看>>
OWNER支持配置文件目录的继承
查看>>
Walls and Gates
查看>>
JavaScript 继承的那些事
查看>>
Scala中的函数式特性
查看>>
脱离“体验”和“安全”谈盈利的游戏运营 都是耍流氓
查看>>
试水区块链出版?纽约时报在招人了
查看>>
拥抱PostgreSQL,红帽再表态:SSPL的MongoDB坚决不用
查看>>
让架构更简单,QCon上海2016热点前瞻
查看>>
如何测试ASP.NET Core Web API
查看>>
SQL Server新一轮更新
查看>>