2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
47 /* #define STREAM_DEBUG */
49 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
50 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
52 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
53 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
54 #define SMOOTHER_MIN_HISTORY (4)
56 pa_stream
*pa_stream_new(pa_context
*c
, const char *name
, const pa_sample_spec
*ss
, const pa_channel_map
*map
) {
57 return pa_stream_new_with_proplist(c
, name
, ss
, map
, NULL
);
60 static void reset_callbacks(pa_stream
*s
) {
61 s
->read_callback
= NULL
;
62 s
->read_userdata
= NULL
;
63 s
->write_callback
= NULL
;
64 s
->write_userdata
= NULL
;
65 s
->state_callback
= NULL
;
66 s
->state_userdata
= NULL
;
67 s
->overflow_callback
= NULL
;
68 s
->overflow_userdata
= NULL
;
69 s
->underflow_callback
= NULL
;
70 s
->underflow_userdata
= NULL
;
71 s
->latency_update_callback
= NULL
;
72 s
->latency_update_userdata
= NULL
;
73 s
->moved_callback
= NULL
;
74 s
->moved_userdata
= NULL
;
75 s
->suspended_callback
= NULL
;
76 s
->suspended_userdata
= NULL
;
77 s
->started_callback
= NULL
;
78 s
->started_userdata
= NULL
;
79 s
->event_callback
= NULL
;
80 s
->event_userdata
= NULL
;
81 s
->buffer_attr_callback
= NULL
;
82 s
->buffer_attr_userdata
= NULL
;
85 static pa_stream
*pa_stream_new_with_proplist_internal(
88 const pa_sample_spec
*ss
,
89 const pa_channel_map
*map
,
90 pa_format_info
* const *formats
,
91 unsigned int n_formats
,
98 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
99 pa_assert((ss
== NULL
&& map
== NULL
) || (formats
== NULL
&& n_formats
== 0));
100 pa_assert(n_formats
< PA_MAX_FORMATS
);
102 PA_CHECK_VALIDITY_RETURN_NULL(c
, !pa_detect_fork(), PA_ERR_FORKED
);
103 PA_CHECK_VALIDITY_RETURN_NULL(c
, name
|| (p
&& pa_proplist_contains(p
, PA_PROP_MEDIA_NAME
)), PA_ERR_INVALID
);
105 s
= pa_xnew(pa_stream
, 1);
108 s
->mainloop
= c
->mainloop
;
110 s
->direction
= PA_STREAM_NODIRECTION
;
111 s
->state
= PA_STREAM_UNCONNECTED
;
115 s
->sample_spec
= *ss
;
117 pa_sample_spec_init(&s
->sample_spec
);
120 s
->channel_map
= *map
;
122 pa_channel_map_init(&s
->channel_map
);
126 s
->n_formats
= n_formats
;
127 for (i
= 0; i
< n_formats
; i
++)
128 s
->req_formats
[i
] = pa_format_info_copy(formats
[i
]);
131 /* We'll get the final negotiated format after connecting */
134 s
->direct_on_input
= PA_INVALID_INDEX
;
136 s
->proplist
= p
? pa_proplist_copy(p
) : pa_proplist_new();
138 pa_proplist_sets(s
->proplist
, PA_PROP_MEDIA_NAME
, name
);
141 s
->channel_valid
= false;
142 s
->syncid
= c
->csyncid
++;
143 s
->stream_index
= PA_INVALID_INDEX
;
145 s
->requested_bytes
= 0;
146 memset(&s
->buffer_attr
, 0, sizeof(s
->buffer_attr
));
148 /* We initialize the target length here, so that if the user
149 * passes no explicit buffering metrics the default is similar to
150 * what older PA versions provided. */
152 s
->buffer_attr
.maxlength
= (uint32_t) -1;
154 s
->buffer_attr
.tlength
= (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC
, ss
); /* 250ms of buffering */
156 /* FIXME: We assume a worst-case compressed format corresponding to
157 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
158 pa_sample_spec tmp_ss
= {
159 .format
= PA_SAMPLE_S16NE
,
163 s
->buffer_attr
.tlength
= (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC
, &tmp_ss
); /* 250ms of buffering */
165 s
->buffer_attr
.minreq
= (uint32_t) -1;
166 s
->buffer_attr
.prebuf
= (uint32_t) -1;
167 s
->buffer_attr
.fragsize
= (uint32_t) -1;
169 s
->device_index
= PA_INVALID_INDEX
;
170 s
->device_name
= NULL
;
171 s
->suspended
= false;
174 s
->write_memblock
= NULL
;
175 s
->write_data
= NULL
;
177 pa_memchunk_reset(&s
->peek_memchunk
);
179 s
->record_memblockq
= NULL
;
181 memset(&s
->timing_info
, 0, sizeof(s
->timing_info
));
182 s
->timing_info_valid
= false;
184 s
->previous_time
= 0;
185 s
->latest_underrun_at_index
= -1;
187 s
->read_index_not_before
= 0;
188 s
->write_index_not_before
= 0;
189 for (i
= 0; i
< PA_MAX_WRITE_INDEX_CORRECTIONS
; i
++)
190 s
->write_index_corrections
[i
].valid
= 0;
191 s
->current_write_index_correction
= 0;
193 s
->auto_timing_update_event
= NULL
;
194 s
->auto_timing_update_requested
= false;
195 s
->auto_timing_interval_usec
= AUTO_TIMING_INTERVAL_START_USEC
;
201 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
202 PA_LLIST_PREPEND(pa_stream
, c
->streams
, s
);
208 pa_stream
*pa_stream_new_with_proplist(
211 const pa_sample_spec
*ss
,
212 const pa_channel_map
*map
,
217 PA_CHECK_VALIDITY_RETURN_NULL(c
, ss
&& pa_sample_spec_valid(ss
), PA_ERR_INVALID
);
218 PA_CHECK_VALIDITY_RETURN_NULL(c
, c
->version
>= 12 || (ss
->format
!= PA_SAMPLE_S32LE
&& ss
->format
!= PA_SAMPLE_S32BE
), PA_ERR_NOTSUPPORTED
);
219 PA_CHECK_VALIDITY_RETURN_NULL(c
, c
->version
>= 15 || (ss
->format
!= PA_SAMPLE_S24LE
&& ss
->format
!= PA_SAMPLE_S24BE
), PA_ERR_NOTSUPPORTED
);
220 PA_CHECK_VALIDITY_RETURN_NULL(c
, c
->version
>= 15 || (ss
->format
!= PA_SAMPLE_S24_32LE
&& ss
->format
!= PA_SAMPLE_S24_32BE
), PA_ERR_NOTSUPPORTED
);
221 PA_CHECK_VALIDITY_RETURN_NULL(c
, !map
|| (pa_channel_map_valid(map
) && map
->channels
== ss
->channels
), PA_ERR_INVALID
);
224 PA_CHECK_VALIDITY_RETURN_NULL(c
, map
= pa_channel_map_init_auto(&tmap
, ss
->channels
, PA_CHANNEL_MAP_DEFAULT
), PA_ERR_INVALID
);
226 return pa_stream_new_with_proplist_internal(c
, name
, ss
, map
, NULL
, 0, p
);
229 pa_stream
*pa_stream_new_extended(
232 pa_format_info
* const *formats
,
233 unsigned int n_formats
,
236 PA_CHECK_VALIDITY_RETURN_NULL(c
, c
->version
>= 21, PA_ERR_NOTSUPPORTED
);
238 return pa_stream_new_with_proplist_internal(c
, name
, NULL
, NULL
, formats
, n_formats
, p
);
241 static void stream_unlink(pa_stream
*s
) {
248 /* Detach from context */
250 /* Unref all operation objects that point to us */
251 for (o
= s
->context
->operations
; o
; o
= n
) {
255 pa_operation_cancel(o
);
258 /* Drop all outstanding replies for this stream */
259 if (s
->context
->pdispatch
)
260 pa_pdispatch_unregister_reply(s
->context
->pdispatch
, s
);
262 if (s
->channel_valid
) {
263 pa_hashmap_remove((s
->direction
== PA_STREAM_RECORD
) ? s
->context
->record_streams
: s
->context
->playback_streams
, PA_UINT32_TO_PTR(s
->channel
));
265 s
->channel_valid
= false;
268 PA_LLIST_REMOVE(pa_stream
, s
->context
->streams
, s
);
273 if (s
->auto_timing_update_event
) {
274 pa_assert(s
->mainloop
);
275 s
->mainloop
->time_free(s
->auto_timing_update_event
);
281 static void stream_free(pa_stream
*s
) {
288 if (s
->write_memblock
) {
290 pa_memblock_release(s
->write_memblock
);
291 pa_memblock_unref(s
->write_memblock
);
294 if (s
->peek_memchunk
.memblock
) {
296 pa_memblock_release(s
->peek_memchunk
.memblock
);
297 pa_memblock_unref(s
->peek_memchunk
.memblock
);
300 if (s
->record_memblockq
)
301 pa_memblockq_free(s
->record_memblockq
);
304 pa_proplist_free(s
->proplist
);
307 pa_smoother_free(s
->smoother
);
309 for (i
= 0; i
< s
->n_formats
; i
++)
310 pa_format_info_free(s
->req_formats
[i
]);
313 pa_format_info_free(s
->format
);
315 pa_xfree(s
->device_name
);
319 void pa_stream_unref(pa_stream
*s
) {
321 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
323 if (PA_REFCNT_DEC(s
) <= 0)
327 pa_stream
* pa_stream_ref(pa_stream
*s
) {
329 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
335 pa_stream_state_t
pa_stream_get_state(pa_stream
*s
) {
337 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
342 pa_context
* pa_stream_get_context(pa_stream
*s
) {
344 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
349 uint32_t pa_stream_get_index(pa_stream
*s
) {
351 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
353 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
, PA_INVALID_INDEX
);
354 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
, PA_INVALID_INDEX
);
356 return s
->stream_index
;
359 void pa_stream_set_state(pa_stream
*s
, pa_stream_state_t st
) {
361 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
370 if (s
->state_callback
)
371 s
->state_callback(s
, s
->state_userdata
);
373 if ((st
== PA_STREAM_FAILED
|| st
== PA_STREAM_TERMINATED
))
379 static void request_auto_timing_update(pa_stream
*s
, bool force
) {
381 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
383 if (!(s
->flags
& PA_STREAM_AUTO_TIMING_UPDATE
))
386 if (s
->state
== PA_STREAM_READY
&&
387 (force
|| !s
->auto_timing_update_requested
)) {
391 pa_log_debug("Automatically requesting new timing data");
394 if ((o
= pa_stream_update_timing_info(s
, NULL
, NULL
))) {
395 pa_operation_unref(o
);
396 s
->auto_timing_update_requested
= true;
400 if (s
->auto_timing_update_event
) {
401 if (s
->suspended
&& !force
) {
402 pa_assert(s
->mainloop
);
403 s
->mainloop
->time_free(s
->auto_timing_update_event
);
404 s
->auto_timing_update_event
= NULL
;
407 s
->auto_timing_interval_usec
= AUTO_TIMING_INTERVAL_START_USEC
;
409 pa_context_rttime_restart(s
->context
, s
->auto_timing_update_event
, pa_rtclock_now() + s
->auto_timing_interval_usec
);
411 s
->auto_timing_interval_usec
= PA_MIN(AUTO_TIMING_INTERVAL_END_USEC
, s
->auto_timing_interval_usec
*2);
416 void pa_command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
417 pa_context
*c
= userdata
;
422 pa_assert(command
== PA_COMMAND_PLAYBACK_STREAM_KILLED
|| command
== PA_COMMAND_RECORD_STREAM_KILLED
);
425 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
429 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
430 !pa_tagstruct_eof(t
)) {
431 pa_context_fail(c
, PA_ERR_PROTOCOL
);
435 if (!(s
= pa_hashmap_get(command
== PA_COMMAND_PLAYBACK_STREAM_KILLED
? c
->playback_streams
: c
->record_streams
, PA_UINT32_TO_PTR(channel
))))
438 if (s
->state
!= PA_STREAM_READY
)
441 pa_context_set_error(c
, PA_ERR_KILLED
);
442 pa_stream_set_state(s
, PA_STREAM_FAILED
);
448 static void check_smoother_status(pa_stream
*s
, bool aposteriori
, bool force_start
, bool force_stop
) {
452 pa_assert(!force_start
|| !force_stop
);
457 x
= pa_rtclock_now();
459 if (s
->timing_info_valid
) {
461 x
-= s
->timing_info
.transport_usec
;
463 x
+= s
->timing_info
.transport_usec
;
466 if (s
->suspended
|| s
->corked
|| force_stop
)
467 pa_smoother_pause(s
->smoother
, x
);
468 else if (force_start
|| s
->buffer_attr
.prebuf
== 0) {
470 if (!s
->timing_info_valid
&&
474 s
->context
->version
>= 13) {
476 /* If the server supports STARTED events we take them as
477 * indications when audio really starts/stops playing, if
478 * we don't have any timing info yet -- instead of trying
479 * to be smart and guessing the server time. Otherwise the
480 * unknown transport delay adds too much noise to our time
486 pa_smoother_resume(s
->smoother
, x
, true);
489 /* Please note that we have no idea if playback actually started
490 * if prebuf is non-zero! */
493 static void auto_timing_update_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*t
, void *userdata
);
495 void pa_command_stream_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
496 pa_context
*c
= userdata
;
503 uint32_t maxlength
= 0, fragsize
= 0, minreq
= 0, tlength
= 0, prebuf
= 0;
506 pa_assert(command
== PA_COMMAND_PLAYBACK_STREAM_MOVED
|| command
== PA_COMMAND_RECORD_STREAM_MOVED
);
509 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
513 if (c
->version
< 12) {
514 pa_context_fail(c
, PA_ERR_PROTOCOL
);
518 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
519 pa_tagstruct_getu32(t
, &di
) < 0 ||
520 pa_tagstruct_gets(t
, &dn
) < 0 ||
521 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
522 pa_context_fail(c
, PA_ERR_PROTOCOL
);
526 if (c
->version
>= 13) {
528 if (command
== PA_COMMAND_RECORD_STREAM_MOVED
) {
529 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
530 pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
531 pa_tagstruct_get_usec(t
, &usec
) < 0) {
532 pa_context_fail(c
, PA_ERR_PROTOCOL
);
536 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
537 pa_tagstruct_getu32(t
, &tlength
) < 0 ||
538 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
539 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
540 pa_tagstruct_get_usec(t
, &usec
) < 0) {
541 pa_context_fail(c
, PA_ERR_PROTOCOL
);
547 if (!pa_tagstruct_eof(t
)) {
548 pa_context_fail(c
, PA_ERR_PROTOCOL
);
552 if (!dn
|| di
== PA_INVALID_INDEX
) {
553 pa_context_fail(c
, PA_ERR_PROTOCOL
);
557 if (!(s
= pa_hashmap_get(command
== PA_COMMAND_PLAYBACK_STREAM_MOVED
? c
->playback_streams
: c
->record_streams
, PA_UINT32_TO_PTR(channel
))))
560 if (s
->state
!= PA_STREAM_READY
)
563 if (c
->version
>= 13) {
564 if (s
->direction
== PA_STREAM_RECORD
)
565 s
->timing_info
.configured_source_usec
= usec
;
567 s
->timing_info
.configured_sink_usec
= usec
;
569 s
->buffer_attr
.maxlength
= maxlength
;
570 s
->buffer_attr
.fragsize
= fragsize
;
571 s
->buffer_attr
.tlength
= tlength
;
572 s
->buffer_attr
.prebuf
= prebuf
;
573 s
->buffer_attr
.minreq
= minreq
;
576 pa_xfree(s
->device_name
);
577 s
->device_name
= pa_xstrdup(dn
);
578 s
->device_index
= di
;
580 s
->suspended
= suspended
;
582 if ((s
->flags
& PA_STREAM_AUTO_TIMING_UPDATE
) && !suspended
&& !s
->auto_timing_update_event
) {
583 s
->auto_timing_interval_usec
= AUTO_TIMING_INTERVAL_START_USEC
;
584 s
->auto_timing_update_event
= pa_context_rttime_new(s
->context
, pa_rtclock_now() + s
->auto_timing_interval_usec
, &auto_timing_update_callback
, s
);
585 request_auto_timing_update(s
, true);
588 check_smoother_status(s
, true, false, false);
589 request_auto_timing_update(s
, true);
591 if (s
->moved_callback
)
592 s
->moved_callback(s
, s
->moved_userdata
);
598 void pa_command_stream_buffer_attr(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
599 pa_context
*c
= userdata
;
603 uint32_t maxlength
= 0, fragsize
= 0, minreq
= 0, tlength
= 0, prebuf
= 0;
606 pa_assert(command
== PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
|| command
== PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
);
609 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
613 if (c
->version
< 15) {
614 pa_context_fail(c
, PA_ERR_PROTOCOL
);
618 if (pa_tagstruct_getu32(t
, &channel
) < 0) {
619 pa_context_fail(c
, PA_ERR_PROTOCOL
);
623 if (command
== PA_COMMAND_RECORD_STREAM_MOVED
) {
624 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
625 pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
626 pa_tagstruct_get_usec(t
, &usec
) < 0) {
627 pa_context_fail(c
, PA_ERR_PROTOCOL
);
631 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
632 pa_tagstruct_getu32(t
, &tlength
) < 0 ||
633 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
634 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
635 pa_tagstruct_get_usec(t
, &usec
) < 0) {
636 pa_context_fail(c
, PA_ERR_PROTOCOL
);
641 if (!pa_tagstruct_eof(t
)) {
642 pa_context_fail(c
, PA_ERR_PROTOCOL
);
646 if (!(s
= pa_hashmap_get(command
== PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
? c
->playback_streams
: c
->record_streams
, PA_UINT32_TO_PTR(channel
))))
649 if (s
->state
!= PA_STREAM_READY
)
652 if (s
->direction
== PA_STREAM_RECORD
)
653 s
->timing_info
.configured_source_usec
= usec
;
655 s
->timing_info
.configured_sink_usec
= usec
;
657 s
->buffer_attr
.maxlength
= maxlength
;
658 s
->buffer_attr
.fragsize
= fragsize
;
659 s
->buffer_attr
.tlength
= tlength
;
660 s
->buffer_attr
.prebuf
= prebuf
;
661 s
->buffer_attr
.minreq
= minreq
;
663 request_auto_timing_update(s
, true);
665 if (s
->buffer_attr_callback
)
666 s
->buffer_attr_callback(s
, s
->buffer_attr_userdata
);
672 void pa_command_stream_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
673 pa_context
*c
= userdata
;
679 pa_assert(command
== PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
|| command
== PA_COMMAND_RECORD_STREAM_SUSPENDED
);
682 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
686 if (c
->version
< 12) {
687 pa_context_fail(c
, PA_ERR_PROTOCOL
);
691 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
692 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
693 !pa_tagstruct_eof(t
)) {
694 pa_context_fail(c
, PA_ERR_PROTOCOL
);
698 if (!(s
= pa_hashmap_get(command
== PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
? c
->playback_streams
: c
->record_streams
, PA_UINT32_TO_PTR(channel
))))
701 if (s
->state
!= PA_STREAM_READY
)
704 s
->suspended
= suspended
;
706 if ((s
->flags
& PA_STREAM_AUTO_TIMING_UPDATE
) && !suspended
&& !s
->auto_timing_update_event
) {
707 s
->auto_timing_interval_usec
= AUTO_TIMING_INTERVAL_START_USEC
;
708 s
->auto_timing_update_event
= pa_context_rttime_new(s
->context
, pa_rtclock_now() + s
->auto_timing_interval_usec
, &auto_timing_update_callback
, s
);
709 request_auto_timing_update(s
, true);
712 check_smoother_status(s
, true, false, false);
713 request_auto_timing_update(s
, true);
715 if (s
->suspended_callback
)
716 s
->suspended_callback(s
, s
->suspended_userdata
);
722 void pa_command_stream_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
723 pa_context
*c
= userdata
;
728 pa_assert(command
== PA_COMMAND_STARTED
);
731 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
735 if (c
->version
< 13) {
736 pa_context_fail(c
, PA_ERR_PROTOCOL
);
740 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
741 !pa_tagstruct_eof(t
)) {
742 pa_context_fail(c
, PA_ERR_PROTOCOL
);
746 if (!(s
= pa_hashmap_get(c
->playback_streams
, PA_UINT32_TO_PTR(channel
))))
749 if (s
->state
!= PA_STREAM_READY
)
752 check_smoother_status(s
, true, true, false);
753 request_auto_timing_update(s
, true);
755 if (s
->started_callback
)
756 s
->started_callback(s
, s
->started_userdata
);
762 void pa_command_stream_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
763 pa_context
*c
= userdata
;
766 pa_proplist
*pl
= NULL
;
770 pa_assert(command
== PA_COMMAND_PLAYBACK_STREAM_EVENT
|| command
== PA_COMMAND_RECORD_STREAM_EVENT
);
773 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
777 if (c
->version
< 15) {
778 pa_context_fail(c
, PA_ERR_PROTOCOL
);
782 pl
= pa_proplist_new();
784 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
785 pa_tagstruct_gets(t
, &event
) < 0 ||
786 pa_tagstruct_get_proplist(t
, pl
) < 0 ||
787 !pa_tagstruct_eof(t
) || !event
) {
788 pa_context_fail(c
, PA_ERR_PROTOCOL
);
792 if (!(s
= pa_hashmap_get(command
== PA_COMMAND_PLAYBACK_STREAM_EVENT
? c
->playback_streams
: c
->record_streams
, PA_UINT32_TO_PTR(channel
))))
795 if (s
->state
!= PA_STREAM_READY
)
798 if (pa_streq(event
, PA_STREAM_EVENT_FORMAT_LOST
)) {
799 /* Let client know what the running time was when the stream had to be killed */
800 pa_usec_t stream_time
;
801 if (pa_stream_get_time(s
, &stream_time
) == 0)
802 pa_proplist_setf(pl
, "stream-time", "%llu", (unsigned long long) stream_time
);
805 if (s
->event_callback
)
806 s
->event_callback(s
, event
, pl
, s
->event_userdata
);
812 pa_proplist_free(pl
);
815 void pa_command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
817 pa_context
*c
= userdata
;
818 uint32_t bytes
, channel
;
821 pa_assert(command
== PA_COMMAND_REQUEST
);
824 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
828 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
829 pa_tagstruct_getu32(t
, &bytes
) < 0 ||
830 !pa_tagstruct_eof(t
)) {
831 pa_context_fail(c
, PA_ERR_PROTOCOL
);
835 if (!(s
= pa_hashmap_get(c
->playback_streams
, PA_UINT32_TO_PTR(channel
))))
838 if (s
->state
!= PA_STREAM_READY
)
841 s
->requested_bytes
+= bytes
;
844 pa_log_debug("got request for %lli, now at %lli", (long long) bytes
, (long long) s
->requested_bytes
);
847 if (s
->requested_bytes
> 0 && s
->write_callback
)
848 s
->write_callback(s
, (size_t) s
->requested_bytes
, s
->write_userdata
);
854 int64_t pa_stream_get_underflow_index(pa_stream
*p
) {
856 return p
->latest_underrun_at_index
;
859 void pa_command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
861 pa_context
*c
= userdata
;
866 pa_assert(command
== PA_COMMAND_OVERFLOW
|| command
== PA_COMMAND_UNDERFLOW
);
869 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
873 if (pa_tagstruct_getu32(t
, &channel
) < 0) {
874 pa_context_fail(c
, PA_ERR_PROTOCOL
);
878 if (c
->version
>= 23 && command
== PA_COMMAND_UNDERFLOW
) {
879 if (pa_tagstruct_gets64(t
, &offset
) < 0) {
880 pa_context_fail(c
, PA_ERR_PROTOCOL
);
885 if (!pa_tagstruct_eof(t
)) {
886 pa_context_fail(c
, PA_ERR_PROTOCOL
);
890 if (!(s
= pa_hashmap_get(c
->playback_streams
, PA_UINT32_TO_PTR(channel
))))
893 if (s
->state
!= PA_STREAM_READY
)
897 s
->latest_underrun_at_index
= offset
;
899 if (s
->buffer_attr
.prebuf
> 0)
900 check_smoother_status(s
, true, false, true);
902 request_auto_timing_update(s
, true);
904 if (command
== PA_COMMAND_OVERFLOW
) {
905 if (s
->overflow_callback
)
906 s
->overflow_callback(s
, s
->overflow_userdata
);
907 } else if (command
== PA_COMMAND_UNDERFLOW
) {
908 if (s
->underflow_callback
)
909 s
->underflow_callback(s
, s
->underflow_userdata
);
916 static void invalidate_indexes(pa_stream
*s
, bool r
, bool w
) {
918 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
921 pa_log_debug("invalidate r:%u w:%u tag:%u", r
, w
, s
->context
->ctag
);
924 if (s
->state
!= PA_STREAM_READY
)
928 s
->write_index_not_before
= s
->context
->ctag
;
930 if (s
->timing_info_valid
)
931 s
->timing_info
.write_index_corrupt
= true;
934 pa_log_debug("write_index invalidated");
939 s
->read_index_not_before
= s
->context
->ctag
;
941 if (s
->timing_info_valid
)
942 s
->timing_info
.read_index_corrupt
= true;
945 pa_log_debug("read_index invalidated");
949 request_auto_timing_update(s
, true);
952 static void auto_timing_update_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*t
, void *userdata
) {
953 pa_stream
*s
= userdata
;
956 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
959 request_auto_timing_update(s
, false);
963 static void create_stream_complete(pa_stream
*s
) {
965 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
966 pa_assert(s
->state
== PA_STREAM_CREATING
);
968 pa_stream_set_state(s
, PA_STREAM_READY
);
970 if (s
->requested_bytes
> 0 && s
->write_callback
)
971 s
->write_callback(s
, (size_t) s
->requested_bytes
, s
->write_userdata
);
973 if (s
->flags
& PA_STREAM_AUTO_TIMING_UPDATE
) {
974 s
->auto_timing_interval_usec
= AUTO_TIMING_INTERVAL_START_USEC
;
975 pa_assert(!s
->auto_timing_update_event
);
976 s
->auto_timing_update_event
= pa_context_rttime_new(s
->context
, pa_rtclock_now() + s
->auto_timing_interval_usec
, &auto_timing_update_callback
, s
);
978 request_auto_timing_update(s
, true);
981 check_smoother_status(s
, true, false, false);
984 static void patch_buffer_attr(pa_stream
*s
, pa_buffer_attr
*attr
, pa_stream_flags_t
*flags
) {
990 if ((e
= getenv("PULSE_LATENCY_MSEC"))) {
993 if (pa_atou(e
, &ms
) < 0 || ms
<= 0)
994 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e
);
996 attr
->maxlength
= (uint32_t) -1;
997 attr
->tlength
= pa_usec_to_bytes(ms
* PA_USEC_PER_MSEC
, &s
->sample_spec
);
998 attr
->minreq
= (uint32_t) -1;
999 attr
->prebuf
= (uint32_t) -1;
1000 attr
->fragsize
= attr
->tlength
;
1004 *flags
|= PA_STREAM_ADJUST_LATENCY
;
1007 if (s
->context
->version
>= 13)
1010 /* Version older than 0.9.10 didn't do server side buffer_attr
1011 * selection, hence we have to fake it on the client side. */
1013 /* We choose fairly conservative values here, to not confuse
1014 * old clients with extremely large playback buffers */
1016 if (attr
->maxlength
== (uint32_t) -1)
1017 attr
->maxlength
= 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1019 if (attr
->tlength
== (uint32_t) -1)
1020 attr
->tlength
= (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC
, &s
->sample_spec
); /* 250ms of buffering */
1022 if (attr
->minreq
== (uint32_t) -1)
1023 attr
->minreq
= (attr
->tlength
)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1025 if (attr
->prebuf
== (uint32_t) -1)
1026 attr
->prebuf
= attr
->tlength
; /* Start to play only when the playback is fully filled up once */
1028 if (attr
->fragsize
== (uint32_t) -1)
1029 attr
->fragsize
= attr
->tlength
; /* Pass data to the app only when the buffer is filled up once */
1032 void pa_create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1033 pa_stream
*s
= userdata
;
1034 uint32_t requested_bytes
= 0;
1038 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1039 pa_assert(s
->state
== PA_STREAM_CREATING
);
1043 if (command
!= PA_COMMAND_REPLY
) {
1044 if (pa_context_handle_error(s
->context
, command
, t
, false) < 0)
1047 pa_stream_set_state(s
, PA_STREAM_FAILED
);
1051 if (pa_tagstruct_getu32(t
, &s
->channel
) < 0 ||
1052 s
->channel
== PA_INVALID_INDEX
||
1053 ((s
->direction
!= PA_STREAM_UPLOAD
) && (pa_tagstruct_getu32(t
, &s
->stream_index
) < 0 || s
->stream_index
== PA_INVALID_INDEX
)) ||
1054 ((s
->direction
!= PA_STREAM_RECORD
) && pa_tagstruct_getu32(t
, &requested_bytes
) < 0)) {
1055 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1059 s
->requested_bytes
= (int64_t) requested_bytes
;
1061 if (s
->context
->version
>= 9) {
1062 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1063 if (pa_tagstruct_getu32(t
, &s
->buffer_attr
.maxlength
) < 0 ||
1064 pa_tagstruct_getu32(t
, &s
->buffer_attr
.tlength
) < 0 ||
1065 pa_tagstruct_getu32(t
, &s
->buffer_attr
.prebuf
) < 0 ||
1066 pa_tagstruct_getu32(t
, &s
->buffer_attr
.minreq
) < 0) {
1067 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1070 } else if (s
->direction
== PA_STREAM_RECORD
) {
1071 if (pa_tagstruct_getu32(t
, &s
->buffer_attr
.maxlength
) < 0 ||
1072 pa_tagstruct_getu32(t
, &s
->buffer_attr
.fragsize
) < 0) {
1073 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1079 if (s
->context
->version
>= 12 && s
->direction
!= PA_STREAM_UPLOAD
) {
1082 const char *dn
= NULL
;
1085 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1086 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1087 pa_tagstruct_getu32(t
, &s
->device_index
) < 0 ||
1088 pa_tagstruct_gets(t
, &dn
) < 0 ||
1089 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
1090 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1094 if (!dn
|| s
->device_index
== PA_INVALID_INDEX
||
1095 ss
.channels
!= cm
.channels
||
1096 !pa_channel_map_valid(&cm
) ||
1097 !pa_sample_spec_valid(&ss
) ||
1098 (s
->n_formats
== 0 && (
1099 (!(s
->flags
& PA_STREAM_FIX_FORMAT
) && ss
.format
!= s
->sample_spec
.format
) ||
1100 (!(s
->flags
& PA_STREAM_FIX_RATE
) && ss
.rate
!= s
->sample_spec
.rate
) ||
1101 (!(s
->flags
& PA_STREAM_FIX_CHANNELS
) && !pa_channel_map_equal(&cm
, &s
->channel_map
))))) {
1102 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1106 pa_xfree(s
->device_name
);
1107 s
->device_name
= pa_xstrdup(dn
);
1108 s
->suspended
= suspended
;
1110 s
->channel_map
= cm
;
1111 s
->sample_spec
= ss
;
1114 if (s
->context
->version
>= 13 && s
->direction
!= PA_STREAM_UPLOAD
) {
1117 if (pa_tagstruct_get_usec(t
, &usec
) < 0) {
1118 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1122 if (s
->direction
== PA_STREAM_RECORD
)
1123 s
->timing_info
.configured_source_usec
= usec
;
1125 s
->timing_info
.configured_sink_usec
= usec
;
1128 if ((s
->context
->version
>= 21 && s
->direction
== PA_STREAM_PLAYBACK
)
1129 || s
->context
->version
>= 22) {
1131 pa_format_info
*f
= pa_format_info_new();
1132 pa_tagstruct_get_format_info(t
, f
);
1134 if (pa_format_info_valid(f
))
1137 pa_format_info_free(f
);
1138 if (s
->n_formats
> 0) {
1139 /* We used the extended API, so we should have got back a proper format */
1140 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1146 if (!pa_tagstruct_eof(t
)) {
1147 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1151 if (s
->direction
== PA_STREAM_RECORD
) {
1152 pa_assert(!s
->record_memblockq
);
1154 s
->record_memblockq
= pa_memblockq_new(
1155 "client side record memblockq",
1157 s
->buffer_attr
.maxlength
,
1166 s
->channel_valid
= true;
1167 pa_hashmap_put((s
->direction
== PA_STREAM_RECORD
) ? s
->context
->record_streams
: s
->context
->playback_streams
, PA_UINT32_TO_PTR(s
->channel
), s
);
1169 create_stream_complete(s
);
1175 static int create_stream(
1176 pa_stream_direction_t direction
,
1179 const pa_buffer_attr
*attr
,
1180 pa_stream_flags_t flags
,
1181 const pa_cvolume
*volume
,
1182 pa_stream
*sync_stream
) {
1186 bool volume_set
= !!volume
;
1191 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1192 pa_assert(direction
== PA_STREAM_PLAYBACK
|| direction
== PA_STREAM_RECORD
);
1194 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1195 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_UNCONNECTED
, PA_ERR_BADSTATE
);
1196 PA_CHECK_VALIDITY(s
->context
, s
->direct_on_input
== PA_INVALID_INDEX
|| direction
== PA_STREAM_RECORD
, PA_ERR_BADSTATE
);
1197 PA_CHECK_VALIDITY(s
->context
, !(flags
& ~(PA_STREAM_START_CORKED
|
1198 PA_STREAM_INTERPOLATE_TIMING
|
1199 PA_STREAM_NOT_MONOTONIC
|
1200 PA_STREAM_AUTO_TIMING_UPDATE
|
1201 PA_STREAM_NO_REMAP_CHANNELS
|
1202 PA_STREAM_NO_REMIX_CHANNELS
|
1203 PA_STREAM_FIX_FORMAT
|
1205 PA_STREAM_FIX_CHANNELS
|
1206 PA_STREAM_DONT_MOVE
|
1207 PA_STREAM_VARIABLE_RATE
|
1208 PA_STREAM_PEAK_DETECT
|
1209 PA_STREAM_START_MUTED
|
1210 PA_STREAM_ADJUST_LATENCY
|
1211 PA_STREAM_EARLY_REQUESTS
|
1212 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND
|
1213 PA_STREAM_START_UNMUTED
|
1214 PA_STREAM_FAIL_ON_SUSPEND
|
1215 PA_STREAM_RELATIVE_VOLUME
|
1216 PA_STREAM_PASSTHROUGH
)), PA_ERR_INVALID
);
1218 PA_CHECK_VALIDITY(s
->context
, s
->context
->version
>= 12 || !(flags
& PA_STREAM_VARIABLE_RATE
), PA_ERR_NOTSUPPORTED
);
1219 PA_CHECK_VALIDITY(s
->context
, s
->context
->version
>= 13 || !(flags
& PA_STREAM_PEAK_DETECT
), PA_ERR_NOTSUPPORTED
);
1220 PA_CHECK_VALIDITY(s
->context
, s
->context
->state
== PA_CONTEXT_READY
, PA_ERR_BADSTATE
);
1221 /* Although some of the other flags are not supported on older
1222 * version, we don't check for them here, because it doesn't hurt
1223 * when they are passed but actually not supported. This makes
1224 * client development easier */
1226 PA_CHECK_VALIDITY(s
->context
, direction
== PA_STREAM_RECORD
|| !(flags
& (PA_STREAM_PEAK_DETECT
)), PA_ERR_INVALID
);
1227 PA_CHECK_VALIDITY(s
->context
, !volume
|| s
->n_formats
|| (pa_sample_spec_valid(&s
->sample_spec
) && volume
->channels
== s
->sample_spec
.channels
), PA_ERR_INVALID
);
1228 PA_CHECK_VALIDITY(s
->context
, !sync_stream
|| (direction
== PA_STREAM_PLAYBACK
&& sync_stream
->direction
== PA_STREAM_PLAYBACK
), PA_ERR_INVALID
);
1229 PA_CHECK_VALIDITY(s
->context
, (flags
& (PA_STREAM_ADJUST_LATENCY
|PA_STREAM_EARLY_REQUESTS
)) != (PA_STREAM_ADJUST_LATENCY
|PA_STREAM_EARLY_REQUESTS
), PA_ERR_INVALID
);
1233 s
->direction
= direction
;
1236 s
->syncid
= sync_stream
->syncid
;
1239 s
->buffer_attr
= *attr
;
1240 patch_buffer_attr(s
, &s
->buffer_attr
, &flags
);
1243 s
->corked
= !!(flags
& PA_STREAM_START_CORKED
);
1245 if (flags
& PA_STREAM_INTERPOLATE_TIMING
) {
1248 x
= pa_rtclock_now();
1250 pa_assert(!s
->smoother
);
1251 s
->smoother
= pa_smoother_new(
1252 SMOOTHER_ADJUST_TIME
,
1253 SMOOTHER_HISTORY_TIME
,
1254 !(flags
& PA_STREAM_NOT_MONOTONIC
),
1256 SMOOTHER_MIN_HISTORY
,
1262 dev
= s
->direction
== PA_STREAM_PLAYBACK
? s
->context
->conf
->default_sink
: s
->context
->conf
->default_source
;
1264 t
= pa_tagstruct_command(
1266 (uint32_t) (s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_CREATE_PLAYBACK_STREAM
: PA_COMMAND_CREATE_RECORD_STREAM
),
1269 if (s
->context
->version
< 13)
1270 pa_tagstruct_puts(t
, pa_proplist_gets(s
->proplist
, PA_PROP_MEDIA_NAME
));
1274 PA_TAG_SAMPLE_SPEC
, &s
->sample_spec
,
1275 PA_TAG_CHANNEL_MAP
, &s
->channel_map
,
1276 PA_TAG_U32
, PA_INVALID_INDEX
,
1278 PA_TAG_U32
, s
->buffer_attr
.maxlength
,
1279 PA_TAG_BOOLEAN
, s
->corked
,
1283 if (pa_sample_spec_valid(&s
->sample_spec
))
1284 volume
= pa_cvolume_reset(&cv
, s
->sample_spec
.channels
);
1286 /* This is not really relevant, since no volume was set, and
1287 * the real number of channels is embedded in the format_info
1289 volume
= pa_cvolume_reset(&cv
, PA_CHANNELS_MAX
);
1293 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1296 PA_TAG_U32
, s
->buffer_attr
.tlength
,
1297 PA_TAG_U32
, s
->buffer_attr
.prebuf
,
1298 PA_TAG_U32
, s
->buffer_attr
.minreq
,
1299 PA_TAG_U32
, s
->syncid
,
1302 pa_tagstruct_put_cvolume(t
, volume
);
1304 pa_tagstruct_putu32(t
, s
->buffer_attr
.fragsize
);
1306 if (s
->context
->version
>= 12) {
1309 PA_TAG_BOOLEAN
, flags
& PA_STREAM_NO_REMAP_CHANNELS
,
1310 PA_TAG_BOOLEAN
, flags
& PA_STREAM_NO_REMIX_CHANNELS
,
1311 PA_TAG_BOOLEAN
, flags
& PA_STREAM_FIX_FORMAT
,
1312 PA_TAG_BOOLEAN
, flags
& PA_STREAM_FIX_RATE
,
1313 PA_TAG_BOOLEAN
, flags
& PA_STREAM_FIX_CHANNELS
,
1314 PA_TAG_BOOLEAN
, flags
& PA_STREAM_DONT_MOVE
,
1315 PA_TAG_BOOLEAN
, flags
& PA_STREAM_VARIABLE_RATE
,
1319 if (s
->context
->version
>= 13) {
1321 if (s
->direction
== PA_STREAM_PLAYBACK
)
1322 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_START_MUTED
);
1324 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_PEAK_DETECT
);
1328 PA_TAG_BOOLEAN
, flags
& PA_STREAM_ADJUST_LATENCY
,
1329 PA_TAG_PROPLIST
, s
->proplist
,
1332 if (s
->direction
== PA_STREAM_RECORD
)
1333 pa_tagstruct_putu32(t
, s
->direct_on_input
);
1336 if (s
->context
->version
>= 14) {
1338 if (s
->direction
== PA_STREAM_PLAYBACK
)
1339 pa_tagstruct_put_boolean(t
, volume_set
);
1341 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_EARLY_REQUESTS
);
1344 if (s
->context
->version
>= 15) {
1346 if (s
->direction
== PA_STREAM_PLAYBACK
)
1347 pa_tagstruct_put_boolean(t
, flags
& (PA_STREAM_START_MUTED
|PA_STREAM_START_UNMUTED
));
1349 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND
);
1350 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_FAIL_ON_SUSPEND
);
1353 if (s
->context
->version
>= 17 && s
->direction
== PA_STREAM_PLAYBACK
)
1354 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_RELATIVE_VOLUME
);
1356 if (s
->context
->version
>= 18 && s
->direction
== PA_STREAM_PLAYBACK
)
1357 pa_tagstruct_put_boolean(t
, flags
& (PA_STREAM_PASSTHROUGH
));
1359 if ((s
->context
->version
>= 21 && s
->direction
== PA_STREAM_PLAYBACK
)
1360 || s
->context
->version
>= 22) {
1362 pa_tagstruct_putu8(t
, s
->n_formats
);
1363 for (i
= 0; i
< s
->n_formats
; i
++)
1364 pa_tagstruct_put_format_info(t
, s
->req_formats
[i
]);
1367 if (s
->context
->version
>= 22 && s
->direction
== PA_STREAM_RECORD
) {
1368 pa_tagstruct_put_cvolume(t
, volume
);
1369 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_START_MUTED
);
1370 pa_tagstruct_put_boolean(t
, volume_set
);
1371 pa_tagstruct_put_boolean(t
, flags
& (PA_STREAM_START_MUTED
|PA_STREAM_START_UNMUTED
));
1372 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_RELATIVE_VOLUME
);
1373 pa_tagstruct_put_boolean(t
, flags
& (PA_STREAM_PASSTHROUGH
));
1376 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
1377 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_create_stream_callback
, s
, NULL
);
1379 pa_stream_set_state(s
, PA_STREAM_CREATING
);
1385 int pa_stream_connect_playback(
1388 const pa_buffer_attr
*attr
,
1389 pa_stream_flags_t flags
,
1390 const pa_cvolume
*volume
,
1391 pa_stream
*sync_stream
) {
1394 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1396 return create_stream(PA_STREAM_PLAYBACK
, s
, dev
, attr
, flags
, volume
, sync_stream
);
1399 int pa_stream_connect_record(
1402 const pa_buffer_attr
*attr
,
1403 pa_stream_flags_t flags
) {
1406 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1408 return create_stream(PA_STREAM_RECORD
, s
, dev
, attr
, flags
, NULL
, NULL
);
1411 int pa_stream_begin_write(
1417 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1419 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1420 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1421 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
|| s
->direction
== PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
1422 PA_CHECK_VALIDITY(s
->context
, data
, PA_ERR_INVALID
);
1423 PA_CHECK_VALIDITY(s
->context
, nbytes
&& *nbytes
!= 0, PA_ERR_INVALID
);
1425 if (*nbytes
!= (size_t) -1) {
1428 m
= pa_mempool_block_size_max(s
->context
->mempool
);
1429 fs
= pa_frame_size(&s
->sample_spec
);
1436 if (!s
->write_memblock
) {
1437 s
->write_memblock
= pa_memblock_new(s
->context
->mempool
, *nbytes
);
1438 s
->write_data
= pa_memblock_acquire(s
->write_memblock
);
1441 *data
= s
->write_data
;
1442 *nbytes
= pa_memblock_get_length(s
->write_memblock
);
1447 int pa_stream_cancel_write(
1451 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1453 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1454 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1455 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
|| s
->direction
== PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
1456 PA_CHECK_VALIDITY(s
->context
, s
->write_memblock
, PA_ERR_BADSTATE
);
1458 pa_assert(s
->write_data
);
1460 pa_memblock_release(s
->write_memblock
);
1461 pa_memblock_unref(s
->write_memblock
);
1462 s
->write_memblock
= NULL
;
1463 s
->write_data
= NULL
;
1468 int pa_stream_write(
1472 pa_free_cb_t free_cb
,
1474 pa_seek_mode_t seek
) {
1477 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1480 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1481 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1482 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
|| s
->direction
== PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
1483 PA_CHECK_VALIDITY(s
->context
, seek
<= PA_SEEK_RELATIVE_END
, PA_ERR_INVALID
);
1484 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
|| (seek
== PA_SEEK_RELATIVE
&& offset
== 0), PA_ERR_INVALID
);
1485 PA_CHECK_VALIDITY(s
->context
,
1486 !s
->write_memblock
||
1487 ((data
>= s
->write_data
) &&
1488 ((const char*) data
+ length
<= (const char*) s
->write_data
+ pa_memblock_get_length(s
->write_memblock
))),
1490 PA_CHECK_VALIDITY(s
->context
, !free_cb
|| !s
->write_memblock
, PA_ERR_INVALID
);
1492 if (s
->write_memblock
) {
1495 /* pa_stream_write_begin() was called before */
1497 pa_memblock_release(s
->write_memblock
);
1499 chunk
.memblock
= s
->write_memblock
;
1500 chunk
.index
= (const char *) data
- (const char *) s
->write_data
;
1501 chunk
.length
= length
;
1503 s
->write_memblock
= NULL
;
1504 s
->write_data
= NULL
;
1506 pa_pstream_send_memblock(s
->context
->pstream
, s
->channel
, offset
, seek
, &chunk
);
1507 pa_memblock_unref(chunk
.memblock
);
1510 pa_seek_mode_t t_seek
= seek
;
1511 int64_t t_offset
= offset
;
1512 size_t t_length
= length
;
1513 const void *t_data
= data
;
1515 /* pa_stream_write_begin() was not called before */
1517 while (t_length
> 0) {
1522 if (free_cb
&& !pa_pstream_get_shm(s
->context
->pstream
)) {
1523 chunk
.memblock
= pa_memblock_new_user(s
->context
->mempool
, (void*) t_data
, t_length
, free_cb
, 1);
1524 chunk
.length
= t_length
;
1528 chunk
.length
= PA_MIN(t_length
, pa_mempool_block_size_max(s
->context
->mempool
));
1529 chunk
.memblock
= pa_memblock_new(s
->context
->mempool
, chunk
.length
);
1531 d
= pa_memblock_acquire(chunk
.memblock
);
1532 memcpy(d
, t_data
, chunk
.length
);
1533 pa_memblock_release(chunk
.memblock
);
1536 pa_pstream_send_memblock(s
->context
->pstream
, s
->channel
, t_offset
, t_seek
, &chunk
);
1539 t_seek
= PA_SEEK_RELATIVE
;
1541 t_data
= (const uint8_t*) t_data
+ chunk
.length
;
1542 t_length
-= chunk
.length
;
1544 pa_memblock_unref(chunk
.memblock
);
1547 if (free_cb
&& pa_pstream_get_shm(s
->context
->pstream
))
1548 free_cb((void*) data
);
1551 /* This is obviously wrong since we ignore the seeking index . But
1552 * that's OK, the server side applies the same error */
1553 s
->requested_bytes
-= (seek
== PA_SEEK_RELATIVE
? offset
: 0) + (int64_t) length
;
1556 pa_log_debug("wrote %lli, now at %lli", (long long) length
, (long long) s
->requested_bytes
);
1559 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1561 /* Update latency request correction */
1562 if (s
->write_index_corrections
[s
->current_write_index_correction
].valid
) {
1564 if (seek
== PA_SEEK_ABSOLUTE
) {
1565 s
->write_index_corrections
[s
->current_write_index_correction
].corrupt
= false;
1566 s
->write_index_corrections
[s
->current_write_index_correction
].absolute
= true;
1567 s
->write_index_corrections
[s
->current_write_index_correction
].value
= offset
+ (int64_t) length
;
1568 } else if (seek
== PA_SEEK_RELATIVE
) {
1569 if (!s
->write_index_corrections
[s
->current_write_index_correction
].corrupt
)
1570 s
->write_index_corrections
[s
->current_write_index_correction
].value
+= offset
+ (int64_t) length
;
1572 s
->write_index_corrections
[s
->current_write_index_correction
].corrupt
= true;
1575 /* Update the write index in the already available latency data */
1576 if (s
->timing_info_valid
) {
1578 if (seek
== PA_SEEK_ABSOLUTE
) {
1579 s
->timing_info
.write_index_corrupt
= false;
1580 s
->timing_info
.write_index
= offset
+ (int64_t) length
;
1581 } else if (seek
== PA_SEEK_RELATIVE
) {
1582 if (!s
->timing_info
.write_index_corrupt
)
1583 s
->timing_info
.write_index
+= offset
+ (int64_t) length
;
1585 s
->timing_info
.write_index_corrupt
= true;
1588 if (!s
->timing_info_valid
|| s
->timing_info
.write_index_corrupt
)
1589 request_auto_timing_update(s
, true);
1595 int pa_stream_peek(pa_stream
*s
, const void **data
, size_t *length
) {
1597 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1601 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1602 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1603 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_RECORD
, PA_ERR_BADSTATE
);
1605 if (!s
->peek_memchunk
.memblock
) {
1607 if (pa_memblockq_peek(s
->record_memblockq
, &s
->peek_memchunk
) < 0) {
1608 /* record_memblockq is empty. */
1613 } else if (!s
->peek_memchunk
.memblock
) {
1614 /* record_memblockq isn't empty, but it doesn't have any data at
1615 * the current read index. */
1617 *length
= s
->peek_memchunk
.length
;
1621 s
->peek_data
= pa_memblock_acquire(s
->peek_memchunk
.memblock
);
1624 pa_assert(s
->peek_data
);
1625 *data
= (uint8_t*) s
->peek_data
+ s
->peek_memchunk
.index
;
1626 *length
= s
->peek_memchunk
.length
;
1630 int pa_stream_drop(pa_stream
*s
) {
1632 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1634 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1635 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1636 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_RECORD
, PA_ERR_BADSTATE
);
1637 PA_CHECK_VALIDITY(s
->context
, s
->peek_memchunk
.length
> 0, PA_ERR_BADSTATE
);
1639 pa_memblockq_drop(s
->record_memblockq
, s
->peek_memchunk
.length
);
1641 /* Fix the simulated local read index */
1642 if (s
->timing_info_valid
&& !s
->timing_info
.read_index_corrupt
)
1643 s
->timing_info
.read_index
+= (int64_t) s
->peek_memchunk
.length
;
1645 if (s
->peek_memchunk
.memblock
) {
1646 pa_assert(s
->peek_data
);
1647 s
->peek_data
= NULL
;
1648 pa_memblock_release(s
->peek_memchunk
.memblock
);
1649 pa_memblock_unref(s
->peek_memchunk
.memblock
);
1652 pa_memchunk_reset(&s
->peek_memchunk
);
1657 size_t pa_stream_writable_size(pa_stream
*s
) {
1659 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1661 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
, (size_t) -1);
1662 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
, (size_t) -1);
1663 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->direction
!= PA_STREAM_RECORD
, PA_ERR_BADSTATE
, (size_t) -1);
1665 return s
->requested_bytes
> 0 ? (size_t) s
->requested_bytes
: 0;
1668 size_t pa_stream_readable_size(pa_stream
*s
) {
1670 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1672 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
, (size_t) -1);
1673 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
, (size_t) -1);
1674 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->direction
== PA_STREAM_RECORD
, PA_ERR_BADSTATE
, (size_t) -1);
1676 return pa_memblockq_get_length(s
->record_memblockq
);
1679 pa_operation
* pa_stream_drain(pa_stream
*s
, pa_stream_success_cb_t cb
, void *userdata
) {
1685 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1687 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1688 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1689 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
, PA_ERR_BADSTATE
);
1691 /* Ask for a timing update before we cork/uncork to get the best
1692 * accuracy for the transport latency suitable for the
1693 * check_smoother_status() call in the started callback */
1694 request_auto_timing_update(s
, true);
1696 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
1698 t
= pa_tagstruct_command(s
->context
, PA_COMMAND_DRAIN_PLAYBACK_STREAM
, &tag
);
1699 pa_tagstruct_putu32(t
, s
->channel
);
1700 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
1701 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
1703 /* This might cause the read index to continue again, hence
1704 * let's request a timing update */
1705 request_auto_timing_update(s
, true);
1710 static pa_usec_t
calc_time(pa_stream
*s
, bool ignore_transport
) {
1714 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1715 pa_assert(s
->state
== PA_STREAM_READY
);
1716 pa_assert(s
->direction
!= PA_STREAM_UPLOAD
);
1717 pa_assert(s
->timing_info_valid
);
1718 pa_assert(s
->direction
!= PA_STREAM_PLAYBACK
|| !s
->timing_info
.read_index_corrupt
);
1719 pa_assert(s
->direction
!= PA_STREAM_RECORD
|| !s
->timing_info
.write_index_corrupt
);
1721 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1722 /* The last byte that was written into the output device
1723 * had this time value associated */
1724 usec
= pa_bytes_to_usec(s
->timing_info
.read_index
< 0 ? 0 : (uint64_t) s
->timing_info
.read_index
, &s
->sample_spec
);
1726 if (!s
->corked
&& !s
->suspended
) {
1728 if (!ignore_transport
)
1729 /* Because the latency info took a little time to come
1730 * to us, we assume that the real output time is actually
1732 usec
+= s
->timing_info
.transport_usec
;
1734 /* However, the output device usually maintains a buffer
1735 too, hence the real sample currently played is a little
1737 if (s
->timing_info
.sink_usec
>= usec
)
1740 usec
-= s
->timing_info
.sink_usec
;
1744 pa_assert(s
->direction
== PA_STREAM_RECORD
);
1746 /* The last byte written into the server side queue had
1747 * this time value associated */
1748 usec
= pa_bytes_to_usec(s
->timing_info
.write_index
< 0 ? 0 : (uint64_t) s
->timing_info
.write_index
, &s
->sample_spec
);
1750 if (!s
->corked
&& !s
->suspended
) {
1752 if (!ignore_transport
)
1753 /* Add transport latency */
1754 usec
+= s
->timing_info
.transport_usec
;
1756 /* Add latency of data in device buffer */
1757 usec
+= s
->timing_info
.source_usec
;
1759 /* If this is a monitor source, we need to correct the
1760 * time by the playback device buffer */
1761 if (s
->timing_info
.sink_usec
>= usec
)
1764 usec
-= s
->timing_info
.sink_usec
;
1771 static void stream_get_timing_info_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1772 pa_operation
*o
= userdata
;
1773 struct timeval local
, remote
, now
;
1775 bool playing
= false;
1776 uint64_t underrun_for
= 0, playing_for
= 0;
1780 pa_assert(PA_REFCNT_VALUE(o
) >= 1);
1782 if (!o
->context
|| !o
->stream
)
1785 i
= &o
->stream
->timing_info
;
1787 o
->stream
->timing_info_valid
= false;
1788 i
->write_index_corrupt
= true;
1789 i
->read_index_corrupt
= true;
1791 if (command
!= PA_COMMAND_REPLY
) {
1792 if (pa_context_handle_error(o
->context
, command
, t
, false) < 0)
1797 if (pa_tagstruct_get_usec(t
, &i
->sink_usec
) < 0 ||
1798 pa_tagstruct_get_usec(t
, &i
->source_usec
) < 0 ||
1799 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
1800 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
1801 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
1802 pa_tagstruct_gets64(t
, &i
->write_index
) < 0 ||
1803 pa_tagstruct_gets64(t
, &i
->read_index
) < 0) {
1805 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
1809 if (o
->context
->version
>= 13 &&
1810 o
->stream
->direction
== PA_STREAM_PLAYBACK
)
1811 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
1812 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
1814 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
1818 if (!pa_tagstruct_eof(t
)) {
1819 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
1822 o
->stream
->timing_info_valid
= true;
1823 i
->write_index_corrupt
= false;
1824 i
->read_index_corrupt
= false;
1826 i
->playing
= (int) playing
;
1827 i
->since_underrun
= (int64_t) (playing
? playing_for
: underrun_for
);
1829 pa_gettimeofday(&now
);
1831 /* Calculate timestamps */
1832 if (pa_timeval_cmp(&local
, &remote
) <= 0 && pa_timeval_cmp(&remote
, &now
) <= 0) {
1833 /* local and remote seem to have synchronized clocks */
1835 if (o
->stream
->direction
== PA_STREAM_PLAYBACK
)
1836 i
->transport_usec
= pa_timeval_diff(&remote
, &local
);
1838 i
->transport_usec
= pa_timeval_diff(&now
, &remote
);
1840 i
->synchronized_clocks
= true;
1841 i
->timestamp
= remote
;
1843 /* clocks are not synchronized, let's estimate latency then */
1844 i
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
1845 i
->synchronized_clocks
= false;
1846 i
->timestamp
= local
;
1847 pa_timeval_add(&i
->timestamp
, i
->transport_usec
);
1850 /* Invalidate read and write indexes if necessary */
1851 if (tag
< o
->stream
->read_index_not_before
)
1852 i
->read_index_corrupt
= true;
1854 if (tag
< o
->stream
->write_index_not_before
)
1855 i
->write_index_corrupt
= true;
1857 if (o
->stream
->direction
== PA_STREAM_PLAYBACK
) {
1858 /* Write index correction */
1861 uint32_t ctag
= tag
;
1863 /* Go through the saved correction values and add up the
1864 * total correction.*/
1865 for (n
= 0, j
= o
->stream
->current_write_index_correction
+1;
1866 n
< PA_MAX_WRITE_INDEX_CORRECTIONS
;
1867 n
++, j
= (j
+ 1) % PA_MAX_WRITE_INDEX_CORRECTIONS
) {
1869 /* Step over invalid data or out-of-date data */
1870 if (!o
->stream
->write_index_corrections
[j
].valid
||
1871 o
->stream
->write_index_corrections
[j
].tag
< ctag
)
1874 /* Make sure that everything is in order */
1875 ctag
= o
->stream
->write_index_corrections
[j
].tag
+1;
1877 /* Now fix the write index */
1878 if (o
->stream
->write_index_corrections
[j
].corrupt
) {
1879 /* A corrupting seek was made */
1880 i
->write_index_corrupt
= true;
1881 } else if (o
->stream
->write_index_corrections
[j
].absolute
) {
1882 /* An absolute seek was made */
1883 i
->write_index
= o
->stream
->write_index_corrections
[j
].value
;
1884 i
->write_index_corrupt
= false;
1885 } else if (!i
->write_index_corrupt
) {
1886 /* A relative seek was made */
1887 i
->write_index
+= o
->stream
->write_index_corrections
[j
].value
;
1891 /* Clear old correction entries */
1892 for (n
= 0; n
< PA_MAX_WRITE_INDEX_CORRECTIONS
; n
++) {
1893 if (!o
->stream
->write_index_corrections
[n
].valid
)
1896 if (o
->stream
->write_index_corrections
[n
].tag
<= tag
)
1897 o
->stream
->write_index_corrections
[n
].valid
= false;
1901 if (o
->stream
->direction
== PA_STREAM_RECORD
) {
1902 /* Read index correction */
1904 if (!i
->read_index_corrupt
)
1905 i
->read_index
-= (int64_t) pa_memblockq_get_length(o
->stream
->record_memblockq
);
1908 /* Update smoother if we're not corked */
1909 if (o
->stream
->smoother
&& !o
->stream
->corked
) {
1912 u
= x
= pa_rtclock_now() - i
->transport_usec
;
1914 if (o
->stream
->direction
== PA_STREAM_PLAYBACK
&& o
->context
->version
>= 13) {
1917 /* If we weren't playing then it will take some time
1918 * until the audio will actually come out through the
1919 * speakers. Since we follow that timing here, we need
1920 * to try to fix this up */
1922 su
= pa_bytes_to_usec((uint64_t) i
->since_underrun
, &o
->stream
->sample_spec
);
1924 if (su
< i
->sink_usec
)
1925 x
+= i
->sink_usec
- su
;
1929 pa_smoother_pause(o
->stream
->smoother
, x
);
1931 /* Update the smoother */
1932 if ((o
->stream
->direction
== PA_STREAM_PLAYBACK
&& !i
->read_index_corrupt
) ||
1933 (o
->stream
->direction
== PA_STREAM_RECORD
&& !i
->write_index_corrupt
))
1934 pa_smoother_put(o
->stream
->smoother
, u
, calc_time(o
->stream
, true));
1937 pa_smoother_resume(o
->stream
->smoother
, x
, true);
1941 o
->stream
->auto_timing_update_requested
= false;
1943 if (o
->stream
->latency_update_callback
)
1944 o
->stream
->latency_update_callback(o
->stream
, o
->stream
->latency_update_userdata
);
1946 if (o
->callback
&& o
->stream
&& o
->stream
->state
== PA_STREAM_READY
) {
1947 pa_stream_success_cb_t cb
= (pa_stream_success_cb_t
) o
->callback
;
1948 cb(o
->stream
, o
->stream
->timing_info_valid
, o
->userdata
);
1953 pa_operation_done(o
);
1954 pa_operation_unref(o
);
1957 pa_operation
* pa_stream_update_timing_info(pa_stream
*s
, pa_stream_success_cb_t cb
, void *userdata
) {
1965 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1967 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1968 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1969 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
1971 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1972 /* Find a place to store the write_index correction data for this entry */
1973 cidx
= (s
->current_write_index_correction
+ 1) % PA_MAX_WRITE_INDEX_CORRECTIONS
;
1975 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1976 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !s
->write_index_corrections
[cidx
].valid
, PA_ERR_INTERNAL
);
1978 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
1980 t
= pa_tagstruct_command(
1982 (uint32_t) (s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_GET_PLAYBACK_LATENCY
: PA_COMMAND_GET_RECORD_LATENCY
),
1984 pa_tagstruct_putu32(t
, s
->channel
);
1985 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
1987 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
1988 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_timing_info_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
1990 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1991 /* Fill in initial correction data */
1993 s
->current_write_index_correction
= cidx
;
1995 s
->write_index_corrections
[cidx
].valid
= true;
1996 s
->write_index_corrections
[cidx
].absolute
= false;
1997 s
->write_index_corrections
[cidx
].corrupt
= false;
1998 s
->write_index_corrections
[cidx
].tag
= tag
;
1999 s
->write_index_corrections
[cidx
].value
= 0;
2005 void pa_stream_disconnect_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
2006 pa_stream
*s
= userdata
;
2010 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2014 if (command
!= PA_COMMAND_REPLY
) {
2015 if (pa_context_handle_error(s
->context
, command
, t
, false) < 0)
2018 pa_stream_set_state(s
, PA_STREAM_FAILED
);
2020 } else if (!pa_tagstruct_eof(t
)) {
2021 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
2025 pa_stream_set_state(s
, PA_STREAM_TERMINATED
);
2031 int pa_stream_disconnect(pa_stream
*s
) {
2036 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2038 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2039 PA_CHECK_VALIDITY(s
->context
, s
->channel_valid
, PA_ERR_BADSTATE
);
2040 PA_CHECK_VALIDITY(s
->context
, s
->context
->state
== PA_CONTEXT_READY
, PA_ERR_BADSTATE
);
2044 t
= pa_tagstruct_command(
2046 (uint32_t) (s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_DELETE_PLAYBACK_STREAM
:
2047 (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_DELETE_RECORD_STREAM
: PA_COMMAND_DELETE_UPLOAD_STREAM
)),
2049 pa_tagstruct_putu32(t
, s
->channel
);
2050 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2051 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_disconnect_callback
, s
, NULL
);
2057 void pa_stream_set_read_callback(pa_stream
*s
, pa_stream_request_cb_t cb
, void *userdata
) {
2059 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2061 if (pa_detect_fork())
2064 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2067 s
->read_callback
= cb
;
2068 s
->read_userdata
= userdata
;
2071 void pa_stream_set_write_callback(pa_stream
*s
, pa_stream_request_cb_t cb
, void *userdata
) {
2073 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2075 if (pa_detect_fork())
2078 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2081 s
->write_callback
= cb
;
2082 s
->write_userdata
= userdata
;
2085 void pa_stream_set_state_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2087 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2089 if (pa_detect_fork())
2092 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2095 s
->state_callback
= cb
;
2096 s
->state_userdata
= userdata
;
2099 void pa_stream_set_overflow_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2101 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2103 if (pa_detect_fork())
2106 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2109 s
->overflow_callback
= cb
;
2110 s
->overflow_userdata
= userdata
;
2113 void pa_stream_set_underflow_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2115 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2117 if (pa_detect_fork())
2120 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2123 s
->underflow_callback
= cb
;
2124 s
->underflow_userdata
= userdata
;
2127 void pa_stream_set_latency_update_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2129 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2131 if (pa_detect_fork())
2134 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2137 s
->latency_update_callback
= cb
;
2138 s
->latency_update_userdata
= userdata
;
2141 void pa_stream_set_moved_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2143 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2145 if (pa_detect_fork())
2148 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2151 s
->moved_callback
= cb
;
2152 s
->moved_userdata
= userdata
;
2155 void pa_stream_set_suspended_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2157 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2159 if (pa_detect_fork())
2162 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2165 s
->suspended_callback
= cb
;
2166 s
->suspended_userdata
= userdata
;
2169 void pa_stream_set_started_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2171 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2173 if (pa_detect_fork())
2176 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2179 s
->started_callback
= cb
;
2180 s
->started_userdata
= userdata
;
2183 void pa_stream_set_event_callback(pa_stream
*s
, pa_stream_event_cb_t cb
, void *userdata
) {
2185 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2187 if (pa_detect_fork())
2190 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2193 s
->event_callback
= cb
;
2194 s
->event_userdata
= userdata
;
2197 void pa_stream_set_buffer_attr_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2199 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2201 if (pa_detect_fork())
2204 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2207 s
->buffer_attr_callback
= cb
;
2208 s
->buffer_attr_userdata
= userdata
;
2211 void pa_stream_simple_ack_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
2212 pa_operation
*o
= userdata
;
2217 pa_assert(PA_REFCNT_VALUE(o
) >= 1);
2222 if (command
!= PA_COMMAND_REPLY
) {
2223 if (pa_context_handle_error(o
->context
, command
, t
, false) < 0)
2227 } else if (!pa_tagstruct_eof(t
)) {
2228 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2233 pa_stream_success_cb_t cb
= (pa_stream_success_cb_t
) o
->callback
;
2234 cb(o
->stream
, success
, o
->userdata
);
2238 pa_operation_done(o
);
2239 pa_operation_unref(o
);
2242 pa_operation
* pa_stream_cork(pa_stream
*s
, int b
, pa_stream_success_cb_t cb
, void *userdata
) {
2248 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2250 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2251 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2252 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2254 /* Ask for a timing update before we cork/uncork to get the best
2255 * accuracy for the transport latency suitable for the
2256 * check_smoother_status() call in the started callback */
2257 request_auto_timing_update(s
, true);
2261 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2263 t
= pa_tagstruct_command(
2265 (uint32_t) (s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_CORK_PLAYBACK_STREAM
: PA_COMMAND_CORK_RECORD_STREAM
),
2267 pa_tagstruct_putu32(t
, s
->channel
);
2268 pa_tagstruct_put_boolean(t
, !!b
);
2269 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2270 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2272 check_smoother_status(s
, false, false, false);
2274 /* This might cause the indexes to hang/start again, hence let's
2275 * request a timing update, after the cork/uncork, too */
2276 request_auto_timing_update(s
, true);
2281 static pa_operation
* stream_send_simple_command(pa_stream
*s
, uint32_t command
, pa_stream_success_cb_t cb
, void *userdata
) {
2287 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2289 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2290 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2292 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2294 t
= pa_tagstruct_command(s
->context
, command
, &tag
);
2295 pa_tagstruct_putu32(t
, s
->channel
);
2296 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2297 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2302 pa_operation
* pa_stream_flush(pa_stream
*s
, pa_stream_success_cb_t cb
, void *userdata
) {
2306 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2308 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2309 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2310 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2312 /* Ask for a timing update *before* the flush, so that the
2313 * transport usec is as up to date as possible when we get the
2314 * underflow message and update the smoother status*/
2315 request_auto_timing_update(s
, true);
2317 if (!(o
= stream_send_simple_command(s
, (uint32_t) (s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_FLUSH_PLAYBACK_STREAM
: PA_COMMAND_FLUSH_RECORD_STREAM
), cb
, userdata
)))
2320 if (s
->direction
== PA_STREAM_PLAYBACK
) {
2322 if (s
->write_index_corrections
[s
->current_write_index_correction
].valid
)
2323 s
->write_index_corrections
[s
->current_write_index_correction
].corrupt
= true;
2325 if (s
->buffer_attr
.prebuf
> 0)
2326 check_smoother_status(s
, false, false, true);
2328 /* This will change the write index, but leave the
2329 * read index untouched. */
2330 invalidate_indexes(s
, false, true);
2333 /* For record streams this has no influence on the write
2334 * index, but the read index might jump. */
2335 invalidate_indexes(s
, true, false);
2337 /* Note that we do not update requested_bytes here. This is
2338 * because we cannot really know how data actually was dropped
2339 * from the write index due to this. This 'error' will be applied
2340 * by both client and server and hence we should be fine. */
2345 pa_operation
* pa_stream_prebuf(pa_stream
*s
, pa_stream_success_cb_t cb
, void *userdata
) {
2349 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2351 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2352 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2353 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
, PA_ERR_BADSTATE
);
2354 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->buffer_attr
.prebuf
> 0, PA_ERR_BADSTATE
);
2356 /* Ask for a timing update before we cork/uncork to get the best
2357 * accuracy for the transport latency suitable for the
2358 * check_smoother_status() call in the started callback */
2359 request_auto_timing_update(s
, true);
2361 if (!(o
= stream_send_simple_command(s
, PA_COMMAND_PREBUF_PLAYBACK_STREAM
, cb
, userdata
)))
2364 /* This might cause the read index to hang again, hence
2365 * let's request a timing update */
2366 request_auto_timing_update(s
, true);
2371 pa_operation
* pa_stream_trigger(pa_stream
*s
, pa_stream_success_cb_t cb
, void *userdata
) {
2375 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2377 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2378 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2379 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
, PA_ERR_BADSTATE
);
2380 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->buffer_attr
.prebuf
> 0, PA_ERR_BADSTATE
);
2382 /* Ask for a timing update before we cork/uncork to get the best
2383 * accuracy for the transport latency suitable for the
2384 * check_smoother_status() call in the started callback */
2385 request_auto_timing_update(s
, true);
2387 if (!(o
= stream_send_simple_command(s
, PA_COMMAND_TRIGGER_PLAYBACK_STREAM
, cb
, userdata
)))
2390 /* This might cause the read index to start moving again, hence
2391 * let's request a timing update */
2392 request_auto_timing_update(s
, true);
2397 pa_operation
* pa_stream_set_name(pa_stream
*s
, const char *name
, pa_stream_success_cb_t cb
, void *userdata
) {
2401 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2404 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2405 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2406 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2408 if (s
->context
->version
>= 13) {
2409 pa_proplist
*p
= pa_proplist_new();
2411 pa_proplist_sets(p
, PA_PROP_MEDIA_NAME
, name
);
2412 o
= pa_stream_proplist_update(s
, PA_UPDATE_REPLACE
, p
, cb
, userdata
);
2413 pa_proplist_free(p
);
2418 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2419 t
= pa_tagstruct_command(
2421 (uint32_t) (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_SET_RECORD_STREAM_NAME
: PA_COMMAND_SET_PLAYBACK_STREAM_NAME
),
2423 pa_tagstruct_putu32(t
, s
->channel
);
2424 pa_tagstruct_puts(t
, name
);
2425 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2426 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2432 int pa_stream_get_time(pa_stream
*s
, pa_usec_t
*r_usec
) {
2436 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2438 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2439 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2440 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2441 PA_CHECK_VALIDITY(s
->context
, s
->timing_info_valid
, PA_ERR_NODATA
);
2442 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_PLAYBACK
|| !s
->timing_info
.read_index_corrupt
, PA_ERR_NODATA
);
2443 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_RECORD
|| !s
->timing_info
.write_index_corrupt
, PA_ERR_NODATA
);
2446 usec
= pa_smoother_get(s
->smoother
, pa_rtclock_now());
2448 usec
= calc_time(s
, false);
2450 /* Make sure the time runs monotonically */
2451 if (!(s
->flags
& PA_STREAM_NOT_MONOTONIC
)) {
2452 if (usec
< s
->previous_time
)
2453 usec
= s
->previous_time
;
2455 s
->previous_time
= usec
;
2464 static pa_usec_t
time_counter_diff(pa_stream
*s
, pa_usec_t a
, pa_usec_t b
, int *negative
) {
2466 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2474 if (negative
&& s
->direction
== PA_STREAM_RECORD
) {
2482 int pa_stream_get_latency(pa_stream
*s
, pa_usec_t
*r_usec
, int *negative
) {
2488 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2491 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2492 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2493 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2494 PA_CHECK_VALIDITY(s
->context
, s
->timing_info_valid
, PA_ERR_NODATA
);
2495 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_PLAYBACK
|| !s
->timing_info
.write_index_corrupt
, PA_ERR_NODATA
);
2496 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_RECORD
|| !s
->timing_info
.read_index_corrupt
, PA_ERR_NODATA
);
2498 if ((r
= pa_stream_get_time(s
, &t
)) < 0)
2501 if (s
->direction
== PA_STREAM_PLAYBACK
)
2502 cindex
= s
->timing_info
.write_index
;
2504 cindex
= s
->timing_info
.read_index
;
2509 c
= pa_bytes_to_usec((uint64_t) cindex
, &s
->sample_spec
);
2511 if (s
->direction
== PA_STREAM_PLAYBACK
)
2512 *r_usec
= time_counter_diff(s
, c
, t
, negative
);
2514 *r_usec
= time_counter_diff(s
, t
, c
, negative
);
2519 const pa_timing_info
* pa_stream_get_timing_info(pa_stream
*s
) {
2521 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2523 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2524 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2525 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2526 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->timing_info_valid
, PA_ERR_NODATA
);
2528 return &s
->timing_info
;
2531 const pa_sample_spec
* pa_stream_get_sample_spec(pa_stream
*s
) {
2533 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2535 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2537 return &s
->sample_spec
;
2540 const pa_channel_map
* pa_stream_get_channel_map(pa_stream
*s
) {
2542 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2544 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2546 return &s
->channel_map
;
2549 const pa_format_info
* pa_stream_get_format_info(pa_stream
*s
) {
2551 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2553 /* We don't have the format till routing is done */
2554 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2555 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2559 const pa_buffer_attr
* pa_stream_get_buffer_attr(pa_stream
*s
) {
2561 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2563 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2564 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2565 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 9, PA_ERR_NOTSUPPORTED
);
2567 return &s
->buffer_attr
;
2570 static void stream_set_buffer_attr_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
2571 pa_operation
*o
= userdata
;
2576 pa_assert(PA_REFCNT_VALUE(o
) >= 1);
2581 if (command
!= PA_COMMAND_REPLY
) {
2582 if (pa_context_handle_error(o
->context
, command
, t
, false) < 0)
2587 if (o
->stream
->direction
== PA_STREAM_PLAYBACK
) {
2588 if (pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.maxlength
) < 0 ||
2589 pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.tlength
) < 0 ||
2590 pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.prebuf
) < 0 ||
2591 pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.minreq
) < 0) {
2592 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2595 } else if (o
->stream
->direction
== PA_STREAM_RECORD
) {
2596 if (pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.maxlength
) < 0 ||
2597 pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.fragsize
) < 0) {
2598 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2603 if (o
->stream
->context
->version
>= 13) {
2606 if (pa_tagstruct_get_usec(t
, &usec
) < 0) {
2607 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2611 if (o
->stream
->direction
== PA_STREAM_RECORD
)
2612 o
->stream
->timing_info
.configured_source_usec
= usec
;
2614 o
->stream
->timing_info
.configured_sink_usec
= usec
;
2617 if (!pa_tagstruct_eof(t
)) {
2618 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2624 pa_stream_success_cb_t cb
= (pa_stream_success_cb_t
) o
->callback
;
2625 cb(o
->stream
, success
, o
->userdata
);
2629 pa_operation_done(o
);
2630 pa_operation_unref(o
);
2633 pa_operation
* pa_stream_set_buffer_attr(pa_stream
*s
, const pa_buffer_attr
*attr
, pa_stream_success_cb_t cb
, void *userdata
) {
2637 pa_buffer_attr copy
;
2640 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2643 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2644 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2645 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2646 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 12, PA_ERR_NOTSUPPORTED
);
2648 /* Ask for a timing update before we cork/uncork to get the best
2649 * accuracy for the transport latency suitable for the
2650 * check_smoother_status() call in the started callback */
2651 request_auto_timing_update(s
, true);
2653 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2655 t
= pa_tagstruct_command(
2657 (uint32_t) (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR
: PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR
),
2659 pa_tagstruct_putu32(t
, s
->channel
);
2662 patch_buffer_attr(s
, ©
, NULL
);
2665 pa_tagstruct_putu32(t
, attr
->maxlength
);
2667 if (s
->direction
== PA_STREAM_PLAYBACK
)
2670 PA_TAG_U32
, attr
->tlength
,
2671 PA_TAG_U32
, attr
->prebuf
,
2672 PA_TAG_U32
, attr
->minreq
,
2675 pa_tagstruct_putu32(t
, attr
->fragsize
);
2677 if (s
->context
->version
>= 13)
2678 pa_tagstruct_put_boolean(t
, !!(s
->flags
& PA_STREAM_ADJUST_LATENCY
));
2680 if (s
->context
->version
>= 14)
2681 pa_tagstruct_put_boolean(t
, !!(s
->flags
& PA_STREAM_EARLY_REQUESTS
));
2683 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2684 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_set_buffer_attr_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2686 /* This might cause changes in the read/write index, hence let's
2687 * request a timing update */
2688 request_auto_timing_update(s
, true);
2693 uint32_t pa_stream_get_device_index(pa_stream
*s
) {
2695 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2697 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
, PA_INVALID_INDEX
);
2698 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
, PA_INVALID_INDEX
);
2699 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
, PA_INVALID_INDEX
);
2700 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->context
->version
>= 12, PA_ERR_NOTSUPPORTED
, PA_INVALID_INDEX
);
2701 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->device_index
!= PA_INVALID_INDEX
, PA_ERR_BADSTATE
, PA_INVALID_INDEX
);
2703 return s
->device_index
;
2706 const char *pa_stream_get_device_name(pa_stream
*s
) {
2708 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2710 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2711 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2712 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2713 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 12, PA_ERR_NOTSUPPORTED
);
2714 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->device_name
, PA_ERR_BADSTATE
);
2716 return s
->device_name
;
2719 int pa_stream_is_suspended(pa_stream
*s
) {
2721 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2723 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2724 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2725 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2726 PA_CHECK_VALIDITY(s
->context
, s
->context
->version
>= 12, PA_ERR_NOTSUPPORTED
);
2728 return s
->suspended
;
2731 int pa_stream_is_corked(pa_stream
*s
) {
2733 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2735 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2736 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2737 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2742 static void stream_update_sample_rate_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
2743 pa_operation
*o
= userdata
;
2748 pa_assert(PA_REFCNT_VALUE(o
) >= 1);
2753 if (command
!= PA_COMMAND_REPLY
) {
2754 if (pa_context_handle_error(o
->context
, command
, t
, false) < 0)
2760 if (!pa_tagstruct_eof(t
)) {
2761 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2766 o
->stream
->sample_spec
.rate
= PA_PTR_TO_UINT(o
->private);
2767 pa_assert(pa_sample_spec_valid(&o
->stream
->sample_spec
));
2770 pa_stream_success_cb_t cb
= (pa_stream_success_cb_t
) o
->callback
;
2771 cb(o
->stream
, success
, o
->userdata
);
2775 pa_operation_done(o
);
2776 pa_operation_unref(o
);
2779 pa_operation
*pa_stream_update_sample_rate(pa_stream
*s
, uint32_t rate
, pa_stream_success_cb_t cb
, void *userdata
) {
2785 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2787 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2788 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, pa_sample_rate_valid(rate
), PA_ERR_INVALID
);
2789 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2790 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2791 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->flags
& PA_STREAM_VARIABLE_RATE
, PA_ERR_BADSTATE
);
2792 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 12, PA_ERR_NOTSUPPORTED
);
2794 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2795 o
->private = PA_UINT_TO_PTR(rate
);
2797 t
= pa_tagstruct_command(
2799 (uint32_t) (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE
: PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE
),
2801 pa_tagstruct_putu32(t
, s
->channel
);
2802 pa_tagstruct_putu32(t
, rate
);
2804 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2805 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_update_sample_rate_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2810 pa_operation
*pa_stream_proplist_update(pa_stream
*s
, pa_update_mode_t mode
, pa_proplist
*p
, pa_stream_success_cb_t cb
, void *userdata
) {
2816 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2818 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2819 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, mode
== PA_UPDATE_SET
|| mode
== PA_UPDATE_MERGE
|| mode
== PA_UPDATE_REPLACE
, PA_ERR_INVALID
);
2820 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2821 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2822 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 13, PA_ERR_NOTSUPPORTED
);
2824 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2826 t
= pa_tagstruct_command(
2828 (uint32_t) (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST
: PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST
),
2830 pa_tagstruct_putu32(t
, s
->channel
);
2831 pa_tagstruct_putu32(t
, (uint32_t) mode
);
2832 pa_tagstruct_put_proplist(t
, p
);
2834 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2835 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2837 /* Please note that we don't update s->proplist here, because we
2838 * don't export that field */
2843 pa_operation
*pa_stream_proplist_remove(pa_stream
*s
, const char *const keys
[], pa_stream_success_cb_t cb
, void *userdata
) {
2847 const char * const*k
;
2850 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2852 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2853 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, keys
&& keys
[0], PA_ERR_INVALID
);
2854 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2855 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2856 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 13, PA_ERR_NOTSUPPORTED
);
2858 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2860 t
= pa_tagstruct_command(
2862 (uint32_t) (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST
: PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST
),
2864 pa_tagstruct_putu32(t
, s
->channel
);
2866 for (k
= keys
; *k
; k
++)
2867 pa_tagstruct_puts(t
, *k
);
2869 pa_tagstruct_puts(t
, NULL
);
2871 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2872 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2874 /* Please note that we don't update s->proplist here, because we
2875 * don't export that field */
2880 int pa_stream_set_monitor_stream(pa_stream
*s
, uint32_t sink_input_idx
) {
2882 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2884 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2885 PA_CHECK_VALIDITY(s
->context
, sink_input_idx
!= PA_INVALID_INDEX
, PA_ERR_INVALID
);
2886 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_UNCONNECTED
, PA_ERR_BADSTATE
);
2887 PA_CHECK_VALIDITY(s
->context
, s
->context
->version
>= 13, PA_ERR_NOTSUPPORTED
);
2889 s
->direct_on_input
= sink_input_idx
;
2894 uint32_t pa_stream_get_monitor_stream(pa_stream
*s
) {
2896 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2898 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
, PA_INVALID_INDEX
);
2899 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->direct_on_input
!= PA_INVALID_INDEX
, PA_ERR_BADSTATE
, PA_INVALID_INDEX
);
2900 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->context
->version
>= 13, PA_ERR_NOTSUPPORTED
, PA_INVALID_INDEX
);
2902 return s
->direct_on_input
;