4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
7 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
9 PulseAudio is free software; you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License as published
11 by the Free Software Foundation; either version 2 of the License,
12 or (at your option) any later version.
14 PulseAudio is distributed in the hope that it will be useful, but
15 WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 General Public License for more details.
19 You should have received a copy of the GNU Lesser General Public License
20 along with PulseAudio; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
32 #include <sys/types.h>
36 #include <pulse/timeval.h>
37 #include <pulse/util.h>
38 #include <pulse/version.h>
39 #include <pulse/xmalloc.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/core-util.h>
43 #include <pulsecore/modargs.h>
44 #include <pulsecore/log.h>
45 #include <pulsecore/core-subscribe.h>
46 #include <pulsecore/sink-input.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream.h>
49 #include <pulsecore/pstream-util.h>
50 #include <pulsecore/authkey.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/authkey-prop.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/thread.h>
56 #include <pulsecore/thread-mq.h>
57 #include <pulsecore/rtclock.h>
58 #include <pulsecore/core-error.h>
59 #include <pulsecore/proplist-util.h>
62 #include "module-tunnel-sink-symdef.h"
64 #include "module-tunnel-source-symdef.h"
68 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
71 "sink=<remote sink name> "
73 "format=<sample format> "
74 "channels=<number of channels> "
76 "sink_name=<name for the local sink> "
77 "channel_map=<channel map>");
79 PA_MODULE_DESCRIPTION("Tunnel module for sources");
82 "source=<remote source name> "
84 "format=<sample format> "
85 "channels=<number of channels> "
87 "source_name=<name for the local source> "
88 "channel_map=<channel map>");
91 PA_MODULE_AUTHOR("Lennart Poettering");
92 PA_MODULE_VERSION(PACKAGE_VERSION
);
93 PA_MODULE_LOAD_ONCE(FALSE
);
95 static const char* const valid_modargs
[] = {
112 #define DEFAULT_TIMEOUT 5
114 #define LATENCY_INTERVAL 10
116 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
121 SINK_MESSAGE_REQUEST
= PA_SINK_MESSAGE_MAX
,
122 SINK_MESSAGE_REMOTE_SUSPEND
,
123 SINK_MESSAGE_UPDATE_LATENCY
,
127 #define DEFAULT_TLENGTH_MSEC 150
128 #define DEFAULT_MINREQ_MSEC 25
133 SOURCE_MESSAGE_POST
= PA_SOURCE_MESSAGE_MAX
,
134 SOURCE_MESSAGE_REMOTE_SUSPEND
,
135 SOURCE_MESSAGE_UPDATE_LATENCY
138 #define DEFAULT_FRAGSIZE_MSEC 25
143 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
144 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
146 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
147 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
148 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
149 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
150 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
);
152 static const pa_pdispatch_cb_t command_table
[PA_COMMAND_MAX
] = {
154 [PA_COMMAND_REQUEST
] = command_request
,
155 [PA_COMMAND_STARTED
] = command_started
,
157 [PA_COMMAND_SUBSCRIBE_EVENT
] = command_subscribe_event
,
158 [PA_COMMAND_OVERFLOW
] = command_overflow_or_underflow
,
159 [PA_COMMAND_UNDERFLOW
] = command_overflow_or_underflow
,
160 [PA_COMMAND_PLAYBACK_STREAM_KILLED
] = command_stream_killed
,
161 [PA_COMMAND_RECORD_STREAM_KILLED
] = command_stream_killed
,
162 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
] = command_suspended
,
163 [PA_COMMAND_RECORD_STREAM_SUSPENDED
] = command_suspended
,
164 [PA_COMMAND_PLAYBACK_STREAM_MOVED
] = command_moved
,
165 [PA_COMMAND_RECORD_STREAM_MOVED
] = command_moved
,
172 pa_thread_mq thread_mq
;
176 pa_socket_client
*client
;
178 pa_pdispatch
*pdispatch
;
184 int32_t requested_bytes
;
190 uint8_t auth_cookie
[PA_NATIVE_COOKIE_LENGTH
];
194 uint32_t device_index
;
197 int64_t counter
, counter_delta
;
199 pa_bool_t remote_corked
:1;
200 pa_bool_t remote_suspended
:1;
202 pa_usec_t transport_usec
;
203 pa_bool_t transport_usec_valid
;
205 uint32_t ignore_latency_before
;
207 pa_time_event
*time_event
;
209 pa_bool_t auth_cookie_in_property
;
211 pa_smoother
*smoother
;
213 char *device_description
;
227 static void request_latency(struct userdata
*u
);
229 /* Called from main context */
230 static void command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
231 struct userdata
*u
= userdata
;
236 pa_assert(u
->pdispatch
== pd
);
238 pa_log_warn("Stream killed");
239 pa_module_unload_request(u
->module
);
242 /* Called from main context */
243 static void command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
244 struct userdata
*u
= userdata
;
249 pa_assert(u
->pdispatch
== pd
);
251 pa_log_info("Server signalled buffer overrun/underrun.");
255 /* Called from main context */
256 static void command_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
257 struct userdata
*u
= userdata
;
264 pa_assert(u
->pdispatch
== pd
);
266 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
267 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
268 !pa_tagstruct_eof(t
)) {
269 pa_log("Invalid packet");
270 pa_module_unload_request(u
->module
);
275 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
277 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_REMOTE_SUSPEND
, PA_UINT32_TO_PTR(!!suspended
), 0, NULL
);
283 /* Called from main context */
284 static void command_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
285 struct userdata
*u
= userdata
;
290 pa_assert(u
->pdispatch
== pd
);
292 pa_log_debug("Server reports a stream move.");
298 /* Called from main context */
299 static void command_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
300 struct userdata
*u
= userdata
;
305 pa_assert(u
->pdispatch
== pd
);
307 pa_log_debug("Server reports playback started.");
313 /* Called from IO thread context */
314 static void stream_cork_within_thread(struct userdata
*u
, pa_bool_t cork
) {
318 if (u
->remote_corked
== cork
)
321 u
->remote_corked
= cork
;
322 x
= pa_rtclock_usec();
324 /* Correct by the time this needs to travel to the other side.
325 * This is a valid thread-safe access, because the main thread is
327 if (u
->transport_usec_valid
)
328 x
+= u
->transport_usec
;
330 if (u
->remote_suspended
|| u
->remote_corked
)
331 pa_smoother_pause(u
->smoother
, x
);
333 pa_smoother_resume(u
->smoother
, x
);
336 /* Called from main context */
337 static void stream_cork(struct userdata
*u
, pa_bool_t cork
) {
344 t
= pa_tagstruct_new(NULL
, 0);
346 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_PLAYBACK_STREAM
);
348 pa_tagstruct_putu32(t
, PA_COMMAND_CORK_RECORD_STREAM
);
350 pa_tagstruct_putu32(t
, u
->ctag
++);
351 pa_tagstruct_putu32(t
, u
->channel
);
352 pa_tagstruct_put_boolean(t
, !!cork
);
353 pa_pstream_send_tagstruct(u
->pstream
, t
);
358 /* Called from IO thread context */
359 static void stream_suspend_within_thread(struct userdata
*u
, pa_bool_t suspend
) {
363 if (u
->remote_suspended
== suspend
)
366 u
->remote_suspended
= suspend
;
368 x
= pa_rtclock_usec();
370 /* Correct by the time this needed to travel from the other side.
371 * This is a valid thread-safe access, because the main thread is
373 if (u
->transport_usec_valid
)
374 x
-= u
->transport_usec
;
376 if (u
->remote_suspended
|| u
->remote_corked
)
377 pa_smoother_pause(u
->smoother
, x
);
379 pa_smoother_resume(u
->smoother
, x
);
384 /* Called from IO thread context */
385 static void send_data(struct userdata
*u
) {
388 while (u
->requested_bytes
> 0) {
389 pa_memchunk memchunk
;
391 pa_sink_render(u
->sink
, u
->requested_bytes
, &memchunk
);
392 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_POST
, NULL
, 0, &memchunk
, NULL
);
393 pa_memblock_unref(memchunk
.memblock
);
395 u
->requested_bytes
-= memchunk
.length
;
397 u
->counter
+= memchunk
.length
;
401 /* This function is called from IO context -- except when it is not. */
402 static int sink_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
403 struct userdata
*u
= PA_SINK(o
)->userdata
;
407 case PA_SINK_MESSAGE_SET_STATE
: {
410 /* First, change the state, because otherwide pa_sink_render() would fail */
411 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0) {
413 stream_cork_within_thread(u
, u
->sink
->state
== PA_SINK_SUSPENDED
);
415 if (PA_SINK_IS_OPENED(u
->sink
->state
))
422 case PA_SINK_MESSAGE_GET_LATENCY
: {
423 pa_usec_t yl
, yr
, *usec
= data
;
425 yl
= pa_bytes_to_usec(u
->counter
, &u
->sink
->sample_spec
);
426 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
428 *usec
= yl
> yr
? yl
- yr
: 0;
432 case SINK_MESSAGE_REQUEST
:
434 pa_assert(offset
> 0);
435 u
->requested_bytes
+= (size_t) offset
;
437 if (PA_SINK_IS_OPENED(u
->sink
->thread_info
.state
))
443 case SINK_MESSAGE_REMOTE_SUSPEND
:
445 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
449 case SINK_MESSAGE_UPDATE_LATENCY
: {
452 y
= pa_bytes_to_usec(u
->counter
, &u
->sink
->sample_spec
);
454 if (y
> (pa_usec_t
) offset
|| offset
< 0)
459 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
464 case SINK_MESSAGE_POST
:
466 /* OK, This might be a bit confusing. This message is
467 * delivered to us from the main context -- NOT from the
468 * IO thread context where the rest of the messages are
469 * dispatched. Yeah, ugly, but I am a lazy bastard. */
471 pa_pstream_send_memblock(u
->pstream
, u
->channel
, 0, PA_SEEK_RELATIVE
, chunk
);
473 u
->counter_delta
+= chunk
->length
;
478 return pa_sink_process_msg(o
, code
, data
, offset
, chunk
);
481 /* Called from main context */
482 static int sink_set_state(pa_sink
*s
, pa_sink_state_t state
) {
484 pa_sink_assert_ref(s
);
487 switch ((pa_sink_state_t
) state
) {
489 case PA_SINK_SUSPENDED
:
490 pa_assert(PA_SINK_IS_OPENED(s
->state
));
491 stream_cork(u
, TRUE
);
495 case PA_SINK_RUNNING
:
496 if (s
->state
== PA_SINK_SUSPENDED
)
497 stream_cork(u
, FALSE
);
500 case PA_SINK_UNLINKED
:
510 /* This function is called from IO context -- except when it is not. */
511 static int source_process_msg(pa_msgobject
*o
, int code
, void *data
, int64_t offset
, pa_memchunk
*chunk
) {
512 struct userdata
*u
= PA_SOURCE(o
)->userdata
;
516 case PA_SINK_MESSAGE_SET_STATE
: {
519 if ((r
= pa_sink_process_msg(o
, code
, data
, offset
, chunk
)) >= 0)
520 stream_cork_within_thread(u
, u
->source
->state
== PA_SOURCE_SUSPENDED
);
525 case PA_SOURCE_MESSAGE_GET_LATENCY
: {
526 pa_usec_t yr
, yl
, *usec
= data
;
528 yl
= pa_bytes_to_usec(u
->counter
, &PA_SINK(o
)->sample_spec
);
529 yr
= pa_smoother_get(u
->smoother
, pa_rtclock_usec());
531 *usec
= yr
> yl
? yr
- yl
: 0;
535 case SOURCE_MESSAGE_POST
:
537 if (PA_SOURCE_IS_OPENED(u
->source
->thread_info
.state
))
538 pa_source_post(u
->source
, chunk
);
540 u
->counter
+= chunk
->length
;
544 case SOURCE_MESSAGE_REMOTE_SUSPEND
:
546 stream_suspend_within_thread(u
, !!PA_PTR_TO_UINT(data
));
549 case SOURCE_MESSAGE_UPDATE_LATENCY
: {
552 y
= pa_bytes_to_usec(u
->counter
, &u
->source
->sample_spec
);
554 if (offset
>= 0 || y
> (pa_usec_t
) -offset
)
559 pa_smoother_put(u
->smoother
, pa_rtclock_usec(), y
);
565 return pa_source_process_msg(o
, code
, data
, offset
, chunk
);
568 /* Called from main context */
569 static int source_set_state(pa_source
*s
, pa_source_state_t state
) {
571 pa_source_assert_ref(s
);
574 switch ((pa_source_state_t
) state
) {
576 case PA_SOURCE_SUSPENDED
:
577 pa_assert(PA_SOURCE_IS_OPENED(s
->state
));
578 stream_cork(u
, TRUE
);
582 case PA_SOURCE_RUNNING
:
583 if (s
->state
== PA_SOURCE_SUSPENDED
)
584 stream_cork(u
, FALSE
);
587 case PA_SOURCE_UNLINKED
:
597 static void thread_func(void *userdata
) {
598 struct userdata
*u
= userdata
;
602 pa_log_debug("Thread starting up");
604 pa_thread_mq_install(&u
->thread_mq
);
605 pa_rtpoll_install(u
->rtpoll
);
610 if ((ret
= pa_rtpoll_run(u
->rtpoll
, TRUE
)) < 0)
618 /* If this was no regular exit from the loop we have to continue
619 * processing messages until we received PA_MESSAGE_SHUTDOWN */
620 pa_asyncmsgq_post(u
->thread_mq
.outq
, PA_MSGOBJECT(u
->core
), PA_CORE_MESSAGE_UNLOAD_MODULE
, u
->module
, 0, NULL
, NULL
);
621 pa_asyncmsgq_wait_for(u
->thread_mq
.inq
, PA_MESSAGE_SHUTDOWN
);
624 pa_log_debug("Thread shutting down");
628 /* Called from main context */
629 static void command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
630 struct userdata
*u
= userdata
;
631 uint32_t bytes
, channel
;
634 pa_assert(command
== PA_COMMAND_REQUEST
);
637 pa_assert(u
->pdispatch
== pd
);
639 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
640 pa_tagstruct_getu32(t
, &bytes
) < 0) {
641 pa_log("Invalid protocol reply");
645 if (channel
!= u
->channel
) {
646 pa_log("Recieved data for invalid channel");
650 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
654 pa_module_unload_request(u
->module
);
659 /* Called from main context */
660 static void stream_get_latency_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
661 struct userdata
*u
= userdata
;
662 pa_usec_t sink_usec
, source_usec
, transport_usec
;
664 int64_t write_index
, read_index
;
665 struct timeval local
, remote
, now
;
672 if (command
!= PA_COMMAND_REPLY
) {
673 if (command
== PA_COMMAND_ERROR
)
674 pa_log("Failed to get latency.");
676 pa_log("Protocol error.");
680 if (pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
681 pa_tagstruct_get_usec(t
, &source_usec
) < 0 ||
682 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
683 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
684 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
685 pa_tagstruct_gets64(t
, &write_index
) < 0 ||
686 pa_tagstruct_gets64(t
, &read_index
) < 0) {
687 pa_log("Invalid reply.");
692 if (u
->version
>= 13) {
693 uint64_t underrun_for
= 0, playing_for
= 0;
695 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
696 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
697 pa_log("Invalid reply.");
703 if (!pa_tagstruct_eof(t
)) {
704 pa_log("Invalid reply.");
708 if (tag
< u
->ignore_latency_before
) {
713 pa_gettimeofday(&now
);
715 /* Calculate transport usec */
716 if (pa_timeval_cmp(&local
, &remote
) < 0 && pa_timeval_cmp(&remote
, &now
)) {
717 /* local and remote seem to have synchronized clocks */
719 u
->transport_usec
= pa_timeval_diff(&remote
, &local
);
721 u
->transport_usec
= pa_timeval_diff(&now
, &remote
);
724 u
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
725 u
->transport_usec_valid
= TRUE
;
727 /* First, take the device's delay */
729 delay
= (int64_t) sink_usec
;
730 ss
= &u
->sink
->sample_spec
;
732 delay
= (int64_t) source_usec
;
733 ss
= &u
->source
->sample_spec
;
736 /* Add the length of our server-side buffer */
737 if (write_index
>= read_index
)
738 delay
+= (int64_t) pa_bytes_to_usec(write_index
-read_index
, ss
);
740 delay
-= (int64_t) pa_bytes_to_usec(read_index
-write_index
, ss
);
742 /* Our measurements are already out of date, hence correct by the *
743 * transport latency */
745 delay
-= (int64_t) transport_usec
;
747 delay
+= (int64_t) transport_usec
;
750 /* Now correct by what we have have read/written since we requested the update */
752 delay
+= (int64_t) pa_bytes_to_usec(u
->counter_delta
, ss
);
754 delay
-= (int64_t) pa_bytes_to_usec(u
->counter_delta
, ss
);
758 pa_asyncmsgq_send(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
760 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_UPDATE_LATENCY
, 0, delay
, NULL
);
767 pa_module_unload_request(u
->module
);
770 /* Called from main context */
771 static void request_latency(struct userdata
*u
) {
777 t
= pa_tagstruct_new(NULL
, 0);
779 pa_tagstruct_putu32(t
, PA_COMMAND_GET_PLAYBACK_LATENCY
);
781 pa_tagstruct_putu32(t
, PA_COMMAND_GET_RECORD_LATENCY
);
783 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
784 pa_tagstruct_putu32(t
, u
->channel
);
786 pa_gettimeofday(&now
);
787 pa_tagstruct_put_timeval(t
, &now
);
789 pa_pstream_send_tagstruct(u
->pstream
, t
);
790 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_latency_callback
, u
, NULL
);
792 u
->ignore_latency_before
= tag
;
793 u
->counter_delta
= 0;
796 /* Called from main context */
797 static void timeout_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*tv
, void *userdata
) {
798 struct userdata
*u
= userdata
;
807 pa_gettimeofday(&ntv
);
808 ntv
.tv_sec
+= LATENCY_INTERVAL
;
809 m
->time_restart(e
, &ntv
);
812 /* Called from main context */
813 static void update_description(struct userdata
*u
) {
815 char un
[128], hn
[128];
820 if (!u
->server_fqdn
|| !u
->user_name
|| !u
->device_description
)
823 d
= pa_sprintf_malloc("%s on %s@%s", u
->device_description
, u
->user_name
, u
->server_fqdn
);
826 pa_sink_set_description(u
->sink
, d
);
827 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.user", u
->user_name
);
828 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
829 pa_proplist_sets(u
->sink
->proplist
, "tunnel.remote.description", u
->device_description
);
831 pa_source_set_description(u
->source
, d
);
832 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.user", u
->user_name
);
833 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.fqdn", u
->server_fqdn
);
834 pa_proplist_sets(u
->source
->proplist
, "tunnel.remote.description", u
->device_description
);
839 d
= pa_sprintf_malloc("%s for %s@%s", u
->device_description
,
840 pa_get_user_name(un
, sizeof(un
)),
841 pa_get_host_name(hn
, sizeof(hn
)));
843 t
= pa_tagstruct_new(NULL
, 0);
845 pa_tagstruct_putu32(t
, PA_COMMAND_SET_PLAYBACK_STREAM_NAME
);
847 pa_tagstruct_putu32(t
, PA_COMMAND_SET_RECORD_STREAM_NAME
);
849 pa_tagstruct_putu32(t
, u
->ctag
++);
850 pa_tagstruct_putu32(t
, u
->channel
);
851 pa_tagstruct_puts(t
, d
);
852 pa_pstream_send_tagstruct(u
->pstream
, t
);
857 /* Called from main context */
858 static void server_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
859 struct userdata
*u
= userdata
;
861 const char *server_name
, *server_version
, *user_name
, *host_name
, *default_sink_name
, *default_source_name
;
867 if (command
!= PA_COMMAND_REPLY
) {
868 if (command
== PA_COMMAND_ERROR
)
869 pa_log("Failed to get info.");
871 pa_log("Protocol error.");
875 if (pa_tagstruct_gets(t
, &server_name
) < 0 ||
876 pa_tagstruct_gets(t
, &server_version
) < 0 ||
877 pa_tagstruct_gets(t
, &user_name
) < 0 ||
878 pa_tagstruct_gets(t
, &host_name
) < 0 ||
879 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
880 pa_tagstruct_gets(t
, &default_sink_name
) < 0 ||
881 pa_tagstruct_gets(t
, &default_source_name
) < 0 ||
882 pa_tagstruct_getu32(t
, &cookie
) < 0) {
884 pa_log("Parse failure");
888 if (!pa_tagstruct_eof(t
)) {
889 pa_log("Packet too long");
893 pa_xfree(u
->server_fqdn
);
894 u
->server_fqdn
= pa_xstrdup(host_name
);
896 pa_xfree(u
->user_name
);
897 u
->user_name
= pa_xstrdup(user_name
);
899 update_description(u
);
904 pa_module_unload_request(u
->module
);
909 /* Called from main context */
910 static void sink_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
911 struct userdata
*u
= userdata
;
912 uint32_t idx
, owner_module
, monitor_source
, flags
;
913 const char *name
, *description
, *monitor_source_name
, *driver
;
924 pl
= pa_proplist_new();
926 if (command
!= PA_COMMAND_REPLY
) {
927 if (command
== PA_COMMAND_ERROR
)
928 pa_log("Failed to get info.");
930 pa_log("Protocol error.");
934 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
935 pa_tagstruct_gets(t
, &name
) < 0 ||
936 pa_tagstruct_gets(t
, &description
) < 0 ||
937 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
938 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
939 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
940 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
941 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
942 pa_tagstruct_getu32(t
, &monitor_source
) < 0 ||
943 pa_tagstruct_gets(t
, &monitor_source_name
) < 0 ||
944 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
945 pa_tagstruct_gets(t
, &driver
) < 0 ||
946 pa_tagstruct_getu32(t
, &flags
) < 0) {
948 pa_log("Parse failure");
952 if (u
->version
>= 13) {
953 pa_usec_t configured_latency
;
955 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
956 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
958 pa_log("Parse failure");
963 if (!pa_tagstruct_eof(t
)) {
964 pa_log("Packet too long");
968 pa_proplist_free(pl
);
970 if (!u
->sink_name
|| strcmp(name
, u
->sink_name
))
973 pa_xfree(u
->device_description
);
974 u
->device_description
= pa_xstrdup(description
);
976 update_description(u
);
981 pa_module_unload_request(u
->module
);
982 pa_proplist_free(pl
);
985 /* Called from main context */
986 static void sink_input_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
987 struct userdata
*u
= userdata
;
988 uint32_t idx
, owner_module
, client
, sink
;
989 pa_usec_t buffer_usec
, sink_usec
;
990 const char *name
, *driver
, *resample_method
;
992 pa_sample_spec sample_spec
;
993 pa_channel_map channel_map
;
1000 pl
= pa_proplist_new();
1002 if (command
!= PA_COMMAND_REPLY
) {
1003 if (command
== PA_COMMAND_ERROR
)
1004 pa_log("Failed to get info.");
1006 pa_log("Protocol error.");
1010 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1011 pa_tagstruct_gets(t
, &name
) < 0 ||
1012 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1013 pa_tagstruct_getu32(t
, &client
) < 0 ||
1014 pa_tagstruct_getu32(t
, &sink
) < 0 ||
1015 pa_tagstruct_get_sample_spec(t
, &sample_spec
) < 0 ||
1016 pa_tagstruct_get_channel_map(t
, &channel_map
) < 0 ||
1017 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1018 pa_tagstruct_get_usec(t
, &buffer_usec
) < 0 ||
1019 pa_tagstruct_get_usec(t
, &sink_usec
) < 0 ||
1020 pa_tagstruct_gets(t
, &resample_method
) < 0 ||
1021 pa_tagstruct_gets(t
, &driver
) < 0) {
1023 pa_log("Parse failure");
1027 if (u
->version
>= 11) {
1028 if (pa_tagstruct_get_boolean(t
, &mute
) < 0) {
1030 pa_log("Parse failure");
1035 if (u
->version
>= 13) {
1036 if (pa_tagstruct_get_proplist(t
, pl
) < 0) {
1038 pa_log("Parse failure");
1043 if (!pa_tagstruct_eof(t
)) {
1044 pa_log("Packet too long");
1048 pa_proplist_free(pl
);
1050 if (idx
!= u
->device_index
)
1055 if ((u
->version
< 11 || !!mute
== !!u
->sink
->muted
) &&
1056 pa_cvolume_equal(&volume
, &u
->sink
->volume
))
1059 memcpy(&u
->sink
->volume
, &volume
, sizeof(pa_cvolume
));
1061 if (u
->version
>= 11)
1062 u
->sink
->muted
= !!mute
;
1064 pa_subscription_post(u
->sink
->core
, PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
, u
->sink
->index
);
1068 pa_module_unload_request(u
->module
);
1069 pa_proplist_free(pl
);
1074 /* Called from main context */
1075 static void source_info_cb(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1076 struct userdata
*u
= userdata
;
1077 uint32_t idx
, owner_module
, monitor_of_sink
, flags
;
1078 const char *name
, *description
, *monitor_of_sink_name
, *driver
;
1083 pa_usec_t latency
, configured_latency
;
1089 pl
= pa_proplist_new();
1091 if (command
!= PA_COMMAND_REPLY
) {
1092 if (command
== PA_COMMAND_ERROR
)
1093 pa_log("Failed to get info.");
1095 pa_log("Protocol error.");
1099 if (pa_tagstruct_getu32(t
, &idx
) < 0 ||
1100 pa_tagstruct_gets(t
, &name
) < 0 ||
1101 pa_tagstruct_gets(t
, &description
) < 0 ||
1102 pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1103 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1104 pa_tagstruct_getu32(t
, &owner_module
) < 0 ||
1105 pa_tagstruct_get_cvolume(t
, &volume
) < 0 ||
1106 pa_tagstruct_get_boolean(t
, &mute
) < 0 ||
1107 pa_tagstruct_getu32(t
, &monitor_of_sink
) < 0 ||
1108 pa_tagstruct_gets(t
, &monitor_of_sink_name
) < 0 ||
1109 pa_tagstruct_get_usec(t
, &latency
) < 0 ||
1110 pa_tagstruct_gets(t
, &driver
) < 0 ||
1111 pa_tagstruct_getu32(t
, &flags
) < 0) {
1113 pa_log("Parse failure");
1117 if (u
->version
>= 13) {
1118 if (pa_tagstruct_get_proplist(t
, pl
) < 0 ||
1119 pa_tagstruct_get_usec(t
, &configured_latency
) < 0) {
1121 pa_log("Parse failure");
1126 if (!pa_tagstruct_eof(t
)) {
1127 pa_log("Packet too long");
1131 pa_proplist_free(pl
);
1133 if (!u
->source_name
|| strcmp(name
, u
->source_name
))
1136 pa_xfree(u
->device_description
);
1137 u
->device_description
= pa_xstrdup(description
);
1139 update_description(u
);
1144 pa_module_unload_request(u
->module
);
1145 pa_proplist_free(pl
);
1150 /* Called from main context */
1151 static void request_info(struct userdata
*u
) {
1156 t
= pa_tagstruct_new(NULL
, 0);
1157 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SERVER_INFO
);
1158 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1159 pa_pstream_send_tagstruct(u
->pstream
, t
);
1160 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, server_info_cb
, u
, NULL
);
1163 t
= pa_tagstruct_new(NULL
, 0);
1164 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INPUT_INFO
);
1165 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1166 pa_tagstruct_putu32(t
, u
->device_index
);
1167 pa_pstream_send_tagstruct(u
->pstream
, t
);
1168 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_input_info_cb
, u
, NULL
);
1171 t
= pa_tagstruct_new(NULL
, 0);
1172 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SINK_INFO
);
1173 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1174 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1175 pa_tagstruct_puts(t
, u
->sink_name
);
1176 pa_pstream_send_tagstruct(u
->pstream
, t
);
1177 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, sink_info_cb
, u
, NULL
);
1180 if (u
->source_name
) {
1181 t
= pa_tagstruct_new(NULL
, 0);
1182 pa_tagstruct_putu32(t
, PA_COMMAND_GET_SOURCE_INFO
);
1183 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1184 pa_tagstruct_putu32(t
, PA_INVALID_INDEX
);
1185 pa_tagstruct_puts(t
, u
->source_name
);
1186 pa_pstream_send_tagstruct(u
->pstream
, t
);
1187 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, source_info_cb
, u
, NULL
);
1192 /* Called from main context */
1193 static void command_subscribe_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1194 struct userdata
*u
= userdata
;
1195 pa_subscription_event_type_t e
;
1201 pa_assert(command
== PA_COMMAND_SUBSCRIBE_EVENT
);
1203 if (pa_tagstruct_getu32(t
, &e
) < 0 ||
1204 pa_tagstruct_getu32(t
, &idx
) < 0) {
1205 pa_log("Invalid protocol reply");
1206 pa_module_unload_request(u
->module
);
1210 if (e
!= (PA_SUBSCRIPTION_EVENT_SERVER
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1212 e
!= (PA_SUBSCRIPTION_EVENT_SINK_INPUT
|PA_SUBSCRIPTION_EVENT_CHANGE
) &&
1213 e
!= (PA_SUBSCRIPTION_EVENT_SINK
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1215 e
!= (PA_SUBSCRIPTION_EVENT_SOURCE
|PA_SUBSCRIPTION_EVENT_CHANGE
)
1223 /* Called from main context */
1224 static void start_subscribe(struct userdata
*u
) {
1229 t
= pa_tagstruct_new(NULL
, 0);
1230 pa_tagstruct_putu32(t
, PA_COMMAND_SUBSCRIBE
);
1231 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1232 pa_tagstruct_putu32(t
, PA_SUBSCRIPTION_MASK_SERVER
|
1234 PA_SUBSCRIPTION_MASK_SINK_INPUT
|PA_SUBSCRIPTION_MASK_SINK
1236 PA_SUBSCRIPTION_MASK_SOURCE
1240 pa_pstream_send_tagstruct(u
->pstream
, t
);
1243 /* Called from main context */
1244 static void create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1245 struct userdata
*u
= userdata
;
1253 pa_assert(u
->pdispatch
== pd
);
1255 if (command
!= PA_COMMAND_REPLY
) {
1256 if (command
== PA_COMMAND_ERROR
)
1257 pa_log("Failed to create stream.");
1259 pa_log("Protocol error.");
1263 if (pa_tagstruct_getu32(t
, &u
->channel
) < 0 ||
1264 pa_tagstruct_getu32(t
, &u
->device_index
) < 0
1266 || pa_tagstruct_getu32(t
, &bytes
) < 0
1271 if (u
->version
>= 9) {
1273 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1274 pa_tagstruct_getu32(t
, &u
->tlength
) < 0 ||
1275 pa_tagstruct_getu32(t
, &u
->prebuf
) < 0 ||
1276 pa_tagstruct_getu32(t
, &u
->minreq
) < 0)
1279 if (pa_tagstruct_getu32(t
, &u
->maxlength
) < 0 ||
1280 pa_tagstruct_getu32(t
, &u
->fragsize
) < 0)
1285 if (u
->version
>= 12) {
1288 uint32_t device_index
;
1290 pa_bool_t suspended
;
1292 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1293 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1294 pa_tagstruct_getu32(t
, &device_index
) < 0 ||
1295 pa_tagstruct_gets(t
, &dn
) < 0 ||
1296 pa_tagstruct_get_boolean(t
, &suspended
) < 0)
1300 pa_xfree(u
->sink_name
);
1301 u
->sink_name
= pa_xstrdup(dn
);
1303 pa_xfree(u
->source_name
);
1304 u
->source_name
= pa_xstrdup(dn
);
1308 if (u
->version
>= 13) {
1311 if (pa_tagstruct_get_usec(t
, &usec
) < 0)
1315 pa_sink_set_latency_range(u
->sink
, usec
+ MIN_NETWORK_LATENCY_USEC
, 0);
1317 pa_source_set_latency_range(u
->source
, usec
+ MIN_NETWORK_LATENCY_USEC
, 0);
1321 if (!pa_tagstruct_eof(t
))
1327 pa_assert(!u
->time_event
);
1328 pa_gettimeofday(&ntv
);
1329 ntv
.tv_sec
+= LATENCY_INTERVAL
;
1330 u
->time_event
= u
->core
->mainloop
->time_new(u
->core
->mainloop
, &ntv
, timeout_callback
, u
);
1334 pa_log_debug("Stream created.");
1337 pa_asyncmsgq_post(u
->sink
->asyncmsgq
, PA_MSGOBJECT(u
->sink
), SINK_MESSAGE_REQUEST
, NULL
, bytes
, NULL
, NULL
);
1343 pa_log("Invalid reply. (Create stream)");
1346 pa_module_unload_request(u
->module
);
1350 /* Called from main context */
1351 static void setup_complete_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1352 struct userdata
*u
= userdata
;
1353 pa_tagstruct
*reply
;
1354 char name
[256], un
[128], hn
[128];
1361 pa_assert(u
->pdispatch
== pd
);
1363 if (command
!= PA_COMMAND_REPLY
||
1364 pa_tagstruct_getu32(t
, &u
->version
) < 0 ||
1365 !pa_tagstruct_eof(t
)) {
1367 if (command
== PA_COMMAND_ERROR
)
1368 pa_log("Failed to authenticate");
1370 pa_log("Protocol error.");
1375 /* Minimum supported protocol version */
1376 if (u
->version
< 8) {
1377 pa_log("Incompatible protocol version");
1381 /* Starting with protocol version 13 the MSB of the version tag
1382 reflects if shm is enabled for this connection or not. We don't
1383 support SHM here at all, so we just ignore this. */
1385 if (u
->version
>= 13)
1386 u
->version
&= 0x7FFFFFFFU
;
1388 pa_log_debug("Protocol version: remote %u, local %u", u
->version
, PA_PROTOCOL_VERSION
);
1391 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1393 pa_get_user_name(un
, sizeof(un
)),
1394 pa_get_host_name(hn
, sizeof(hn
)));
1396 pa_snprintf(name
, sizeof(name
), "%s for %s@%s",
1398 pa_get_user_name(un
, sizeof(un
)),
1399 pa_get_host_name(hn
, sizeof(hn
)));
1402 reply
= pa_tagstruct_new(NULL
, 0);
1403 pa_tagstruct_putu32(reply
, PA_COMMAND_SET_CLIENT_NAME
);
1404 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1406 if (u
->version
>= 13) {
1408 pl
= pa_proplist_new();
1409 pa_init_proplist(pl
);
1410 pa_proplist_sets(pl
, PA_PROP_APPLICATION_ID
, "org.PulseAudio.PulseAudio");
1411 pa_proplist_sets(pl
, PA_PROP_APPLICATION_VERSION
, PACKAGE_VERSION
);
1412 pa_tagstruct_put_proplist(reply
, pl
);
1413 pa_proplist_free(pl
);
1415 pa_tagstruct_puts(reply
, "PulseAudio");
1417 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1418 /* We ignore the server's reply here */
1420 reply
= pa_tagstruct_new(NULL
, 0);
1422 if (u
->version
< 13)
1423 /* Only for older PA versions we need to fill in the maxlength */
1424 u
->maxlength
= 4*1024*1024;
1427 u
->tlength
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_TLENGTH_MSEC
, &u
->sink
->sample_spec
);
1428 u
->minreq
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_MINREQ_MSEC
, &u
->sink
->sample_spec
);
1429 u
->prebuf
= u
->tlength
;
1431 u
->fragsize
= pa_usec_to_bytes(PA_USEC_PER_MSEC
* DEFAULT_FRAGSIZE_MSEC
, &u
->source
->sample_spec
);
1435 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_PLAYBACK_STREAM
);
1436 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1438 if (u
->version
< 13)
1439 pa_tagstruct_puts(reply
, name
);
1441 pa_tagstruct_put_sample_spec(reply
, &u
->sink
->sample_spec
);
1442 pa_tagstruct_put_channel_map(reply
, &u
->sink
->channel_map
);
1443 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1444 pa_tagstruct_puts(reply
, u
->sink_name
);
1445 pa_tagstruct_putu32(reply
, u
->maxlength
);
1446 pa_tagstruct_put_boolean(reply
, !PA_SINK_IS_OPENED(pa_sink_get_state(u
->sink
)));
1447 pa_tagstruct_putu32(reply
, u
->tlength
);
1448 pa_tagstruct_putu32(reply
, u
->prebuf
);
1449 pa_tagstruct_putu32(reply
, u
->minreq
);
1450 pa_tagstruct_putu32(reply
, 0);
1451 pa_cvolume_reset(&volume
, u
->sink
->sample_spec
.channels
);
1452 pa_tagstruct_put_cvolume(reply
, &volume
);
1454 pa_tagstruct_putu32(reply
, PA_COMMAND_CREATE_RECORD_STREAM
);
1455 pa_tagstruct_putu32(reply
, tag
= u
->ctag
++);
1457 if (u
->version
< 13)
1458 pa_tagstruct_puts(reply
, name
);
1460 pa_tagstruct_put_sample_spec(reply
, &u
->source
->sample_spec
);
1461 pa_tagstruct_put_channel_map(reply
, &u
->source
->channel_map
);
1462 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
);
1463 pa_tagstruct_puts(reply
, u
->source_name
);
1464 pa_tagstruct_putu32(reply
, u
->maxlength
);
1465 pa_tagstruct_put_boolean(reply
, !PA_SOURCE_IS_OPENED(pa_source_get_state(u
->source
)));
1466 pa_tagstruct_putu32(reply
, u
->fragsize
);
1469 if (u
->version
>= 12) {
1470 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remap */
1471 pa_tagstruct_put_boolean(reply
, FALSE
); /* no_remix */
1472 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_format */
1473 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_rate */
1474 pa_tagstruct_put_boolean(reply
, FALSE
); /* fix_channels */
1475 pa_tagstruct_put_boolean(reply
, TRUE
); /* no_move */
1476 pa_tagstruct_put_boolean(reply
, FALSE
); /* variable_rate */
1479 if (u
->version
>= 13) {
1482 pa_tagstruct_put_boolean(reply
, FALSE
); /* start muted/peak detect*/
1483 pa_tagstruct_put_boolean(reply
, TRUE
); /* adjust_latency */
1485 pl
= pa_proplist_new();
1486 pa_proplist_sets(pl
, PA_PROP_MEDIA_NAME
, name
);
1487 pa_proplist_sets(pl
, PA_PROP_MEDIA_ROLE
, "abstract");
1488 pa_tagstruct_put_proplist(reply
, pl
);
1489 pa_proplist_free(pl
);
1492 pa_tagstruct_putu32(reply
, PA_INVALID_INDEX
); /* direct on input */
1496 pa_pstream_send_tagstruct(u
->pstream
, reply
);
1497 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, create_stream_callback
, u
, NULL
);
1499 pa_log_debug("Connection authenticated, creating stream ...");
1504 pa_module_unload_request(u
->module
);
1507 /* Called from main context */
1508 static void pstream_die_callback(pa_pstream
*p
, void *userdata
) {
1509 struct userdata
*u
= userdata
;
1514 pa_log_warn("Stream died.");
1515 pa_module_unload_request(u
->module
);
1518 /* Called from main context */
1519 static void pstream_packet_callback(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
, void *userdata
) {
1520 struct userdata
*u
= userdata
;
1526 if (pa_pdispatch_run(u
->pdispatch
, packet
, creds
, u
) < 0) {
1527 pa_log("Invalid packet");
1528 pa_module_unload_request(u
->module
);
1534 /* Called from main context */
1535 static void pstream_memblock_callback(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek
, const pa_memchunk
*chunk
, void *userdata
) {
1536 struct userdata
*u
= userdata
;
1542 if (channel
!= u
->channel
) {
1543 pa_log("Recieved memory block on bad channel.");
1544 pa_module_unload_request(u
->module
);
1548 pa_asyncmsgq_send(u
->source
->asyncmsgq
, PA_MSGOBJECT(u
->source
), SOURCE_MESSAGE_POST
, PA_UINT_TO_PTR(seek
), offset
, chunk
);
1550 u
->counter_delta
+= chunk
->length
;
1555 /* Called from main context */
1556 static void on_connection(pa_socket_client
*sc
, pa_iochannel
*io
, void *userdata
) {
1557 struct userdata
*u
= userdata
;
1563 pa_assert(u
->client
== sc
);
1565 pa_socket_client_unref(u
->client
);
1569 pa_log("Connection failed: %s", pa_cstrerror(errno
));
1570 pa_module_unload_request(u
->module
);
1574 u
->pstream
= pa_pstream_new(u
->core
->mainloop
, io
, u
->core
->mempool
);
1575 u
->pdispatch
= pa_pdispatch_new(u
->core
->mainloop
, command_table
, PA_COMMAND_MAX
);
1577 pa_pstream_set_die_callback(u
->pstream
, pstream_die_callback
, u
);
1578 pa_pstream_set_recieve_packet_callback(u
->pstream
, pstream_packet_callback
, u
);
1580 pa_pstream_set_recieve_memblock_callback(u
->pstream
, pstream_memblock_callback
, u
);
1583 t
= pa_tagstruct_new(NULL
, 0);
1584 pa_tagstruct_putu32(t
, PA_COMMAND_AUTH
);
1585 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1586 pa_tagstruct_putu32(t
, PA_PROTOCOL_VERSION
);
1587 pa_tagstruct_put_arbitrary(t
, u
->auth_cookie
, sizeof(u
->auth_cookie
));
1593 if (pa_iochannel_creds_supported(io
))
1594 pa_iochannel_creds_enable(io
);
1596 ucred
.uid
= getuid();
1597 ucred
.gid
= getgid();
1599 pa_pstream_send_tagstruct_with_creds(u
->pstream
, t
, &ucred
);
1602 pa_pstream_send_tagstruct(u
->pstream
, t
);
1605 pa_pdispatch_register_reply(u
->pdispatch
, tag
, DEFAULT_TIMEOUT
, setup_complete_callback
, u
, NULL
);
1607 pa_log_debug("Connection established, authenticating ...");
1612 /* Called from main context */
1613 static int sink_set_volume(pa_sink
*sink
) {
1622 t
= pa_tagstruct_new(NULL
, 0);
1623 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_VOLUME
);
1624 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1625 pa_tagstruct_putu32(t
, u
->device_index
);
1626 pa_tagstruct_put_cvolume(t
, &sink
->volume
);
1627 pa_pstream_send_tagstruct(u
->pstream
, t
);
1632 /* Called from main context */
1633 static int sink_set_mute(pa_sink
*sink
) {
1642 if (u
->version
< 11)
1645 t
= pa_tagstruct_new(NULL
, 0);
1646 pa_tagstruct_putu32(t
, PA_COMMAND_SET_SINK_INPUT_MUTE
);
1647 pa_tagstruct_putu32(t
, tag
= u
->ctag
++);
1648 pa_tagstruct_putu32(t
, u
->device_index
);
1649 pa_tagstruct_put_boolean(t
, !!sink
->muted
);
1650 pa_pstream_send_tagstruct(u
->pstream
, t
);
1657 /* Called from main context */
1658 static int load_key(struct userdata
*u
, const char*fn
) {
1661 u
->auth_cookie_in_property
= FALSE
;
1663 if (!fn
&& pa_authkey_prop_get(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0) {
1664 pa_log_debug("Using already loaded auth cookie.");
1665 pa_authkey_prop_ref(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
1666 u
->auth_cookie_in_property
= 1;
1671 fn
= PA_NATIVE_COOKIE_FILE
;
1673 if (pa_authkey_load_auto(fn
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) < 0)
1676 pa_log_debug("Loading cookie from disk.");
1678 if (pa_authkey_prop_put(u
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
, u
->auth_cookie
, sizeof(u
->auth_cookie
)) >= 0)
1679 u
->auth_cookie_in_property
= TRUE
;
1684 int pa__init(pa_module
*m
) {
1685 pa_modargs
*ma
= NULL
;
1686 struct userdata
*u
= NULL
;
1691 pa_sink_new_data data
;
1693 pa_source_new_data data
;
1698 if (!(ma
= pa_modargs_new(m
->argument
, valid_modargs
))) {
1699 pa_log("Failed to parse module arguments");
1703 m
->userdata
= u
= pa_xnew0(struct userdata
, 1);
1707 u
->pdispatch
= NULL
;
1709 u
->server_name
= NULL
;
1711 u
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));;
1713 u
->requested_bytes
= 0;
1715 u
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));;
1718 u
->smoother
= pa_smoother_new(PA_USEC_PER_SEC
, PA_USEC_PER_SEC
*2, TRUE
, 10);
1720 u
->device_index
= u
->channel
= PA_INVALID_INDEX
;
1721 u
->auth_cookie_in_property
= FALSE
;
1722 u
->time_event
= NULL
;
1723 u
->ignore_latency_before
= 0;
1724 u
->transport_usec
= 0;
1725 u
->transport_usec_valid
= FALSE
;
1726 u
->remote_suspended
= u
->remote_corked
= FALSE
;
1727 u
->counter
= u
->counter_delta
= 0;
1729 u
->rtpoll
= pa_rtpoll_new();
1730 pa_thread_mq_init(&u
->thread_mq
, m
->core
->mainloop
, u
->rtpoll
);
1732 if (load_key(u
, pa_modargs_get_value(ma
, "cookie", NULL
)) < 0)
1735 if (!(u
->server_name
= pa_xstrdup(pa_modargs_get_value(ma
, "server", NULL
)))) {
1736 pa_log("No server specified.");
1740 ss
= m
->core
->default_sample_spec
;
1741 if (pa_modargs_get_sample_spec_and_channel_map(ma
, &ss
, &map
, PA_CHANNEL_MAP_DEFAULT
) < 0) {
1742 pa_log("Invalid sample format specification");
1746 if (!(u
->client
= pa_socket_client_new_string(m
->core
->mainloop
, u
->server_name
, PA_NATIVE_DEFAULT_PORT
))) {
1747 pa_log("Failed to connect to server '%s'", u
->server_name
);
1751 pa_socket_client_set_callback(u
->client
, on_connection
, u
);
1755 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "sink_name", NULL
))))
1756 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1758 pa_sink_new_data_init(&data
);
1759 data
.driver
= __FILE__
;
1761 data
.namereg_fail
= TRUE
;
1762 pa_sink_new_data_set_name(&data
, dn
);
1763 pa_sink_new_data_set_sample_spec(&data
, &ss
);
1764 pa_sink_new_data_set_channel_map(&data
, &map
);
1765 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->sink_name
), u
->sink_name
? " on " : "", u
->server_name
);
1766 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1768 pa_proplist_sets(data
.proplist
, "tunnel.remote.sink", u
->sink_name
);
1770 u
->sink
= pa_sink_new(m
->core
, &data
, PA_SINK_NETWORK
|PA_SINK_LATENCY
|PA_SINK_HW_VOLUME_CTRL
);
1771 pa_sink_new_data_done(&data
);
1774 pa_log("Failed to create sink.");
1778 u
->sink
->parent
.process_msg
= sink_process_msg
;
1779 u
->sink
->userdata
= u
;
1780 u
->sink
->set_state
= sink_set_state
;
1781 u
->sink
->set_volume
= sink_set_volume
;
1782 u
->sink
->set_mute
= sink_set_mute
;
1784 u
->sink
->refresh_volume
= u
->sink
->refresh_mute
= FALSE
;
1786 pa_sink_set_latency_range(u
->sink
, MIN_NETWORK_LATENCY_USEC
, 0);
1788 pa_sink_set_asyncmsgq(u
->sink
, u
->thread_mq
.inq
);
1789 pa_sink_set_rtpoll(u
->sink
, u
->rtpoll
);
1793 if (!(dn
= pa_xstrdup(pa_modargs_get_value(ma
, "source_name", NULL
))))
1794 dn
= pa_sprintf_malloc("tunnel.%s", u
->server_name
);
1796 pa_source_new_data_init(&data
);
1797 data
.driver
= __FILE__
;
1799 data
.namereg_fail
= TRUE
;
1800 pa_source_new_data_set_name(&data
, dn
);
1801 pa_source_new_data_set_sample_spec(&data
, &ss
);
1802 pa_source_new_data_set_channel_map(&data
, &map
);
1803 pa_proplist_setf(data
.proplist
, PA_PROP_DEVICE_DESCRIPTION
, "%s%s%s", pa_strempty(u
->source_name
), u
->source_name
? " on " : "", u
->server_name
);
1804 pa_proplist_sets(data
.proplist
, "tunnel.remote.server", u
->server_name
);
1806 pa_proplist_sets(data
.proplist
, "tunnel.remote.source", u
->source_name
);
1808 u
->source
= pa_source_new(m
->core
, &data
, PA_SOURCE_NETWORK
|PA_SOURCE_LATENCY
);
1809 pa_source_new_data_done(&data
);
1812 pa_log("Failed to create source.");
1816 u
->source
->parent
.process_msg
= source_process_msg
;
1817 u
->source
->userdata
= u
;
1818 u
->source
->set_state
= source_set_state
;
1820 pa_source_set_latency_range(u
->source
, MIN_NETWORK_LATENCY_USEC
, 0);
1822 pa_source_set_asyncmsgq(u
->source
, u
->thread_mq
.inq
);
1823 pa_source_set_rtpoll(u
->source
, u
->rtpoll
);
1828 u
->time_event
= NULL
;
1832 u
->tlength
= u
->minreq
= u
->prebuf
= 0;
1837 pa_smoother_set_time_offset(u
->smoother
, pa_rtclock_usec());
1839 if (!(u
->thread
= pa_thread_new(thread_func
, u
))) {
1840 pa_log("Failed to create thread.");
1845 pa_sink_put(u
->sink
);
1847 pa_source_put(u
->source
);
1850 pa_modargs_free(ma
);
1858 pa_modargs_free(ma
);
1865 void pa__done(pa_module
*m
) {
1870 if (!(u
= m
->userdata
))
1875 pa_sink_unlink(u
->sink
);
1878 pa_source_unlink(u
->source
);
1882 pa_asyncmsgq_send(u
->thread_mq
.inq
, NULL
, PA_MESSAGE_SHUTDOWN
, NULL
, 0, NULL
);
1883 pa_thread_free(u
->thread
);
1886 pa_thread_mq_done(&u
->thread_mq
);
1890 pa_sink_unref(u
->sink
);
1893 pa_source_unref(u
->source
);
1897 pa_rtpoll_free(u
->rtpoll
);
1900 pa_pstream_unlink(u
->pstream
);
1901 pa_pstream_unref(u
->pstream
);
1905 pa_pdispatch_unref(u
->pdispatch
);
1908 pa_socket_client_unref(u
->client
);
1910 if (u
->auth_cookie_in_property
)
1911 pa_authkey_prop_unref(m
->core
, PA_NATIVE_COOKIE_PROPERTY_NAME
);
1914 pa_smoother_free(u
->smoother
);
1917 u
->core
->mainloop
->time_free(u
->time_event
);
1920 pa_xfree(u
->sink_name
);
1922 pa_xfree(u
->source_name
);
1924 pa_xfree(u
->server_name
);
1926 pa_xfree(u
->device_description
);
1927 pa_xfree(u
->server_fqdn
);
1928 pa_xfree(u
->user_name
);