]> code.delx.au - pulseaudio/blob - src/modules/echo-cancel/module-echo-cancel.c
9f3e85c96ed08360cb20f327e16eb56a18d09e51
[pulseaudio] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6 Based on module-virtual-sink.c
7 module-virtual-source.c
8 module-loopback.c
9
10 Copyright 2010 Intel Corporation
11 Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13 PulseAudio is free software; you can redistribute it and/or modify
14 it under the terms of the GNU Lesser General Public License as published
15 by the Free Software Foundation; either version 2.1 of the License,
16 or (at your option) any later version.
17
18 PulseAudio is distributed in the hope that it will be useful, but
19 WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 General Public License for more details.
22
23 You should have received a copy of the GNU Lesser General Public License
24 along with PulseAudio; if not, write to the Free Software
25 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/timeval.h>
40 #include <pulse/rtclock.h>
41
42 #include <pulsecore/i18n.h>
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/namereg.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-rtclock.h>
49 #include <pulsecore/core-util.h>
50 #include <pulsecore/modargs.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/rtpoll.h>
53 #include <pulsecore/sample-util.h>
54 #include <pulsecore/ltdl-helper.h>
55
56 #include "module-echo-cancel-symdef.h"
57
/* Module metadata: author/description/version, may be loaded more than once
 * (one instance per filtered sink/source pair), and the user-visible list of
 * accepted module arguments (must stay in sync with valid_modargs below). */
PA_MODULE_AUTHOR("Wim Taymans");
PA_MODULE_DESCRIPTION("Echo Cancellation");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(FALSE);
PA_MODULE_USAGE(
        _("source_name=<name for the source> "
          "source_properties=<properties for the source> "
          "source_master=<name of source to filter> "
          "sink_name=<name for the sink> "
          "sink_properties=<properties for the sink> "
          "sink_master=<name of sink to filter> "
          "adjust_time=<how often to readjust rates in s> "
          "adjust_threshold=<how much drift to readjust after in ms> "
          "format=<sample format> "
          "rate=<sample rate> "
          "channels=<number of channels> "
          "channel_map=<channel map> "
          "aec_method=<implementation to use> "
          "aec_args=<parameters for the AEC engine> "
          "save_aec=<save AEC data in /tmp> "
          "autoloaded=<set if this module is being loaded automatically> "
          "use_volume_sharing=<yes or no> "
        ));
81
82 /* NOTE: Make sure the enum and ec_table are maintained in the correct order */
/* Enumerates the available echo-canceller implementations. The numeric
 * values index directly into ec_table, so the entries here and there must
 * stay in the same order (see the NOTE above). Speex and WebRTC entries
 * only exist when the corresponding library was found at build time;
 * Adrian's canceller is always compiled in. */
typedef enum {
    PA_ECHO_CANCELLER_INVALID = -1,  /* sentinel for "no/unknown method" */
#ifdef HAVE_SPEEX
    PA_ECHO_CANCELLER_SPEEX,
#endif
    PA_ECHO_CANCELLER_ADRIAN,
#ifdef HAVE_WEBRTC
    PA_ECHO_CANCELLER_WEBRTC,
#endif
} pa_echo_canceller_method_t;

/* Default AEC engine used when the aec_method= argument is absent:
 * prefer WebRTC when built in, otherwise fall back to Speex. */
#ifdef HAVE_WEBRTC
#define DEFAULT_ECHO_CANCELLER "webrtc"
#else
#define DEFAULT_ECHO_CANCELLER "speex"
#endif
99
/* Dispatch table of canceller implementations, indexed by
 * pa_echo_canceller_method_t. Every engine provides init/run/done; only the
 * WebRTC engine additionally provides the play/record/set_drift callbacks
 * used by the drift-compensation code path (see do_push_drift_comp). */
static const pa_echo_canceller ec_table[] = {
#ifdef HAVE_SPEEX
    {
        /* Speex */
        .init                   = pa_speex_ec_init,
        .run                    = pa_speex_ec_run,
        .done                   = pa_speex_ec_done,
    },
#endif
    {
        /* Adrian Andre's NLMS implementation */
        .init                   = pa_adrian_ec_init,
        .run                    = pa_adrian_ec_run,
        .done                   = pa_adrian_ec_done,
    },
#ifdef HAVE_WEBRTC
    {
        /* WebRTC's audio processing engine */
        .init                   = pa_webrtc_ec_init,
        .play                   = pa_webrtc_ec_play,
        .record                 = pa_webrtc_ec_record,
        .set_drift              = pa_webrtc_ec_set_drift,
        .run                    = pa_webrtc_ec_run,
        .done                   = pa_webrtc_ec_done,
    },
#endif
};
127
/* Default processing format and tuning knobs. */
#define DEFAULT_RATE 32000                                /* Hz, when rate= is not given */
#define DEFAULT_CHANNELS 1                                /* mono capture/playback by default */
#define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)      /* period of the drift-check timer */
#define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)     /* drift beyond this triggers a quick resync */
#define DEFAULT_SAVE_AEC FALSE                            /* dump raw/canceled streams to /tmp? */
#define DEFAULT_AUTOLOADED FALSE

/* Upper bound for the internal memblockqs (bytes). */
#define MEMBLOCKQ_MAXLENGTH (16*1024*1024)

/* Can only be used in main context */
#define IS_ACTIVE(u) ((pa_source_get_state((u)->source) == PA_SOURCE_RUNNING) && \
                      (pa_sink_get_state((u)->sink) == PA_SINK_RUNNING))
140
141 /* This module creates a new (virtual) source and sink.
142 *
143 * The data sent to the new sink is kept in a memblockq before being
144 * forwarded to the real sink_master.
145 *
146 * Data read from source_master is matched against the saved sink data and
147 * echo canceled data is then pushed onto the new source.
148 *
149 * Both source and sink masters have their own threads to push/pull data
150 * respectively. We however perform all our actions in the source IO thread.
151 * To do this we send all played samples to the source IO thread where they
152 * are then pushed into the memblockq.
153 *
154 * Alignment is performed in two steps:
155 *
156 * 1) when something happens that requires quick adjustment of the alignment of
157 * capture and playback samples, we perform a resync. This adjusts the
158 * position in the playback memblock to the requested sample. Quick
159 * adjustments include moving the playback samples before the capture
160 * samples (because else the echo canceler does not work) or when the
161 * playback pointer drifts too far away.
162 *
163 * 2) periodically check the difference between capture and playback. we use a
164 * low and high watermark for adjusting the alignment. playback should always
165 * be before capture and the difference should not be bigger than one frame
166 * size. We would ideally like to resample the sink_input but most driver
167 * don't give enough accuracy to be able to do that right now.
168 */
169
struct userdata;

/* Message object used to bounce volume updates from the canceller back to
 * the main thread (see ECHO_CANCELLER_MESSAGE_SET_VOLUME). */
struct pa_echo_canceller_msg {
    pa_msgobject parent;
    struct userdata *userdata;
};

PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
#define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))

/* A point-in-time picture of both pipeline halves, used by calc_diff() to
 * measure the drift between the playback and capture paths. The sink_* half
 * is filled in by the sink I/O thread, the source_* half by the source I/O
 * thread (or in-thread, for do_resync). */
struct snapshot {
    pa_usec_t sink_now;        /* rtclock timestamp when the sink half was taken */
    pa_usec_t sink_latency;    /* master sink latency at that moment */
    size_t sink_delay;         /* bytes queued inside our sink input */
    int64_t send_counter;      /* bytes sent from the sink thread so far */

    pa_usec_t source_now;      /* rtclock timestamp when the source half was taken */
    pa_usec_t source_latency;  /* master source latency at that moment */
    size_t source_delay;       /* bytes queued inside our source output */
    int64_t recv_counter;      /* bytes received in the source thread so far */
    size_t rlen;               /* capture memblockq length (plus pending sink skip) */
    size_t plen;               /* playback memblockq length (plus pending source skip) */
};
193
/* Per-module-instance state, shared (carefully) between the main thread and
 * the sink/source I/O threads. */
struct userdata {
    pa_core *core;
    pa_module *module;

    pa_bool_t autoloaded;           /* loaded automatically by a policy module? */
    pa_bool_t dead;                 /* set when tearing down after a master went away */
    pa_bool_t save_aec;             /* dump debug streams to the *_file handles below */

    pa_echo_canceller *ec;          /* the selected canceller implementation */
    uint32_t blocksize;             /* fixed chunk size (bytes) the canceller consumes */

    pa_bool_t need_realign;

    /* to wakeup the source I/O thread */
    pa_asyncmsgq *asyncmsgq;
    pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;

    pa_source *source;
    pa_bool_t source_auto_desc;     /* we generated the source description ourselves */
    pa_source_output *source_output;
    pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
    size_t source_skip;             /* bytes of capture data to bypass cancellation for */

    pa_sink *sink;
    pa_bool_t sink_auto_desc;       /* we generated the sink description ourselves */
    pa_sink_input *sink_input;
    pa_memblockq *sink_memblockq;   /* playback samples waiting to be fed to the canceller */
    int64_t send_counter;           /* updated in sink IO thread */
    int64_t recv_counter;           /* updated in source IO thread on POST/REWIND */
    size_t sink_skip;               /* bytes of playback data to drop for alignment */

    /* Bytes left over from previous iteration */
    size_t sink_rem;
    size_t source_rem;

    pa_atomic_t request_resync;     /* flag: perform a resync on the next push */

    pa_time_event *time_event;      /* periodic drift-check timer (main thread) */
    pa_usec_t adjust_time;          /* timer period; 0 disables rate adjustment */
    int adjust_threshold;           /* drift above this forces a quick resync */

    /* Debug dumps, only used when save_aec is set; may be NULL if fopen failed. */
    FILE *captured_file;
    FILE *played_file;
    FILE *canceled_file;
    FILE *drift_file;

    pa_bool_t use_volume_sharing;

    struct {
        pa_cvolume current_volume;  /* source volume as seen by the I/O thread */
    } thread_info;
};
246
static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);

/* Accepted module arguments; must match the PA_MODULE_USAGE string above. */
static const char* const valid_modargs[] = {
    "source_name",
    "source_properties",
    "source_master",
    "sink_name",
    "sink_properties",
    "sink_master",
    "adjust_time",
    "adjust_threshold",
    "format",
    "rate",
    "channels",
    "channel_map",
    "aec_method",
    "aec_args",
    "save_aec",
    "autoloaded",
    "use_volume_sharing",
    NULL
};

/* Private messages handled by the source output (in the source I/O thread). */
enum {
    SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,  /* playback chunk from the sink thread */
    SOURCE_OUTPUT_MESSAGE_REWIND,                               /* playback rewind notification */
    SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,                     /* fill the source half of a snapshot */
    SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME                       /* apply a measured drift (offset = diff) */
};

/* Private messages handled by the sink input (in the sink I/O thread). */
enum {
    SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT                         /* fill the sink half of a snapshot */
};

/* Messages handled by the pa_echo_canceller_msg object (main thread). */
enum {
    ECHO_CANCELLER_MESSAGE_SET_VOLUME,
};
284
285 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
286 int64_t buffer, diff_time, buffer_latency;
287
288 /* get the number of samples between capture and playback */
289 if (snapshot->plen > snapshot->rlen)
290 buffer = snapshot->plen - snapshot->rlen;
291 else
292 buffer = 0;
293
294 buffer += snapshot->source_delay + snapshot->sink_delay;
295
296 /* add the amount of samples not yet transferred to the source context */
297 if (snapshot->recv_counter <= snapshot->send_counter)
298 buffer += (int64_t) (snapshot->send_counter - snapshot->recv_counter);
299 else
300 buffer += PA_CLIP_SUB(buffer, (int64_t) (snapshot->recv_counter - snapshot->send_counter));
301
302 /* convert to time */
303 buffer_latency = pa_bytes_to_usec(buffer, &u->source_output->sample_spec);
304
305 /* capture and playback samples are perfectly aligned when diff_time is 0 */
306 diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
307 (snapshot->source_now - snapshot->source_latency);
308
309 pa_log_debug("diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
310 (long long) snapshot->sink_latency,
311 (long long) buffer_latency, (long long) snapshot->source_latency,
312 (long long) snapshot->source_delay, (long long) snapshot->sink_delay,
313 (long long) (snapshot->send_counter - snapshot->recv_counter),
314 (long long) (snapshot->sink_now - snapshot->source_now));
315
316 return diff_time;
317 }
318
/* Called from main context */
/* Periodic timer: measures the drift between capture and playback and, when
 * it is too large, requests a quick alignment in the source I/O thread. The
 * slow rate-matching path is currently disabled (see the commented-out
 * new_rate computations), so new_rate always ends up equal to base_rate. */
static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
    struct userdata *u = userdata;
    uint32_t old_rate, base_rate, new_rate;
    int64_t diff_time;
    /*size_t fs*/
    struct snapshot latency_snapshot;

    pa_assert(u);
    pa_assert(a);
    pa_assert(u->time_event == e);
    pa_assert_ctl_context();

    /* nothing to align unless both ends are actively running */
    if (!IS_ACTIVE(u))
        return;

    /* update our snapshots */
    /* Each send fills its own half of latency_snapshot: the first the
     * source_* fields (in the source I/O thread), the second the sink_*
     * fields (in the sink I/O thread). Both sends block until handled. */
    pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
    pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);

    /* calculate drift between capture and playback */
    diff_time = calc_diff(u, &latency_snapshot);

    /*fs = pa_frame_size(&u->source_output->sample_spec);*/
    old_rate = u->sink_input->sample_spec.rate;
    base_rate = u->source_output->sample_spec.rate;

    if (diff_time < 0) {
        /* recording before playback, we need to adjust quickly. The echo
         * canceler does not work in this case. */
        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
            NULL, diff_time, NULL, NULL);
        /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
        new_rate = base_rate;
    }
    else {
        if (diff_time > u->adjust_threshold) {
            /* diff too big, quickly adjust */
            pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
                NULL, diff_time, NULL, NULL);
        }

        /* recording behind playback, we need to slowly adjust the rate to match */
        /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/

        /* assume equal samplerates for now */
        new_rate = base_rate;
    }

    /* make sure we don't make too big adjustments because that sounds horrible */
    if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
        new_rate = base_rate;

    if (new_rate != old_rate) {
        pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);

        pa_sink_input_set_rate(u->sink_input, new_rate);
    }

    /* re-arm ourselves for the next period */
    pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
}
380
/* Called from source I/O thread context */
/* Message handler for our virtual source. Reports the end-to-end capture
 * latency (master source + our own buffering) and tracks volume changes. */
static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SOURCE(o)->userdata;

    switch (code) {

        case PA_SOURCE_MESSAGE_GET_LATENCY:

            /* The source is _put() before the source output is, so let's
             * make sure we don't access it in that time. Also, the
             * source output is first shut down, the source second. */
            if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
                !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
                *((pa_usec_t*) data) = 0;
                return 0;
            }

            *((pa_usec_t*) data) =

                /* Get the latency of the master source */
                pa_source_get_latency_within_thread(u->source_output->source) +
                /* Add the latency internal to our source output on top */
                pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
                /* and the buffering we do on the source */
                pa_bytes_to_usec(u->blocksize, &u->source_output->source->sample_spec);

            return 0;

        case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
            /* remember the new reference volume for the I/O thread, then fall
             * through to the default handler below */
            u->thread_info.current_volume = u->source->reference_volume;
            break;
    }

    /* everything else is handled by the generic source implementation */
    return pa_source_process_msg(o, code, data, offset, chunk);
}
416
/* Called from sink I/O thread context */
/* Message handler for our virtual sink: only GET_LATENCY is special-cased;
 * it reports the master sink latency plus our sink input's internal queue. */
static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SINK(o)->userdata;

    switch (code) {

        case PA_SINK_MESSAGE_GET_LATENCY:

            /* The sink is _put() before the sink input is, so let's
             * make sure we don't access it in that time. Also, the
             * sink input is first shut down, the sink second. */
            if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
                !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
                *((pa_usec_t*) data) = 0;
                return 0;
            }

            *((pa_usec_t*) data) =

                /* Get the latency of the master sink */
                pa_sink_get_latency_within_thread(u->sink_input->sink) +

                /* Add the latency internal to our sink input on top */
                pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);

            return 0;
    }

    /* everything else is handled by the generic sink implementation */
    return pa_sink_process_msg(o, code, data, offset, chunk);
}
447
448
449 /* Called from main context */
450 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
451 struct userdata *u;
452
453 pa_source_assert_ref(s);
454 pa_assert_se(u = s->userdata);
455
456 if (!PA_SOURCE_IS_LINKED(state) ||
457 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
458 return 0;
459
460 if (state == PA_SOURCE_RUNNING) {
461 /* restart timer when both sink and source are active */
462 if (IS_ACTIVE(u) && u->adjust_time)
463 pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
464
465 pa_atomic_store(&u->request_resync, 1);
466 pa_source_output_cork(u->source_output, FALSE);
467 } else if (state == PA_SOURCE_SUSPENDED) {
468 pa_source_output_cork(u->source_output, TRUE);
469 }
470
471 return 0;
472 }
473
474 /* Called from main context */
475 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
476 struct userdata *u;
477
478 pa_sink_assert_ref(s);
479 pa_assert_se(u = s->userdata);
480
481 if (!PA_SINK_IS_LINKED(state) ||
482 !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
483 return 0;
484
485 if (state == PA_SINK_RUNNING) {
486 /* restart timer when both sink and source are active */
487 if (IS_ACTIVE(u) && u->adjust_time)
488 pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
489
490 pa_atomic_store(&u->request_resync, 1);
491 pa_sink_input_cork(u->sink_input, FALSE);
492 } else if (state == PA_SINK_SUSPENDED) {
493 pa_sink_input_cork(u->sink_input, TRUE);
494 }
495
496 return 0;
497 }
498
499 /* Called from I/O thread context */
500 static void source_update_requested_latency_cb(pa_source *s) {
501 struct userdata *u;
502
503 pa_source_assert_ref(s);
504 pa_assert_se(u = s->userdata);
505
506 if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
507 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
508 return;
509
510 pa_log_debug("Source update requested latency");
511
512 /* Just hand this one over to the master source */
513 pa_source_output_set_requested_latency_within_thread(
514 u->source_output,
515 pa_source_get_requested_latency_within_thread(s));
516 }
517
518 /* Called from I/O thread context */
519 static void sink_update_requested_latency_cb(pa_sink *s) {
520 struct userdata *u;
521
522 pa_sink_assert_ref(s);
523 pa_assert_se(u = s->userdata);
524
525 if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
526 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
527 return;
528
529 pa_log_debug("Sink update requested latency");
530
531 /* Just hand this one over to the master sink */
532 pa_sink_input_set_requested_latency_within_thread(
533 u->sink_input,
534 pa_sink_get_requested_latency_within_thread(s));
535 }
536
537 /* Called from I/O thread context */
538 static void sink_request_rewind_cb(pa_sink *s) {
539 struct userdata *u;
540
541 pa_sink_assert_ref(s);
542 pa_assert_se(u = s->userdata);
543
544 if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
545 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
546 return;
547
548 pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
549
550 /* Just hand this one over to the master sink */
551 pa_sink_input_request_rewind(u->sink_input,
552 s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
553 }
554
555 /* Called from main context */
556 static void source_set_volume_cb(pa_source *s) {
557 struct userdata *u;
558
559 pa_source_assert_ref(s);
560 pa_assert_se(u = s->userdata);
561
562 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
563 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
564 return;
565
566 pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, TRUE);
567 }
568
569 /* Called from main context */
570 static void sink_set_volume_cb(pa_sink *s) {
571 struct userdata *u;
572
573 pa_sink_assert_ref(s);
574 pa_assert_se(u = s->userdata);
575
576 if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
577 !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
578 return;
579
580 pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, TRUE);
581 }
582
583 static void source_get_volume_cb(pa_source *s) {
584 struct userdata *u;
585 pa_cvolume v;
586
587 pa_source_assert_ref(s);
588 pa_assert_se(u = s->userdata);
589
590 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
591 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
592 return;
593
594 pa_source_output_get_volume(u->source_output, &v, TRUE);
595
596 if (pa_cvolume_equal(&s->real_volume, &v))
597 /* no change */
598 return;
599
600 s->real_volume = v;
601 pa_source_set_soft_volume(s, NULL);
602 }
603
604 /* Called from main context */
605 static void source_set_mute_cb(pa_source *s) {
606 struct userdata *u;
607
608 pa_source_assert_ref(s);
609 pa_assert_se(u = s->userdata);
610
611 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
612 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
613 return;
614
615 pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
616 }
617
618 /* Called from main context */
619 static void sink_set_mute_cb(pa_sink *s) {
620 struct userdata *u;
621
622 pa_sink_assert_ref(s);
623 pa_assert_se(u = s->userdata);
624
625 if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
626 !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
627 return;
628
629 pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
630 }
631
632 /* Called from main context */
633 static void source_get_mute_cb(pa_source *s) {
634 struct userdata *u;
635
636 pa_source_assert_ref(s);
637 pa_assert_se(u = s->userdata);
638
639 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
640 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
641 return;
642
643 pa_source_output_get_mute(u->source_output);
644 }
645
646 /* must be called from the input thread context */
647 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
648 int64_t diff;
649
650 if (diff_time < 0) {
651 diff = pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec);
652
653 if (diff > 0) {
654 /* add some extra safety samples to compensate for jitter in the
655 * timings */
656 diff += 10 * pa_frame_size (&u->source_output->sample_spec);
657
658 pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
659
660 u->sink_skip = diff;
661 u->source_skip = 0;
662 }
663 } else if (diff_time > 0) {
664 diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
665
666 if (diff > 0) {
667 pa_log("playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
668
669 u->source_skip = diff;
670 u->sink_skip = 0;
671 }
672 }
673 }
674
675 /* must be called from the input thread */
676 static void do_resync(struct userdata *u) {
677 int64_t diff_time;
678 struct snapshot latency_snapshot;
679
680 pa_log("Doing resync");
681
682 /* update our snapshot */
683 source_output_snapshot_within_thread(u, &latency_snapshot);
684 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
685
686 /* calculate drift between capture and playback */
687 diff_time = calc_diff(u, &latency_snapshot);
688
689 /* and adjust for the drift */
690 apply_diff_time(u, diff_time);
691 }
692
/* 1. Calculate drift at this point, pass to canceller
 * 2. Push out playback samples in blocksize chunks
 * 3. Push out capture samples in blocksize chunks
 * 4. ???
 * 5. Profit
 *
 * Used only by cancellers that implement play/record/set_drift (WebRTC in
 * this file); the canceller itself pairs up the two streams internally. */
static void do_push_drift_comp(struct userdata *u) {
    size_t rlen, plen;
    pa_memchunk rchunk, pchunk, cchunk;
    uint8_t *rdata, *pdata, *cdata;
    float drift;
    int unused PA_GCC_UNUSED;  /* swallows fwrite()'s return value for the debug dumps */

    rlen = pa_memblockq_get_length(u->source_memblockq);
    plen = pa_memblockq_get_length(u->sink_memblockq);

    /* Estimate snapshot drift as follows:
     *   pd: amount of data consumed since last time
     *   rd: amount of data consumed since last time
     *
     *   drift = (pd - rd) / rd;
     *
     * We calculate pd and rd as the memblockq length less the number of
     * samples left from the last iteration (to avoid double counting
     * those remainder samples.
     *
     * NOTE(review): if no new capture data arrived (rlen == u->source_rem)
     * the divisor is zero and drift becomes inf/NaN — presumably the push
     * path guarantees fresh capture data here; confirm against the caller. */
    drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
    /* remember the sub-blocksize remainders so the next iteration doesn't
     * count them again */
    u->sink_rem = plen % u->blocksize;
    u->source_rem = rlen % u->blocksize;

    /* Now let the canceller work its drift compensation magic */
    u->ec->set_drift(u->ec, drift);

    if (u->save_aec) {
        if (u->drift_file)
            /* "%a" logs the float in C99 hex-float notation (lossless) */
            fprintf(u->drift_file, "d %a\n", drift);
    }

    /* Send in the playback samples first */
    while (plen >= u->blocksize) {
        pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
        pdata = pa_memblock_acquire(pchunk.memblock);
        pdata += pchunk.index;

        /* feed one fixed-size playback block to the canceller */
        u->ec->play(u->ec, pdata);

        if (u->save_aec) {
            if (u->drift_file)
                fprintf(u->drift_file, "p %d\n", u->blocksize);
            if (u->played_file)
                unused = fwrite(pdata, 1, u->blocksize, u->played_file);
        }

        pa_memblock_release(pchunk.memblock);
        pa_memblockq_drop(u->sink_memblockq, u->blocksize);
        pa_memblock_unref(pchunk.memblock);

        plen -= u->blocksize;
    }

    /* And now the capture samples */
    while (rlen >= u->blocksize) {
        pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);

        rdata = pa_memblock_acquire(rchunk.memblock);
        rdata += rchunk.index;

        /* allocate a fresh block to receive the echo-canceled samples */
        cchunk.index = 0;
        cchunk.length = u->blocksize;
        cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
        cdata = pa_memblock_acquire(cchunk.memblock);

        /* canceller consumes rdata and writes the cleaned signal to cdata */
        u->ec->record(u->ec, rdata, cdata);

        if (u->save_aec) {
            if (u->drift_file)
                fprintf(u->drift_file, "c %d\n", u->blocksize);
            if (u->captured_file)
                unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
            if (u->canceled_file)
                unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
        }

        pa_memblock_release(cchunk.memblock);
        pa_memblock_release(rchunk.memblock);

        pa_memblock_unref(rchunk.memblock);

        /* hand the cleaned block to our virtual source */
        pa_source_post(u->source, &cchunk);
        pa_memblock_unref(cchunk.memblock);

        pa_memblockq_drop(u->source_memblockq, u->blocksize);
        rlen -= u->blocksize;
    }
}
788
789 /* This one's simpler than the drift compensation case -- we just iterate over
790 * the capture buffer, and pass the canceller blocksize bytes of playback and
791 * capture data. */
792 static void do_push(struct userdata *u) {
793 size_t rlen, plen;
794 pa_memchunk rchunk, pchunk, cchunk;
795 uint8_t *rdata, *pdata, *cdata;
796 int unused PA_GCC_UNUSED;
797
798 rlen = pa_memblockq_get_length(u->source_memblockq);
799 plen = pa_memblockq_get_length(u->sink_memblockq);
800
801 while (rlen >= u->blocksize) {
802 /* take fixed block from recorded samples */
803 pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
804
805 if (plen > u->blocksize) {
806 if (plen > u->blocksize) {
807 /* take fixed block from played samples */
808 pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
809
810 rdata = pa_memblock_acquire(rchunk.memblock);
811 rdata += rchunk.index;
812 pdata = pa_memblock_acquire(pchunk.memblock);
813 pdata += pchunk.index;
814
815 cchunk.index = 0;
816 cchunk.length = u->blocksize;
817 cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
818 cdata = pa_memblock_acquire(cchunk.memblock);
819
820 if (u->save_aec) {
821 if (u->captured_file)
822 unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
823 if (u->played_file)
824 unused = fwrite(pdata, 1, u->blocksize, u->played_file);
825 }
826
827 /* perform echo cancellation */
828 u->ec->run(u->ec, rdata, pdata, cdata);
829
830 if (u->save_aec) {
831 if (u->canceled_file)
832 unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
833 }
834
835 pa_memblock_release(cchunk.memblock);
836 pa_memblock_release(pchunk.memblock);
837 pa_memblock_release(rchunk.memblock);
838
839 /* drop consumed sink samples */
840 pa_memblockq_drop(u->sink_memblockq, u->blocksize);
841 pa_memblock_unref(pchunk.memblock);
842
843 pa_memblock_unref(rchunk.memblock);
844 /* the filtered samples now become the samples from our
845 * source */
846 rchunk = cchunk;
847
848 plen -= u->blocksize;
849 }
850 }
851
852 /* forward the (echo-canceled) data to the virtual source */
853 pa_source_post(u->source, &rchunk);
854 pa_memblock_unref(rchunk.memblock);
855
856 pa_memblockq_drop(u->source_memblockq, u->blocksize);
857 rlen -= u->blocksize;
858 }
859 }
860
861 /* Called from input thread context */
862 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
863 struct userdata *u;
864 size_t rlen, plen, to_skip;
865 pa_memchunk rchunk;
866
867 pa_source_output_assert_ref(o);
868 pa_source_output_assert_io_context(o);
869 pa_assert_se(u = o->userdata);
870
871 if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
872 pa_log("push when no link?");
873 return;
874 }
875
876 if (PA_UNLIKELY(u->source->thread_info.state != PA_SOURCE_RUNNING ||
877 u->sink->thread_info.state != PA_SINK_RUNNING)) {
878 pa_source_post(u->source, chunk);
879 return;
880 }
881
882 /* handle queued messages, do any message sending of our own */
883 while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
884 ;
885
886 pa_memblockq_push_align(u->source_memblockq, chunk);
887
888 rlen = pa_memblockq_get_length(u->source_memblockq);
889 plen = pa_memblockq_get_length(u->sink_memblockq);
890
891 /* Let's not do anything else till we have enough data to process */
892 if (rlen < u->blocksize)
893 return;
894
895 /* See if we need to drop samples in order to sync */
896 if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
897 do_resync(u);
898 }
899
900 /* Okay, skip cancellation for skipped source samples if needed. */
901 if (PA_UNLIKELY(u->source_skip)) {
902 /* The slightly tricky bit here is that we drop all but modulo
903 * blocksize bytes and then adjust for that last bit on the sink side.
904 * We do this because the source data is coming at a fixed rate, which
905 * means the only way to try to catch up is drop sink samples and let
906 * the canceller cope up with this. */
907 to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
908 to_skip -= to_skip % u->blocksize;
909
910 if (to_skip) {
911 pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
912 pa_source_post(u->source, &rchunk);
913
914 pa_memblock_unref(rchunk.memblock);
915 pa_memblockq_drop(u->source_memblockq, u->blocksize);
916
917 rlen -= to_skip;
918 u->source_skip -= to_skip;
919 }
920
921 if (rlen && u->source_skip % u->blocksize) {
922 u->sink_skip += u->blocksize - (u->source_skip % u->blocksize);
923 u->source_skip -= (u->source_skip % u->blocksize);
924 }
925 }
926
927 /* And for the sink, these samples have been played back already, so we can
928 * just drop them and get on with it. */
929 if (PA_UNLIKELY(u->sink_skip)) {
930 to_skip = plen >= u->sink_skip ? u->sink_skip : plen;
931
932 pa_memblockq_drop(u->sink_memblockq, to_skip);
933
934 plen -= to_skip;
935 u->sink_skip -= to_skip;
936 }
937
938 /* process and push out samples */
939 if (u->ec->params.drift_compensation)
940 do_push_drift_comp(u);
941 else
942 do_push(u);
943 }
944
945 /* Called from I/O thread context */
946 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
947 struct userdata *u;
948
949 pa_sink_input_assert_ref(i);
950 pa_assert(chunk);
951 pa_assert_se(u = i->userdata);
952
953 if (u->sink->thread_info.rewind_requested)
954 pa_sink_process_rewind(u->sink, 0);
955
956 pa_sink_render_full(u->sink, nbytes, chunk);
957
958 if (i->thread_info.underrun_for > 0) {
959 pa_log_debug("Handling end of underrun.");
960 pa_atomic_store(&u->request_resync, 1);
961 }
962
963 /* let source thread handle the chunk. pass the sample count as well so that
964 * the source IO thread can update the right variables. */
965 pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
966 NULL, 0, chunk, NULL);
967 u->send_counter += chunk->length;
968
969 return 0;
970 }
971
972 /* Called from input thread context */
973 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
974 struct userdata *u;
975
976 pa_source_output_assert_ref(o);
977 pa_source_output_assert_io_context(o);
978 pa_assert_se(u = o->userdata);
979
980 pa_source_process_rewind(u->source, nbytes);
981
982 /* go back on read side, we need to use older sink data for this */
983 pa_memblockq_rewind(u->sink_memblockq, nbytes);
984
985 /* manipulate write index */
986 pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, TRUE);
987
988 pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
989 (long long) pa_memblockq_get_length (u->source_memblockq));
990 }
991
/* Called from I/O thread context.
 * Propagate a rewind of 'nbytes' on the playback path: rewind the virtual
 * sink, then notify the capture-side thread so it can roll back its copy
 * of the playback stream and keep the counters consistent. */
static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_log_debug("Sink process rewind %lld", (long long) nbytes);

    pa_sink_process_rewind(u->sink, nbytes);

    /* Tell the source side how much playback data was taken back; it
     * undoes the corresponding amount in its sink_memblockq. */
    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
    u->send_counter -= nbytes;
}
1006
1007 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
1008 size_t delay, rlen, plen;
1009 pa_usec_t now, latency;
1010
1011 now = pa_rtclock_now();
1012 latency = pa_source_get_latency_within_thread(u->source_output->source);
1013 delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
1014
1015 delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
1016 rlen = pa_memblockq_get_length(u->source_memblockq);
1017 plen = pa_memblockq_get_length(u->sink_memblockq);
1018
1019 snapshot->source_now = now;
1020 snapshot->source_latency = latency;
1021 snapshot->source_delay = delay;
1022 snapshot->recv_counter = u->recv_counter;
1023 snapshot->rlen = rlen + u->sink_skip;
1024 snapshot->plen = plen + u->source_skip;
1025 }
1026
1027
/* Called from output thread context.
 * Message handler for the source output; receives playback data and
 * rewind notifications posted by the sink-input thread, plus snapshot
 * and diff-time requests from the main thread. */
static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;

    switch (code) {

        case SOURCE_OUTPUT_MESSAGE_POST:

            pa_source_output_assert_io_context(u->source_output);

            /* Only queue the playback chunk as an echo reference while the
             * source is actually running; otherwise discard queued data so
             * stale samples don't misalign us when capture resumes. */
            if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
                pa_memblockq_push_align(u->sink_memblockq, chunk);
            else
                pa_memblockq_flush_write(u->sink_memblockq, TRUE);

            u->recv_counter += (int64_t) chunk->length;

            return 0;

        case SOURCE_OUTPUT_MESSAGE_REWIND:
            pa_source_output_assert_io_context(u->source_output);

            /* manipulate write index, never go past what we have */
            if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
                pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
            else
                pa_memblockq_flush_write(u->sink_memblockq, TRUE);

            pa_log_debug("Sink rewind (%lld)", (long long) offset);

            u->recv_counter -= offset;

            return 0;

        case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
            struct snapshot *snapshot = (struct snapshot *) data;

            source_output_snapshot_within_thread(u, snapshot);
            return 0;
        }

        case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
            /* 'offset' carries the time adjustment computed elsewhere. */
            apply_diff_time(u, offset);
            return 0;

    }

    /* Everything else is handled by the generic implementation. */
    return pa_source_output_process_msg(obj, code, data, offset, chunk);
}
1077
1078 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1079 struct userdata *u = PA_SINK_INPUT(obj)->userdata;
1080
1081 switch (code) {
1082
1083 case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
1084 size_t delay;
1085 pa_usec_t now, latency;
1086 struct snapshot *snapshot = (struct snapshot *) data;
1087
1088 pa_sink_input_assert_io_context(u->sink_input);
1089
1090 now = pa_rtclock_now();
1091 latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
1092 delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
1093
1094 delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
1095
1096 snapshot->sink_now = now;
1097 snapshot->sink_latency = latency;
1098 snapshot->sink_delay = delay;
1099 snapshot->send_counter = u->send_counter;
1100 return 0;
1101 }
1102 }
1103
1104 return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1105 }
1106
/* Called from I/O thread context.
 * Master sink's max-rewind changed: mirror it on the playback reference
 * queue and on our virtual sink. */
static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);

    /* the queue must be able to roll back as far as the master sink can */
    pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
    pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
}
1119
/* Called from I/O thread context.
 * Master source's max-rewind changed: mirror it on our virtual source. */
static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_assert_se(u = o->userdata);

    pa_log_debug("Source output update max rewind %lld", (long long) nbytes);

    pa_source_set_max_rewind_within_thread(u->source, nbytes);
}
1131
/* Called from I/O thread context.
 * Master sink's max-request changed: mirror it on our virtual sink. */
static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_log_debug("Sink input update max request %lld", (long long) nbytes);

    pa_sink_set_max_request_within_thread(u->sink, nbytes);
}
1143
1144 /* Called from I/O thread context */
1145 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1146 struct userdata *u;
1147 pa_usec_t latency;
1148
1149 pa_sink_input_assert_ref(i);
1150 pa_assert_se(u = i->userdata);
1151
1152 latency = pa_sink_get_requested_latency_within_thread(i->sink);
1153
1154 pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1155 }
1156
1157 /* Called from I/O thread context */
1158 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1159 struct userdata *u;
1160 pa_usec_t latency;
1161
1162 pa_source_output_assert_ref(o);
1163 pa_assert_se(u = o->userdata);
1164
1165 latency = pa_source_get_requested_latency_within_thread(o->source);
1166
1167 pa_log_debug("source output update requested latency %lld", (long long) latency);
1168 }
1169
1170 /* Called from I/O thread context */
1171 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1172 struct userdata *u;
1173
1174 pa_sink_input_assert_ref(i);
1175 pa_assert_se(u = i->userdata);
1176
1177 pa_log_debug("Sink input update latency range %lld %lld",
1178 (long long) i->sink->thread_info.min_latency,
1179 (long long) i->sink->thread_info.max_latency);
1180
1181 pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1182 }
1183
1184 /* Called from I/O thread context */
1185 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1186 struct userdata *u;
1187
1188 pa_source_output_assert_ref(o);
1189 pa_assert_se(u = o->userdata);
1190
1191 pa_log_debug("Source output update latency range %lld %lld",
1192 (long long) o->source->thread_info.min_latency,
1193 (long long) o->source->thread_info.max_latency);
1194
1195 pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1196 }
1197
1198 /* Called from I/O thread context */
1199 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1200 struct userdata *u;
1201
1202 pa_sink_input_assert_ref(i);
1203 pa_assert_se(u = i->userdata);
1204
1205 pa_log_debug("Sink input update fixed latency %lld",
1206 (long long) i->sink->thread_info.fixed_latency);
1207
1208 pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1209 }
1210
1211 /* Called from I/O thread context */
1212 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1213 struct userdata *u;
1214
1215 pa_source_output_assert_ref(o);
1216 pa_assert_se(u = o->userdata);
1217
1218 pa_log_debug("Source output update fixed latency %lld",
1219 (long long) o->source->thread_info.fixed_latency);
1220
1221 pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1222 }
1223
/* Called from output thread context.
 * Our source output was attached to a master source's I/O thread: adopt
 * that thread's rtpoll and latency properties for the virtual source,
 * attach it, and start polling the asyncmsgq for data posted by the
 * playback side. */
static void source_output_attach_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
    pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
    pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
    pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));

    pa_log_debug("Source output %d attach", o->index);

    pa_source_attach_within_thread(u->source);

    /* Read end of the queue the sink-input thread posts playback chunks
     * to; freed again in source_output_detach_cb(). */
    u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
            o->source->thread_info.rtpoll,
            PA_RTPOLL_LATE,
            u->asyncmsgq);
}
1246
/* Called from I/O thread context.
 * Our sink input was attached to a master sink's I/O thread: adopt that
 * thread's rtpoll and latency properties for the virtual sink, set up the
 * write end of the asyncmsgq, and attach the sink. */
static void sink_input_attach_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
    pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);

    /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
     * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
    pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);

    /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
     * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
     * HERE. SEE (6) */
    pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
    pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));

    pa_log_debug("Sink input %d attach", i->index);

    /* Write end of the queue used to hand playback chunks to the capture
     * thread; freed again in sink_input_detach_cb(). */
    u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
            i->sink->thread_info.rtpoll,
            PA_RTPOLL_LATE,
            u->asyncmsgq);

    pa_sink_attach_within_thread(u->sink);
}
1276
1277
/* Called from output thread context.
 * Counterpart of source_output_attach_cb(): detach the virtual source
 * from the master's I/O thread and drop the asyncmsgq read poll item. */
static void source_output_detach_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    pa_source_detach_within_thread(u->source);
    pa_source_set_rtpoll(u->source, NULL);

    pa_log_debug("Source output %d detach", o->index);

    if (u->rtpoll_item_read) {
        pa_rtpoll_item_free(u->rtpoll_item_read);
        u->rtpoll_item_read = NULL;
    }
}
1296
/* Called from I/O thread context.
 * Counterpart of sink_input_attach_cb(): detach the virtual sink from
 * the master's I/O thread and drop the asyncmsgq write poll item. */
static void sink_input_detach_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_sink_detach_within_thread(u->sink);

    pa_sink_set_rtpoll(u->sink, NULL);

    pa_log_debug("Sink input %d detach", i->index);

    if (u->rtpoll_item_write) {
        pa_rtpoll_item_free(u->rtpoll_item_write);
        u->rtpoll_item_write = NULL;
    }
}
1315
/* Called from output thread context.
 * State transitions of the source output are only logged here. */
static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    pa_log_debug("Source output %d state %d", o->index, state);
}
1326
/* Called from IO thread context.
 * Log state transitions, and request an initial rewind when the input
 * goes from INIT to a linked state so playback starts immediately. */
static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_log_debug("Sink input %d state %d", i->index, state);

    /* If we are added for the first time, ask for a rewinding so that
     * we are heard right-away. */
    if (PA_SINK_INPUT_IS_LINKED(state) &&
        i->thread_info.state == PA_SINK_INPUT_INIT) {
        pa_log_debug("Requesting rewind due to state change.");
        pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
    }
}
1344
/* Called from main thread.
 * The master side wants our source output gone: mark the module dead,
 * tear down the output and the virtual source, and request unload. */
static void source_output_kill_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_assert_ctl_context();
    pa_assert_se(u = o->userdata);

    /* prevents may_move_to callbacks from allowing further moves */
    u->dead = TRUE;

    /* The order here matters! We first kill the source output, followed
     * by the source. That means the source callbacks must be protected
     * against an unconnected source output! */
    pa_source_output_unlink(u->source_output);
    pa_source_unlink(u->source);

    pa_source_output_unref(u->source_output);
    u->source_output = NULL;

    pa_source_unref(u->source);
    u->source = NULL;

    pa_log_debug("Source output kill %d", o->index);

    pa_module_unload_request(u->module, TRUE);
}
1371
/* Called from main context.
 * The master side wants our sink input gone: mark the module dead, tear
 * down the input and the virtual sink, and request unload. */
static void sink_input_kill_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    /* prevents may_move_to callbacks from allowing further moves */
    u->dead = TRUE;

    /* The order here matters! We first kill the sink input, followed
     * by the sink. That means the sink callbacks must be protected
     * against an unconnected sink input! */
    pa_sink_input_unlink(u->sink_input);
    pa_sink_unlink(u->sink);

    pa_sink_input_unref(u->sink_input);
    u->sink_input = NULL;

    pa_sink_unref(u->sink);
    u->sink = NULL;

    pa_log_debug("Sink input kill %d", i->index);

    pa_module_unload_request(u->module, TRUE);
}
1397
1398 /* Called from main thread */
1399 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1400 struct userdata *u;
1401
1402 pa_source_output_assert_ref(o);
1403 pa_assert_ctl_context();
1404 pa_assert_se(u = o->userdata);
1405
1406 if (u->dead || u->autoloaded)
1407 return FALSE;
1408
1409 return (u->source != dest) && (u->sink != dest->monitor_of);
1410 }
1411
1412 /* Called from main context */
1413 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1414 struct userdata *u;
1415
1416 pa_sink_input_assert_ref(i);
1417 pa_assert_se(u = i->userdata);
1418
1419 if (u->dead || u->autoloaded)
1420 return FALSE;
1421
1422 return u->sink != dest;
1423 }
1424
1425 /* Called from main thread */
1426 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1427 struct userdata *u;
1428
1429 pa_source_output_assert_ref(o);
1430 pa_assert_ctl_context();
1431 pa_assert_se(u = o->userdata);
1432
1433 if (dest) {
1434 pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1435 pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1436 } else
1437 pa_source_set_asyncmsgq(u->source, NULL);
1438
1439 if (u->source_auto_desc && dest) {
1440 const char *y, *z;
1441 pa_proplist *pl;
1442
1443 pl = pa_proplist_new();
1444 y = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION);
1445 z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1446 pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1447 y ? y : u->sink_input->sink->name);
1448
1449 pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1450 pa_proplist_free(pl);
1451 }
1452 }
1453
1454 /* Called from main context */
1455 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1456 struct userdata *u;
1457
1458 pa_sink_input_assert_ref(i);
1459 pa_assert_se(u = i->userdata);
1460
1461 if (dest) {
1462 pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1463 pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1464 } else
1465 pa_sink_set_asyncmsgq(u->sink, NULL);
1466
1467 if (u->sink_auto_desc && dest) {
1468 const char *y, *z;
1469 pa_proplist *pl;
1470
1471 pl = pa_proplist_new();
1472 y = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION);
1473 z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1474 pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1475 y ? y : u->source_output->source->name);
1476
1477 pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1478 pa_proplist_free(pl);
1479 }
1480 }
1481
/* Called from main context.
 * Forward volume changes on the sink input to the virtual sink. */
static void sink_input_volume_changed_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_sink_volume_changed(u->sink, &i->volume);
}
1491
/* Called from main context.
 * Forward mute changes on the sink input to the virtual sink. */
static void sink_input_mute_changed_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_sink_mute_changed(u->sink, i->muted);
}
1501
1502 /* Called from main context */
1503 static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1504 struct pa_echo_canceller_msg *msg;
1505 struct userdata *u;
1506
1507 pa_assert(o);
1508
1509 msg = PA_ECHO_CANCELLER_MSG(o);
1510 u = msg->userdata;
1511
1512 switch (code) {
1513 case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
1514 pa_cvolume *v = (pa_cvolume *) userdata;
1515
1516 if (u->use_volume_sharing)
1517 pa_source_set_volume(u->source, v, TRUE, FALSE);
1518 else
1519 pa_source_output_set_volume(u->source_output, v, FALSE, TRUE);
1520
1521 break;
1522 }
1523
1524 default:
1525 pa_assert_not_reached();
1526 break;
1527 }
1528
1529 return 0;
1530 }
1531
/* Called by the canceller, so thread context.
 * Copy out the capture volume last seen by the I/O thread. */
void pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
    *v = ec->msg->userdata->thread_info.current_volume;
}
1536
/* Called by the canceller, so thread context.
 * Request a capture-volume change. Only posts a message when the value
 * actually differs; the heap copy of the volume is freed by the message
 * queue (pa_xfree as free_cb) after delivery to the main thread. */
void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
    if (!pa_cvolume_equal(&ec->msg->userdata->thread_info.current_volume, v)) {
        pa_cvolume *vol = pa_xnewdup(pa_cvolume, v, 1);

        pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, vol, 0, NULL,
                pa_xfree);
    }
}
1546
1547 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1548 #ifdef HAVE_SPEEX
1549 if (pa_streq(method, "speex"))
1550 return PA_ECHO_CANCELLER_SPEEX;
1551 #endif
1552 if (pa_streq(method, "adrian"))
1553 return PA_ECHO_CANCELLER_ADRIAN;
1554 #ifdef HAVE_WEBRTC
1555 if (pa_streq(method, "webrtc"))
1556 return PA_ECHO_CANCELLER_WEBRTC;
1557 #endif
1558 return PA_ECHO_CANCELLER_INVALID;
1559 }
1560
1561 /* Common initialisation bits between module-echo-cancel and the standalone test program */
1562 static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1563 pa_echo_canceller_method_t ec_method;
1564
1565 if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1566 pa_log("Invalid sample format specification or channel map");
1567 goto fail;
1568 }
1569
1570 u->ec = pa_xnew0(pa_echo_canceller, 1);
1571 if (!u->ec) {
1572 pa_log("Failed to alloc echo canceller");
1573 goto fail;
1574 }
1575
1576 if ((ec_method = get_ec_method_from_string(pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER))) < 0) {
1577 pa_log("Invalid echo canceller implementation");
1578 goto fail;
1579 }
1580
1581 u->ec->init = ec_table[ec_method].init;
1582 u->ec->play = ec_table[ec_method].play;
1583 u->ec->record = ec_table[ec_method].record;
1584 u->ec->set_drift = ec_table[ec_method].set_drift;
1585 u->ec->run = ec_table[ec_method].run;
1586 u->ec->done = ec_table[ec_method].done;
1587
1588 return 0;
1589
1590 fail:
1591 return -1;
1592 }
1593
1594
1595 int pa__init(pa_module*m) {
1596 struct userdata *u;
1597 pa_sample_spec source_ss, sink_ss;
1598 pa_channel_map source_map, sink_map;
1599 pa_modargs *ma;
1600 pa_source *source_master=NULL;
1601 pa_sink *sink_master=NULL;
1602 pa_source_output_new_data source_output_data;
1603 pa_sink_input_new_data sink_input_data;
1604 pa_source_new_data source_data;
1605 pa_sink_new_data sink_data;
1606 pa_memchunk silence;
1607 uint32_t temp;
1608
1609 pa_assert(m);
1610
1611 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1612 pa_log("Failed to parse module arguments.");
1613 goto fail;
1614 }
1615
1616 if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1617 pa_log("Master source not found");
1618 goto fail;
1619 }
1620 pa_assert(source_master);
1621
1622 if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1623 pa_log("Master sink not found");
1624 goto fail;
1625 }
1626 pa_assert(sink_master);
1627
1628 if (source_master->monitor_of == sink_master) {
1629 pa_log("Can't cancel echo between a sink and its monitor");
1630 goto fail;
1631 }
1632
1633 source_ss = source_master->sample_spec;
1634 source_ss.rate = DEFAULT_RATE;
1635 source_ss.channels = DEFAULT_CHANNELS;
1636 pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1637
1638 sink_ss = sink_master->sample_spec;
1639 sink_map = sink_master->channel_map;
1640
1641 u = pa_xnew0(struct userdata, 1);
1642 if (!u) {
1643 pa_log("Failed to alloc userdata");
1644 goto fail;
1645 }
1646 u->core = m->core;
1647 u->module = m;
1648 m->userdata = u;
1649 u->dead = FALSE;
1650
1651 u->use_volume_sharing = TRUE;
1652 if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1653 pa_log("use_volume_sharing= expects a boolean argument");
1654 goto fail;
1655 }
1656
1657 temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1658 if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1659 pa_log("Failed to parse adjust_time value");
1660 goto fail;
1661 }
1662
1663 if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1664 u->adjust_time = temp * PA_USEC_PER_SEC;
1665 else
1666 u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1667
1668 temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1669 if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1670 pa_log("Failed to parse adjust_threshold value");
1671 goto fail;
1672 }
1673
1674 if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1675 u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1676 else
1677 u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1678
1679 u->save_aec = DEFAULT_SAVE_AEC;
1680 if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1681 pa_log("Failed to parse save_aec value");
1682 goto fail;
1683 }
1684
1685 u->autoloaded = DEFAULT_AUTOLOADED;
1686 if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1687 pa_log("Failed to parse autoloaded value");
1688 goto fail;
1689 }
1690
1691 if (init_common(ma, u, &source_ss, &source_map))
1692 goto fail;
1693
1694 u->asyncmsgq = pa_asyncmsgq_new(0);
1695 u->need_realign = TRUE;
1696
1697 if (u->ec->init) {
1698 if (!u->ec->init(u->core, u->ec, &source_ss, &source_map, &sink_ss, &sink_map, &u->blocksize, pa_modargs_get_value(ma, "aec_args", NULL))) {
1699 pa_log("Failed to init AEC engine");
1700 goto fail;
1701 }
1702 }
1703
1704 if (u->ec->params.drift_compensation)
1705 pa_assert(u->ec->set_drift);
1706
1707 /* Create source */
1708 pa_source_new_data_init(&source_data);
1709 source_data.driver = __FILE__;
1710 source_data.module = m;
1711 if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1712 source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1713 pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1714 pa_source_new_data_set_channel_map(&source_data, &source_map);
1715 pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1716 pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1717 if (!u->autoloaded)
1718 pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1719
1720 if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1721 pa_log("Invalid properties");
1722 pa_source_new_data_done(&source_data);
1723 goto fail;
1724 }
1725
1726 if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1727 const char *y, *z;
1728
1729 y = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1730 z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1731 pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1732 z ? z : source_master->name, y ? y : sink_master->name);
1733 }
1734
1735 u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1736 | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1737 pa_source_new_data_done(&source_data);
1738
1739 if (!u->source) {
1740 pa_log("Failed to create source.");
1741 goto fail;
1742 }
1743
1744 u->source->parent.process_msg = source_process_msg_cb;
1745 u->source->set_state = source_set_state_cb;
1746 u->source->update_requested_latency = source_update_requested_latency_cb;
1747 pa_source_set_get_mute_callback(u->source, source_get_mute_cb);
1748 pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1749 if (!u->use_volume_sharing) {
1750 pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1751 pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1752 pa_source_enable_decibel_volume(u->source, TRUE);
1753 }
1754 u->source->userdata = u;
1755
1756 pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1757
1758 /* Create sink */
1759 pa_sink_new_data_init(&sink_data);
1760 sink_data.driver = __FILE__;
1761 sink_data.module = m;
1762 if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1763 sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1764 pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1765 pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1766 pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1767 pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1768 if (!u->autoloaded)
1769 pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1770
1771 if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1772 pa_log("Invalid properties");
1773 pa_sink_new_data_done(&sink_data);
1774 goto fail;
1775 }
1776
1777 if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1778 const char *y, *z;
1779
1780 y = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1781 z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1782 pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1783 z ? z : sink_master->name, y ? y : source_master->name);
1784 }
1785
1786 u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1787 | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1788 pa_sink_new_data_done(&sink_data);
1789
1790 if (!u->sink) {
1791 pa_log("Failed to create sink.");
1792 goto fail;
1793 }
1794
1795 u->sink->parent.process_msg = sink_process_msg_cb;
1796 u->sink->set_state = sink_set_state_cb;
1797 u->sink->update_requested_latency = sink_update_requested_latency_cb;
1798 u->sink->request_rewind = sink_request_rewind_cb;
1799 pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1800 if (!u->use_volume_sharing) {
1801 pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1802 pa_sink_enable_decibel_volume(u->sink, TRUE);
1803 }
1804 u->sink->userdata = u;
1805
1806 pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1807
1808 /* Create source output */
1809 pa_source_output_new_data_init(&source_output_data);
1810 source_output_data.driver = __FILE__;
1811 source_output_data.module = m;
1812 pa_source_output_new_data_set_source(&source_output_data, source_master, FALSE);
1813 source_output_data.destination_source = u->source;
1814 /* FIXME
1815 source_output_data.flags = PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; */
1816
1817 pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1818 pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1819 pa_source_output_new_data_set_sample_spec(&source_output_data, &source_ss);
1820 pa_source_output_new_data_set_channel_map(&source_output_data, &source_map);
1821
1822 pa_source_output_new(&u->source_output, m->core, &source_output_data);
1823 pa_source_output_new_data_done(&source_output_data);
1824
1825 if (!u->source_output)
1826 goto fail;
1827
1828 u->source_output->parent.process_msg = source_output_process_msg_cb;
1829 u->source_output->push = source_output_push_cb;
1830 u->source_output->process_rewind = source_output_process_rewind_cb;
1831 u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1832 u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1833 u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1834 u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1835 u->source_output->kill = source_output_kill_cb;
1836 u->source_output->attach = source_output_attach_cb;
1837 u->source_output->detach = source_output_detach_cb;
1838 u->source_output->state_change = source_output_state_change_cb;
1839 u->source_output->may_move_to = source_output_may_move_to_cb;
1840 u->source_output->moving = source_output_moving_cb;
1841 u->source_output->userdata = u;
1842
1843 u->source->output_from_master = u->source_output;
1844
1845 /* Create sink input */
1846 pa_sink_input_new_data_init(&sink_input_data);
1847 sink_input_data.driver = __FILE__;
1848 sink_input_data.module = m;
1849 pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, FALSE);
1850 sink_input_data.origin_sink = u->sink;
1851 pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1852 pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1853 pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1854 pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1855 sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1856
1857 pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1858 pa_sink_input_new_data_done(&sink_input_data);
1859
1860 if (!u->sink_input)
1861 goto fail;
1862
1863 u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1864 u->sink_input->pop = sink_input_pop_cb;
1865 u->sink_input->process_rewind = sink_input_process_rewind_cb;
1866 u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1867 u->sink_input->update_max_request = sink_input_update_max_request_cb;
1868 u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1869 u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1870 u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1871 u->sink_input->kill = sink_input_kill_cb;
1872 u->sink_input->attach = sink_input_attach_cb;
1873 u->sink_input->detach = sink_input_detach_cb;
1874 u->sink_input->state_change = sink_input_state_change_cb;
1875 u->sink_input->may_move_to = sink_input_may_move_to_cb;
1876 u->sink_input->moving = sink_input_moving_cb;
1877 if (!u->use_volume_sharing)
1878 u->sink_input->volume_changed = sink_input_volume_changed_cb;
1879 u->sink_input->mute_changed = sink_input_mute_changed_cb;
1880 u->sink_input->userdata = u;
1881
1882 u->sink->input_to_master = u->sink_input;
1883
1884 pa_sink_input_get_silence(u->sink_input, &silence);
1885
1886 u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1887 &source_ss, 1, 1, 0, &silence);
1888 u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1889 &sink_ss, 1, 1, 0, &silence);
1890
1891 pa_memblock_unref(silence.memblock);
1892
1893 if (!u->source_memblockq || !u->sink_memblockq) {
1894 pa_log("Failed to create memblockq.");
1895 goto fail;
1896 }
1897
1898 if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
1899 u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1900 else if (u->ec->params.drift_compensation) {
1901 pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
1902 u->adjust_time = 0;
1903 /* Perform resync just once to give the canceller a leg up */
1904 pa_atomic_store(&u->request_resync, 1);
1905 }
1906
1907 if (u->save_aec) {
1908 pa_log("Creating AEC files in /tmp");
1909 u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
1910 if (u->captured_file == NULL)
1911 perror ("fopen failed");
1912 u->played_file = fopen("/tmp/aec_play.sw", "wb");
1913 if (u->played_file == NULL)
1914 perror ("fopen failed");
1915 u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
1916 if (u->canceled_file == NULL)
1917 perror ("fopen failed");
1918 if (u->ec->params.drift_compensation) {
1919 u->drift_file = fopen("/tmp/aec_drift.txt", "w");
1920 if (u->drift_file == NULL)
1921 perror ("fopen failed");
1922 }
1923 }
1924
1925 u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
1926 u->ec->msg->parent.process_msg = canceller_process_msg_cb;
1927 u->ec->msg->userdata = u;
1928
1929 u->thread_info.current_volume = u->source->reference_volume;
1930
1931 pa_sink_put(u->sink);
1932 pa_source_put(u->source);
1933
1934 pa_sink_input_put(u->sink_input);
1935 pa_source_output_put(u->source_output);
1936 pa_modargs_free(ma);
1937
1938 return 0;
1939
1940 fail:
1941 if (ma)
1942 pa_modargs_free(ma);
1943
1944 pa__done(m);
1945
1946 return -1;
1947 }
1948
1949 int pa__get_n_used(pa_module *m) {
1950 struct userdata *u;
1951
1952 pa_assert(m);
1953 pa_assert_se(u = m->userdata);
1954
1955 return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
1956 }
1957
/* Module teardown: unlink and release every object pa__init() created.
 * Safe to call on a partially-initialized userdata (every member is
 * NULL-checked), which is why pa__init()'s fail path can call it. */
void pa__done(pa_module*m) {
    struct userdata *u;

    pa_assert(m);

    /* Nothing to do if pa__init() never got as far as allocating userdata */
    if (!(u = m->userdata))
        return;

    /* Signal the callbacks that we are shutting down on purpose, so the
     * kill callbacks don't try to unload the module a second time */
    u->dead = TRUE;

    /* See comments in source_output_kill_cb() above regarding
     * destruction order! */

    if (u->time_event)
        u->core->mainloop->time_free(u->time_event);

    /* First unlink the streams from their masters... */
    if (u->source_output)
        pa_source_output_unlink(u->source_output);
    if (u->sink_input)
        pa_sink_input_unlink(u->sink_input);

    /* ...then unlink our virtual source/sink from the core... */
    if (u->source)
        pa_source_unlink(u->source);
    if (u->sink)
        pa_sink_unlink(u->sink);

    /* ...and only then drop the references (unref after unlink so the
     * objects are torn down in a consistent state) */
    if (u->source_output)
        pa_source_output_unref(u->source_output);
    if (u->sink_input)
        pa_sink_input_unref(u->sink_input);

    if (u->source)
        pa_source_unref(u->source);
    if (u->sink)
        pa_sink_unref(u->sink);

    if (u->source_memblockq)
        pa_memblockq_free(u->source_memblockq);
    if (u->sink_memblockq)
        pa_memblockq_free(u->sink_memblockq);

    /* Give the canceller implementation a chance to release its own
     * state before we free the wrapper struct */
    if (u->ec) {
        if (u->ec->done)
            u->ec->done(u->ec);

        pa_xfree(u->ec);
    }

    if (u->asyncmsgq)
        pa_asyncmsgq_unref(u->asyncmsgq);

    /* Debug dump files are only opened when save_aec was requested */
    if (u->save_aec) {
        if (u->played_file)
            fclose(u->played_file);
        if (u->captured_file)
            fclose(u->captured_file);
        if (u->canceled_file)
            fclose(u->canceled_file);
        if (u->drift_file)
            fclose(u->drift_file);
    }

    pa_xfree(u);
}
2022
2023 #ifdef ECHO_CANCEL_TEST
2024 /*
2025 * Stand-alone test program for running in the canceller on pre-recorded files.
2026 */
2027 int main(int argc, char* argv[]) {
2028 struct userdata u;
2029 pa_sample_spec source_ss, sink_ss;
2030 pa_channel_map source_map, sink_map;
2031 pa_modargs *ma = NULL;
2032 uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
2033 int unused PA_GCC_UNUSED;
2034 int ret = 0, i;
2035 char c;
2036 float drift;
2037
2038 pa_memzero(&u, sizeof(u));
2039
2040 if (argc < 4 || argc > 7) {
2041 goto usage;
2042 }
2043
2044 u.captured_file = fopen(argv[2], "r");
2045 if (u.captured_file == NULL) {
2046 perror ("fopen failed");
2047 goto fail;
2048 }
2049 u.played_file = fopen(argv[1], "r");
2050 if (u.played_file == NULL) {
2051 perror ("fopen failed");
2052 goto fail;
2053 }
2054 u.canceled_file = fopen(argv[3], "wb");
2055 if (u.canceled_file == NULL) {
2056 perror ("fopen failed");
2057 goto fail;
2058 }
2059
2060 u.core = pa_xnew0(pa_core, 1);
2061 u.core->cpu_info.cpu_type = PA_CPU_X86;
2062 u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;
2063
2064 if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
2065 pa_log("Failed to parse module arguments.");
2066 goto fail;
2067 }
2068
2069 source_ss.format = PA_SAMPLE_S16LE;
2070 source_ss.rate = DEFAULT_RATE;
2071 source_ss.channels = DEFAULT_CHANNELS;
2072 pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2073
2074 init_common(ma, &u, &source_ss, &source_map);
2075
2076 if (!u.ec->init(u.core, u.ec, &source_ss, &source_map, &sink_ss, &sink_map, &u.blocksize,
2077 (argc > 4) ? argv[5] : NULL )) {
2078 pa_log("Failed to init AEC engine");
2079 goto fail;
2080 }
2081
2082 if (u.ec->params.drift_compensation) {
2083 if (argc < 7) {
2084 pa_log("Drift compensation enabled but drift file not specified");
2085 goto fail;
2086 }
2087
2088 u.drift_file = fopen(argv[6], "r");
2089
2090 if (u.drift_file == NULL) {
2091 perror ("fopen failed");
2092 goto fail;
2093 }
2094 }
2095
2096 rdata = pa_xmalloc(u.blocksize);
2097 pdata = pa_xmalloc(u.blocksize);
2098 cdata = pa_xmalloc(u.blocksize);
2099
2100 if (!u.ec->params.drift_compensation) {
2101 while (fread(rdata, u.blocksize, 1, u.captured_file) > 0) {
2102 if (fread(pdata, u.blocksize, 1, u.played_file) == 0) {
2103 perror("Played file ended before captured file");
2104 goto fail;
2105 }
2106
2107 u.ec->run(u.ec, rdata, pdata, cdata);
2108
2109 unused = fwrite(cdata, u.blocksize, 1, u.canceled_file);
2110 }
2111 } else {
2112 while (fscanf(u.drift_file, "%c", &c) > 0) {
2113 switch (c) {
2114 case 'd':
2115 if (!fscanf(u.drift_file, "%a", &drift)) {
2116 perror("Drift file incomplete");
2117 goto fail;
2118 }
2119
2120 u.ec->set_drift(u.ec, drift);
2121
2122 break;
2123
2124 case 'c':
2125 if (!fscanf(u.drift_file, "%d", &i)) {
2126 perror("Drift file incomplete");
2127 goto fail;
2128 }
2129
2130 if (fread(rdata, i, 1, u.captured_file) <= 0) {
2131 perror("Captured file ended prematurely");
2132 goto fail;
2133 }
2134
2135 u.ec->record(u.ec, rdata, cdata);
2136
2137 unused = fwrite(cdata, i, 1, u.canceled_file);
2138
2139 break;
2140
2141 case 'p':
2142 if (!fscanf(u.drift_file, "%d", &i)) {
2143 perror("Drift file incomplete");
2144 goto fail;
2145 }
2146
2147 if (fread(pdata, i, 1, u.played_file) <= 0) {
2148 perror("Played file ended prematurely");
2149 goto fail;
2150 }
2151
2152 u.ec->play(u.ec, pdata);
2153
2154 break;
2155 }
2156 }
2157
2158 if (fread(rdata, i, 1, u.captured_file) > 0)
2159 pa_log("All capture data was not consumed");
2160 if (fread(pdata, i, 1, u.played_file) > 0)
2161 pa_log("All playback data was not consumed");
2162 }
2163
2164 u.ec->done(u.ec);
2165
2166 fclose(u.captured_file);
2167 fclose(u.played_file);
2168 fclose(u.canceled_file);
2169 if (u.drift_file)
2170 fclose(u.drift_file);
2171
2172 out:
2173 pa_xfree(rdata);
2174 pa_xfree(pdata);
2175 pa_xfree(cdata);
2176
2177 pa_xfree(u.ec);
2178 pa_xfree(u.core);
2179
2180 if (ma)
2181 pa_modargs_free(ma);
2182
2183 return ret;
2184
2185 usage:
2186 pa_log("Usage: %s play_file rec_file out_file [module args] [aec_args] [drift_file]", argv[0]);
2187
2188 fail:
2189 ret = -1;
2190 goto out;
2191 }
2192 #endif /* ECHO_CANCEL_TEST */