Merge "LocIpc sender recv and recver send"
This commit is contained in:
commit
f14b5413a7
3 changed files with 55 additions and 53 deletions
|
@ -65,7 +65,8 @@ public:
|
|||
inline XtraIpcListener(IOsObserver* observer, const MsgTask* msgTask,
|
||||
XtraSystemStatusObserver& xsso) :
|
||||
mSystemStatusObsrvr(observer), mMsgTask(msgTask), mXSSO(xsso) {}
|
||||
virtual void onReceive(const char* data, uint32_t length) override {
|
||||
virtual void onReceive(const char* data, uint32_t length,
|
||||
const LocIpcRecver* recver) override {
|
||||
#define STRNCMP(str, constStr) strncmp(str, constStr, sizeof(constStr)-1)
|
||||
if (!STRNCMP(data, "ping")) {
|
||||
LOC_LOGd("ping received");
|
||||
|
|
|
@ -66,18 +66,18 @@ ssize_t Sock::send(const void *buf, uint32_t len, int flags, const struct sockad
|
|||
SOCK_OP_AND_LOG(buf, len, isValid(), rtv, sendto(buf, len, flags, destAddr, addrlen));
|
||||
return rtv;
|
||||
}
|
||||
ssize_t Sock::recv(const shared_ptr<ILocIpcListener>& dataCb, int flags, struct sockaddr *srcAddr,
|
||||
socklen_t *addrlen, int sid) const {
|
||||
ssize_t Sock::recv(const LocIpcRecver& recver, const shared_ptr<ILocIpcListener>& dataCb, int flags,
|
||||
struct sockaddr *srcAddr, socklen_t *addrlen, int sid) const {
|
||||
ssize_t rtv = -1;
|
||||
if (-1 == sid) {
|
||||
sid = mSid;
|
||||
} // else it sid would be connection based socket id for recv
|
||||
SOCK_OP_AND_LOG(dataCb.get(), mMaxTxSize, isValid(), rtv,
|
||||
recvfrom(dataCb, sid, flags, srcAddr, addrlen));
|
||||
recvfrom(recver, dataCb, sid, flags, srcAddr, addrlen));
|
||||
return rtv;
|
||||
}
|
||||
ssize_t Sock::sendto(const void *buf, size_t len, int flags, const struct sockaddr *destAddr,
|
||||
socklen_t addrlen) const {
|
||||
socklen_t addrlen) const {
|
||||
ssize_t rtv = -1;
|
||||
if (len <= mMaxTxSize) {
|
||||
rtv = ::sendto(mSid, buf, len, flags, destAddr, addrlen);
|
||||
|
@ -94,19 +94,18 @@ ssize_t Sock::sendto(const void *buf, size_t len, int flags, const struct sockad
|
|||
}
|
||||
return rtv;
|
||||
}
|
||||
ssize_t Sock::recvfrom(const shared_ptr<ILocIpcListener>& dataCb, int sid, int flags,
|
||||
struct sockaddr *srcAddr, socklen_t *addrlen) const {
|
||||
ssize_t nBytes = -1;
|
||||
ssize_t Sock::recvfrom(const LocIpcRecver& recver, const shared_ptr<ILocIpcListener>& dataCb,
|
||||
int sid, int flags, struct sockaddr *srcAddr, socklen_t *addrlen) const {
|
||||
std::string msg(mMaxTxSize, 0);
|
||||
|
||||
if ((nBytes = ::recvfrom(sid, (void*)msg.data(), msg.size(), flags, srcAddr, addrlen)) > 0) {
|
||||
ssize_t nBytes = ::recvfrom(sid, (void*)msg.data(), msg.size(), flags, srcAddr, addrlen);
|
||||
if (nBytes > 0) {
|
||||
if (strncmp(msg.data(), MSG_ABORT, sizeof(MSG_ABORT)) == 0) {
|
||||
LOC_LOGi("recvd abort msg.data %s", msg.data());
|
||||
nBytes = 0;
|
||||
} else if (strncmp(msg.data(), LOC_IPC_HEAD, sizeof(LOC_IPC_HEAD) - 1)) {
|
||||
// short message
|
||||
msg.resize(nBytes);
|
||||
dataCb->onReceive(msg.data(), nBytes);
|
||||
dataCb->onReceive(msg.data(), nBytes, &recver);
|
||||
} else {
|
||||
// long message
|
||||
size_t msgLen = 0;
|
||||
|
@ -119,7 +118,7 @@ ssize_t Sock::recvfrom(const shared_ptr<ILocIpcListener>& dataCb, int sid, int f
|
|||
}
|
||||
if (nBytes > 0) {
|
||||
nBytes = msgLen;
|
||||
dataCb->onReceive(msg.data(), nBytes);
|
||||
dataCb->onReceive(msg.data(), nBytes, &recver);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -152,7 +151,7 @@ class LocIpcLocalRecver : public LocIpcLocalSender, public LocIpcRecver {
|
|||
protected:
|
||||
inline virtual ssize_t recv() const override {
|
||||
socklen_t size = sizeof(mAddr);
|
||||
return mSock->recv(mDataCb, 0, (struct sockaddr*)&mAddr, &size);
|
||||
return mSock->recv(*this, mDataCb, 0, (struct sockaddr*)&mAddr, &size);
|
||||
}
|
||||
public:
|
||||
inline LocIpcLocalRecver(const shared_ptr<ILocIpcListener>& listener, const char* name) :
|
||||
|
@ -189,6 +188,10 @@ protected:
|
|||
return mSock->send(data, length, 0, (struct sockaddr*)&mAddr, sizeof(mAddr));
|
||||
}
|
||||
public:
|
||||
inline LocIpcInetSender(const LocIpcInetSender& sender) :
|
||||
mSockType(sender.mSockType), mSock(sender.mSock),
|
||||
mName(sender.mName), mAddr(sender.mAddr) {
|
||||
}
|
||||
inline LocIpcInetSender(const char* name, int32_t port, int sockType) : LocIpcSender(),
|
||||
mSockType(sockType),
|
||||
mSock(make_shared<Sock>((nullptr == name) ? -1 : (::socket(AF_INET, mSockType, 0)))),
|
||||
|
@ -202,6 +205,10 @@ public:
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
unique_ptr<LocIpcRecver> getRecver(const shared_ptr<ILocIpcListener>& listener) override {
|
||||
return make_unique<SockRecver>(listener, *this, mSock);
|
||||
}
|
||||
};
|
||||
|
||||
class LocIpcInetTcpSender : public LocIpcInetSender {
|
||||
|
@ -245,7 +252,9 @@ public:
|
|||
mSock->sendAbort(0, (struct sockaddr*)&loopBackAddr, sizeof(loopBackAddr));
|
||||
}
|
||||
}
|
||||
|
||||
inline virtual unique_ptr<LocIpcSender> getLastSender() const override {
|
||||
return make_unique<LocIpcInetSender>(static_cast<const LocIpcInetSender&>(*this));
|
||||
}
|
||||
};
|
||||
|
||||
class LocIpcInetTcpRecver : public LocIpcInetRecver {
|
||||
|
@ -260,7 +269,7 @@ protected:
|
|||
mConnFd = -1;
|
||||
}
|
||||
}
|
||||
return mSock->recv(mDataCb, 0, (struct sockaddr*)&mAddr, &size, mConnFd);
|
||||
return mSock->recv(*this, mDataCb, 0, (struct sockaddr*)&mAddr, &size, mConnFd);
|
||||
}
|
||||
public:
|
||||
inline LocIpcInetTcpRecver(const shared_ptr<ILocIpcListener>& listener, const char* name,
|
||||
|
@ -273,7 +282,7 @@ class LocIpcInetUdpRecver : public LocIpcInetRecver {
|
|||
protected:
|
||||
inline virtual ssize_t recv() const override {
|
||||
socklen_t size = sizeof(mAddr);
|
||||
return mSock->recv(mDataCb, 0, (struct sockaddr*)&mAddr, &size);
|
||||
return mSock->recv(*this, mDataCb, 0, (struct sockaddr*)&mAddr, &size);
|
||||
}
|
||||
public:
|
||||
inline LocIpcInetUdpRecver(const shared_ptr<ILocIpcListener>& listener, const char* name,
|
||||
|
@ -283,37 +292,6 @@ public:
|
|||
inline virtual ~LocIpcInetUdpRecver() {}
|
||||
};
|
||||
|
||||
|
||||
|
||||
#ifdef NOT_DEFINED
|
||||
class LocIpcQcsiSender : public LocIpcSender {
|
||||
protected:
|
||||
inline virtual bool isOperable() const override {
|
||||
return mService != nullptr && mService->isServiceRegistered();
|
||||
}
|
||||
inline virtual ssize_t send(const uint8_t data[], uint32_t length, int32_t msgId) const override {
|
||||
return mService->sendIndToClient(msgId, data, length);
|
||||
}
|
||||
inline LocIpcQcsiSender(shared_ptr<QcsiService>& service) : mService(service) {}
|
||||
public:
|
||||
inline virtual ~LocIpcQcsi() {}
|
||||
};
|
||||
|
||||
class LocIpcQcsiRecver : public LocIpcQcsiSender, public LocIpcRecver {
|
||||
protected:
|
||||
inline virtual ssize_t recv() const override { return mService->recv(); }
|
||||
public:
|
||||
inline LocIpcQcsiRecver(unique_ptr<QcsiService>& service) :
|
||||
LocIpcQcsiSender(service), LocIpcRecver(mService->getDataCallback(), *this) {
|
||||
}
|
||||
// only the dele
|
||||
inline ~LocIpcQcsiRecver() {}
|
||||
inline virtual const char* getName() const override { return mService->getName().data(); };
|
||||
inline virtual void abort() const override { if (isSendable()) mService->abort(); }
|
||||
shared_ptr<LocIpcQcsiSender> getSender() { return make_pare<LocIpcQcsiSender>(mService); }
|
||||
};
|
||||
#endif
|
||||
|
||||
class LocIpcRunnable : public LocRunnable {
|
||||
bool mAbortCalled;
|
||||
LocIpc& mLocIpc;
|
||||
|
|
|
@ -53,7 +53,7 @@ public:
|
|||
// LocIpc client can overwrite this function to get notification
|
||||
// when the socket for LocIpc is ready to receive messages.
|
||||
inline virtual void onListenerReady() {}
|
||||
virtual void onReceive(const char* data, uint32_t length)= 0;
|
||||
virtual void onReceive(const char* data, uint32_t len, const LocIpcRecver* recver) = 0;
|
||||
};
|
||||
|
||||
|
||||
|
@ -123,15 +123,18 @@ private:
|
|||
class LocIpcSender {
|
||||
protected:
|
||||
LocIpcSender() = default;
|
||||
virtual ~LocIpcSender() = default;
|
||||
virtual bool isOperable() const = 0;
|
||||
virtual ssize_t send(const uint8_t data[], uint32_t length, int32_t msgId) const = 0;
|
||||
public:
|
||||
virtual ~LocIpcSender() = default;
|
||||
virtual void informRecverRestarted() {}
|
||||
inline bool isSendable() const { return isOperable(); }
|
||||
inline bool sendData(const uint8_t data[], uint32_t length, int32_t msgId) const {
|
||||
return isSendable() && (send(data, length, msgId) > 0);
|
||||
}
|
||||
virtual unique_ptr<LocIpcRecver> getRecver(const shared_ptr<ILocIpcListener>& listener) {
|
||||
return nullptr;
|
||||
}
|
||||
};
|
||||
|
||||
class LocIpcRecver {
|
||||
|
@ -148,6 +151,9 @@ public:
|
|||
inline bool recvData() const { return isRecvable() && (recv() > 0); }
|
||||
inline bool isRecvable() const { return mDataCb != nullptr && mIpcSender.isSendable(); }
|
||||
virtual void onListenerReady() { if (mDataCb != nullptr) mDataCb->onListenerReady(); }
|
||||
inline virtual unique_ptr<LocIpcSender> getLastSender() const {
|
||||
return nullptr;
|
||||
}
|
||||
virtual void abort() const = 0;
|
||||
virtual const char* getName() const = 0;
|
||||
};
|
||||
|
@ -158,8 +164,8 @@ class Sock {
|
|||
const uint32_t mMaxTxSize;
|
||||
ssize_t sendto(const void *buf, size_t len, int flags, const struct sockaddr *destAddr,
|
||||
socklen_t addrlen) const;
|
||||
ssize_t recvfrom(const shared_ptr<ILocIpcListener>& dataCb, int sid, int flags,
|
||||
struct sockaddr *srcAddr, socklen_t *addrlen) const;
|
||||
ssize_t recvfrom(const LocIpcRecver& recver, const shared_ptr<ILocIpcListener>& dataCb,
|
||||
int sid, int flags, struct sockaddr *srcAddr, socklen_t *addrlen) const;
|
||||
public:
|
||||
int mSid;
|
||||
inline Sock(int sid, const uint32_t maxTxSize = 8192) : mMaxTxSize(maxTxSize), mSid(sid) {}
|
||||
|
@ -167,8 +173,8 @@ public:
|
|||
inline bool isValid() const { return -1 != mSid; }
|
||||
ssize_t send(const void *buf, uint32_t len, int flags, const struct sockaddr *destAddr,
|
||||
socklen_t addrlen) const;
|
||||
ssize_t recv(const shared_ptr<ILocIpcListener>& dataCb, int flags, struct sockaddr *srcAddr,
|
||||
socklen_t *addrlen, int sid = -1) const;
|
||||
ssize_t recv(const LocIpcRecver& recver, const shared_ptr<ILocIpcListener>& dataCb, int flags,
|
||||
struct sockaddr *srcAddr, socklen_t *addrlen, int sid = -1) const;
|
||||
ssize_t sendAbort(int flags, const struct sockaddr *destAddr, socklen_t addrlen);
|
||||
inline void close() {
|
||||
if (isValid()) {
|
||||
|
@ -178,6 +184,23 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
class SockRecver : public LocIpcRecver {
|
||||
shared_ptr<Sock> mSock;
|
||||
protected:
|
||||
inline virtual ssize_t recv() const override {
|
||||
return mSock->recv(*this, mDataCb, 0, nullptr, nullptr);
|
||||
}
|
||||
public:
|
||||
inline SockRecver(const shared_ptr<ILocIpcListener>& listener,
|
||||
LocIpcSender& sender, shared_ptr<Sock> sock) :
|
||||
LocIpcRecver(listener, sender), mSock(sock) {
|
||||
}
|
||||
inline virtual const char* getName() const override {
|
||||
return "SockRecver";
|
||||
}
|
||||
inline virtual void abort() const override {}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif //__LOC_IPC__
|
||||
|
|
Loading…
Reference in a new issue