#include <stdlib.h>
#include <sys/stat.h>
#include <stdio.h>
-#include <assert.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <pulse/timeval.h>
#include <pulse/xmalloc.h>
+#include <pulsecore/macro.h>
#include <pulsecore/iochannel.h>
#include <pulsecore/sink.h>
#include <pulsecore/module.h>
pa_core *core;
pa_module *module;
pa_sink *sink;
- pa_time_event *time_event;
+ pa_thread *thread;
size_t block_size;
-
- uint64_t n_bytes;
- struct timeval start_time;
+ struct timeval timestamp;
};
static const char* const valid_modargs[] = {
NULL
};
-static void time_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
+static void thread_func(void *userdata) {
struct userdata *u = userdata;
- pa_memchunk chunk;
- struct timeval ntv = *tv;
- size_t l;
-
- assert(u);
-
- if (pa_sink_render(u->sink, u->block_size, &chunk) >= 0) {
- l = chunk.length;
- pa_memblock_unref(chunk.memblock);
- } else
- l = u->block_size;
-
- pa_timeval_add(&ntv, pa_bytes_to_usec(l, &u->sink->sample_spec));
- m->time_restart(e, &ntv);
-
- u->n_bytes += l;
-}
-
-static pa_usec_t get_latency(pa_sink *s) {
- struct userdata *u = s->userdata;
- pa_usec_t a, b;
- struct timeval now;
+ int quit = 0;
+ struct pollfd pollfd;
+ int running = 1;
+
+ pa_assert(u);
+
+ pa_log_debug("Thread starting up");
+
+ memset(&pollfd, 0, sizeof(pollfd));
+ pollfd.fd = pa_asyncmsgq_get_fd(u->sink->asyncmsgq, PA_ASYNCQ_POP);
+ pollfd.events = POLLIN;
+
+ pa_gettimeofday(u->timestamp);
+
+ for (;;) {
+ int code;
+ void *data, *object;
+ int r, timeout;
+ struct timeval now;
+
+ /* Check whether there is a message for us to process */
+ if (pa_asyncmsgq_get(u->sink->asyncmsgq, &object, &code, &data) == 0) {
+
+
+ /* Now process these messages our own way */
+ if (!object) {
+
+ switch (code) {
+ case PA_MESSAGE_SHUTDOWN:
+ goto finish;
+
+ default:
+ pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
+
+ }
+
+ } else if (object == u->sink) {
+
+ switch (code) {
+ case PA_SINK_MESSAGE_STOP:
+ pa_assert(running);
+ running = 0;
+ break;
+
+ case PA_SINK_MESSAGE_START:
+ pa_assert(!running);
+ running = 1;
+
+ pa_gettimeofday(u->timestamp);
+ break;
+
+ case PA_SINK_MESSAGE_GET_LATENCY:
+
+ if (pa_timeval_cmp(&u->timestamp, &now) > 0)
+ *((pa_usec_t*) data) = 0;
+ else
+ *((pa_usec_t*) data) = pa_timeval_diff(&u->timestamp, &now);
+ break;
+
+ /* ... */
+
+ default:
+ pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
+ }
+ }
+
+ pa_asyncmsgq_done(u->sink->asyncmsgq);
+ continue;
+ }
+
+ /* Render some data and drop it immediately */
+
+ if (running) {
+ pa_gettimeofday(&now);
+
+ if (pa_timeval_cmp(u->timestamp, &now) <= 0) {
+ pa_memchunk chunk;
+ size_t l;
+
+ if (pa_sink_render(u->sink, u->block_size, &chunk) >= 0) {
+ l = chunk.length;
+ pa_memblock_unref(chunk.memblock);
+ } else
+ l = u->block_size;
+
+ pa_timeval_add(&u->timestamp, pa_bytes_to_usec(l, &u->sink->sample_spec));
+ continue;
+ }
+
+ timeout = pa_timeval_diff(&u->timestamp, &now)/1000;
+
+ if (timeout < 1)
+ timeout = 1;
+ } else
+ timeout = -1;
+
+ /* Hmm, nothing to do. Let's sleep */
+
+ if (pa_asyncmsgq_before_poll(u->sink->asyncmsgq) < 0)
+ continue;
+
+ r = poll(&pollfd, 1, timeout);
+ pa_asyncmsgq_after_poll(u->sink->asyncmsgq);
+
+ if (r < 0) {
+ if (errno == EINTR)
+ continue;
+
+ pa_log("poll() failed: %s", pa_cstrerror(errno));
+ goto fail;
+ }
+
+ pa_assert(r == 0 || pollfd.revents == POLLIN);
+ }
- a = pa_timeval_diff(pa_gettimeofday(&now), &u->start_time);
- b = pa_bytes_to_usec(u->n_bytes, &s->sample_spec);
+fail:
+ /* We have to continue processing messages until we receive the
+ * SHUTDOWN message */
+ pa_asyncmsgq_post(u->core->asyncmsgq, u->core, PA_CORE_MESSAGE_UNLOAD_MODULE, pa_module_ref(u->module), NULL, pa_module_unref);
+ pa_asyncmsgq_wait_for(PA_MESSAGE_SHUTDOWN);
- return b > a ? b - a : 0;
+finish:
+ pa_log_debug("Thread shutting down");
}
int pa__init(pa_core *c, pa_module*m) {
pa_channel_map map;
pa_modargs *ma = NULL;
- assert(c);
- assert(m);
+ pa_assert(c);
+ pa_assert(m);
if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
- pa_log("failed to parse module arguments.");
+ pa_log("Failed to parse module arguments.");
goto fail;
}
ss = c->default_sample_spec;
if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
- pa_log("invalid sample format specification or channel map.");
+ pa_log("Invalid sample format specification or channel map");
goto fail;
}
m->userdata = u;
if (!(u->sink = pa_sink_new(c, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) {
- pa_log("failed to create sink.");
+ pa_log("Failed to create sink.");
goto fail;
}
- u->sink->get_latency = get_latency;
u->sink->userdata = u;
pa_sink_set_owner(u->sink, m);
pa_sink_set_description(u->sink, pa_modargs_get_value(ma, "description", "NULL sink"));
- u->n_bytes = 0;
- pa_gettimeofday(&u->start_time);
-
- u->time_event = c->mainloop->time_new(c->mainloop, &u->start_time, time_callback, u);
-
- u->block_size = pa_bytes_per_second(&ss) / 10;
+ u->block_size = pa_bytes_per_second(&ss) / 20; /* 50 ms */
+
+ if (u->block_size <= 0)
+ u->block_size = pa_frame_size(&ss);
+ if (!(u->thread = pa_thread_new(thread_func, u))) {
+ pa_log("Failed to create thread.");
+ goto fail;
+ }
+
pa_modargs_free(ma);
return 0;
void pa__done(pa_core *c, pa_module*m) {
struct userdata *u;
- assert(c && m);
+
+ pa_assert(c);
+ pa_assert(m);
if (!(u = m->userdata))
return;
pa_sink_disconnect(u->sink);
- pa_sink_unref(u->sink);
- u->core->mainloop->time_free(u->time_event);
+ if (u->thread) {
+ pa_asyncmsgq_send(u->sink->asyncmsgq, PA_SINK_MESSAGE_SHUTDOWN, NULL);
+ pa_thread_free(u->thread);
+ }
+
+ pa_sink_unref(u->sink);
pa_xfree(u);
}