2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/time-smoother.h>
50 #include <pulsecore/thread.h>
51 #include <pulsecore/thread-mq.h>
52 #include <pulsecore/core-rtclock.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/proplist-util.h>
55 #include <pulsecore/auth-cookie.h>
56 #include <pulsecore/mcalign.h>
59 #include "module-tunnel-sink-symdef.h"
61 #include "module-tunnel-source-symdef.h"
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
67 "sink_name=<name for the local sink> "
68 "sink_properties=<properties for the local sink> "
70 "sink=<remote sink name> "
72 "format=<sample format> "
73 "channels=<number of channels> "
75 "channel_map=<channel map>");
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 "source_name=<name for the local source> "
80 "source_properties=<properties for the local source> "
82 "source=<remote source name> "
84 "format=<sample format> "
85 "channels=<number of channels> "
87 "channel_map=<channel map>");
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION
);
92 PA_MODULE_LOAD_ONCE(FALSE
);
94 static const char* const valid_modargs
[] = {
113 #define DEFAULT_TIMEOUT 5
115 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
122 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
123 SINK_MESSAGE_REMOTE_SUSPEND
,
124 SINK_MESSAGE_UPDATE_LATENCY
,
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
134 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
135 SOURCE_MESSAGE_REMOTE_SUSPEND
,
136 SOURCE_MESSAGE_UPDATE_LATENCY
139 #define DEFAULT_FRAGSIZE_MSEC 25
144 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
145 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
148 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
149 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
150 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
151 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
152 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
153 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
155 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
157 [PA_COMMAND_REQUEST
] = command_request
,
158 [PA_COMMAND_STARTED
] = command_started
,
160 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
161 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
162 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
163 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
164 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
165 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
166 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
167 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
168 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
169 [PA_COMMAND_PLAYBACK_STREAM_EVENT
] = command_stream_or_client_event
,
170 [PA_COMMAND_RECORD_STREAM_EVENT
] = command_stream_or_client_event
,
171 [PA_COMMAND_CLIENT_EVENT
] = command_stream_or_client_event
,
172 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
,
173 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
180 pa_thread_mq thread_mq
;
184 pa_socket_client
*client
;
186 pa_pdispatch
*pdispatch
;
192 size_t requested_bytes
;
199 pa_auth_cookie
*auth_cookie
;
203 uint32_t device_index
;
206 int64_t counter
, counter_delta
;
208 pa_bool_t remote_corked
:1;
209 pa_bool_t remote_suspended
:1;
211 pa_usec_t transport_usec
; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec
; /* maintained in the IO thread */
214 uint32_t ignore_latency_before
;
216 pa_time_event
*time_event
;
218 pa_smoother
*smoother
;
220 char *device_description
;
234 static void request_latency(struct userdata
*u
);
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
238 pa_log_debug("Got stream or client event.");
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
243 struct userdata
*u
= userdata
;
248 pa_assert(u
->pdispatch
== pd
);
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u
->module
, TRUE
);
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
256 struct userdata
*u
= userdata
;
261 pa_assert(u
->pdispatch
== pd
);
263 pa_log_info("Server signalled buffer overrun/underrun.");
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
269 struct userdata
*u
= userdata
;
276 pa_assert(u
->pdispatch
== pd
);
278 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
279 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
280 !pa_tagstruct_eof(t
)) {
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u
->module
, TRUE
);
287 pa_log_debug("Server reports device suspend.");
290 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
292 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
298 /* Called from main context */
299 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
300 struct userdata
*u
= userdata
;
301 uint32_t channel
, di
;
308 pa_assert(u
->pdispatch
== pd
);
310 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
311 pa_tagstruct_getu32(t
, &di
) < 0 ||
312 pa_tagstruct_gets(t
, &dn
) < 0 ||
313 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u
->module
, TRUE
);
320 pa_log_debug("Server reports a stream move.");
323 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
325 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
331 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
332 struct userdata
*u
= userdata
;
333 uint32_t channel
, maxlength
, tlength
= 0, fragsize
, prebuf
, minreq
;
339 pa_assert(u
->pdispatch
== pd
);
341 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
342 pa_tagstruct_getu32(t
, &maxlength
) < 0) {
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u
->module
, TRUE
);
349 if (command
== PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
) {
350 if (pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
351 pa_tagstruct_get_usec(t
, &usec
) < 0) {
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u
->module
, TRUE
);
358 if (pa_tagstruct_getu32(t
, &tlength
) < 0 ||
359 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
360 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
361 pa_tagstruct_get_usec(t
, &usec
) < 0) {
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u
->module
, TRUE
);
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength
, (unsigned long) u
->tlength
);
378 /* Called from main context */
379 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
380 struct userdata
*u
= userdata
;
385 pa_assert(u
->pdispatch
== pd
);
387 pa_log_debug("Server reports playback started.");
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata
*u
, pa_bool_t past
) {
399 x
= pa_rtclock_now();
401 /* Correct by the time the requested issued needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
406 x
-= u
->thread_transport_usec
;
408 x
+= u
->thread_transport_usec
;
410 if (u
->remote_suspended
|| u
->remote_corked
)
411 pa_smoother_pause(u
->smoother
, x
);
413 pa_smoother_resume(u
->smoother
, x
, TRUE
);
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
420 if (u
->remote_corked
== cork
)
423 u
->remote_corked
= cork
;
424 check_smoother_status(u
, FALSE
);
427 /* Called from main context */
428 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
435 t
= pa_tagstruct_new(NULL
, 0);
437 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
439 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
441 pa_tagstruct_putu32(t
, u
->ctag
++);
442 pa_tagstruct_putu32(t
, u
->channel
);
443 pa_tagstruct_put_boolean(t
, !!cork
);
444 pa_pstream_send_tagstruct(u
->pstream
, t
);
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
453 if (u
->remote_suspended
== suspend
)
456 u
->remote_suspended
= suspend
;
457 check_smoother_status(u
, TRUE
);
462 /* Called from IO thread context */
463 static void send_data(struct userdata
*u
) {
466 while (u
->requested_bytes
> 0) {
467 pa_memchunk memchunk
;
469 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
470 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
471 pa_memblock_unref(memchunk
.memblock
);
473 u
->requested_bytes
-= memchunk
.length
;
475 u
->counter
+= (int64_t) memchunk
.length
;
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
481 struct userdata
*u
= PA_SINK(o
)->userdata
;
485 case PA_SINK_MESSAGE_SET_STATE
: {
488 /* First, change the state, because otherwise pa_sink_render() would fail */
489 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
491 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
493 if (PA_SINK_IS_OPENED(u
->sink
->state
))
500 case PA_SINK_MESSAGE_GET_LATENCY
: {
501 pa_usec_t yl
, yr
, *usec
= data
;
503 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
504 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
506 *usec
= yl
> yr
? yl
- yr
: 0;
510 case SINK_MESSAGE_REQUEST
:
512 pa_assert(offset
> 0);
513 u
->requested_bytes
+= (size_t) offset
;
515 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
521 case SINK_MESSAGE_REMOTE_SUSPEND
:
523 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
527 case SINK_MESSAGE_UPDATE_LATENCY
: {
530 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
532 if (y
> (pa_usec_t
) offset
)
533 y
-= (pa_usec_t
) offset
;
537 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
539 /* We can access this freely here, since the main thread is waiting for us */
540 u
->thread_transport_usec
= u
->transport_usec
;
545 case SINK_MESSAGE_POST
:
547 /* OK, This might be a bit confusing. This message is
548 * delivered to us from the main context -- NOT from the
549 * IO thread context where the rest of the messages are
550 * dispatched. Yeah, ugly, but I am a lazy bastard. */
552 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
554 u
->counter_delta
+= (int64_t) chunk
->length
;
559 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
562 /* Called from main context */
563 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
565 pa_sink_assert_ref(s
);
568 switch ((pa_sink_state_t
) state
) {
570 case PA_SINK_SUSPENDED
:
571 pa_assert(PA_SINK_IS_OPENED(s
->state
));
572 stream_cork(u
, TRUE
);
576 case PA_SINK_RUNNING
:
577 if (s
->state
== PA_SINK_SUSPENDED
)
578 stream_cork(u
, FALSE
);
581 case PA_SINK_UNLINKED
:
583 case PA_SINK_INVALID_STATE
:
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
594 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
598 case PA_SOURCE_MESSAGE_SET_STATE
: {
601 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
602 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
607 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
608 pa_usec_t yr
, yl
, *usec
= data
;
610 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &PA_SOURCE(o
)->sample_spec
);
611 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
613 *usec
= yr
> yl
? yr
- yl
: 0;
617 case SOURCE_MESSAGE_POST
: {
620 pa_mcalign_push(u
->mcalign
, chunk
);
622 while (pa_mcalign_pop(u
->mcalign
, &c
) >= 0) {
624 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
625 pa_source_post(u
->source
, &c
);
627 pa_memblock_unref(c
.memblock
);
629 u
->counter
+= (int64_t) c
.length
;
635 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
637 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
640 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
643 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->source
->sample_spec
);
644 y
+= (pa_usec_t
) offset
;
646 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
648 /* We can access this freely here, since the main thread is waiting for us */
649 u
->thread_transport_usec
= u
->transport_usec
;
655 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
658 /* Called from main context */
659 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
661 pa_source_assert_ref(s
);
664 switch ((pa_source_state_t
) state
) {
666 case PA_SOURCE_SUSPENDED
:
667 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
668 stream_cork(u
, TRUE
);
672 case PA_SOURCE_RUNNING
:
673 if (s
->state
== PA_SOURCE_SUSPENDED
)
674 stream_cork(u
, FALSE
);
677 case PA_SOURCE_UNLINKED
:
679 case PA_SINK_INVALID_STATE
:
688 static void thread_func(void *userdata
) {
689 struct userdata
*u
= userdata
;
693 pa_log_debug("Thread starting up");
695 pa_thread_mq_install(&u
->thread_mq
);
701 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
702 pa_sink_process_rewind(u
->sink
, 0);
705 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
713 /* If this was no regular exit from the loop we have to continue
714 * processing messages until we received PA_MESSAGE_SHUTDOWN */
715 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
716 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
719 pa_log_debug("Thread shutting down");
723 /* Called from main context */
724 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
725 struct userdata
*u
= userdata
;
726 uint32_t bytes
, channel
;
729 pa_assert(command
== PA_COMMAND_REQUEST
);
732 pa_assert(u
->pdispatch
== pd
);
734 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
735 pa_tagstruct_getu32(t
, &bytes
) < 0) {
736 pa_log("Invalid protocol reply");
740 if (channel
!= u
->channel
) {
741 pa_log("Received data for invalid channel");
745 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
749 pa_module_unload_request(u
->module
, TRUE
);
754 /* Called from main context */
755 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
756 struct userdata
*u
= userdata
;
757 pa_usec_t sink_usec
, source_usec
;
759 int64_t write_index
, read_index
;
760 struct timeval local
, remote
, now
;
767 if (command
!= PA_COMMAND_REPLY
) {
768 if (command
== PA_COMMAND_ERROR
)
769 pa_log("Failed to get latency.");
771 pa_log("Protocol error.");
775 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
776 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
777 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
778 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
779 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
780 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
781 pa_tagstruct_gets64(t
, &read_index
) < 0) {
782 pa_log("Invalid reply.");
787 if (u
->version
>= 13) {
788 uint64_t underrun_for
= 0, playing_for
= 0;
790 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
791 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
792 pa_log("Invalid reply.");
798 if (!pa_tagstruct_eof(t
)) {
799 pa_log("Invalid reply.");
803 if (tag
< u
->ignore_latency_before
) {
807 pa_gettimeofday(&now
);
809 /* Calculate transport usec */
810 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
811 /* local and remote seem to have synchronized clocks */
813 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
815 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
818 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
820 /* First, take the device's delay */
822 delay
= (int64_t) sink_usec
;
823 ss
= &u
->sink
->sample_spec
;
825 delay
= (int64_t) source_usec
;
826 ss
= &u
->source
->sample_spec
;
829 /* Add the length of our server-side buffer */
830 if (write_index
>= read_index
)
831 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) (write_index
-read_index
), ss
);
833 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) (read_index
-write_index
), ss
);
835 /* Our measurements are already out of date, hence correct by the *
836 * transport latency */
838 delay
-= (int64_t) u
->transport_usec
;
840 delay
+= (int64_t) u
->transport_usec
;
843 /* Now correct by what we have have read/written since we requested the update */
845 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
847 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
851 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
853 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
860 pa_module_unload_request(u
->module
, TRUE
);
863 /* Called from main context */
864 static void request_latency(struct userdata
*u
) {
870 t
= pa_tagstruct_new(NULL
, 0);
872 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
874 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
876 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
877 pa_tagstruct_putu32(t
, u
->channel
);
879 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
881 pa_pstream_send_tagstruct(u
->pstream
, t
);
882 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
884 u
->ignore_latency_before
= tag
;
885 u
->counter_delta
= 0;
888 /* Called from main context */
889 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*t
, void *userdata
) {
890 struct userdata
*u
= userdata
;
898 pa_core_rttime_restart(u
->core
, e
, pa_rtclock_now() + LATENCY_INTERVAL
);
901 /* Called from main context */
902 static void update_description(struct userdata
*u
) {
904 char un
[128], hn
[128];
909 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
912 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
915 pa_sink_set_description(u
->sink
, d
);
916 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
917 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
918 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
920 pa_source_set_description(u
->source
, d
);
921 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
922 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
923 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
928 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
929 pa_get_user_name(un
, sizeof(un
)),
930 pa_get_host_name(hn
, sizeof(hn
)));
932 t
= pa_tagstruct_new(NULL
, 0);
934 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
936 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
938 pa_tagstruct_putu32(t
, u
->ctag
++);
939 pa_tagstruct_putu32(t
, u
->channel
);
940 pa_tagstruct_puts(t
, d
);
941 pa_pstream_send_tagstruct(u
->pstream
, t
);
946 /* Called from main context */
947 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
948 struct userdata
*u
= userdata
;
951 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
957 if (command
!= PA_COMMAND_REPLY
) {
958 if (command
== PA_COMMAND_ERROR
)
959 pa_log("Failed to get info.");
961 pa_log("Protocol error.");
965 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
966 pa_tagstruct_gets(t
, &server_version
) < 0 ||
967 pa_tagstruct_gets(t
, &user_name
) < 0 ||
968 pa_tagstruct_gets(t
, &host_name
) < 0 ||
969 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
970 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
971 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
972 pa_tagstruct_getu32(t
, &cookie
) < 0 ||
973 (u
->version
>= 15 && pa_tagstruct_get_channel_map(t
, &cm
) < 0)) {
975 pa_log("Parse failure");
979 if (!pa_tagstruct_eof(t
)) {
980 pa_log("Packet too long");
984 pa_xfree(u
->server_fqdn
);
985 u
->server_fqdn
= pa_xstrdup(host_name
);
987 pa_xfree(u
->user_name
);
988 u
->user_name
= pa_xstrdup(user_name
);
990 update_description(u
);
995 pa_module_unload_request(u
->module
, TRUE
);
998 static int read_ports(struct userdata
*u
, pa_tagstruct
*t
) {
999 if (u
->version
>= 16) {
1003 if (pa_tagstruct_getu32(t
, &n_ports
)) {
1004 pa_log("Parse failure");
1005 return -PA_ERR_PROTOCOL
;
1008 for (uint32_t j
= 0; j
< n_ports
; j
++) {
1011 if (pa_tagstruct_gets(t
, &s
) < 0 || /* name */
1012 pa_tagstruct_gets(t
, &s
) < 0 || /* description */
1013 pa_tagstruct_getu32(t
, &priority
) < 0) {
1015 pa_log("Parse failure");
1016 return -PA_ERR_PROTOCOL
;
1018 if (u
->version
>= 24 && pa_tagstruct_getu32(t
, &priority
) < 0) { /* available */
1019 pa_log("Parse failure");
1020 return -PA_ERR_PROTOCOL
;
1024 if (pa_tagstruct_gets(t
, &s
) < 0) { /* active port */
1025 pa_log("Parse failure");
1026 return -PA_ERR_PROTOCOL
;
1033 static int read_formats(struct userdata
*u
, pa_tagstruct
*t
) {
1035 pa_format_info
*format
;
1037 if (pa_tagstruct_getu8(t
, &n_formats
) < 0) { /* no. of formats */
1038 pa_log("Parse failure");
1039 return -PA_ERR_PROTOCOL
;
1042 for (uint8_t j
= 0; j
< n_formats
; j
++) {
1043 format
= pa_format_info_new();
1044 if (pa_tagstruct_get_format_info(t
, format
)) { /* format info */
1045 pa_format_info_free(format
);
1046 pa_log("Parse failure");
1047 return -PA_ERR_PROTOCOL
;
1049 pa_format_info_free(format
);
1056 /* Called from main context */
1057 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1058 struct userdata
*u
= userdata
;
1059 uint32_t idx
, owner_module
, monitor_source
, flags
;
1060 const char *name
, *description
, *monitor_source_name
, *driver
;
1070 if (command
!= PA_COMMAND_REPLY
) {
1071 if (command
== PA_COMMAND_ERROR
)
1072 pa_log("Failed to get info.");
1074 pa_log("Protocol error.");
1078 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1079 pa_tagstruct_gets(t
, &name
) < 0 ||
1080 pa_tagstruct_gets(t
, &description
) < 0 ||
1081 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1082 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1083 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1084 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1085 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1086 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
1087 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
1088 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1089 pa_tagstruct_gets(t
, &driver
) < 0 ||
1090 pa_tagstruct_getu32(t
, &flags
) < 0) {
1092 pa_log("Parse failure");
1096 if (u
->version
>= 13) {
1097 pa_usec_t configured_latency
;
1099 if (pa_tagstruct_get_proplist(t
, NULL
) < 0 ||
1100 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1102 pa_log("Parse failure");
1107 if (u
->version
>= 15) {
1108 pa_volume_t base_volume
;
1109 uint32_t state
, n_volume_steps
, card
;
1111 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1112 pa_tagstruct_getu32(t
, &state
) < 0 ||
1113 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1114 pa_tagstruct_getu32(t
, &card
) < 0) {
1116 pa_log("Parse failure");
1121 if (read_ports(u
, t
) < 0)
1124 if (u
->version
>= 21 && read_formats(u
, t
) < 0)
1127 if (!pa_tagstruct_eof(t
)) {
1128 pa_log("Packet too long");
1132 if (!u
->sink_name
|| !pa_streq(name
, u
->sink_name
))
1135 pa_xfree(u
->device_description
);
1136 u
->device_description
= pa_xstrdup(description
);
1138 update_description(u
);
1143 pa_module_unload_request(u
->module
, TRUE
);
1146 /* Called from main context */
1147 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1148 struct userdata
*u
= userdata
;
1149 uint32_t idx
, owner_module
, client
, sink
;
1150 pa_usec_t buffer_usec
, sink_usec
;
1151 const char *name
, *driver
, *resample_method
;
1152 pa_bool_t mute
= FALSE
;
1153 pa_sample_spec sample_spec
;
1154 pa_channel_map channel_map
;
1161 if (command
!= PA_COMMAND_REPLY
) {
1162 if (command
== PA_COMMAND_ERROR
)
1163 pa_log("Failed to get info.");
1165 pa_log("Protocol error.");
1169 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1170 pa_tagstruct_gets(t
, &name
) < 0 ||
1171 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1172 pa_tagstruct_getu32(t
, &client
) < 0 ||
1173 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1174 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1175 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1176 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1177 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1178 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1179 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1180 pa_tagstruct_gets(t
, &driver
) < 0) {
1182 pa_log("Parse failure");
1186 if (u
->version
>= 11) {
1187 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1189 pa_log("Parse failure");
1194 if (u
->version
>= 13) {
1195 if (pa_tagstruct_get_proplist(t
, NULL
) < 0) {
1197 pa_log("Parse failure");
1202 if (u
->version
>= 19) {
1203 if (pa_tagstruct_get_boolean(t
, &b
) < 0) {
1205 pa_log("Parse failure");
1210 if (u
->version
>= 20) {
1211 if (pa_tagstruct_get_boolean(t
, &b
) < 0 ||
1212 pa_tagstruct_get_boolean(t
, &b
) < 0) {
1214 pa_log("Parse failure");
1219 if (u
->version
>= 21) {
1220 pa_format_info
*format
= pa_format_info_new();
1222 if (pa_tagstruct_get_format_info(t
, format
) < 0) {
1223 pa_format_info_free(format
);
1224 pa_log("Parse failure");
1227 pa_format_info_free(format
);
1230 if (!pa_tagstruct_eof(t
)) {
1231 pa_log("Packet too long");
1235 if (idx
!= u
->device_index
)
1240 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1241 pa_cvolume_equal(&volume
, &u
->sink
->real_volume
))
1244 pa_sink_volume_changed(u
->sink
, &volume
);
1246 if (u
->version
>= 11)
1247 pa_sink_mute_changed(u
->sink
, mute
);
1252 pa_module_unload_request(u
->module
, TRUE
);
1257 /* Called from main context */
1258 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1259 struct userdata
*u
= userdata
;
1260 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1261 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1266 pa_usec_t latency
, configured_latency
;
1271 if (command
!= PA_COMMAND_REPLY
) {
1272 if (command
== PA_COMMAND_ERROR
)
1273 pa_log("Failed to get info.");
1275 pa_log("Protocol error.");
1279 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1280 pa_tagstruct_gets(t
, &name
) < 0 ||
1281 pa_tagstruct_gets(t
, &description
) < 0 ||
1282 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1283 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1284 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1285 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1286 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1287 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1288 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1289 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1290 pa_tagstruct_gets(t
, &driver
) < 0 ||
1291 pa_tagstruct_getu32(t
, &flags
) < 0) {
1293 pa_log("Parse failure");
1297 if (u
->version
>= 13) {
1298 if (pa_tagstruct_get_proplist(t
, NULL
) < 0 ||
1299 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1301 pa_log("Parse failure");
1306 if (u
->version
>= 15) {
1307 pa_volume_t base_volume
;
1308 uint32_t state
, n_volume_steps
, card
;
1310 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1311 pa_tagstruct_getu32(t
, &state
) < 0 ||
1312 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1313 pa_tagstruct_getu32(t
, &card
) < 0) {
1315 pa_log("Parse failure");
1320 if (read_ports(u
, t
) < 0)
1323 if (u
->version
>= 22 && read_formats(u
, t
) < 0)
1326 if (!pa_tagstruct_eof(t
)) {
1327 pa_log("Packet too long");
1331 if (!u
->source_name
|| !pa_streq(name
, u
->source_name
))
1334 pa_xfree(u
->device_description
);
1335 u
->device_description
= pa_xstrdup(description
);
1337 update_description(u
);
1342 pa_module_unload_request(u
->module
, TRUE
);
1347 /* Called from main context */
1348 static void request_info(struct userdata
*u
) {
1353 t
= pa_tagstruct_new(NULL
, 0);
1354 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1355 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1356 pa_pstream_send_tagstruct(u
->pstream
, t
);
1357 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1360 t
= pa_tagstruct_new(NULL
, 0);
1361 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1362 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1363 pa_tagstruct_putu32(t
, u
->device_index
);
1364 pa_pstream_send_tagstruct(u
->pstream
, t
);
1365 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1368 t
= pa_tagstruct_new(NULL
, 0);
1369 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1370 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1371 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1372 pa_tagstruct_puts(t
, u
->sink_name
);
1373 pa_pstream_send_tagstruct(u
->pstream
, t
);
1374 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1377 if (u
->source_name
) {
1378 t
= pa_tagstruct_new(NULL
, 0);
1379 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1380 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1381 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1382 pa_tagstruct_puts(t
, u
->source_name
);
1383 pa_pstream_send_tagstruct(u
->pstream
, t
);
1384 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1389 /* Called from main context */
1390 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1391 struct userdata
*u
= userdata
;
1392 pa_subscription_event_type_t e
;
1398 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1400 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1401 pa_tagstruct_getu32(t
, &idx
) < 0) {
1402 pa_log("Invalid protocol reply");
1403 pa_module_unload_request(u
->module
, TRUE
);
1407 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1409 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1410 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1412 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1420 /* Called from main context */
1421 static void start_subscribe(struct userdata
*u
) {
1425 t
= pa_tagstruct_new(NULL
, 0);
1426 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1427 pa_tagstruct_putu32(t
, u
->ctag
++);
1428 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1430 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1432 PA_SUBSCRIPTION_MASK_SOURCE
1436 pa_pstream_send_tagstruct(u
->pstream
, t
);
1439 /* Called from main context */
1440 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1441 struct userdata
*u
= userdata
;
1448 pa_assert(u
->pdispatch
== pd
);
1450 if (command
!= PA_COMMAND_REPLY
) {
1451 if (command
== PA_COMMAND_ERROR
)
1452 pa_log("Failed to create stream.");
1454 pa_log("Protocol error.");
1458 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1459 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1461 || pa_tagstruct_getu32(t
, &bytes
) < 0
1466 if (u
->version
>= 9) {
1468 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1469 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1470 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1471 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1474 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1475 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
1480 if (u
->version
>= 12) {
1483 uint32_t device_index
;
1485 pa_bool_t suspended
;
1487 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1488 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1489 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1490 pa_tagstruct_gets(t
, &dn
) < 0 ||
1491 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
1495 pa_xfree(u
->sink_name
);
1496 u
->sink_name
= pa_xstrdup(dn
);
1498 pa_xfree(u
->source_name
);
1499 u
->source_name
= pa_xstrdup(dn
);
1503 if (u
->version
>= 13) {
1506 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1509 /* #ifdef TUNNEL_SINK */
1510 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1512 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1516 if (u
->version
>= 21) {
1517 pa_format_info
*format
= pa_format_info_new();
1519 if (pa_tagstruct_get_format_info(t
, format
) < 0) {
1520 pa_format_info_free(format
);
1524 pa_format_info_free(format
);
1527 if (!pa_tagstruct_eof(t
))
1533 pa_assert(!u
->time_event
);
1534 u
->time_event
= pa_core_rttime_new(u
->core
, pa_rtclock_now() + LATENCY_INTERVAL
, timeout_callback
, u
);
1538 pa_log_debug("Stream created.");
1541 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
1547 pa_log("Invalid reply. (Create stream)");
1550 pa_module_unload_request(u
->module
, TRUE
);
1554 /* Called from main context */
1555 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1556 struct userdata
*u
= userdata
;
1557 pa_tagstruct
*reply
;
1558 char name
[256], un
[128], hn
[128];
1563 pa_assert(u
->pdispatch
== pd
);
1565 if (command
!= PA_COMMAND_REPLY
||
1566 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1567 !pa_tagstruct_eof(t
)) {
1569 if (command
== PA_COMMAND_ERROR
)
1570 pa_log("Failed to authenticate");
1572 pa_log("Protocol error.");
1577 /* Minimum supported protocol version */
1578 if (u
->version
< 8) {
1579 pa_log("Incompatible protocol version");
1583 /* Starting with protocol version 13 the MSB of the version tag
1584 reflects if shm is enabled for this connection or not. We don't
1585 support SHM here at all, so we just ignore this. */
1587 if (u
->version
>= 13)
1588 u
->version
&= 0x7FFFFFFFU
;
1590 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
1593 pa_proplist_setf(u
->sink
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1594 pa_sink_update_proplist(u
->sink
, 0, NULL
);
1596 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1598 pa_get_user_name(un
, sizeof(un
)),
1599 pa_get_host_name(hn
, sizeof(hn
)));
1601 pa_proplist_setf(u
->source
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1602 pa_source_update_proplist(u
->source
, 0, NULL
);
1604 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1606 pa_get_user_name(un
, sizeof(un
)),
1607 pa_get_host_name(hn
, sizeof(hn
)));
1610 reply
= pa_tagstruct_new(NULL
, 0);
1611 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1612 pa_tagstruct_putu32(reply
, u
->ctag
++);
1614 if (u
->version
>= 13) {
1616 pl
= pa_proplist_new();
1617 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1618 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1619 pa_init_proplist(pl
);
1620 pa_tagstruct_put_proplist(reply
, pl
);
1621 pa_proplist_free(pl
);
1623 pa_tagstruct_puts(reply
, "PulseAudio");
1625 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1626 /* We ignore the server's reply here */
1628 reply
= pa_tagstruct_new(NULL
, 0);
1630 if (u
->version
< 13)
1631 /* Only for older PA versions we need to fill in the maxlength */
1632 u
->maxlength
= 4*1024*1024;
1635 u
->tlength
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1636 u
->minreq
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1637 u
->prebuf
= u
->tlength
;
1639 u
->fragsize
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
1643 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1644 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1646 if (u
->version
< 13)
1647 pa_tagstruct_puts(reply
, name
);
1649 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1650 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1651 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1652 pa_tagstruct_puts(reply
, u
->sink_name
);
1653 pa_tagstruct_putu32(reply
, u
->maxlength
);
1654 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1655 pa_tagstruct_putu32(reply
, u
->tlength
);
1656 pa_tagstruct_putu32(reply
, u
->prebuf
);
1657 pa_tagstruct_putu32(reply
, u
->minreq
);
1658 pa_tagstruct_putu32(reply
, 0);
1659 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1660 pa_tagstruct_put_cvolume(reply
, &volume
);
1662 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1663 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1665 if (u
->version
< 13)
1666 pa_tagstruct_puts(reply
, name
);
1668 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1669 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1670 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1671 pa_tagstruct_puts(reply
, u
->source_name
);
1672 pa_tagstruct_putu32(reply
, u
->maxlength
);
1673 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1674 pa_tagstruct_putu32(reply
, u
->fragsize
);
1677 if (u
->version
>= 12) {
1678 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1679 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1680 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1681 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1682 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1683 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1684 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
1687 if (u
->version
>= 13) {
1690 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1691 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
1693 pl
= pa_proplist_new();
1694 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1695 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1696 pa_tagstruct_put_proplist(reply
, pl
);
1697 pa_proplist_free(pl
);
1700 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1704 if (u
->version
>= 14) {
1706 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1708 pa_tagstruct_put_boolean(reply
, TRUE
); /* early rquests */
1711 if (u
->version
>= 15) {
1713 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1715 pa_tagstruct_put_boolean(reply
, FALSE
); /* don't inhibit auto suspend */
1716 pa_tagstruct_put_boolean(reply
, FALSE
); /* fail on suspend */
1720 if (u
->version
>= 17)
1721 pa_tagstruct_put_boolean(reply
, FALSE
); /* relative volume */
1723 if (u
->version
>= 18)
1724 pa_tagstruct_put_boolean(reply
, FALSE
); /* passthrough stream */
1728 if (u
->version
>= 21) {
1729 /* We're not using the extended API, so n_formats = 0 and that's that */
1730 pa_tagstruct_putu8(reply
, 0);
1733 if (u
->version
>= 22) {
1734 /* We're not using the extended API, so n_formats = 0 and that's that */
1735 pa_tagstruct_putu8(reply
, 0);
1736 pa_cvolume_reset(&volume
, u
->source
->sample_spec
.channels
);
1737 pa_tagstruct_put_cvolume(reply
, &volume
);
1738 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted */
1739 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1740 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1741 pa_tagstruct_put_boolean(reply
, FALSE
); /* relative volume */
1742 pa_tagstruct_put_boolean(reply
, FALSE
); /* passthrough stream */
1746 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1747 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1749 pa_log_debug("Connection authenticated, creating stream ...");
1754 pa_module_unload_request(u
->module
, TRUE
);
1757 /* Called from main context */
1758 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1759 struct userdata
*u
= userdata
;
1764 pa_log_warn("Stream died.");
1765 pa_module_unload_request(u
->module
, TRUE
);
1768 /* Called from main context */
1769 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1770 struct userdata
*u
= userdata
;
1776 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1777 pa_log("Invalid packet");
1778 pa_module_unload_request(u
->module
, TRUE
);
1784 /* Called from main context */
1785 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1786 struct userdata
*u
= userdata
;
1792 if (channel
!= u
->channel
) {
1793 pa_log("Received memory block on bad channel.");
1794 pa_module_unload_request(u
->module
, TRUE
);
1798 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1800 u
->counter_delta
+= (int64_t) chunk
->length
;
1804 /* Called from main context */
1805 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1806 struct userdata
*u
= userdata
;
1812 pa_assert(u
->client
== sc
);
1814 pa_socket_client_unref(u
->client
);
1818 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1819 pa_module_unload_request(u
->module
, TRUE
);
1823 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1824 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, TRUE
, command_table
, PA_COMMAND_MAX
);
1826 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1827 pa_pstream_set_receive_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1829 pa_pstream_set_receive_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
1832 t
= pa_tagstruct_new(NULL
, 0);
1833 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1834 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1835 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1837 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
1843 if (pa_iochannel_creds_supported(io
))
1844 pa_iochannel_creds_enable(io
);
1846 ucred
.uid
= getuid();
1847 ucred
.gid
= getgid();
1849 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1852 pa_pstream_send_tagstruct(u
->pstream
, t
);
1855 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1857 pa_log_debug("Connection established, authenticating ...");
1862 /* Called from main context */
1863 static void sink_set_volume(pa_sink
*sink
) {
1871 t
= pa_tagstruct_new(NULL
, 0);
1872 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1873 pa_tagstruct_putu32(t
, u
->ctag
++);
1874 pa_tagstruct_putu32(t
, u
->device_index
);
1875 pa_tagstruct_put_cvolume(t
, &sink
->real_volume
);
1876 pa_pstream_send_tagstruct(u
->pstream
, t
);
1879 /* Called from main context */
1880 static void sink_set_mute(pa_sink
*sink
) {
1888 if (u
->version
< 11)
1891 t
= pa_tagstruct_new(NULL
, 0);
1892 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1893 pa_tagstruct_putu32(t
, u
->ctag
++);
1894 pa_tagstruct_putu32(t
, u
->device_index
);
1895 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1896 pa_pstream_send_tagstruct(u
->pstream
, t
);
1901 int pa__init(pa_module
*m
) {
1902 pa_modargs
*ma
= NULL
;
1903 struct userdata
*u
= NULL
;
1908 pa_sink_new_data data
;
1910 pa_source_new_data data
;
1915 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1916 pa_log("Failed to parse module arguments");
1920 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1924 u
->pdispatch
= NULL
;
1926 u
->server_name
= NULL
;
1928 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1930 u
->requested_bytes
= 0;
1932 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1935 u
->smoother
= pa_smoother_new(
1944 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1945 u
->time_event
= NULL
;
1946 u
->ignore_latency_before
= 0;
1947 u
->transport_usec
= u
->thread_transport_usec
= 0;
1948 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1949 u
->counter
= u
->counter_delta
= 0;
1951 u
->rtpoll
= pa_rtpoll_new();
1952 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
1954 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, pa_modargs_get_value(ma
, "cookie", PA_NATIVE_COOKIE_FILE
), TRUE
, PA_NATIVE_COOKIE_LENGTH
)))
1957 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1958 pa_log("No server specified.");
1962 ss
= m
->core
->default_sample_spec
;
1963 map
= m
->core
->default_channel_map
;
1964 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1965 pa_log("Invalid sample format specification");
1969 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, TRUE
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1970 pa_log("Failed to connect to server '%s'", u
->server_name
);
1974 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
1978 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1979 dn
= pa_sprintf_malloc("tunnel-sink.%s", u
->server_name
);
1981 pa_sink_new_data_init(&data
);
1982 data
.driver
= __FILE__
;
1984 data
.namereg_fail
= TRUE
;
1985 pa_sink_new_data_set_name(&data
, dn
);
1986 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1987 pa_sink_new_data_set_channel_map(&data
, &map
);
1988 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1989 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1991 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
1993 if (pa_modargs_get_proplist(ma
, "sink_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
1994 pa_log("Invalid properties");
1995 pa_sink_new_data_done(&data
);
1999 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
);
2000 pa_sink_new_data_done(&data
);
2003 pa_log("Failed to create sink.");
2007 u
->sink
->parent
.process_msg
= sink_process_msg
;
2008 u
->sink
->userdata
= u
;
2009 u
->sink
->set_state
= sink_set_state
;
2010 pa_sink_set_set_volume_callback(u
->sink
, sink_set_volume
);
2011 pa_sink_set_set_mute_callback(u
->sink
, sink_set_mute
);
2013 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
2015 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2017 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
2018 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
2022 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
2023 dn
= pa_sprintf_malloc("tunnel-source.%s", u
->server_name
);
2025 pa_source_new_data_init(&data
);
2026 data
.driver
= __FILE__
;
2028 data
.namereg_fail
= TRUE
;
2029 pa_source_new_data_set_name(&data
, dn
);
2030 pa_source_new_data_set_sample_spec(&data
, &ss
);
2031 pa_source_new_data_set_channel_map(&data
, &map
);
2032 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
2033 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
2035 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
2037 if (pa_modargs_get_proplist(ma
, "source_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
2038 pa_log("Invalid properties");
2039 pa_source_new_data_done(&data
);
2043 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
2044 pa_source_new_data_done(&data
);
2047 pa_log("Failed to create source.");
2051 u
->source
->parent
.process_msg
= source_process_msg
;
2052 u
->source
->set_state
= source_set_state
;
2053 u
->source
->userdata
= u
;
2055 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2057 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
2058 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
2060 u
->mcalign
= pa_mcalign_new(pa_frame_size(&u
->source
->sample_spec
));
2065 u
->time_event
= NULL
;
2067 u
->maxlength
= (uint32_t) -1;
2069 u
->tlength
= u
->minreq
= u
->prebuf
= (uint32_t) -1;
2071 u
->fragsize
= (uint32_t) -1;
2074 if (!(u
->thread
= pa_thread_new("module-tunnel", thread_func
, u
))) {
2075 pa_log("Failed to create thread.");
2080 pa_sink_put(u
->sink
);
2082 pa_source_put(u
->source
);
2085 pa_modargs_free(ma
);
2093 pa_modargs_free(ma
);
2100 void pa__done(pa_module
*m
) {
2105 if (!(u
= m
->userdata
))
2110 pa_sink_unlink(u
->sink
);
2113 pa_source_unlink(u
->source
);
2117 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
2118 pa_thread_free(u
->thread
);
2121 pa_thread_mq_done(&u
->thread_mq
);
2125 pa_sink_unref(u
->sink
);
2128 pa_source_unref(u
->source
);
2132 pa_rtpoll_free(u
->rtpoll
);
2135 pa_pstream_unlink(u
->pstream
);
2136 pa_pstream_unref(u
->pstream
);
2140 pa_pdispatch_unref(u
->pdispatch
);
2143 pa_socket_client_unref(u
->client
);
2146 pa_auth_cookie_unref(u
->auth_cookie
);
2149 pa_smoother_free(u
->smoother
);
2152 u
->core
->mainloop
->time_free(u
->time_event
);
2156 pa_mcalign_free(u
->mcalign
);
2160 pa_xfree(u
->sink_name
);
2162 pa_xfree(u
->source_name
);
2164 pa_xfree(u
->server_name
);
2166 pa_xfree(u
->device_description
);
2167 pa_xfree(u
->server_fqdn
);
2168 pa_xfree(u
->user_name
);