]> code.delx.au - pulseaudio/blob - src/modules/echo-cancel/module-echo-cancel.c
filters: Fix the master source/sink when autoloaded
[pulseaudio] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6 Based on module-virtual-sink.c
7 module-virtual-source.c
8 module-loopback.c
9
10 Copyright 2010 Intel Corporation
11 Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13 PulseAudio is free software; you can redistribute it and/or modify
14 it under the terms of the GNU Lesser General Public License as published
15 by the Free Software Foundation; either version 2.1 of the License,
16 or (at your option) any later version.
17
18 PulseAudio is distributed in the hope that it will be useful, but
19 WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 General Public License for more details.
22
23 You should have received a copy of the GNU Lesser General Public License
24 along with PulseAudio; if not, write to the Free Software
25 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/timeval.h>
40 #include <pulse/rtclock.h>
41
42 #include <pulsecore/i18n.h>
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/namereg.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-rtclock.h>
49 #include <pulsecore/core-util.h>
50 #include <pulsecore/modargs.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/rtpoll.h>
53 #include <pulsecore/sample-util.h>
54 #include <pulsecore/ltdl-helper.h>
55
56 #include "module-echo-cancel-symdef.h"
57
/* Module metadata consumed by the PulseAudio module loader. */
PA_MODULE_AUTHOR("Wim Taymans");
PA_MODULE_DESCRIPTION("Echo Cancellation");
PA_MODULE_VERSION(PACKAGE_VERSION);
/* FALSE: several instances may be loaded, one per source/sink pair. */
PA_MODULE_LOAD_ONCE(FALSE);
/* Argument list shown to users; must stay in sync with valid_modargs below. */
PA_MODULE_USAGE(
        _("source_name=<name for the source> "
          "source_properties=<properties for the source> "
          "source_master=<name of source to filter> "
          "sink_name=<name for the sink> "
          "sink_properties=<properties for the sink> "
          "sink_master=<name of sink to filter> "
          "adjust_time=<how often to readjust rates in s> "
          "adjust_threshold=<how much drift to readjust after in ms> "
          "format=<sample format> "
          "rate=<sample rate> "
          "channels=<number of channels> "
          "channel_map=<channel map> "
          "aec_method=<implementation to use> "
          "aec_args=<parameters for the AEC engine> "
          "save_aec=<save AEC data in /tmp> "
          "autoloaded=<set if this module is being loaded automatically> "
          "use_volume_sharing=<yes or no> "
        ));
81
/* NOTE: Make sure the enum and ec_table are maintained in the correct order */
typedef enum {
    PA_ECHO_CANCELLER_INVALID = -1,
    PA_ECHO_CANCELLER_SPEEX = 0,   /* values >= 0 index into ec_table below */
    PA_ECHO_CANCELLER_ADRIAN,
#ifdef HAVE_WEBRTC
    PA_ECHO_CANCELLER_WEBRTC,
#endif
} pa_echo_canceller_method_t;
91
/* Pick the default canceller implementation: prefer WebRTC when it was
 * compiled in, otherwise fall back to Speex.
 *
 * Use #ifdef (not #if) for consistency with every other HAVE_WEBRTC guard in
 * this file: "#if HAVE_WEBRTC" is a preprocessor error if the macro is
 * defined to an empty value, and warns under -Wundef when it is undefined. */
#ifdef HAVE_WEBRTC
#define DEFAULT_ECHO_CANCELLER "webrtc"
#else
#define DEFAULT_ECHO_CANCELLER "speex"
#endif
97
/* Implementation vtable, indexed by pa_echo_canceller_method_t; the order
 * must match that enum (see NOTE above). */
static const pa_echo_canceller ec_table[] = {
    {
        /* Speex */
        .init = pa_speex_ec_init,
        .run = pa_speex_ec_run,
        .done = pa_speex_ec_done,
    },
    {
        /* Adrian Andre's NLMS implementation */
        .init = pa_adrian_ec_init,
        .run = pa_adrian_ec_run,
        .done = pa_adrian_ec_done,
    },
#ifdef HAVE_WEBRTC
    {
        /* WebRTC's audio processing engine; additionally provides separate
         * play/record hooks and drift compensation support. */
        .init = pa_webrtc_ec_init,
        .play = pa_webrtc_ec_play,
        .record = pa_webrtc_ec_record,
        .set_drift = pa_webrtc_ec_set_drift,
        .run = pa_webrtc_ec_run,
        .done = pa_webrtc_ec_done,
    },
#endif
};
123
#define DEFAULT_RATE 32000                             /* Hz */
#define DEFAULT_CHANNELS 1
#define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)   /* drift-check period */
#define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)  /* drift before quick resync */
#define DEFAULT_SAVE_AEC FALSE                         /* dump AEC streams to /tmp */
#define DEFAULT_AUTOLOADED FALSE

/* Upper bound for both the source and sink memblockqs. */
#define MEMBLOCKQ_MAXLENGTH (16*1024*1024)

/* TRUE iff both ends of the filter are actively streaming.
 * Can only be used in main context */
#define IS_ACTIVE(u) ((pa_source_get_state((u)->source) == PA_SOURCE_RUNNING) && \
                      (pa_sink_get_state((u)->sink) == PA_SINK_RUNNING))
136
137 /* This module creates a new (virtual) source and sink.
138 *
139 * The data sent to the new sink is kept in a memblockq before being
140 * forwarded to the real sink_master.
141 *
142 * Data read from source_master is matched against the saved sink data and
143 * echo canceled data is then pushed onto the new source.
144 *
145 * Both source and sink masters have their own threads to push/pull data
146 * respectively. We however perform all our actions in the source IO thread.
147 * To do this we send all played samples to the source IO thread where they
148 * are then pushed into the memblockq.
149 *
150 * Alignment is performed in two steps:
151 *
152 * 1) when something happens that requires quick adjustment of the alignment of
153 * capture and playback samples, we perform a resync. This adjusts the
154 * position in the playback memblock to the requested sample. Quick
155 * adjustments include moving the playback samples before the capture
156 * samples (because else the echo canceler does not work) or when the
157 * playback pointer drifts too far away.
158 *
159 * 2) periodically check the difference between capture and playback. we use a
160 * low and high watermark for adjusting the alignment. playback should always
161 * be before capture and the difference should not be bigger than one frame
162 size. We would ideally like to resample the sink_input but most drivers
163 don't give enough accuracy to be able to do that right now.
164 */
165
struct userdata;

/* Message object carrying a pointer back to the module's userdata; used for
 * the ECHO_CANCELLER_MESSAGE_* codes sent to the main thread. */
struct pa_echo_canceller_msg {
    pa_msgobject parent;          /* base class; must be first */
    struct userdata *userdata;
};

PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
#define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))
175
/* Latency/queue snapshot filled in from both I/O threads and fed to
 * calc_diff() to estimate the capture/playback drift. */
struct snapshot {
    pa_usec_t sink_now;        /* timestamp when the sink side was sampled */
    pa_usec_t sink_latency;    /* master sink latency */
    size_t sink_delay;         /* bytes pending on the sink-input side */
    int64_t send_counter;      /* bytes posted from the sink I/O thread */

    pa_usec_t source_now;      /* timestamp when the source side was sampled */
    pa_usec_t source_latency;  /* master source latency */
    size_t source_delay;       /* bytes pending on the source-output side */
    int64_t recv_counter;      /* bytes received in the source I/O thread */
    size_t rlen;               /* capture (source) memblockq length, bytes */
    size_t plen;               /* playback (sink) memblockq length, bytes */
};
189
/* Per-instance state. Unless noted otherwise, fields are owned by the main
 * thread; see per-field comments for the I/O-thread exceptions. */
struct userdata {
    pa_core *core;
    pa_module *module;

    pa_bool_t autoloaded;   /* loaded automatically rather than by the user */
    pa_bool_t dead;
    pa_bool_t save_aec;     /* dump played/captured/canceled streams to files */

    pa_echo_canceller *ec;  /* selected entry from ec_table */
    uint32_t blocksize;     /* fixed chunk size (bytes) the canceller consumes */

    pa_bool_t need_realign;

    /* to wakeup the source I/O thread */
    pa_asyncmsgq *asyncmsgq;
    pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;

    pa_source *source;
    pa_bool_t source_auto_desc;        /* description was auto-generated */
    pa_source_output *source_output;
    pa_memblockq *source_memblockq;    /* echo canceler needs fixed sized chunks */
    size_t source_skip;                /* capture bytes still to be dropped for resync */

    pa_sink *sink;
    pa_bool_t sink_auto_desc;
    pa_sink_input *sink_input;
    pa_memblockq *sink_memblockq;      /* playback samples awaiting cancellation */
    int64_t send_counter;              /* updated in sink IO thread */
    int64_t recv_counter;              /* updated in source IO thread */
    size_t sink_skip;                  /* playback bytes still to be dropped for resync */

    /* Bytes left over from previous iteration */
    size_t sink_rem;
    size_t source_rem;

    /* set to 1 (from any thread) to request a resync in the source thread */
    pa_atomic_t request_resync;

    pa_time_event *time_event;         /* periodic drift-check timer */
    pa_usec_t adjust_time;             /* timer period, usec; 0 disables */
    int adjust_threshold;              /* drift (usec) that triggers quick resync */

    /* debug dumps, only used when save_aec is set */
    FILE *captured_file;
    FILE *played_file;
    FILE *canceled_file;
    FILE *drift_file;

    pa_bool_t use_volume_sharing;

    struct {
        pa_cvolume current_volume;     /* source I/O thread's view of the volume */
    } thread_info;
};
242
static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);

/* Accepted module arguments; must stay in sync with PA_MODULE_USAGE above. */
static const char* const valid_modargs[] = {
    "source_name",
    "source_properties",
    "source_master",
    "sink_name",
    "sink_properties",
    "sink_master",
    "adjust_time",
    "adjust_threshold",
    "format",
    "rate",
    "channels",
    "channel_map",
    "aec_method",
    "aec_args",
    "save_aec",
    "autoloaded",
    "use_volume_sharing",
    NULL
};
265
/* Messages processed by the source output in the source I/O thread. */
enum {
    SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,  /* new playback chunk */
    SOURCE_OUTPUT_MESSAGE_REWIND,                               /* sink rewound */
    SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
    SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
};

/* Messages processed by the sink input in the sink I/O thread. */
enum {
    SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
};

/* Messages sent to the main thread via pa_echo_canceller_msg. */
enum {
    ECHO_CANCELLER_MESSAGE_SET_VOLUME,
};
280
281 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
282 int64_t buffer, diff_time, buffer_latency;
283
284 /* get the number of samples between capture and playback */
285 if (snapshot->plen > snapshot->rlen)
286 buffer = snapshot->plen - snapshot->rlen;
287 else
288 buffer = 0;
289
290 buffer += snapshot->source_delay + snapshot->sink_delay;
291
292 /* add the amount of samples not yet transferred to the source context */
293 if (snapshot->recv_counter <= snapshot->send_counter)
294 buffer += (int64_t) (snapshot->send_counter - snapshot->recv_counter);
295 else
296 buffer += PA_CLIP_SUB(buffer, (int64_t) (snapshot->recv_counter - snapshot->send_counter));
297
298 /* convert to time */
299 buffer_latency = pa_bytes_to_usec(buffer, &u->source_output->sample_spec);
300
301 /* capture and playback samples are perfectly aligned when diff_time is 0 */
302 diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
303 (snapshot->source_now - snapshot->source_latency);
304
305 pa_log_debug("diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
306 (long long) snapshot->sink_latency,
307 (long long) buffer_latency, (long long) snapshot->source_latency,
308 (long long) snapshot->source_delay, (long long) snapshot->sink_delay,
309 (long long) (snapshot->send_counter - snapshot->recv_counter),
310 (long long) (snapshot->sink_now - snapshot->source_now));
311
312 return diff_time;
313 }
314
/* Called from main context.
 * Periodic drift check: snapshots the latencies of both sides, computes the
 * capture/playback offset and, when needed, asks the source I/O thread to
 * realign quickly. The gradual rate-adjustment path is commented out, so
 * new_rate always equals base_rate for now (equal sample rates assumed).
 * Reschedules itself every u->adjust_time usec. */
static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
    struct userdata *u = userdata;
    uint32_t old_rate, base_rate, new_rate;
    int64_t diff_time;
    /*size_t fs*/
    struct snapshot latency_snapshot;

    pa_assert(u);
    pa_assert(a);
    pa_assert(u->time_event == e);
    pa_assert_ctl_context();

    /* Nothing to align unless both source and sink are running. */
    if (!IS_ACTIVE(u))
        return;

    /* update our snapshots: each side fills in its half synchronously */
    pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
    pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);

    /* calculate drift between capture and playback */
    diff_time = calc_diff(u, &latency_snapshot);

    /*fs = pa_frame_size(&u->source_output->sample_spec);*/
    old_rate = u->sink_input->sample_spec.rate;
    base_rate = u->source_output->sample_spec.rate;

    if (diff_time < 0) {
        /* recording before playback, we need to adjust quickly. The echo
         * canceler does not work in this case. */
        pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
                          NULL, diff_time, NULL, NULL);
        /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
        new_rate = base_rate;
    }
    else {
        if (diff_time > u->adjust_threshold) {
            /* diff too big, quickly adjust */
            pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
                              NULL, diff_time, NULL, NULL);
        }

        /* recording behind playback, we need to slowly adjust the rate to match */
        /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/

        /* assume equal samplerates for now */
        new_rate = base_rate;
    }

    /* make sure we don't make too big adjustments because that sounds horrible */
    if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
        new_rate = base_rate;

    if (new_rate != old_rate) {
        pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);

        pa_sink_input_set_rate(u->sink_input, new_rate);
    }

    /* re-arm the timer for the next check */
    pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
}
376
/* Called from source I/O thread context.
 * Handles GET_LATENCY by summing the master source latency, the data queued
 * in our source output, and the block we buffer locally; keeps the thread's
 * volume cache in sync on SET_VOLUME_SYNCED. Everything else is delegated to
 * the generic pa_source_process_msg(). */
static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SOURCE(o)->userdata;

    switch (code) {

        case PA_SOURCE_MESSAGE_GET_LATENCY:

            /* The source is _put() before the source output is, so let's
             * make sure we don't access it in that time. Also, the
             * source output is first shut down, the source second. */
            if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
                !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
                *((pa_usec_t*) data) = 0;
                return 0;
            }

            *((pa_usec_t*) data) =

                /* Get the latency of the master source */
                pa_source_get_latency_within_thread(u->source_output->source) +
                /* Add the latency internal to our source output on top */
                pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
                /* and the buffering we do on the source */
                pa_bytes_to_usec(u->blocksize, &u->source_output->source->sample_spec);

            return 0;

        case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
            /* cache the reference volume for use inside this I/O thread */
            u->thread_info.current_volume = u->source->reference_volume;
            break;
    }

    return pa_source_process_msg(o, code, data, offset, chunk);
}
412
/* Called from sink I/O thread context.
 * Handles GET_LATENCY by summing the master sink latency and the data queued
 * in our sink input's render queue; everything else is delegated to the
 * generic pa_sink_process_msg(). */
static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SINK(o)->userdata;

    switch (code) {

        case PA_SINK_MESSAGE_GET_LATENCY:

            /* The sink is _put() before the sink input is, so let's
             * make sure we don't access it in that time. Also, the
             * sink input is first shut down, the sink second. */
            if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
                !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
                *((pa_usec_t*) data) = 0;
                return 0;
            }

            *((pa_usec_t*) data) =

                /* Get the latency of the master sink */
                pa_sink_get_latency_within_thread(u->sink_input->sink) +

                /* Add the latency internal to our sink input on top */
                pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);

            return 0;
    }

    return pa_sink_process_msg(o, code, data, offset, chunk);
}
443
444
445 /* Called from main context */
446 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
447 struct userdata *u;
448
449 pa_source_assert_ref(s);
450 pa_assert_se(u = s->userdata);
451
452 if (!PA_SOURCE_IS_LINKED(state) ||
453 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
454 return 0;
455
456 if (state == PA_SOURCE_RUNNING) {
457 /* restart timer when both sink and source are active */
458 if (IS_ACTIVE(u) && u->adjust_time)
459 pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
460
461 pa_atomic_store(&u->request_resync, 1);
462 pa_source_output_cork(u->source_output, FALSE);
463 } else if (state == PA_SOURCE_SUSPENDED) {
464 pa_source_output_cork(u->source_output, TRUE);
465 }
466
467 return 0;
468 }
469
470 /* Called from main context */
471 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
472 struct userdata *u;
473
474 pa_sink_assert_ref(s);
475 pa_assert_se(u = s->userdata);
476
477 if (!PA_SINK_IS_LINKED(state) ||
478 !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
479 return 0;
480
481 if (state == PA_SINK_RUNNING) {
482 /* restart timer when both sink and source are active */
483 if (IS_ACTIVE(u) && u->adjust_time)
484 pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
485
486 pa_atomic_store(&u->request_resync, 1);
487 pa_sink_input_cork(u->sink_input, FALSE);
488 } else if (state == PA_SINK_SUSPENDED) {
489 pa_sink_input_cork(u->sink_input, TRUE);
490 }
491
492 return 0;
493 }
494
495 /* Called from I/O thread context */
496 static void source_update_requested_latency_cb(pa_source *s) {
497 struct userdata *u;
498
499 pa_source_assert_ref(s);
500 pa_assert_se(u = s->userdata);
501
502 if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
503 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
504 return;
505
506 pa_log_debug("Source update requested latency");
507
508 /* Just hand this one over to the master source */
509 pa_source_output_set_requested_latency_within_thread(
510 u->source_output,
511 pa_source_get_requested_latency_within_thread(s));
512 }
513
514 /* Called from I/O thread context */
515 static void sink_update_requested_latency_cb(pa_sink *s) {
516 struct userdata *u;
517
518 pa_sink_assert_ref(s);
519 pa_assert_se(u = s->userdata);
520
521 if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
522 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
523 return;
524
525 pa_log_debug("Sink update requested latency");
526
527 /* Just hand this one over to the master sink */
528 pa_sink_input_set_requested_latency_within_thread(
529 u->sink_input,
530 pa_sink_get_requested_latency_within_thread(s));
531 }
532
533 /* Called from I/O thread context */
534 static void sink_request_rewind_cb(pa_sink *s) {
535 struct userdata *u;
536
537 pa_sink_assert_ref(s);
538 pa_assert_se(u = s->userdata);
539
540 if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
541 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
542 return;
543
544 pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
545
546 /* Just hand this one over to the master sink */
547 pa_sink_input_request_rewind(u->sink_input,
548 s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
549 }
550
551 /* Called from main context */
552 static void source_set_volume_cb(pa_source *s) {
553 struct userdata *u;
554
555 pa_source_assert_ref(s);
556 pa_assert_se(u = s->userdata);
557
558 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
559 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
560 return;
561
562 pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, TRUE);
563 }
564
565 /* Called from main context */
566 static void sink_set_volume_cb(pa_sink *s) {
567 struct userdata *u;
568
569 pa_sink_assert_ref(s);
570 pa_assert_se(u = s->userdata);
571
572 if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
573 !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
574 return;
575
576 pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, TRUE);
577 }
578
579 static void source_get_volume_cb(pa_source *s) {
580 struct userdata *u;
581 pa_cvolume v;
582
583 pa_source_assert_ref(s);
584 pa_assert_se(u = s->userdata);
585
586 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
587 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
588 return;
589
590 pa_source_output_get_volume(u->source_output, &v, TRUE);
591
592 if (pa_cvolume_equal(&s->real_volume, &v))
593 /* no change */
594 return;
595
596 s->real_volume = v;
597 pa_source_set_soft_volume(s, NULL);
598 }
599
600 /* Called from main context */
601 static void source_set_mute_cb(pa_source *s) {
602 struct userdata *u;
603
604 pa_source_assert_ref(s);
605 pa_assert_se(u = s->userdata);
606
607 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
608 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
609 return;
610
611 pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
612 }
613
614 /* Called from main context */
615 static void sink_set_mute_cb(pa_sink *s) {
616 struct userdata *u;
617
618 pa_sink_assert_ref(s);
619 pa_assert_se(u = s->userdata);
620
621 if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
622 !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
623 return;
624
625 pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
626 }
627
628 /* Called from main context */
629 static void source_get_mute_cb(pa_source *s) {
630 struct userdata *u;
631
632 pa_source_assert_ref(s);
633 pa_assert_se(u = s->userdata);
634
635 if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
636 !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
637 return;
638
639 pa_source_output_get_mute(u->source_output);
640 }
641
642 /* must be called from the input thread context */
643 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
644 int64_t diff;
645
646 if (diff_time < 0) {
647 diff = pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec);
648
649 if (diff > 0) {
650 /* add some extra safety samples to compensate for jitter in the
651 * timings */
652 diff += 10 * pa_frame_size (&u->source_output->sample_spec);
653
654 pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
655
656 u->sink_skip = diff;
657 u->source_skip = 0;
658 }
659 } else if (diff_time > 0) {
660 diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
661
662 if (diff > 0) {
663 pa_log("playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
664
665 u->source_skip = diff;
666 u->sink_skip = 0;
667 }
668 }
669 }
670
671 /* must be called from the input thread */
672 static void do_resync(struct userdata *u) {
673 int64_t diff_time;
674 struct snapshot latency_snapshot;
675
676 pa_log("Doing resync");
677
678 /* update our snapshot */
679 source_output_snapshot_within_thread(u, &latency_snapshot);
680 pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
681
682 /* calculate drift between capture and playback */
683 diff_time = calc_diff(u, &latency_snapshot);
684
685 /* and adjust for the drift */
686 apply_diff_time(u, diff_time);
687 }
688
689 /* 1. Calculate drift at this point, pass to canceller
690 * 2. Push out playback samples in blocksize chunks
691 * 3. Push out capture samples in blocksize chunks
692 * 4. ???
693 * 5. Profit
694 */
695 static void do_push_drift_comp(struct userdata *u) {
696 size_t rlen, plen;
697 pa_memchunk rchunk, pchunk, cchunk;
698 uint8_t *rdata, *pdata, *cdata;
699 float drift;
700 int unused PA_GCC_UNUSED;
701
702 rlen = pa_memblockq_get_length(u->source_memblockq);
703 plen = pa_memblockq_get_length(u->sink_memblockq);
704
705 /* Estimate snapshot drift as follows:
706 * pd: amount of data consumed since last time
707 * rd: amount of data consumed since last time
708 *
709 * drift = (pd - rd) / rd;
710 *
711 * We calculate pd and rd as the memblockq length less the number of
712 * samples left from the last iteration (to avoid double counting
713 * those remainder samples.
714 */
715 drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
716 u->sink_rem = plen % u->blocksize;
717 u->source_rem = rlen % u->blocksize;
718
719 /* Now let the canceller work its drift compensation magic */
720 u->ec->set_drift(u->ec, drift);
721
722 if (u->save_aec) {
723 if (u->drift_file)
724 fprintf(u->drift_file, "d %a\n", drift);
725 }
726
727 /* Send in the playback samples first */
728 while (plen >= u->blocksize) {
729 pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
730 pdata = pa_memblock_acquire(pchunk.memblock);
731 pdata += pchunk.index;
732
733 u->ec->play(u->ec, pdata);
734
735 if (u->save_aec) {
736 if (u->drift_file)
737 fprintf(u->drift_file, "p %d\n", u->blocksize);
738 if (u->played_file)
739 unused = fwrite(pdata, 1, u->blocksize, u->played_file);
740 }
741
742 pa_memblock_release(pchunk.memblock);
743 pa_memblockq_drop(u->sink_memblockq, u->blocksize);
744 pa_memblock_unref(pchunk.memblock);
745
746 plen -= u->blocksize;
747 }
748
749 /* And now the capture samples */
750 while (rlen >= u->blocksize) {
751 pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
752
753 rdata = pa_memblock_acquire(rchunk.memblock);
754 rdata += rchunk.index;
755
756 cchunk.index = 0;
757 cchunk.length = u->blocksize;
758 cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
759 cdata = pa_memblock_acquire(cchunk.memblock);
760
761 u->ec->record(u->ec, rdata, cdata);
762
763 if (u->save_aec) {
764 if (u->drift_file)
765 fprintf(u->drift_file, "c %d\n", u->blocksize);
766 if (u->captured_file)
767 unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
768 if (u->canceled_file)
769 unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
770 }
771
772 pa_memblock_release(cchunk.memblock);
773 pa_memblock_release(rchunk.memblock);
774
775 pa_memblock_unref(rchunk.memblock);
776
777 pa_source_post(u->source, &cchunk);
778 pa_memblock_unref(cchunk.memblock);
779
780 pa_memblockq_drop(u->source_memblockq, u->blocksize);
781 rlen -= u->blocksize;
782 }
783 }
784
785 /* This one's simpler than the drift compensation case -- we just iterate over
786 * the capture buffer, and pass the canceller blocksize bytes of playback and
787 * capture data. */
788 static void do_push(struct userdata *u) {
789 size_t rlen, plen;
790 pa_memchunk rchunk, pchunk, cchunk;
791 uint8_t *rdata, *pdata, *cdata;
792 int unused PA_GCC_UNUSED;
793
794 rlen = pa_memblockq_get_length(u->source_memblockq);
795 plen = pa_memblockq_get_length(u->sink_memblockq);
796
797 while (rlen >= u->blocksize) {
798 /* take fixed block from recorded samples */
799 pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
800
801 if (plen > u->blocksize) {
802 if (plen > u->blocksize) {
803 /* take fixed block from played samples */
804 pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
805
806 rdata = pa_memblock_acquire(rchunk.memblock);
807 rdata += rchunk.index;
808 pdata = pa_memblock_acquire(pchunk.memblock);
809 pdata += pchunk.index;
810
811 cchunk.index = 0;
812 cchunk.length = u->blocksize;
813 cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
814 cdata = pa_memblock_acquire(cchunk.memblock);
815
816 if (u->save_aec) {
817 if (u->captured_file)
818 unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
819 if (u->played_file)
820 unused = fwrite(pdata, 1, u->blocksize, u->played_file);
821 }
822
823 /* perform echo cancellation */
824 u->ec->run(u->ec, rdata, pdata, cdata);
825
826 if (u->save_aec) {
827 if (u->canceled_file)
828 unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
829 }
830
831 pa_memblock_release(cchunk.memblock);
832 pa_memblock_release(pchunk.memblock);
833 pa_memblock_release(rchunk.memblock);
834
835 /* drop consumed sink samples */
836 pa_memblockq_drop(u->sink_memblockq, u->blocksize);
837 pa_memblock_unref(pchunk.memblock);
838
839 pa_memblock_unref(rchunk.memblock);
840 /* the filtered samples now become the samples from our
841 * source */
842 rchunk = cchunk;
843
844 plen -= u->blocksize;
845 }
846 }
847
848 /* forward the (echo-canceled) data to the virtual source */
849 pa_source_post(u->source, &rchunk);
850 pa_memblock_unref(rchunk.memblock);
851
852 pa_memblockq_drop(u->source_memblockq, u->blocksize);
853 rlen -= u->blocksize;
854 }
855 }
856
857 /* Called from input thread context */
858 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
859 struct userdata *u;
860 size_t rlen, plen, to_skip;
861 pa_memchunk rchunk;
862
863 pa_source_output_assert_ref(o);
864 pa_source_output_assert_io_context(o);
865 pa_assert_se(u = o->userdata);
866
867 if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
868 pa_log("push when no link?");
869 return;
870 }
871
872 if (PA_UNLIKELY(u->source->thread_info.state != PA_SOURCE_RUNNING ||
873 u->sink->thread_info.state != PA_SINK_RUNNING)) {
874 pa_source_post(u->source, chunk);
875 return;
876 }
877
878 /* handle queued messages, do any message sending of our own */
879 while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
880 ;
881
882 pa_memblockq_push_align(u->source_memblockq, chunk);
883
884 rlen = pa_memblockq_get_length(u->source_memblockq);
885 plen = pa_memblockq_get_length(u->sink_memblockq);
886
887 /* Let's not do anything else till we have enough data to process */
888 if (rlen < u->blocksize)
889 return;
890
891 /* See if we need to drop samples in order to sync */
892 if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
893 do_resync(u);
894 }
895
896 /* Okay, skip cancellation for skipped source samples if needed. */
897 if (PA_UNLIKELY(u->source_skip)) {
898 /* The slightly tricky bit here is that we drop all but modulo
899 * blocksize bytes and then adjust for that last bit on the sink side.
900 * We do this because the source data is coming at a fixed rate, which
901 * means the only way to try to catch up is drop sink samples and let
902 * the canceller cope up with this. */
903 to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
904 to_skip -= to_skip % u->blocksize;
905
906 if (to_skip) {
907 pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
908 pa_source_post(u->source, &rchunk);
909
910 pa_memblock_unref(rchunk.memblock);
911 pa_memblockq_drop(u->source_memblockq, u->blocksize);
912
913 rlen -= to_skip;
914 u->source_skip -= to_skip;
915 }
916
917 if (rlen && u->source_skip % u->blocksize) {
918 u->sink_skip += u->blocksize - (u->source_skip % u->blocksize);
919 u->source_skip -= (u->source_skip % u->blocksize);
920 }
921 }
922
923 /* And for the sink, these samples have been played back already, so we can
924 * just drop them and get on with it. */
925 if (PA_UNLIKELY(u->sink_skip)) {
926 to_skip = plen >= u->sink_skip ? u->sink_skip : plen;
927
928 pa_memblockq_drop(u->sink_memblockq, to_skip);
929
930 plen -= to_skip;
931 u->sink_skip -= to_skip;
932 }
933
934 /* process and push out samples */
935 if (u->ec->params.drift_compensation)
936 do_push_drift_comp(u);
937 else
938 do_push(u);
939 }
940
/* Called from I/O thread context.
 * Renders playback data from our sink and hands a copy to the source I/O
 * thread (via SOURCE_OUTPUT_MESSAGE_POST) so it can be queued for echo
 * cancellation. Returns 0 on success. */
static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert(chunk);
    pa_assert_se(u = i->userdata);

    /* flush any pending rewind before rendering fresh data */
    if (u->sink->thread_info.rewind_requested)
        pa_sink_process_rewind(u->sink, 0);

    pa_sink_render_full(u->sink, nbytes, chunk);

    /* coming out of an underrun the alignment is unknown: ask for a resync */
    if (i->thread_info.underrun_for > 0) {
        pa_log_debug("Handling end of underrun.");
        pa_atomic_store(&u->request_resync, 1);
    }

    /* let source thread handle the chunk. pass the sample count as well so that
     * the source IO thread can update the right variables. */
    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
        NULL, 0, chunk, NULL);
    u->send_counter += chunk->length;

    return 0;
}
967
/* Called from input thread context.
 * Rewinds our source by nbytes and keeps both memblockqs aligned with it:
 * the sink queue's read side is moved back (older playback data will be
 * needed again) and the source queue's write index is moved back. */
static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    pa_source_process_rewind(u->source, nbytes);

    /* go back on read side, we need to use older sink data for this */
    pa_memblockq_rewind(u->sink_memblockq, nbytes);

    /* manipulate write index
     * NOTE(review): "-nbytes" negates a size_t before the implicit
     * conversion to the int64_t offset parameter — works in practice on
     * two's-complement targets, but "- (int64_t) nbytes" would be cleaner. */
    pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, TRUE);

    pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
        (long long) pa_memblockq_get_length (u->source_memblockq));
}
987
/* Called from I/O thread context.
 * Rewinds our sink and notifies the source I/O thread (via
 * SOURCE_OUTPUT_MESSAGE_REWIND) so it can roll back the queued playback
 * data; the send counter is reduced to match. */
static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    pa_log_debug("Sink process rewind %lld", (long long) nbytes);

    pa_sink_process_rewind(u->sink, nbytes);

    pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
    u->send_counter -= nbytes;
}
1002
/* Capture the source-side timing state (latency, queued bytes, counters) in
 * one consistent snapshot so the main thread can estimate drift. */
static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
    size_t delay, rlen, plen;
    pa_usec_t now, latency;

    now = pa_rtclock_now();
    latency = pa_source_get_latency_within_thread(u->source_output->source);
    delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);

    /* Translate the queued bytes back to the pre-resampler byte count when a
     * resampler is in use. */
    delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
    rlen = pa_memblockq_get_length(u->source_memblockq);
    plen = pa_memblockq_get_length(u->sink_memblockq);

    snapshot->source_now = now;
    snapshot->source_latency = latency;
    snapshot->source_delay = delay;
    snapshot->recv_counter = u->recv_counter;
    /* Include bytes still scheduled to be skipped on the opposite side. */
    snapshot->rlen = rlen + u->sink_skip;
    snapshot->plen = plen + u->source_skip;
}
1022
1023
/* Called from output thread context */
static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;

    switch (code) {

        case SOURCE_OUTPUT_MESSAGE_POST:

            pa_source_output_assert_io_context(u->source_output);

            /* Queue the rendered playback chunk for the canceller, but only
             * while the source is actually running; otherwise drop it so the
             * queue does not grow while nobody consumes it. */
            if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
                pa_memblockq_push_align(u->sink_memblockq, chunk);
            else
                pa_memblockq_flush_write(u->sink_memblockq, TRUE);

            u->recv_counter += (int64_t) chunk->length;

            return 0;

        case SOURCE_OUTPUT_MESSAGE_REWIND:
            pa_source_output_assert_io_context(u->source_output);

            /* manipulate write index, never go past what we have */
            if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
                pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
            else
                pa_memblockq_flush_write(u->sink_memblockq, TRUE);

            pa_log_debug("Sink rewind (%lld)", (long long) offset);

            u->recv_counter -= offset;

            return 0;

        case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
            struct snapshot *snapshot = (struct snapshot *) data;

            source_output_snapshot_within_thread(u, snapshot);
            return 0;
        }

        case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
            /* Main thread computed a timing correction; apply it here. */
            apply_diff_time(u, offset);
            return 0;

    }

    /* Anything else goes to the default source-output handler. */
    return pa_source_output_process_msg(obj, code, data, offset, chunk);
}
1073
/* Message handler for the sink input; runs in the sink's I/O thread. */
static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
    struct userdata *u = PA_SINK_INPUT(obj)->userdata;

    switch (code) {

        case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
            /* Capture sink-side timing state for the main thread's drift
             * estimation, mirroring source_output_snapshot_within_thread(). */
            size_t delay;
            pa_usec_t now, latency;
            struct snapshot *snapshot = (struct snapshot *) data;

            pa_sink_input_assert_io_context(u->sink_input);

            now = pa_rtclock_now();
            latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
            delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);

            /* Translate queued bytes back to the pre-resampler byte count. */
            delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);

            snapshot->sink_now = now;
            snapshot->sink_latency = latency;
            snapshot->sink_delay = delay;
            snapshot->send_counter = u->send_counter;
            return 0;
        }
    }

    /* Anything else goes to the default sink-input handler. */
    return pa_sink_input_process_msg(obj, code, data, offset, chunk);
}
1102
1103 /* Called from I/O thread context */
1104 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1105 struct userdata *u;
1106
1107 pa_sink_input_assert_ref(i);
1108 pa_assert_se(u = i->userdata);
1109
1110 pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
1111
1112 pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
1113 pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
1114 }
1115
1116 /* Called from I/O thread context */
1117 static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
1118 struct userdata *u;
1119
1120 pa_source_output_assert_ref(o);
1121 pa_assert_se(u = o->userdata);
1122
1123 pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
1124
1125 pa_source_set_max_rewind_within_thread(u->source, nbytes);
1126 }
1127
1128 /* Called from I/O thread context */
1129 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1130 struct userdata *u;
1131
1132 pa_sink_input_assert_ref(i);
1133 pa_assert_se(u = i->userdata);
1134
1135 pa_log_debug("Sink input update max request %lld", (long long) nbytes);
1136
1137 pa_sink_set_max_request_within_thread(u->sink, nbytes);
1138 }
1139
1140 /* Called from I/O thread context */
1141 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1142 struct userdata *u;
1143 pa_usec_t latency;
1144
1145 pa_sink_input_assert_ref(i);
1146 pa_assert_se(u = i->userdata);
1147
1148 latency = pa_sink_get_requested_latency_within_thread(i->sink);
1149
1150 pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1151 }
1152
1153 /* Called from I/O thread context */
1154 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1155 struct userdata *u;
1156 pa_usec_t latency;
1157
1158 pa_source_output_assert_ref(o);
1159 pa_assert_se(u = o->userdata);
1160
1161 latency = pa_source_get_requested_latency_within_thread(o->source);
1162
1163 pa_log_debug("source output update requested latency %lld", (long long) latency);
1164 }
1165
1166 /* Called from I/O thread context */
1167 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1168 struct userdata *u;
1169
1170 pa_sink_input_assert_ref(i);
1171 pa_assert_se(u = i->userdata);
1172
1173 pa_log_debug("Sink input update latency range %lld %lld",
1174 (long long) i->sink->thread_info.min_latency,
1175 (long long) i->sink->thread_info.max_latency);
1176
1177 pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1178 }
1179
1180 /* Called from I/O thread context */
1181 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1182 struct userdata *u;
1183
1184 pa_source_output_assert_ref(o);
1185 pa_assert_se(u = o->userdata);
1186
1187 pa_log_debug("Source output update latency range %lld %lld",
1188 (long long) o->source->thread_info.min_latency,
1189 (long long) o->source->thread_info.max_latency);
1190
1191 pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1192 }
1193
1194 /* Called from I/O thread context */
1195 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1196 struct userdata *u;
1197
1198 pa_sink_input_assert_ref(i);
1199 pa_assert_se(u = i->userdata);
1200
1201 pa_log_debug("Sink input update fixed latency %lld",
1202 (long long) i->sink->thread_info.fixed_latency);
1203
1204 pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1205 }
1206
1207 /* Called from I/O thread context */
1208 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1209 struct userdata *u;
1210
1211 pa_source_output_assert_ref(o);
1212 pa_assert_se(u = o->userdata);
1213
1214 pa_log_debug("Source output update fixed latency %lld",
1215 (long long) o->source->thread_info.fixed_latency);
1216
1217 pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1218 }
1219
/* Called from output thread context */
static void source_output_attach_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_source_output_assert_io_context(o);
    pa_assert_se(u = o->userdata);

    /* Bind the virtual source to the master source's rtpoll and mirror the
     * master's latency constraints and rewind limit onto it. */
    pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
    pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
    pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
    pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));

    pa_log_debug("Source output %d attach", o->index);

    pa_source_attach_within_thread(u->source);

    /* Poll our message queue from the master source's I/O loop so chunks
     * posted by the sink side are picked up here; freed again in detach. */
    u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
            o->source->thread_info.rtpoll,
            PA_RTPOLL_LATE,
            u->asyncmsgq);
}
1242
/* Called from I/O thread context */
static void sink_input_attach_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    /* Bind the virtual sink to the master sink's rtpoll and mirror the
     * master's latency constraints onto it. */
    pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
    pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);

    /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
     * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
    pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);

    /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
     * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
     * HERE. SEE (6) */
    pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
    pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));

    pa_log_debug("Sink input %d attach", i->index);

    /* Write end of the message queue used to hand rendered chunks to the
     * source side; freed again in detach. */
    u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
            i->sink->thread_info.rtpoll,
            PA_RTPOLL_LATE,
            u->asyncmsgq);

    pa_sink_attach_within_thread(u->sink);
}
1272
1273
1274 /* Called from output thread context */
1275 static void source_output_detach_cb(pa_source_output *o) {
1276 struct userdata *u;
1277
1278 pa_source_output_assert_ref(o);
1279 pa_source_output_assert_io_context(o);
1280 pa_assert_se(u = o->userdata);
1281
1282 pa_source_detach_within_thread(u->source);
1283 pa_source_set_rtpoll(u->source, NULL);
1284
1285 pa_log_debug("Source output %d detach", o->index);
1286
1287 if (u->rtpoll_item_read) {
1288 pa_rtpoll_item_free(u->rtpoll_item_read);
1289 u->rtpoll_item_read = NULL;
1290 }
1291 }
1292
1293 /* Called from I/O thread context */
1294 static void sink_input_detach_cb(pa_sink_input *i) {
1295 struct userdata *u;
1296
1297 pa_sink_input_assert_ref(i);
1298 pa_assert_se(u = i->userdata);
1299
1300 pa_sink_detach_within_thread(u->sink);
1301
1302 pa_sink_set_rtpoll(u->sink, NULL);
1303
1304 pa_log_debug("Sink input %d detach", i->index);
1305
1306 if (u->rtpoll_item_write) {
1307 pa_rtpoll_item_free(u->rtpoll_item_write);
1308 u->rtpoll_item_write = NULL;
1309 }
1310 }
1311
1312 /* Called from output thread context */
1313 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1314 struct userdata *u;
1315
1316 pa_source_output_assert_ref(o);
1317 pa_source_output_assert_io_context(o);
1318 pa_assert_se(u = o->userdata);
1319
1320 pa_log_debug("Source output %d state %d", o->index, state);
1321 }
1322
1323 /* Called from IO thread context */
1324 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1325 struct userdata *u;
1326
1327 pa_sink_input_assert_ref(i);
1328 pa_assert_se(u = i->userdata);
1329
1330 pa_log_debug("Sink input %d state %d", i->index, state);
1331
1332 /* If we are added for the first time, ask for a rewinding so that
1333 * we are heard right-away. */
1334 if (PA_SINK_INPUT_IS_LINKED(state) &&
1335 i->thread_info.state == PA_SINK_INPUT_INIT) {
1336 pa_log_debug("Requesting rewind due to state change.");
1337 pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
1338 }
1339 }
1340
/* Called from main thread */
static void source_output_kill_cb(pa_source_output *o) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_assert_ctl_context();
    pa_assert_se(u = o->userdata);

    /* Mark ourselves dying so other callbacks (e.g. may_move_to) refuse
     * further work. */
    u->dead = TRUE;

    /* The order here matters! We first kill the source output, followed
     * by the source. That means the source callbacks must be protected
     * against an unconnected source output! */
    pa_source_output_unlink(u->source_output);
    pa_source_unlink(u->source);

    pa_source_output_unref(u->source_output);
    u->source_output = NULL;

    pa_source_unref(u->source);
    u->source = NULL;

    pa_log_debug("Source output kill %d", o->index);

    /* With the capture half gone the module cannot function; unload it. */
    pa_module_unload_request(u->module, TRUE);
}
1367
/* Called from main context */
static void sink_input_kill_cb(pa_sink_input *i) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    /* Mark ourselves dying so other callbacks (e.g. may_move_to) refuse
     * further work. */
    u->dead = TRUE;

    /* The order here matters! We first kill the sink input, followed
     * by the sink. That means the sink callbacks must be protected
     * against an unconnected sink input! */
    pa_sink_input_unlink(u->sink_input);
    pa_sink_unlink(u->sink);

    pa_sink_input_unref(u->sink_input);
    u->sink_input = NULL;

    pa_sink_unref(u->sink);
    u->sink = NULL;

    pa_log_debug("Sink input kill %d", i->index);

    /* With the playback half gone the module cannot function; unload it. */
    pa_module_unload_request(u->module, TRUE);
}
1393
1394 /* Called from main thread */
1395 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1396 struct userdata *u;
1397
1398 pa_source_output_assert_ref(o);
1399 pa_assert_ctl_context();
1400 pa_assert_se(u = o->userdata);
1401
1402 if (u->dead || u->autoloaded)
1403 return FALSE;
1404
1405 return (u->source != dest) && (u->sink != dest->monitor_of);
1406 }
1407
1408 /* Called from main context */
1409 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1410 struct userdata *u;
1411
1412 pa_sink_input_assert_ref(i);
1413 pa_assert_se(u = i->userdata);
1414
1415 if (u->dead || u->autoloaded)
1416 return FALSE;
1417
1418 return u->sink != dest;
1419 }
1420
/* Called from main thread */
static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
    struct userdata *u;

    pa_source_output_assert_ref(o);
    pa_assert_ctl_context();
    pa_assert_se(u = o->userdata);

    /* Follow the new master source's message queue and latency capabilities;
     * dest is NULL while we are detached in the middle of a move. */
    if (dest) {
        pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
        pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
    } else
        pa_source_set_asyncmsgq(u->source, NULL);

    /* Only rewrite the description if we auto-generated it at load time. */
    if (u->source_auto_desc && dest) {
        const char *y, *z;
        pa_proplist *pl;

        pl = pa_proplist_new();
        y = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION);
        z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
        pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
                y ? y : u->sink_input->sink->name);

        pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
        pa_proplist_free(pl);
    }
}
1449
/* Called from main context */
static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
    struct userdata *u;

    pa_sink_input_assert_ref(i);
    pa_assert_se(u = i->userdata);

    /* Follow the new master sink's message queue and latency capabilities;
     * dest is NULL while we are detached in the middle of a move. */
    if (dest) {
        pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
        pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
    } else
        pa_sink_set_asyncmsgq(u->sink, NULL);

    /* Only rewrite the description if we auto-generated it at load time. */
    if (u->sink_auto_desc && dest) {
        const char *y, *z;
        pa_proplist *pl;

        pl = pa_proplist_new();
        y = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION);
        z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
        pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
                y ? y : u->source_output->source->name);

        pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
        pa_proplist_free(pl);
    }
}
1477
1478 /* Called from main context */
1479 static void sink_input_volume_changed_cb(pa_sink_input *i) {
1480 struct userdata *u;
1481
1482 pa_sink_input_assert_ref(i);
1483 pa_assert_se(u = i->userdata);
1484
1485 pa_sink_volume_changed(u->sink, &i->volume);
1486 }
1487
1488 /* Called from main context */
1489 static void sink_input_mute_changed_cb(pa_sink_input *i) {
1490 struct userdata *u;
1491
1492 pa_sink_input_assert_ref(i);
1493 pa_assert_se(u = i->userdata);
1494
1495 pa_sink_mute_changed(u->sink, i->muted);
1496 }
1497
/* Called from main context */
static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
    struct pa_echo_canceller_msg *msg;
    struct userdata *u;

    pa_assert(o);

    msg = PA_ECHO_CANCELLER_MSG(o);
    u = msg->userdata;

    switch (code) {
        case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
            /* Volume requested by the canceller implementation (posted from
             * pa_echo_canceller_set_capture_volume()); apply it to the source
             * or, without volume sharing, to our source output. */
            pa_cvolume *v = (pa_cvolume *) userdata;

            if (u->use_volume_sharing)
                pa_source_set_volume(u->source, v, TRUE, FALSE);
            else
                pa_source_output_set_volume(u->source_output, v, FALSE, TRUE);

            break;
        }

        default:
            /* No other message codes are posted to this object. */
            pa_assert_not_reached();
            break;
    }

    return 0;
}
1527
/* Called by the canceller, so thread context */
void pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
    /* Hand back the I/O-thread copy of the current capture volume. */
    *v = ec->msg->userdata->thread_info.current_volume;
}
1532
/* Called by the canceller, so thread context */
void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
    /* Only post to the main thread when the volume actually changed; the
     * heap copy is released via pa_xfree once the message is delivered. */
    if (!pa_cvolume_equal(&ec->msg->userdata->thread_info.current_volume, v)) {
        pa_cvolume *vol = pa_xnewdup(pa_cvolume, v, 1);

        pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, vol, 0, NULL,
                pa_xfree);
    }
}
1542
1543 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1544 if (pa_streq(method, "speex"))
1545 return PA_ECHO_CANCELLER_SPEEX;
1546 else if (pa_streq(method, "adrian"))
1547 return PA_ECHO_CANCELLER_ADRIAN;
1548 #ifdef HAVE_WEBRTC
1549 else if (pa_streq(method, "webrtc"))
1550 return PA_ECHO_CANCELLER_WEBRTC;
1551 #endif
1552 else
1553 return PA_ECHO_CANCELLER_INVALID;
1554 }
1555
1556 /* Common initialisation bits between module-echo-cancel and the standalone test program */
1557 static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1558 pa_echo_canceller_method_t ec_method;
1559
1560 if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1561 pa_log("Invalid sample format specification or channel map");
1562 goto fail;
1563 }
1564
1565 u->ec = pa_xnew0(pa_echo_canceller, 1);
1566 if (!u->ec) {
1567 pa_log("Failed to alloc echo canceller");
1568 goto fail;
1569 }
1570
1571 if ((ec_method = get_ec_method_from_string(pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER))) < 0) {
1572 pa_log("Invalid echo canceller implementation");
1573 goto fail;
1574 }
1575
1576 u->ec->init = ec_table[ec_method].init;
1577 u->ec->play = ec_table[ec_method].play;
1578 u->ec->record = ec_table[ec_method].record;
1579 u->ec->set_drift = ec_table[ec_method].set_drift;
1580 u->ec->run = ec_table[ec_method].run;
1581 u->ec->done = ec_table[ec_method].done;
1582
1583 return 0;
1584
1585 fail:
1586 return -1;
1587 }
1588
1589
1590 int pa__init(pa_module*m) {
1591 struct userdata *u;
1592 pa_sample_spec source_ss, sink_ss;
1593 pa_channel_map source_map, sink_map;
1594 pa_modargs *ma;
1595 pa_source *source_master=NULL;
1596 pa_sink *sink_master=NULL;
1597 pa_source_output_new_data source_output_data;
1598 pa_sink_input_new_data sink_input_data;
1599 pa_source_new_data source_data;
1600 pa_sink_new_data sink_data;
1601 pa_memchunk silence;
1602 uint32_t temp;
1603
1604 pa_assert(m);
1605
1606 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1607 pa_log("Failed to parse module arguments.");
1608 goto fail;
1609 }
1610
1611 if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1612 pa_log("Master source not found");
1613 goto fail;
1614 }
1615 pa_assert(source_master);
1616
1617 if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1618 pa_log("Master sink not found");
1619 goto fail;
1620 }
1621 pa_assert(sink_master);
1622
1623 if (source_master->monitor_of == sink_master) {
1624 pa_log("Can't cancel echo between a sink and its monitor");
1625 goto fail;
1626 }
1627
1628 source_ss = source_master->sample_spec;
1629 source_ss.rate = DEFAULT_RATE;
1630 source_ss.channels = DEFAULT_CHANNELS;
1631 pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1632
1633 sink_ss = sink_master->sample_spec;
1634 sink_map = sink_master->channel_map;
1635
1636 u = pa_xnew0(struct userdata, 1);
1637 if (!u) {
1638 pa_log("Failed to alloc userdata");
1639 goto fail;
1640 }
1641 u->core = m->core;
1642 u->module = m;
1643 m->userdata = u;
1644 u->dead = FALSE;
1645
1646 u->use_volume_sharing = TRUE;
1647 if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1648 pa_log("use_volume_sharing= expects a boolean argument");
1649 goto fail;
1650 }
1651
1652 temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1653 if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1654 pa_log("Failed to parse adjust_time value");
1655 goto fail;
1656 }
1657
1658 if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1659 u->adjust_time = temp * PA_USEC_PER_SEC;
1660 else
1661 u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1662
1663 temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1664 if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1665 pa_log("Failed to parse adjust_threshold value");
1666 goto fail;
1667 }
1668
1669 if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1670 u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1671 else
1672 u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1673
1674 u->save_aec = DEFAULT_SAVE_AEC;
1675 if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1676 pa_log("Failed to parse save_aec value");
1677 goto fail;
1678 }
1679
1680 u->autoloaded = DEFAULT_AUTOLOADED;
1681 if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1682 pa_log("Failed to parse autoloaded value");
1683 goto fail;
1684 }
1685
1686 if (init_common(ma, u, &source_ss, &source_map))
1687 goto fail;
1688
1689 u->asyncmsgq = pa_asyncmsgq_new(0);
1690 u->need_realign = TRUE;
1691
1692 if (u->ec->init) {
1693 if (!u->ec->init(u->core, u->ec, &source_ss, &source_map, &sink_ss, &sink_map, &u->blocksize, pa_modargs_get_value(ma, "aec_args", NULL))) {
1694 pa_log("Failed to init AEC engine");
1695 goto fail;
1696 }
1697 }
1698
1699 if (u->ec->params.drift_compensation)
1700 pa_assert(u->ec->set_drift);
1701
1702 /* Create source */
1703 pa_source_new_data_init(&source_data);
1704 source_data.driver = __FILE__;
1705 source_data.module = m;
1706 if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1707 source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1708 pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1709 pa_source_new_data_set_channel_map(&source_data, &source_map);
1710 pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1711 pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1712 if (!u->autoloaded)
1713 pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1714
1715 if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1716 pa_log("Invalid properties");
1717 pa_source_new_data_done(&source_data);
1718 goto fail;
1719 }
1720
1721 if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1722 const char *y, *z;
1723
1724 y = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1725 z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1726 pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1727 z ? z : source_master->name, y ? y : sink_master->name);
1728 }
1729
1730 u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1731 | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1732 pa_source_new_data_done(&source_data);
1733
1734 if (!u->source) {
1735 pa_log("Failed to create source.");
1736 goto fail;
1737 }
1738
1739 u->source->parent.process_msg = source_process_msg_cb;
1740 u->source->set_state = source_set_state_cb;
1741 u->source->update_requested_latency = source_update_requested_latency_cb;
1742 pa_source_set_get_mute_callback(u->source, source_get_mute_cb);
1743 pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1744 if (!u->use_volume_sharing) {
1745 pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1746 pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1747 pa_source_enable_decibel_volume(u->source, TRUE);
1748 }
1749 u->source->userdata = u;
1750
1751 pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1752
1753 /* Create sink */
1754 pa_sink_new_data_init(&sink_data);
1755 sink_data.driver = __FILE__;
1756 sink_data.module = m;
1757 if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1758 sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1759 pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1760 pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1761 pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1762 pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1763 if (!u->autoloaded)
1764 pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1765
1766 if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1767 pa_log("Invalid properties");
1768 pa_sink_new_data_done(&sink_data);
1769 goto fail;
1770 }
1771
1772 if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1773 const char *y, *z;
1774
1775 y = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1776 z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1777 pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1778 z ? z : sink_master->name, y ? y : source_master->name);
1779 }
1780
1781 u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1782 | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1783 pa_sink_new_data_done(&sink_data);
1784
1785 if (!u->sink) {
1786 pa_log("Failed to create sink.");
1787 goto fail;
1788 }
1789
1790 u->sink->parent.process_msg = sink_process_msg_cb;
1791 u->sink->set_state = sink_set_state_cb;
1792 u->sink->update_requested_latency = sink_update_requested_latency_cb;
1793 u->sink->request_rewind = sink_request_rewind_cb;
1794 pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1795 if (!u->use_volume_sharing) {
1796 pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1797 pa_sink_enable_decibel_volume(u->sink, TRUE);
1798 }
1799 u->sink->userdata = u;
1800
1801 pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1802
1803 /* Create source output */
1804 pa_source_output_new_data_init(&source_output_data);
1805 source_output_data.driver = __FILE__;
1806 source_output_data.module = m;
1807 pa_source_output_new_data_set_source(&source_output_data, source_master, FALSE);
1808 source_output_data.destination_source = u->source;
1809 /* FIXME
1810 source_output_data.flags = PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; */
1811
1812 pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1813 pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1814 pa_source_output_new_data_set_sample_spec(&source_output_data, &source_ss);
1815 pa_source_output_new_data_set_channel_map(&source_output_data, &source_map);
1816
1817 pa_source_output_new(&u->source_output, m->core, &source_output_data);
1818 pa_source_output_new_data_done(&source_output_data);
1819
1820 if (!u->source_output)
1821 goto fail;
1822
1823 u->source_output->parent.process_msg = source_output_process_msg_cb;
1824 u->source_output->push = source_output_push_cb;
1825 u->source_output->process_rewind = source_output_process_rewind_cb;
1826 u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1827 u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1828 u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1829 u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1830 u->source_output->kill = source_output_kill_cb;
1831 u->source_output->attach = source_output_attach_cb;
1832 u->source_output->detach = source_output_detach_cb;
1833 u->source_output->state_change = source_output_state_change_cb;
1834 u->source_output->may_move_to = source_output_may_move_to_cb;
1835 u->source_output->moving = source_output_moving_cb;
1836 u->source_output->userdata = u;
1837
1838 u->source->output_from_master = u->source_output;
1839
1840 /* Create sink input */
1841 pa_sink_input_new_data_init(&sink_input_data);
1842 sink_input_data.driver = __FILE__;
1843 sink_input_data.module = m;
1844 pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, FALSE);
1845 sink_input_data.origin_sink = u->sink;
1846 pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1847 pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1848 pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1849 pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1850 sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1851
1852 pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1853 pa_sink_input_new_data_done(&sink_input_data);
1854
1855 if (!u->sink_input)
1856 goto fail;
1857
1858 u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1859 u->sink_input->pop = sink_input_pop_cb;
1860 u->sink_input->process_rewind = sink_input_process_rewind_cb;
1861 u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1862 u->sink_input->update_max_request = sink_input_update_max_request_cb;
1863 u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1864 u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1865 u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1866 u->sink_input->kill = sink_input_kill_cb;
1867 u->sink_input->attach = sink_input_attach_cb;
1868 u->sink_input->detach = sink_input_detach_cb;
1869 u->sink_input->state_change = sink_input_state_change_cb;
1870 u->sink_input->may_move_to = sink_input_may_move_to_cb;
1871 u->sink_input->moving = sink_input_moving_cb;
1872 if (!u->use_volume_sharing)
1873 u->sink_input->volume_changed = sink_input_volume_changed_cb;
1874 u->sink_input->mute_changed = sink_input_mute_changed_cb;
1875 u->sink_input->userdata = u;
1876
1877 u->sink->input_to_master = u->sink_input;
1878
1879 pa_sink_input_get_silence(u->sink_input, &silence);
1880
1881 u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1882 &source_ss, 1, 1, 0, &silence);
1883 u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1884 &sink_ss, 1, 1, 0, &silence);
1885
1886 pa_memblock_unref(silence.memblock);
1887
1888 if (!u->source_memblockq || !u->sink_memblockq) {
1889 pa_log("Failed to create memblockq.");
1890 goto fail;
1891 }
1892
1893 if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
1894 u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1895 else if (u->ec->params.drift_compensation) {
1896 pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
1897 u->adjust_time = 0;
1898 /* Perform resync just once to give the canceller a leg up */
1899 pa_atomic_store(&u->request_resync, 1);
1900 }
1901
1902 if (u->save_aec) {
1903 pa_log("Creating AEC files in /tmp");
1904 u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
1905 if (u->captured_file == NULL)
1906 perror ("fopen failed");
1907 u->played_file = fopen("/tmp/aec_play.sw", "wb");
1908 if (u->played_file == NULL)
1909 perror ("fopen failed");
1910 u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
1911 if (u->canceled_file == NULL)
1912 perror ("fopen failed");
1913 if (u->ec->params.drift_compensation) {
1914 u->drift_file = fopen("/tmp/aec_drift.txt", "w");
1915 if (u->drift_file == NULL)
1916 perror ("fopen failed");
1917 }
1918 }
1919
1920 u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
1921 u->ec->msg->parent.process_msg = canceller_process_msg_cb;
1922 u->ec->msg->userdata = u;
1923
1924 u->thread_info.current_volume = u->source->reference_volume;
1925
1926 pa_sink_put(u->sink);
1927 pa_source_put(u->source);
1928
1929 pa_sink_input_put(u->sink_input);
1930 pa_source_output_put(u->source_output);
1931 pa_modargs_free(ma);
1932
1933 return 0;
1934
1935 fail:
1936 if (ma)
1937 pa_modargs_free(ma);
1938
1939 pa__done(m);
1940
1941 return -1;
1942 }
1943
1944 int pa__get_n_used(pa_module *m) {
1945 struct userdata *u;
1946
1947 pa_assert(m);
1948 pa_assert_se(u = m->userdata);
1949
1950 return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
1951 }
1952
/* Module teardown: release everything acquired in pa__init(). The order of
 * operations below is deliberate (see the source_output_kill_cb() comment
 * referenced inline) — do not reorder. Safe to call on a partially
 * initialized userdata, since every step is NULL-guarded. */
void pa__done(pa_module*m) {
    struct userdata *u;

    pa_assert(m);

    /* Nothing to do if pa__init() never got as far as installing userdata */
    if (!(u = m->userdata))
        return;

    /* Flag teardown in progress so callbacks that may still fire while we
     * unlink (e.g. the kill callbacks) know not to re-enter destruction */
    u->dead = TRUE;

    /* See comments in source_output_kill_cb() above regarding
     * destruction order! */

    /* Stop the periodic drift-adjustment timer before tearing down the
     * streams it would otherwise touch */
    if (u->time_event)
        u->core->mainloop->time_free(u->time_event);

    /* First unlink our streams from their masters... */
    if (u->source_output)
        pa_source_output_unlink(u->source_output);
    if (u->sink_input)
        pa_sink_input_unlink(u->sink_input);

    /* ...then unlink our virtual devices from the core... */
    if (u->source)
        pa_source_unlink(u->source);
    if (u->sink)
        pa_sink_unlink(u->sink);

    /* ...and only then drop our references (unref after unlink, so no
     * client can reach the objects while they are being destroyed) */
    if (u->source_output)
        pa_source_output_unref(u->source_output);
    if (u->sink_input)
        pa_sink_input_unref(u->sink_input);

    if (u->source)
        pa_source_unref(u->source);
    if (u->sink)
        pa_sink_unref(u->sink);

    /* Queues that buffered audio between the master streams and the
     * canceller; safe to free now that no stream references them */
    if (u->source_memblockq)
        pa_memblockq_free(u->source_memblockq);
    if (u->sink_memblockq)
        pa_memblockq_free(u->sink_memblockq);

    /* Let the canceller implementation release its own state, then free
     * the wrapper struct we allocated */
    if (u->ec) {
        if (u->ec->done)
            u->ec->done(u->ec);

        pa_xfree(u->ec);
    }

    if (u->asyncmsgq)
        pa_asyncmsgq_unref(u->asyncmsgq);

    /* Debug capture files are only ever opened when save_aec was set
     * during init, hence the outer guard */
    if (u->save_aec) {
        if (u->played_file)
            fclose(u->played_file);
        if (u->captured_file)
            fclose(u->captured_file);
        if (u->canceled_file)
            fclose(u->canceled_file);
        if (u->drift_file)
            fclose(u->drift_file);
    }

    pa_xfree(u);
}
2017
2018 #ifdef ECHO_CANCEL_TEST
2019 /*
2020 * Stand-alone test program for running in the canceller on pre-recorded files.
2021 */
2022 int main(int argc, char* argv[]) {
2023 struct userdata u;
2024 pa_sample_spec source_ss, sink_ss;
2025 pa_channel_map source_map, sink_map;
2026 pa_modargs *ma = NULL;
2027 uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
2028 int unused PA_GCC_UNUSED;
2029 int ret = 0, i;
2030 char c;
2031 float drift;
2032
2033 pa_memzero(&u, sizeof(u));
2034
2035 if (argc < 4 || argc > 7) {
2036 goto usage;
2037 }
2038
2039 u.ec = pa_xnew0(pa_echo_canceller, 1);
2040 if (!u.ec) {
2041 pa_log("Failed to alloc echo canceller");
2042 goto fail;
2043 }
2044
2045 u.captured_file = fopen(argv[2], "r");
2046 if (u.captured_file == NULL) {
2047 perror ("fopen failed");
2048 goto fail;
2049 }
2050 u.played_file = fopen(argv[1], "r");
2051 if (u.played_file == NULL) {
2052 perror ("fopen failed");
2053 goto fail;
2054 }
2055 u.canceled_file = fopen(argv[3], "wb");
2056 if (u.canceled_file == NULL) {
2057 perror ("fopen failed");
2058 goto fail;
2059 }
2060
2061 u.core = pa_xnew0(pa_core, 1);
2062 u.core->cpu_info.cpu_type = PA_CPU_X86;
2063 u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;
2064
2065 if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
2066 pa_log("Failed to parse module arguments.");
2067 goto fail;
2068 }
2069
2070 source_ss.format = PA_SAMPLE_S16LE;
2071 source_ss.rate = DEFAULT_RATE;
2072 source_ss.channels = DEFAULT_CHANNELS;
2073 pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2074
2075 init_common(ma, &u, &source_ss, &source_map);
2076
2077 if (!u.ec->init(u.core, u.ec, &source_ss, &source_map, &sink_ss, &sink_map, &u.blocksize,
2078 (argc > 4) ? argv[5] : NULL )) {
2079 pa_log("Failed to init AEC engine");
2080 goto fail;
2081 }
2082
2083 if (u.ec->params.drift_compensation) {
2084 if (argc < 7) {
2085 pa_log("Drift compensation enabled but drift file not specified");
2086 goto fail;
2087 }
2088
2089 u.drift_file = fopen(argv[6], "r");
2090
2091 if (u.drift_file == NULL) {
2092 perror ("fopen failed");
2093 goto fail;
2094 }
2095 }
2096
2097 rdata = pa_xmalloc(u.blocksize);
2098 pdata = pa_xmalloc(u.blocksize);
2099 cdata = pa_xmalloc(u.blocksize);
2100
2101 if (!u.ec->params.drift_compensation) {
2102 while (fread(rdata, u.blocksize, 1, u.captured_file) > 0) {
2103 if (fread(pdata, u.blocksize, 1, u.played_file) == 0) {
2104 perror("Played file ended before captured file");
2105 goto fail;
2106 }
2107
2108 u.ec->run(u.ec, rdata, pdata, cdata);
2109
2110 unused = fwrite(cdata, u.blocksize, 1, u.canceled_file);
2111 }
2112 } else {
2113 while (fscanf(u.drift_file, "%c", &c) > 0) {
2114 switch (c) {
2115 case 'd':
2116 if (!fscanf(u.drift_file, "%a", &drift)) {
2117 perror("Drift file incomplete");
2118 goto fail;
2119 }
2120
2121 u.ec->set_drift(u.ec, drift);
2122
2123 break;
2124
2125 case 'c':
2126 if (!fscanf(u.drift_file, "%d", &i)) {
2127 perror("Drift file incomplete");
2128 goto fail;
2129 }
2130
2131 if (fread(rdata, i, 1, u.captured_file) <= 0) {
2132 perror("Captured file ended prematurely");
2133 goto fail;
2134 }
2135
2136 u.ec->record(u.ec, rdata, cdata);
2137
2138 unused = fwrite(cdata, i, 1, u.canceled_file);
2139
2140 break;
2141
2142 case 'p':
2143 if (!fscanf(u.drift_file, "%d", &i)) {
2144 perror("Drift file incomplete");
2145 goto fail;
2146 }
2147
2148 if (fread(pdata, i, 1, u.played_file) <= 0) {
2149 perror("Played file ended prematurely");
2150 goto fail;
2151 }
2152
2153 u.ec->play(u.ec, pdata);
2154
2155 break;
2156 }
2157 }
2158
2159 if (fread(rdata, i, 1, u.captured_file) > 0)
2160 pa_log("All capture data was not consumed");
2161 if (fread(pdata, i, 1, u.played_file) > 0)
2162 pa_log("All playback data was not consumed");
2163 }
2164
2165 u.ec->done(u.ec);
2166
2167 fclose(u.captured_file);
2168 fclose(u.played_file);
2169 fclose(u.canceled_file);
2170 if (u.drift_file)
2171 fclose(u.drift_file);
2172
2173 out:
2174 pa_xfree(rdata);
2175 pa_xfree(pdata);
2176 pa_xfree(cdata);
2177
2178 pa_xfree(u.ec);
2179 pa_xfree(u.core);
2180
2181 if (ma)
2182 pa_modargs_free(ma);
2183
2184 return ret;
2185
2186 usage:
2187 pa_log("Usage: %s play_file rec_file out_file [module args] [aec_args] [drift_file]", argv[0]);
2188
2189 fail:
2190 ret = -1;
2191 goto out;
2192 }
2193 #endif /* ECHO_CANCEL_TEST */