]> code.delx.au - pulseaudio/blobdiff - src/modules/module-null-sink.c
Merge HUGE set of changes temporarily into a branch, to allow me to move them from...
[pulseaudio] / src / modules / module-null-sink.c
index 54a8e89023cb7ff80cc1cb1e9046f7645160363d..8cf961b9d240081c86970d287a140c27ecb1422c 100644 (file)
@@ -28,7 +28,6 @@
 #include <stdlib.h>
 #include <sys/stat.h>
 #include <stdio.h>
-#include <assert.h>
 #include <errno.h>
 #include <string.h>
 #include <fcntl.h>
@@ -38,6 +37,7 @@
 #include <pulse/timeval.h>
 #include <pulse/xmalloc.h>
 
+#include <pulsecore/macro.h>
 #include <pulsecore/iochannel.h>
 #include <pulsecore/sink.h>
 #include <pulsecore/module.h>
@@ -64,11 +64,9 @@ struct userdata {
     pa_core *core;
     pa_module *module;
     pa_sink *sink;
-    pa_time_event *time_event;
+    pa_thread *thread;
     size_t block_size;
-
-    uint64_t n_bytes;
-    struct timeval start_time;
+    struct timeval timestamp;
 };
 
 static const char* const valid_modargs[] = {
@@ -81,35 +79,131 @@ static const char* const valid_modargs[] = {
     NULL
 };
 
-static void time_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
+static void thread_func(void *userdata) {
     struct userdata *u = userdata;
-    pa_memchunk chunk;
-    struct timeval ntv = *tv;
-    size_t l;
-
-    assert(u);
-
-    if (pa_sink_render(u->sink, u->block_size, &chunk) >= 0) {
-        l = chunk.length;
-        pa_memblock_unref(chunk.memblock);
-    } else
-        l = u->block_size;
-
-    pa_timeval_add(&ntv, pa_bytes_to_usec(l, &u->sink->sample_spec));
-    m->time_restart(e, &ntv);
-
-    u->n_bytes += l;
-}
-
-static pa_usec_t get_latency(pa_sink *s) {
-    struct userdata *u = s->userdata;
-    pa_usec_t a, b;
-    struct timeval now;
+    int quit = 0;
+    struct pollfd pollfd;
+    int running = 1;
+
+    pa_assert(u);
+
+    pa_log_debug("Thread starting up");
+
+    memset(&pollfd, 0, sizeof(pollfd));
+    pollfd.fd = pa_asyncmsgq_get_fd(u->sink->asyncmsgq, PA_ASYNCQ_POP);
+    pollfd.events = POLLIN;
+
+    pa_gettimeofday(u->timestamp);
+    
+    for (;;) {
+        int code;
+        void *data, *object;
+        int r, timeout;
+        struct timeval now;
+
+        /* Check whether there is a message for us to process */
+        if (pa_asyncmsgq_get(u->sink->asyncmsgq, &object, &code, &data) == 0) {
+
+
+            /* Now process these messages our own way */
+            if (!object) {
+
+                switch (code) {
+                    case PA_MESSAGE_SHUTDOWN:
+                        goto finish;
+
+                    default:
+                        pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
+
+                }
+                
+            } else if (object == u->sink) {
+
+                switch (code) {
+                    case PA_SINK_MESSAGE_STOP:
+                        pa_assert(running);
+                        running = 0;
+                        break;
+                        
+                    case PA_SINK_MESSAGE_START:
+                        pa_assert(!running);
+                        running = 1;
+                        
+                        pa_gettimeofday(u->timestamp);
+                        break;
+                        
+                    case PA_SINK_MESSAGE_GET_LATENCY:
+                        
+                        if (pa_timeval_cmp(&u->timestamp, &now) > 0)
+                            *((pa_usec_t*) data) = 0;
+                        else
+                            *((pa_usec_t*) data) = pa_timeval_diff(&u->timestamp, &now);
+                        break;
+                        
+                        /* ... */
+
+                    default:
+                        pa_sink_process_msg(u->sink->asyncmsgq, object, code, data);
+                }
+            }
+            
+            pa_asyncmsgq_done(u->sink->asyncmsgq);
+            continue;
+        }
+
+        /* Render some data and drop it immediately */
+
+        if (running) {
+            pa_gettimeofday(&now);
+            
+            if (pa_timeval_cmp(u->timestamp, &now) <= 0) {
+                pa_memchunk chunk;
+                size_t l;
+                
+                if (pa_sink_render(u->sink, u->block_size, &chunk) >= 0) {
+                    l = chunk.length;
+                    pa_memblock_unref(chunk.memblock);
+                } else
+                    l = u->block_size;
+                
+                pa_timeval_add(&u->timestamp, pa_bytes_to_usec(l, &u->sink->sample_spec));
+                continue;
+            }
+
+            timeout = pa_timeval_diff(&u->timestamp, &now)/1000;
+            
+            if (timeout < 1)
+                timeout = 1;
+        } else
+            timeout = -1;
+
+        /* Hmm, nothing to do. Let's sleep */
+        
+        if (pa_asyncmsgq_before_poll(u->sink->asyncmsgq) < 0)
+            continue;
+
+        r = poll(&pollfd, 1, timeout);
+        pa_asyncmsgq_after_poll(u->sink->asyncmsgq);
+
+        if (r < 0) {
+            if (errno == EINTR)
+                continue;
+
+            pa_log("poll() failed: %s", pa_cstrerror(errno));
+            goto fail;
+        }
+        
+        pa_assert(r == 0 || pollfd.revents == POLLIN);
+    }
 
-    a = pa_timeval_diff(pa_gettimeofday(&now), &u->start_time);
-    b = pa_bytes_to_usec(u->n_bytes, &s->sample_spec);
+fail:
+    /* We have to continue processing messages until we receive the
+     * SHUTDOWN message */
+    pa_asyncmsgq_post(u->core->asyncmsgq, u->core, PA_CORE_MESSAGE_UNLOAD_MODULE, pa_module_ref(u->module), NULL, pa_module_unref);
+    pa_asyncmsgq_wait_for(PA_MESSAGE_SHUTDOWN);
 
-    return b > a ? b - a : 0;
+finish:
+    pa_log_debug("Thread shutting down");
 }
 
 int pa__init(pa_core *c, pa_module*m) {
@@ -118,17 +212,17 @@ int pa__init(pa_core *c, pa_module*m) {
     pa_channel_map map;
     pa_modargs *ma = NULL;
 
-    assert(c);
-    assert(m);
+    pa_assert(c);
+    pa_assert(m);
 
     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
-        pa_log("failed to parse module arguments.");
+        pa_log("Failed to parse module arguments.");
         goto fail;
     }
 
     ss = c->default_sample_spec;
     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
-        pa_log("invalid sample format specification or channel map.");
+        pa_log("Invalid sample format specification or channel map");
         goto fail;
     }
 
@@ -138,22 +232,24 @@ int pa__init(pa_core *c, pa_module*m) {
     m->userdata = u;
 
     if (!(u->sink = pa_sink_new(c, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) {
-        pa_log("failed to create sink.");
+        pa_log("Failed to create sink.");
         goto fail;
     }
 
-    u->sink->get_latency = get_latency;
     u->sink->userdata = u;
     pa_sink_set_owner(u->sink, m);
     pa_sink_set_description(u->sink, pa_modargs_get_value(ma, "description", "NULL sink"));
 
-    u->n_bytes = 0;
-    pa_gettimeofday(&u->start_time);
-
-    u->time_event = c->mainloop->time_new(c->mainloop, &u->start_time, time_callback, u);
-
-    u->block_size = pa_bytes_per_second(&ss) / 10;
+    u->block_size = pa_bytes_per_second(&ss) / 20; /* 50 ms */
+    
+    if (u->block_size <= 0)
+        u->block_size = pa_frame_size(&ss);
 
+    if (!(u->thread = pa_thread_new(thread_func, u))) {
+        pa_log("Failed to create thread.");
+        goto fail;
+    }
+    
     pa_modargs_free(ma);
 
     return 0;
@@ -169,15 +265,21 @@ fail:
 
 void pa__done(pa_core *c, pa_module*m) {
     struct userdata *u;
-    assert(c && m);
+    
+    pa_assert(c);
+    pa_assert(m);
 
     if (!(u = m->userdata))
         return;
 
     pa_sink_disconnect(u->sink);
-    pa_sink_unref(u->sink);
 
-    u->core->mainloop->time_free(u->time_event);
+    if (u->thread) {
+        pa_asyncmsgq_send(u->sink->asyncmsgq, PA_SINK_MESSAGE_SHUTDOWN, NULL);
+        pa_thread_free(u->thread);
+    }
+    
+    pa_sink_unref(u->sink);
 
     pa_xfree(u);
 }