2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
59 #include "module-tunnel-sink-symdef.h"
61 #include "module-tunnel-source-symdef.h"
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
67 "sink_name=<name for the local sink> "
68 "sink_properties=<properties for the local sink> "
70 "sink=<remote sink name> "
72 "format=<sample format> "
73 "channels=<number of channels> "
75 "channel_map=<channel map>");
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 "source_name=<name for the local source> "
80 "source_properties=<properties for the local source> "
82 "source=<remote source name> "
84 "format=<sample format> "
85 "channels=<number of channels> "
87 "channel_map=<channel map>");
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION
);
92 PA_MODULE_LOAD_ONCE(FALSE
);
94 static const char* const valid_modargs
[] = {
113 #define DEFAULT_TIMEOUT 5
115 #define LATENCY_INTERVAL 10
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
122 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
123 SINK_MESSAGE_REMOTE_SUSPEND
,
124 SINK_MESSAGE_UPDATE_LATENCY
,
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
134 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
135 SOURCE_MESSAGE_REMOTE_SUSPEND
,
136 SOURCE_MESSAGE_UPDATE_LATENCY
139 #define DEFAULT_FRAGSIZE_MSEC 25
144 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
145 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
148 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
149 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
150 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
151 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
152 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
153 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
155 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
157 [PA_COMMAND_REQUEST
] = command_request
,
158 [PA_COMMAND_STARTED
] = command_started
,
160 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
161 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
162 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
163 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
164 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
165 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
166 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
167 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
168 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
169 [PA_COMMAND_PLAYBACK_STREAM_EVENT
] = command_stream_or_client_event
,
170 [PA_COMMAND_RECORD_STREAM_EVENT
] = command_stream_or_client_event
,
171 [PA_COMMAND_CLIENT_EVENT
] = command_stream_or_client_event
,
172 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
,
173 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
180 pa_thread_mq thread_mq
;
184 pa_socket_client
*client
;
186 pa_pdispatch
*pdispatch
;
192 size_t requested_bytes
;
198 pa_auth_cookie
*auth_cookie
;
202 uint32_t device_index
;
205 int64_t counter
, counter_delta
;
207 pa_bool_t remote_corked
:1;
208 pa_bool_t remote_suspended
:1;
210 pa_usec_t transport_usec
; /* maintained in the main thread */
211 pa_usec_t thread_transport_usec
; /* maintained in the IO thread */
213 uint32_t ignore_latency_before
;
215 pa_time_event
*time_event
;
217 pa_smoother
*smoother
;
219 char *device_description
;
233 static void request_latency(struct userdata
*u
);
235 /* Called from main context */
236 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
237 pa_log_debug("Got stream or client event.");
240 /* Called from main context */
241 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
242 struct userdata
*u
= userdata
;
247 pa_assert(u
->pdispatch
== pd
);
249 pa_log_warn("Stream killed");
250 pa_module_unload_request(u
->module
, TRUE
);
253 /* Called from main context */
254 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
255 struct userdata
*u
= userdata
;
260 pa_assert(u
->pdispatch
== pd
);
262 pa_log_info("Server signalled buffer overrun/underrun.");
266 /* Called from main context */
267 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
268 struct userdata
*u
= userdata
;
275 pa_assert(u
->pdispatch
== pd
);
277 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
278 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
279 !pa_tagstruct_eof(t
)) {
281 pa_log("Invalid packet.");
282 pa_module_unload_request(u
->module
, TRUE
);
286 pa_log_debug("Server reports device suspend.");
289 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
291 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
297 /* Called from main context */
298 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
299 struct userdata
*u
= userdata
;
300 uint32_t channel
, di
;
307 pa_assert(u
->pdispatch
== pd
);
309 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
310 pa_tagstruct_getu32(t
, &di
) < 0 ||
311 pa_tagstruct_gets(t
, &dn
) < 0 ||
312 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
314 pa_log_error("Invalid packet.");
315 pa_module_unload_request(u
->module
, TRUE
);
319 pa_log_debug("Server reports a stream move.");
322 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
324 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
330 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
331 struct userdata
*u
= userdata
;
332 uint32_t channel
, maxlength
, tlength
, fragsize
, prebuf
, minreq
;
338 pa_assert(u
->pdispatch
== pd
);
340 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
341 pa_tagstruct_getu32(t
, &maxlength
) < 0) {
343 pa_log_error("Invalid packet.");
344 pa_module_unload_request(u
->module
, TRUE
);
348 if (command
== PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
) {
349 if (pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
350 pa_tagstruct_get_usec(t
, &usec
) < 0) {
352 pa_log_error("Invalid packet.");
353 pa_module_unload_request(u
->module
, TRUE
);
357 if (pa_tagstruct_getu32(t
, &tlength
) < 0 ||
358 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
359 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
360 pa_tagstruct_get_usec(t
, &usec
) < 0) {
362 pa_log_error("Invalid packet.");
363 pa_module_unload_request(u
->module
, TRUE
);
369 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength
, (unsigned long) u
->tlength
);
377 /* Called from main context */
378 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
379 struct userdata
*u
= userdata
;
384 pa_assert(u
->pdispatch
== pd
);
386 pa_log_debug("Server reports playback started.");
392 /* Called from IO thread context */
393 static void check_smoother_status(struct userdata
*u
, pa_bool_t past
) {
398 x
= pa_rtclock_usec();
400 /* Correct by the time the issued request needs to travel to the
401 * other side. This is a valid thread-safe access, because the
402 * main thread is waiting for us */
405 x
-= u
->thread_transport_usec
;
407 x
+= u
->thread_transport_usec
;
409 if (u
->remote_suspended
|| u
->remote_corked
)
410 pa_smoother_pause(u
->smoother
, x
);
412 pa_smoother_resume(u
->smoother
, x
, TRUE
);
415 /* Called from IO thread context */
416 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
419 if (u
->remote_corked
== cork
)
422 u
->remote_corked
= cork
;
423 check_smoother_status(u
, FALSE
);
426 /* Called from main context */
427 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
434 t
= pa_tagstruct_new(NULL
, 0);
436 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
438 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
440 pa_tagstruct_putu32(t
, u
->ctag
++);
441 pa_tagstruct_putu32(t
, u
->channel
);
442 pa_tagstruct_put_boolean(t
, !!cork
);
443 pa_pstream_send_tagstruct(u
->pstream
, t
);
448 /* Called from IO thread context */
449 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
452 if (u
->remote_suspended
== suspend
)
455 u
->remote_suspended
= suspend
;
456 check_smoother_status(u
, TRUE
);
461 /* Called from IO thread context */
462 static void send_data(struct userdata
*u
) {
465 while (u
->requested_bytes
> 0) {
466 pa_memchunk memchunk
;
468 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
469 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
470 pa_memblock_unref(memchunk
.memblock
);
472 u
->requested_bytes
-= memchunk
.length
;
474 u
->counter
+= (int64_t) memchunk
.length
;
478 /* This function is called from IO context -- except when it is not. */
479 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
480 struct userdata
*u
= PA_SINK(o
)->userdata
;
484 case PA_SINK_MESSAGE_SET_STATE
: {
487 /* First, change the state, because otherwise pa_sink_render() would fail */
488 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
490 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
492 if (PA_SINK_IS_OPENED(u
->sink
->state
))
499 case PA_SINK_MESSAGE_GET_LATENCY
: {
500 pa_usec_t yl
, yr
, *usec
= data
;
502 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
503 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
505 *usec
= yl
> yr
? yl
- yr
: 0;
509 case SINK_MESSAGE_REQUEST
:
511 pa_assert(offset
> 0);
512 u
->requested_bytes
+= (size_t) offset
;
514 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
520 case SINK_MESSAGE_REMOTE_SUSPEND
:
522 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
526 case SINK_MESSAGE_UPDATE_LATENCY
: {
529 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
531 if (y
> (pa_usec_t
) offset
)
532 y
-= (pa_usec_t
) offset
;
536 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
538 /* We can access this freely here, since the main thread is waiting for us */
539 u
->thread_transport_usec
= u
->transport_usec
;
544 case SINK_MESSAGE_POST
:
546 /* OK, This might be a bit confusing. This message is
547 * delivered to us from the main context -- NOT from the
548 * IO thread context where the rest of the messages are
549 * dispatched. Yeah, ugly, but I am a lazy bastard. */
551 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
553 u
->counter_delta
+= (int64_t) chunk
->length
;
558 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
561 /* Called from main context */
562 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
564 pa_sink_assert_ref(s
);
567 switch ((pa_sink_state_t
) state
) {
569 case PA_SINK_SUSPENDED
:
570 pa_assert(PA_SINK_IS_OPENED(s
->state
));
571 stream_cork(u
, TRUE
);
575 case PA_SINK_RUNNING
:
576 if (s
->state
== PA_SINK_SUSPENDED
)
577 stream_cork(u
, FALSE
);
580 case PA_SINK_UNLINKED
:
582 case PA_SINK_INVALID_STATE
:
591 /* This function is called from IO context -- except when it is not. */
592 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
593 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
597 case PA_SOURCE_MESSAGE_SET_STATE
: {
600 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
601 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
606 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
607 pa_usec_t yr
, yl
, *usec
= data
;
609 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &PA_SOURCE(o
)->sample_spec
);
610 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
612 *usec
= yr
> yl
? yr
- yl
: 0;
616 case SOURCE_MESSAGE_POST
:
618 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
619 pa_source_post(u
->source
, chunk
);
621 u
->counter
+= (int64_t) chunk
->length
;
625 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
627 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
630 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
633 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->source
->sample_spec
);
634 y
+= (pa_usec_t
) offset
;
636 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
638 /* We can access this freely here, since the main thread is waiting for us */
639 u
->thread_transport_usec
= u
->transport_usec
;
645 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
648 /* Called from main context */
649 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
651 pa_source_assert_ref(s
);
654 switch ((pa_source_state_t
) state
) {
656 case PA_SOURCE_SUSPENDED
:
657 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
658 stream_cork(u
, TRUE
);
662 case PA_SOURCE_RUNNING
:
663 if (s
->state
== PA_SOURCE_SUSPENDED
)
664 stream_cork(u
, FALSE
);
667 case PA_SOURCE_UNLINKED
:
669 case PA_SINK_INVALID_STATE
:
678 static void thread_func(void *userdata
) {
679 struct userdata
*u
= userdata
;
683 pa_log_debug("Thread starting up");
685 pa_thread_mq_install(&u
->thread_mq
);
686 pa_rtpoll_install(u
->rtpoll
);
692 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
693 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
694 pa_sink_process_rewind(u
->sink
, 0);
697 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
705 /* If this was no regular exit from the loop we have to continue
706 * processing messages until we receive PA_MESSAGE_SHUTDOWN */
707 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
708 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
711 pa_log_debug("Thread shutting down");
715 /* Called from main context */
716 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
717 struct userdata
*u
= userdata
;
718 uint32_t bytes
, channel
;
721 pa_assert(command
== PA_COMMAND_REQUEST
);
724 pa_assert(u
->pdispatch
== pd
);
726 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
727 pa_tagstruct_getu32(t
, &bytes
) < 0) {
728 pa_log("Invalid protocol reply");
732 if (channel
!= u
->channel
) {
733 pa_log("Recieved data for invalid channel");
737 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
741 pa_module_unload_request(u
->module
, TRUE
);
746 /* Called from main context */
747 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
748 struct userdata
*u
= userdata
;
749 pa_usec_t sink_usec
, source_usec
;
751 int64_t write_index
, read_index
;
752 struct timeval local
, remote
, now
;
759 if (command
!= PA_COMMAND_REPLY
) {
760 if (command
== PA_COMMAND_ERROR
)
761 pa_log("Failed to get latency.");
763 pa_log("Protocol error.");
767 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
768 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
769 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
770 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
771 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
772 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
773 pa_tagstruct_gets64(t
, &read_index
) < 0) {
774 pa_log("Invalid reply.");
779 if (u
->version
>= 13) {
780 uint64_t underrun_for
= 0, playing_for
= 0;
782 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
783 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
784 pa_log("Invalid reply.");
790 if (!pa_tagstruct_eof(t
)) {
791 pa_log("Invalid reply.");
795 if (tag
< u
->ignore_latency_before
) {
799 pa_gettimeofday(&now
);
801 /* Calculate transport usec */
802 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
803 /* local and remote seem to have synchronized clocks */
805 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
807 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
810 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
812 /* First, take the device's delay */
814 delay
= (int64_t) sink_usec
;
815 ss
= &u
->sink
->sample_spec
;
817 delay
= (int64_t) source_usec
;
818 ss
= &u
->source
->sample_spec
;
821 /* Add the length of our server-side buffer */
822 if (write_index
>= read_index
)
823 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) (write_index
-read_index
), ss
);
825 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) (read_index
-write_index
), ss
);
827 /* Our measurements are already out of date, hence correct by the *
828 * transport latency */
830 delay
-= (int64_t) u
->transport_usec
;
832 delay
+= (int64_t) u
->transport_usec
;
835 /* Now correct by what we have have read/written since we requested the update */
837 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
839 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
843 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
845 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
852 pa_module_unload_request(u
->module
, TRUE
);
855 /* Called from main context */
856 static void request_latency(struct userdata
*u
) {
862 t
= pa_tagstruct_new(NULL
, 0);
864 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
866 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
868 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
869 pa_tagstruct_putu32(t
, u
->channel
);
871 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
873 pa_pstream_send_tagstruct(u
->pstream
, t
);
874 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
876 u
->ignore_latency_before
= tag
;
877 u
->counter_delta
= 0;
880 /* Called from main context */
881 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*tv
, void *userdata
) {
882 struct userdata
*u
= userdata
;
891 pa_gettimeofday(&ntv
);
892 ntv
.tv_sec
+= LATENCY_INTERVAL
;
893 m
->time_restart(e
, &ntv
);
896 /* Called from main context */
897 static void update_description(struct userdata
*u
) {
899 char un
[128], hn
[128];
904 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
907 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
910 pa_sink_set_description(u
->sink
, d
);
911 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
912 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
913 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
915 pa_source_set_description(u
->source
, d
);
916 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
917 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
918 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
923 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
924 pa_get_user_name(un
, sizeof(un
)),
925 pa_get_host_name(hn
, sizeof(hn
)));
927 t
= pa_tagstruct_new(NULL
, 0);
929 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
931 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
933 pa_tagstruct_putu32(t
, u
->ctag
++);
934 pa_tagstruct_putu32(t
, u
->channel
);
935 pa_tagstruct_puts(t
, d
);
936 pa_pstream_send_tagstruct(u
->pstream
, t
);
941 /* Called from main context */
942 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
943 struct userdata
*u
= userdata
;
946 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
952 if (command
!= PA_COMMAND_REPLY
) {
953 if (command
== PA_COMMAND_ERROR
)
954 pa_log("Failed to get info.");
956 pa_log("Protocol error.");
960 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
961 pa_tagstruct_gets(t
, &server_version
) < 0 ||
962 pa_tagstruct_gets(t
, &user_name
) < 0 ||
963 pa_tagstruct_gets(t
, &host_name
) < 0 ||
964 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
965 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
966 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
967 pa_tagstruct_getu32(t
, &cookie
) < 0 ||
969 pa_tagstruct_get_channel_map(t
, &cm
) < 0)) {
971 pa_log("Parse failure");
975 if (!pa_tagstruct_eof(t
)) {
976 pa_log("Packet too long");
980 pa_xfree(u
->server_fqdn
);
981 u
->server_fqdn
= pa_xstrdup(host_name
);
983 pa_xfree(u
->user_name
);
984 u
->user_name
= pa_xstrdup(user_name
);
986 update_description(u
);
991 pa_module_unload_request(u
->module
, TRUE
);
996 /* Called from main context */
997 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
998 struct userdata
*u
= userdata
;
999 uint32_t idx
, owner_module
, monitor_source
, flags
;
1000 const char *name
, *description
, *monitor_source_name
, *driver
;
1011 pl
= pa_proplist_new();
1013 if (command
!= PA_COMMAND_REPLY
) {
1014 if (command
== PA_COMMAND_ERROR
)
1015 pa_log("Failed to get info.");
1017 pa_log("Protocol error.");
1021 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1022 pa_tagstruct_gets(t
, &name
) < 0 ||
1023 pa_tagstruct_gets(t
, &description
) < 0 ||
1024 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1025 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1026 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1027 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1028 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1029 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
1030 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
1031 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1032 pa_tagstruct_gets(t
, &driver
) < 0 ||
1033 pa_tagstruct_getu32(t
, &flags
) < 0) {
1035 pa_log("Parse failure");
1039 if (u
->version
>= 13) {
1040 pa_usec_t configured_latency
;
1042 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1043 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1045 pa_log("Parse failure");
1050 if (u
->version
>= 15) {
1051 pa_volume_t base_volume
;
1052 uint32_t state
, n_volume_steps
, card
;
1054 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1055 pa_tagstruct_getu32(t
, &state
) < 0 ||
1056 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1057 pa_tagstruct_getu32(t
, &card
) < 0) {
1059 pa_log("Parse failure");
1064 if (!pa_tagstruct_eof(t
)) {
1065 pa_log("Packet too long");
1069 pa_proplist_free(pl
);
1071 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
1074 pa_xfree(u
->device_description
);
1075 u
->device_description
= pa_xstrdup(description
);
1077 update_description(u
);
1082 pa_module_unload_request(u
->module
, TRUE
);
1083 pa_proplist_free(pl
);
1086 /* Called from main context */
1087 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1088 struct userdata
*u
= userdata
;
1089 uint32_t idx
, owner_module
, client
, sink
;
1090 pa_usec_t buffer_usec
, sink_usec
;
1091 const char *name
, *driver
, *resample_method
;
1093 pa_sample_spec sample_spec
;
1094 pa_channel_map channel_map
;
1101 pl
= pa_proplist_new();
1103 if (command
!= PA_COMMAND_REPLY
) {
1104 if (command
== PA_COMMAND_ERROR
)
1105 pa_log("Failed to get info.");
1107 pa_log("Protocol error.");
1111 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1112 pa_tagstruct_gets(t
, &name
) < 0 ||
1113 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1114 pa_tagstruct_getu32(t
, &client
) < 0 ||
1115 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1116 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1117 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1118 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1119 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1120 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1121 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1122 pa_tagstruct_gets(t
, &driver
) < 0) {
1124 pa_log("Parse failure");
1128 if (u
->version
>= 11) {
1129 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1131 pa_log("Parse failure");
1136 if (u
->version
>= 13) {
1137 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1139 pa_log("Parse failure");
1144 if (!pa_tagstruct_eof(t
)) {
1145 pa_log("Packet too long");
1149 pa_proplist_free(pl
);
1151 if (idx
!= u
->device_index
)
1156 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1157 pa_cvolume_equal(&volume
, &u
->sink
->virtual_volume
))
1160 pa_sink_volume_changed(u
->sink
, &volume
);
1162 if (u
->version
>= 11)
1163 pa_sink_mute_changed(u
->sink
, mute
);
1168 pa_module_unload_request(u
->module
, TRUE
);
1169 pa_proplist_free(pl
);
1174 /* Called from main context */
1175 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1176 struct userdata
*u
= userdata
;
1177 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1178 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1183 pa_usec_t latency
, configured_latency
;
1189 pl
= pa_proplist_new();
1191 if (command
!= PA_COMMAND_REPLY
) {
1192 if (command
== PA_COMMAND_ERROR
)
1193 pa_log("Failed to get info.");
1195 pa_log("Protocol error.");
1199 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1200 pa_tagstruct_gets(t
, &name
) < 0 ||
1201 pa_tagstruct_gets(t
, &description
) < 0 ||
1202 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1203 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1204 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1205 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1206 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1207 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1208 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1209 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1210 pa_tagstruct_gets(t
, &driver
) < 0 ||
1211 pa_tagstruct_getu32(t
, &flags
) < 0) {
1213 pa_log("Parse failure");
1217 if (u
->version
>= 13) {
1218 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1219 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1221 pa_log("Parse failure");
1226 if (u
->version
>= 15) {
1227 pa_volume_t base_volume
;
1228 uint32_t state
, n_volume_steps
, card
;
1230 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1231 pa_tagstruct_getu32(t
, &state
) < 0 ||
1232 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1233 pa_tagstruct_getu32(t
, &card
) < 0) {
1235 pa_log("Parse failure");
1240 if (!pa_tagstruct_eof(t
)) {
1241 pa_log("Packet too long");
1245 pa_proplist_free(pl
);
1247 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1250 pa_xfree(u
->device_description
);
1251 u
->device_description
= pa_xstrdup(description
);
1253 update_description(u
);
1258 pa_module_unload_request(u
->module
, TRUE
);
1259 pa_proplist_free(pl
);
1264 /* Called from main context */
1265 static void request_info(struct userdata
*u
) {
1270 t
= pa_tagstruct_new(NULL
, 0);
1271 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1272 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1273 pa_pstream_send_tagstruct(u
->pstream
, t
);
1274 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1277 t
= pa_tagstruct_new(NULL
, 0);
1278 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1279 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1280 pa_tagstruct_putu32(t
, u
->device_index
);
1281 pa_pstream_send_tagstruct(u
->pstream
, t
);
1282 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1285 t
= pa_tagstruct_new(NULL
, 0);
1286 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1287 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1288 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1289 pa_tagstruct_puts(t
, u
->sink_name
);
1290 pa_pstream_send_tagstruct(u
->pstream
, t
);
1291 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1294 if (u
->source_name
) {
1295 t
= pa_tagstruct_new(NULL
, 0);
1296 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1297 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1298 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1299 pa_tagstruct_puts(t
, u
->source_name
);
1300 pa_pstream_send_tagstruct(u
->pstream
, t
);
1301 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1306 /* Called from main context */
/* Packet handler for PA_COMMAND_SUBSCRIBE_EVENT.
 *
 * Reads two u32 fields from the tagstruct `t` (the event type and an index).
 * If either read fails it logs "Invalid protocol reply" and calls
 * pa_module_unload_request() on the owning module.  The event is then compared
 * against the SERVER, SINK_INPUT, SINK and SOURCE "change" events.
 *
 * `userdata` is the module's struct userdata.
 *
 * NOTE(review): this listing omits several original lines (the declaration of
 * `idx`, the operators joining the last comparisons, and whatever is done once
 * the filter has been evaluated) -- confirm against the complete file. */
1307 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1308 struct userdata
*u
= userdata
;
1309 pa_subscription_event_type_t e
;
1315 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
/* NOTE(review): `e` is declared as pa_subscription_event_type_t but is filled
 * in through pa_tagstruct_getu32(); presumably that takes a uint32_t* --
 * confirm the two types are layout-compatible. */
1317 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1318 pa_tagstruct_getu32(t
, &idx
) < 0) {
1319 pa_log("Invalid protocol reply");
1320 pa_module_unload_request(u
->module
, TRUE
);
/* Filter: the event is tested against the four "<facility>|CHANGE" values. */
1324 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1326 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1327 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1329 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1337 /* Called from main context */
/* Sends a PA_COMMAND_SUBSCRIBE request over u->pstream.
 *
 * The request consists of the command code, a fresh tag taken from u->ctag
 * (post-incremented), and a subscription mask built from the SERVER,
 * SINK_INPUT, SINK and SOURCE mask bits.  No reply handler is registered in
 * the lines visible here.
 *
 * NOTE(review): the declarations of `t` and `tag` and the tokens joining the
 * mask bits are missing from this listing -- presumably the sink-related and
 * source-related bits are selected by conditional compilation; confirm. */
1338 static void start_subscribe(struct userdata
*u
) {
1343 t
= pa_tagstruct_new(NULL
, 0);
1344 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1345 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1346 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1348 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1350 PA_SUBSCRIPTION_MASK_SOURCE
1354 pa_pstream_send_tagstruct(u
->pstream
, t
);
1357 /* Called from main context */
/* Reply handler for the stream-creation request (registered by
 * setup_complete_callback()).
 *
 * Parses the server's reply in protocol order:
 *   - always:        channel, device index (and a byte count `bytes`);
 *   - version >= 9:  the buffer metrics actually chosen by the server
 *                    (maxlength/tlength/prebuf/minreq, or maxlength/fragsize);
 *   - version >= 12: sample spec, channel map, a device index, device name and
 *                    a suspended flag; the device name replaces u->sink_name /
 *                    u->source_name;
 *   - version >= 13: a latency value in usec.
 * After checking that the tagstruct has been fully consumed it arms a timer
 * LATENCY_INTERVAL seconds in the future (timeout_callback), and posts
 * SINK_MESSAGE_REQUEST carrying `bytes` to the sink's message queue.
 * The trailing statements log "Invalid reply. (Create stream)" and request
 * that the module be unloaded.
 *
 * NOTE(review): this listing omits local declarations, the `goto`/label lines
 * and the conditional-compilation directives that presumably separate the sink
 * and source variants -- confirm against the complete file. */
1358 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1359 struct userdata
*u
= userdata
;
1367 pa_assert(u
->pdispatch
== pd
);
/* Anything other than a REPLY is a failure; ERROR gets its own message. */
1369 if (command
!= PA_COMMAND_REPLY
) {
1370 if (command
== PA_COMMAND_ERROR
)
1371 pa_log("Failed to create stream.");
1373 pa_log("Protocol error.");
1377 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1378 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1380 || pa_tagstruct_getu32(t
, &bytes
) < 0
/* Protocol 9 and later: the reply carries the negotiated buffer attributes. */
1385 if (u
->version
>= 9) {
1387 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1388 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1389 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1390 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1393 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1394 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
/* Protocol 12 and later: the reply also describes the device the stream
 * was attached to. */
1399 if (u
->version
>= 12) {
1402 uint32_t device_index
;
1404 pa_bool_t suspended
;
1406 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1407 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1408 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1409 pa_tagstruct_gets(t
, &dn
) < 0 ||
1410 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
/* Remember the device name reported by the server (old value is freed first). */
1414 pa_xfree(u
->sink_name
);
1415 u
->sink_name
= pa_xstrdup(dn
);
1417 pa_xfree(u
->source_name
);
1418 u
->source_name
= pa_xstrdup(dn
);
/* Protocol 13 and later: a usec value follows; it is read but the calls that
 * would have used it are commented out below. */
1422 if (u
->version
>= 13) {
1425 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1428 /* #ifdef TUNNEL_SINK */
1429 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1431 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
/* The reply must contain nothing beyond the fields parsed above. */
1435 if (!pa_tagstruct_eof(t
))
/* Arm the timer: first expiry is LATENCY_INTERVAL seconds from now. */
1441 pa_assert(!u
->time_event
);
1442 pa_gettimeofday(&ntv
);
1443 ntv
.tv_sec
+= LATENCY_INTERVAL
;
1444 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
1448 pa_log_debug("Stream created.");
/* Forward the byte count from the reply to the sink via its message queue. */
1451 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
/* Failure handling: log and ask for the module to be unloaded. */
1457 pa_log("Invalid reply. (Create stream)");
1460 pa_module_unload_request(u
->module
, TRUE
);
1464 /* Called from main context */
/* Reply handler for the PA_COMMAND_AUTH request sent by on_connection().
 *
 * Steps visible in this listing:
 *   1. Require a REPLY carrying exactly one u32 (stored in u->version);
 *      otherwise log "Failed to authenticate" or "Protocol error.".
 *   2. Reject protocol versions below 8; from version 13 on, clear the top
 *      bit of the version word (see the original comment about SHM below).
 *   3. Record the remote version in the sink's / source's proplist and build
 *      a stream name of the form "<something> for <user>@<host>".
 *   4. Send PA_COMMAND_SET_CLIENT_NAME (a proplist and/or the plain string
 *      "PulseAudio"); the reply to that request is ignored.
 *   5. Fill in the default buffer metrics and send either
 *      PA_COMMAND_CREATE_PLAYBACK_STREAM or PA_COMMAND_CREATE_RECORD_STREAM,
 *      appending the extra fields introduced with protocol versions 12, 13,
 *      14 and 15.
 *   6. Register create_stream_callback() for the reply.
 * The final statement requests that the module be unloaded.
 *
 * NOTE(review): the listing drops the conditional-compilation directives, the
 * `else` lines, several arguments and the `goto`/label lines; the sink and
 * source sections are presumably build-time alternatives -- confirm against
 * the complete file before relying on the exact control flow. */
1465 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1466 struct userdata
*u
= userdata
;
1467 pa_tagstruct
*reply
;
1468 char name
[256], un
[128], hn
[128];
1475 pa_assert(u
->pdispatch
== pd
);
/* The auth reply must be a REPLY containing only the server's version word. */
1477 if (command
!= PA_COMMAND_REPLY
||
1478 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1479 !pa_tagstruct_eof(t
)) {
1481 if (command
== PA_COMMAND_ERROR
)
1482 pa_log("Failed to authenticate");
1484 pa_log("Protocol error.");
1489 /* Minimum supported protocol version */
1490 if (u
->version
< 8) {
1491 pa_log("Incompatible protocol version");
1495 /* Starting with protocol version 13 the MSB of the version tag
1496 reflects if shm is enabled for this connection or not. We don't
1497 support SHM here at all, so we just ignore this. */
1499 if (u
->version
>= 13)
1500 u
->version
&= 0x7FFFFFFFU
;
1502 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
/* Sink variant: publish the remote version and compose the stream name. */
1505 pa_proplist_setf(u
->sink
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1506 pa_sink_update_proplist(u
->sink
, 0, NULL
);
1508 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1510 pa_get_user_name(un
, sizeof(un
)),
1511 pa_get_host_name(hn
, sizeof(hn
)));
/* Source variant of the same two steps. */
1513 pa_proplist_setf(u
->source
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1514 pa_source_update_proplist(u
->source
, 0, NULL
);
1516 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1518 pa_get_user_name(un
, sizeof(un
)),
1519 pa_get_host_name(hn
, sizeof(hn
)));
/* First request: tell the server who we are. */
1522 reply
= pa_tagstruct_new(NULL
, 0);
1523 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1524 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
/* From protocol 13 the client identity is sent as a property list. */
1526 if (u
->version
>= 13) {
1528 pl
= pa_proplist_new();
1529 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1530 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1531 pa_init_proplist(pl
);
1532 pa_tagstruct_put_proplist(reply
, pl
);
1533 pa_proplist_free(pl
);
1535 pa_tagstruct_puts(reply
, "PulseAudio");
1537 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1538 /* We ignore the server's reply here */
/* Second request: create the playback or record stream. */
1540 reply
= pa_tagstruct_new(NULL
, 0);
1542 if (u
->version
< 13)
1543 /* Only for older PA versions we need to fill in the maxlength */
1544 u
->maxlength
= 4*1024*1024;
/* Default buffer metrics, converted from milliseconds to bytes using the
 * local device's sample spec. */
1547 u
->tlength
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1548 u
->minreq
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1549 u
->prebuf
= u
->tlength
;
1551 u
->fragsize
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
/* Playback stream request (sink variant). */
1555 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1556 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
/* Before protocol 13 the stream name is sent as a plain string. */
1558 if (u
->version
< 13)
1559 pa_tagstruct_puts(reply
, name
);
1561 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1562 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1563 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1564 pa_tagstruct_puts(reply
, u
->sink_name
);
1565 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* Boolean field is true when the local sink is currently not opened. */
1566 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1567 pa_tagstruct_putu32(reply
, u
->tlength
);
1568 pa_tagstruct_putu32(reply
, u
->prebuf
);
1569 pa_tagstruct_putu32(reply
, u
->minreq
);
1570 pa_tagstruct_putu32(reply
, 0);
1571 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1572 pa_tagstruct_put_cvolume(reply
, &volume
);
/* Record stream request (source variant). */
1574 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1575 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1577 if (u
->version
< 13)
1578 pa_tagstruct_puts(reply
, name
);
1580 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1581 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1582 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1583 pa_tagstruct_puts(reply
, u
->source_name
);
1584 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* Boolean field is true when the local source is currently not opened. */
1585 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1586 pa_tagstruct_putu32(reply
, u
->fragsize
);
/* Additional request fields, appended according to the remote version. */
1589 if (u
->version
>= 12) {
1590 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1591 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1592 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1593 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1594 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1595 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1596 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
1599 if (u
->version
>= 13) {
1602 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1603 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
/* From protocol 13 the stream name and role travel in a property list. */
1605 pl
= pa_proplist_new();
1606 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1607 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1608 pa_tagstruct_put_proplist(reply
, pl
);
1609 pa_proplist_free(pl
);
1612 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1616 if (u
->version
>= 14) {
1618 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1620 pa_tagstruct_put_boolean(reply
, TRUE
); /* early requests */
1623 if (u
->version
>= 15) {
1625 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1627 pa_tagstruct_put_boolean(reply
, FALSE
); /* don't inhibit auto suspend */
1628 pa_tagstruct_put_boolean(reply
, FALSE
); /* fail on suspend */
/* Send the request; create_stream_callback() handles the answer. */
1631 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1632 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1634 pa_log_debug("Connection authenticated, creating stream ...");
/* Failure handling: ask for the module to be unloaded. */
1639 pa_module_unload_request(u
->module
, TRUE
);
1642 /* Called from main context */
/* Die callback installed on u->pstream by on_connection().
 *
 * Logs a warning ("Stream died.") and calls pa_module_unload_request() on the
 * owning module.  `userdata` is the module's struct userdata; `p` is not used
 * in the lines visible here. */
1643 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1644 struct userdata
*u
= userdata
;
1649 pa_log_warn("Stream died.");
1650 pa_module_unload_request(u
->module
, TRUE
);
1653 /* Called from main context */
/* Packet callback installed on u->pstream by on_connection().
 *
 * Hands every received packet (with its credentials) to the packet dispatcher
 * u->pdispatch, passing `u` as the dispatcher's userdata.  If pa_pdispatch_run()
 * reports failure (< 0) it logs "Invalid packet" and calls
 * pa_module_unload_request() on the owning module. */
1654 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1655 struct userdata
*u
= userdata
;
1661 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1662 pa_log("Invalid packet");
1663 pa_module_unload_request(u
->module
, TRUE
);
1669 /* Called from main context */
/* Memblock callback installed on u->pstream by on_connection().
 *
 * A block arriving on a channel other than u->channel is treated as an error:
 * it is logged and pa_module_unload_request() is called.  Otherwise the chunk
 * is passed to the source with a SOURCE_MESSAGE_POST message (seek mode packed
 * into the data pointer, `offset` as the offset argument) using
 * pa_asyncmsgq_send(), and the chunk length is added to u->counter_delta.
 *
 * NOTE(review): the log string below misspells "Received"; it is a runtime
 * string and is left untouched here.
 * NOTE(review): the statement that leaves the function after the bad-channel
 * branch is not visible in this listing -- confirm against the complete file. */
1670 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1671 struct userdata
*u
= userdata
;
1677 if (channel
!= u
->channel
) {
1678 pa_log("Recieved memory block on bad channel.");
1679 pa_module_unload_request(u
->module
, TRUE
);
1683 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1685 u
->counter_delta
+= (int64_t) chunk
->length
;
1689 /* Called from main context */
/* Connection callback registered on the socket client in pa__init().
 *
 * Drops the module's reference to the socket client, then (visible steps):
 *   - on failure logs "Connection failed: <errno text>" and requests that the
 *     module be unloaded;
 *   - otherwise wraps `io` in a pstream, creates a packet dispatcher from
 *     command_table, and installs the die / packet / memblock callbacks;
 *   - sends PA_COMMAND_AUTH carrying PA_PROTOCOL_VERSION and the
 *     authentication cookie, with the process's uid/gid as credentials on one
 *     path and without credentials on the other;
 *   - registers setup_complete_callback() for the reply.
 *
 * NOTE(review): the test that selects the failure branch, the local
 * declarations, and the conditional-compilation directives around the
 * credentials and memblock-callback code are not visible in this listing --
 * confirm against the complete file. */
1690 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1691 struct userdata
*u
= userdata
;
1697 pa_assert(u
->client
== sc
);
/* The socket client has done its job; release our reference. */
1699 pa_socket_client_unref(u
->client
);
1703 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1704 pa_module_unload_request(u
->module
, TRUE
);
/* Set up the framing layer and the command dispatcher on the new channel. */
1708 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1709 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
1711 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1712 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1714 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
/* Build the authentication request: command, tag, our protocol version,
 * and the cookie bytes. */
1717 t
= pa_tagstruct_new(NULL
, 0);
1718 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1719 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1720 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1722 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
/* Credentials path: enable credential passing if the channel supports it
 * and attach our uid/gid to the request. */
1728 if (pa_iochannel_creds_supported(io
))
1729 pa_iochannel_creds_enable(io
);
1731 ucred
.uid
= getuid();
1732 ucred
.gid
= getgid();
1734 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
/* Plain path: send the request without credentials. */
1737 pa_pstream_send_tagstruct(u
->pstream
, t
);
1740 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1742 pa_log_debug("Connection established, authenticating ...");
1747 /* Called from main context */
/* set_volume hook of the tunnel sink (assigned to u->sink->set_volume in
 * pa__init()).
 *
 * Sends PA_COMMAND_SET_SINK_INPUT_VOLUME for the remote stream identified by
 * u->device_index, carrying the sink's virtual_volume.  No reply handler is
 * registered in the lines visible here.
 *
 * NOTE(review): the declarations of `u`, `t` and `tag` are missing from this
 * listing; `u` is presumably obtained from sink->userdata -- confirm. */
1748 static void sink_set_volume(pa_sink
*sink
) {
1757 t
= pa_tagstruct_new(NULL
, 0);
1758 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1759 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1760 pa_tagstruct_putu32(t
, u
->device_index
);
1761 pa_tagstruct_put_cvolume(t
, &sink
->virtual_volume
);
1762 pa_pstream_send_tagstruct(u
->pstream
, t
);
1765 /* Called from main context */
/* set_mute hook of the tunnel sink (assigned to u->sink->set_mute in
 * pa__init()).
 *
 * Sends PA_COMMAND_SET_SINK_INPUT_MUTE for the remote stream identified by
 * u->device_index, carrying the sink's muted flag normalised to 0/1.  The
 * remote protocol version is checked against 11 first.
 *
 * NOTE(review): the statement guarded by the `version < 11` test (presumably an
 * early return because older servers lack this command) and the declarations
 * of `u`, `t` and `tag` are missing from this listing -- confirm. */
1766 static void sink_set_mute(pa_sink
*sink
) {
1775 if (u
->version
< 11)
1778 t
= pa_tagstruct_new(NULL
, 0);
1779 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1780 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1781 pa_tagstruct_putu32(t
, u
->device_index
);
1782 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1783 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Module entry point.
 *
 * Steps visible in this listing:
 *   1. Parse the module arguments against valid_modargs.
 *   2. Allocate a zeroed struct userdata, store it in m->userdata and
 *      initialise its fields (remote sink/source name from the "sink" /
 *      "source" arguments, smoother, rtpoll, thread message queue, ...).
 *   3. Load the authentication cookie named by the "cookie" argument
 *      (default PA_NATIVE_COOKIE_FILE).
 *   4. Require a "server" argument; log "No server specified." otherwise.
 *   5. Take the sample spec and channel map from the core defaults, overridden
 *      by module arguments.
 *   6. Start an asynchronous connection to the server and register
 *      on_connection() as its callback.
 *   7. Create the local sink or source (default name "tunnel.<server>"),
 *      attach its properties and callbacks, and bind it to the message queue
 *      and rtpoll.
 *   8. Set the buffer metrics to (uint32_t) -1, start the I/O thread running
 *      thread_func(), and publish the sink / source with pa_sink_put() /
 *      pa_source_put().
 * pa_modargs_free() is called twice at the end -- presumably once on the
 * success path and once on the failure path.
 *
 * NOTE(review): this listing omits the conditional-compilation directives
 * (the sink and source sections are presumably build-time alternatives, which
 * would also explain the two declarations of `data`), several local
 * declarations, the assignment of u->core, the `goto fail` / `return`
 * statements and the closing braces -- confirm against the complete file. */
1788 int pa__init(pa_module
*m
) {
1789 pa_modargs
*ma
= NULL
;
1790 struct userdata
*u
= NULL
;
1795 pa_sink_new_data data
;
1797 pa_source_new_data data
;
1802 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1803 pa_log("Failed to parse module arguments");
/* Allocate the per-module state; pa_xnew0() zero-fills it. */
1807 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1811 u
->pdispatch
= NULL
;
1813 u
->server_name
= NULL
;
/* Name of the remote sink to connect to; NULL when the argument is absent.
 * NOTE(review): the statement ends with a stray second ';'. */
1815 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1817 u
->requested_bytes
= 0;
/* Name of the remote source to connect to; NULL when the argument is absent.
 * NOTE(review): the statement ends with a stray second ';'. */
1819 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1822 u
->smoother
= pa_smoother_new(
/* No remote stream exists yet, so both indices start out invalid. */
1831 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1832 u
->time_event
= NULL
;
1833 u
->ignore_latency_before
= 0;
1834 u
->transport_usec
= u
->thread_transport_usec
= 0;
1835 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1836 u
->counter
= u
->counter_delta
= 0;
1838 u
->rtpoll
= pa_rtpoll_new();
1839 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
/* Authentication cookie, taken from the "cookie" argument or the default
 * cookie file. */
1841 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, pa_modargs_get_value(ma
, "cookie", PA_NATIVE_COOKIE_FILE
), PA_NATIVE_COOKIE_LENGTH
)))
/* The "server" argument is mandatory. */
1844 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1845 pa_log("No server specified.");
/* Start from the core's default format and let module arguments override it. */
1849 ss
= m
->core
->default_sample_spec
;
1850 map
= m
->core
->default_channel_map
;
1851 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1852 pa_log("Invalid sample format specification");
/* Begin connecting; on_connection() is invoked with the outcome. */
1856 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1857 pa_log("Failed to connect to server '%s'", u
->server_name
);
1861 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
/* ---- Sink variant ---- */
/* Local sink name: the "sink_name" argument, or "tunnel.<server>" by default. */
1865 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1866 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1868 pa_sink_new_data_init(&data
);
1869 data
.driver
= __FILE__
;
1871 data
.namereg_fail
= TRUE
;
1872 pa_sink_new_data_set_name(&data
, dn
);
1873 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1874 pa_sink_new_data_set_channel_map(&data
, &map
);
/* Description reads "<remote sink> on <server>", or just "<server>" when no
 * remote sink name was given. */
1875 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1876 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1878 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
/* User-supplied properties replace the ones set above. */
1880 if (pa_modargs_get_proplist(ma
, "sink_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
1881 pa_log("Invalid properties");
1882 pa_sink_new_data_done(&data
);
1886 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
|PA_SINK_HW_MUTE_CTRL
);
1887 pa_sink_new_data_done(&data
);
1890 pa_log("Failed to create sink.");
/* Hook the sink up to this module's message handler and callbacks. */
1894 u
->sink
->parent
.process_msg
= sink_process_msg
;
1895 u
->sink
->userdata
= u
;
1896 u
->sink
->set_state
= sink_set_state
;
1897 u
->sink
->set_volume
= sink_set_volume
;
1898 u
->sink
->set_mute
= sink_set_mute
;
1900 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
1902 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1904 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1905 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
/* ---- Source variant ---- */
/* Local source name: the "source_name" argument, or "tunnel.<server>" by
 * default. */
1909 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1910 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1912 pa_source_new_data_init(&data
);
1913 data
.driver
= __FILE__
;
1915 data
.namereg_fail
= TRUE
;
1916 pa_source_new_data_set_name(&data
, dn
);
1917 pa_source_new_data_set_sample_spec(&data
, &ss
);
1918 pa_source_new_data_set_channel_map(&data
, &map
);
1919 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
1920 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1922 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
1924 if (pa_modargs_get_proplist(ma
, "source_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
1925 pa_log("Invalid properties");
1926 pa_source_new_data_done(&data
);
1930 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
1931 pa_source_new_data_done(&data
);
1934 pa_log("Failed to create source.");
1938 u
->source
->parent
.process_msg
= source_process_msg
;
1939 u
->source
->set_state
= source_set_state
;
1940 u
->source
->userdata
= u
;
1942 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1944 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1945 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
1950 u
->time_event
= NULL
;
/* Buffer metrics start out as (uint32_t) -1; setup_complete_callback()
 * assigns the values actually requested. */
1952 u
->maxlength
= (uint32_t) -1;
1954 u
->tlength
= u
->minreq
= u
->prebuf
= (uint32_t) -1;
1956 u
->fragsize
= (uint32_t) -1;
/* Start the I/O thread, then publish the device. */
1959 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1960 pa_log("Failed to create thread.");
1965 pa_sink_put(u
->sink
);
1967 pa_source_put(u
->source
);
1970 pa_modargs_free(ma
);
1978 pa_modargs_free(ma
);
1985 void pa__done(pa_module
*m
) {
1990 if (!(u
= m
->userdata
))
1995 pa_sink_unlink(u
->sink
);
1998 pa_source_unlink(u
->source
);
2002 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
2003 pa_thread_free(u
->thread
);
2006 pa_thread_mq_done(&u
->thread_mq
);
2010 pa_sink_unref(u
->sink
);
2013 pa_source_unref(u
->source
);
2017 pa_rtpoll_free(u
->rtpoll
);
2020 pa_pstream_unlink(u
->pstream
);
2021 pa_pstream_unref(u
->pstream
);
2025 pa_pdispatch_unref(u
->pdispatch
);
2028 pa_socket_client_unref(u
->client
);
2031 pa_auth_cookie_unref(u
->auth_cookie
);
2034 pa_smoother_free(u
->smoother
);
2037 u
->core
->mainloop
->time_free(u
->time_event
);
2040 pa_xfree(u
->sink_name
);
2042 pa_xfree(u
->source_name
);
2044 pa_xfree(u
->server_name
);
2046 pa_xfree(u
->device_description
);
2047 pa_xfree(u
->server_fqdn
);
2048 pa_xfree(u
->user_name
);