在构造SessionManagerTask对象时,我们必须要实现XmppTaskParentInterface特例。以下为XmppTaskParentInterface抽象接口说明:
class XmppTaskParentInterface : public talk_base::Task {
public:explicit XmppTaskParentInterface(talk_base::TaskParent* parent): Task(parent) {}virtual ~XmppTaskParentInterface() {}virtual XmppClientInterface* GetClient() = 0;DISALLOW_EVIL_CONSTRUCTORS(XmppTaskParentInterface);};这个类之提供了XmppClientInterface* GetClient()接口。以下为XmppClientInterface抽象接口说明:class XmppClientInterface {public:XmppClientInterface();virtual ~XmppClientInterface();virtual XmppEngine::State GetState() const = 0;virtual const Jid& jid() const = 0;virtual std::string NextId() = 0;virtual XmppReturnStatus SendStanza(const XmlElement* stanza) = 0;virtual XmppReturnStatus SendStanzaError(const XmlElement* original_stanza,XmppStanzaError error_code,const std::string& message) = 0;virtual void AddXmppTask(XmppTask* task, XmppEngine::HandlerLevel level) = 0;virtual void RemoveXmppTask(XmppTask* task) = 0;sigslot::signal0<> SignalDisconnected;DISALLOW_EVIL_CONSTRUCTORS(XmppClientInterface);};其中AddXmppTask是在SessionManagerTask构造的时候会将自身的指针会通过此接口的实现保存在一个容器中,当socket收到消息的时候会通过容器中的指针将消息返回给SessionManagerTask处理。这个在后面会有说明。笔者重构了XmppEngine的实现,即libjingle实现的XmppEngineImpl的特例,为了适应笔者自己的项目,我实现了自己特例,以下为我的部分代码和相关代码说明:设置输出处理器,这样的好处是应用层可以定义具体的实现XmppReturnStatus eyouim_XmppEngineImpl::SetOutputHandler(XmppOutputHandler *pxoh){m_pXmppOutputHandler = pxoh;return XMPP_RETURN_OK;}数据发送,通过XmppOutputHandler :: WriteOutput接口,此接口的具体实现由应用层实现XmppReturnStatus eyouim_XmppEngineImpl::SendStanza(const XmlElement * pelStanza)
添加数据处理器,SessionManagerTask在构造的时候会将指针通过此接口保存在 stanza_handlers_ 中{if (NULL == m_pXmppOutputHandler)return XMPP_RETURN_BADARGUMENT;m_pXmppOutputHandler->WriteOutput(pelStanza->Str().c_str(),pelStanza->Str().size());return XMPP_RETURN_OK;}
XmppReturnStatus eyouim_XmppEngineImpl::AddStanzaHandler(XmppStanzaHandler* handler, HandlerLevel level /*= HL_PEEK*/){stanza_handlers_[level]->push_back(handler);return XMPP_RETURN_OK;}处理socket接收到的数据,会遍历stanza_handlers_ 容器中的处理器,如果匹配就将数据给相关的处理处理
XmppReturnStatus eyouim_XmppEngineImpl::HandleInput(const char * bytes, size_t len){buzz::XmlBuilder builder;buzz::XmlParser::ParseXml(&builder, bytes);buzz::XmlElement *stanza = builder.BuiltElement();for (size_t i = 0; i < stanza_handlers_[HL_PEEK]->size(); i += 1) {(*stanza_handlers_[HL_PEEK])[i]->HandleStanza(stanza);}// give other handlers a shot in precedence order, stopping after handledfor (int level = HL_SINGLE; level <= HL_ALL; level += 1) {for (size_t i = 0; i < stanza_handlers_[level]->size(); i += 1) {if ((*stanza_handlers_[level])[i]->HandleStanza(stanza))return XMPP_RETURN_OK;}}// If nobody wants to handle a stanza then send back an error.// Only do this for IQ stanzas as messages should probably just be dropped// and presence stanzas should certainly be dropped.std::string type = stanza->Attr(QN_TYPE);if (stanza->Name() == QN_IQ &&!(type == "error" || type == "result")) {SendStanzaError(stanza, XSE_FEATURE_NOT_IMPLEMENTED, STR_EMPTY);return XMPP_RETURN_UNEXPECTED;}return XMPP_RETURN_OK;}移除相关的处理器XmppReturnStatus eyouim_XmppEngineImpl::RemoveStanzaHandler(XmppStanzaHandler* handler){bool found = false;for (int level = 0; level < HL_COUNT; level += 1) {StanzaHandlerVector::iterator new_end =std::remove(stanza_handlers_[level]->begin(),stanza_handlers_[level]->end(),handler);if (new_end != stanza_handlers_[level]->end()) {stanza_handlers_[level]->erase(new_end, stanza_handlers_[level]->end());found = true;}}if (!found)return XMPP_RETURN_BADARGUMENT;return XMPP_RETURN_OK;}
笔者同样也实现了 XmppTaskParentInterface 和 XmppClientInterface 特例,以下为部分代码和说明:
XmppClient.h头文件
class XmppClient : public buzz::XmppTaskParentInterface
, public buzz::XmppClientInterface
, public sigslot::has_slots<>
{
public:
XmppClient(talk_base::TaskParent * parent);
~XmppClient();
//设置输出处理器,这样的好处是应用层可以定义具体的实现
void set_XmppOutputHandler(buzz::XmppOutputHandler *pXmppOutputHandler)
{
m_engine->SetOutputHandler(pXmppOutputHandler);
}
//处理socket接收到的数据
XmppReturnStatus HandleInput(const char * bytes, size_t len)
{
return m_engine->HandleInput(bytes,len);
}
XmppReturnStatus Disconnect();
//XmppTaskParentInterface
virtual XmppClientInterface* GetClient();
//XmppClientInterface
virtual buzz::XmppEngine::State GetState() const;
virtual const buzz::Jid& jid() const;
virtual std::string NextId();
virtual XmppReturnStatus SendStanza(const buzz::XmlElement* stanza);
virtual XmppReturnStatus SendStanzaError(const buzz::XmlElement* original_stanza,
XmppStanzaError error_code,
const std::string& message);
virtual void AddXmppTask(buzz::XmppTask* task, buzz::XmppEngine::HandlerLevel level);
virtual void RemoveXmppTask(buzz::XmppTask* task);
//task
virtual int ProcessStart();
protected:
private:
talk_base::scoped_ptr<XmppEngine> m_engine;
};
XmppClient.cpp实现
XmppClient::XmppClient(talk_base::TaskParent * parent)
:XmppTaskParentInterface(parent)
{
m_engine.reset(buzz::XmppEngine::Create()); //这里实际上创建的是eyouim_XmppEngineImpl
}
XmppClient::~XmppClient()
{
}
XmppClientInterface* XmppClient::GetClient()
{
return this;
}
XmppReturnStatus XmppClient::Disconnect()
{
Abort();
return XMPP_RETURN_OK;
}
//XmppClientInterface
XmppEngine::State XmppClient::GetState() const
{
return m_engine->GetState();
}
const buzz::Jid& XmppClient::jid() const
{
return m_engine->FullJid();
}
std::string XmppClient::NextId()
{
return m_engine->NextId();
}
XmppReturnStatus XmppClient::SendStanza(const buzz::XmlElement* stanza)
{
return m_engine->SendStanza(stanza);
}
XmppReturnStatus XmppClient::SendStanzaError(const buzz::XmlElement* original_stanza,
XmppStanzaError error_code,
const std::string& message)
{
return m_engine->SendStanzaError(original_stanza,error_code,message);
}
添加数据处理器
void XmppClient::AddXmppTask(buzz::XmppTask* task, buzz::XmppEngine::HandlerLevel level)
{
m_engine->AddStanzaHandler(task,level);
}
void XmppClient::RemoveXmppTask(buzz::XmppTask* task)
{
m_engine->RemoveStanzaHandler(task);
}
int XmppClient::ProcessStart()
{
LOG(LS_INFO) << __FUNCTION__;
return 0;
}
在构造XmppClient对象的时候我们需要一个任务执行管理对象,以下为笔者定义的:
XmppPump.h
class XmppPump : public talk_base::MessageHandler,public talk_base::TaskRunner
{
public:
XmppPump(talk_base::Thread *pXmppThread = NULL);
~XmppPump();
eyouim::XmppClient *client()
{
return m_pXmppClient;
}
//MessageHandeler
virtual void OnMessage(talk_base::Message* msg);
//TaskRunner
virtual void WakeTasks();
virtual int64 CurrentTime();
protected:
private:
eyouim::XmppClient *m_pXmppClient;
talk_base::Thread *m_pXmppThread;
};
XmppPump.cpp
XmppPump::XmppPump(talk_base::Thread *pXmppThread /*= NULL*/)
:m_pXmppClient(new eyouim::XmppClient(this)) NOTE: deleted by TaskRunner
,m_pXmppThread((pXmppThread != NULL) ? pXmppThread : talk_base::Thread::Current())
{
}
XmppPump::~XmppPump()
{
if (!AllChildrenDone())
m_pXmppClient->Disconnect();
}
void XmppPump::OnMessage(talk_base::Message* msg)
{
RunTasks();
}
void XmppPump::WakeTasks()
{
//talk_base::Thread::Current()->Post(this);
m_pXmppThread->Post(this);
}
int64 XmppPump::CurrentTime()
{
return (int64)talk_base::Time();
}
至此,相关的类都已经实现完,下面我们调用相关的代码:
XmppPump *m_pXmppPump = new XmppPump(NULL); cricket::SessionManagerTask * receiver = NULL;
receiver = new cricket::SessionManagerTask(m_pXmppPump->client(), session_manager); //deleted by TaskRunner
receiver->EnableOutgoingMessages();
receiver->Start();
receiver在启动时候,会调用XmppPump::WakeTasks()接口,因此TaskRunner任务管理会运行起来。 在TaskRunner运行过程中会执行每个任务ProcessStart过程,在SessionManagerTask中会执行SessionManagerTask:: ProcessStart,负责处里跟两端交互session消息的数据包。
一端信号发送,通过如下流程发送给对方:
SessionManagerTask::OnOutgoingMessage -> SessionSendTask::ProcessStart() -> XmppClient::SendStanza -> eyouim_XmppEngineImpl::SendStanza -> XmppOutputHandler ::WriteOut
一端信号接收,接收流程如下:
XmppClient:: HandleInput -> eyouim_XmppEngineImpl::HandleInput -> SessionManagerTask:: ProcessStart或是SessionSendTask::ProcessResponse -> SessionManagerTask::OnIncomingMessage或是 SessionManagerTask ::OnIncomingResponse
其中SessionManagerTask:: ProcessStar处理跟Session有关的信号,SessionSendTask::ProcessResponse处理对方发过来的Ack
数据为何会驱动到这两个接口?原因是因为在创建SessionManagerTask和SessionSendTask实例的时候,会将他们的指针通过XmppClient:: AddXmppTask添加到eyouim_XmppEngineImpl容器中。而在接收到数据的时候会通过eyouim_XmppEngineImpl::HandleInput接口处理数据,处理完数据后驱动SessionManagerTask或SessionSendTask的HandleStanza处理过程运行,而HandleStanza过程将继续驱动SessionManagerTask:: ProcessSta或SessionSendTask::ProcessResponse的运行。
因此所有的信号交互都在SessionManagerTask中处理,如果一切顺利成功,P2P将会建立成功。
祝你好运!!!