Feature: remove mutex of pthread

pull/221/head
xufuji456 2 years ago
parent 5d52f71258
commit 67fc16ef0c
  1. 82
      Live/src/main/cpp/safe_queue.h

@ -3,11 +3,7 @@
#define SAFE_QUEUE_H #define SAFE_QUEUE_H
#include <queue> #include <queue>
#include <pthread.h>
#ifdef C11
#include <thread> #include <thread>
#endif
using namespace std; using namespace std;
@ -19,48 +15,25 @@ class SafeQueue {
public: public:
SafeQueue() { SafeQueue() {
#ifdef C11
#else
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
#endif
} }
~SafeQueue() { ~SafeQueue() {
#ifdef C11
#else
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
#endif
} }
void push(T new_value) { void push(T new_value) {
#ifdef C11
lock_guard<mutex> lk(mt); lock_guard<mutex> lk(mt);
if (work) { if (work) {
q.push(new_value); q.push(new_value);
cv.notify_one(); cv.notify_one();
} }
#else
pthread_mutex_lock(&mutex);
if (work) {
q.push(new_value);
pthread_cond_signal(&cond);
} else {
releaseCallback(new_value);
}
pthread_mutex_unlock(&mutex);
#endif
} }
int pop(T &value) { int pop(T &value) {
int ret = 0; int ret = 0;
#ifdef C11
unique_lock<mutex> lk(mt); unique_lock<mutex> lk(mt);
if (!work) { if (!work) {
return ret; return ret;
@ -70,33 +43,12 @@ public:
q.pop(); q.pop();
ret = 1; ret = 1;
} }
#else
pthread_mutex_lock(&mutex);
while (work && q.empty()) {
pthread_cond_wait(&cond, &mutex);
}
if (!q.empty()) {
value = q.front();
q.pop();
ret = 1;
}
pthread_mutex_unlock(&mutex);
#endif
return ret; return ret;
} }
void setWork(int work) { void setWork(int run) {
#ifdef C11
lock_guard<mutex> lk(mt); lock_guard<mutex> lk(mt);
this->work = work; this->work = run;
#else
pthread_mutex_lock(&mutex);
this->work = work;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
#endif
} }
int empty() { int empty() {
@ -108,7 +60,6 @@ public:
} }
void clear() { void clear() {
#ifdef C11
lock_guard<mutex> lk(mt); lock_guard<mutex> lk(mt);
int size = q.size(); int size = q.size();
for (int i = 0; i < size; ++i) { for (int i = 0; i < size; ++i) {
@ -116,48 +67,21 @@ public:
releaseCallback(value); releaseCallback(value);
q.pop(); q.pop();
} }
#else
pthread_mutex_lock(&mutex);
int size = static_cast<int>(q.size());
for (int i = 0; i < size; ++i) {
T value = q.front();
releaseCallback(value);
q.pop();
}
pthread_mutex_unlock(&mutex);
#endif
} }
void sync() { void sync() {
#ifdef C11
lock_guard<mutex> lk(mt); lock_guard<mutex> lk(mt);
syncHandle(q); syncHandle(q);
#else
pthread_mutex_lock(&mutex);
syncHandle(q);
pthread_mutex_unlock(&mutex);
#endif
} }
void setReleaseCallback(ReleaseCallback r) { void setReleaseCallback(ReleaseCallback r) {
releaseCallback = r; releaseCallback = r;
} }
void setSyncHandle(SyncHandle s) {
syncHandle = s;
}
private: private:
#ifdef C11
mutex mt; mutex mt;
condition_variable cv; condition_variable cv;
#else
pthread_cond_t cond;
pthread_mutex_t mutex;
#endif
queue<T> q; queue<T> q;
int work; int work;

Loading…
Cancel
Save