2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/sink-input.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
60 #include "module-tunnel-sink-symdef.h"
62 #include "module-tunnel-source-symdef.h"
66 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
68 "sink_name=<name for the local sink> "
69 "sink_properties=<properties for the local sink> "
71 "sink=<remote sink name> "
73 "format=<sample format> "
74 "channels=<number of channels> "
76 "channel_map=<channel map>");
78 PA_MODULE_DESCRIPTION("Tunnel module for sources");
80 "source_name=<name for the local source> "
81 "source_properties=<properties for the local source> "
83 "source=<remote source name> "
85 "format=<sample format> "
86 "channels=<number of channels> "
88 "channel_map=<channel map>");
91 PA_MODULE_AUTHOR("Lennart Poettering");
92 PA_MODULE_VERSION(PACKAGE_VERSION
);
93 PA_MODULE_LOAD_ONCE(FALSE
);
95 static const char* const valid_modargs
[] = {
114 #define DEFAULT_TIMEOUT 5
116 #define LATENCY_INTERVAL 10
118 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
123 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
124 SINK_MESSAGE_REMOTE_SUSPEND
,
125 SINK_MESSAGE_UPDATE_LATENCY
,
129 #define DEFAULT_TLENGTH_MSEC 150
130 #define DEFAULT_MINREQ_MSEC 25
135 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
136 SOURCE_MESSAGE_REMOTE_SUSPEND
,
137 SOURCE_MESSAGE_UPDATE_LATENCY
140 #define DEFAULT_FRAGSIZE_MSEC 25
145 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
146 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
148 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
149 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
150 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
151 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
152 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
153 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
154 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
156 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
158 [PA_COMMAND_REQUEST
] = command_request
,
159 [PA_COMMAND_STARTED
] = command_started
,
161 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
162 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
163 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
164 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
165 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
166 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
167 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
168 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
169 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
170 [PA_COMMAND_PLAYBACK_STREAM_EVENT
] = command_stream_or_client_event
,
171 [PA_COMMAND_RECORD_STREAM_EVENT
] = command_stream_or_client_event
,
172 [PA_COMMAND_CLIENT_EVENT
] = command_stream_or_client_event
,
173 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
,
174 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
181 pa_thread_mq thread_mq
;
185 pa_socket_client
*client
;
187 pa_pdispatch
*pdispatch
;
193 size_t requested_bytes
;
199 pa_auth_cookie
*auth_cookie
;
203 uint32_t device_index
;
206 int64_t counter
, counter_delta
;
208 pa_bool_t remote_corked
:1;
209 pa_bool_t remote_suspended
:1;
211 pa_usec_t transport_usec
; /* maintained in the main thread */
212 pa_usec_t thread_transport_usec
; /* maintained in the IO thread */
214 uint32_t ignore_latency_before
;
216 pa_time_event
*time_event
;
218 pa_smoother
*smoother
;
220 char *device_description
;
/* Forward declaration; the definition appears further down in this file. */
static void request_latency(struct userdata *u);
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
238 pa_log_debug("Got stream or client event.");
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
243 struct userdata
*u
= userdata
;
248 pa_assert(u
->pdispatch
== pd
);
250 pa_log_warn("Stream killed");
251 pa_module_unload_request(u
->module
, TRUE
);
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
256 struct userdata
*u
= userdata
;
261 pa_assert(u
->pdispatch
== pd
);
263 pa_log_info("Server signalled buffer overrun/underrun.");
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
269 struct userdata
*u
= userdata
;
276 pa_assert(u
->pdispatch
== pd
);
278 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
279 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
280 !pa_tagstruct_eof(t
)) {
282 pa_log("Invalid packet.");
283 pa_module_unload_request(u
->module
, TRUE
);
287 pa_log_debug("Server reports device suspend.");
290 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
292 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
298 /* Called from main context */
299 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
300 struct userdata
*u
= userdata
;
301 uint32_t channel
, di
;
308 pa_assert(u
->pdispatch
== pd
);
310 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
311 pa_tagstruct_getu32(t
, &di
) < 0 ||
312 pa_tagstruct_gets(t
, &dn
) < 0 ||
313 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
315 pa_log_error("Invalid packet.");
316 pa_module_unload_request(u
->module
, TRUE
);
320 pa_log_debug("Server reports a stream move.");
323 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
325 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
331 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
332 struct userdata
*u
= userdata
;
333 uint32_t channel
, maxlength
, tlength
, fragsize
, prebuf
, minreq
;
339 pa_assert(u
->pdispatch
== pd
);
341 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
342 pa_tagstruct_getu32(t
, &maxlength
) < 0) {
344 pa_log_error("Invalid packet.");
345 pa_module_unload_request(u
->module
, TRUE
);
349 if (command
== PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
) {
350 if (pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
351 pa_tagstruct_get_usec(t
, &usec
) < 0) {
353 pa_log_error("Invalid packet.");
354 pa_module_unload_request(u
->module
, TRUE
);
358 if (pa_tagstruct_getu32(t
, &tlength
) < 0 ||
359 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
360 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
361 pa_tagstruct_get_usec(t
, &usec
) < 0) {
363 pa_log_error("Invalid packet.");
364 pa_module_unload_request(u
->module
, TRUE
);
370 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength
, (unsigned long) u
->tlength
);
378 /* Called from main context */
379 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
380 struct userdata
*u
= userdata
;
385 pa_assert(u
->pdispatch
== pd
);
387 pa_log_debug("Server reports playback started.");
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata
*u
, pa_bool_t past
) {
399 x
= pa_rtclock_now();
401 /* Correct by the time the issued request needs to travel to the
402 * other side. This is a valid thread-safe access, because the
403 * main thread is waiting for us */
406 x
-= u
->thread_transport_usec
;
408 x
+= u
->thread_transport_usec
;
410 if (u
->remote_suspended
|| u
->remote_corked
)
411 pa_smoother_pause(u
->smoother
, x
);
413 pa_smoother_resume(u
->smoother
, x
, TRUE
);
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
420 if (u
->remote_corked
== cork
)
423 u
->remote_corked
= cork
;
424 check_smoother_status(u
, FALSE
);
427 /* Called from main context */
428 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
435 t
= pa_tagstruct_new(NULL
, 0);
437 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
439 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
441 pa_tagstruct_putu32(t
, u
->ctag
++);
442 pa_tagstruct_putu32(t
, u
->channel
);
443 pa_tagstruct_put_boolean(t
, !!cork
);
444 pa_pstream_send_tagstruct(u
->pstream
, t
);
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
453 if (u
->remote_suspended
== suspend
)
456 u
->remote_suspended
= suspend
;
457 check_smoother_status(u
, TRUE
);
462 /* Called from IO thread context */
463 static void send_data(struct userdata
*u
) {
466 while (u
->requested_bytes
> 0) {
467 pa_memchunk memchunk
;
469 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
470 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
471 pa_memblock_unref(memchunk
.memblock
);
473 u
->requested_bytes
-= memchunk
.length
;
475 u
->counter
+= (int64_t) memchunk
.length
;
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
481 struct userdata
*u
= PA_SINK(o
)->userdata
;
485 case PA_SINK_MESSAGE_SET_STATE
: {
488 /* First, change the state, because otherwise pa_sink_render() would fail */
489 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
491 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
493 if (PA_SINK_IS_OPENED(u
->sink
->state
))
500 case PA_SINK_MESSAGE_GET_LATENCY
: {
501 pa_usec_t yl
, yr
, *usec
= data
;
503 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
504 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
506 *usec
= yl
> yr
? yl
- yr
: 0;
510 case SINK_MESSAGE_REQUEST
:
512 pa_assert(offset
> 0);
513 u
->requested_bytes
+= (size_t) offset
;
515 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
521 case SINK_MESSAGE_REMOTE_SUSPEND
:
523 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
527 case SINK_MESSAGE_UPDATE_LATENCY
: {
530 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
532 if (y
> (pa_usec_t
) offset
)
533 y
-= (pa_usec_t
) offset
;
537 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
539 /* We can access this freely here, since the main thread is waiting for us */
540 u
->thread_transport_usec
= u
->transport_usec
;
545 case SINK_MESSAGE_POST
:
547 /* OK, This might be a bit confusing. This message is
548 * delivered to us from the main context -- NOT from the
549 * IO thread context where the rest of the messages are
550 * dispatched. Yeah, ugly, but I am a lazy bastard. */
552 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
554 u
->counter_delta
+= (int64_t) chunk
->length
;
559 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
562 /* Called from main context */
563 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
565 pa_sink_assert_ref(s
);
568 switch ((pa_sink_state_t
) state
) {
570 case PA_SINK_SUSPENDED
:
571 pa_assert(PA_SINK_IS_OPENED(s
->state
));
572 stream_cork(u
, TRUE
);
576 case PA_SINK_RUNNING
:
577 if (s
->state
== PA_SINK_SUSPENDED
)
578 stream_cork(u
, FALSE
);
581 case PA_SINK_UNLINKED
:
583 case PA_SINK_INVALID_STATE
:
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
594 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
598 case PA_SOURCE_MESSAGE_SET_STATE
: {
601 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
602 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
607 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
608 pa_usec_t yr
, yl
, *usec
= data
;
610 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &PA_SOURCE(o
)->sample_spec
);
611 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_now());
613 *usec
= yr
> yl
? yr
- yl
: 0;
617 case SOURCE_MESSAGE_POST
:
619 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
620 pa_source_post(u
->source
, chunk
);
622 u
->counter
+= (int64_t) chunk
->length
;
626 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
628 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
631 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
634 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->source
->sample_spec
);
635 y
+= (pa_usec_t
) offset
;
637 pa_smoother_put(u
->smoother
, pa_rtclock_now(), y
);
639 /* We can access this freely here, since the main thread is waiting for us */
640 u
->thread_transport_usec
= u
->transport_usec
;
646 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
649 /* Called from main context */
650 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
652 pa_source_assert_ref(s
);
655 switch ((pa_source_state_t
) state
) {
657 case PA_SOURCE_SUSPENDED
:
658 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
659 stream_cork(u
, TRUE
);
663 case PA_SOURCE_RUNNING
:
664 if (s
->state
== PA_SOURCE_SUSPENDED
)
665 stream_cork(u
, FALSE
);
668 case PA_SOURCE_UNLINKED
:
670 case PA_SINK_INVALID_STATE
:
679 static void thread_func(void *userdata
) {
680 struct userdata
*u
= userdata
;
684 pa_log_debug("Thread starting up");
686 pa_thread_mq_install(&u
->thread_mq
);
687 pa_rtpoll_install(u
->rtpoll
);
693 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
694 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
695 pa_sink_process_rewind(u
->sink
, 0);
698 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
706 /* If this was no regular exit from the loop we have to continue
707 * processing messages until we received PA_MESSAGE_SHUTDOWN */
708 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
709 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
712 pa_log_debug("Thread shutting down");
716 /* Called from main context */
717 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
718 struct userdata
*u
= userdata
;
719 uint32_t bytes
, channel
;
722 pa_assert(command
== PA_COMMAND_REQUEST
);
725 pa_assert(u
->pdispatch
== pd
);
727 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
728 pa_tagstruct_getu32(t
, &bytes
) < 0) {
729 pa_log("Invalid protocol reply");
733 if (channel
!= u
->channel
) {
734 pa_log("Received data for invalid channel");
738 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
742 pa_module_unload_request(u
->module
, TRUE
);
747 /* Called from main context */
748 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
749 struct userdata
*u
= userdata
;
750 pa_usec_t sink_usec
, source_usec
;
752 int64_t write_index
, read_index
;
753 struct timeval local
, remote
, now
;
760 if (command
!= PA_COMMAND_REPLY
) {
761 if (command
== PA_COMMAND_ERROR
)
762 pa_log("Failed to get latency.");
764 pa_log("Protocol error.");
768 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
769 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
770 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
771 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
772 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
773 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
774 pa_tagstruct_gets64(t
, &read_index
) < 0) {
775 pa_log("Invalid reply.");
780 if (u
->version
>= 13) {
781 uint64_t underrun_for
= 0, playing_for
= 0;
783 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
784 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
785 pa_log("Invalid reply.");
791 if (!pa_tagstruct_eof(t
)) {
792 pa_log("Invalid reply.");
796 if (tag
< u
->ignore_latency_before
) {
800 pa_gettimeofday(&now
);
802 /* Calculate transport usec */
803 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
804 /* local and remote seem to have synchronized clocks */
806 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
808 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
811 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
813 /* First, take the device's delay */
815 delay
= (int64_t) sink_usec
;
816 ss
= &u
->sink
->sample_spec
;
818 delay
= (int64_t) source_usec
;
819 ss
= &u
->source
->sample_spec
;
822 /* Add the length of our server-side buffer */
823 if (write_index
>= read_index
)
824 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) (write_index
-read_index
), ss
);
826 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) (read_index
-write_index
), ss
);
828 /* Our measurements are already out of date, hence correct by the *
829 * transport latency */
831 delay
-= (int64_t) u
->transport_usec
;
833 delay
+= (int64_t) u
->transport_usec
;
836 /* Now correct by what we have read/written since we requested the update */
838 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
840 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
844 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
846 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
853 pa_module_unload_request(u
->module
, TRUE
);
856 /* Called from main context */
857 static void request_latency(struct userdata
*u
) {
863 t
= pa_tagstruct_new(NULL
, 0);
865 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
867 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
869 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
870 pa_tagstruct_putu32(t
, u
->channel
);
872 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
874 pa_pstream_send_tagstruct(u
->pstream
, t
);
875 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
877 u
->ignore_latency_before
= tag
;
878 u
->counter_delta
= 0;
881 /* Called from main context */
882 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*tv
, void *userdata
) {
883 struct userdata
*u
= userdata
;
892 pa_gettimeofday(&ntv
);
893 ntv
.tv_sec
+= LATENCY_INTERVAL
;
894 m
->time_restart(e
, &ntv
);
897 /* Called from main context */
898 static void update_description(struct userdata
*u
) {
900 char un
[128], hn
[128];
905 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
908 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
911 pa_sink_set_description(u
->sink
, d
);
912 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
913 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
914 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
916 pa_source_set_description(u
->source
, d
);
917 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
918 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
919 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
924 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
925 pa_get_user_name(un
, sizeof(un
)),
926 pa_get_host_name(hn
, sizeof(hn
)));
928 t
= pa_tagstruct_new(NULL
, 0);
930 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
932 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
934 pa_tagstruct_putu32(t
, u
->ctag
++);
935 pa_tagstruct_putu32(t
, u
->channel
);
936 pa_tagstruct_puts(t
, d
);
937 pa_pstream_send_tagstruct(u
->pstream
, t
);
942 /* Called from main context */
943 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
944 struct userdata
*u
= userdata
;
947 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
953 if (command
!= PA_COMMAND_REPLY
) {
954 if (command
== PA_COMMAND_ERROR
)
955 pa_log("Failed to get info.");
957 pa_log("Protocol error.");
961 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
962 pa_tagstruct_gets(t
, &server_version
) < 0 ||
963 pa_tagstruct_gets(t
, &user_name
) < 0 ||
964 pa_tagstruct_gets(t
, &host_name
) < 0 ||
965 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
966 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
967 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
968 pa_tagstruct_getu32(t
, &cookie
) < 0 ||
970 pa_tagstruct_get_channel_map(t
, &cm
) < 0)) {
972 pa_log("Parse failure");
976 if (!pa_tagstruct_eof(t
)) {
977 pa_log("Packet too long");
981 pa_xfree(u
->server_fqdn
);
982 u
->server_fqdn
= pa_xstrdup(host_name
);
984 pa_xfree(u
->user_name
);
985 u
->user_name
= pa_xstrdup(user_name
);
987 update_description(u
);
992 pa_module_unload_request(u
->module
, TRUE
);
997 /* Called from main context */
998 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
999 struct userdata
*u
= userdata
;
1000 uint32_t idx
, owner_module
, monitor_source
, flags
;
1001 const char *name
, *description
, *monitor_source_name
, *driver
;
1012 pl
= pa_proplist_new();
1014 if (command
!= PA_COMMAND_REPLY
) {
1015 if (command
== PA_COMMAND_ERROR
)
1016 pa_log("Failed to get info.");
1018 pa_log("Protocol error.");
1022 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1023 pa_tagstruct_gets(t
, &name
) < 0 ||
1024 pa_tagstruct_gets(t
, &description
) < 0 ||
1025 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1026 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1027 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1028 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1029 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1030 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
1031 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
1032 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1033 pa_tagstruct_gets(t
, &driver
) < 0 ||
1034 pa_tagstruct_getu32(t
, &flags
) < 0) {
1036 pa_log("Parse failure");
1040 if (u
->version
>= 13) {
1041 pa_usec_t configured_latency
;
1043 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1044 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1046 pa_log("Parse failure");
1051 if (u
->version
>= 15) {
1052 pa_volume_t base_volume
;
1053 uint32_t state
, n_volume_steps
, card
;
1055 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1056 pa_tagstruct_getu32(t
, &state
) < 0 ||
1057 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1058 pa_tagstruct_getu32(t
, &card
) < 0) {
1060 pa_log("Parse failure");
1065 if (!pa_tagstruct_eof(t
)) {
1066 pa_log("Packet too long");
1070 pa_proplist_free(pl
);
1072 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
1075 pa_xfree(u
->device_description
);
1076 u
->device_description
= pa_xstrdup(description
);
1078 update_description(u
);
1083 pa_module_unload_request(u
->module
, TRUE
);
1084 pa_proplist_free(pl
);
1087 /* Called from main context */
1088 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1089 struct userdata
*u
= userdata
;
1090 uint32_t idx
, owner_module
, client
, sink
;
1091 pa_usec_t buffer_usec
, sink_usec
;
1092 const char *name
, *driver
, *resample_method
;
1094 pa_sample_spec sample_spec
;
1095 pa_channel_map channel_map
;
1102 pl
= pa_proplist_new();
1104 if (command
!= PA_COMMAND_REPLY
) {
1105 if (command
== PA_COMMAND_ERROR
)
1106 pa_log("Failed to get info.");
1108 pa_log("Protocol error.");
1112 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1113 pa_tagstruct_gets(t
, &name
) < 0 ||
1114 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1115 pa_tagstruct_getu32(t
, &client
) < 0 ||
1116 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1117 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1118 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1119 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1120 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1121 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1122 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1123 pa_tagstruct_gets(t
, &driver
) < 0) {
1125 pa_log("Parse failure");
1129 if (u
->version
>= 11) {
1130 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1132 pa_log("Parse failure");
1137 if (u
->version
>= 13) {
1138 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1140 pa_log("Parse failure");
1145 if (!pa_tagstruct_eof(t
)) {
1146 pa_log("Packet too long");
1150 pa_proplist_free(pl
);
1152 if (idx
!= u
->device_index
)
1157 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1158 pa_cvolume_equal(&volume
, &u
->sink
->virtual_volume
))
1161 pa_sink_volume_changed(u
->sink
, &volume
, FALSE
);
1163 if (u
->version
>= 11)
1164 pa_sink_mute_changed(u
->sink
, mute
, FALSE
);
1169 pa_module_unload_request(u
->module
, TRUE
);
1170 pa_proplist_free(pl
);
1175 /* Called from main context */
1176 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1177 struct userdata
*u
= userdata
;
1178 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1179 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1184 pa_usec_t latency
, configured_latency
;
1190 pl
= pa_proplist_new();
1192 if (command
!= PA_COMMAND_REPLY
) {
1193 if (command
== PA_COMMAND_ERROR
)
1194 pa_log("Failed to get info.");
1196 pa_log("Protocol error.");
1200 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1201 pa_tagstruct_gets(t
, &name
) < 0 ||
1202 pa_tagstruct_gets(t
, &description
) < 0 ||
1203 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1204 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1205 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1206 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1207 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1208 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1209 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1210 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1211 pa_tagstruct_gets(t
, &driver
) < 0 ||
1212 pa_tagstruct_getu32(t
, &flags
) < 0) {
1214 pa_log("Parse failure");
1218 if (u
->version
>= 13) {
1219 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1220 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1222 pa_log("Parse failure");
1227 if (u
->version
>= 15) {
1228 pa_volume_t base_volume
;
1229 uint32_t state
, n_volume_steps
, card
;
1231 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1232 pa_tagstruct_getu32(t
, &state
) < 0 ||
1233 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1234 pa_tagstruct_getu32(t
, &card
) < 0) {
1236 pa_log("Parse failure");
1241 if (!pa_tagstruct_eof(t
)) {
1242 pa_log("Packet too long");
1246 pa_proplist_free(pl
);
1248 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1251 pa_xfree(u
->device_description
);
1252 u
->device_description
= pa_xstrdup(description
);
1254 update_description(u
);
1259 pa_module_unload_request(u
->module
, TRUE
);
1260 pa_proplist_free(pl
);
1265 /* Called from main context */
1266 static void request_info(struct userdata
*u
) {
1271 t
= pa_tagstruct_new(NULL
, 0);
1272 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1273 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1274 pa_pstream_send_tagstruct(u
->pstream
, t
);
1275 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1278 t
= pa_tagstruct_new(NULL
, 0);
1279 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1280 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1281 pa_tagstruct_putu32(t
, u
->device_index
);
1282 pa_pstream_send_tagstruct(u
->pstream
, t
);
1283 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1286 t
= pa_tagstruct_new(NULL
, 0);
1287 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1288 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1289 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1290 pa_tagstruct_puts(t
, u
->sink_name
);
1291 pa_pstream_send_tagstruct(u
->pstream
, t
);
1292 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1295 if (u
->source_name
) {
1296 t
= pa_tagstruct_new(NULL
, 0);
1297 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1298 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1299 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1300 pa_tagstruct_puts(t
, u
->source_name
);
1301 pa_pstream_send_tagstruct(u
->pstream
, t
);
1302 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1307 /* Called from main context.
 *
 * Dispatcher callback for PA_COMMAND_SUBSCRIBE_EVENT packets. It reads the
 * event type and an index from the tagstruct t; a packet that cannot be
 * parsed is logged and an unload of this module is requested. The event
 * is then compared against the CHANGE events for the server, the sink
 * input, the sink and the source.
 *
 * NOTE(review): this listing omits lines of the function (e.g. the
 * declaration of idx and everything following the event comparison), so
 * the action taken for a matching event cannot be confirmed from here. */
1308 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1309 struct userdata
*u
= userdata
;
1310 pa_subscription_event_type_t e
;
1316 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
/* Payload: two u32 values, the event type followed by an index. */
1318 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1319 pa_tagstruct_getu32(t
, &idx
) < 0) {
1320 pa_log("Invalid protocol reply");
1321 pa_module_unload_request(u
->module
, TRUE
);
/* Compare e against the CHANGE events of the four object types.
 * NOTE(review): the sink and source comparisons are not joined by an
 * operator in this listing; presumably they are build-time alternatives
 * - confirm against the full file. */
1325 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1327 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1328 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1330 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1338 /* Called from main context.
 *
 * Sends a PA_COMMAND_SUBSCRIBE request on u->pstream, tagged with the next
 * value of u->ctag. The subscription mask starts with
 * PA_SUBSCRIPTION_MASK_SERVER and adds the sink-input/sink masks and the
 * source mask.
 *
 * NOTE(review): the sink and source masks are not joined by an operator
 * in this listing; presumably they are build-time alternatives. No reply
 * handler is registered in the lines visible here. */
1339 static void start_subscribe(struct userdata
*u
) {
1344 t
= pa_tagstruct_new(NULL
, 0);
1345 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
/* The tag is taken from the per-connection counter, which is then
 * incremented. */
1346 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1347 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1349 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1351 PA_SUBSCRIPTION_MASK_SOURCE
1355 pa_pstream_send_tagstruct(u
->pstream
, t
);
1358 /* Called from main context.
 *
 * Reply handler for the create-stream request (registered by
 * setup_complete_callback). Any command other than PA_COMMAND_REPLY is
 * logged as a failure. From a reply it reads, in order:
 *   - the channel and the device index (stored in u), and a byte count;
 *   - for u->version >= 9, the buffer metrics: maxlength, tlength, prebuf
 *     and minreq, and maxlength and fragsize;
 *   - for u->version >= 12, sample spec, channel map, a device index, a
 *     device name and a suspended flag; the device name replaces
 *     u->sink_name / u->source_name;
 *   - for u->version >= 13, one usec value.
 * The reply is then checked with pa_tagstruct_eof(). Afterwards
 * u->time_event is armed LATENCY_INTERVAL seconds from now with
 * timeout_callback, and SINK_MESSAGE_REQUEST carrying the byte count is
 * posted to the sink's message queue. The last visible statements log
 * "Invalid reply. (Create stream)" and request a module unload.
 *
 * NOTE(review): this listing omits lines (local declarations, jumps,
 * closing braces and presumably the conditionals that separate the sink
 * and source variants), so the exact branch structure must be confirmed
 * against the full file. */
1359 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1360 struct userdata
*u
= userdata
;
1368 pa_assert(u
->pdispatch
== pd
);
/* Distinguish a server-side error from any other unexpected command. */
1370 if (command
!= PA_COMMAND_REPLY
) {
1371 if (command
== PA_COMMAND_ERROR
)
1372 pa_log("Failed to create stream.");
1374 pa_log("Protocol error.");
/* Fixed leading part of the reply. */
1378 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1379 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1381 || pa_tagstruct_getu32(t
, &bytes
) < 0
/* Buffer metrics are only read for protocol version 9 and later. */
1386 if (u
->version
>= 9) {
1388 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1389 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1390 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1391 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1394 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1395 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
/* Version 12 and later additionally report the stream's sample spec,
 * channel map and the device it is attached to. */
1400 if (u
->version
>= 12) {
1403 uint32_t device_index
;
1405 pa_bool_t suspended
;
1407 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1408 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1409 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1410 pa_tagstruct_gets(t
, &dn
) < 0 ||
1411 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
/* Replace the stored remote device name with a copy of the reported one. */
1415 pa_xfree(u
->sink_name
);
1416 u
->sink_name
= pa_xstrdup(dn
);
1418 pa_xfree(u
->source_name
);
1419 u
->source_name
= pa_xstrdup(dn
);
/* Version 13 and later append one usec value; the code that would have
 * used it is commented out below. */
1423 if (u
->version
>= 13) {
1426 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1429 /* #ifdef TUNNEL_SINK */
1430 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1432 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
/* The reply is checked for having been fully consumed. */
1436 if (!pa_tagstruct_eof(t
))
/* Arm the timer: first expiry is LATENCY_INTERVAL seconds from now. */
1442 pa_assert(!u
->time_event
);
1443 pa_gettimeofday(&ntv
);
1444 ntv
.tv_sec
+= LATENCY_INTERVAL
;
1445 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
1449 pa_log_debug("Stream created.");
/* Hand the byte count read from the reply to the sink via its message
 * queue. */
1452 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
/* NOTE(review): presumably the parse-failure path; the label leading
 * here is not visible in this listing. */
1458 pa_log("Invalid reply. (Create stream)");
1461 pa_module_unload_request(u
->module
, TRUE
);
1465 /* Called from main context.
 *
 * Reply handler for the authentication request (registered by
 * on_connection). Steps visible here:
 *   1. Read the remote protocol version into u->version; a non-reply
 *      command, a missing version or trailing data is logged.
 *   2. Versions below 8 are logged as incompatible; for versions >= 13
 *      the most significant bit is masked off.
 *   3. Record the version as "tunnel.remote_version" on the local
 *      sink/source proplist and format the stream name into name.
 *   4. Send PA_COMMAND_SET_CLIENT_NAME (its reply is ignored).
 *   5. Compute the buffer metrics, build a PA_COMMAND_CREATE_PLAYBACK_STREAM
 *      / PA_COMMAND_CREATE_RECORD_STREAM request whose trailing fields
 *      depend on u->version, send it and register create_stream_callback
 *      for the reply.
 * The last visible statement requests a module unload.
 *
 * NOTE(review): this listing omits lines (local declarations, jumps,
 * else branches, closing braces and presumably the conditionals that
 * separate the sink and source variants); confirm the branch structure
 * against the full file. */
1466 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1467 struct userdata
*u
= userdata
;
1468 pa_tagstruct
*reply
;
1469 char name
[256], un
[128], hn
[128];
1476 pa_assert(u
->pdispatch
== pd
);
/* The auth reply must be a PA_COMMAND_REPLY carrying exactly one u32,
 * the remote protocol version. */
1478 if (command
!= PA_COMMAND_REPLY
||
1479 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1480 !pa_tagstruct_eof(t
)) {
1482 if (command
== PA_COMMAND_ERROR
)
1483 pa_log("Failed to authenticate");
1485 pa_log("Protocol error.");
1490 /* Minimum supported protocol version */
1491 if (u
->version
< 8) {
1492 pa_log("Incompatible protocol version");
1496 /* Starting with protocol version 13 the MSB of the version tag
1497 reflects if shm is enabled for this connection or not. We don't
1498 support SHM here at all, so we just ignore this. */
1500 if (u
->version
>= 13)
1501 u
->version
&= 0x7FFFFFFFU
;
1503 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
/* Sink variant: publish the remote version on the local sink and format
 * the stream name from a leading string (its argument line is not
 * visible here), the local user name and the local host name. */
1506 pa_proplist_setf(u
->sink
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1507 pa_sink_update_proplist(u
->sink
, 0, NULL
);
1509 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1511 pa_get_user_name(un
, sizeof(un
)),
1512 pa_get_host_name(hn
, sizeof(hn
)));
/* Source variant of the same two steps. */
1514 pa_proplist_setf(u
->source
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1515 pa_source_update_proplist(u
->source
, 0, NULL
);
1517 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1519 pa_get_user_name(un
, sizeof(un
)),
1520 pa_get_host_name(hn
, sizeof(hn
)));
/* First request: set the client name. For version >= 13 a proplist is
 * sent; the plain string "PulseAudio" is presumably the fallback for
 * older versions (the else is not visible here). */
1523 reply
= pa_tagstruct_new(NULL
, 0);
1524 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1525 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1527 if (u
->version
>= 13) {
1529 pl
= pa_proplist_new();
1530 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1531 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1532 pa_init_proplist(pl
);
1533 pa_tagstruct_put_proplist(reply
, pl
);
1534 pa_proplist_free(pl
);
1536 pa_tagstruct_puts(reply
, "PulseAudio");
1538 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1539 /* We ignore the server's reply here */
/* Second request: create the stream. */
1541 reply
= pa_tagstruct_new(NULL
, 0);
1543 if (u
->version
< 13)
1544 /* Only for older PA versions we need to fill in the maxlength */
1545 u
->maxlength
= 4*1024*1024;
/* Buffer metrics are derived from the DEFAULT_*_MSEC constants and the
 * local device's sample spec; prebuf is set equal to tlength. */
1548 u
->tlength
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1549 u
->minreq
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1550 u
->prebuf
= u
->tlength
;
1552 u
->fragsize
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
/* Playback-stream request body (sink variant). */
1556 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1557 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
/* Before version 13 the stream name is sent as a plain string; from 13
 * on it goes into the proplist appended further down. */
1559 if (u
->version
< 13)
1560 pa_tagstruct_puts(reply
, name
);
1562 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1563 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
/* The remote device is addressed by name, so the index is left invalid. */
1564 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1565 pa_tagstruct_puts(reply
, u
->sink_name
);
1566 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* This boolean is true when the local sink is not in an opened state. */
1567 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1568 pa_tagstruct_putu32(reply
, u
->tlength
);
1569 pa_tagstruct_putu32(reply
, u
->prebuf
);
1570 pa_tagstruct_putu32(reply
, u
->minreq
);
1571 pa_tagstruct_putu32(reply
, 0);
/* The volume sent is the reset volume for the sink's channel count. */
1572 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1573 pa_tagstruct_put_cvolume(reply
, &volume
);
/* Record-stream request body (source variant). */
1575 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1576 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1578 if (u
->version
< 13)
1579 pa_tagstruct_puts(reply
, name
);
1581 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1582 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1583 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1584 pa_tagstruct_puts(reply
, u
->source_name
);
1585 pa_tagstruct_putu32(reply
, u
->maxlength
);
1586 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1587 pa_tagstruct_putu32(reply
, u
->fragsize
);
/* Trailing fields are appended only when the remote version is at least
 * the one named in each condition. */
1590 if (u
->version
>= 12) {
1591 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1592 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1593 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1594 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1595 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1596 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1597 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
1600 if (u
->version
>= 13) {
1603 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1604 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
/* Stream properties: the formatted name and the role "abstract". */
1606 pl
= pa_proplist_new();
1607 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1608 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1609 pa_tagstruct_put_proplist(reply
, pl
);
1610 pa_proplist_free(pl
);
1613 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1617 if (u
->version
>= 14) {
1619 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1621 pa_tagstruct_put_boolean(reply
, TRUE
); /* early requests */
1624 if (u
->version
>= 15) {
1626 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1628 pa_tagstruct_put_boolean(reply
, FALSE
); /* don't inhibit auto suspend */
1629 pa_tagstruct_put_boolean(reply
, FALSE
); /* fail on suspend */
/* Send the request; create_stream_callback handles the reply. */
1632 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1633 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1635 pa_log_debug("Connection authenticated, creating stream ...");
/* NOTE(review): presumably the failure path; the label leading here is
 * not visible in this listing. */
1640 pa_module_unload_request(u
->module
, TRUE
);
1643 /* Called from main context.
 *
 * Die callback of the pstream (installed by on_connection through
 * pa_pstream_set_die_callback). Logs a warning and requests that this
 * module be unloaded; userdata is the module's struct userdata. */
1644 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1645 struct userdata
*u
= userdata
;
1650 pa_log_warn("Stream died.");
1651 pa_module_unload_request(u
->module
, TRUE
);
1654 /* Called from main context.
 *
 * Packet callback of the pstream (installed by on_connection). Hands the
 * received packet and credentials to u->pdispatch; if pa_pdispatch_run()
 * returns a negative value the packet is logged as invalid and a module
 * unload is requested. */
1655 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1656 struct userdata
*u
= userdata
;
1662 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1663 pa_log("Invalid packet");
1664 pa_module_unload_request(u
->module
, TRUE
);
1670 /* Called from main context.
 *
 * Memblock callback of the pstream (installed by on_connection). A block
 * arriving on a channel other than u->channel is logged and leads to a
 * module unload request. Otherwise the chunk is passed to the source's
 * message queue as SOURCE_MESSAGE_POST, with the seek mode packed into
 * the pointer argument and offset as the integer argument, and the
 * chunk's length is added to u->counter_delta.
 *
 * NOTE(review): the return after the bad-channel branch is not visible in
 * this listing. */
1671 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1672 struct userdata
*u
= userdata
;
1678 if (channel
!= u
->channel
) {
1679 pa_log("Received memory block on bad channel.");
1680 pa_module_unload_request(u
->module
, TRUE
);
/* pa_asyncmsgq_send() is used here, not pa_asyncmsgq_post(). */
1684 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1686 u
->counter_delta
+= (int64_t) chunk
->length
;
1690 /* Called from main context.
 *
 * Socket-client callback (set by pa__init through
 * pa_socket_client_set_callback). It drops the reference on u->client,
 * logs "Connection failed" with the errno text and requests a module
 * unload, then creates the pstream and the pdispatch on top of io,
 * installs the die/packet/memblock callbacks and sends PA_COMMAND_AUTH
 * carrying PA_PROTOCOL_VERSION and the authentication cookie.
 * setup_complete_callback is registered for the reply.
 *
 * NOTE(review): the condition guarding the failure branch and the
 * conditionals around the two send variants are not visible in this
 * listing; presumably the failure branch runs when io is NULL and the
 * credential variant is selected at build time. */
1691 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1692 struct userdata
*u
= userdata
;
1698 pa_assert(u
->client
== sc
);
/* The reference held on the socket client is dropped here. */
1700 pa_socket_client_unref(u
->client
);
1704 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1705 pa_module_unload_request(u
->module
, TRUE
);
/* Build the protocol layers on the new io channel. */
1709 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1710 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
1712 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1713 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1715 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
/* Auth request: command, tag, local protocol version, cookie bytes. */
1718 t
= pa_tagstruct_new(NULL
, 0);
1719 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1720 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1721 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1723 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
/* Variant that attaches this process's uid/gid to the request, after
 * enabling credential passing on io when io supports it. */
1729 if (pa_iochannel_creds_supported(io
))
1730 pa_iochannel_creds_enable(io
);
1732 ucred
.uid
= getuid();
1733 ucred
.gid
= getgid();
1735 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
/* Variant that sends the request without credentials. */
1738 pa_pstream_send_tagstruct(u
->pstream
, t
);
1741 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1743 pa_log_debug("Connection established, authenticating ...");
1748 /* Called from main context.
 *
 * set_volume hook of the local sink (assigned in pa__init). Sends a
 * PA_COMMAND_SET_SINK_INPUT_VOLUME request for u->device_index carrying
 * sink->virtual_volume. No reply handler is registered in the lines
 * visible here.
 *
 * NOTE(review): the declaration of u is not visible in this listing;
 * presumably it is taken from sink->userdata. */
1749 static void sink_set_volume(pa_sink
*sink
) {
1758 t
= pa_tagstruct_new(NULL
, 0);
1759 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1760 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1761 pa_tagstruct_putu32(t
, u
->device_index
);
1762 pa_tagstruct_put_cvolume(t
, &sink
->virtual_volume
);
1763 pa_pstream_send_tagstruct(u
->pstream
, t
);
1766 /* Called from main context.
 *
 * set_mute hook of the local sink (assigned in pa__init). Sends a
 * PA_COMMAND_SET_SINK_INPUT_MUTE request for u->device_index with the
 * sink's muted flag normalised to 0/1 by the double negation. No reply
 * handler is registered in the lines visible here.
 *
 * NOTE(review): the declaration of u and the statement guarded by the
 * version check are not visible in this listing; presumably the function
 * returns early for remote versions below 11. */
1767 static void sink_set_mute(pa_sink
*sink
) {
1776 if (u
->version
< 11)
1779 t
= pa_tagstruct_new(NULL
, 0);
1780 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1781 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1782 pa_tagstruct_putu32(t
, u
->device_index
);
1783 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1784 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Module entry point. Steps visible here:
 *   1. Parse m->argument against valid_modargs.
 *   2. Allocate a zeroed struct userdata, store it in m->userdata and
 *      initialise its fields, the smoother, the rtpoll and the thread
 *      message queue.
 *   3. Obtain the auth cookie (argument "cookie", default
 *      PA_NATIVE_COOKIE_FILE) and the mandatory "server" argument.
 *   4. Take the sample spec and channel map from the core defaults,
 *      overridden by the module arguments.
 *   5. Create the socket client for the server and set on_connection as
 *      its callback.
 *   6. Create the local sink/source, named after the "sink_name" /
 *      "source_name" argument or "tunnel.<server>", and hook up its
 *      message handler, callbacks, message queue and rtpoll.
 *   7. Set the buffer metrics to (uint32_t) -1, start the thread running
 *      thread_func, put the sink/source and free the modargs.
 *
 * NOTE(review): this listing omits lines (several declarations and
 * assignments, jumps, return statements, closing braces and presumably
 * the conditionals that separate the sink and source variants), so the
 * return values and the failure path must be confirmed against the full
 * file. */
1789 int pa__init(pa_module
*m
) {
1790 pa_modargs
*ma
= NULL
;
1791 struct userdata
*u
= NULL
;
1796 pa_sink_new_data data
;
1798 pa_source_new_data data
;
1803 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1804 pa_log("Failed to parse module arguments");
/* pa_xnew0 allocates zeroed memory; the explicit assignments below repeat
 * that for some fields. */
1808 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1812 u
->pdispatch
= NULL
;
1814 u
->server_name
= NULL
;
/* Remote device name from the "sink" / "source" argument; NULL when the
 * argument is absent. */
1816 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1818 u
->requested_bytes
= 0;
1820 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
/* NOTE(review): the arguments of pa_smoother_new() are not visible in
 * this listing. */
1823 u
->smoother
= pa_smoother_new(
1832 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1833 u
->time_event
= NULL
;
1834 u
->ignore_latency_before
= 0;
1835 u
->transport_usec
= u
->thread_transport_usec
= 0;
1836 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1837 u
->counter
= u
->counter_delta
= 0;
1839 u
->rtpoll
= pa_rtpoll_new();
1840 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
1842 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, pa_modargs_get_value(ma
, "cookie", PA_NATIVE_COOKIE_FILE
), PA_NATIVE_COOKIE_LENGTH
)))
/* The "server" argument is mandatory. */
1845 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1846 pa_log("No server specified.");
/* Start from the core defaults and let the module arguments override
 * them. */
1850 ss
= m
->core
->default_sample_spec
;
1851 map
= m
->core
->default_channel_map
;
1852 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1853 pa_log("Invalid sample format specification");
/* on_connection is invoked through this callback registration. */
1857 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1858 pa_log("Failed to connect to server '%s'", u
->server_name
);
1862 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
/* Sink variant: local name is the "sink_name" argument or
 * "tunnel.<server>". */
1866 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1867 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1869 pa_sink_new_data_init(&data
);
1870 data
.driver
= __FILE__
;
1872 data
.namereg_fail
= TRUE
;
1873 pa_sink_new_data_set_name(&data
, dn
);
1874 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1875 pa_sink_new_data_set_channel_map(&data
, &map
);
/* Description is "<remote sink> on <server>", or just the server name
 * when no remote sink name was given. */
1876 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1877 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1879 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
/* Properties from the "sink_properties" argument are merged with
 * PA_UPDATE_REPLACE. */
1881 if (pa_modargs_get_proplist(ma
, "sink_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
1882 pa_log("Invalid properties");
1883 pa_sink_new_data_done(&data
);
1887 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
|PA_SINK_HW_MUTE_CTRL
);
1888 pa_sink_new_data_done(&data
);
1891 pa_log("Failed to create sink.");
/* Hook up the sink's message handler and callbacks. */
1895 u
->sink
->parent
.process_msg
= sink_process_msg
;
1896 u
->sink
->userdata
= u
;
1897 u
->sink
->set_state
= sink_set_state
;
1898 u
->sink
->set_volume
= sink_set_volume
;
1899 u
->sink
->set_mute
= sink_set_mute
;
1901 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
1903 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1905 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1906 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
/* Source variant: local name is the "source_name" argument or
 * "tunnel.<server>". */
1910 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1911 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1913 pa_source_new_data_init(&data
);
1914 data
.driver
= __FILE__
;
1916 data
.namereg_fail
= TRUE
;
1917 pa_source_new_data_set_name(&data
, dn
);
1918 pa_source_new_data_set_sample_spec(&data
, &ss
);
1919 pa_source_new_data_set_channel_map(&data
, &map
);
1920 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
1921 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1923 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
1925 if (pa_modargs_get_proplist(ma
, "source_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
1926 pa_log("Invalid properties");
1927 pa_source_new_data_done(&data
);
1931 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
1932 pa_source_new_data_done(&data
);
1935 pa_log("Failed to create source.");
1939 u
->source
->parent
.process_msg
= source_process_msg
;
1940 u
->source
->set_state
= source_set_state
;
1941 u
->source
->userdata
= u
;
1943 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1945 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1946 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
1951 u
->time_event
= NULL
;
/* Buffer metrics start out as (uint32_t) -1. */
1953 u
->maxlength
= (uint32_t) -1;
1955 u
->tlength
= u
->minreq
= u
->prebuf
= (uint32_t) -1;
1957 u
->fragsize
= (uint32_t) -1;
/* Start the thread running thread_func, then put the sink/source. */
1960 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1961 pa_log("Failed to create thread.");
1966 pa_sink_put(u
->sink
);
1968 pa_source_put(u
->source
);
/* NOTE(review): pa_modargs_free() appears twice; presumably once on the
 * success path and once on the failure path, whose label is not visible
 * in this listing. */
1971 pa_modargs_free(ma
);
1979 pa_modargs_free(ma
);
1986 void pa__done(pa_module
*m
) {
1991 if (!(u
= m
->userdata
))
1996 pa_sink_unlink(u
->sink
);
1999 pa_source_unlink(u
->source
);
2003 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
2004 pa_thread_free(u
->thread
);
2007 pa_thread_mq_done(&u
->thread_mq
);
2011 pa_sink_unref(u
->sink
);
2014 pa_source_unref(u
->source
);
2018 pa_rtpoll_free(u
->rtpoll
);
2021 pa_pstream_unlink(u
->pstream
);
2022 pa_pstream_unref(u
->pstream
);
2026 pa_pdispatch_unref(u
->pdispatch
);
2029 pa_socket_client_unref(u
->client
);
2032 pa_auth_cookie_unref(u
->auth_cookie
);
2035 pa_smoother_free(u
->smoother
);
2038 u
->core
->mainloop
->time_free(u
->time_event
);
2041 pa_xfree(u
->sink_name
);
2043 pa_xfree(u
->source_name
);
2045 pa_xfree(u
->server_name
);
2047 pa_xfree(u
->device_description
);
2048 pa_xfree(u
->server_fqdn
);
2049 pa_xfree(u
->user_name
);