2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
59 #include "module-tunnel-sink-symdef.h"
61 #include "module-tunnel-source-symdef.h"
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
67 "sink_name=<name for the local sink> "
68 "sink_properties=<properties for the local sink> "
70 "sink=<remote sink name> "
72 "format=<sample format> "
73 "channels=<number of channels> "
75 "channel_map=<channel map>");
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 "source_name=<name for the local source> "
80 "source_properties=<properties for the local source> "
82 "source=<remote source name> "
84 "format=<sample format> "
85 "channels=<number of channels> "
87 "channel_map=<channel map>");
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION
);
92 PA_MODULE_LOAD_ONCE(FALSE
);
94 static const char* const valid_modargs
[] = {
113 #define DEFAULT_TIMEOUT 5
115 #define LATENCY_INTERVAL 10
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
122 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
123 SINK_MESSAGE_REMOTE_SUSPEND
,
124 SINK_MESSAGE_UPDATE_LATENCY
,
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
134 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
135 SOURCE_MESSAGE_REMOTE_SUSPEND
,
136 SOURCE_MESSAGE_UPDATE_LATENCY
139 #define DEFAULT_FRAGSIZE_MSEC 25
144 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
145 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
148 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
149 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
150 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
151 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
152 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
153 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
155 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
157 [PA_COMMAND_REQUEST
] = command_request
,
158 [PA_COMMAND_STARTED
] = command_started
,
160 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
161 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
162 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
163 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
164 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
165 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
166 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
167 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
168 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
169 [PA_COMMAND_PLAYBACK_STREAM_EVENT
] = command_stream_or_client_event
,
170 [PA_COMMAND_RECORD_STREAM_EVENT
] = command_stream_or_client_event
,
171 [PA_COMMAND_CLIENT_EVENT
] = command_stream_or_client_event
,
172 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
,
173 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
] = command_stream_buffer_attr_changed
180 pa_thread_mq thread_mq
;
184 pa_socket_client
*client
;
186 pa_pdispatch
*pdispatch
;
192 size_t requested_bytes
;
198 pa_auth_cookie
*auth_cookie
;
202 uint32_t device_index
;
205 int64_t counter
, counter_delta
;
207 pa_bool_t remote_corked
:1;
208 pa_bool_t remote_suspended
:1;
210 pa_usec_t transport_usec
; /* maintained in the main thread */
211 pa_usec_t thread_transport_usec
; /* maintained in the IO thread */
213 uint32_t ignore_latency_before
;
215 pa_time_event
*time_event
;
217 pa_smoother
*smoother
;
219 char *device_description
;
233 static void request_latency(struct userdata
*u
);
235 /* Called from main context */
236 static void command_stream_or_client_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
237 pa_log_debug("Got stream or client event.");
240 /* Called from main context */
241 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
242 struct userdata
*u
= userdata
;
247 pa_assert(u
->pdispatch
== pd
);
249 pa_log_warn("Stream killed");
250 pa_module_unload_request(u
->module
, TRUE
);
253 /* Called from main context */
254 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
255 struct userdata
*u
= userdata
;
260 pa_assert(u
->pdispatch
== pd
);
262 pa_log_info("Server signalled buffer overrun/underrun.");
266 /* Called from main context */
267 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
268 struct userdata
*u
= userdata
;
275 pa_assert(u
->pdispatch
== pd
);
277 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
278 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
279 !pa_tagstruct_eof(t
)) {
281 pa_log("Invalid packet.");
282 pa_module_unload_request(u
->module
, TRUE
);
286 pa_log_debug("Server reports device suspend.");
289 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
291 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
297 /* Called from main context */
298 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
299 struct userdata
*u
= userdata
;
300 uint32_t channel
, di
;
307 pa_assert(u
->pdispatch
== pd
);
309 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
310 pa_tagstruct_getu32(t
, &di
) < 0 ||
311 pa_tagstruct_gets(t
, &dn
) < 0 ||
312 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
314 pa_log_error("Invalid packet.");
315 pa_module_unload_request(u
->module
, TRUE
);
319 pa_log_debug("Server reports a stream move.");
322 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
324 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
330 static void command_stream_buffer_attr_changed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
331 struct userdata
*u
= userdata
;
332 uint32_t channel
, maxlength
, tlength
, fragsize
, prebuf
, minreq
;
338 pa_assert(u
->pdispatch
== pd
);
340 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
341 pa_tagstruct_getu32(t
, &maxlength
) < 0) {
343 pa_log_error("Invalid packet.");
344 pa_module_unload_request(u
->module
, TRUE
);
348 if (command
== PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
) {
349 if (pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
350 pa_tagstruct_get_usec(t
, &usec
) < 0) {
352 pa_log_error("Invalid packet.");
353 pa_module_unload_request(u
->module
, TRUE
);
357 if (pa_tagstruct_getu32(t
, &tlength
) < 0 ||
358 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
359 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
360 pa_tagstruct_get_usec(t
, &usec
) < 0) {
362 pa_log_error("Invalid packet.");
363 pa_module_unload_request(u
->module
, TRUE
);
369 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength
, (unsigned long) u
->tlength
);
377 /* Called from main context */
378 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
379 struct userdata
*u
= userdata
;
384 pa_assert(u
->pdispatch
== pd
);
386 pa_log_debug("Server reports playback started.");
392 /* Called from IO thread context */
393 static void check_smoother_status(struct userdata
*u
, pa_bool_t past
) {
398 x
= pa_rtclock_usec();
400 /* Correct by the time the issued request needs to travel to the
401 * other side. This is a valid thread-safe access, because the
402 * main thread is waiting for us */
405 x
-= u
->thread_transport_usec
;
407 x
+= u
->thread_transport_usec
;
409 if (u
->remote_suspended
|| u
->remote_corked
)
410 pa_smoother_pause(u
->smoother
, x
);
412 pa_smoother_resume(u
->smoother
, x
, TRUE
);
415 /* Called from IO thread context */
416 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
419 if (u
->remote_corked
== cork
)
422 u
->remote_corked
= cork
;
423 check_smoother_status(u
, FALSE
);
426 /* Called from main context */
427 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
434 t
= pa_tagstruct_new(NULL
, 0);
436 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
438 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
440 pa_tagstruct_putu32(t
, u
->ctag
++);
441 pa_tagstruct_putu32(t
, u
->channel
);
442 pa_tagstruct_put_boolean(t
, !!cork
);
443 pa_pstream_send_tagstruct(u
->pstream
, t
);
448 /* Called from IO thread context */
449 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
452 if (u
->remote_suspended
== suspend
)
455 u
->remote_suspended
= suspend
;
456 check_smoother_status(u
, TRUE
);
461 /* Called from IO thread context */
462 static void send_data(struct userdata
*u
) {
465 while (u
->requested_bytes
> 0) {
466 pa_memchunk memchunk
;
468 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
469 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
470 pa_memblock_unref(memchunk
.memblock
);
472 u
->requested_bytes
-= memchunk
.length
;
474 u
->counter
+= (int64_t) memchunk
.length
;
478 /* This function is called from IO context -- except when it is not. */
479 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
480 struct userdata
*u
= PA_SINK(o
)->userdata
;
484 case PA_SINK_MESSAGE_SET_STATE
: {
487 /* First, change the state, because otherwise pa_sink_render() would fail */
488 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
490 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
492 if (PA_SINK_IS_OPENED(u
->sink
->state
))
499 case PA_SINK_MESSAGE_GET_LATENCY
: {
500 pa_usec_t yl
, yr
, *usec
= data
;
502 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
503 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
505 *usec
= yl
> yr
? yl
- yr
: 0;
509 case SINK_MESSAGE_REQUEST
:
511 pa_assert(offset
> 0);
512 u
->requested_bytes
+= (size_t) offset
;
514 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
520 case SINK_MESSAGE_REMOTE_SUSPEND
:
522 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
526 case SINK_MESSAGE_UPDATE_LATENCY
: {
529 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->sink
->sample_spec
);
531 if (y
> (pa_usec_t
) offset
)
532 y
-= (pa_usec_t
) offset
;
536 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
538 /* We can access this freely here, since the main thread is waiting for us */
539 u
->thread_transport_usec
= u
->transport_usec
;
544 case SINK_MESSAGE_POST
:
546 /* OK, This might be a bit confusing. This message is
547 * delivered to us from the main context -- NOT from the
548 * IO thread context where the rest of the messages are
549 * dispatched. Yeah, ugly, but I am a lazy bastard. */
551 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
553 u
->counter_delta
+= (int64_t) chunk
->length
;
558 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
561 /* Called from main context */
562 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
564 pa_sink_assert_ref(s
);
567 switch ((pa_sink_state_t
) state
) {
569 case PA_SINK_SUSPENDED
:
570 pa_assert(PA_SINK_IS_OPENED(s
->state
));
571 stream_cork(u
, TRUE
);
575 case PA_SINK_RUNNING
:
576 if (s
->state
== PA_SINK_SUSPENDED
)
577 stream_cork(u
, FALSE
);
580 case PA_SINK_UNLINKED
:
582 case PA_SINK_INVALID_STATE
:
591 /* This function is called from IO context -- except when it is not. */
592 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
593 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
597 case PA_SOURCE_MESSAGE_SET_STATE
: {
600 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
601 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
606 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
607 pa_usec_t yr
, yl
, *usec
= data
;
609 yl
= pa_bytes_to_usec((uint64_t) u
->counter
, &PA_SOURCE(o
)->sample_spec
);
610 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
612 *usec
= yr
> yl
? yr
- yl
: 0;
616 case SOURCE_MESSAGE_POST
:
618 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
619 pa_source_post(u
->source
, chunk
);
621 u
->counter
+= (int64_t) chunk
->length
;
625 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
627 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
630 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
633 y
= pa_bytes_to_usec((uint64_t) u
->counter
, &u
->source
->sample_spec
);
634 y
+= (pa_usec_t
) offset
;
636 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
638 /* We can access this freely here, since the main thread is waiting for us */
639 u
->thread_transport_usec
= u
->transport_usec
;
645 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
648 /* Called from main context */
649 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
651 pa_source_assert_ref(s
);
654 switch ((pa_source_state_t
) state
) {
656 case PA_SOURCE_SUSPENDED
:
657 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
658 stream_cork(u
, TRUE
);
662 case PA_SOURCE_RUNNING
:
663 if (s
->state
== PA_SOURCE_SUSPENDED
)
664 stream_cork(u
, FALSE
);
667 case PA_SOURCE_UNLINKED
:
669 case PA_SINK_INVALID_STATE
:
678 static void thread_func(void *userdata
) {
679 struct userdata
*u
= userdata
;
683 pa_log_debug("Thread starting up");
685 pa_thread_mq_install(&u
->thread_mq
);
691 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
692 if (PA_UNLIKELY(u
->sink
->thread_info
.rewind_requested
))
693 pa_sink_process_rewind(u
->sink
, 0);
696 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
704 /* If this was not a regular exit from the loop we have to continue
705 * processing messages until we receive PA_MESSAGE_SHUTDOWN */
706 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
707 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
710 pa_log_debug("Thread shutting down");
714 /* Called from main context */
715 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
716 struct userdata
*u
= userdata
;
717 uint32_t bytes
, channel
;
720 pa_assert(command
== PA_COMMAND_REQUEST
);
723 pa_assert(u
->pdispatch
== pd
);
725 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
726 pa_tagstruct_getu32(t
, &bytes
) < 0) {
727 pa_log("Invalid protocol reply");
731 if (channel
!= u
->channel
) {
732 pa_log("Received data for invalid channel");
736 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
740 pa_module_unload_request(u
->module
, TRUE
);
745 /* Called from main context */
746 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
747 struct userdata
*u
= userdata
;
748 pa_usec_t sink_usec
, source_usec
;
750 int64_t write_index
, read_index
;
751 struct timeval local
, remote
, now
;
758 if (command
!= PA_COMMAND_REPLY
) {
759 if (command
== PA_COMMAND_ERROR
)
760 pa_log("Failed to get latency.");
762 pa_log("Protocol error.");
766 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
767 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
768 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
769 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
770 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
771 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
772 pa_tagstruct_gets64(t
, &read_index
) < 0) {
773 pa_log("Invalid reply.");
778 if (u
->version
>= 13) {
779 uint64_t underrun_for
= 0, playing_for
= 0;
781 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
782 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
783 pa_log("Invalid reply.");
789 if (!pa_tagstruct_eof(t
)) {
790 pa_log("Invalid reply.");
794 if (tag
< u
->ignore_latency_before
) {
798 pa_gettimeofday(&now
);
800 /* Calculate transport usec */
801 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
802 /* local and remote seem to have synchronized clocks */
804 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
806 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
809 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
811 /* First, take the device's delay */
813 delay
= (int64_t) sink_usec
;
814 ss
= &u
->sink
->sample_spec
;
816 delay
= (int64_t) source_usec
;
817 ss
= &u
->source
->sample_spec
;
820 /* Add the length of our server-side buffer */
821 if (write_index
>= read_index
)
822 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) (write_index
-read_index
), ss
);
824 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) (read_index
-write_index
), ss
);
826 /* Our measurements are already out of date, hence correct by the
827 * transport latency */
829 delay
-= (int64_t) u
->transport_usec
;
831 delay
+= (int64_t) u
->transport_usec
;
834 /* Now correct by what we have read/written since we requested the update */
836 delay
+= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
838 delay
-= (int64_t) pa_bytes_to_usec((uint64_t) u
->counter_delta
, ss
);
842 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
844 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
851 pa_module_unload_request(u
->module
, TRUE
);
854 /* Called from main context */
855 static void request_latency(struct userdata
*u
) {
861 t
= pa_tagstruct_new(NULL
, 0);
863 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
865 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
867 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
868 pa_tagstruct_putu32(t
, u
->channel
);
870 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
872 pa_pstream_send_tagstruct(u
->pstream
, t
);
873 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
875 u
->ignore_latency_before
= tag
;
876 u
->counter_delta
= 0;
879 /* Called from main context */
880 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*tv
, void *userdata
) {
881 struct userdata
*u
= userdata
;
890 pa_gettimeofday(&ntv
);
891 ntv
.tv_sec
+= LATENCY_INTERVAL
;
892 m
->time_restart(e
, &ntv
);
895 /* Called from main context */
896 static void update_description(struct userdata
*u
) {
898 char un
[128], hn
[128];
903 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
906 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
909 pa_sink_set_description(u
->sink
, d
);
910 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
911 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
912 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
914 pa_source_set_description(u
->source
, d
);
915 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
916 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
917 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
922 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
923 pa_get_user_name(un
, sizeof(un
)),
924 pa_get_host_name(hn
, sizeof(hn
)));
926 t
= pa_tagstruct_new(NULL
, 0);
928 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
930 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
932 pa_tagstruct_putu32(t
, u
->ctag
++);
933 pa_tagstruct_putu32(t
, u
->channel
);
934 pa_tagstruct_puts(t
, d
);
935 pa_pstream_send_tagstruct(u
->pstream
, t
);
940 /* Called from main context */
941 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
942 struct userdata
*u
= userdata
;
945 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
951 if (command
!= PA_COMMAND_REPLY
) {
952 if (command
== PA_COMMAND_ERROR
)
953 pa_log("Failed to get info.");
955 pa_log("Protocol error.");
959 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
960 pa_tagstruct_gets(t
, &server_version
) < 0 ||
961 pa_tagstruct_gets(t
, &user_name
) < 0 ||
962 pa_tagstruct_gets(t
, &host_name
) < 0 ||
963 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
964 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
965 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
966 pa_tagstruct_getu32(t
, &cookie
) < 0 ||
968 pa_tagstruct_get_channel_map(t
, &cm
) < 0)) {
970 pa_log("Parse failure");
974 if (!pa_tagstruct_eof(t
)) {
975 pa_log("Packet too long");
979 pa_xfree(u
->server_fqdn
);
980 u
->server_fqdn
= pa_xstrdup(host_name
);
982 pa_xfree(u
->user_name
);
983 u
->user_name
= pa_xstrdup(user_name
);
985 update_description(u
);
990 pa_module_unload_request(u
->module
, TRUE
);
995 /* Called from main context */
996 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
997 struct userdata
*u
= userdata
;
998 uint32_t idx
, owner_module
, monitor_source
, flags
;
999 const char *name
, *description
, *monitor_source_name
, *driver
;
1010 pl
= pa_proplist_new();
1012 if (command
!= PA_COMMAND_REPLY
) {
1013 if (command
== PA_COMMAND_ERROR
)
1014 pa_log("Failed to get info.");
1016 pa_log("Protocol error.");
1020 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1021 pa_tagstruct_gets(t
, &name
) < 0 ||
1022 pa_tagstruct_gets(t
, &description
) < 0 ||
1023 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1024 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1025 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1026 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1027 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1028 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
1029 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
1030 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1031 pa_tagstruct_gets(t
, &driver
) < 0 ||
1032 pa_tagstruct_getu32(t
, &flags
) < 0) {
1034 pa_log("Parse failure");
1038 if (u
->version
>= 13) {
1039 pa_usec_t configured_latency
;
1041 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1042 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1044 pa_log("Parse failure");
1049 if (u
->version
>= 15) {
1050 pa_volume_t base_volume
;
1051 uint32_t state
, n_volume_steps
, card
;
1053 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1054 pa_tagstruct_getu32(t
, &state
) < 0 ||
1055 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1056 pa_tagstruct_getu32(t
, &card
) < 0) {
1058 pa_log("Parse failure");
1063 if (!pa_tagstruct_eof(t
)) {
1064 pa_log("Packet too long");
1068 pa_proplist_free(pl
);
1070 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
1073 pa_xfree(u
->device_description
);
1074 u
->device_description
= pa_xstrdup(description
);
1076 update_description(u
);
1081 pa_module_unload_request(u
->module
, TRUE
);
1082 pa_proplist_free(pl
);
1085 /* Called from main context */
1086 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1087 struct userdata
*u
= userdata
;
1088 uint32_t idx
, owner_module
, client
, sink
;
1089 pa_usec_t buffer_usec
, sink_usec
;
1090 const char *name
, *driver
, *resample_method
;
1092 pa_sample_spec sample_spec
;
1093 pa_channel_map channel_map
;
1100 pl
= pa_proplist_new();
1102 if (command
!= PA_COMMAND_REPLY
) {
1103 if (command
== PA_COMMAND_ERROR
)
1104 pa_log("Failed to get info.");
1106 pa_log("Protocol error.");
1110 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1111 pa_tagstruct_gets(t
, &name
) < 0 ||
1112 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1113 pa_tagstruct_getu32(t
, &client
) < 0 ||
1114 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1115 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1116 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1117 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1118 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1119 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1120 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1121 pa_tagstruct_gets(t
, &driver
) < 0) {
1123 pa_log("Parse failure");
1127 if (u
->version
>= 11) {
1128 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1130 pa_log("Parse failure");
1135 if (u
->version
>= 13) {
1136 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1138 pa_log("Parse failure");
1143 if (!pa_tagstruct_eof(t
)) {
1144 pa_log("Packet too long");
1148 pa_proplist_free(pl
);
1150 if (idx
!= u
->device_index
)
1155 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1156 pa_cvolume_equal(&volume
, &u
->sink
->virtual_volume
))
1159 pa_sink_volume_changed(u
->sink
, &volume
, FALSE
);
1161 if (u
->version
>= 11)
1162 pa_sink_mute_changed(u
->sink
, mute
, FALSE
);
1167 pa_module_unload_request(u
->module
, TRUE
);
1168 pa_proplist_free(pl
);
1173 /* Called from main context */
1174 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1175 struct userdata
*u
= userdata
;
1176 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1177 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1182 pa_usec_t latency
, configured_latency
;
1188 pl
= pa_proplist_new();
1190 if (command
!= PA_COMMAND_REPLY
) {
1191 if (command
== PA_COMMAND_ERROR
)
1192 pa_log("Failed to get info.");
1194 pa_log("Protocol error.");
1198 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1199 pa_tagstruct_gets(t
, &name
) < 0 ||
1200 pa_tagstruct_gets(t
, &description
) < 0 ||
1201 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1202 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1203 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1204 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1205 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1206 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1207 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1208 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1209 pa_tagstruct_gets(t
, &driver
) < 0 ||
1210 pa_tagstruct_getu32(t
, &flags
) < 0) {
1212 pa_log("Parse failure");
1216 if (u
->version
>= 13) {
1217 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1218 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1220 pa_log("Parse failure");
1225 if (u
->version
>= 15) {
1226 pa_volume_t base_volume
;
1227 uint32_t state
, n_volume_steps
, card
;
1229 if (pa_tagstruct_get_volume(t
, &base_volume
) < 0 ||
1230 pa_tagstruct_getu32(t
, &state
) < 0 ||
1231 pa_tagstruct_getu32(t
, &n_volume_steps
) < 0 ||
1232 pa_tagstruct_getu32(t
, &card
) < 0) {
1234 pa_log("Parse failure");
1239 if (!pa_tagstruct_eof(t
)) {
1240 pa_log("Packet too long");
1244 pa_proplist_free(pl
);
1246 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1249 pa_xfree(u
->device_description
);
1250 u
->device_description
= pa_xstrdup(description
);
1252 update_description(u
);
1257 pa_module_unload_request(u
->module
, TRUE
);
1258 pa_proplist_free(pl
);
1263 /* Called from main context */
1264 static void request_info(struct userdata
*u
) {
1269 t
= pa_tagstruct_new(NULL
, 0);
1270 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1271 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1272 pa_pstream_send_tagstruct(u
->pstream
, t
);
1273 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1276 t
= pa_tagstruct_new(NULL
, 0);
1277 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1278 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1279 pa_tagstruct_putu32(t
, u
->device_index
);
1280 pa_pstream_send_tagstruct(u
->pstream
, t
);
1281 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1284 t
= pa_tagstruct_new(NULL
, 0);
1285 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1286 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1287 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1288 pa_tagstruct_puts(t
, u
->sink_name
);
1289 pa_pstream_send_tagstruct(u
->pstream
, t
);
1290 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1293 if (u
->source_name
) {
1294 t
= pa_tagstruct_new(NULL
, 0);
1295 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1296 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1297 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1298 pa_tagstruct_puts(t
, u
->source_name
);
1299 pa_pstream_send_tagstruct(u
->pstream
, t
);
1300 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1305 /* Called from main context */
1306 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1307 struct userdata
*u
= userdata
;
1308 pa_subscription_event_type_t e
;
1314 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1316 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1317 pa_tagstruct_getu32(t
, &idx
) < 0) {
1318 pa_log("Invalid protocol reply");
1319 pa_module_unload_request(u
->module
, TRUE
);
1323 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1325 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1326 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1328 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1336 /* Called from main context */
/* Sends a PA_COMMAND_SUBSCRIBE request over u->pstream.
 *
 * The request carries a fresh tag taken from u->ctag (post-incremented) and
 * a subscription mask built from the SERVER, SINK_INPUT, SINK and SOURCE
 * mask bits. No reply handler is registered in the visible code.
 *
 * NOTE(review): the declarations of `t` and `tag` and some operators of the
 * mask expression are not visible in this listing; the SINK/SOURCE parts are
 * presumably selected by conditional compilation -- confirm in the full file. */
1337 static void start_subscribe(struct userdata
*u
) {
1342 t
= pa_tagstruct_new(NULL
, 0);
1343 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1344 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
/* Subscription mask: which facilities we want events for. */
1345 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1347 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1349 PA_SUBSCRIPTION_MASK_SOURCE
1353 pa_pstream_send_tagstruct(u
->pstream
, t
);
1356 /* Called from main context */
/* Reply handler for the stream-creation request (registered by
 * setup_complete_callback).
 *
 * Parses the reply in protocol-version-dependent stages:
 *   - always:        stream channel, device index (and a byte count `bytes`)
 *   - version >= 9:  buffer metrics (maxlength/tlength/prebuf/minreq, and
 *                    maxlength/fragsize)
 *   - version >= 12: sample spec, channel map, device index, device name and
 *                    suspended flag; the stored sink/source name is replaced
 *                    by the name the server reported
 *   - version >= 13: one usec value
 * It then arms a timer event (timeout_callback) LATENCY_INTERVAL seconds from
 * now and posts SINK_MESSAGE_REQUEST with `bytes` to the sink's message queue.
 * On a bad reply it logs and requests unloading of the module.
 *
 * NOTE(review): local declarations, the error-path jumps, and what are
 * presumably sink/source conditional-compilation directives are not visible
 * in this listing -- confirm against the full file. */
1357 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1358 struct userdata
*u
= userdata
;
1366 pa_assert(u
->pdispatch
== pd
);
/* Anything other than a REPLY is a failure; distinguish server error from garbage. */
1368 if (command
!= PA_COMMAND_REPLY
) {
1369 if (command
== PA_COMMAND_ERROR
)
1370 pa_log("Failed to create stream.");
1372 pa_log("Protocol error.");
/* Fixed part of the reply: channel and remote stream index, stored in `u`. */
1376 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1377 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1379 || pa_tagstruct_getu32(t
, &bytes
) < 0
/* Protocol >= 9: the server reports the buffer metrics it actually uses. */
1384 if (u
->version
>= 9) {
1386 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1387 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1388 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1389 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1392 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1393 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
/* Protocol >= 12: the server also reports the device the stream is attached to. */
1398 if (u
->version
>= 12) {
1401 uint32_t device_index
;
1403 pa_bool_t suspended
;
1405 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1406 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1407 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1408 pa_tagstruct_gets(t
, &dn
) < 0 ||
1409 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
/* Replace the stored remote device name with a copy of the one just received. */
1413 pa_xfree(u
->sink_name
);
1414 u
->sink_name
= pa_xstrdup(dn
);
1416 pa_xfree(u
->source_name
);
1417 u
->source_name
= pa_xstrdup(dn
);
/* Protocol >= 13: one usec value is read; its only use below is commented out. */
1421 if (u
->version
>= 13) {
1424 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1427 /* #ifdef TUNNEL_SINK */
1428 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1430 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
/* End-of-data check on the reply. */
1434 if (!pa_tagstruct_eof(t
))
/* Arm the timer: first expiry is LATENCY_INTERVAL seconds from now. */
1440 pa_assert(!u
->time_event
);
1441 pa_gettimeofday(&ntv
);
1442 ntv
.tv_sec
+= LATENCY_INTERVAL
;
1443 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
1447 pa_log_debug("Stream created.");
/* Forward the byte count from the reply to the sink via its message queue. */
1450 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
/* Failure handling: log, then ask the core to unload this module. */
1456 pa_log("Invalid reply. (Create stream)");
1459 pa_module_unload_request(u
->module
, TRUE
);
1463 /* Called from main context */
/* Reply handler for the authentication request (registered by on_connection).
 *
 * Steps visible in this listing:
 *   1. Read the remote protocol version into u->version; reject anything that
 *      is not a clean REPLY, and versions below 8.
 *   2. For version >= 13, mask off the most significant bit of the version.
 *   3. Store the version as "tunnel.remote_version" on the local sink/source
 *      property list and format a stream name into `name`.
 *   4. Send PA_COMMAND_SET_CLIENT_NAME (reply ignored).
 *   5. Build and send PA_COMMAND_CREATE_PLAYBACK_STREAM /
 *      PA_COMMAND_CREATE_RECORD_STREAM, appending extra fields for protocol
 *      versions >= 12, 13, 14 and 15, and register create_stream_callback
 *      for the reply.
 * On failure an unload of this module is requested.
 *
 * NOTE(review): some local declarations, the else/goto lines, and what are
 * presumably sink/source conditional-compilation directives are not visible
 * in this listing, so sink and source variants appear back to back. */
1464 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1465 struct userdata
*u
= userdata
;
1466 pa_tagstruct
*reply
;
1467 char name
[256], un
[128], hn
[128];
1474 pa_assert(u
->pdispatch
== pd
);
/* A valid auth reply is a REPLY carrying exactly one u32: the remote version. */
1476 if (command
!= PA_COMMAND_REPLY
||
1477 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1478 !pa_tagstruct_eof(t
)) {
1480 if (command
== PA_COMMAND_ERROR
)
1481 pa_log("Failed to authenticate");
1483 pa_log("Protocol error.");
1488 /* Minimum supported protocol version */
1489 if (u
->version
< 8) {
1490 pa_log("Incompatible protocol version");
1494 /* Starting with protocol version 13 the MSB of the version tag
1495 reflects if shm is enabled for this connection or not. We don't
1496 support SHM here at all, so we just ignore this. */
1498 if (u
->version
>= 13)
1499 u
->version
&= 0x7FFFFFFFU
;
1501 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
/* Sink variant: publish the remote version and build the stream name.
 * The first "%s" argument of the format is on a line not shown here. */
1504 pa_proplist_setf(u
->sink
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1505 pa_sink_update_proplist(u
->sink
, 0, NULL
);
1507 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1509 pa_get_user_name(un
, sizeof(un
)),
1510 pa_get_host_name(hn
, sizeof(hn
)));
/* Source variant of the same two steps. */
1512 pa_proplist_setf(u
->source
->proplist
, "tunnel.remote_version", "%u", u
->version
);
1513 pa_source_update_proplist(u
->source
, 0, NULL
);
1515 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1517 pa_get_user_name(un
, sizeof(un
)),
1518 pa_get_host_name(hn
, sizeof(hn
)));
/* First request: tell the server who we are. */
1521 reply
= pa_tagstruct_new(NULL
, 0);
1522 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1523 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
/* Protocol >= 13 sends a property list; the plain string further down is
 * presumably the fallback for older servers (else line not visible). */
1525 if (u
->version
>= 13) {
1527 pl
= pa_proplist_new();
1528 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1529 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1530 pa_init_proplist(pl
);
1531 pa_tagstruct_put_proplist(reply
, pl
);
1532 pa_proplist_free(pl
);
1534 pa_tagstruct_puts(reply
, "PulseAudio");
1536 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1537 /* We ignore the server's reply here */
/* Second request: create the remote stream. */
1539 reply
= pa_tagstruct_new(NULL
, 0);
1541 if (u
->version
< 13)
1542 /* Only for older PA versions we need to fill in the maxlength */
1543 u
->maxlength
= 4*1024*1024;
/* Requested playback buffer metrics, converted from milliseconds to bytes
 * using the sink's sample spec; prebuf is set equal to tlength. */
1546 u
->tlength
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1547 u
->minreq
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1548 u
->prebuf
= u
->tlength
;
/* Requested record fragment size, converted the same way with the source's sample spec. */
1550 u
->fragsize
= (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
/* Playback stream request. Field order is significant: it is the wire format. */
1554 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1555 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
/* Servers older than protocol 13 take the stream name as a plain string here. */
1557 if (u
->version
< 13)
1558 pa_tagstruct_puts(reply
, name
);
1560 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1561 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1562 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1563 pa_tagstruct_puts(reply
, u
->sink_name
);
1564 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* Boolean is TRUE when the local sink is currently not opened. */
1565 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1566 pa_tagstruct_putu32(reply
, u
->tlength
);
1567 pa_tagstruct_putu32(reply
, u
->prebuf
);
1568 pa_tagstruct_putu32(reply
, u
->minreq
);
1569 pa_tagstruct_putu32(reply
, 0);
/* Volume field: reset for the sink's channel count. */
1570 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1571 pa_tagstruct_put_cvolume(reply
, &volume
);
/* Record stream request (source variant of the block above). */
1573 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1574 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1576 if (u
->version
< 13)
1577 pa_tagstruct_puts(reply
, name
);
1579 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1580 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1581 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1582 pa_tagstruct_puts(reply
, u
->source_name
);
1583 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* Boolean is TRUE when the local source is currently not opened. */
1584 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1585 pa_tagstruct_putu32(reply
, u
->fragsize
);
/* Extra stream flags appended for protocol >= 12. */
1588 if (u
->version
>= 12) {
1589 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1590 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1591 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1592 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1593 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1594 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1595 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
/* Extra fields appended for protocol >= 13, including a stream property list. */
1598 if (u
->version
>= 13) {
1601 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect */
1602 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
1604 pl
= pa_proplist_new();
1605 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1606 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1607 pa_tagstruct_put_proplist(reply
, pl
);
1608 pa_proplist_free(pl
);
1611 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
/* Extra fields appended for protocol >= 14. */
1615 if (u
->version
>= 14) {
1617 pa_tagstruct_put_boolean(reply
, FALSE
); /* volume_set */
1619 pa_tagstruct_put_boolean(reply
, TRUE
); /* early requests */
/* Extra fields appended for protocol >= 15. */
1622 if (u
->version
>= 15) {
1624 pa_tagstruct_put_boolean(reply
, FALSE
); /* muted_set */
1626 pa_tagstruct_put_boolean(reply
, FALSE
); /* don't inhibit auto suspend */
1627 pa_tagstruct_put_boolean(reply
, FALSE
); /* fail on suspend */
/* Send the request and wait for the server's answer in create_stream_callback. */
1630 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1631 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1633 pa_log_debug("Connection authenticated, creating stream ...");
/* Failure handling: ask the core to unload this module. */
1638 pa_module_unload_request(u
->module
, TRUE
);
1641 /* Called from main context */
/* Die callback installed on u->pstream by on_connection.
 * Logs a warning and requests that this module be unloaded. */
1642 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1643 struct userdata
*u
= userdata
;
1648 pa_log_warn("Stream died.");
1649 pa_module_unload_request(u
->module
, TRUE
);
1652 /* Called from main context */
/* Packet callback installed on u->pstream by on_connection.
 * Passes each received packet (and the sender credentials) to the packet
 * dispatcher u->pdispatch; if the dispatcher returns an error, the packet is
 * reported as invalid and an unload of this module is requested. */
1653 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1654 struct userdata
*u
= userdata
;
1660 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1661 pa_log("Invalid packet");
1662 pa_module_unload_request(u
->module
, TRUE
);
1668 /* Called from main context */
/* Memblock callback installed on u->pstream by on_connection.
 *
 * Rejects data arriving on any channel other than u->channel (logs and
 * requests module unload). Otherwise it passes the chunk to the source
 * through the source's message queue as SOURCE_MESSAGE_POST -- with the seek
 * mode packed into the pointer argument and `offset` as the integer
 * argument -- and adds the chunk length to u->counter_delta.
 *
 * NOTE(review): the early return after the channel check is not visible in
 * this listing -- confirm against the full file. */
1669 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1670 struct userdata
*u
= userdata
;
/* Only the channel assigned to our stream is acceptable. */
1676 if (channel
!= u
->channel
) {
1677 pa_log("Received memory block on bad channel.");
1678 pa_module_unload_request(u
->module
, TRUE
);
1682 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
/* Account for the bytes just received. */
1684 u
->counter_delta
+= (int64_t) chunk
->length
;
1688 /* Called from main context */
/* Socket-client callback (installed in pa__init) that runs once the
 * connection attempt to the remote server has finished.
 *
 * Drops the reference on the socket client, and on failure logs the errno
 * text and requests unloading of the module. On success it creates the
 * packet stream (u->pstream) and dispatcher (u->pdispatch) on top of `io`,
 * installs the die/packet/memblock callbacks, and sends PA_COMMAND_AUTH with
 * our protocol version and the authentication cookie. The reply is handled
 * by setup_complete_callback.
 *
 * NOTE(review): the failure condition (presumably a NULL `io`), the
 * declarations of `t`, `tag` and `ucred`, and the conditional compilation
 * that presumably selects between the with-creds and plain send are not
 * visible in this listing -- confirm against the full file. */
1689 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1690 struct userdata
*u
= userdata
;
1696 pa_assert(u
->client
== sc
);
/* The socket client object is no longer needed once this callback has fired. */
1698 pa_socket_client_unref(u
->client
);
1702 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1703 pa_module_unload_request(u
->module
, TRUE
);
/* Protocol plumbing on top of the connected I/O channel. */
1707 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1708 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
1710 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1711 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1713 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
/* Authentication request: command, tag, our protocol version, cookie bytes. */
1716 t
= pa_tagstruct_new(NULL
, 0);
1717 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1718 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1719 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1721 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
/* Variant that attaches our uid/gid as credentials to the auth packet. */
1727 if (pa_iochannel_creds_supported(io
))
1728 pa_iochannel_creds_enable(io
);
1730 ucred
.uid
= getuid();
1731 ucred
.gid
= getgid();
1733 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
/* Variant that sends the auth packet without credentials. */
1736 pa_pstream_send_tagstruct(u
->pstream
, t
);
1739 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1741 pa_log_debug("Connection established, authenticating ...");
1746 /* Called from main context */
/* set_volume hook of the local sink (assigned in pa__init).
 * Sends PA_COMMAND_SET_SINK_INPUT_VOLUME for the remote stream index
 * u->device_index, carrying the sink's virtual_volume. No reply handler is
 * registered in the visible code.
 *
 * NOTE(review): the declarations of `u`, `t` and `tag` are not visible in
 * this listing; `u` is presumably taken from sink->userdata -- confirm. */
1747 static void sink_set_volume(pa_sink
*sink
) {
1756 t
= pa_tagstruct_new(NULL
, 0);
1757 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1758 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1759 pa_tagstruct_putu32(t
, u
->device_index
);
1760 pa_tagstruct_put_cvolume(t
, &sink
->virtual_volume
);
1761 pa_pstream_send_tagstruct(u
->pstream
, t
);
1764 /* Called from main context */
/* set_mute hook of the local sink (assigned in pa__init).
 * Sends PA_COMMAND_SET_SINK_INPUT_MUTE for the remote stream index
 * u->device_index, carrying the sink's mute flag normalised to 0/1.
 * No reply handler is registered in the visible code.
 *
 * NOTE(review): the declarations of `u`, `t` and `tag`, and the statement
 * guarded by the version check, are not visible in this listing; presumably
 * the function returns early for servers older than protocol 11 -- confirm. */
1765 static void sink_set_mute(pa_sink
*sink
) {
1774 if (u
->version
< 11)
1777 t
= pa_tagstruct_new(NULL
, 0);
1778 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1779 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1780 pa_tagstruct_putu32(t
, u
->device_index
);
1781 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1782 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* Module entry point.
 *
 * Steps visible in this listing:
 *   1. Parse the module arguments and allocate a zeroed `struct userdata`,
 *      stored in m->userdata.
 *   2. Initialise bookkeeping fields, the rtpoll object and the thread
 *      message queues.
 *   3. Obtain the authentication cookie, the mandatory "server" argument and
 *      the sample spec / channel map (defaults come from the core).
 *   4. Start an asynchronous connection to the server; on_connection runs
 *      when it completes.
 *   5. Create the local sink / source, named "tunnel.<server>" unless a name
 *      was given, attach the module's hooks, message queue and rtpoll to it.
 *   6. Mark the buffer metrics as unset ((uint32_t) -1), start the I/O
 *      thread and publish the sink / source with pa_sink_put()/pa_source_put().
 * The module arguments are freed on both of the two exits visible at the end.
 *
 * NOTE(review): several local declarations, the error-path jumps, the return
 * statements, and what are presumably sink/source conditional-compilation
 * directives are not visible in this listing, so the sink and source
 * variants appear back to back -- confirm against the full file. */
1787 int pa__init(pa_module
*m
) {
1788 pa_modargs
*ma
= NULL
;
1789 struct userdata
*u
= NULL
;
1794 pa_sink_new_data data
;
1796 pa_source_new_data data
;
1801 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1802 pa_log("Failed to parse module arguments");
/* Per-module state; pa_xnew0 hands back zero-initialised memory. */
1806 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1810 u
->pdispatch
= NULL
;
1812 u
->server_name
= NULL
;
/* Remote device name is optional; stays NULL when the argument is absent. */
1814 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1816 u
->requested_bytes
= 0;
1818 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
/* Smoother construction; its arguments are on lines not shown in this listing. */
1821 u
->smoother
= pa_smoother_new(
/* Remote stream index and channel are unknown until the stream is created. */
1830 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1831 u
->time_event
= NULL
;
1832 u
->ignore_latency_before
= 0;
1833 u
->transport_usec
= u
->thread_transport_usec
= 0;
1834 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1835 u
->counter
= u
->counter_delta
= 0;
1837 u
->rtpoll
= pa_rtpoll_new();
1838 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
/* Authentication cookie: file given by "cookie", else the default cookie file. */
1840 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, pa_modargs_get_value(ma
, "cookie", PA_NATIVE_COOKIE_FILE
), PA_NATIVE_COOKIE_LENGTH
)))
/* The "server" argument is mandatory. */
1843 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1844 pa_log("No server specified.");
/* Start from the core defaults, then let the module arguments override them. */
1848 ss
= m
->core
->default_sample_spec
;
1849 map
= m
->core
->default_channel_map
;
1850 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1851 pa_log("Invalid sample format specification");
/* Begin connecting; on_connection is invoked with the result. */
1855 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1856 pa_log("Failed to connect to server '%s'", u
->server_name
);
1860 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
/* Sink variant: local name defaults to "tunnel.<server>". */
1864 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1865 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1867 pa_sink_new_data_init(&data
);
1868 data
.driver
= __FILE__
;
1870 data
.namereg_fail
= TRUE
;
1871 pa_sink_new_data_set_name(&data
, dn
);
1872 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1873 pa_sink_new_data_set_channel_map(&data
, &map
);
/* Description reads "<remote sink> on <server>", or just "<server>" when no remote sink was named. */
1874 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1875 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1877 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
/* User-supplied properties replace the ones set above. */
1879 if (pa_modargs_get_proplist(ma
, "sink_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
1880 pa_log("Invalid properties");
1881 pa_sink_new_data_done(&data
);
1885 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
|PA_SINK_HW_MUTE_CTRL
);
1886 pa_sink_new_data_done(&data
);
1889 pa_log("Failed to create sink.");
/* Attach this module's message handler and state/volume/mute hooks to the sink. */
1893 u
->sink
->parent
.process_msg
= sink_process_msg
;
1894 u
->sink
->userdata
= u
;
1895 u
->sink
->set_state
= sink_set_state
;
1896 u
->sink
->set_volume
= sink_set_volume
;
1897 u
->sink
->set_mute
= sink_set_mute
;
1899 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
1901 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1903 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1904 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
/* Source variant: local name defaults to "tunnel.<server>". */
1908 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1909 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1911 pa_source_new_data_init(&data
);
1912 data
.driver
= __FILE__
;
1914 data
.namereg_fail
= TRUE
;
1915 pa_source_new_data_set_name(&data
, dn
);
1916 pa_source_new_data_set_sample_spec(&data
, &ss
);
1917 pa_source_new_data_set_channel_map(&data
, &map
);
/* Description reads "<remote source> on <server>", or just "<server>" when no remote source was named. */
1918 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
1919 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1921 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
/* User-supplied properties replace the ones set above. */
1923 if (pa_modargs_get_proplist(ma
, "source_properties", data
.proplist
, PA_UPDATE_REPLACE
) < 0) {
1924 pa_log("Invalid properties");
1925 pa_source_new_data_done(&data
);
1929 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
1930 pa_source_new_data_done(&data
);
1933 pa_log("Failed to create source.");
/* Attach this module's message handler and state hook to the source. */
1937 u
->source
->parent
.process_msg
= source_process_msg
;
1938 u
->source
->set_state
= source_set_state
;
1939 u
->source
->userdata
= u
;
1941 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1943 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1944 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
1949 u
->time_event
= NULL
;
/* Buffer metrics start out as (uint32_t) -1; setup_complete_callback and
 * create_stream_callback fill in real values later. */
1951 u
->maxlength
= (uint32_t) -1;
1953 u
->tlength
= u
->minreq
= u
->prebuf
= (uint32_t) -1;
1955 u
->fragsize
= (uint32_t) -1;
/* Start the I/O thread running thread_func. */
1958 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1959 pa_log("Failed to create thread.");
/* Publish the device to the core. */
1964 pa_sink_put(u
->sink
);
1966 pa_source_put(u
->source
);
/* The parsed arguments are released on both exits visible below. */
1969 pa_modargs_free(ma
);
1977 pa_modargs_free(ma
);
1984 void pa__done(pa_module
*m
) {
1989 if (!(u
= m
->userdata
))
1994 pa_sink_unlink(u
->sink
);
1997 pa_source_unlink(u
->source
);
2001 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
2002 pa_thread_free(u
->thread
);
2005 pa_thread_mq_done(&u
->thread_mq
);
2009 pa_sink_unref(u
->sink
);
2012 pa_source_unref(u
->source
);
2016 pa_rtpoll_free(u
->rtpoll
);
2019 pa_pstream_unlink(u
->pstream
);
2020 pa_pstream_unref(u
->pstream
);
2024 pa_pdispatch_unref(u
->pdispatch
);
2027 pa_socket_client_unref(u
->client
);
2030 pa_auth_cookie_unref(u
->auth_cookie
);
2033 pa_smoother_free(u
->smoother
);
2036 u
->core
->mainloop
->time_free(u
->time_event
);
2039 pa_xfree(u
->sink_name
);
2041 pa_xfree(u
->source_name
);
2043 pa_xfree(u
->server_name
);
2045 pa_xfree(u
->device_description
);
2046 pa_xfree(u
->server_fqdn
);
2047 pa_xfree(u
->user_name
);