Merge "LocIpc redesign"

This commit is contained in:
qctecmdr 2019-05-09 07:45:59 -07:00 committed by Gerrit - the friendly Code Review server
commit c08365ad60
6 changed files with 519 additions and 309 deletions

View file

@ -49,6 +49,7 @@
#include <DataItemsFactoryProxy.h>
#include <DataItemConcreteTypesBase.h>
using namespace loc_util;
using namespace loc_core;
#ifdef LOG_TAG
@ -56,6 +57,61 @@ using namespace loc_core;
#endif
#define LOG_TAG "LocSvc_XSSO"
class XtraIpcListener : public ILocIpcListener {
IOsObserver* mSystemStatusObsrvr;
const MsgTask* mMsgTask;
XtraSystemStatusObserver& mXSSO;
public:
inline XtraIpcListener(IOsObserver* observer, const MsgTask* msgTask,
XtraSystemStatusObserver& xsso) :
mSystemStatusObsrvr(observer), mMsgTask(msgTask), mXSSO(xsso) {}
virtual void onReceive(const char* data, uint32_t length) override {
#define STRNCMP(str, constStr) strncmp(str, constStr, sizeof(constStr)-1)
if (!STRNCMP(data, "ping")) {
LOC_LOGd("ping received");
#ifdef USE_GLIB
} else if (!STRNCMP(data, "connectBackhaul")) {
mSystemStatusObsrvr->connectBackhaul();
} else if (!STRNCMP(data, "disconnectBackhaul")) {
mSystemStatusObsrvr->disconnectBackhaul();
#endif
} else if (!STRNCMP(data, "requestStatus")) {
int32_t xtraStatusUpdated = 0;
sscanf(data, "%*s %d", &xtraStatusUpdated);
struct HandleStatusRequestMsg : public LocMsg {
XtraSystemStatusObserver& mXSSO;
int32_t mXtraStatusUpdated;
inline HandleStatusRequestMsg(XtraSystemStatusObserver& xsso,
int32_t xtraStatusUpdated) :
mXSSO(xsso), mXtraStatusUpdated(xtraStatusUpdated) {}
inline void proc() const override {
mXSSO.onStatusRequested(mXtraStatusUpdated);
}
};
mMsgTask->sendMsg(new HandleStatusRequestMsg(mXSSO, xtraStatusUpdated));
} else {
LOC_LOGw("unknown event: %s", data);
}
}
};
XtraSystemStatusObserver::XtraSystemStatusObserver(IOsObserver* sysStatObs,
const MsgTask* msgTask) :
mSystemStatusObsrvr(sysStatObs), mMsgTask(msgTask),
mGpsLock(-1), mConnections(~0), mXtraThrottle(true),
mReqStatusReceived(false),
mIsConnectivityStatusKnown(false),
mSender(LocIpc::getLocIpcLocalSender(LOC_IPC_XTRA)),
mDelayLocTimer(*mSender) {
subscribe(true);
auto recver = LocIpc::getLocIpcLocalRecver(
make_shared<XtraIpcListener>(sysStatObs, msgTask, *this),
LOC_IPC_HAL);
mIpc.startNonBlockingListening(recver);
mDelayLocTimer.start(100 /*.1 sec*/, false);
}
bool XtraSystemStatusObserver::updateLockStatus(GnssConfigGpsLock lock) {
// mask NI(NFW bit) since from XTRA's standpoint GPS is enabled if
// MO(AFW bit) is enabled and disabled when MO is disabled
@ -68,7 +124,8 @@ bool XtraSystemStatusObserver::updateLockStatus(GnssConfigGpsLock lock) {
stringstream ss;
ss << "gpslock";
ss << " " << lock;
return ( send(LOC_IPC_XTRA, ss.str()) );
string s = ss.str();
return ( LocIpc::send(*mSender, (const uint8_t*)s.data(), s.size()) );
}
bool XtraSystemStatusObserver::updateConnections(uint64_t allConnections,
@ -85,8 +142,8 @@ bool XtraSystemStatusObserver::updateConnections(uint64_t allConnections,
stringstream ss;
ss << "connection" << endl << mConnections << endl << wifiNetworkHandle
<< endl << mobileNetworkHandle;
return ( send(LOC_IPC_XTRA, ss.str()) );
string s = ss.str();
return ( LocIpc::send(*mSender, (const uint8_t*)s.data(), s.size()) );
}
bool XtraSystemStatusObserver::updateTac(const string& tac) {
@ -99,7 +156,8 @@ bool XtraSystemStatusObserver::updateTac(const string& tac) {
stringstream ss;
ss << "tac";
ss << " " << tac.c_str();
return ( send(LOC_IPC_XTRA, ss.str()) );
string s = ss.str();
return ( LocIpc::send(*mSender, (const uint8_t*)s.data(), s.size()) );
}
bool XtraSystemStatusObserver::updateMccMnc(const string& mccmnc) {
@ -112,7 +170,8 @@ bool XtraSystemStatusObserver::updateMccMnc(const string& mccmnc) {
stringstream ss;
ss << "mncmcc";
ss << " " << mccmnc.c_str();
return ( send(LOC_IPC_XTRA, ss.str()) );
string s = ss.str();
return ( LocIpc::send(*mSender, (const uint8_t*)s.data(), s.size()) );
}
bool XtraSystemStatusObserver::updateXtraThrottle(const bool enabled) {
@ -125,7 +184,8 @@ bool XtraSystemStatusObserver::updateXtraThrottle(const bool enabled) {
stringstream ss;
ss << "xtrathrottle";
ss << " " << (enabled ? 1 : 0);
return ( send(LOC_IPC_XTRA, ss.str()) );
string s = ss.str();
return ( LocIpc::send(*mSender, (const uint8_t*)s.data(), s.size()) );
}
inline bool XtraSystemStatusObserver::onStatusRequested(int32_t xtraStatusUpdated) {
@ -143,38 +203,8 @@ inline bool XtraSystemStatusObserver::onStatusRequested(int32_t xtraStatusUpdate
<< mWifiNetworkHandle << endl << mMobileNetworkHandle << endl
<< mTac << endl << mMccmnc << endl << mIsConnectivityStatusKnown;
return ( send(LOC_IPC_XTRA, ss.str()) );
}
void XtraSystemStatusObserver::onReceive(const std::string& data) {
if (!strncmp(data.c_str(), "ping", sizeof("ping") - 1)) {
LOC_LOGd("ping received");
#ifdef USE_GLIB
} else if (!strncmp(data.c_str(), "connectBackhaul", sizeof("connectBackhaul") - 1)) {
mSystemStatusObsrvr->connectBackhaul();
} else if (!strncmp(data.c_str(), "disconnectBackhaul", sizeof("disconnectBackhaul") - 1)) {
mSystemStatusObsrvr->disconnectBackhaul();
#endif
} else if (!strncmp(data.c_str(), "requestStatus", sizeof("requestStatus") - 1)) {
int32_t xtraStatusUpdated = 0;
sscanf(data.c_str(), "%*s %d", &xtraStatusUpdated);
struct HandleStatusRequestMsg : public LocMsg {
XtraSystemStatusObserver& mXSSO;
int32_t mXtraStatusUpdated;
inline HandleStatusRequestMsg(XtraSystemStatusObserver& xsso,
int32_t xtraStatusUpdated) :
mXSSO(xsso), mXtraStatusUpdated(xtraStatusUpdated) {}
inline void proc() const override { mXSSO.onStatusRequested(mXtraStatusUpdated); }
};
mMsgTask->sendMsg(new (nothrow) HandleStatusRequestMsg(*this, xtraStatusUpdated));
} else {
LOC_LOGw("unknown event: %s", data.c_str());
}
string s = ss.str();
return ( LocIpc::send(*mSender, (const uint8_t*)s.data(), s.size()) );
}
void XtraSystemStatusObserver::subscribe(bool yes)

View file

@ -35,25 +35,18 @@
#include <LocTimer.h>
using namespace std;
using namespace loc_util;
using loc_core::IOsObserver;
using loc_core::IDataItemObserver;
using loc_core::IDataItemCore;
using loc_util::LocIpc;
class XtraSystemStatusObserver : public IDataItemObserver, public LocIpc{
class XtraSystemStatusObserver : public IDataItemObserver {
public :
// constructor & destructor
inline XtraSystemStatusObserver(IOsObserver* sysStatObs, const MsgTask* msgTask):
mSystemStatusObsrvr(sysStatObs), mMsgTask(msgTask),
mGpsLock(-1), mConnections(~0), mXtraThrottle(true), mReqStatusReceived(false),
mIsConnectivityStatusKnown (false), mDelayLocTimer(*this) {
subscribe(true);
startListeningNonBlocking(LOC_IPC_HAL);
mDelayLocTimer.start(100 /*.1 sec*/, false);
}
XtraSystemStatusObserver(IOsObserver* sysStatObs, const MsgTask* msgTask);
inline virtual ~XtraSystemStatusObserver() {
subscribe(false);
stopListening();
mIpc.stopNonBlockingListening();
}
// IDataItemObserver overrides
@ -68,14 +61,13 @@ public :
bool updateXtraThrottle(const bool enabled);
inline const MsgTask* getMsgTask() { return mMsgTask; }
void subscribe(bool yes);
protected:
void onReceive(const std::string& data) override;
bool onStatusRequested(int32_t xtraStatusUpdated);
private:
IOsObserver* mSystemStatusObsrvr;
const MsgTask* mMsgTask;
GnssConfigGpsLock mGpsLock;
LocIpc mIpc;
uint64_t mConnections;
uint64_t mWifiNetworkHandle;
uint64_t mMobileNetworkHandle;
@ -84,17 +76,16 @@ private:
bool mXtraThrottle;
bool mReqStatusReceived;
bool mIsConnectivityStatusKnown;
shared_ptr<LocIpcSender> mSender;
class DelayLocTimer : public LocTimer {
XtraSystemStatusObserver& mXSSO;
LocIpcSender& mSender;
public:
DelayLocTimer(XtraSystemStatusObserver& xsso) : mXSSO(xsso) {}
DelayLocTimer(LocIpcSender& sender) : mSender(sender) {}
void timeOutCallback() override {
mXSSO.send(LOC_IPC_XTRA, "halinit");
LocIpc::send(mSender, (const uint8_t*)"halinit", sizeof("halinit"));
}
} mDelayLocTimer;
bool onStatusRequested(int32_t xtraStatusUpdated);
};
#endif

View file

@ -30,10 +30,14 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <netinet/in.h>
#include <netdb.h>
#include <loc_misc_utils.h>
#include <log_util.h>
#include "LocIpc.h"
#include <LocIpc.h>
#include <algorithm>
using std::string;
using namespace std;
namespace loc_util {
@ -42,188 +46,346 @@ namespace loc_util {
#endif
#define LOG_TAG "LocSvc_LocIpc"
#define LOC_MSG_BUF_LEN 8192
#define LOC_MSG_HEAD "$MSGLEN$"
#define LOC_MSG_ABORT "LocIpcMsg::ABORT"
class LocIpcRunnable : public LocRunnable {
friend LocIpc;
public:
LocIpcRunnable(LocIpc& locIpc, const string& ipcName)
: mLocIpc(locIpc), mIpcName(ipcName) {}
bool run() override {
if (!mLocIpc.startListeningBlocking(mIpcName)) {
LOC_LOGe("listen to socket failed");
}
return false;
#define SOCK_OP_AND_LOG(buf, length, opable, rtv, exe) \
if (nullptr == (buf) || 0 == (length)) { \
LOC_LOGe("Invalid inputs: buf - %p, length - %d", (buf), (length)); \
} else if (!(opable)) { \
LOC_LOGe("Invalid object: operable - %d", (opable)); \
} else { \
rtv = (exe); \
if (-1 == rtv) { \
LOC_LOGw("failed reason: %s", strerror(errno)); \
} \
}
const char Sock::MSG_ABORT[] = "LocIpc::Sock::ABORT";
const char Sock::LOC_IPC_HEAD[] = "$MSGLEN$";
ssize_t Sock::send(const void *buf, size_t len, int flags, const struct sockaddr *destAddr,
socklen_t addrlen) const {
ssize_t rtv = -1;
SOCK_OP_AND_LOG(buf, len, isValid(), rtv, sendto(buf, len, flags, destAddr, addrlen));
return rtv;
}
ssize_t Sock::recv(const shared_ptr<ILocIpcListener>& dataCb, int flags, struct sockaddr *srcAddr,
socklen_t *addrlen, int sid) const {
ssize_t rtv = -1;
if (-1 == sid) {
sid = mSid;
} // else it sid would be connection based socket id for recv
SOCK_OP_AND_LOG(dataCb.get(), mMaxTxSize, isValid(), rtv,
recvfrom(dataCb, sid, flags, srcAddr, addrlen));
return rtv;
}
ssize_t Sock::sendto(const void *buf, size_t len, int flags, const struct sockaddr *destAddr,
socklen_t addrlen) const {
ssize_t rtv = -1;
if (len <= mMaxTxSize) {
rtv = ::sendto(mSid, buf, len, flags, destAddr, addrlen);
} else {
std::string head(LOC_IPC_HEAD + to_string(len));
rtv = ::sendto(mSid, head.c_str(), head.length(), flags, destAddr, addrlen);
if (rtv > 0) {
for (size_t offset = 0; offset < len && rtv > 0; offset += rtv) {
rtv = ::sendto(mSid, (char*)buf + offset, min(len - offset, (size_t)mMaxTxSize),
flags, destAddr, addrlen);
}
rtv = (rtv > 0) ? (head.length() + len) : -1;
}
}
return rtv;
}
ssize_t Sock::recvfrom(const shared_ptr<ILocIpcListener>& dataCb, int sid, int flags,
struct sockaddr *srcAddr, socklen_t *addrlen) const {
ssize_t nBytes = -1;
std::string msg(mMaxTxSize, 0);
if ((nBytes = ::recvfrom(sid, (void*)msg.data(), msg.size(), flags, srcAddr, addrlen)) > 0) {
if (strncmp(msg.data(), MSG_ABORT, sizeof(MSG_ABORT)) == 0) {
LOC_LOGi("recvd abort msg.data %s", msg.data());
nBytes = 0;
} else if (strncmp(msg.data(), LOC_IPC_HEAD, sizeof(LOC_IPC_HEAD) - 1)) {
// short message
msg.resize(nBytes);
dataCb->onReceive(msg.data(), nBytes);
} else {
// long message
size_t msgLen = 0;
sscanf(msg.data() + sizeof(LOC_IPC_HEAD) - 1, "%zu", &msgLen);
msg.resize(msgLen);
for (size_t msgLenReceived = 0; (msgLenReceived < msgLen) && (nBytes > 0);
msgLenReceived += nBytes) {
nBytes = ::recvfrom(sid, &(msg[msgLenReceived]), msg.size() - msgLenReceived,
flags, srcAddr, addrlen);
}
if (nBytes > 0) {
nBytes = msgLen;
dataCb->onReceive(msg.data(), nBytes);
}
}
}
return nBytes;
}
ssize_t Sock::sendAbort(int flags, const struct sockaddr *destAddr, socklen_t addrlen) {
return send(MSG_ABORT, sizeof(MSG_ABORT), flags, destAddr, addrlen);
}
class LocIpcLocalSender : public LocIpcSender {
protected:
shared_ptr<Sock> mSock;
struct sockaddr_un mAddr;
inline virtual bool isOperable() const override { return mSock != nullptr && mSock->isValid(); }
inline virtual ssize_t send(const uint8_t data[], uint32_t length, int32_t /* msgId */) const {
return mSock->send(data, length, 0, (struct sockaddr*)&mAddr, sizeof(mAddr));
}
public:
inline LocIpcLocalSender(const char* name) : LocIpcSender(),
mSock(make_shared<Sock>((nullptr == name) ? -1 : (::socket(AF_UNIX, SOCK_DGRAM, 0)))),
mAddr({.sun_family = AF_UNIX, {}}) {
if (mSock != nullptr && mSock->isValid()) {
snprintf(mAddr.sun_path, sizeof(mAddr.sun_path), "%s", name);
}
}
private:
LocIpc& mLocIpc;
const string mIpcName;
};
bool LocIpc::startListeningNonBlocking(const string& name) {
auto runnable = new LocIpcRunnable(*this, name);
string threadName("LocIpc-");
threadName.append(name);
return mThread.start(threadName.c_str(), runnable);
}
class LocIpcLocalRecver : public LocIpcLocalSender, public LocIpcRecver {
protected:
inline virtual ssize_t recv() const override {
socklen_t size = sizeof(mAddr);
return mSock->recv(mDataCb, 0, (struct sockaddr*)&mAddr, &size);
}
public:
inline LocIpcLocalRecver(const shared_ptr<ILocIpcListener>& listener, const char* name) :
LocIpcLocalSender(name), LocIpcRecver(listener, *this) {
bool LocIpc::startListeningBlocking(const string& name) {
bool stopRequested = false;
int fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if ((unlink(mAddr.sun_path) < 0) && (errno != ENOENT)) {
LOC_LOGw("unlink socket error. reason:%s", strerror(errno));
}
if (fd < 0) {
LOC_LOGe("create socket error. reason:%s", strerror(errno));
umask(0157);
if (mSock->isValid() && ::bind(mSock->mSid, (struct sockaddr*)&mAddr, sizeof(mAddr)) < 0) {
LOC_LOGe("bind socket error. sock fd: %d, reason: %s", mSock->mSid, strerror(errno));
mSock->close();
}
}
inline virtual ~LocIpcLocalRecver() { unlink(mAddr.sun_path); }
inline virtual const char* getName() const override { return mAddr.sun_path; };
inline virtual void abort() const override {
if (isSendable()) {
mSock->sendAbort(0, (struct sockaddr*)&mAddr, sizeof(mAddr));
}
}
};
class LocIpcInetTcpSender : public LocIpcSender {
protected:
shared_ptr<Sock> mSock;
const string mName;
sockaddr_in mAddr;
mutable bool mFirstTime;
inline virtual bool isOperable() const override { return mSock != nullptr && mSock->isValid(); }
inline virtual ssize_t send(const uint8_t data[], uint32_t length, int32_t /* msgId */) const {
if (mFirstTime) {
mFirstTime = false;
::connect(mSock->mSid, (const struct sockaddr*)&mAddr, sizeof(mAddr));
}
return mSock->send(data, length, 0, (struct sockaddr*)&mAddr, sizeof(mAddr));
}
public:
inline LocIpcInetTcpSender(const char* name, int32_t port) : LocIpcSender(),
mSock(make_shared<Sock>((nullptr == name) ? -1 : (::socket(AF_INET, SOCK_STREAM, 0)))),
mName((nullptr == name) ? "" : name),
mAddr({.sin_family=AF_INET, .sin_port=htons(port), .sin_addr={htonl(INADDR_ANY)}}),
mFirstTime(true) {
if (mSock != nullptr && mSock->isValid() && nullptr != name) {
struct hostent* hp = gethostbyname(name);
if (nullptr != hp) {
memcpy((char*)&(mAddr.sin_addr.s_addr), hp->h_addr_list[0], hp->h_length);
}
}
}
};
class LocIpcInetTcpRecver : public LocIpcInetTcpSender, public LocIpcRecver {
mutable int32_t mConnFd;
protected:
inline virtual ssize_t recv() const override {
socklen_t size = sizeof(mAddr);
if (-1 == mConnFd && mSock->isValid()) {
if (::listen(mSock->mSid, 3) < 0 ||
(mConnFd = accept(mSock->mSid, (struct sockaddr*)&mAddr, &size)) < 0) {
mSock->close();
mConnFd = -1;
}
}
return mSock->recv(mDataCb, 0, (struct sockaddr*)&mAddr, &size, mConnFd);
}
public:
inline LocIpcInetTcpRecver(const shared_ptr<ILocIpcListener>& listener, const char* name,
int32_t port) :
LocIpcInetTcpSender(name, port), LocIpcRecver(listener, *this), mConnFd(-1) {
if (mSock->isValid() && ::bind(mSock->mSid, (struct sockaddr*)&mAddr, sizeof(mAddr)) < 0) {
LOC_LOGe("bind socket error. sock fd: %d, reason: %s", mSock->mSid, strerror(errno));
mSock->close();
}
}
inline virtual ~LocIpcInetTcpRecver() { if (-1 != mConnFd) ::close(mConnFd); }
inline virtual const char* getName() const override { return mName.data(); };
inline virtual void abort() const override {
if (isSendable()) {
mSock->sendAbort(0, (struct sockaddr*)&mAddr, sizeof(mAddr));
}
}
};
#ifdef NOT_DEFINED
class LocIpcQcsiSender : public LocIpcSender {
protected:
inline virtual bool isOperable() const override {
return mService != nullptr && mService->isServiceRegistered();
}
inline virtual ssize_t send(const uint8_t data[], uint32_t length, int32_t msgId) const override {
return mService->sendIndToClient(msgId, data, length);
}
inline LocIpcQcsiSender(shared_ptr<QcsiService>& service) : mService(service) {}
public:
inline virtual ~LocIpcQcsi() {}
};
class LocIpcQcsiRecver : public LocIpcQcsiSender, public LocIpcRecver {
protected:
inline virtual ssize_t recv() const override { return mService->recv(); }
public:
inline LocIpcQcsiRecver(unique_ptr<QcsiService>& service) :
LocIpcQcsiSender(service), LocIpcRecver(mService->getDataCallback(), *this) {
}
// only the dele
inline ~LocIpcQcsiRecver() {}
inline virtual const char* getName() const override { return mService->getName().data(); };
inline virtual void abort() const override { if (isSendable()) mService->abort(); }
shared_ptr<LocIpcQcsiSender> getSender() { return make_pare<LocIpcQcsiSender>(mService); }
};
#endif
class LocIpcRunnable : public LocRunnable {
bool mAbortCalled;
LocIpc& mLocIpc;
unique_ptr<LocIpcRecver> mIpcRecver;
public:
inline LocIpcRunnable(LocIpc& locIpc, unique_ptr<LocIpcRecver>& ipcRecver) :
mAbortCalled(false),
mLocIpc(locIpc),
mIpcRecver(move(ipcRecver)) {}
inline bool run() override {
if (mIpcRecver != nullptr) {
mLocIpc.startBlockingListening(*(mIpcRecver.get()));
if (!mAbortCalled) {
LOC_LOGw("startListeningBlocking() returned w/o stopBlockingListening() called");
}
}
// return false so the calling thread exits while loop
return false;
}
if ((unlink(name.c_str()) < 0) && (errno != ENOENT)) {
LOC_LOGw("unlink socket error. reason:%s", strerror(errno));
inline void abort() {
mAbortCalled = true;
if (mIpcRecver != nullptr) {
mIpcRecver->abort();
}
}
};
struct sockaddr_un addr = { .sun_family = AF_UNIX };
snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", name.c_str());
umask(0157);
if (::bind(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
LOC_LOGe("bind socket error. reason:%s", strerror(errno));
bool LocIpc::startNonBlockingListening(unique_ptr<LocIpcRecver>& ipcRecver) {
if (ipcRecver != nullptr && ipcRecver->isRecvable()) {
std::string threadName("LocIpc-");
threadName.append(ipcRecver->getName());
mRunnable = new LocIpcRunnable(*this, ipcRecver);
return mThread.start(threadName.c_str(), mRunnable);
} else {
mIpcFd = fd;
mIpcName = name;
LOC_LOGe("ipcRecver is null OR ipcRecver->recvable() is fasle");
return false;
}
}
bool LocIpc::startBlockingListening(LocIpcRecver& ipcRecver) {
if (ipcRecver.isRecvable()) {
// inform that the socket is ready to receive message
onListenerReady();
ssize_t nBytes = 0;
string msg = "";
string abort = LOC_MSG_ABORT;
while (1) {
msg.resize(LOC_MSG_BUF_LEN);
nBytes = ::recvfrom(fd, (void*)(msg.data()), msg.size(), 0, NULL, NULL);
if (nBytes < 0) {
LOC_LOGe("cannot read socket. reason:%s", strerror(errno));
break;
} else if (0 == nBytes) {
continue;
}
if (strncmp(msg.data(), abort.c_str(), abort.length()) == 0) {
LOC_LOGi("recvd abort msg.data %s", msg.data());
stopRequested = true;
break;
}
if (strncmp(msg.data(), LOC_MSG_HEAD, sizeof(LOC_MSG_HEAD) - 1)) {
// short message
msg.resize(nBytes);
onReceive(msg);
} else {
// long message
size_t msgLen = 0;
sscanf(msg.data(), LOC_MSG_HEAD"%zu", &msgLen);
msg.resize(msgLen);
size_t msgLenReceived = 0;
while ((msgLenReceived < msgLen) && (nBytes > 0)) {
nBytes = recvfrom(fd, (void*)&(msg[msgLenReceived]),
msg.size() - msgLenReceived, 0, NULL, NULL);
msgLenReceived += nBytes;
}
if (nBytes > 0) {
onReceive(msg);
} else {
LOC_LOGe("cannot read socket. reason:%s", strerror(errno));
break;
}
}
}
}
if (::close(fd)) {
LOC_LOGe("cannot close socket:%s", strerror(errno));
}
unlink(name.c_str());
return stopRequested;
}
void LocIpc::stopListening() {
if (mIpcFd >= 0) {
string abort = LOC_MSG_ABORT;
if (!mIpcName.empty()) {
send(mIpcName.c_str(), abort);
}
mIpcFd = -1;
}
if (!mIpcName.empty()) {
mIpcName.clear();
}
}
bool LocIpc::send(const char name[], const string& data) {
return send(name, (const uint8_t*)data.c_str(), data.length());
}
bool LocIpc::send(const char name[], const uint8_t data[], uint32_t length) {
bool result = true;
int fd = ::socket(AF_UNIX, SOCK_DGRAM, 0);
if (fd < 0) {
LOC_LOGe("create socket error. reason:%s", strerror(errno));
ipcRecver.onListenerReady();
while (ipcRecver.recvData());
return true;
} else {
LOC_LOGe("ipcRecver is null OR ipcRecver->recvable() is fasle");
return false;
}
struct sockaddr_un addr = { .sun_family = AF_UNIX };
snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", name);
result = sendData(fd, addr, data, length);
(void)::close(fd);
return result;
}
bool LocIpc::sendData(int fd, const sockaddr_un &addr, const uint8_t data[], uint32_t length) {
bool result = true;
if (length <= LOC_MSG_BUF_LEN) {
if (::sendto(fd, data, length, 0,
(struct sockaddr*)&addr, sizeof(addr)) < 0) {
LOC_LOGe("cannot send to socket:%s. reason:%s",
addr.sun_path, strerror(errno));
result = false;
}
} else {
string head = LOC_MSG_HEAD;
head.append(std::to_string(length));
if (::sendto(fd, head.c_str(), head.length(), 0,
(struct sockaddr*)&addr, sizeof(addr)) < 0) {
LOC_LOGe("cannot send to socket:%s. reason:%s",
addr.sun_path, strerror(errno));
result = false;
} else {
size_t sentBytes = 0;
while(sentBytes < length) {
size_t partLen = length - sentBytes;
if (partLen > LOC_MSG_BUF_LEN) {
partLen = LOC_MSG_BUF_LEN;
}
ssize_t rv = ::sendto(fd, data + sentBytes, partLen, 0,
(struct sockaddr*)&addr, sizeof(addr));
if (rv < 0) {
LOC_LOGe("cannot send to socket:%s. reason:%s",
addr.sun_path, strerror(errno));
result = false;
break;
}
sentBytes += rv;
}
}
void LocIpc::stopNonBlockingListening() {
if (mRunnable) {
mRunnable->abort();
mRunnable = nullptr;
}
return result;
}
void LocIpc::stopBlockingListening(LocIpcRecver& ipcRecver) {
if (ipcRecver.isRecvable()) {
ipcRecver.abort();
}
}
bool LocIpc::send(LocIpcSender& sender, const uint8_t data[], uint32_t length, int32_t msgId) {
return sender.sendData(data, length, msgId);
}
shared_ptr<LocIpcSender> LocIpc::getLocIpcLocalSender(const char* localSockName) {
return make_shared<LocIpcLocalSender>(localSockName);
}
unique_ptr<LocIpcRecver> LocIpc::getLocIpcLocalRecver(const shared_ptr<ILocIpcListener>& listener,
const char* localSockName) {
return make_unique<LocIpcLocalRecver>(listener, localSockName);
}
static void* sLibQrtrHandle = nullptr;
static const char* sLibQrtrName = "libloc_socket.so";
shared_ptr<LocIpcSender> LocIpc::getLocIpcQrtrSender(int service, int instance) {
typedef shared_ptr<LocIpcSender> (*creator_t) (int, int);
static creator_t creator = (creator_t)dlGetSymFromLib(sLibQrtrHandle, sLibQrtrName,
"_ZN8loc_util22createLocIpcQrtrSenderEii");
return (nullptr == creator) ? nullptr : creator(service, instance);
}
unique_ptr<LocIpcRecver> LocIpc::getLocIpcQrtrRecver(const shared_ptr<ILocIpcListener>& listener,
int service, int instance) {
typedef unique_ptr<LocIpcRecver> (*creator_t)(const shared_ptr<ILocIpcListener>&, int, int);
static creator_t creator = (creator_t)dlGetSymFromLib(sLibQrtrHandle, sLibQrtrName,
"_ZN8loc_util22createLocIpcQrtrRecverERKNSt3__110shared_ptrINS_15ILocIpcListenerEEEii");
return (nullptr == creator) ? nullptr : creator(listener, service, instance);
}
shared_ptr<LocIpcSender> LocIpc::getLocIpcQsockSender(int service, int instance) {
typedef shared_ptr<LocIpcSender> (*creator_t) (int, int);
static creator_t creator = (creator_t)dlGetSymFromLib(sLibQrtrHandle, sLibQrtrName,
"_ZN8loc_util23createLocIpcQsockSenderEii");
return (nullptr == creator) ? nullptr : creator(service, instance);
}
unique_ptr<LocIpcRecver> LocIpc::getLocIpcQsockRecver(const shared_ptr<ILocIpcListener>& listener,
int service, int instance) {
typedef unique_ptr<LocIpcRecver> (*creator_t)(const shared_ptr<ILocIpcListener>&, int, int);
static creator_t creator = (creator_t)dlGetSymFromLib(sLibQrtrHandle, sLibQrtrName,
"_ZN8loc_util23createLocIpcQsockRecverERKSt10shared_ptrINS_15ILocIpcListenerEEii");
return (nullptr == creator) ? nullptr : creator(listener, service, instance);
}
shared_ptr<LocIpcSender> LocIpc::getLocIpcInetTcpSender(const char* serverName, int32_t port) {
return make_shared<LocIpcInetTcpSender>(serverName, port);
}
unique_ptr<LocIpcRecver> LocIpc::getLocIpcInetTcpRecver(const shared_ptr<ILocIpcListener>& listener,
const char* serverName, int32_t port) {
return make_unique<LocIpcInetTcpRecver>(listener, serverName, port);
}
pair<shared_ptr<LocIpcSender>, unique_ptr<LocIpcRecver>>
LocIpc::getLocIpcQmiLocServiceSenderRecverPair(const shared_ptr<ILocIpcListener>& listener, int instance) {
typedef pair<shared_ptr<LocIpcSender>, unique_ptr<LocIpcRecver>> (*creator_t)(const shared_ptr<ILocIpcListener>&, int);
static void* sLibEmuHandle = nullptr;
static creator_t creator = (creator_t)dlGetSymFromLib(sLibEmuHandle, "libloc_emu.so",
"_ZN13QmiLocService41createLocIpcQmiLocServiceSenderRecverPairERKNSt3__110shared_ptrIN8loc_util15ILocIpcListenerEEEi");
return (nullptr == creator) ?
make_pair<shared_ptr<LocIpcSender>, unique_ptr<LocIpcRecver>>(nullptr, nullptr) :
creator(listener, instance);
}
}

View file

@ -37,35 +37,71 @@
#include <sys/un.h>
#include <LocThread.h>
using std::string;
using namespace std;
namespace loc_util {
class LocIpcRecver;
class LocIpcSender;
class LocIpcRunnable;
class ILocIpcListener {
protected:
inline virtual ~ILocIpcListener() {}
public:
// LocIpc client can overwrite this function to get notification
// when the socket for LocIpc is ready to receive messages.
inline virtual void onListenerReady() {}
virtual void onReceive(const char* data, uint32_t length)= 0;
};
class LocIpc {
friend LocIpcSender;
public:
inline LocIpc() : mIpcFd(-1) {}
inline virtual ~LocIpc() { stopListening(); }
inline LocIpc() : mRunnable(nullptr) {}
inline virtual ~LocIpc() {
stopNonBlockingListening();
}
static shared_ptr<LocIpcSender>
getLocIpcLocalSender(const char* localSockName);
static shared_ptr<LocIpcSender>
getLocIpcInetTcpSender(const char* serverName, int32_t port);
static shared_ptr<LocIpcSender>
getLocIpcQrtrSender(int service, int instance);
static shared_ptr<LocIpcSender>
getLocIpcQsockSender(int service, int instance);
static unique_ptr<LocIpcRecver>
getLocIpcLocalRecver(const shared_ptr<ILocIpcListener>& listener,
const char* localSockName);
static unique_ptr<LocIpcRecver>
getLocIpcInetTcpRecver(const shared_ptr<ILocIpcListener>& listener,
const char* serverName, int32_t port);
static unique_ptr<LocIpcRecver>
getLocIpcQrtrRecver(const shared_ptr<ILocIpcListener>& listener,
int service, int instance);
static unique_ptr<LocIpcRecver>
getLocIpcQsockRecver(const shared_ptr<ILocIpcListener>& listener,
int service, int instance);
static pair<shared_ptr<LocIpcSender>, unique_ptr<LocIpcRecver>>
getLocIpcQmiLocServiceSenderRecverPair(const shared_ptr<ILocIpcListener>& listener,
int instance);
// Listen for new messages in current thread. Calling this funciton will
// block current thread. The listening can be stopped by calling stopListening().
//
// Argument name is the path of the unix local socket to be listened.
// The function will return true on success, and false on failure.
bool startListeningBlocking(const std::string& name);
// block current thread.
// The listening can be stopped by calling stopBlockingListening() passing
// in the same ipcRecver obj handle.
static bool startBlockingListening(LocIpcRecver& ipcRecver);
static void stopBlockingListening(LocIpcRecver& ipcRecver);
// Create a new LocThread and listen for new messages in it.
// Calling this function will return immediately and won't block current thread.
// The listening can be stopped by calling stopListening().
//
// Argument name is the path of the unix local socket to be be listened.
// The function will return true on success, and false on failure.
bool startListeningNonBlocking(const std::string& name);
// Stop listening to new messages.
void stopListening();
// The listening can be stopped by calling stopNonBlockingListening().
bool startNonBlockingListening(unique_ptr<LocIpcRecver>& ipcRecver);
void stopNonBlockingListening();
// Send out a message.
// Call this function to send a message in argument data to socket in argument name.
@ -73,78 +109,70 @@ public:
// Argument name contains the name of the target unix socket. data contains the
// message to be sent out. Convert your message to a string before calling this function.
// The function will return true on success, and false on failure.
static bool send(const char name[], const std::string& data);
static bool send(const char name[], const uint8_t data[], uint32_t length);
protected:
// Callback function for receiving incoming messages.
// Override this function in your derived class to process incoming messages.
// For each received message, this callback function will be called once.
// This callback function will be called in the calling thread of startListeningBlocking
// or in the new LocThread created by startListeningNonBlocking.
//
// Argument data contains the received message. You need to parse it.
inline virtual void onReceive(const std::string& /*data*/) {}
// LocIpc client can overwrite this function to get notification
// when the socket for LocIpc is ready to receive messages.
inline virtual void onListenerReady() {}
static bool send(LocIpcSender& sender, const uint8_t data[],
uint32_t length, int32_t msgId = -1);
private:
static bool sendData(int fd, const sockaddr_un& addr,
const uint8_t data[], uint32_t length);
int mIpcFd;
LocThread mThread;
string mIpcName;
LocIpcRunnable *mRunnable;
};
/* this is only when client needs to implement Sender / Recver that are not already provided by
the factor methods prvoided by LocIpc. */
class LocIpcSender {
protected:
LocIpcSender() = default;
virtual ~LocIpcSender() = default;
virtual bool isOperable() const = 0;
virtual ssize_t send(const uint8_t data[], uint32_t length, int32_t msgId) const = 0;
public:
// Constructor of LocIpcSender class
//
// Argument destSocket contains the full path name of destination socket.
// This class hides generated fd and destination address object from user.
inline LocIpcSender(const char* destSocket):
LocIpcSender(std::make_shared<int>(::socket(AF_UNIX, SOCK_DGRAM, 0)), destSocket) {
if (mSocket != nullptr && -1 == *mSocket) {
mSocket = nullptr;
}
inline bool isSendable() const { return isOperable(); }
inline bool sendData(const uint8_t data[], uint32_t length, int32_t msgId) const {
return isSendable() && (send(data, length, msgId) > 0);
}
};
// Replicate a new LocIpcSender object with new destination socket.
inline LocIpcSender* replicate(const char* destSocket) {
return (nullptr == mSocket) ? nullptr : new LocIpcSender(mSocket, destSocket);
}
class LocIpcRecver {
LocIpcSender& mIpcSender;
protected:
const shared_ptr<ILocIpcListener> mDataCb;
inline LocIpcRecver(const shared_ptr<ILocIpcListener>& listener, LocIpcSender& sender) :
mIpcSender(sender), mDataCb(listener) {}
LocIpcRecver(LocIpcRecver const& recver) = delete;
LocIpcRecver& operator=(LocIpcRecver const& recver) = delete;
virtual ssize_t recv() const = 0;
public:
virtual ~LocIpcRecver() = default;
inline bool recvData() const { return isRecvable() && (recv() > 0); }
inline bool isRecvable() const { return mDataCb != nullptr && mIpcSender.isSendable(); }
virtual void onListenerReady() { if (mDataCb != nullptr) mDataCb->onListenerReady(); }
virtual void abort() const = 0;
virtual const char* getName() const = 0;
};
inline ~LocIpcSender() {
if (nullptr != mSocket && mSocket.unique()) {
::close(*mSocket);
}
}
// Send out a message.
// Call this function to send a message
//
// Argument data and length contains the message to be sent out.
// Return true when succeeded
inline bool send(const uint8_t data[], uint32_t length) {
bool rtv = false;
if (nullptr != mSocket && nullptr != data) {
rtv = LocIpc::sendData(*mSocket, mDestAddr, data, length);
}
return rtv;
}
private:
std::shared_ptr<int> mSocket;
struct sockaddr_un mDestAddr;
inline LocIpcSender(const std::shared_ptr<int>& mySocket, const char* destSocket) :
mSocket(mySocket),
mDestAddr({.sun_family = AF_UNIX, {}}) {
if ((nullptr != mSocket) && (-1 != *mSocket) && (nullptr != destSocket)) {
snprintf(mDestAddr.sun_path, sizeof(mDestAddr.sun_path), "%s", destSocket);
class Sock {
static const char MSG_ABORT[];
static const char LOC_IPC_HEAD[];
const uint32_t mMaxTxSize;
ssize_t sendto(const void *buf, size_t len, int flags, const struct sockaddr *destAddr,
socklen_t addrlen) const;
ssize_t recvfrom(const shared_ptr<ILocIpcListener>& dataCb, int sid, int flags,
struct sockaddr *srcAddr, socklen_t *addrlen) const;
public:
int mSid;
inline Sock(int sid, const uint32_t maxTxSize = 8192) : mMaxTxSize(maxTxSize), mSid(sid) {}
inline ~Sock() { close(); }
inline bool isValid() const { return -1 != mSid; }
ssize_t send(const void *buf, size_t len, int flags, const struct sockaddr *destAddr,
socklen_t addrlen) const;
ssize_t recv(const shared_ptr<ILocIpcListener>& dataCb, int flags, struct sockaddr *srcAddr,
socklen_t *addrlen, int sid = -1) const;
ssize_t sendAbort(int flags, const struct sockaddr *destAddr, socklen_t addrlen);
inline void close() {
if (isValid()) {
::close(mSid);
mSid = -1;
}
}
};

View file

@ -8,7 +8,7 @@ AM_CFLAGS = -Wundef \
-fno-short-enums \
-fpic \
-I./ \
-std=c++11 \
-std=c++14 \
$(LOCPLA_CFLAGS)
libgps_utils_la_h_sources = \

View file

@ -2163,11 +2163,10 @@ typedef void (*LocAgpsCloseResultCb)(bool isSuccess, AGpsExtType agpsType, void*
#define SOCKET_DIR_EHUB "/dev/socket/location/ehub/"
#define SOCKET_TO_LOCATION_HAL_DAEMON "/dev/socket/loc_client/hal_daemon"
#define SOCKET_DIR_TO_CLIENT "/dev/socket/loc_client/"
#define SOCKET_TO_LOCATION_CLIENT_BASE "/dev/socket/loc_client/toclient"
#define DIR_FOR_EXT_AP_LOC_CLIENT "/data/vendor/location/extap_locclient/"
#define FILE_FOR_EXT_AP_LOC_CLIENT_BASE "/data/vendor/location/extap_locclient/client"
#define SOCKET_LOC_CLIENT_DIR "/dev/socket/loc_client/"
#define EAP_LOC_CLIENT_DIR "/data/vendor/location/extap_locclient/"
#define LOC_CLIENT_NAME_PREFIX "toclient"
typedef uint64_t NetworkHandle;
#define NETWORK_HANDLE_UNKNOWN ~0