From c891e738f547e1ae9c4fa246e574a47c7c83e496 Mon Sep 17 00:00:00 2001 From: Kevin Tang Date: Wed, 28 Aug 2019 18:18:21 -0700 Subject: [PATCH] LocIpc sender recv and recver send Added two APIs, which allow ILocIpcListener to provide a sender for sending data back to LocIpcSender; and to allow a LocIpcSender to get a recver from the last send target so that it can receive data from it. Change-Id: I94e2204a2588d375805d0674d1e877bbe414c99f CRs-Fixed: 2524039 --- gnss/XtraSystemStatusObserver.cpp | 3 +- utils/LocIpc.cpp | 70 +++++++++++-------------------- utils/LocIpc.h | 35 +++++++++++++--- 3 files changed, 55 insertions(+), 53 deletions(-) diff --git a/gnss/XtraSystemStatusObserver.cpp b/gnss/XtraSystemStatusObserver.cpp index 1671c76d..a58f7357 100644 --- a/gnss/XtraSystemStatusObserver.cpp +++ b/gnss/XtraSystemStatusObserver.cpp @@ -65,7 +65,8 @@ public: inline XtraIpcListener(IOsObserver* observer, const MsgTask* msgTask, XtraSystemStatusObserver& xsso) : mSystemStatusObsrvr(observer), mMsgTask(msgTask), mXSSO(xsso) {} - virtual void onReceive(const char* data, uint32_t length) override { + virtual void onReceive(const char* data, uint32_t length, + const LocIpcRecver* recver) override { #define STRNCMP(str, constStr) strncmp(str, constStr, sizeof(constStr)-1) if (!STRNCMP(data, "ping")) { LOC_LOGd("ping received"); diff --git a/utils/LocIpc.cpp b/utils/LocIpc.cpp index f4dd0b40..e9dbe9da 100644 --- a/utils/LocIpc.cpp +++ b/utils/LocIpc.cpp @@ -66,18 +66,18 @@ ssize_t Sock::send(const void *buf, uint32_t len, int flags, const struct sockad SOCK_OP_AND_LOG(buf, len, isValid(), rtv, sendto(buf, len, flags, destAddr, addrlen)); return rtv; } -ssize_t Sock::recv(const shared_ptr& dataCb, int flags, struct sockaddr *srcAddr, - socklen_t *addrlen, int sid) const { +ssize_t Sock::recv(const LocIpcRecver& recver, const shared_ptr& dataCb, int flags, + struct sockaddr *srcAddr, socklen_t *addrlen, int sid) const { ssize_t rtv = -1; if (-1 == sid) { sid = mSid; } // else it sid would be connection based socket id for recv SOCK_OP_AND_LOG(dataCb.get(), mMaxTxSize, isValid(), rtv, - recvfrom(dataCb, sid, flags, srcAddr, addrlen)); + recvfrom(recver, dataCb, sid, flags, srcAddr, addrlen)); return rtv; } ssize_t Sock::sendto(const void *buf, size_t len, int flags, const struct sockaddr *destAddr, - socklen_t addrlen) const { + socklen_t addrlen) const { ssize_t rtv = -1; if (len <= mMaxTxSize) { rtv = ::sendto(mSid, buf, len, flags, destAddr, addrlen); @@ -94,19 +94,18 @@ ssize_t Sock::sendto(const void *buf, size_t len, int flags, const struct sockad } return rtv; } -ssize_t Sock::recvfrom(const shared_ptr& dataCb, int sid, int flags, - struct sockaddr *srcAddr, socklen_t *addrlen) const { - ssize_t nBytes = -1; +ssize_t Sock::recvfrom(const LocIpcRecver& recver, const shared_ptr& dataCb, + int sid, int flags, struct sockaddr *srcAddr, socklen_t *addrlen) const { std::string msg(mMaxTxSize, 0); - - if ((nBytes = ::recvfrom(sid, (void*)msg.data(), msg.size(), flags, srcAddr, addrlen)) > 0) { + ssize_t nBytes = ::recvfrom(sid, (void*)msg.data(), msg.size(), flags, srcAddr, addrlen); + if (nBytes > 0) { if (strncmp(msg.data(), MSG_ABORT, sizeof(MSG_ABORT)) == 0) { LOC_LOGi("recvd abort msg.data %s", msg.data()); nBytes = 0; } else if (strncmp(msg.data(), LOC_IPC_HEAD, sizeof(LOC_IPC_HEAD) - 1)) { // short message msg.resize(nBytes); - dataCb->onReceive(msg.data(), nBytes); + dataCb->onReceive(msg.data(), nBytes, &recver); } else { // long message size_t msgLen = 0; @@ -119,7 +118,7 @@ ssize_t Sock::recvfrom(const shared_ptr& dataCb, int sid, int f } if (nBytes > 0) { nBytes = msgLen; - dataCb->onReceive(msg.data(), nBytes); + dataCb->onReceive(msg.data(), nBytes, &recver); } } } @@ -152,7 +151,7 @@ class LocIpcLocalRecver : public LocIpcLocalSender, public LocIpcRecver { protected: inline virtual ssize_t recv() const override { socklen_t size = sizeof(mAddr); - return mSock->recv(mDataCb, 0, (struct sockaddr*)&mAddr, &size); + return mSock->recv(*this, mDataCb, 0, (struct sockaddr*)&mAddr, &size); } public: inline LocIpcLocalRecver(const shared_ptr& listener, const char* name) : @@ -189,6 +188,10 @@ protected: return mSock->send(data, length, 0, (struct sockaddr*)&mAddr, sizeof(mAddr)); } public: + inline LocIpcInetSender(const LocIpcInetSender& sender) : + mSockType(sender.mSockType), mSock(sender.mSock), + mName(sender.mName), mAddr(sender.mAddr) { + } inline LocIpcInetSender(const char* name, int32_t port, int sockType) : LocIpcSender(), mSockType(sockType), mSock(make_shared((nullptr == name) ? -1 : (::socket(AF_INET, mSockType, 0)))), @@ -202,6 +205,10 @@ public: } } } + + unique_ptr getRecver(const shared_ptr& listener) override { + return make_unique(listener, *this, mSock); + } }; class LocIpcInetTcpSender : public LocIpcInetSender { @@ -245,7 +252,9 @@ public: mSock->sendAbort(0, (struct sockaddr*)&loopBackAddr, sizeof(loopBackAddr)); } } - + inline virtual unique_ptr getLastSender() const override { + return make_unique(static_cast(*this)); + } }; class LocIpcInetTcpRecver : public LocIpcInetRecver { @@ -260,7 +269,7 @@ protected: mConnFd = -1; } } - return mSock->recv(mDataCb, 0, (struct sockaddr*)&mAddr, &size, mConnFd); + return mSock->recv(*this, mDataCb, 0, (struct sockaddr*)&mAddr, &size, mConnFd); } public: inline LocIpcInetTcpRecver(const shared_ptr& listener, const char* name, @@ -273,7 +282,7 @@ class LocIpcInetUdpRecver : public LocIpcInetRecver { protected: inline virtual ssize_t recv() const override { socklen_t size = sizeof(mAddr); - return mSock->recv(mDataCb, 0, (struct sockaddr*)&mAddr, &size); + return mSock->recv(*this, mDataCb, 0, (struct sockaddr*)&mAddr, &size); } public: inline LocIpcInetUdpRecver(const shared_ptr& listener, const char* name, @@ -283,37 +292,6 @@ public: inline virtual ~LocIpcInetUdpRecver() {} }; - - -#ifdef NOT_DEFINED -class LocIpcQcsiSender : public LocIpcSender { -protected: - inline virtual bool isOperable() const override { - return mService != nullptr && mService->isServiceRegistered(); - } - inline virtual ssize_t send(const uint8_t data[], uint32_t length, int32_t msgId) const override { - return mService->sendIndToClient(msgId, data, length); - } - inline LocIpcQcsiSender(shared_ptr& service) : mService(service) {} -public: - inline virtual ~LocIpcQcsi() {} -}; - -class LocIpcQcsiRecver : public LocIpcQcsiSender, public LocIpcRecver { -protected: - inline virtual ssize_t recv() const override { return mService->recv(); } -public: - inline LocIpcQcsiRecver(unique_ptr& service) : - LocIpcQcsiSender(service), LocIpcRecver(mService->getDataCallback(), *this) { - } - // only the dele - inline ~LocIpcQcsiRecver() {} - inline virtual const char* getName() const override { return mService->getName().data(); }; - inline virtual void abort() const override { if (isSendable()) mService->abort(); } - shared_ptr getSender() { return make_pare(mService); } -}; -#endif - class LocIpcRunnable : public LocRunnable { bool mAbortCalled; LocIpc& mLocIpc; diff --git a/utils/LocIpc.h b/utils/LocIpc.h index af4c2c3a..d6f8d1d8 100644 --- a/utils/LocIpc.h +++ b/utils/LocIpc.h @@ -53,7 +53,7 @@ public: // LocIpc client can overwrite this function to get notification // when the socket for LocIpc is ready to receive messages. inline virtual void onListenerReady() {} - virtual void onReceive(const char* data, uint32_t length)= 0; + virtual void onReceive(const char* data, uint32_t len, const LocIpcRecver* recver) = 0; }; @@ -123,15 +123,18 @@ private: class LocIpcSender { protected: LocIpcSender() = default; - virtual ~LocIpcSender() = default; virtual bool isOperable() const = 0; virtual ssize_t send(const uint8_t data[], uint32_t length, int32_t msgId) const = 0; public: + virtual ~LocIpcSender() = default; virtual void informRecverRestarted() {} inline bool isSendable() const { return isOperable(); } inline bool sendData(const uint8_t data[], uint32_t length, int32_t msgId) const { return isSendable() && (send(data, length, msgId) > 0); } + virtual unique_ptr getRecver(const shared_ptr& listener) { + return nullptr; + } }; class LocIpcRecver { @@ -148,6 +151,9 @@ public: inline bool recvData() const { return isRecvable() && (recv() > 0); } inline bool isRecvable() const { return mDataCb != nullptr && mIpcSender.isSendable(); } virtual void onListenerReady() { if (mDataCb != nullptr) mDataCb->onListenerReady(); } + inline virtual unique_ptr getLastSender() const { + return nullptr; + } virtual void abort() const = 0; virtual const char* getName() const = 0; }; @@ -158,8 +164,8 @@ class Sock { const uint32_t mMaxTxSize; ssize_t sendto(const void *buf, size_t len, int flags, const struct sockaddr *destAddr, socklen_t addrlen) const; - ssize_t recvfrom(const shared_ptr& dataCb, int sid, int flags, - struct sockaddr *srcAddr, socklen_t *addrlen) const; + ssize_t recvfrom(const LocIpcRecver& recver, const shared_ptr& dataCb, + int sid, int flags, struct sockaddr *srcAddr, socklen_t *addrlen) const; public: int mSid; inline Sock(int sid, const uint32_t maxTxSize = 8192) : mMaxTxSize(maxTxSize), mSid(sid) {} @@ -167,8 +173,8 @@ public: inline bool isValid() const { return -1 != mSid; } ssize_t send(const void *buf, uint32_t len, int flags, const struct sockaddr *destAddr, socklen_t addrlen) const; - ssize_t recv(const shared_ptr& dataCb, int flags, struct sockaddr *srcAddr, - socklen_t *addrlen, int sid = -1) const; + ssize_t recv(const LocIpcRecver& recver, const shared_ptr& dataCb, int flags, + struct sockaddr *srcAddr, socklen_t *addrlen, int sid = -1) const; ssize_t sendAbort(int flags, const struct sockaddr *destAddr, socklen_t addrlen); inline void close() { if (isValid()) { @@ -178,6 +184,23 @@ public: } }; +class SockRecver : public LocIpcRecver { + shared_ptr mSock; +protected: + inline virtual ssize_t recv() const override { + return mSock->recv(*this, mDataCb, 0, nullptr, nullptr); + } +public: + inline SockRecver(const shared_ptr& listener, + LocIpcSender& sender, shared_ptr sock) : + LocIpcRecver(listener, sender), mSock(sock) { + } + inline virtual const char* getName() const override { + return "SockRecver"; + } + inline virtual void abort() const override {} +}; + } #endif //__LOC_IPC__