code.delx.au - pulseaudio/commitdiff
echo-cancel: Plug in WebRTC drift compensation
author Arun Raghavan <arun.raghavan@collabora.co.uk>
Fri, 7 Oct 2011 10:58:11 +0000 (16:28 +0530)
committer Arun Raghavan <arun.raghavan@collabora.co.uk>
Tue, 1 Nov 2011 12:50:32 +0000 (18:20 +0530)
This adds the ability for echo cancellers to provide their own drift
compensation, and hooks in the appropriate bits to implement this in the
WebRTC canceller.

We do this by introducing an alternative model for the canceller. So
far, the core engine just provided a run() method which was given
blocksize-sized chunks of playback and record samples. The new model has
the engine provide play() and record() methods that can (in theory) be
called by the playback and capture threads respectively. Of the two,
record() is the one that actually does the processing required.

In addition to this a set_drift() method may be provided by the
implementation. PA will provide periodic samples of the drift to the
engine. These values need to be aggregated and processed over some time,
since the point values vary quite a bit (but generally fit a linear
regression reasonably accurately). At some point in time, we might move
the actual drift calculation into PA and change the semantics of this
function.

NOTE: This needs further testing before being deemed ready for wider use.

src/modules/echo-cancel/echo-cancel.h
src/modules/echo-cancel/module-echo-cancel.c
src/modules/echo-cancel/webrtc.cc

index 19e13505a43bd41a2a15c20648f758b31c5ca874..799631b0bb42fe50f728555e04d0bb983a732995 100644 (file)
@@ -60,11 +60,16 @@ struct pa_echo_canceller_params {
 #endif
         /* each canceller-specific structure goes here */
     } priv;
+
+    /* Set this if canceller can do drift compensation. Also see set_drift()
+     * below */
+    pa_bool_t drift_compensation;
 };
 
 typedef struct pa_echo_canceller pa_echo_canceller;
 
 struct pa_echo_canceller {
+    /* Initialise canceller engine. */
     pa_bool_t   (*init)                 (pa_core *c,
                                          pa_echo_canceller *ec,
                                          pa_sample_spec *source_ss,
@@ -73,9 +78,36 @@ struct pa_echo_canceller {
                                          pa_channel_map *sink_map,
                                          uint32_t *blocksize,
                                          const char *args);
+
+    /* You should have only one of play()+record() or run() set. The first
+     * works under the assumption that you'll handle buffering and matching up
+     * samples yourself. If you set run(), module-echo-cancel will handle
+     * synchronising the playback and record streams. */
+
+    /* Feed the engine 'blocksize' playback bytes. */
+    void        (*play)                 (pa_echo_canceller *ec, const uint8_t *play);
+    /* Feed the engine 'blocksize' record bytes. blocksize processed bytes are
+     * returned in out. */
+    void        (*record)               (pa_echo_canceller *ec, const uint8_t *rec, uint8_t *out);
+    /* Feed the engine blocksize playback and record streams, with a reasonable
+     * effort at keeping the two in sync. blocksize processed bytes are
+     * returned in out. */
     void        (*run)                  (pa_echo_canceller *ec, const uint8_t *rec, const uint8_t *play, uint8_t *out);
+
+    /* Optional callback to set the drift, expressed as the ratio of the
+     * difference in number of playback and capture samples to the number of
+     * capture samples, for some instant of time. This is used only if the
+     * canceller signals that it supports drift compensation, and is called
+     * before record(). The actual implementation needs to derive drift based
+     * on point samples -- the individual values are not accurate enough to use
+     * as-is. */
+    /* NOTE: the semantics of this function might change in the future. */
+    void        (*set_drift)            (pa_echo_canceller *ec, float drift);
+
+    /* Free up resources. */
     void        (*done)                 (pa_echo_canceller *ec);
 
+    /* Structure with common and engine-specific canceller parameters. */
     pa_echo_canceller_params params;
 };
 
@@ -102,6 +134,9 @@ pa_bool_t pa_webrtc_ec_init(pa_core *c, pa_echo_canceller *ec,
                             pa_sample_spec *source_ss, pa_channel_map *source_map,
                             pa_sample_spec *sink_ss, pa_channel_map *sink_map,
                             uint32_t *blocksize, const char *args);
+void pa_webrtc_ec_play(pa_echo_canceller *ec, const uint8_t *play);
+void pa_webrtc_ec_record(pa_echo_canceller *ec, const uint8_t *rec, uint8_t *out);
+void pa_webrtc_ec_set_drift(pa_echo_canceller *ec, float drift);
 void pa_webrtc_ec_run(pa_echo_canceller *ec, const uint8_t *rec, const uint8_t *play, uint8_t *out);
 void pa_webrtc_ec_done(pa_echo_canceller *ec);
 PA_C_DECL_END
index 7360b270d7609f51181c6d0092b77dfe1f7844fe..05d3bd4f6ee31a181f4fb19ec6eb58b4306eb27f 100644 (file)
@@ -31,6 +31,7 @@
 #endif
 
 #include <stdio.h>
+#include <math.h>
 
 #include "echo-cancel.h"
 
@@ -107,6 +108,9 @@ static const pa_echo_canceller ec_table[] = {
     {
         /* WebRTC's audio processing engine */
         .init                   = pa_webrtc_ec_init,
+        .play                   = pa_webrtc_ec_play,
+        .record                 = pa_webrtc_ec_record,
+        .set_drift              = pa_webrtc_ec_set_drift,
         .run                    = pa_webrtc_ec_run,
         .done                   = pa_webrtc_ec_done,
     },
@@ -200,6 +204,10 @@ struct userdata {
     int64_t recv_counter;
     size_t sink_skip;
 
+    /* Bytes left over from previous iteration */
+    size_t sink_rem;
+    size_t source_rem;
+
     pa_atomic_t request_resync;
 
     pa_time_event *time_event;
@@ -650,11 +658,157 @@ static void do_resync(struct userdata *u) {
     apply_diff_time(u, diff_time);
 }
 
+/* Process the queued audio for a canceller that does its own drift
+ * compensation:
+ *
+ * 1. Estimate the drift at this point and pass it to the canceller
+ * 2. Push out playback samples in blocksize chunks via play()
+ * 3. Push out capture samples in blocksize chunks via record(), posting each
+ *    processed block to the virtual source
+ */
+static void do_push_drift_comp(struct userdata *u) {
+    size_t rlen, plen;
+    pa_memchunk rchunk, pchunk, cchunk;
+    uint8_t *rdata, *pdata, *cdata;
+    float pd, rd;
+
+    rlen = pa_memblockq_get_length(u->source_memblockq);
+    plen = pa_memblockq_get_length(u->sink_memblockq);
+
+    /* Estimate snapshot drift as follows:
+     *   pd: amount of playback data queued since last time
+     *   rd: amount of capture data queued since last time
+     *
+     *   drift = (pd - rd) / rd;
+     *
+     * We calculate pd and rd as the memblockq length less the number of
+     * bytes left over from the last iteration (to avoid double counting
+     * those remainder bytes).
+     *
+     * The lengths are unsigned, so convert to float before subtracting: if
+     * a queue currently holds less than the remainder recorded last time, an
+     * unsigned subtraction would wrap around to a huge positive value.
+     */
+    pd = (float) plen - (float) u->sink_rem;
+    rd = (float) rlen - (float) u->source_rem;
+
+    u->sink_rem = plen % u->blocksize;
+    u->source_rem = rlen % u->blocksize;
+
+    /* Now let the canceller work its drift compensation magic. Without new
+     * capture data the ratio is undefined (division by zero), so no sample is
+     * reported. In that case rlen is no larger than the previous remainder,
+     * which is itself smaller than blocksize, so the record() loop below does
+     * not run either and set_drift() is still called before any record(). */
+    if (rd > 0.0f)
+        u->ec->set_drift(u->ec, (pd - rd) / rd);
+
+    /* Send in the playback samples first */
+    while (plen >= u->blocksize) {
+        pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
+        pdata = pa_memblock_acquire(pchunk.memblock);
+        pdata += pchunk.index;
+
+        u->ec->play(u->ec, pdata);
+
+        pa_memblock_release(pchunk.memblock);
+        pa_memblockq_drop(u->sink_memblockq, u->blocksize);
+        pa_memblock_unref(pchunk.memblock);
+
+        plen -= u->blocksize;
+    }
+
+    /* And now the capture samples */
+    /* NOTE(review): unlike do_push(), this path does not write the save_aec
+     * debug dumps -- confirm whether that is intentional. */
+    while (rlen >= u->blocksize) {
+        pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
+
+        rdata = pa_memblock_acquire(rchunk.memblock);
+        rdata += rchunk.index;
+
+        /* fresh block to receive the processed capture data */
+        cchunk.index = 0;
+        cchunk.length = u->blocksize;
+        cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
+        cdata = pa_memblock_acquire(cchunk.memblock);
+
+        u->ec->record(u->ec, rdata, cdata);
+
+        pa_memblock_release(cchunk.memblock);
+        pa_memblock_release(rchunk.memblock);
+
+        pa_memblock_unref(rchunk.memblock);
+
+        /* forward the processed data to the virtual source */
+        pa_source_post(u->source, &cchunk);
+        pa_memblock_unref(cchunk.memblock);
+
+        pa_memblockq_drop(u->source_memblockq, u->blocksize);
+        rlen -= u->blocksize;
+    }
+}
+
+/* Processing path for cancellers without their own drift compensation: walk
+ * the capture queue one blocksize chunk at a time and, whenever more than one
+ * block of playback data is queued, run the canceller over the pair. Capture
+ * blocks for which no playback block was taken are forwarded as they are. */
+static void do_push(struct userdata *u) {
+    size_t rec_avail, play_avail;
+    pa_memchunk rec_chunk, play_chunk, out_chunk;
+    uint8_t *rec_ptr, *play_ptr, *out_ptr;
+    int unused;
+
+    rec_avail = pa_memblockq_get_length(u->source_memblockq);
+    play_avail = pa_memblockq_get_length(u->sink_memblockq);
+
+    for (; rec_avail >= u->blocksize; rec_avail -= u->blocksize) {
+        /* next fixed-size block of recorded samples */
+        pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rec_chunk);
+
+        if (play_avail > u->blocksize) {
+            /* next fixed-size block of played samples */
+            pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &play_chunk);
+
+            rec_ptr = (uint8_t *) pa_memblock_acquire(rec_chunk.memblock) + rec_chunk.index;
+            play_ptr = (uint8_t *) pa_memblock_acquire(play_chunk.memblock) + play_chunk.index;
+
+            /* destination block for the canceller output */
+            out_chunk.index = 0;
+            out_chunk.length = u->blocksize;
+            out_chunk.memblock = pa_memblock_new(u->source->core->mempool, out_chunk.length);
+            out_ptr = pa_memblock_acquire(out_chunk.memblock);
+
+            /* optionally dump the canceller inputs */
+            if (u->save_aec && u->captured_file)
+                unused = fwrite(rec_ptr, 1, u->blocksize, u->captured_file);
+            if (u->save_aec && u->played_file)
+                unused = fwrite(play_ptr, 1, u->blocksize, u->played_file);
+
+            /* the actual echo cancellation */
+            u->ec->run(u->ec, rec_ptr, play_ptr, out_ptr);
+
+            /* optionally dump the canceller output */
+            if (u->save_aec && u->canceled_file)
+                unused = fwrite(out_ptr, 1, u->blocksize, u->canceled_file);
+
+            pa_memblock_release(out_chunk.memblock);
+            pa_memblock_release(play_chunk.memblock);
+            pa_memblock_release(rec_chunk.memblock);
+
+            /* the playback block has been consumed */
+            pa_memblockq_drop(u->sink_memblockq, u->blocksize);
+            pa_memblock_unref(play_chunk.memblock);
+            play_avail -= u->blocksize;
+
+            /* from here on the filtered block stands in for the recorded
+             * one */
+            pa_memblock_unref(rec_chunk.memblock);
+            rec_chunk = out_chunk;
+        }
+
+        /* hand the (echo-canceled) block to the virtual source */
+        pa_source_post(u->source, &rec_chunk);
+        pa_memblock_unref(rec_chunk.memblock);
+
+        pa_memblockq_drop(u->source_memblockq, u->blocksize);
+    }
+}
+
 /* Called from input thread context */
 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
     struct userdata *u;
     size_t rlen, plen, to_skip;
-    pa_memchunk rchunk, pchunk;
+    pa_memchunk rchunk;
 
     pa_source_output_assert_ref(o);
     pa_source_output_assert_io_context(o);
@@ -727,68 +881,11 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
         u->sink_skip -= to_skip;
     }
 
-    while (rlen >= u->blocksize) {
-        /* take fixed block from recorded samples */
-        pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
-
-        if (plen > u->blocksize) {
-            uint8_t *rdata, *pdata, *cdata;
-            pa_memchunk cchunk;
-            int unused;
-
-            if (plen > u->blocksize) {
-                /* take fixed block from played samples */
-                pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
-
-                rdata = pa_memblock_acquire(rchunk.memblock);
-                rdata += rchunk.index;
-                pdata = pa_memblock_acquire(pchunk.memblock);
-                pdata += pchunk.index;
-
-                cchunk.index = 0;
-                cchunk.length = u->blocksize;
-                cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
-                cdata = pa_memblock_acquire(cchunk.memblock);
-
-                if (u->save_aec) {
-                    if (u->captured_file)
-                        unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
-                    if (u->played_file)
-                        unused = fwrite(pdata, 1, u->blocksize, u->played_file);
-                }
-
-                /* perform echo cancellation */
-                u->ec->run(u->ec, rdata, pdata, cdata);
-
-                if (u->save_aec) {
-                    if (u->canceled_file)
-                        unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
-                }
-
-                pa_memblock_release(cchunk.memblock);
-                pa_memblock_release(pchunk.memblock);
-                pa_memblock_release(rchunk.memblock);
-
-                /* drop consumed sink samples */
-                pa_memblockq_drop(u->sink_memblockq, u->blocksize);
-                pa_memblock_unref(pchunk.memblock);
-
-                pa_memblock_unref(rchunk.memblock);
-                /* the filtered samples now become the samples from our
-                 * source */
-                rchunk = cchunk;
-
-                plen -= u->blocksize;
-            }
-        }
-
-        /* forward the (echo-canceled) data to the virtual source */
-        pa_source_post(u->source, &rchunk);
-        pa_memblock_unref(rchunk.memblock);
-
-        pa_memblockq_drop(u->source_memblockq, u->blocksize);
-        rlen -= u->blocksize;
-    }
+    /* process and push out samples */
+    if (u->ec->params.drift_compensation)
+        do_push_drift_comp(u);
+    else
+        do_push(u);
 }
 
 /* Called from I/O thread context */
@@ -1380,6 +1477,9 @@ static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *sourc
     }
 
     u->ec->init = ec_table[ec_method].init;
+    u->ec->play = ec_table[ec_method].play;
+    u->ec->record = ec_table[ec_method].record;
+    u->ec->set_drift = ec_table[ec_method].set_drift;
     u->ec->run = ec_table[ec_method].run;
     u->ec->done = ec_table[ec_method].done;
 
@@ -1499,6 +1599,9 @@ int pa__init(pa_module*m) {
         }
     }
 
+    if (u->ec->params.drift_compensation)
+        pa_assert(u->ec->set_drift);
+
     /* Create source */
     pa_source_new_data_init(&source_data);
     source_data.driver = __FILE__;
@@ -1688,8 +1791,14 @@ int pa__init(pa_module*m) {
         goto fail;
     }
 
-    if (u->adjust_time > 0)
+    if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
+    else if (u->ec->params.drift_compensation) {
+        pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
+        u->adjust_time = 0;
+        /* Perform resync just once to give the canceller a leg up */
+        pa_atomic_store(&u->request_resync, 1);
+    }
 
     if (u->save_aec) {
         pa_log("Creating AEC files in /tmp");
index c53e96303b96b66a72a63c6896513b68dc591430..f84555b490402a67160fb2aa0726390e33c5e604 100644 (file)
@@ -47,6 +47,7 @@ PA_C_DECL_END
 #define DEFAULT_MOBILE FALSE
 #define DEFAULT_ROUTING_MODE "speakerphone"
 #define DEFAULT_COMFORT_NOISE TRUE
+#define DEFAULT_DRIFT_COMPENSATION FALSE
 
 static const char* const valid_modargs[] = {
     "high_pass_filter",
@@ -56,6 +57,7 @@ static const char* const valid_modargs[] = {
     "mobile",
     "routing_mode",
     "comfort_noise",
+    "drift_compensation",
     NULL
 };
 
@@ -125,7 +127,18 @@ pa_bool_t pa_webrtc_ec_init(pa_core *c, pa_echo_canceller *ec,
         goto fail;
     }
 
+    ec->params.drift_compensation = DEFAULT_DRIFT_COMPENSATION;
+    if (pa_modargs_get_value_boolean(ma, "drift_compensation", &ec->params.drift_compensation) < 0) {
+        pa_log("Failed to parse drift_compensation value");
+        goto fail;
+    }
+
     if (mobile) {
+        if (ec->params.drift_compensation) {
+            pa_log("Can't use drift_compensation in mobile mode");
+            goto fail;
+        }
+
         if ((rm = routing_mode_from_string(pa_modargs_get_value(ma, "routing_mode", DEFAULT_ROUTING_MODE))) < 0) {
             pa_log("Failed to parse routing_mode value");
             goto fail;
@@ -160,7 +173,13 @@ pa_bool_t pa_webrtc_ec_init(pa_core *c, pa_echo_canceller *ec,
         apm->high_pass_filter()->Enable(true);
 
     if (!mobile) {
-        apm->echo_cancellation()->enable_drift_compensation(false);
+        if (ec->params.drift_compensation) {
+            apm->echo_cancellation()->set_device_sample_rate_hz(source_ss->rate);
+            apm->echo_cancellation()->enable_drift_compensation(true);
+        } else {
+            apm->echo_cancellation()->enable_drift_compensation(false);
+        }
+
         apm->echo_cancellation()->Enable(true);
     } else {
         apm->echo_control_mobile()->set_routing_mode(static_cast<webrtc::EchoControlMobile::RoutingMode>(rm));
@@ -204,9 +223,9 @@ fail:
     return FALSE;
 }
 
-void pa_webrtc_ec_run(pa_echo_canceller *ec, const uint8_t *rec, const uint8_t *play, uint8_t *out) {
+void pa_webrtc_ec_play(pa_echo_canceller *ec, const uint8_t *play) {
     webrtc::AudioProcessing *apm = (webrtc::AudioProcessing*)ec->params.priv.webrtc.apm;
-    webrtc::AudioFrame play_frame, out_frame;
+    webrtc::AudioFrame play_frame;
     const pa_sample_spec *ss = &ec->params.priv.webrtc.sample_spec;
 
     play_frame._audioChannel = ss->channels;
@@ -214,18 +233,37 @@ void pa_webrtc_ec_run(pa_echo_canceller *ec, const uint8_t *rec, const uint8_t *
     play_frame._payloadDataLengthInSamples = ec->params.priv.webrtc.blocksize / pa_frame_size(ss);
     memcpy(play_frame._payloadData, play, ec->params.priv.webrtc.blocksize);
 
+    apm->AnalyzeReverseStream(&play_frame);
+}
+
+void pa_webrtc_ec_record(pa_echo_canceller *ec, const uint8_t *rec, uint8_t *out) {
+    webrtc::AudioProcessing *apm = (webrtc::AudioProcessing*)ec->params.priv.webrtc.apm;
+    webrtc::AudioFrame out_frame;
+    const pa_sample_spec *ss = &ec->params.priv.webrtc.sample_spec;
+
     out_frame._audioChannel = ss->channels;
     out_frame._frequencyInHz = ss->rate;
     out_frame._payloadDataLengthInSamples = ec->params.priv.webrtc.blocksize / pa_frame_size(ss);
     memcpy(out_frame._payloadData, rec, ec->params.priv.webrtc.blocksize);
 
-    apm->AnalyzeReverseStream(&play_frame);
     apm->set_stream_delay_ms(0);
     apm->ProcessStream(&out_frame);
 
     memcpy(out, out_frame._payloadData, ec->params.priv.webrtc.blocksize);
 }
 
+/* Pass a drift sample on to the WebRTC echo canceller. The ratio given in
+ * 'drift' is scaled by the number of frames in one block (blocksize divided
+ * by the frame size) before being handed to set_stream_drift_samples(). */
+void pa_webrtc_ec_set_drift(pa_echo_canceller *ec, float drift) {
+    const pa_sample_spec *spec = &ec->params.priv.webrtc.sample_spec;
+    webrtc::AudioProcessing *engine = (webrtc::AudioProcessing*)ec->params.priv.webrtc.apm;
+    const float scaled_drift = drift * ec->params.priv.webrtc.blocksize / pa_frame_size(spec);
+
+    engine->echo_cancellation()->set_stream_drift_samples(scaled_drift);
+}
+
+/* Combined entry point: hands the 'play' block to pa_webrtc_ec_play() and
+ * then the 'rec' block to pa_webrtc_ec_record(), which writes the processed
+ * capture data to 'out'. */
+void pa_webrtc_ec_run(pa_echo_canceller *ec, const uint8_t *rec, const uint8_t *play, uint8_t *out) {
+    pa_webrtc_ec_play(ec, play);
+    pa_webrtc_ec_record(ec, rec, out);
+}
+
 void pa_webrtc_ec_done(pa_echo_canceller *ec) {
     if (ec->params.priv.webrtc.apm) {
         webrtc::AudioProcessing::Destroy((webrtc::AudioProcessing*)ec->params.priv.webrtc.apm);