2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/time-smoother.h>
50 #include <pulsecore/thread.h>
51 #include <pulsecore/thread-mq.h>
52 #include <pulsecore/core-rtclock.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/proplist-util.h>
55 #include <pulsecore/auth-cookie.h>
56 #include <pulsecore/mcalign.h>
59 #include "module-tunnel-sink-symdef.h"
61 #include "module-tunnel-source-symdef.h"
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
67 "sink_name=<name for the local sink> "
68 "sink_properties=<properties for the local sink> "
70 "sink=<remote sink name> "
72 "format=<sample format> "
73 "channels=<number of channels> "
75 "channel_map=<channel map>");
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 "source_name=<name for the local source> "
80 "source_properties=<properties for the local source> "
82 "source=<remote source name> "
84 "format=<sample format> "
85 "channels=<number of channels> "
87 "channel_map=<channel map>");
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION
);
92 PA_MODULE_LOAD_ONCE(FALSE
);
94 static const char* const valid_modargs
[] = {
113 #define DEFAULT_TIMEOUT 5
115 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
122 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
123 SINK_MESSAGE_REMOTE_SUSPEND
,
124 SINK_MESSAGE_UPDATE_LATENCY
,
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
134 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
135 SOURCE_MESSAGE_REMOTE_SUSPEND
,
136 SOURCE_MESSAGE_UPDATE_LATENCY
139 #define DEFAULT_FRAGSIZE_MSEC 25
144 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
145 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
148 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
149 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
150 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
151 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
152 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
153 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
155 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
157 [PA_COMMAND_REQUEST
] = command_request
,
158 [PA_COMMAND_STARTED
] = command_started
,
160 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
161 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
162 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
163 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
164 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
165 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
166 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
167 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
168 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
169 [PA_COMMAND_PLAYBACK_STREAM_EVENT
] = command_stream_or_client_event
,
170 [PA_COMMAND_RECORD_STREAM_EVENT
] = command_stream_or_client_event
,
171 [PA_COMMAND_CLIENT_EVENT
] = command_stream_or_client_event
,
172 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
,
173 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
180 pa_thread_mq thread_mq
;
184 pa_socket_client
*client
;
186 pa_pdispatch
*pdispatch
;
192 size_t requested_bytes
;
199 pa_auth_cookie
*auth_cookie
;
203 uint32_t device_index
;
206 int64_t counter
, counter_delta
;
208 pa_bool_t remote_corked
:1;
209 pa_bool_t remote_suspended
:1;
211 pa_usec_t transport_usec
; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec
; /* maintained in the IO thread */
214 uint32_t ignore_latency_before
;
216 pa_time_event
*time_event
;
218 pa_smoother
*smoother
;
220 char *device_description
;
234 static void request_latency(struct userdata
*u
);
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
238 pa_log_debug("Got stream or client event.");
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
243 struct userdata
*u
= userdata
;
248 pa_assert(u
->pdispatch
== pd
);
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u
->module
, TRUE
);
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
256 struct userdata
*u
= userdata
;
261 pa_assert(u
->pdispatch
== pd
);
263 pa_log_info("Server signalled buffer overrun/underrun.");
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
269 struct userdata
*u
= userdata
;
276 pa_assert(u
->pdispatch
== pd
);
278 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
279 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
280 !pa_tagstruct_eof(t
)) {
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u
->module
, TRUE
);
287 pa_log_debug("Server reports device suspend.");
290 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
292 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
298 /* Called from main context */
299 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
300 struct userdata
*u
= userdata
;
301 uint32_t channel
, di
;
308 pa_assert(u
->pdispatch
== pd
);
310 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
311 pa_tagstruct_getu32(t
, &di
) < 0 ||
312 pa_tagstruct_gets(t
, &dn
) < 0 ||
313 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u
->module
, TRUE
);
320 pa_log_debug("Server reports a stream move.");
323 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
325 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
331 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
332 struct userdata
*u
= userdata
;
333 uint32_t channel
, maxlength
, tlength
= 0, fragsize
, prebuf
, minreq
;
339 pa_assert(u
->pdispatch
== pd
);
341 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
342 pa_tagstruct_getu32(t
, &maxlength
) < 0) {
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u
->module
, TRUE
);
349 if (command
== PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
) {
350 if (pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
351 pa_tagstruct_get_usec(t
, &usec
) < 0) {
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u
->module
, TRUE
);
358 if (pa_tagstruct_getu32(t
, &tlength
) < 0 ||
359 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
360 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
361 pa_tagstruct_get_usec(t
, &usec
) < 0) {
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u
->module
, TRUE
);
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength
, (unsigned long) u
->tlength
);
378 /* Called from main context */
379 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
380 struct userdata
*u
= userdata
;
385 pa_assert(u
->pdispatch
== pd
);
387 pa_log_debug("Server reports playback started.");
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata
*u
, pa_bool_t past
) {
399 x
= pa_rtclock_now();
401 /* Correct by the time the requested issued needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
406 x
-= u
->thread_transport_usec
;
408 x
+= u
->thread_transport_usec
;
410 if (u
->remote_suspended
|| u
->remote_corked
)
411 pa_smoother_pause(u
->smoother
, x
);
413 pa_smoother_resume(u
->smoother
, x
, TRUE
);
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
420 if (u
->remote_corked
== cork
)
423 u
->remote_corked
= cork
;
424 check_smoother_status(u
, FALSE
);
427 /* Called from main context */
428 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
435 t
= pa_tagstruct_new(NULL
, 0);
437 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
439 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
441 pa_tagstruct_putu32(t
, u
->ctag
++);
442 pa_tagstruct_putu32(t
, u
->channel
);
443 pa_tagstruct_put_boolean(t
, !!cork
);
444 pa_pstream_send_tagstruct(u
->pstream
, t
);
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
453 if (u
->remote_suspended
== suspend
)
456 u
->remote_suspended
= suspend
;
457 check_smoother_status(u
, TRUE
);
462 /* Called from IO thread context */
463 static void send_data(struct userdata
*u
) {
466 while (u
->requested_bytes
> 0) {
467 pa_memchunk memchunk
;
469 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
470 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
471 pa_memblock_unref(memchunk
.memblock
);
473 u
->requested_bytes
-= memchunk
.length
;
475 u
->counter
+= (int64_t) memchunk
.length
;
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
481 struct userdata
*u
= PA_SINK(o
)->userdata
;
485 case PA_SINK_MESSAGE_SET_STATE
: {
488 /* First, change the state, because otherwise pa_sink_render() would fail */
489 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
491 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
493 if (PA_SINK_IS_OPENED(u
->sink
->state
))
500 case PA_SINK_MESSAGE_GET_LATENCY
: {
501 pa_usec_t yl
, yr
, *usec
= data
;
503 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
504 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
506 *usec
= yl
> yr
? yl
- yr
: 0;
510 case SINK_MESSAGE_REQUEST
:
512 pa_assert(offset
> 0);
513 u
->requested_bytes
+= (size_t) offset
;
515 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
521 case SINK_MESSAGE_REMOTE_SUSPEND
:
523 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
527 case SINK_MESSAGE_UPDATE_LATENCY
: {
530 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
532 if (y
> (pa_usec_t
) offset
)
533 y
-= (pa_usec_t
) offset
;
537 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
539 /* We can access this freely here, since the main thread is waiting for us */
540 u
->thread_transport_usec
= u
->transport_usec
;
545 case SINK_MESSAGE_POST
:
547 /* OK, This might be a bit confusing. This message is
548 * delivered to us from the main context -- NOT from the
549 * IO thread context where the rest of the messages are
550 * dispatched. Yeah, ugly, but I am a lazy bastard. */
552 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
554 u
->counter_delta
+= (int64_t) chunk
->length
;
559 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
562 /* Called from main context */
563 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
565 pa_sink_assert_ref(s
);
568 switch ((pa_sink_state_t
) state
) {
570 case PA_SINK_SUSPENDED
:
571 pa_assert(PA_SINK_IS_OPENED(s
->state
));
572 stream_cork(u
, TRUE
);
576 case PA_SINK_RUNNING
:
577 if (s
->state
== PA_SINK_SUSPENDED
)
578 stream_cork(u
, FALSE
);
581 case PA_SINK_UNLINKED
:
583 case PA_SINK_INVALID_STATE
:
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
594 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
598 case PA_SOURCE_MESSAGE_SET_STATE
: {
601 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
602 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
607 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
608 pa_usec_t yr
, yl
, *usec
= data
;
610 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &PA_SOURCE(o
)->sample_spec
);
611 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
613 *usec
= yr
> yl
? yr
- yl
: 0;
617 case SOURCE_MESSAGE_POST
: {
620 pa_mcalign_push(u
->mcalign
, chunk
);
622 while (pa_mcalign_pop(u
->mcalign
, &c
) >= 0) {
624 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
625 pa_source_post(u
->source
, &c
);
627 pa_memblock_unref(c
.memblock
);
629 u
->counter
+= (int64_t) c
.length
;
635 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
637 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
640 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
643 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->source
->sample_spec
);
644 y
+= (pa_usec_t
) offset
;
646 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
648 /* We can access this freely here, since the main thread is waiting for us */
649 u
->thread_transport_usec
= u
->transport_usec
;
655 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
658 /* Called from main context */
659 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
661 pa_source_assert_ref(s
);
664 switch ((pa_source_state_t
) state
) {
666 case PA_SOURCE_SUSPENDED
:
667 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
668 stream_cork(u
, TRUE
);
672 case PA_SOURCE_RUNNING
:
673 if (s
->state
== PA_SOURCE_SUSPENDED
)
674 stream_cork(u
, FALSE
);
677 case PA_SOURCE_UNLINKED
:
679 case PA_SINK_INVALID_STATE
:
688 static void thread_func(void *userdata
) {
689 struct userdata
*u
= userdata
;
693 pa_log_debug("Thread starting up");
695 pa_thread_mq_install(&u
->thread_mq
);
701 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
702 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
703 pa_sink_process_rewind(u
->sink
, 0);
706 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
714 /* If this was no regular exit from the loop we have to continue
715 * processing messages until we received PA_MESSAGE_SHUTDOWN */
716 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
717 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
720 pa_log_debug("Thread shutting down");
724 /* Called from main context */
725 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
726 struct userdata
*u
= userdata
;
727 uint32_t bytes
, channel
;
730 pa_assert(command
== PA_COMMAND_REQUEST
);
733 pa_assert(u
->pdispatch
== pd
);
735 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
736 pa_tagstruct_getu32(t
, &bytes
) < 0) {
737 pa_log("Invalid protocol reply");
741 if (channel
!= u
->channel
) {
742 pa_log("Received data for invalid channel");
746 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
750 pa_module_unload_request(u
->module
, TRUE
);
755 /* Called from main context */
756 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
757 struct userdata
*u
= userdata
;
758 pa_usec_t sink_usec
, source_usec
;
760 int64_t write_index
, read_index
;
761 struct timeval local
, remote
, now
;
768 if (command
!= PA_COMMAND_REPLY
) {
769 if (command
== PA_COMMAND_ERROR
)
770 pa_log("Failed to get latency.");
772 pa_log("Protocol error.");
776 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
777 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
778 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
779 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
780 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
781 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
782 pa_tagstruct_gets64(t
, &read_index
) < 0) {
783 pa_log("Invalid reply.");
788 if (u
->version
>= 13) {
789 uint64_t underrun_for
= 0, playing_for
= 0;
791 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
792 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
793 pa_log("Invalid reply.");
799 if (!pa_tagstruct_eof(t
)) {
800 pa_log("Invalid reply.");
804 if (tag
< u
->ignore_latency_before
) {
808 pa_gettimeofday(&now
);
810 /* Calculate transport usec */
811 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
812 /* local and remote seem to have synchronized clocks */
814 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
816 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
819 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
821 /* First, take the device's delay */
823 delay
= (int64_t) sink_usec
;
824 ss
= &u
->sink
->sample_spec
;
826 delay
= (int64_t) source_usec
;
827 ss
= &u
->source
->sample_spec
;
830 /* Add the length of our server-side buffer */
831 if (write_index
>= read_index
)
832 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) (write_index
-read_index
), ss
);
834 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) (read_index
-write_index
), ss
);
836 /* Our measurements are already out of date, hence correct by the *
837 * transport latency */
839 delay
-= (int64_t) u
->transport_usec
;
841 delay
+= (int64_t) u
->transport_usec
;
844 /* Now correct by what we have have read/written since we requested the update */
846 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
848 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
852 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
854 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
861 pa_module_unload_request(u
->module
, TRUE
);
864 /* Called from main context */
865 static void request_latency(struct userdata
*u
) {
871 t
= pa_tagstruct_new(NULL
, 0);
873 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
875 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
877 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
878 pa_tagstruct_putu32(t
, u
->channel
);
880 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
882 pa_pstream_send_tagstruct(u
->pstream
, t
);
883 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
885 u
->ignore_latency_before
= tag
;
886 u
->counter_delta
= 0;
889 /* Called from main context */
890 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*t
, void *userdata
) {
891 struct userdata
*u
= userdata
;
899 pa_core_rttime_restart(u
->core
, e
, pa_rtclock_now() + LATENCY_INTERVAL
);
902 /* Called from main context */
903 static void update_description(struct userdata
*u
) {
905 char un
[128], hn
[128];
910 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
913 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
916 pa_sink_set_description(u
->sink
, d
);
917 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
918 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
919 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
921 pa_source_set_description(u
->source
, d
);
922 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
923 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
924 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
929 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
930 pa_get_user_name(un
, sizeof(un
)),
931 pa_get_host_name(hn
, sizeof(hn
)));
933 t
= pa_tagstruct_new(NULL
, 0);
935 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
937 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
939 pa_tagstruct_putu32(t
, u
->ctag
++);
940 pa_tagstruct_putu32(t
, u
->channel
);
941 pa_tagstruct_puts(t
, d
);
942 pa_pstream_send_tagstruct(u
->pstream
, t
);
947 /* Called from main context */
948 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
949 struct userdata
*u
= userdata
;
952 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
958 if (command
!= PA_COMMAND_REPLY
) {
959 if (command
== PA_COMMAND_ERROR
)
960 pa_log("Failed to get info.");
962 pa_log("Protocol error.");
966 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
967 pa_tagstruct_gets(t
, &server_version
) < 0 ||
968 pa_tagstruct_gets(t
, &user_name
) < 0 ||
969 pa_tagstruct_gets(t
, &host_name
) < 0 ||
970 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
971 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
972 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
973 pa_tagstruct_getu32(t
, &cookie
) < 0 ||
974 (u
->version
>= 15 && pa_tagstruct_get_channel_map(t
, &cm
) < 0)) {
976 pa_log("Parse failure");
980 if (!pa_tagstruct_eof(t
)) {
981 pa_log("Packet too long");
985 pa_xfree(u
->server_fqdn
);
986 u
->server_fqdn
= pa_xstrdup(host_name
);
988 pa_xfree(u
->user_name
);
989 u
->user_name
= pa_xstrdup(user_name
);
991 update_description(u
);
996 pa_module_unload_request(u
->module
, TRUE
);
999 static int read_ports(struct userdata
*u
, pa_tagstruct
*t
)
1001 if (u
->version
>= 16) {
1005 if (pa_tagstruct_getu32(t
, &n_ports
)) {
1006 pa_log("Parse failure");
1007 return -PA_ERR_PROTOCOL
;
1010 for (uint32_t j
= 0; j
< n_ports
; j
++) {
1013 if (pa_tagstruct_gets(t
, &s
) < 0 || /* name */
1014 pa_tagstruct_gets(t
, &s
) < 0 || /* description */
1015 pa_tagstruct_getu32(t
, &priority
) < 0) {
1017 pa_log("Parse failure");
1018 return -PA_ERR_PROTOCOL
;
1020 if (u
->version
>= 24 && pa_tagstruct_getu32(t
, &priority
) < 0) { /* available */
1021 pa_log("Parse failure");
1022 return -PA_ERR_PROTOCOL
;
1026 if (pa_tagstruct_gets(t
, &s
) < 0) { /* active port */
1027 pa_log("Parse failure");
1028 return -PA_ERR_PROTOCOL
;
1035 static int read_formats(struct userdata
*u
, pa_tagstruct
*t
) {
1037 pa_format_info
*format
;
1039 if (pa_tagstruct_getu8(t
, &n_formats
) < 0) { /* no. of formats */
1040 pa_log("Parse failure");
1041 return -PA_ERR_PROTOCOL
;
1044 for (uint8_t j
= 0; j
< n_formats
; j
++) {
1045 format
= pa_format_info_new();
1046 if (pa_tagstruct_get_format_info(t
, format
)) { /* format info */
1047 pa_format_info_free(format
);
1048 pa_log("Parse failure");
1049 return -PA_ERR_PROTOCOL
;
1051 pa_format_info_free(format
);
1058 /* Called from main context */
1059 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1060 struct userdata
*u
= userdata
;
1061 uint32_t idx
, owner_module
, monitor_source
, flags
;
1062 const char *name
, *description
, *monitor_source_name
, *driver
;
1073 pl
= pa_proplist_new();
1075 if (command
!= PA_COMMAND_REPLY
) {
1076 if (command
== PA_COMMAND_ERROR
)
1077 pa_log("Failed to get info.");
1079 pa_log("Protocol error.");
1083 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1084 pa_tagstruct_gets(t
, &name
) < 0 ||
1085 pa_tagstruct_gets(t
, &description
) < 0 ||
1086 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1087 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1088 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1089 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1090 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1091 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
1092 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
1093 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1094 pa_tagstruct_gets(t
, &driver
) < 0 ||
1095 pa_tagstruct_getu32(t
, &flags
) < 0) {
1097 pa_log("Parse failure");
1101 if (u
->version
>= 13) {
1102 pa_usec_t configured_latency
;
1104 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1105 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1107 pa_log("Parse failure");
1112 if (u
->version
>= 15) {
1113 pa_volume_t base_volume
;
1114 uint32_t state
, n_volume_steps
, card
;
1116 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1117 pa_tagstruct_getu32(t
, &state
) < 0 ||
1118 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1119 pa_tagstruct_getu32(t
, &card
) < 0) {
1121 pa_log("Parse failure");
1126 if (read_ports(u
, t
) < 0)
1129 if (u
->version
>= 21 && read_formats(u
, t
) < 0)
1132 if (!pa_tagstruct_eof(t
)) {
1133 pa_log("Packet too long");
1137 pa_proplist_free(pl
);
1139 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
1142 pa_xfree(u
->device_description
);
1143 u
->device_description
= pa_xstrdup(description
);
1145 update_description(u
);
1150 pa_module_unload_request(u
->module
, TRUE
);
1151 pa_proplist_free(pl
);
1154 /* Called from main context */
1155 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1156 struct userdata
*u
= userdata
;
1157 uint32_t idx
, owner_module
, client
, sink
;
1158 pa_usec_t buffer_usec
, sink_usec
;
1159 const char *name
, *driver
, *resample_method
;
1160 pa_bool_t mute
= FALSE
;
1161 pa_sample_spec sample_spec
;
1162 pa_channel_map channel_map
;
1170 pl
= pa_proplist_new();
1172 if (command
!= PA_COMMAND_REPLY
) {
1173 if (command
== PA_COMMAND_ERROR
)
1174 pa_log("Failed to get info.");
1176 pa_log("Protocol error.");
1180 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1181 pa_tagstruct_gets(t
, &name
) < 0 ||
1182 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1183 pa_tagstruct_getu32(t
, &client
) < 0 ||
1184 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1185 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1186 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1187 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1188 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1189 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1190 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1191 pa_tagstruct_gets(t
, &driver
) < 0) {
1193 pa_log("Parse failure");
1197 if (u
->version
>= 11) {
1198 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1200 pa_log("Parse failure");
1205 if (u
->version
>= 13) {
1206 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1208 pa_log("Parse failure");
1213 if (u
->version
>= 19) {
1214 if (pa_tagstruct_get_boolean(t
, &b
) < 0) {
1216 pa_log("Parse failure");
1221 if (u
->version
>= 20) {
1222 if (pa_tagstruct_get_boolean(t
, &b
) < 0 ||
1223 pa_tagstruct_get_boolean(t
, &b
) < 0) {
1225 pa_log("Parse failure");
1230 if (u
->version
>= 21) {
1231 pa_format_info
*format
= pa_format_info_new();
1233 if (pa_tagstruct_get_format_info(t
, format
) < 0) {
1234 pa_format_info_free(format
);
1235 pa_log("Parse failure");
1238 pa_format_info_free(format
);
1241 if (!pa_tagstruct_eof(t
)) {
1242 pa_log("Packet too long");
1246 pa_proplist_free(pl
);
1248 if (idx
!= u
->device_index
)
1253 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1254 pa_cvolume_equal(&volume
, &u
->sink
->real_volume
))
1257 pa_sink_volume_changed(u
->sink
, &volume
);
1259 if (u
->version
>= 11)
1260 pa_sink_mute_changed(u
->sink
, mute
);
1265 pa_module_unload_request(u
->module
, TRUE
);
1266 pa_proplist_free(pl
);
1271 /* Called from main context */
1272 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1273 struct userdata
*u
= userdata
;
1274 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1275 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1280 pa_usec_t latency
, configured_latency
;
1286 pl
= pa_proplist_new();
1288 if (command
!= PA_COMMAND_REPLY
) {
1289 if (command
== PA_COMMAND_ERROR
)
1290 pa_log("Failed to get info.");
1292 pa_log("Protocol error.");
1296 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1297 pa_tagstruct_gets(t
, &name
) < 0 ||
1298 pa_tagstruct_gets(t
, &description
) < 0 ||
1299 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1300 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1301 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1302 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1303 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1304 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1305 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1306 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1307 pa_tagstruct_gets(t
, &driver
) < 0 ||
1308 pa_tagstruct_getu32(t
, &flags
) < 0) {
1310 pa_log("Parse failure");
1314 if (u
->version
>= 13) {
1315 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1316 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1318 pa_log("Parse failure");
1323 if (u
->version
>= 15) {
1324 pa_volume_t base_volume
;
1325 uint32_t state
, n_volume_steps
, card
;
1327 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1328 pa_tagstruct_getu32(t
, &state
) < 0 ||
1329 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1330 pa_tagstruct_getu32(t
, &card
) < 0) {
1332 pa_log("Parse failure");
1337 if (read_ports(u
, t
) < 0)
1340 if (u
->version
>= 22 && read_formats(u
, t
) < 0)
1343 if (!pa_tagstruct_eof(t
)) {
1344 pa_log("Packet too long");
1348 pa_proplist_free(pl
);
1350 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1353 pa_xfree(u
->device_description
);
1354 u
->device_description
= pa_xstrdup(description
);
1356 update_description(u
);
1361 pa_module_unload_request(u
->module
, TRUE
);
1362 pa_proplist_free(pl
);
1367 /* Called from main context */
1368 static void request_info(struct userdata
*u
) {
1373 t
= pa_tagstruct_new(NULL
, 0);
1374 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1375 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1376 pa_pstream_send_tagstruct(u
->pstream
, t
);
1377 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1380 t
= pa_tagstruct_new(NULL
, 0);
1381 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1382 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1383 pa_tagstruct_putu32(t
, u
->device_index
);
1384 pa_pstream_send_tagstruct(u
->pstream
, t
);
1385 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1388 t
= pa_tagstruct_new(NULL
, 0);
1389 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1390 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1391 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1392 pa_tagstruct_puts(t
, u
->sink_name
);
1393 pa_pstream_send_tagstruct(u
->pstream
, t
);
1394 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1397 if (u
->source_name
) {
1398 t
= pa_tagstruct_new(NULL
, 0);
1399 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1400 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1401 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1402 pa_tagstruct_puts(t
, u
->source_name
);
1403 pa_pstream_send_tagstruct(u
->pstream
, t
);
1404 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1409 /* Called from main context */
1410 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1411 struct userdata
*u
= userdata
;
1412 pa_subscription_event_type_t e
;
1418 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1420 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1421 pa_tagstruct_getu32(t
, &idx
) < 0) {
1422 pa_log("Invalid protocol reply");
1423 pa_module_unload_request(u
->module
, TRUE
);
1427 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1429 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1430 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1432 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1440 /* Called from main context */
1441 static void start_subscribe(struct userdata
*u
) {
1445 t
= pa_tagstruct_new(NULL
, 0);
1446 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1447 pa_tagstruct_putu32(t
, u
->ctag
++);
1448 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1450 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1452 PA_SUBSCRIPTION_MASK_SOURCE
1456 pa_pstream_send_tagstruct(u
->pstream
, t
);
1459 /* Called from main context */
1460 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1461 struct userdata
*u
= userdata
;
1468 pa_assert(u
->pdispatch
== pd
);
1470 if (command
!= PA_COMMAND_REPLY
) {
1471 if (command
== PA_COMMAND_ERROR
)
1472 pa_log("Failed to create stream.");
1474 pa_log("Protocol error.");
1478 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1479 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1481 || pa_tagstruct_getu32(t
, &bytes
) < 0
1486 if (u
->version
>= 9) {
1488 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1489 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1490 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1491 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1494 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1495 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
1500 if (u
->version
>= 12) {
1503 uint32_t device_index
;
1505 pa_bool_t suspended
;
1507 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1508 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1509 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1510 pa_tagstruct_gets(t
, &dn
) < 0 ||
1511 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
1515 pa_xfree(u
->sink_name
);
1516 u
->sink_name
= pa_xstrdup(dn
);
1518 pa_xfree(u
->source_name
);
1519 u
->source_name
= pa_xstrdup(dn
);
1523 if (u
->version
>= 13) {
1526 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1529 /* #ifdef TUNNEL_SINK */
1530 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1532 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1536 if (u
->version
>= 21) {
1537 pa_format_info
*format
= pa_format_info_new();
1539 if (pa_tagstruct_get_format_info(t
, format
) < 0) {
1540 pa_format_info_free(format
);
1544 pa_format_info_free(format
);
1547 if (!pa_tagstruct_eof(t
))
1553 pa_assert(!u
->time_event
);
1554 u
->time_event
= pa_core_rttime_new(u
->core
, pa_rtclock_now() + LATENCY_INTERVAL
, timeout_callback
, u
);
1558 pa_log_debug("Stream created.");
1561 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
1567 pa_log("Invalid reply. (Create stream)");
1570 pa_module_unload_request(u
->module
, TRUE
);
1574 /* Called from main context */
1575 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1576 struct userdata
*u
= userdata
;
1577 pa_tagstruct
*reply
;
1578 char name
[256], un
[128], hn
[128];
1583 pa_assert(u
->pdispatch
== pd
);
1585 if (command
!= PA_COMMAND_REPLY
||
1586 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1587 !pa_tagstruct_eof(t
)) {
1589 if (command
== PA_COMMAND_ERROR
)
1590 pa_log("Failed to authenticate");
1592 pa_log("Protocol error.");
1597 /* Minimum supported protocol version */
1598 if (u
->version
< 8) {
1599 pa_log("Incompatible protocol version");
1603 /* Starting with protocol version 13 the MSB of the version tag
1604 reflects if shm is enabled for this connection or not. We don't
1605 support SHM here at all, so we just ignore this. */
1607 if (u
->version
>= 13)
1608 u
->version
&= 0x7FFFFFFFU
;
1610 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
1613 pa_proplist_setf(u
->sink
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1614 pa_sink_update_proplist(u
->sink
, 0, NULL
);
1616 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1618 pa_get_user_name(un
, sizeof(un
)),
1619 pa_get_host_name(hn
, sizeof(hn
)));
1621 pa_proplist_setf(u
->source
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1622 pa_source_update_proplist(u
->source
, 0, NULL
);
1624 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1626 pa_get_user_name(un
, sizeof(un
)),
1627 pa_get_host_name(hn
, sizeof(hn
)));
1630 reply
= pa_tagstruct_new(NULL
, 0);
1631 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1632 pa_tagstruct_putu32(reply
, u
->ctag
++);
1634 if (u
->version
>= 13) {
1636 pl
= pa_proplist_new();
1637 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1638 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1639 pa_init_proplist(pl
);
1640 pa_tagstruct_put_proplist(reply
, pl
);
1641 pa_proplist_free(pl
);
1643 pa_tagstruct_puts(reply
, "PulseAudio");
1645 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1646 /* We ignore the server's reply here */
1648 reply
= pa_tagstruct_new(NULL
, 0);
1650 if (u
->version
< 13)
1651 /* Only for older PA versions we need to fill in the maxlength */
1652 u
->maxlength
= 4*1024*1024;
1655 u
->tlength
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1656 u
->minreq
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1657 u
->prebuf
= u
->tlength
;
1659 u
->fragsize
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
1663 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1664 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1666 if (u
->version
< 13)
1667 pa_tagstruct_puts(reply
, name
);
1669 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1670 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1671 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1672 pa_tagstruct_puts(reply
, u
->sink_name
);
1673 pa_tagstruct_putu32(reply
, u
->maxlength
);
1674 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1675 pa_tagstruct_putu32(reply
, u
->tlength
);
1676 pa_tagstruct_putu32(reply
, u
->prebuf
);
1677 pa_tagstruct_putu32(reply
, u
->minreq
);
1678 pa_tagstruct_putu32(reply
, 0);
1679 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1680 pa_tagstruct_put_cvolume(reply
, &volume
);
1682 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1683 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1685 if (u
->version
< 13)
1686 pa_tagstruct_puts(reply
, name
);
1688 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1689 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1690 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1691 pa_tagstruct_puts(reply
, u
->source_name
);
1692 pa_tagstruct_putu32(reply
, u
->maxlength
);
1693 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1694 pa_tagstruct_putu32(reply
, u
->fragsize
);
1697 if (u
->version
>= 12) {
1698 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1699 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1700 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1701 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1702 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1703 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1704 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
1707 if (u
->version
>= 13) {
1710 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1711 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
1713 pl
= pa_proplist_new();
1714 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1715 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1716 pa_tagstruct_put_proplist(reply
, pl
);
1717 pa_proplist_free(pl
);
1720 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1724 if (u
->version
>= 14) {
1726 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1728 pa_tagstruct_put_boolean(reply
, TRUE
); /* early rquests */
1731 if (u
->version
>= 15) {
1733 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1735 pa_tagstruct_put_boolean(reply
, FALSE
); /* don't inhibit auto suspend */
1736 pa_tagstruct_put_boolean(reply
, FALSE
); /* fail on suspend */
1740 if (u
->version
>= 17)
1741 pa_tagstruct_put_boolean(reply
, FALSE
); /* relative volume */
1743 if (u
->version
>= 18)
1744 pa_tagstruct_put_boolean(reply
, FALSE
); /* passthrough stream */
1748 if (u
->version
>= 21) {
1749 /* We're not using the extended API, so n_formats = 0 and that's that */
1750 pa_tagstruct_putu8(reply
, 0);
1753 if (u
->version
>= 22) {
1754 /* We're not using the extended API, so n_formats = 0 and that's that */
1755 pa_tagstruct_putu8(reply
, 0);
1756 pa_cvolume_reset(&volume
, u
->source
->sample_spec
.channels
);
1757 pa_tagstruct_put_cvolume(reply
, &volume
);
1758 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted */
1759 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1760 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1761 pa_tagstruct_put_boolean(reply
, FALSE
); /* relative volume */
1762 pa_tagstruct_put_boolean(reply
, FALSE
); /* passthrough stream */
1766 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1767 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1769 pa_log_debug("Connection authenticated, creating stream ...");
1774 pa_module_unload_request(u
->module
, TRUE
);
1777 /* Called from main context */
1778 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1779 struct userdata
*u
= userdata
;
1784 pa_log_warn("Stream died.");
1785 pa_module_unload_request(u
->module
, TRUE
);
1788 /* Called from main context */
1789 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1790 struct userdata
*u
= userdata
;
1796 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1797 pa_log("Invalid packet");
1798 pa_module_unload_request(u
->module
, TRUE
);
1804 /* Called from main context */
1805 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1806 struct userdata
*u
= userdata
;
1812 if (channel
!= u
->channel
) {
1813 pa_log("Received memory block on bad channel.");
1814 pa_module_unload_request(u
->module
, TRUE
);
1818 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1820 u
->counter_delta
+= (int64_t) chunk
->length
;
1824 /* Called from main context */
1825 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1826 struct userdata
*u
= userdata
;
1832 pa_assert(u
->client
== sc
);
1834 pa_socket_client_unref(u
->client
);
1838 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1839 pa_module_unload_request(u
->module
, TRUE
);
1843 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1844 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, TRUE
, command_table
, PA_COMMAND_MAX
);
1846 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1847 pa_pstream_set_receive_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1849 pa_pstream_set_receive_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
1852 t
= pa_tagstruct_new(NULL
, 0);
1853 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1854 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1855 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1857 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
1863 if (pa_iochannel_creds_supported(io
))
1864 pa_iochannel_creds_enable(io
);
1866 ucred
.uid
= getuid();
1867 ucred
.gid
= getgid();
1869 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1872 pa_pstream_send_tagstruct(u
->pstream
, t
);
1875 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1877 pa_log_debug("Connection established, authenticating ...");
1882 /* Called from main context */
1883 static void sink_set_volume(pa_sink
*sink
) {
1891 t
= pa_tagstruct_new(NULL
, 0);
1892 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1893 pa_tagstruct_putu32(t
, u
->ctag
++);
1894 pa_tagstruct_putu32(t
, u
->device_index
);
1895 pa_tagstruct_put_cvolume(t
, &sink
->real_volume
);
1896 pa_pstream_send_tagstruct(u
->pstream
, t
);
1899 /* Called from main context */
1900 static void sink_set_mute(pa_sink
*sink
) {
1908 if (u
->version
< 11)
1911 t
= pa_tagstruct_new(NULL
, 0);
1912 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1913 pa_tagstruct_putu32(t
, u
->ctag
++);
1914 pa_tagstruct_putu32(t
, u
->device_index
);
1915 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1916 pa_pstream_send_tagstruct(u
->pstream
, t
);
1921 int pa__init(pa_module
*m
) {
1922 pa_modargs
*ma
= NULL
;
1923 struct userdata
*u
= NULL
;
1928 pa_sink_new_data data
;
1930 pa_source_new_data data
;
1935 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1936 pa_log("Failed to parse module arguments");
1940 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1944 u
->pdispatch
= NULL
;
1946 u
->server_name
= NULL
;
1948 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1950 u
->requested_bytes
= 0;
1952 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1955 u
->smoother
= pa_smoother_new(
1964 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1965 u
->time_event
= NULL
;
1966 u
->ignore_latency_before
= 0;
1967 u
->transport_usec
= u
->thread_transport_usec
= 0;
1968 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1969 u
->counter
= u
->counter_delta
= 0;
1971 u
->rtpoll
= pa_rtpoll_new();
1972 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
1974 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, pa_modargs_get_value(ma
, "cookie", PA_NATIVE_COOKIE_FILE
), PA_NATIVE_COOKIE_LENGTH
)))
1977 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1978 pa_log("No server specified.");
1982 ss
= m
->core
->default_sample_spec
;
1983 map
= m
->core
->default_channel_map
;
1984 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1985 pa_log("Invalid sample format specification");
1989 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, TRUE
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1990 pa_log("Failed to connect to server '%s'", u
->server_name
);
1994 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
1998 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1999 dn
= pa_sprintf_malloc("tunnel-sink.%s", u
->server_name
);
2001 pa_sink_new_data_init(&data
);
2002 data
.driver
= __FILE__
;
2004 data
.namereg_fail
= TRUE
;
2005 pa_sink_new_data_set_name(&data
, dn
);
2006 pa_sink_new_data_set_sample_spec(&data
, &ss
);
2007 pa_sink_new_data_set_channel_map(&data
, &map
);
2008 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
2009 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
2011 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
2013 if (pa_modargs_get_proplist(ma
, "sink_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
2014 pa_log("Invalid properties");
2015 pa_sink_new_data_done(&data
);
2019 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
);
2020 pa_sink_new_data_done(&data
);
2023 pa_log("Failed to create sink.");
2027 u
->sink
->parent
.process_msg
= sink_process_msg
;
2028 u
->sink
->userdata
= u
;
2029 u
->sink
->set_state
= sink_set_state
;
2030 pa_sink_set_set_volume_callback(u
->sink
, sink_set_volume
);
2031 pa_sink_set_set_mute_callback(u
->sink
, sink_set_mute
);
2033 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
2035 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2037 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
2038 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
2042 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
2043 dn
= pa_sprintf_malloc("tunnel-source.%s", u
->server_name
);
2045 pa_source_new_data_init(&data
);
2046 data
.driver
= __FILE__
;
2048 data
.namereg_fail
= TRUE
;
2049 pa_source_new_data_set_name(&data
, dn
);
2050 pa_source_new_data_set_sample_spec(&data
, &ss
);
2051 pa_source_new_data_set_channel_map(&data
, &map
);
2052 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
2053 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
2055 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
2057 if (pa_modargs_get_proplist(ma
, "source_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
2058 pa_log("Invalid properties");
2059 pa_source_new_data_done(&data
);
2063 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
2064 pa_source_new_data_done(&data
);
2067 pa_log("Failed to create source.");
2071 u
->source
->parent
.process_msg
= source_process_msg
;
2072 u
->source
->set_state
= source_set_state
;
2073 u
->source
->userdata
= u
;
2075 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2077 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
2078 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
2080 u
->mcalign
= pa_mcalign_new(pa_frame_size(&u
->source
->sample_spec
));
2085 u
->time_event
= NULL
;
2087 u
->maxlength
= (uint32_t) -1;
2089 u
->tlength
= u
->minreq
= u
->prebuf
= (uint32_t) -1;
2091 u
->fragsize
= (uint32_t) -1;
2094 if (!(u
->thread
= pa_thread_new("module-tunnel", thread_func
, u
))) {
2095 pa_log("Failed to create thread.");
2100 pa_sink_put(u
->sink
);
2102 pa_source_put(u
->source
);
2105 pa_modargs_free(ma
);
2113 pa_modargs_free(ma
);
2120 void pa__done(pa_module
*m
) {
2125 if (!(u
= m
->userdata
))
2130 pa_sink_unlink(u
->sink
);
2133 pa_source_unlink(u
->source
);
2137 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
2138 pa_thread_free(u
->thread
);
2141 pa_thread_mq_done(&u
->thread_mq
);
2145 pa_sink_unref(u
->sink
);
2148 pa_source_unref(u
->source
);
2152 pa_rtpoll_free(u
->rtpoll
);
2155 pa_pstream_unlink(u
->pstream
);
2156 pa_pstream_unref(u
->pstream
);
2160 pa_pdispatch_unref(u
->pdispatch
);
2163 pa_socket_client_unref(u
->client
);
2166 pa_auth_cookie_unref(u
->auth_cookie
);
2169 pa_smoother_free(u
->smoother
);
2172 u
->core
->mainloop
->time_free(u
->time_event
);
2176 pa_mcalign_free(u
->mcalign
);
2180 pa_xfree(u
->sink_name
);
2182 pa_xfree(u
->source_name
);
2184 pa_xfree(u
->server_name
);
2186 pa_xfree(u
->device_description
);
2187 pa_xfree(u
->server_fqdn
);
2188 pa_xfree(u
->user_name
);