/*
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as published
  by the Free Software Foundation; either version 2 of the License,
  or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with PulseAudio; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  USA.
*/
30 #include <sys/types.h>
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
59 #include "module-tunnel-sink-symdef.h"
61 #include "module-tunnel-source-symdef.h"
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
68 "sink=<remote sink name> "
70 "format=<sample format> "
71 "channels=<number of channels> "
73 "sink_name=<name for the local sink> "
74 "channel_map=<channel map>");
76 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 "source=<remote source name> "
81 "format=<sample format> "
82 "channels=<number of channels> "
84 "source_name=<name for the local source> "
85 "channel_map=<channel map>");
88 PA_MODULE_AUTHOR("Lennart Poettering");
89 PA_MODULE_VERSION(PACKAGE_VERSION
);
90 PA_MODULE_LOAD_ONCE(FALSE
);
92 static const char* const valid_modargs
[] = {
/* Timeout handed to pa_pdispatch_register_reply() for the requests this
 * module sends to the remote server. */
#define DEFAULT_TIMEOUT 5

/* Seconds added to the current time whenever the latency timer is armed
 * or restarted (ntv.tv_sec += LATENCY_INTERVAL). */
#define LATENCY_INTERVAL 10

/* Added to the latency reported by the server before it is passed to
 * pa_sink_set_latency_range() / pa_source_set_latency_range(). */
#define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
119 SINK_MESSAGE_REMOTE_SUSPEND
,
120 SINK_MESSAGE_UPDATE_LATENCY
,
/* Defaults, in milliseconds, for the playback stream's tlength and minreq;
 * setup_complete_callback() converts them to bytes with pa_usec_to_bytes(). */
#define DEFAULT_TLENGTH_MSEC 150
#define DEFAULT_MINREQ_MSEC 25
130 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
131 SOURCE_MESSAGE_REMOTE_SUSPEND
,
132 SOURCE_MESSAGE_UPDATE_LATENCY
/* Default, in milliseconds, for the record stream's fragsize;
 * setup_complete_callback() converts it to bytes with pa_usec_to_bytes(). */
#define DEFAULT_FRAGSIZE_MSEC 25
140 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
141 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
143 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
144 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
145 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
146 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
149 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
151 [PA_COMMAND_REQUEST
] = command_request
,
152 [PA_COMMAND_STARTED
] = command_started
,
154 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
155 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
156 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
157 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
158 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
159 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
160 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
161 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
162 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
169 pa_thread_mq thread_mq
;
173 pa_socket_client
*client
;
175 pa_pdispatch
*pdispatch
;
181 int32_t requested_bytes
;
187 pa_auth_cookie
*auth_cookie
;
191 uint32_t device_index
;
194 int64_t counter
, counter_delta
;
196 pa_bool_t remote_corked
:1;
197 pa_bool_t remote_suspended
:1;
199 pa_usec_t transport_usec
;
200 pa_bool_t transport_usec_valid
;
202 uint32_t ignore_latency_before
;
204 pa_time_event
*time_event
;
206 pa_smoother
*smoother
;
208 char *device_description
;
/* Forward declaration; the definition follows later in the file. */
static void request_latency(struct userdata *u);
224 /* Called from main context */
225 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
226 struct userdata
*u
= userdata
;
231 pa_assert(u
->pdispatch
== pd
);
233 pa_log_warn("Stream killed");
234 pa_module_unload_request(u
->module
);
237 /* Called from main context */
238 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
239 struct userdata
*u
= userdata
;
244 pa_assert(u
->pdispatch
== pd
);
246 pa_log_info("Server signalled buffer overrun/underrun.");
250 /* Called from main context */
251 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
252 struct userdata
*u
= userdata
;
259 pa_assert(u
->pdispatch
== pd
);
261 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
262 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
263 !pa_tagstruct_eof(t
)) {
264 pa_log("Invalid packet");
265 pa_module_unload_request(u
->module
);
270 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
272 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
278 /* Called from main context */
279 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
280 struct userdata
*u
= userdata
;
285 pa_assert(u
->pdispatch
== pd
);
287 pa_log_debug("Server reports a stream move.");
293 /* Called from main context */
294 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
295 struct userdata
*u
= userdata
;
300 pa_assert(u
->pdispatch
== pd
);
302 pa_log_debug("Server reports playback started.");
308 /* Called from IO thread context */
309 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
313 if (u
->remote_corked
== cork
)
316 u
->remote_corked
= cork
;
317 x
= pa_rtclock_usec();
319 /* Correct by the time this needs to travel to the other side.
320 * This is a valid thread-safe access, because the main thread is
322 if (u
->transport_usec_valid
)
323 x
+= u
->transport_usec
;
325 if (u
->remote_suspended
|| u
->remote_corked
)
326 pa_smoother_pause(u
->smoother
, x
);
328 pa_smoother_resume(u
->smoother
, x
);
331 /* Called from main context */
332 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
339 t
= pa_tagstruct_new(NULL
, 0);
341 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
343 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
345 pa_tagstruct_putu32(t
, u
->ctag
++);
346 pa_tagstruct_putu32(t
, u
->channel
);
347 pa_tagstruct_put_boolean(t
, !!cork
);
348 pa_pstream_send_tagstruct(u
->pstream
, t
);
353 /* Called from IO thread context */
354 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
358 if (u
->remote_suspended
== suspend
)
361 u
->remote_suspended
= suspend
;
363 x
= pa_rtclock_usec();
365 /* Correct by the time this needed to travel from the other side.
366 * This is a valid thread-safe access, because the main thread is
368 if (u
->transport_usec_valid
)
369 x
-= u
->transport_usec
;
371 if (u
->remote_suspended
|| u
->remote_corked
)
372 pa_smoother_pause(u
->smoother
, x
);
374 pa_smoother_resume(u
->smoother
, x
);
379 /* Called from IO thread context */
380 static void send_data(struct userdata
*u
) {
383 while (u
->requested_bytes
> 0) {
384 pa_memchunk memchunk
;
386 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
387 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
388 pa_memblock_unref(memchunk
.memblock
);
390 u
->requested_bytes
-= memchunk
.length
;
392 u
->counter
+= memchunk
.length
;
396 /* This function is called from IO context -- except when it is not. */
397 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
398 struct userdata
*u
= PA_SINK(o
)->userdata
;
402 case PA_SINK_MESSAGE_SET_STATE
: {
405 /* First, change the state, because otherwise pa_sink_render() would fail */
406 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
408 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
410 if (PA_SINK_IS_OPENED(u
->sink
->state
))
417 case PA_SINK_MESSAGE_GET_LATENCY
: {
418 pa_usec_t yl
, yr
, *usec
= data
;
420 yl
= pa_bytes_to_usec(u
->counter
, &u
->sink
->sample_spec
);
421 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
423 *usec
= yl
> yr
? yl
- yr
: 0;
427 case SINK_MESSAGE_REQUEST
:
429 pa_assert(offset
> 0);
430 u
->requested_bytes
+= (size_t) offset
;
432 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
438 case SINK_MESSAGE_REMOTE_SUSPEND
:
440 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
444 case SINK_MESSAGE_UPDATE_LATENCY
: {
447 y
= pa_bytes_to_usec(u
->counter
, &u
->sink
->sample_spec
);
449 if (y
> (pa_usec_t
) offset
|| offset
< 0)
454 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
459 case SINK_MESSAGE_POST
:
461 /* OK, This might be a bit confusing. This message is
462 * delivered to us from the main context -- NOT from the
463 * IO thread context where the rest of the messages are
464 * dispatched. Yeah, ugly, but I am a lazy bastard. */
466 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
468 u
->counter_delta
+= chunk
->length
;
473 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
476 /* Called from main context */
477 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
479 pa_sink_assert_ref(s
);
482 switch ((pa_sink_state_t
) state
) {
484 case PA_SINK_SUSPENDED
:
485 pa_assert(PA_SINK_IS_OPENED(s
->state
));
486 stream_cork(u
, TRUE
);
490 case PA_SINK_RUNNING
:
491 if (s
->state
== PA_SINK_SUSPENDED
)
492 stream_cork(u
, FALSE
);
495 case PA_SINK_UNLINKED
:
505 /* This function is called from IO context -- except when it is not. */
506 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
507 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
511 case PA_SINK_MESSAGE_SET_STATE
: {
514 if ((r
= pa_source_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
515 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
520 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
521 pa_usec_t yr
, yl
, *usec
= data
;
523 yl
= pa_bytes_to_usec(u
->counter
, &PA_SINK(o
)->sample_spec
);
524 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
526 *usec
= yr
> yl
? yr
- yl
: 0;
530 case SOURCE_MESSAGE_POST
:
532 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
533 pa_source_post(u
->source
, chunk
);
535 u
->counter
+= chunk
->length
;
539 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
541 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
544 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
547 y
= pa_bytes_to_usec(u
->counter
, &u
->source
->sample_spec
);
549 if (offset
>= 0 || y
> (pa_usec_t
) -offset
)
554 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
560 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
563 /* Called from main context */
564 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
566 pa_source_assert_ref(s
);
569 switch ((pa_source_state_t
) state
) {
571 case PA_SOURCE_SUSPENDED
:
572 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
573 stream_cork(u
, TRUE
);
577 case PA_SOURCE_RUNNING
:
578 if (s
->state
== PA_SOURCE_SUSPENDED
)
579 stream_cork(u
, FALSE
);
582 case PA_SOURCE_UNLINKED
:
592 static void thread_func(void *userdata
) {
593 struct userdata
*u
= userdata
;
597 pa_log_debug("Thread starting up");
599 pa_thread_mq_install(&u
->thread_mq
);
600 pa_rtpoll_install(u
->rtpoll
);
606 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
607 if (u
->sink
->thread_info
.rewind_requested
)
608 pa_sink_process_rewind(u
->sink
, 0);
611 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
619 /* If this was no regular exit from the loop we have to continue
620 * processing messages until we received PA_MESSAGE_SHUTDOWN */
621 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
622 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
625 pa_log_debug("Thread shutting down");
629 /* Called from main context */
630 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
631 struct userdata
*u
= userdata
;
632 uint32_t bytes
, channel
;
635 pa_assert(command
== PA_COMMAND_REQUEST
);
638 pa_assert(u
->pdispatch
== pd
);
640 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
641 pa_tagstruct_getu32(t
, &bytes
) < 0) {
642 pa_log("Invalid protocol reply");
646 if (channel
!= u
->channel
) {
647 pa_log("Recieved data for invalid channel");
651 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
655 pa_module_unload_request(u
->module
);
660 /* Called from main context */
661 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
662 struct userdata
*u
= userdata
;
663 pa_usec_t sink_usec
, source_usec
, transport_usec
;
665 int64_t write_index
, read_index
;
666 struct timeval local
, remote
, now
;
673 if (command
!= PA_COMMAND_REPLY
) {
674 if (command
== PA_COMMAND_ERROR
)
675 pa_log("Failed to get latency.");
677 pa_log("Protocol error.");
681 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
682 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
683 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
684 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
685 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
686 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
687 pa_tagstruct_gets64(t
, &read_index
) < 0) {
688 pa_log("Invalid reply.");
693 if (u
->version
>= 13) {
694 uint64_t underrun_for
= 0, playing_for
= 0;
696 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
697 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
698 pa_log("Invalid reply.");
704 if (!pa_tagstruct_eof(t
)) {
705 pa_log("Invalid reply.");
709 if (tag
< u
->ignore_latency_before
) {
714 pa_gettimeofday(&now
);
716 /* Calculate transport usec */
717 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
718 /* local and remote seem to have synchronized clocks */
720 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
722 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
725 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
726 u
->transport_usec_valid
= TRUE
;
728 /* First, take the device's delay */
730 delay
= (int64_t) sink_usec
;
731 ss
= &u
->sink
->sample_spec
;
733 delay
= (int64_t) source_usec
;
734 ss
= &u
->source
->sample_spec
;
737 /* Add the length of our server-side buffer */
738 if (write_index
>= read_index
)
739 delay
+= (int64_t) pa_bytes_to_usec(write_index
-read_index
, ss
);
741 delay
-= (int64_t) pa_bytes_to_usec(read_index
-write_index
, ss
);
743 /* Our measurements are already out of date, hence correct by the
744 * transport latency */
746 delay
-= (int64_t) transport_usec
;
748 delay
+= (int64_t) transport_usec
;
751 /* Now correct by what we have read/written since we requested the update */
753 delay
+= (int64_t) pa_bytes_to_usec(u
->counter_delta
, ss
);
755 delay
-= (int64_t) pa_bytes_to_usec(u
->counter_delta
, ss
);
759 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
761 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
768 pa_module_unload_request(u
->module
);
771 /* Called from main context */
772 static void request_latency(struct userdata
*u
) {
778 t
= pa_tagstruct_new(NULL
, 0);
780 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
782 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
784 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
785 pa_tagstruct_putu32(t
, u
->channel
);
787 pa_gettimeofday(&now
);
788 pa_tagstruct_put_timeval(t
, &now
);
790 pa_pstream_send_tagstruct(u
->pstream
, t
);
791 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
793 u
->ignore_latency_before
= tag
;
794 u
->counter_delta
= 0;
797 /* Called from main context */
798 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*tv
, void *userdata
) {
799 struct userdata
*u
= userdata
;
808 pa_gettimeofday(&ntv
);
809 ntv
.tv_sec
+= LATENCY_INTERVAL
;
810 m
->time_restart(e
, &ntv
);
813 /* Called from main context */
814 static void update_description(struct userdata
*u
) {
816 char un
[128], hn
[128];
821 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
824 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
827 pa_sink_set_description(u
->sink
, d
);
828 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
829 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
830 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
832 pa_source_set_description(u
->source
, d
);
833 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
834 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
835 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
840 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
841 pa_get_user_name(un
, sizeof(un
)),
842 pa_get_host_name(hn
, sizeof(hn
)));
844 t
= pa_tagstruct_new(NULL
, 0);
846 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
848 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
850 pa_tagstruct_putu32(t
, u
->ctag
++);
851 pa_tagstruct_putu32(t
, u
->channel
);
852 pa_tagstruct_puts(t
, d
);
853 pa_pstream_send_tagstruct(u
->pstream
, t
);
858 /* Called from main context */
859 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
860 struct userdata
*u
= userdata
;
862 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
868 if (command
!= PA_COMMAND_REPLY
) {
869 if (command
== PA_COMMAND_ERROR
)
870 pa_log("Failed to get info.");
872 pa_log("Protocol error.");
876 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
877 pa_tagstruct_gets(t
, &server_version
) < 0 ||
878 pa_tagstruct_gets(t
, &user_name
) < 0 ||
879 pa_tagstruct_gets(t
, &host_name
) < 0 ||
880 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
881 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
882 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
883 pa_tagstruct_getu32(t
, &cookie
) < 0) {
885 pa_log("Parse failure");
889 if (!pa_tagstruct_eof(t
)) {
890 pa_log("Packet too long");
894 pa_xfree(u
->server_fqdn
);
895 u
->server_fqdn
= pa_xstrdup(host_name
);
897 pa_xfree(u
->user_name
);
898 u
->user_name
= pa_xstrdup(user_name
);
900 update_description(u
);
905 pa_module_unload_request(u
->module
);
910 /* Called from main context */
911 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
912 struct userdata
*u
= userdata
;
913 uint32_t idx
, owner_module
, monitor_source
, flags
;
914 const char *name
, *description
, *monitor_source_name
, *driver
;
925 pl
= pa_proplist_new();
927 if (command
!= PA_COMMAND_REPLY
) {
928 if (command
== PA_COMMAND_ERROR
)
929 pa_log("Failed to get info.");
931 pa_log("Protocol error.");
935 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
936 pa_tagstruct_gets(t
, &name
) < 0 ||
937 pa_tagstruct_gets(t
, &description
) < 0 ||
938 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
939 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
940 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
941 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
942 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
943 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
944 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
945 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
946 pa_tagstruct_gets(t
, &driver
) < 0 ||
947 pa_tagstruct_getu32(t
, &flags
) < 0) {
949 pa_log("Parse failure");
953 if (u
->version
>= 13) {
954 pa_usec_t configured_latency
;
956 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
957 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
959 pa_log("Parse failure");
964 if (!pa_tagstruct_eof(t
)) {
965 pa_log("Packet too long");
969 pa_proplist_free(pl
);
971 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
974 pa_xfree(u
->device_description
);
975 u
->device_description
= pa_xstrdup(description
);
977 update_description(u
);
982 pa_module_unload_request(u
->module
);
983 pa_proplist_free(pl
);
986 /* Called from main context */
987 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
988 struct userdata
*u
= userdata
;
989 uint32_t idx
, owner_module
, client
, sink
;
990 pa_usec_t buffer_usec
, sink_usec
;
991 const char *name
, *driver
, *resample_method
;
993 pa_sample_spec sample_spec
;
994 pa_channel_map channel_map
;
1001 pl
= pa_proplist_new();
1003 if (command
!= PA_COMMAND_REPLY
) {
1004 if (command
== PA_COMMAND_ERROR
)
1005 pa_log("Failed to get info.");
1007 pa_log("Protocol error.");
1011 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1012 pa_tagstruct_gets(t
, &name
) < 0 ||
1013 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1014 pa_tagstruct_getu32(t
, &client
) < 0 ||
1015 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1016 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1017 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1018 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1019 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1020 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1021 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1022 pa_tagstruct_gets(t
, &driver
) < 0) {
1024 pa_log("Parse failure");
1028 if (u
->version
>= 11) {
1029 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1031 pa_log("Parse failure");
1036 if (u
->version
>= 13) {
1037 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1039 pa_log("Parse failure");
1044 if (!pa_tagstruct_eof(t
)) {
1045 pa_log("Packet too long");
1049 pa_proplist_free(pl
);
1051 if (idx
!= u
->device_index
)
1056 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1057 pa_cvolume_equal(&volume
, &u
->sink
->volume
))
1060 memcpy(&u
->sink
->volume
, &volume
, sizeof(pa_cvolume
));
1062 if (u
->version
>= 11)
1063 u
->sink
->muted
= !!mute
;
1065 pa_subscription_post(u
->sink
->core
, PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
, u
->sink
->index
);
1069 pa_module_unload_request(u
->module
);
1070 pa_proplist_free(pl
);
1075 /* Called from main context */
1076 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1077 struct userdata
*u
= userdata
;
1078 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1079 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1084 pa_usec_t latency
, configured_latency
;
1090 pl
= pa_proplist_new();
1092 if (command
!= PA_COMMAND_REPLY
) {
1093 if (command
== PA_COMMAND_ERROR
)
1094 pa_log("Failed to get info.");
1096 pa_log("Protocol error.");
1100 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1101 pa_tagstruct_gets(t
, &name
) < 0 ||
1102 pa_tagstruct_gets(t
, &description
) < 0 ||
1103 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1104 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1105 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1106 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1107 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1108 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1109 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1110 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1111 pa_tagstruct_gets(t
, &driver
) < 0 ||
1112 pa_tagstruct_getu32(t
, &flags
) < 0) {
1114 pa_log("Parse failure");
1118 if (u
->version
>= 13) {
1119 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1120 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1122 pa_log("Parse failure");
1127 if (!pa_tagstruct_eof(t
)) {
1128 pa_log("Packet too long");
1132 pa_proplist_free(pl
);
1134 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1137 pa_xfree(u
->device_description
);
1138 u
->device_description
= pa_xstrdup(description
);
1140 update_description(u
);
1145 pa_module_unload_request(u
->module
);
1146 pa_proplist_free(pl
);
1151 /* Called from main context */
1152 static void request_info(struct userdata
*u
) {
1157 t
= pa_tagstruct_new(NULL
, 0);
1158 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1159 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1160 pa_pstream_send_tagstruct(u
->pstream
, t
);
1161 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1164 t
= pa_tagstruct_new(NULL
, 0);
1165 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1166 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1167 pa_tagstruct_putu32(t
, u
->device_index
);
1168 pa_pstream_send_tagstruct(u
->pstream
, t
);
1169 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1172 t
= pa_tagstruct_new(NULL
, 0);
1173 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1174 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1175 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1176 pa_tagstruct_puts(t
, u
->sink_name
);
1177 pa_pstream_send_tagstruct(u
->pstream
, t
);
1178 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1181 if (u
->source_name
) {
1182 t
= pa_tagstruct_new(NULL
, 0);
1183 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1184 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1185 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1186 pa_tagstruct_puts(t
, u
->source_name
);
1187 pa_pstream_send_tagstruct(u
->pstream
, t
);
1188 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1193 /* Called from main context */
1194 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1195 struct userdata
*u
= userdata
;
1196 pa_subscription_event_type_t e
;
1202 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1204 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1205 pa_tagstruct_getu32(t
, &idx
) < 0) {
1206 pa_log("Invalid protocol reply");
1207 pa_module_unload_request(u
->module
);
1211 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1213 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1214 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1216 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1224 /* Called from main context */
1225 static void start_subscribe(struct userdata
*u
) {
1230 t
= pa_tagstruct_new(NULL
, 0);
1231 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1232 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1233 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1235 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1237 PA_SUBSCRIPTION_MASK_SOURCE
1241 pa_pstream_send_tagstruct(u
->pstream
, t
);
1244 /* Called from main context */
1245 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1246 struct userdata
*u
= userdata
;
1254 pa_assert(u
->pdispatch
== pd
);
1256 if (command
!= PA_COMMAND_REPLY
) {
1257 if (command
== PA_COMMAND_ERROR
)
1258 pa_log("Failed to create stream.");
1260 pa_log("Protocol error.");
1264 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1265 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1267 || pa_tagstruct_getu32(t
, &bytes
) < 0
1272 if (u
->version
>= 9) {
1274 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1275 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1276 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1277 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1280 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1281 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
1286 if (u
->version
>= 12) {
1289 uint32_t device_index
;
1291 pa_bool_t suspended
;
1293 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1294 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1295 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1296 pa_tagstruct_gets(t
, &dn
) < 0 ||
1297 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
1301 pa_xfree(u
->sink_name
);
1302 u
->sink_name
= pa_xstrdup(dn
);
1304 pa_xfree(u
->source_name
);
1305 u
->source_name
= pa_xstrdup(dn
);
1309 if (u
->version
>= 13) {
1312 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1316 pa_sink_set_latency_range(u
->sink
, usec
+ MIN_NETWORK_LATENCY_USEC
, 0);
1318 pa_source_set_latency_range(u
->source
, usec
+ MIN_NETWORK_LATENCY_USEC
, 0);
1322 if (!pa_tagstruct_eof(t
))
1328 pa_assert(!u
->time_event
);
1329 pa_gettimeofday(&ntv
);
1330 ntv
.tv_sec
+= LATENCY_INTERVAL
;
1331 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
1335 pa_log_debug("Stream created.");
1338 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
1344 pa_log("Invalid reply. (Create stream)");
1347 pa_module_unload_request(u
->module
);
1351 /* Called from main context */
/*
 * Reply handler for the PA_COMMAND_AUTH request; on_connection() registers
 * it with the dispatcher under the tag of that request.
 *
 * Visible steps:
 *   - accept only a PA_COMMAND_REPLY that carries a single u32, which is
 *     stored in u->version; versions below 8 are reported as incompatible;
 *   - for versions >= 13, clear the top bit of u->version;
 *   - send PA_COMMAND_SET_CLIENT_NAME (no reply handler is registered);
 *   - build a PA_COMMAND_CREATE_PLAYBACK_STREAM or
 *     PA_COMMAND_CREATE_RECORD_STREAM request and register
 *     create_stream_callback() for its reply.
 *
 * pd must be u->pdispatch (asserted). The tag parameter is overwritten with
 * locally generated request tags taken from u->ctag.
 *
 * NOTE(review): the sink (playback) and source (record) statements appear
 * back to back in this view, and some names used below (pl, volume) have no
 * visible declaration. Presumably the lines that select between the two
 * variants and declare those names exist elsewhere in the file -- confirm
 * before editing.
 */
1352 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1353 struct userdata
*u
= userdata
;
1354 pa_tagstruct
*reply
;
1355 char name
[256], un
[128], hn
[128];
1362 pa_assert(u
->pdispatch
== pd
);
/* A usable reply is PA_COMMAND_REPLY with exactly one u32 (the remote
 * protocol version) and nothing after it. */
1364 if (command
!= PA_COMMAND_REPLY
||
1365 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1366 !pa_tagstruct_eof(t
)) {
1368 if (command
== PA_COMMAND_ERROR
)
1369 pa_log("Failed to authenticate");
1371 pa_log("Protocol error.");
1376 /* Minimum supported protocol version */
1377 if (u
->version
< 8) {
1378 pa_log("Incompatible protocol version");
1382 /* Starting with protocol version 13 the MSB of the version tag
1383 reflects if shm is enabled for this connection or not. We don't
1384 support SHM here at all, so we just ignore this. */
1386 if (u
->version
>= 13)
1387 u
->version
&= 0x7FFFFFFFU
;
1389 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
/* NOTE(review): the format has three %s conversions but only two arguments
 * are visible in each pa_snprintf() call below; the argument for the first
 * %s is presumably on a line not shown in this view. */
1392 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1394 pa_get_user_name(un
, sizeof(un
)),
1395 pa_get_host_name(hn
, sizeof(hn
)));
1397 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1399 pa_get_user_name(un
, sizeof(un
)),
1400 pa_get_host_name(hn
, sizeof(hn
)));
1403 reply
= pa_tagstruct_new(NULL
, 0);
1404 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
/* From here on tag holds the tag of an outgoing request, not the tag this
 * callback was invoked with. */
1405 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
/* Peers at protocol version 13 or later are sent a property list. The plain
 * "PulseAudio" string further down is presumably the alternative for older
 * peers (the else line is not visible here). */
1407 if (u
->version
>= 13) {
1409 pl
= pa_proplist_new();
1410 pa_init_proplist(pl
);
1411 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1412 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1413 pa_tagstruct_put_proplist(reply
, pl
);
1414 pa_proplist_free(pl
);
1416 pa_tagstruct_puts(reply
, "PulseAudio");
1418 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1419 /* We ignore the server's reply here */
1421 reply
= pa_tagstruct_new(NULL
, 0);
1423 if (u
->version
< 13)
1424 /* Only for older PA versions we need to fill in the maxlength */
1425 u
->maxlength
= 4*1024*1024;
/* Playback buffer metrics: millisecond defaults converted to bytes using
 * the local sink's sample spec. */
1428 u
->tlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1429 u
->minreq
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1430 u
->prebuf
= u
->tlength
;
/* Record fragment size: millisecond default converted to bytes using the
 * local source's sample spec. */
1432 u
->fragsize
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
/* Playback variant of the create-stream request. */
1436 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1437 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1439 if (u
->version
< 13)
1440 pa_tagstruct_puts(reply
, name
);
1442 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1443 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1444 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1445 pa_tagstruct_puts(reply
, u
->sink_name
);
1446 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* Boolean is true when the local sink is not in an opened state. */
1447 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1448 pa_tagstruct_putu32(reply
, u
->tlength
);
1449 pa_tagstruct_putu32(reply
, u
->prebuf
);
1450 pa_tagstruct_putu32(reply
, u
->minreq
);
1451 pa_tagstruct_putu32(reply
, 0);
1452 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1453 pa_tagstruct_put_cvolume(reply
, &volume
);
/* Record variant of the create-stream request. */
1455 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1456 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1458 if (u
->version
< 13)
1459 pa_tagstruct_puts(reply
, name
);
1461 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1462 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1463 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1464 pa_tagstruct_puts(reply
, u
->source_name
);
1465 pa_tagstruct_putu32(reply
, u
->maxlength
);
/* Boolean is true when the local source is not in an opened state. */
1466 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1467 pa_tagstruct_putu32(reply
, u
->fragsize
);
/* Flags that are only sent to peers at protocol version 12 or later. */
1470 if (u
->version
>= 12) {
1471 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1472 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1473 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1474 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1475 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1476 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1477 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
/* Fields that are only sent to peers at protocol version 13 or later. */
1480 if (u
->version
>= 13) {
1483 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1484 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
1486 pl
= pa_proplist_new();
1487 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1488 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1489 pa_tagstruct_put_proplist(reply
, pl
);
1490 pa_proplist_free(pl
);
1493 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1497 pa_pstream_send_tagstruct(u
->pstream
, reply
);
/* tag still holds the tag assigned to the create-stream request above, so
 * create_stream_callback() is bound to that request's reply. */
1498 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1500 pa_log_debug("Connection authenticated, creating stream ...");
/* NOTE(review): presumably the shared failure exit for the checks above;
 * the return/label lines that separate it from the success path are not
 * visible in this view. */
1505 pa_module_unload_request(u
->module
);
1508 /* Called from main context */
/*
 * Die handler for the connection's pstream; on_connection() installs it with
 * pa_pstream_set_die_callback(), passing the module's struct userdata as
 * userdata. Logs a warning and asks for this module to be unloaded.
 * The p argument is not used by the visible statements.
 */
1509 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1510 struct userdata
*u
= userdata
;
1515 pa_log_warn("Stream died.");
1516 pa_module_unload_request(u
->module
);
1519 /* Called from main context */
/*
 * Packet handler for the connection's pstream; on_connection() installs it
 * with pa_pstream_set_recieve_packet_callback(). Each packet is handed to
 * u->pdispatch together with the received credentials and the userdata.
 * If the dispatcher returns a negative value, the packet is logged as
 * invalid and the module is asked to unload.
 */
1520 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1521 struct userdata
*u
= userdata
;
1527 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1528 pa_log("Invalid packet");
1529 pa_module_unload_request(u
->module
);
1535 /* Called from main context */
/*
 * Memblock handler for the connection's pstream; on_connection() installs it
 * with pa_pstream_set_recieve_memblock_callback().
 *
 * A block whose channel differs from u->channel is logged and triggers a
 * module unload request. Otherwise the chunk is passed to the source via a
 * SOURCE_MESSAGE_POST message (seek mode packed into the pointer argument,
 * offset as the integer argument) and its length is added to
 * u->counter_delta.
 *
 * NOTE(review): the statement that leaves the function after the
 * bad-channel branch is not visible in this view -- confirm that the post
 * below is skipped in that case.
 */
1536 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1537 struct userdata
*u
= userdata
;
1543 if (channel
!= u
->channel
) {
1544 pa_log("Recieved memory block on bad channel.");
1545 pa_module_unload_request(u
->module
);
1549 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1551 u
->counter_delta
+= chunk
->length
;
1556 /* Called from main context */
/*
 * Socket client callback; pa__init() installs it with
 * pa_socket_client_set_callback(). sc must be u->client (asserted).
 *
 * Visible steps:
 *   - drop the reference to u->client;
 *   - on failure, log the errno text and ask for the module to be unloaded;
 *   - otherwise create u->pstream and u->pdispatch on the core main loop and
 *     install the die/packet/memblock handlers on the pstream;
 *   - send PA_COMMAND_AUTH carrying PA_PROTOCOL_VERSION and the auth cookie;
 *   - register setup_complete_callback() for the reply to that request.
 *
 * NOTE(review): the declarations of t, tag and ucred, and the condition that
 * guards the failure branch, are not visible in this view.
 */
1557 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1558 struct userdata
*u
= userdata
;
1564 pa_assert(u
->client
== sc
);
1566 pa_socket_client_unref(u
->client
);
/* NOTE(review): the check that selects this failure report (presumably a
 * missing io channel) is on a line not shown here. */
1570 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1571 pa_module_unload_request(u
->module
);
1575 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1576 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
1578 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1579 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1581 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
/* Build the authentication request: command, tag, local protocol version,
 * then the cookie bytes. */
1584 t
= pa_tagstruct_new(NULL
, 0);
1585 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1586 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1587 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1589 pa_tagstruct_put_arbitrary(t
, pa_auth_cookie_read(u
->auth_cookie
, PA_NATIVE_COOKIE_LENGTH
), PA_NATIVE_COOKIE_LENGTH
);
/* NOTE(review): two ways of sending t follow, one attaching the process
 * uid/gid as credentials and one without; presumably only one of them is
 * active in a given build -- confirm against the full file. */
1595 if (pa_iochannel_creds_supported(io
))
1596 pa_iochannel_creds_enable(io
);
1598 ucred
.uid
= getuid();
1599 ucred
.gid
= getgid();
1601 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1604 pa_pstream_send_tagstruct(u
->pstream
, t
);
/* tag is the tag of the PA_COMMAND_AUTH request built above. */
1607 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1609 pa_log_debug("Connection established, authenticating ...");
1614 /* Called from main context */
/*
 * Volume hook for the local sink; pa__init() assigns it to
 * u->sink->set_volume. Sends PA_COMMAND_SET_SINK_INPUT_VOLUME for
 * u->device_index with the sink's current volume over u->pstream.
 * No reply handler is registered in the visible statements.
 *
 * NOTE(review): the declarations of u, t and tag and the return statement
 * are not visible in this view; u is presumably taken from sink->userdata,
 * which pa__init() sets.
 */
1615 static int sink_set_volume(pa_sink
*sink
) {
1624 t
= pa_tagstruct_new(NULL
, 0);
1625 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1626 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1627 pa_tagstruct_putu32(t
, u
->device_index
);
1628 pa_tagstruct_put_cvolume(t
, &sink
->volume
);
1629 pa_pstream_send_tagstruct(u
->pstream
, t
);
1634 /* Called from main context */
/*
 * Mute hook for the local sink; pa__init() assigns it to u->sink->set_mute.
 * Sends PA_COMMAND_SET_SINK_INPUT_MUTE for u->device_index with the sink's
 * muted flag normalised to 0/1 over u->pstream. No reply handler is
 * registered in the visible statements.
 *
 * NOTE(review): the body of the u->version < 11 check, the declarations of
 * u, t and tag, and the return statements are not visible in this view;
 * presumably the request is skipped for peers older than protocol
 * version 11 -- confirm.
 */
1635 static int sink_set_mute(pa_sink
*sink
) {
1644 if (u
->version
< 11)
1647 t
= pa_tagstruct_new(NULL
, 0);
1648 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1649 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1650 pa_tagstruct_putu32(t
, u
->device_index
);
1651 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1652 pa_pstream_send_tagstruct(u
->pstream
, t
);
/*
 * Module entry point.
 *
 * Visible steps:
 *   - parse m->argument against valid_modargs;
 *   - allocate the struct userdata, store it in m->userdata and initialise
 *     its fields;
 *   - create the rtpoll and thread message queue;
 *   - obtain the auth cookie (module argument "cookie", default
 *     PA_NATIVE_COOKIE_FILE);
 *   - require the "server" argument and start a socket client towards it,
 *     with on_connection() as its callback;
 *   - create the local sink or source (name from "sink_name"/"source_name",
 *     default "tunnel.<server>") with the parsed sample spec and channel
 *     map, and wire up its callbacks, latency range, message queue and
 *     rtpoll;
 *   - start the I/O thread running thread_func and publish the device with
 *     pa_sink_put()/pa_source_put().
 *
 * NOTE(review): sink and source statements appear back to back (including
 * two declarations named data), and the declarations of dn, ss and map, the
 * failure jumps and the return statements are not visible in this view.
 * Presumably the variants are selected elsewhere in the file and the two
 * trailing pa_modargs_free() calls belong to the success and failure exits
 * respectively -- confirm before editing.
 */
1659 int pa__init(pa_module
*m
) {
1660 pa_modargs
*ma
= NULL
;
1661 struct userdata
*u
= NULL
;
1666 pa_sink_new_data data
;
1668 pa_source_new_data data
;
1673 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1674 pa_log("Failed to parse module arguments");
1678 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1682 u
->pdispatch
= NULL
;
1684 u
->server_name
= NULL
;
/* Name of the remote device to attach to; NULL when the argument is absent. */
1686 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1688 u
->requested_bytes
= 0;
1690 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1693 u
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
, PA_USEC_PER_SEC
*2, TRUE
, 10);
1695 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1696 u
->time_event
= NULL
;
1697 u
->ignore_latency_before
= 0;
1698 u
->transport_usec
= 0;
1699 u
->transport_usec_valid
= FALSE
;
1700 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1701 u
->counter
= u
->counter_delta
= 0;
1703 u
->rtpoll
= pa_rtpoll_new();
1704 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
/* NOTE(review): this reads u->core; its assignment is not visible in this
 * view and is presumably on one of the lines not shown above. */
1706 if (!(u
->auth_cookie
= pa_auth_cookie_get(u
->core
, pa_modargs_get_value(ma
, "cookie", PA_NATIVE_COOKIE_FILE
), PA_NATIVE_COOKIE_LENGTH
)))
/* The "server" argument is mandatory. */
1709 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1710 pa_log("No server specified.");
/* Start from the core's default sample spec and let the module arguments
 * override it. */
1714 ss
= m
->core
->default_sample_spec
;
1715 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1716 pa_log("Invalid sample format specification");
1720 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1721 pa_log("Failed to connect to server '%s'", u
->server_name
);
1725 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
/* Sink variant: local device name defaults to "tunnel.<server>". */
1729 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1730 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1732 pa_sink_new_data_init(&data
);
1733 data
.driver
= __FILE__
;
1735 data
.namereg_fail
= TRUE
;
1736 pa_sink_new_data_set_name(&data
, dn
);
1737 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1738 pa_sink_new_data_set_channel_map(&data
, &map
);
/* Description reads "<remote sink> on <server>", or just "<server>" when no
 * remote sink name was given. */
1739 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1740 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1742 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
1744 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
);
1745 pa_sink_new_data_done(&data
);
1748 pa_log("Failed to create sink.");
1752 u
->sink
->parent
.process_msg
= sink_process_msg
;
1753 u
->sink
->userdata
= u
;
1754 u
->sink
->set_state
= sink_set_state
;
1755 u
->sink
->set_volume
= sink_set_volume
;
1756 u
->sink
->set_mute
= sink_set_mute
;
1758 u
->sink
->refresh_volume
= u
->sink
->refresh_muted
= FALSE
;
1760 pa_sink_set_latency_range(u
->sink
, MIN_NETWORK_LATENCY_USEC
, 0);
1762 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1763 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
/* Source variant: local device name defaults to "tunnel.<server>". */
1767 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1768 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1770 pa_source_new_data_init(&data
);
1771 data
.driver
= __FILE__
;
1773 data
.namereg_fail
= TRUE
;
1774 pa_source_new_data_set_name(&data
, dn
);
1775 pa_source_new_data_set_sample_spec(&data
, &ss
);
1776 pa_source_new_data_set_channel_map(&data
, &map
);
/* Description reads "<remote source> on <server>", or just "<server>" when
 * no remote source name was given. */
1777 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
1778 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1780 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
1782 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
1783 pa_source_new_data_done(&data
);
1786 pa_log("Failed to create source.");
1790 u
->source
->parent
.process_msg
= source_process_msg
;
1791 u
->source
->set_state
= source_set_state
;
1792 u
->source
->userdata
= u
;
1794 pa_source_set_latency_range(u
->source
, MIN_NETWORK_LATENCY_USEC
, 0);
1796 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1797 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
1802 u
->time_event
= NULL
;
/* Buffer metrics start at zero here; setup_complete_callback() assigns
 * them before the create-stream request is sent. */
1806 u
->tlength
= u
->minreq
= u
->prebuf
= 0;
1811 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
1813 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1814 pa_log("Failed to create thread.");
1819 pa_sink_put(u
->sink
);
1821 pa_source_put(u
->source
);
1824 pa_modargs_free(ma
);
1832 pa_modargs_free(ma
);
1839 void pa__done(pa_module
*m
) {
1844 if (!(u
= m
->userdata
))
1849 pa_sink_unlink(u
->sink
);
1852 pa_source_unlink(u
->source
);
1856 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
1857 pa_thread_free(u
->thread
);
1860 pa_thread_mq_done(&u
->thread_mq
);
1864 pa_sink_unref(u
->sink
);
1867 pa_source_unref(u
->source
);
1871 pa_rtpoll_free(u
->rtpoll
);
1874 pa_pstream_unlink(u
->pstream
);
1875 pa_pstream_unref(u
->pstream
);
1879 pa_pdispatch_unref(u
->pdispatch
);
1882 pa_socket_client_unref(u
->client
);
1885 pa_auth_cookie_unref(u
->auth_cookie
);
1888 pa_smoother_free(u
->smoother
);
1891 u
->core
->mainloop
->time_free(u
->time_event
);
1894 pa_xfree(u
->sink_name
);
1896 pa_xfree(u
->source_name
);
1898 pa_xfree(u
->server_name
);
1900 pa_xfree(u
->device_description
);
1901 pa_xfree(u
->server_fqdn
);
1902 pa_xfree(u
->user_name
);