Merge "Location Utils - LocIpc class enhancement"

This commit is contained in:
Linux Build Service Account 2017-12-21 13:58:24 -08:00 committed by Gerrit - the friendly Code Review server
commit d75c010c59
2 changed files with 103 additions and 23 deletions

View file

@ -49,7 +49,7 @@ namespace loc_util {
#endif #endif
#define LOG_TAG "LocSvc_LocIpc" #define LOG_TAG "LocSvc_LocIpc"
#define LOC_MSG_BUF_LEN 1024 #define LOC_MSG_BUF_LEN 8192
#define LOC_MSG_HEAD "$MSGLEN$" #define LOC_MSG_HEAD "$MSGLEN$"
class LocIpcRunnable : public LocRunnable { class LocIpcRunnable : public LocRunnable {
@ -101,28 +101,32 @@ bool LocIpc::startListeningBlocking(const std::string& name) {
mIpcFd = fd; mIpcFd = fd;
ssize_t nBytes = 0; ssize_t nBytes = 0;
std::vector<char> buf(LOC_MSG_BUF_LEN); std::string msg = "";
while ((nBytes = ::recvfrom(mIpcFd, buf.data(), buf.size(), 0, NULL, NULL)) >= 0) { while (1) {
if (nBytes == 0) { msg.resize(LOC_MSG_BUF_LEN);
nBytes = ::recvfrom(mIpcFd, (void*)(msg.data()), msg.size(), 0, NULL, NULL);
if (nBytes < 0) {
break;
} else if (nBytes == 0) {
continue; continue;
} }
std::string msg; if (strncmp(msg.data(), LOC_MSG_HEAD, sizeof(LOC_MSG_HEAD) - 1)) {
if (strncmp(buf.data(), LOC_MSG_HEAD, sizeof(LOC_MSG_HEAD) - 1)) {
// short message // short message
msg.append(buf.data(), nBytes); msg.resize(nBytes);
onReceive(msg); onReceive(msg);
} else { } else {
// long message // long message
size_t msgLen = 0; size_t msgLen = 0;
sscanf(buf.data(), LOC_MSG_HEAD"%zu", &msgLen); sscanf(msg.data(), LOC_MSG_HEAD"%zu", &msgLen);
while (msg.length() < msgLen && msg.resize(msgLen);
(nBytes = recvfrom(mIpcFd, buf.data(), buf.size(), 0, NULL, NULL)) >= 0) { size_t msgLenReceived = 0;
msg.append(buf.data(), nBytes); while ((msgLenReceived < msgLen) && (nBytes > 0)) {
nBytes = recvfrom(mIpcFd, (void*)&(msg[msgLenReceived]),
msg.size() - msgLenReceived, 0, NULL, NULL);
msgLenReceived += nBytes;
} }
if (nBytes > 0) {
if (nBytes >= 0) {
onReceive(msg); onReceive(msg);
} else { } else {
break; break;
@ -154,6 +158,12 @@ void LocIpc::stopListening() {
} }
bool LocIpc::send(const char name[], const std::string& data) { bool LocIpc::send(const char name[], const std::string& data) {
return send(name, (const uint8_t*)data.c_str(), data.length());
}
bool LocIpc::send(const char name[], const uint8_t data[], uint32_t length) {
bool result = true;
int fd = ::socket(AF_UNIX, SOCK_DGRAM, 0); int fd = ::socket(AF_UNIX, SOCK_DGRAM, 0);
if (fd < 0) { if (fd < 0) {
LOC_LOGe("create socket error. reason:%s", strerror(errno)); LOC_LOGe("create socket error. reason:%s", strerror(errno));
@ -163,28 +173,38 @@ bool LocIpc::send(const char name[], const std::string& data) {
struct sockaddr_un addr = { .sun_family = AF_UNIX }; struct sockaddr_un addr = { .sun_family = AF_UNIX };
snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", name); snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", name);
result = sendData(fd, addr, data, length);
(void)::close(fd);
return result;
}
bool LocIpc::sendData(int fd, const sockaddr_un &addr, const uint8_t data[], uint32_t length) {
bool result = true; bool result = true;
if (data.length() <= LOC_MSG_BUF_LEN) {
if (::sendto(fd, data.c_str(), data.length(), 0, if (length <= LOC_MSG_BUF_LEN) {
if (::sendto(fd, data, length, 0,
(struct sockaddr*)&addr, sizeof(addr)) < 0) { (struct sockaddr*)&addr, sizeof(addr)) < 0) {
LOC_LOGe("cannot send to socket. reason:%s", strerror(errno)); LOC_LOGe("cannot send to socket. reason:%s", strerror(errno));
result = false; result = false;
} }
} else { } else {
std::string head = LOC_MSG_HEAD; std::string head = LOC_MSG_HEAD;
head.append(std::to_string(data.length())); head.append(std::to_string(length));
if (::sendto(fd, head.c_str(), head.length(), 0, if (::sendto(fd, head.c_str(), head.length(), 0,
(struct sockaddr*)&addr, sizeof(addr)) < 0) { (struct sockaddr*)&addr, sizeof(addr)) < 0) {
LOC_LOGe("cannot send to socket. reason:%s", strerror(errno)); LOC_LOGe("cannot send to socket. reason:%s", strerror(errno));
result = false; result = false;
} else { } else {
size_t sentBytes = 0; size_t sentBytes = 0;
while(sentBytes < data.length()) { while(sentBytes < length) {
size_t partLen = data.length() - sentBytes; size_t partLen = length - sentBytes;
if (partLen > LOC_MSG_BUF_LEN) { if (partLen > LOC_MSG_BUF_LEN) {
partLen = LOC_MSG_BUF_LEN; partLen = LOC_MSG_BUF_LEN;
} }
ssize_t rv = ::sendto(fd, data.c_str() + sentBytes, partLen, 0, ssize_t rv = ::sendto(fd, data + sentBytes, partLen, 0,
(struct sockaddr*)&addr, sizeof(addr)); (struct sockaddr*)&addr, sizeof(addr));
if (rv < 0) { if (rv < 0) {
LOC_LOGe("cannot send to socket. reason:%s", strerror(errno)); LOC_LOGe("cannot send to socket. reason:%s", strerror(errno));
@ -195,8 +215,6 @@ bool LocIpc::send(const char name[], const std::string& data) {
} }
} }
} }
(void)::close(fd);
return result; return result;
} }

View file

@ -32,11 +32,18 @@
#include <string> #include <string>
#include <memory> #include <memory>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <LocThread.h> #include <LocThread.h>
namespace loc_util { namespace loc_util {
class LocIpcSender;
class LocIpc { class LocIpc {
friend LocIpcSender;
public: public:
inline LocIpc() : mIpcFd(-1), mStopRequested(false), mRunnable(nullptr) {} inline LocIpc() : mIpcFd(-1), mStopRequested(false), mRunnable(nullptr) {}
inline virtual ~LocIpc() { stopListening(); } inline virtual ~LocIpc() { stopListening(); }
@ -75,14 +82,69 @@ public:
// message to be sent out. Convert your message to a string before calling this function. // message to be sent out. Convert your message to a string before calling this function.
// The function will return true on success, and false on failure. // The function will return true on success, and false on failure.
static bool send(const char name[], const std::string& data); static bool send(const char name[], const std::string& data);
static bool send(const char name[], const uint8_t data[], uint32_t length);
private: private:
static bool sendData(int fd, const sockaddr_un& addr,
const uint8_t data[], uint32_t length);
int mIpcFd; int mIpcFd;
bool mStopRequested; bool mStopRequested;
LocThread mThread; LocThread mThread;
std::unique_ptr<LocRunnable> mRunnable; std::unique_ptr<LocRunnable> mRunnable;
}; };
class LocIpcSender {
public:
// Constructor of LocIpcSender class
//
// Argument destSocket contains the full path name of destination socket.
// This class hides generated fd and destination address object from user.
inline LocIpcSender(const char* destSocket):
LocIpcSender(std::make_shared<int>(::socket(AF_UNIX, SOCK_DGRAM, 0)), destSocket) {
if (-1 == *mSocket) {
mSocket = nullptr;
}
}
// Replicate a new LocIpcSender object with new destination socket.
inline LocIpcSender* replicate(const char* destSocket) {
return (nullptr == mSocket) ? nullptr : new LocIpcSender(mSocket, destSocket);
}
inline ~LocIpcSender() {
if (nullptr != mSocket && mSocket.unique()) {
::close(*mSocket);
}
}
// Send out a message.
// Call this function to send a message
//
// Argument data and length contains the message to be sent out.
// Return true when succeeded
inline bool send(const uint8_t data[], uint32_t length) {
bool rtv = false;
if (nullptr != mSocket && nullptr != data) {
ssize_t rv = LocIpc::sendData(*mSocket, mDestAddr, data, length);
rtv = (rv == (int)length);
}
return rtv;
}
private:
std::shared_ptr<int> mSocket;
struct sockaddr_un mDestAddr;
inline LocIpcSender(
const std::shared_ptr<int>& mySocket, const char* destSocket) : mSocket(mySocket) {
if ((nullptr != mSocket) && (-1 != *mSocket) && (nullptr != destSocket)) {
mDestAddr.sun_family = AF_UNIX;
snprintf(mDestAddr.sun_path, sizeof(mDestAddr.sun_path), "%s", destSocket);
}
}
};
} }
#endif //__LOC_SOCKET__ #endif //__LOC_SOCKET__