Feature: rename to PacketQueue

pull/221/head
xufuji456 2 years ago
parent 67fc16ef0c
commit 689cf27fe9
  1. 21
      Live/src/main/cpp/PacketQueue.h
  2. 26
      Live/src/main/cpp/RtmpPusher.cpp

@ -8,34 +8,25 @@
using namespace std; using namespace std;
template<typename T> template<typename T>
class SafeQueue { class PacketQueue {
typedef void (*ReleaseCallback)(T &); typedef void (*ReleaseCallback)(T &);
typedef void (*SyncHandle)(queue<T> &); typedef void (*SyncHandle)(queue<T> &);
public: public:
SafeQueue() {
}
~SafeQueue() {
}
void push(T new_value) { void push(T new_value) {
lock_guard<mutex> lk(mt); lock_guard<mutex> lk(mt);
if (work) { if (running) {
q.push(new_value); q.push(new_value);
cv.notify_one(); cv.notify_one();
} }
} }
int pop(T &value) { int pop(T &value) {
int ret = 0; int ret = 0;
unique_lock<mutex> lk(mt); unique_lock<mutex> lk(mt);
if (!work) { if (!running) {
return ret; return ret;
} }
if (!q.empty()) { if (!q.empty()) {
@ -46,9 +37,9 @@ public:
return ret; return ret;
} }
void setWork(int run) { void setRunning(bool run) {
lock_guard<mutex> lk(mt); lock_guard<mutex> lk(mt);
this->work = run; this->running = run;
} }
int empty() { int empty() {
@ -84,7 +75,7 @@ private:
condition_variable cv; condition_variable cv;
queue<T> q; queue<T> q;
int work; bool running;
ReleaseCallback releaseCallback; ReleaseCallback releaseCallback;
SyncHandle syncHandle; SyncHandle syncHandle;
}; };

@ -1,6 +1,6 @@
#include <jni.h> #include <jni.h>
#include <string> #include <string>
#include "safe_queue.h" #include "PacketQueue.h"
#include "PushInterface.h" #include "PushInterface.h"
#include "VideoStream.h" #include "VideoStream.h"
#include "AudioStream.h" #include "AudioStream.h"
@ -10,12 +10,12 @@
JNIEXPORT RETURN_TYPE JNICALL Java_com_frank_live_LivePusherNew_ ## FUNC_NAME \ JNIEXPORT RETURN_TYPE JNICALL Java_com_frank_live_LivePusherNew_ ## FUNC_NAME \
(JNIEnv *env, jobject instance, ##__VA_ARGS__)\ (JNIEnv *env, jobject instance, ##__VA_ARGS__)\
SafeQueue<RTMPPacket *> packets; PacketQueue<RTMPPacket *> packets;
VideoStream *videoStream = nullptr; VideoStream *videoStream = nullptr;
int isStart = 0; int isStart = 0;
pthread_t pid; pthread_t pid;
int readyPushing = 0; std::atomic<bool> isPushing;
uint32_t start_time; uint32_t start_time;
AudioStream *audioStream = nullptr; AudioStream *audioStream = nullptr;
@ -107,13 +107,13 @@ void *start(void *args) {
//start time //start time
start_time = RTMP_GetTime(); start_time = RTMP_GetTime();
//start pushing //start pushing
readyPushing = 1; isPushing = true;
packets.setWork(1); packets.setRunning(true);
callback(audioStream->getAudioTag()); callback(audioStream->getAudioTag());
RTMPPacket *packet = nullptr; RTMPPacket *packet = nullptr;
while (readyPushing) { while (isPushing) {
packets.pop(packet); packets.pop(packet);
if (!readyPushing) { if (!isPushing) {
break; break;
} }
if (!packet) { if (!packet) {
@ -132,8 +132,8 @@ void *start(void *args) {
releasePackets(packet); releasePackets(packet);
} while (0); } while (0);
isStart = 0; isStart = 0;
readyPushing = 0; isPushing = false;
packets.setWork(0); packets.setRunning(false);
packets.clear(); packets.clear();
if (rtmp) { if (rtmp) {
RTMP_Close(rtmp); RTMP_Close(rtmp);
@ -174,7 +174,7 @@ RTMP_PUSHER_FUNC(void, native_1start, jstring path_) {
} }
RTMP_PUSHER_FUNC(void, native_1pushVideo, jbyteArray yuv, jint camera_type) { RTMP_PUSHER_FUNC(void, native_1pushVideo, jbyteArray yuv, jint camera_type) {
if (!videoStream || !readyPushing) { if (!videoStream || !isPushing) {
return; return;
} }
jbyte *yuv_plane = env->GetByteArrayElements(yuv, JNI_FALSE); jbyte *yuv_plane = env->GetByteArrayElements(yuv, JNI_FALSE);
@ -196,7 +196,7 @@ RTMP_PUSHER_FUNC(jint, native_1getInputSamples) {
} }
RTMP_PUSHER_FUNC(void, native_1pushAudio, jbyteArray data_) { RTMP_PUSHER_FUNC(void, native_1pushAudio, jbyteArray data_) {
if (!audioStream || !readyPushing) { if (!audioStream || !isPushing) {
return; return;
} }
jbyte *data = env->GetByteArrayElements(data_, nullptr); jbyte *data = env->GetByteArrayElements(data_, nullptr);
@ -206,8 +206,8 @@ RTMP_PUSHER_FUNC(void, native_1pushAudio, jbyteArray data_) {
RTMP_PUSHER_FUNC(void, native_1stop) { RTMP_PUSHER_FUNC(void, native_1stop) {
LOGI("native stop..."); LOGI("native stop...");
readyPushing = 0; isPushing = false;
packets.setWork(0); packets.setRunning(false);
pthread_join(pid, nullptr); pthread_join(pid, nullptr);
} }

Loading…
Cancel
Save