2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
47 /* #define STREAM_DEBUG */
49 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
50 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
52 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
53 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
54 #define SMOOTHER_MIN_HISTORY (4)
56 pa_stream
*pa_stream_new(pa_context
*c
, const char *name
, const pa_sample_spec
*ss
, const pa_channel_map
*map
) {
57 return pa_stream_new_with_proplist(c
, name
, ss
, map
, NULL
);
60 static void reset_callbacks(pa_stream
*s
) {
61 s
->read_callback
= NULL
;
62 s
->read_userdata
= NULL
;
63 s
->write_callback
= NULL
;
64 s
->write_userdata
= NULL
;
65 s
->state_callback
= NULL
;
66 s
->state_userdata
= NULL
;
67 s
->overflow_callback
= NULL
;
68 s
->overflow_userdata
= NULL
;
69 s
->underflow_callback
= NULL
;
70 s
->underflow_userdata
= NULL
;
71 s
->latency_update_callback
= NULL
;
72 s
->latency_update_userdata
= NULL
;
73 s
->moved_callback
= NULL
;
74 s
->moved_userdata
= NULL
;
75 s
->suspended_callback
= NULL
;
76 s
->suspended_userdata
= NULL
;
77 s
->started_callback
= NULL
;
78 s
->started_userdata
= NULL
;
79 s
->event_callback
= NULL
;
80 s
->event_userdata
= NULL
;
81 s
->buffer_attr_callback
= NULL
;
82 s
->buffer_attr_userdata
= NULL
;
85 static pa_stream
*pa_stream_new_with_proplist_internal(
88 const pa_sample_spec
*ss
,
89 const pa_channel_map
*map
,
90 pa_format_info
* const *formats
,
91 unsigned int n_formats
,
98 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
99 pa_assert((ss
== NULL
&& map
== NULL
) || (formats
== NULL
&& n_formats
== 0));
100 pa_assert(n_formats
< PA_MAX_FORMATS
);
102 PA_CHECK_VALIDITY_RETURN_NULL(c
, !pa_detect_fork(), PA_ERR_FORKED
);
103 PA_CHECK_VALIDITY_RETURN_NULL(c
, name
|| (p
&& pa_proplist_contains(p
, PA_PROP_MEDIA_NAME
)), PA_ERR_INVALID
);
105 s
= pa_xnew(pa_stream
, 1);
108 s
->mainloop
= c
->mainloop
;
110 s
->direction
= PA_STREAM_NODIRECTION
;
111 s
->state
= PA_STREAM_UNCONNECTED
;
115 s
->sample_spec
= *ss
;
117 pa_sample_spec_init(&s
->sample_spec
);
120 s
->channel_map
= *map
;
122 pa_channel_map_init(&s
->channel_map
);
126 s
->n_formats
= n_formats
;
127 for (i
= 0; i
< n_formats
; i
++)
128 s
->req_formats
[i
] = pa_format_info_copy(formats
[i
]);
131 /* We'll get the final negotiated format after connecting */
134 s
->direct_on_input
= PA_INVALID_INDEX
;
136 s
->proplist
= p
? pa_proplist_copy(p
) : pa_proplist_new();
138 pa_proplist_sets(s
->proplist
, PA_PROP_MEDIA_NAME
, name
);
141 s
->channel_valid
= false;
142 s
->syncid
= c
->csyncid
++;
143 s
->stream_index
= PA_INVALID_INDEX
;
145 s
->requested_bytes
= 0;
146 memset(&s
->buffer_attr
, 0, sizeof(s
->buffer_attr
));
148 /* We initialize the target length here, so that if the user
149 * passes no explicit buffering metrics the default is similar to
150 * what older PA versions provided. */
152 s
->buffer_attr
.maxlength
= (uint32_t) -1;
154 s
->buffer_attr
.tlength
= (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC
, ss
); /* 250ms of buffering */
156 /* FIXME: We assume a worst-case compressed format corresponding to
157 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
158 pa_sample_spec tmp_ss
= {
159 .format
= PA_SAMPLE_S16NE
,
163 s
->buffer_attr
.tlength
= (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC
, &tmp_ss
); /* 250ms of buffering */
165 s
->buffer_attr
.minreq
= (uint32_t) -1;
166 s
->buffer_attr
.prebuf
= (uint32_t) -1;
167 s
->buffer_attr
.fragsize
= (uint32_t) -1;
169 s
->device_index
= PA_INVALID_INDEX
;
170 s
->device_name
= NULL
;
171 s
->suspended
= false;
174 s
->write_memblock
= NULL
;
175 s
->write_data
= NULL
;
177 pa_memchunk_reset(&s
->peek_memchunk
);
179 s
->record_memblockq
= NULL
;
181 memset(&s
->timing_info
, 0, sizeof(s
->timing_info
));
182 s
->timing_info_valid
= false;
184 s
->previous_time
= 0;
185 s
->latest_underrun_at_index
= -1;
187 s
->read_index_not_before
= 0;
188 s
->write_index_not_before
= 0;
189 for (i
= 0; i
< PA_MAX_WRITE_INDEX_CORRECTIONS
; i
++)
190 s
->write_index_corrections
[i
].valid
= 0;
191 s
->current_write_index_correction
= 0;
193 s
->auto_timing_update_event
= NULL
;
194 s
->auto_timing_update_requested
= false;
195 s
->auto_timing_interval_usec
= AUTO_TIMING_INTERVAL_START_USEC
;
201 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
202 PA_LLIST_PREPEND(pa_stream
, c
->streams
, s
);
208 pa_stream
*pa_stream_new_with_proplist(
211 const pa_sample_spec
*ss
,
212 const pa_channel_map
*map
,
217 PA_CHECK_VALIDITY_RETURN_NULL(c
, ss
&& pa_sample_spec_valid(ss
), PA_ERR_INVALID
);
218 PA_CHECK_VALIDITY_RETURN_NULL(c
, c
->version
>= 12 || (ss
->format
!= PA_SAMPLE_S32LE
&& ss
->format
!= PA_SAMPLE_S32BE
), PA_ERR_NOTSUPPORTED
);
219 PA_CHECK_VALIDITY_RETURN_NULL(c
, c
->version
>= 15 || (ss
->format
!= PA_SAMPLE_S24LE
&& ss
->format
!= PA_SAMPLE_S24BE
), PA_ERR_NOTSUPPORTED
);
220 PA_CHECK_VALIDITY_RETURN_NULL(c
, c
->version
>= 15 || (ss
->format
!= PA_SAMPLE_S24_32LE
&& ss
->format
!= PA_SAMPLE_S24_32BE
), PA_ERR_NOTSUPPORTED
);
221 PA_CHECK_VALIDITY_RETURN_NULL(c
, !map
|| (pa_channel_map_valid(map
) && map
->channels
== ss
->channels
), PA_ERR_INVALID
);
224 PA_CHECK_VALIDITY_RETURN_NULL(c
, map
= pa_channel_map_init_auto(&tmap
, ss
->channels
, PA_CHANNEL_MAP_DEFAULT
), PA_ERR_INVALID
);
226 return pa_stream_new_with_proplist_internal(c
, name
, ss
, map
, NULL
, 0, p
);
229 pa_stream
*pa_stream_new_extended(
232 pa_format_info
* const *formats
,
233 unsigned int n_formats
,
236 PA_CHECK_VALIDITY_RETURN_NULL(c
, c
->version
>= 21, PA_ERR_NOTSUPPORTED
);
238 return pa_stream_new_with_proplist_internal(c
, name
, NULL
, NULL
, formats
, n_formats
, p
);
241 static void stream_unlink(pa_stream
*s
) {
248 /* Detach from context */
250 /* Unref all operation objects that point to us */
251 for (o
= s
->context
->operations
; o
; o
= n
) {
255 pa_operation_cancel(o
);
258 /* Drop all outstanding replies for this stream */
259 if (s
->context
->pdispatch
)
260 pa_pdispatch_unregister_reply(s
->context
->pdispatch
, s
);
262 if (s
->channel_valid
) {
263 pa_hashmap_remove((s
->direction
== PA_STREAM_RECORD
) ? s
->context
->record_streams
: s
->context
->playback_streams
, PA_UINT32_TO_PTR(s
->channel
));
265 s
->channel_valid
= false;
268 PA_LLIST_REMOVE(pa_stream
, s
->context
->streams
, s
);
273 if (s
->auto_timing_update_event
) {
274 pa_assert(s
->mainloop
);
275 s
->mainloop
->time_free(s
->auto_timing_update_event
);
281 static void stream_free(pa_stream
*s
) {
288 if (s
->write_memblock
) {
290 pa_memblock_release(s
->write_memblock
);
291 pa_memblock_unref(s
->write_memblock
);
294 if (s
->peek_memchunk
.memblock
) {
296 pa_memblock_release(s
->peek_memchunk
.memblock
);
297 pa_memblock_unref(s
->peek_memchunk
.memblock
);
300 if (s
->record_memblockq
)
301 pa_memblockq_free(s
->record_memblockq
);
304 pa_proplist_free(s
->proplist
);
307 pa_smoother_free(s
->smoother
);
309 for (i
= 0; i
< s
->n_formats
; i
++)
310 pa_format_info_free(s
->req_formats
[i
]);
313 pa_format_info_free(s
->format
);
315 pa_xfree(s
->device_name
);
319 void pa_stream_unref(pa_stream
*s
) {
321 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
323 if (PA_REFCNT_DEC(s
) <= 0)
327 pa_stream
* pa_stream_ref(pa_stream
*s
) {
329 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
335 pa_stream_state_t
pa_stream_get_state(pa_stream
*s
) {
337 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
342 pa_context
* pa_stream_get_context(pa_stream
*s
) {
344 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
349 uint32_t pa_stream_get_index(pa_stream
*s
) {
351 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
353 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
, PA_INVALID_INDEX
);
354 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
, PA_INVALID_INDEX
);
356 return s
->stream_index
;
359 void pa_stream_set_state(pa_stream
*s
, pa_stream_state_t st
) {
361 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
370 if (s
->state_callback
)
371 s
->state_callback(s
, s
->state_userdata
);
373 if ((st
== PA_STREAM_FAILED
|| st
== PA_STREAM_TERMINATED
))
379 static void request_auto_timing_update(pa_stream
*s
, bool force
) {
381 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
383 if (!(s
->flags
& PA_STREAM_AUTO_TIMING_UPDATE
))
386 if (s
->state
== PA_STREAM_READY
&&
387 (force
|| !s
->auto_timing_update_requested
)) {
391 pa_log_debug("Automatically requesting new timing data");
394 if ((o
= pa_stream_update_timing_info(s
, NULL
, NULL
))) {
395 pa_operation_unref(o
);
396 s
->auto_timing_update_requested
= true;
400 if (s
->auto_timing_update_event
) {
401 if (s
->suspended
&& !force
) {
402 pa_assert(s
->mainloop
);
403 s
->mainloop
->time_free(s
->auto_timing_update_event
);
404 s
->auto_timing_update_event
= NULL
;
407 s
->auto_timing_interval_usec
= AUTO_TIMING_INTERVAL_START_USEC
;
409 pa_context_rttime_restart(s
->context
, s
->auto_timing_update_event
, pa_rtclock_now() + s
->auto_timing_interval_usec
);
411 s
->auto_timing_interval_usec
= PA_MIN(AUTO_TIMING_INTERVAL_END_USEC
, s
->auto_timing_interval_usec
*2);
416 void pa_command_stream_killed(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
417 pa_context
*c
= userdata
;
422 pa_assert(command
== PA_COMMAND_PLAYBACK_STREAM_KILLED
|| command
== PA_COMMAND_RECORD_STREAM_KILLED
);
425 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
429 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
430 !pa_tagstruct_eof(t
)) {
431 pa_context_fail(c
, PA_ERR_PROTOCOL
);
435 if (!(s
= pa_hashmap_get(command
== PA_COMMAND_PLAYBACK_STREAM_KILLED
? c
->playback_streams
: c
->record_streams
, PA_UINT32_TO_PTR(channel
))))
438 if (s
->state
!= PA_STREAM_READY
)
441 pa_context_set_error(c
, PA_ERR_KILLED
);
442 pa_stream_set_state(s
, PA_STREAM_FAILED
);
448 static void check_smoother_status(pa_stream
*s
, bool aposteriori
, bool force_start
, bool force_stop
) {
452 pa_assert(!force_start
|| !force_stop
);
457 x
= pa_rtclock_now();
459 if (s
->timing_info_valid
) {
461 x
-= s
->timing_info
.transport_usec
;
463 x
+= s
->timing_info
.transport_usec
;
466 if (s
->suspended
|| s
->corked
|| force_stop
)
467 pa_smoother_pause(s
->smoother
, x
);
468 else if (force_start
|| s
->buffer_attr
.prebuf
== 0) {
470 if (!s
->timing_info_valid
&&
474 s
->context
->version
>= 13) {
476 /* If the server supports STARTED events we take them as
477 * indications when audio really starts/stops playing, if
478 * we don't have any timing info yet -- instead of trying
479 * to be smart and guessing the server time. Otherwise the
480 * unknown transport delay adds too much noise to our time
486 pa_smoother_resume(s
->smoother
, x
, true);
489 /* Please note that we have no idea if playback actually started
490 * if prebuf is non-zero! */
493 static void auto_timing_update_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*t
, void *userdata
);
495 void pa_command_stream_moved(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
496 pa_context
*c
= userdata
;
503 uint32_t maxlength
= 0, fragsize
= 0, minreq
= 0, tlength
= 0, prebuf
= 0;
506 pa_assert(command
== PA_COMMAND_PLAYBACK_STREAM_MOVED
|| command
== PA_COMMAND_RECORD_STREAM_MOVED
);
509 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
513 if (c
->version
< 12) {
514 pa_context_fail(c
, PA_ERR_PROTOCOL
);
518 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
519 pa_tagstruct_getu32(t
, &di
) < 0 ||
520 pa_tagstruct_gets(t
, &dn
) < 0 ||
521 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
522 pa_context_fail(c
, PA_ERR_PROTOCOL
);
526 if (c
->version
>= 13) {
528 if (command
== PA_COMMAND_RECORD_STREAM_MOVED
) {
529 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
530 pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
531 pa_tagstruct_get_usec(t
, &usec
) < 0) {
532 pa_context_fail(c
, PA_ERR_PROTOCOL
);
536 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
537 pa_tagstruct_getu32(t
, &tlength
) < 0 ||
538 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
539 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
540 pa_tagstruct_get_usec(t
, &usec
) < 0) {
541 pa_context_fail(c
, PA_ERR_PROTOCOL
);
547 if (!pa_tagstruct_eof(t
)) {
548 pa_context_fail(c
, PA_ERR_PROTOCOL
);
552 if (!dn
|| di
== PA_INVALID_INDEX
) {
553 pa_context_fail(c
, PA_ERR_PROTOCOL
);
557 if (!(s
= pa_hashmap_get(command
== PA_COMMAND_PLAYBACK_STREAM_MOVED
? c
->playback_streams
: c
->record_streams
, PA_UINT32_TO_PTR(channel
))))
560 if (s
->state
!= PA_STREAM_READY
)
563 if (c
->version
>= 13) {
564 if (s
->direction
== PA_STREAM_RECORD
)
565 s
->timing_info
.configured_source_usec
= usec
;
567 s
->timing_info
.configured_sink_usec
= usec
;
569 s
->buffer_attr
.maxlength
= maxlength
;
570 s
->buffer_attr
.fragsize
= fragsize
;
571 s
->buffer_attr
.tlength
= tlength
;
572 s
->buffer_attr
.prebuf
= prebuf
;
573 s
->buffer_attr
.minreq
= minreq
;
576 pa_xfree(s
->device_name
);
577 s
->device_name
= pa_xstrdup(dn
);
578 s
->device_index
= di
;
580 s
->suspended
= suspended
;
582 if ((s
->flags
& PA_STREAM_AUTO_TIMING_UPDATE
) && !suspended
&& !s
->auto_timing_update_event
) {
583 s
->auto_timing_interval_usec
= AUTO_TIMING_INTERVAL_START_USEC
;
584 s
->auto_timing_update_event
= pa_context_rttime_new(s
->context
, pa_rtclock_now() + s
->auto_timing_interval_usec
, &auto_timing_update_callback
, s
);
585 request_auto_timing_update(s
, true);
588 check_smoother_status(s
, true, false, false);
589 request_auto_timing_update(s
, true);
591 if (s
->moved_callback
)
592 s
->moved_callback(s
, s
->moved_userdata
);
598 void pa_command_stream_buffer_attr(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
599 pa_context
*c
= userdata
;
603 uint32_t maxlength
= 0, fragsize
= 0, minreq
= 0, tlength
= 0, prebuf
= 0;
606 pa_assert(command
== PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
|| command
== PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED
);
609 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
613 if (c
->version
< 15) {
614 pa_context_fail(c
, PA_ERR_PROTOCOL
);
618 if (pa_tagstruct_getu32(t
, &channel
) < 0) {
619 pa_context_fail(c
, PA_ERR_PROTOCOL
);
623 if (command
== PA_COMMAND_RECORD_STREAM_MOVED
) {
624 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
625 pa_tagstruct_getu32(t
, &fragsize
) < 0 ||
626 pa_tagstruct_get_usec(t
, &usec
) < 0) {
627 pa_context_fail(c
, PA_ERR_PROTOCOL
);
631 if (pa_tagstruct_getu32(t
, &maxlength
) < 0 ||
632 pa_tagstruct_getu32(t
, &tlength
) < 0 ||
633 pa_tagstruct_getu32(t
, &prebuf
) < 0 ||
634 pa_tagstruct_getu32(t
, &minreq
) < 0 ||
635 pa_tagstruct_get_usec(t
, &usec
) < 0) {
636 pa_context_fail(c
, PA_ERR_PROTOCOL
);
641 if (!pa_tagstruct_eof(t
)) {
642 pa_context_fail(c
, PA_ERR_PROTOCOL
);
646 if (!(s
= pa_hashmap_get(command
== PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED
? c
->playback_streams
: c
->record_streams
, PA_UINT32_TO_PTR(channel
))))
649 if (s
->state
!= PA_STREAM_READY
)
652 if (s
->direction
== PA_STREAM_RECORD
)
653 s
->timing_info
.configured_source_usec
= usec
;
655 s
->timing_info
.configured_sink_usec
= usec
;
657 s
->buffer_attr
.maxlength
= maxlength
;
658 s
->buffer_attr
.fragsize
= fragsize
;
659 s
->buffer_attr
.tlength
= tlength
;
660 s
->buffer_attr
.prebuf
= prebuf
;
661 s
->buffer_attr
.minreq
= minreq
;
663 request_auto_timing_update(s
, true);
665 if (s
->buffer_attr_callback
)
666 s
->buffer_attr_callback(s
, s
->buffer_attr_userdata
);
672 void pa_command_stream_suspended(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
673 pa_context
*c
= userdata
;
679 pa_assert(command
== PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
|| command
== PA_COMMAND_RECORD_STREAM_SUSPENDED
);
682 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
686 if (c
->version
< 12) {
687 pa_context_fail(c
, PA_ERR_PROTOCOL
);
691 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
692 pa_tagstruct_get_boolean(t
, &suspended
) < 0 ||
693 !pa_tagstruct_eof(t
)) {
694 pa_context_fail(c
, PA_ERR_PROTOCOL
);
698 if (!(s
= pa_hashmap_get(command
== PA_COMMAND_PLAYBACK_STREAM_SUSPENDED
? c
->playback_streams
: c
->record_streams
, PA_UINT32_TO_PTR(channel
))))
701 if (s
->state
!= PA_STREAM_READY
)
704 s
->suspended
= suspended
;
706 if ((s
->flags
& PA_STREAM_AUTO_TIMING_UPDATE
) && !suspended
&& !s
->auto_timing_update_event
) {
707 s
->auto_timing_interval_usec
= AUTO_TIMING_INTERVAL_START_USEC
;
708 s
->auto_timing_update_event
= pa_context_rttime_new(s
->context
, pa_rtclock_now() + s
->auto_timing_interval_usec
, &auto_timing_update_callback
, s
);
709 request_auto_timing_update(s
, true);
712 check_smoother_status(s
, true, false, false);
713 request_auto_timing_update(s
, true);
715 if (s
->suspended_callback
)
716 s
->suspended_callback(s
, s
->suspended_userdata
);
722 void pa_command_stream_started(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
723 pa_context
*c
= userdata
;
728 pa_assert(command
== PA_COMMAND_STARTED
);
731 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
735 if (c
->version
< 13) {
736 pa_context_fail(c
, PA_ERR_PROTOCOL
);
740 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
741 !pa_tagstruct_eof(t
)) {
742 pa_context_fail(c
, PA_ERR_PROTOCOL
);
746 if (!(s
= pa_hashmap_get(c
->playback_streams
, PA_UINT32_TO_PTR(channel
))))
749 if (s
->state
!= PA_STREAM_READY
)
752 check_smoother_status(s
, true, true, false);
753 request_auto_timing_update(s
, true);
755 if (s
->started_callback
)
756 s
->started_callback(s
, s
->started_userdata
);
762 void pa_command_stream_event(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
763 pa_context
*c
= userdata
;
766 pa_proplist
*pl
= NULL
;
770 pa_assert(command
== PA_COMMAND_PLAYBACK_STREAM_EVENT
|| command
== PA_COMMAND_RECORD_STREAM_EVENT
);
773 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
777 if (c
->version
< 15) {
778 pa_context_fail(c
, PA_ERR_PROTOCOL
);
782 pl
= pa_proplist_new();
784 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
785 pa_tagstruct_gets(t
, &event
) < 0 ||
786 pa_tagstruct_get_proplist(t
, pl
) < 0 ||
787 !pa_tagstruct_eof(t
) || !event
) {
788 pa_context_fail(c
, PA_ERR_PROTOCOL
);
792 if (!(s
= pa_hashmap_get(command
== PA_COMMAND_PLAYBACK_STREAM_EVENT
? c
->playback_streams
: c
->record_streams
, PA_UINT32_TO_PTR(channel
))))
795 if (s
->state
!= PA_STREAM_READY
)
798 if (pa_streq(event
, PA_STREAM_EVENT_FORMAT_LOST
)) {
799 /* Let client know what the running time was when the stream had to be killed */
800 pa_usec_t stream_time
;
801 if (pa_stream_get_time(s
, &stream_time
) == 0)
802 pa_proplist_setf(pl
, "stream-time", "%llu", (unsigned long long) stream_time
);
805 if (s
->event_callback
)
806 s
->event_callback(s
, event
, pl
, s
->event_userdata
);
812 pa_proplist_free(pl
);
815 void pa_command_request(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
817 pa_context
*c
= userdata
;
818 uint32_t bytes
, channel
;
821 pa_assert(command
== PA_COMMAND_REQUEST
);
824 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
828 if (pa_tagstruct_getu32(t
, &channel
) < 0 ||
829 pa_tagstruct_getu32(t
, &bytes
) < 0 ||
830 !pa_tagstruct_eof(t
)) {
831 pa_context_fail(c
, PA_ERR_PROTOCOL
);
835 if (!(s
= pa_hashmap_get(c
->playback_streams
, PA_UINT32_TO_PTR(channel
))))
838 if (s
->state
!= PA_STREAM_READY
)
841 s
->requested_bytes
+= bytes
;
844 pa_log_debug("got request for %lli, now at %lli", (long long) bytes
, (long long) s
->requested_bytes
);
847 if (s
->requested_bytes
> 0 && s
->write_callback
)
848 s
->write_callback(s
, (size_t) s
->requested_bytes
, s
->write_userdata
);
854 int64_t pa_stream_get_underflow_index(pa_stream
*p
) {
856 return p
->latest_underrun_at_index
;
859 void pa_command_overflow_or_underflow(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
861 pa_context
*c
= userdata
;
866 pa_assert(command
== PA_COMMAND_OVERFLOW
|| command
== PA_COMMAND_UNDERFLOW
);
869 pa_assert(PA_REFCNT_VALUE(c
) >= 1);
873 if (pa_tagstruct_getu32(t
, &channel
) < 0) {
874 pa_context_fail(c
, PA_ERR_PROTOCOL
);
878 if (c
->version
>= 23 && command
== PA_COMMAND_UNDERFLOW
) {
879 if (pa_tagstruct_gets64(t
, &offset
) < 0) {
880 pa_context_fail(c
, PA_ERR_PROTOCOL
);
885 if (!pa_tagstruct_eof(t
)) {
886 pa_context_fail(c
, PA_ERR_PROTOCOL
);
890 if (!(s
= pa_hashmap_get(c
->playback_streams
, PA_UINT32_TO_PTR(channel
))))
893 if (s
->state
!= PA_STREAM_READY
)
897 s
->latest_underrun_at_index
= offset
;
899 if (s
->buffer_attr
.prebuf
> 0)
900 check_smoother_status(s
, true, false, true);
902 request_auto_timing_update(s
, true);
904 if (command
== PA_COMMAND_OVERFLOW
) {
905 if (s
->overflow_callback
)
906 s
->overflow_callback(s
, s
->overflow_userdata
);
907 } else if (command
== PA_COMMAND_UNDERFLOW
) {
908 if (s
->underflow_callback
)
909 s
->underflow_callback(s
, s
->underflow_userdata
);
916 static void invalidate_indexes(pa_stream
*s
, bool r
, bool w
) {
918 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
921 pa_log_debug("invalidate r:%u w:%u tag:%u", r
, w
, s
->context
->ctag
);
924 if (s
->state
!= PA_STREAM_READY
)
928 s
->write_index_not_before
= s
->context
->ctag
;
930 if (s
->timing_info_valid
)
931 s
->timing_info
.write_index_corrupt
= true;
934 pa_log_debug("write_index invalidated");
939 s
->read_index_not_before
= s
->context
->ctag
;
941 if (s
->timing_info_valid
)
942 s
->timing_info
.read_index_corrupt
= true;
945 pa_log_debug("read_index invalidated");
949 request_auto_timing_update(s
, true);
952 static void auto_timing_update_callback(pa_mainloop_api
*m
, pa_time_event
*e
, const struct timeval
*t
, void *userdata
) {
953 pa_stream
*s
= userdata
;
956 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
959 request_auto_timing_update(s
, false);
963 static void create_stream_complete(pa_stream
*s
) {
965 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
966 pa_assert(s
->state
== PA_STREAM_CREATING
);
968 pa_stream_set_state(s
, PA_STREAM_READY
);
970 if (s
->requested_bytes
> 0 && s
->write_callback
)
971 s
->write_callback(s
, (size_t) s
->requested_bytes
, s
->write_userdata
);
973 if (s
->flags
& PA_STREAM_AUTO_TIMING_UPDATE
) {
974 s
->auto_timing_interval_usec
= AUTO_TIMING_INTERVAL_START_USEC
;
975 pa_assert(!s
->auto_timing_update_event
);
976 s
->auto_timing_update_event
= pa_context_rttime_new(s
->context
, pa_rtclock_now() + s
->auto_timing_interval_usec
, &auto_timing_update_callback
, s
);
978 request_auto_timing_update(s
, true);
981 check_smoother_status(s
, true, false, false);
984 static void patch_buffer_attr(pa_stream
*s
, pa_buffer_attr
*attr
, pa_stream_flags_t
*flags
) {
990 if ((e
= getenv("PULSE_LATENCY_MSEC"))) {
993 if (pa_atou(e
, &ms
) < 0 || ms
<= 0)
994 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e
);
996 attr
->maxlength
= (uint32_t) -1;
997 attr
->tlength
= pa_usec_to_bytes(ms
* PA_USEC_PER_MSEC
, &s
->sample_spec
);
998 attr
->minreq
= (uint32_t) -1;
999 attr
->prebuf
= (uint32_t) -1;
1000 attr
->fragsize
= attr
->tlength
;
1004 *flags
|= PA_STREAM_ADJUST_LATENCY
;
1007 if (s
->context
->version
>= 13)
1010 /* Version older than 0.9.10 didn't do server side buffer_attr
1011 * selection, hence we have to fake it on the client side. */
1013 /* We choose fairly conservative values here, to not confuse
1014 * old clients with extremely large playback buffers */
1016 if (attr
->maxlength
== (uint32_t) -1)
1017 attr
->maxlength
= 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1019 if (attr
->tlength
== (uint32_t) -1)
1020 attr
->tlength
= (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC
, &s
->sample_spec
); /* 250ms of buffering */
1022 if (attr
->minreq
== (uint32_t) -1)
1023 attr
->minreq
= (attr
->tlength
)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1025 if (attr
->prebuf
== (uint32_t) -1)
1026 attr
->prebuf
= attr
->tlength
; /* Start to play only when the playback is fully filled up once */
1028 if (attr
->fragsize
== (uint32_t) -1)
1029 attr
->fragsize
= attr
->tlength
; /* Pass data to the app only when the buffer is filled up once */
1032 void pa_create_stream_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1033 pa_stream
*s
= userdata
;
1034 uint32_t requested_bytes
= 0;
1038 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1039 pa_assert(s
->state
== PA_STREAM_CREATING
);
1043 if (command
!= PA_COMMAND_REPLY
) {
1044 if (pa_context_handle_error(s
->context
, command
, t
, false) < 0)
1047 pa_stream_set_state(s
, PA_STREAM_FAILED
);
1051 if (pa_tagstruct_getu32(t
, &s
->channel
) < 0 ||
1052 s
->channel
== PA_INVALID_INDEX
||
1053 ((s
->direction
!= PA_STREAM_UPLOAD
) && (pa_tagstruct_getu32(t
, &s
->stream_index
) < 0 || s
->stream_index
== PA_INVALID_INDEX
)) ||
1054 ((s
->direction
!= PA_STREAM_RECORD
) && pa_tagstruct_getu32(t
, &requested_bytes
) < 0)) {
1055 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1059 s
->requested_bytes
= (int64_t) requested_bytes
;
1061 if (s
->context
->version
>= 9) {
1062 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1063 if (pa_tagstruct_getu32(t
, &s
->buffer_attr
.maxlength
) < 0 ||
1064 pa_tagstruct_getu32(t
, &s
->buffer_attr
.tlength
) < 0 ||
1065 pa_tagstruct_getu32(t
, &s
->buffer_attr
.prebuf
) < 0 ||
1066 pa_tagstruct_getu32(t
, &s
->buffer_attr
.minreq
) < 0) {
1067 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1070 } else if (s
->direction
== PA_STREAM_RECORD
) {
1071 if (pa_tagstruct_getu32(t
, &s
->buffer_attr
.maxlength
) < 0 ||
1072 pa_tagstruct_getu32(t
, &s
->buffer_attr
.fragsize
) < 0) {
1073 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1079 if (s
->context
->version
>= 12 && s
->direction
!= PA_STREAM_UPLOAD
) {
1082 const char *dn
= NULL
;
1085 if (pa_tagstruct_get_sample_spec(t
, &ss
) < 0 ||
1086 pa_tagstruct_get_channel_map(t
, &cm
) < 0 ||
1087 pa_tagstruct_getu32(t
, &s
->device_index
) < 0 ||
1088 pa_tagstruct_gets(t
, &dn
) < 0 ||
1089 pa_tagstruct_get_boolean(t
, &suspended
) < 0) {
1090 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1094 if (!dn
|| s
->device_index
== PA_INVALID_INDEX
||
1095 ss
.channels
!= cm
.channels
||
1096 !pa_channel_map_valid(&cm
) ||
1097 !pa_sample_spec_valid(&ss
) ||
1098 (s
->n_formats
== 0 && (
1099 (!(s
->flags
& PA_STREAM_FIX_FORMAT
) && ss
.format
!= s
->sample_spec
.format
) ||
1100 (!(s
->flags
& PA_STREAM_FIX_RATE
) && ss
.rate
!= s
->sample_spec
.rate
) ||
1101 (!(s
->flags
& PA_STREAM_FIX_CHANNELS
) && !pa_channel_map_equal(&cm
, &s
->channel_map
))))) {
1102 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1106 pa_xfree(s
->device_name
);
1107 s
->device_name
= pa_xstrdup(dn
);
1108 s
->suspended
= suspended
;
1110 s
->channel_map
= cm
;
1111 s
->sample_spec
= ss
;
1114 if (s
->context
->version
>= 13 && s
->direction
!= PA_STREAM_UPLOAD
) {
1117 if (pa_tagstruct_get_usec(t
, &usec
) < 0) {
1118 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1122 if (s
->direction
== PA_STREAM_RECORD
)
1123 s
->timing_info
.configured_source_usec
= usec
;
1125 s
->timing_info
.configured_sink_usec
= usec
;
1128 if ((s
->context
->version
>= 21 && s
->direction
== PA_STREAM_PLAYBACK
)
1129 || s
->context
->version
>= 22) {
1131 pa_format_info
*f
= pa_format_info_new();
1132 pa_tagstruct_get_format_info(t
, f
);
1134 if (pa_format_info_valid(f
))
1137 pa_format_info_free(f
);
1138 if (s
->n_formats
> 0) {
1139 /* We used the extended API, so we should have got back a proper format */
1140 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1146 if (!pa_tagstruct_eof(t
)) {
1147 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
1151 if (s
->direction
== PA_STREAM_RECORD
) {
1152 pa_assert(!s
->record_memblockq
);
1154 s
->record_memblockq
= pa_memblockq_new(
1155 "client side record memblockq",
1157 s
->buffer_attr
.maxlength
,
1166 s
->channel_valid
= true;
1167 pa_hashmap_put((s
->direction
== PA_STREAM_RECORD
) ? s
->context
->record_streams
: s
->context
->playback_streams
, PA_UINT32_TO_PTR(s
->channel
), s
);
1169 create_stream_complete(s
);
1175 static int create_stream(
1176 pa_stream_direction_t direction
,
1179 const pa_buffer_attr
*attr
,
1180 pa_stream_flags_t flags
,
1181 const pa_cvolume
*volume
,
1182 pa_stream
*sync_stream
) {
1186 bool volume_set
= !!volume
;
1191 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1192 pa_assert(direction
== PA_STREAM_PLAYBACK
|| direction
== PA_STREAM_RECORD
);
1194 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1195 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_UNCONNECTED
, PA_ERR_BADSTATE
);
1196 PA_CHECK_VALIDITY(s
->context
, s
->direct_on_input
== PA_INVALID_INDEX
|| direction
== PA_STREAM_RECORD
, PA_ERR_BADSTATE
);
1197 PA_CHECK_VALIDITY(s
->context
, !(flags
& ~(PA_STREAM_START_CORKED
|
1198 PA_STREAM_INTERPOLATE_TIMING
|
1199 PA_STREAM_NOT_MONOTONIC
|
1200 PA_STREAM_AUTO_TIMING_UPDATE
|
1201 PA_STREAM_NO_REMAP_CHANNELS
|
1202 PA_STREAM_NO_REMIX_CHANNELS
|
1203 PA_STREAM_FIX_FORMAT
|
1205 PA_STREAM_FIX_CHANNELS
|
1206 PA_STREAM_DONT_MOVE
|
1207 PA_STREAM_VARIABLE_RATE
|
1208 PA_STREAM_PEAK_DETECT
|
1209 PA_STREAM_START_MUTED
|
1210 PA_STREAM_ADJUST_LATENCY
|
1211 PA_STREAM_EARLY_REQUESTS
|
1212 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND
|
1213 PA_STREAM_START_UNMUTED
|
1214 PA_STREAM_FAIL_ON_SUSPEND
|
1215 PA_STREAM_RELATIVE_VOLUME
|
1216 PA_STREAM_PASSTHROUGH
)), PA_ERR_INVALID
);
1218 PA_CHECK_VALIDITY(s
->context
, s
->context
->version
>= 12 || !(flags
& PA_STREAM_VARIABLE_RATE
), PA_ERR_NOTSUPPORTED
);
1219 PA_CHECK_VALIDITY(s
->context
, s
->context
->version
>= 13 || !(flags
& PA_STREAM_PEAK_DETECT
), PA_ERR_NOTSUPPORTED
);
1220 PA_CHECK_VALIDITY(s
->context
, s
->context
->state
== PA_CONTEXT_READY
, PA_ERR_BADSTATE
);
1221 /* Although some of the other flags are not supported on older
1222 * version, we don't check for them here, because it doesn't hurt
1223 * when they are passed but actually not supported. This makes
1224 * client development easier */
1226 PA_CHECK_VALIDITY(s
->context
, direction
== PA_STREAM_RECORD
|| !(flags
& (PA_STREAM_PEAK_DETECT
)), PA_ERR_INVALID
);
1227 PA_CHECK_VALIDITY(s
->context
, !sync_stream
|| (direction
== PA_STREAM_PLAYBACK
&& sync_stream
->direction
== PA_STREAM_PLAYBACK
), PA_ERR_INVALID
);
1228 PA_CHECK_VALIDITY(s
->context
, (flags
& (PA_STREAM_ADJUST_LATENCY
|PA_STREAM_EARLY_REQUESTS
)) != (PA_STREAM_ADJUST_LATENCY
|PA_STREAM_EARLY_REQUESTS
), PA_ERR_INVALID
);
1232 s
->direction
= direction
;
1235 s
->syncid
= sync_stream
->syncid
;
1238 s
->buffer_attr
= *attr
;
1239 patch_buffer_attr(s
, &s
->buffer_attr
, &flags
);
1242 s
->corked
= !!(flags
& PA_STREAM_START_CORKED
);
1244 if (flags
& PA_STREAM_INTERPOLATE_TIMING
) {
1247 x
= pa_rtclock_now();
1249 pa_assert(!s
->smoother
);
1250 s
->smoother
= pa_smoother_new(
1251 SMOOTHER_ADJUST_TIME
,
1252 SMOOTHER_HISTORY_TIME
,
1253 !(flags
& PA_STREAM_NOT_MONOTONIC
),
1255 SMOOTHER_MIN_HISTORY
,
1261 dev
= s
->direction
== PA_STREAM_PLAYBACK
? s
->context
->conf
->default_sink
: s
->context
->conf
->default_source
;
1263 t
= pa_tagstruct_command(
1265 (uint32_t) (s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_CREATE_PLAYBACK_STREAM
: PA_COMMAND_CREATE_RECORD_STREAM
),
1268 if (s
->context
->version
< 13)
1269 pa_tagstruct_puts(t
, pa_proplist_gets(s
->proplist
, PA_PROP_MEDIA_NAME
));
1273 PA_TAG_SAMPLE_SPEC
, &s
->sample_spec
,
1274 PA_TAG_CHANNEL_MAP
, &s
->channel_map
,
1275 PA_TAG_U32
, PA_INVALID_INDEX
,
1277 PA_TAG_U32
, s
->buffer_attr
.maxlength
,
1278 PA_TAG_BOOLEAN
, s
->corked
,
1282 if (pa_sample_spec_valid(&s
->sample_spec
))
1283 volume
= pa_cvolume_reset(&cv
, s
->sample_spec
.channels
);
1285 /* This is not really relevant, since no volume was set, and
1286 * the real number of channels is embedded in the format_info
1288 volume
= pa_cvolume_reset(&cv
, PA_CHANNELS_MAX
);
1292 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1295 PA_TAG_U32
, s
->buffer_attr
.tlength
,
1296 PA_TAG_U32
, s
->buffer_attr
.prebuf
,
1297 PA_TAG_U32
, s
->buffer_attr
.minreq
,
1298 PA_TAG_U32
, s
->syncid
,
1301 pa_tagstruct_put_cvolume(t
, volume
);
1303 pa_tagstruct_putu32(t
, s
->buffer_attr
.fragsize
);
1305 if (s
->context
->version
>= 12) {
1308 PA_TAG_BOOLEAN
, flags
& PA_STREAM_NO_REMAP_CHANNELS
,
1309 PA_TAG_BOOLEAN
, flags
& PA_STREAM_NO_REMIX_CHANNELS
,
1310 PA_TAG_BOOLEAN
, flags
& PA_STREAM_FIX_FORMAT
,
1311 PA_TAG_BOOLEAN
, flags
& PA_STREAM_FIX_RATE
,
1312 PA_TAG_BOOLEAN
, flags
& PA_STREAM_FIX_CHANNELS
,
1313 PA_TAG_BOOLEAN
, flags
& PA_STREAM_DONT_MOVE
,
1314 PA_TAG_BOOLEAN
, flags
& PA_STREAM_VARIABLE_RATE
,
1318 if (s
->context
->version
>= 13) {
1320 if (s
->direction
== PA_STREAM_PLAYBACK
)
1321 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_START_MUTED
);
1323 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_PEAK_DETECT
);
1327 PA_TAG_BOOLEAN
, flags
& PA_STREAM_ADJUST_LATENCY
,
1328 PA_TAG_PROPLIST
, s
->proplist
,
1331 if (s
->direction
== PA_STREAM_RECORD
)
1332 pa_tagstruct_putu32(t
, s
->direct_on_input
);
1335 if (s
->context
->version
>= 14) {
1337 if (s
->direction
== PA_STREAM_PLAYBACK
)
1338 pa_tagstruct_put_boolean(t
, volume_set
);
1340 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_EARLY_REQUESTS
);
1343 if (s
->context
->version
>= 15) {
1345 if (s
->direction
== PA_STREAM_PLAYBACK
)
1346 pa_tagstruct_put_boolean(t
, flags
& (PA_STREAM_START_MUTED
|PA_STREAM_START_UNMUTED
));
1348 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND
);
1349 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_FAIL_ON_SUSPEND
);
1352 if (s
->context
->version
>= 17 && s
->direction
== PA_STREAM_PLAYBACK
)
1353 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_RELATIVE_VOLUME
);
1355 if (s
->context
->version
>= 18 && s
->direction
== PA_STREAM_PLAYBACK
)
1356 pa_tagstruct_put_boolean(t
, flags
& (PA_STREAM_PASSTHROUGH
));
1358 if ((s
->context
->version
>= 21 && s
->direction
== PA_STREAM_PLAYBACK
)
1359 || s
->context
->version
>= 22) {
1361 pa_tagstruct_putu8(t
, s
->n_formats
);
1362 for (i
= 0; i
< s
->n_formats
; i
++)
1363 pa_tagstruct_put_format_info(t
, s
->req_formats
[i
]);
1366 if (s
->context
->version
>= 22 && s
->direction
== PA_STREAM_RECORD
) {
1367 pa_tagstruct_put_cvolume(t
, volume
);
1368 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_START_MUTED
);
1369 pa_tagstruct_put_boolean(t
, volume_set
);
1370 pa_tagstruct_put_boolean(t
, flags
& (PA_STREAM_START_MUTED
|PA_STREAM_START_UNMUTED
));
1371 pa_tagstruct_put_boolean(t
, flags
& PA_STREAM_RELATIVE_VOLUME
);
1372 pa_tagstruct_put_boolean(t
, flags
& (PA_STREAM_PASSTHROUGH
));
1375 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
1376 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_create_stream_callback
, s
, NULL
);
1378 pa_stream_set_state(s
, PA_STREAM_CREATING
);
1384 int pa_stream_connect_playback(
1387 const pa_buffer_attr
*attr
,
1388 pa_stream_flags_t flags
,
1389 const pa_cvolume
*volume
,
1390 pa_stream
*sync_stream
) {
1393 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1395 return create_stream(PA_STREAM_PLAYBACK
, s
, dev
, attr
, flags
, volume
, sync_stream
);
1398 int pa_stream_connect_record(
1401 const pa_buffer_attr
*attr
,
1402 pa_stream_flags_t flags
) {
1405 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1407 return create_stream(PA_STREAM_RECORD
, s
, dev
, attr
, flags
, NULL
, NULL
);
1410 int pa_stream_begin_write(
1416 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1418 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1419 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1420 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
|| s
->direction
== PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
1421 PA_CHECK_VALIDITY(s
->context
, data
, PA_ERR_INVALID
);
1422 PA_CHECK_VALIDITY(s
->context
, nbytes
&& *nbytes
!= 0, PA_ERR_INVALID
);
1424 if (*nbytes
!= (size_t) -1) {
1427 m
= pa_mempool_block_size_max(s
->context
->mempool
);
1428 fs
= pa_frame_size(&s
->sample_spec
);
1435 if (!s
->write_memblock
) {
1436 s
->write_memblock
= pa_memblock_new(s
->context
->mempool
, *nbytes
);
1437 s
->write_data
= pa_memblock_acquire(s
->write_memblock
);
1440 *data
= s
->write_data
;
1441 *nbytes
= pa_memblock_get_length(s
->write_memblock
);
1446 int pa_stream_cancel_write(
1450 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1452 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1453 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1454 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
|| s
->direction
== PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
1455 PA_CHECK_VALIDITY(s
->context
, s
->write_memblock
, PA_ERR_BADSTATE
);
1457 pa_assert(s
->write_data
);
1459 pa_memblock_release(s
->write_memblock
);
1460 pa_memblock_unref(s
->write_memblock
);
1461 s
->write_memblock
= NULL
;
1462 s
->write_data
= NULL
;
1467 int pa_stream_write(
1471 pa_free_cb_t free_cb
,
1473 pa_seek_mode_t seek
) {
1476 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1479 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1480 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1481 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
|| s
->direction
== PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
1482 PA_CHECK_VALIDITY(s
->context
, seek
<= PA_SEEK_RELATIVE_END
, PA_ERR_INVALID
);
1483 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
|| (seek
== PA_SEEK_RELATIVE
&& offset
== 0), PA_ERR_INVALID
);
1484 PA_CHECK_VALIDITY(s
->context
,
1485 !s
->write_memblock
||
1486 ((data
>= s
->write_data
) &&
1487 ((const char*) data
+ length
<= (const char*) s
->write_data
+ pa_memblock_get_length(s
->write_memblock
))),
1489 PA_CHECK_VALIDITY(s
->context
, !free_cb
|| !s
->write_memblock
, PA_ERR_INVALID
);
1491 if (s
->write_memblock
) {
1494 /* pa_stream_write_begin() was called before */
1496 pa_memblock_release(s
->write_memblock
);
1498 chunk
.memblock
= s
->write_memblock
;
1499 chunk
.index
= (const char *) data
- (const char *) s
->write_data
;
1500 chunk
.length
= length
;
1502 s
->write_memblock
= NULL
;
1503 s
->write_data
= NULL
;
1505 pa_pstream_send_memblock(s
->context
->pstream
, s
->channel
, offset
, seek
, &chunk
);
1506 pa_memblock_unref(chunk
.memblock
);
1509 pa_seek_mode_t t_seek
= seek
;
1510 int64_t t_offset
= offset
;
1511 size_t t_length
= length
;
1512 const void *t_data
= data
;
1514 /* pa_stream_write_begin() was not called before */
1516 while (t_length
> 0) {
1521 if (free_cb
&& !pa_pstream_get_shm(s
->context
->pstream
)) {
1522 chunk
.memblock
= pa_memblock_new_user(s
->context
->mempool
, (void*) t_data
, t_length
, free_cb
, 1);
1523 chunk
.length
= t_length
;
1527 chunk
.length
= PA_MIN(t_length
, pa_mempool_block_size_max(s
->context
->mempool
));
1528 chunk
.memblock
= pa_memblock_new(s
->context
->mempool
, chunk
.length
);
1530 d
= pa_memblock_acquire(chunk
.memblock
);
1531 memcpy(d
, t_data
, chunk
.length
);
1532 pa_memblock_release(chunk
.memblock
);
1535 pa_pstream_send_memblock(s
->context
->pstream
, s
->channel
, t_offset
, t_seek
, &chunk
);
1538 t_seek
= PA_SEEK_RELATIVE
;
1540 t_data
= (const uint8_t*) t_data
+ chunk
.length
;
1541 t_length
-= chunk
.length
;
1543 pa_memblock_unref(chunk
.memblock
);
1546 if (free_cb
&& pa_pstream_get_shm(s
->context
->pstream
))
1547 free_cb((void*) data
);
1550 /* This is obviously wrong since we ignore the seeking index . But
1551 * that's OK, the server side applies the same error */
1552 s
->requested_bytes
-= (seek
== PA_SEEK_RELATIVE
? offset
: 0) + (int64_t) length
;
1555 pa_log_debug("wrote %lli, now at %lli", (long long) length
, (long long) s
->requested_bytes
);
1558 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1560 /* Update latency request correction */
1561 if (s
->write_index_corrections
[s
->current_write_index_correction
].valid
) {
1563 if (seek
== PA_SEEK_ABSOLUTE
) {
1564 s
->write_index_corrections
[s
->current_write_index_correction
].corrupt
= false;
1565 s
->write_index_corrections
[s
->current_write_index_correction
].absolute
= true;
1566 s
->write_index_corrections
[s
->current_write_index_correction
].value
= offset
+ (int64_t) length
;
1567 } else if (seek
== PA_SEEK_RELATIVE
) {
1568 if (!s
->write_index_corrections
[s
->current_write_index_correction
].corrupt
)
1569 s
->write_index_corrections
[s
->current_write_index_correction
].value
+= offset
+ (int64_t) length
;
1571 s
->write_index_corrections
[s
->current_write_index_correction
].corrupt
= true;
1574 /* Update the write index in the already available latency data */
1575 if (s
->timing_info_valid
) {
1577 if (seek
== PA_SEEK_ABSOLUTE
) {
1578 s
->timing_info
.write_index_corrupt
= false;
1579 s
->timing_info
.write_index
= offset
+ (int64_t) length
;
1580 } else if (seek
== PA_SEEK_RELATIVE
) {
1581 if (!s
->timing_info
.write_index_corrupt
)
1582 s
->timing_info
.write_index
+= offset
+ (int64_t) length
;
1584 s
->timing_info
.write_index_corrupt
= true;
1587 if (!s
->timing_info_valid
|| s
->timing_info
.write_index_corrupt
)
1588 request_auto_timing_update(s
, true);
1594 int pa_stream_peek(pa_stream
*s
, const void **data
, size_t *length
) {
1596 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1600 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1601 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1602 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_RECORD
, PA_ERR_BADSTATE
);
1604 if (!s
->peek_memchunk
.memblock
) {
1606 if (pa_memblockq_peek(s
->record_memblockq
, &s
->peek_memchunk
) < 0) {
1607 /* record_memblockq is empty. */
1612 } else if (!s
->peek_memchunk
.memblock
) {
1613 /* record_memblockq isn't empty, but it doesn't have any data at
1614 * the current read index. */
1616 *length
= s
->peek_memchunk
.length
;
1620 s
->peek_data
= pa_memblock_acquire(s
->peek_memchunk
.memblock
);
1623 pa_assert(s
->peek_data
);
1624 *data
= (uint8_t*) s
->peek_data
+ s
->peek_memchunk
.index
;
1625 *length
= s
->peek_memchunk
.length
;
1629 int pa_stream_drop(pa_stream
*s
) {
1631 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1633 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1634 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1635 PA_CHECK_VALIDITY(s
->context
, s
->direction
== PA_STREAM_RECORD
, PA_ERR_BADSTATE
);
1636 PA_CHECK_VALIDITY(s
->context
, s
->peek_memchunk
.length
> 0, PA_ERR_BADSTATE
);
1638 pa_memblockq_drop(s
->record_memblockq
, s
->peek_memchunk
.length
);
1640 /* Fix the simulated local read index */
1641 if (s
->timing_info_valid
&& !s
->timing_info
.read_index_corrupt
)
1642 s
->timing_info
.read_index
+= (int64_t) s
->peek_memchunk
.length
;
1644 if (s
->peek_memchunk
.memblock
) {
1645 pa_assert(s
->peek_data
);
1646 s
->peek_data
= NULL
;
1647 pa_memblock_release(s
->peek_memchunk
.memblock
);
1648 pa_memblock_unref(s
->peek_memchunk
.memblock
);
1651 pa_memchunk_reset(&s
->peek_memchunk
);
1656 size_t pa_stream_writable_size(pa_stream
*s
) {
1658 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1660 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
, (size_t) -1);
1661 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
, (size_t) -1);
1662 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->direction
!= PA_STREAM_RECORD
, PA_ERR_BADSTATE
, (size_t) -1);
1664 return s
->requested_bytes
> 0 ? (size_t) s
->requested_bytes
: 0;
1667 size_t pa_stream_readable_size(pa_stream
*s
) {
1669 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1671 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
, (size_t) -1);
1672 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
, (size_t) -1);
1673 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->direction
== PA_STREAM_RECORD
, PA_ERR_BADSTATE
, (size_t) -1);
1675 return pa_memblockq_get_length(s
->record_memblockq
);
1678 pa_operation
* pa_stream_drain(pa_stream
*s
, pa_stream_success_cb_t cb
, void *userdata
) {
1684 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1686 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1687 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1688 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
, PA_ERR_BADSTATE
);
1690 /* Ask for a timing update before we cork/uncork to get the best
1691 * accuracy for the transport latency suitable for the
1692 * check_smoother_status() call in the started callback */
1693 request_auto_timing_update(s
, true);
1695 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
1697 t
= pa_tagstruct_command(s
->context
, PA_COMMAND_DRAIN_PLAYBACK_STREAM
, &tag
);
1698 pa_tagstruct_putu32(t
, s
->channel
);
1699 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
1700 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
1702 /* This might cause the read index to continue again, hence
1703 * let's request a timing update */
1704 request_auto_timing_update(s
, true);
1709 static pa_usec_t
calc_time(pa_stream
*s
, bool ignore_transport
) {
1713 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1714 pa_assert(s
->state
== PA_STREAM_READY
);
1715 pa_assert(s
->direction
!= PA_STREAM_UPLOAD
);
1716 pa_assert(s
->timing_info_valid
);
1717 pa_assert(s
->direction
!= PA_STREAM_PLAYBACK
|| !s
->timing_info
.read_index_corrupt
);
1718 pa_assert(s
->direction
!= PA_STREAM_RECORD
|| !s
->timing_info
.write_index_corrupt
);
1720 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1721 /* The last byte that was written into the output device
1722 * had this time value associated */
1723 usec
= pa_bytes_to_usec(s
->timing_info
.read_index
< 0 ? 0 : (uint64_t) s
->timing_info
.read_index
, &s
->sample_spec
);
1725 if (!s
->corked
&& !s
->suspended
) {
1727 if (!ignore_transport
)
1728 /* Because the latency info took a little time to come
1729 * to us, we assume that the real output time is actually
1731 usec
+= s
->timing_info
.transport_usec
;
1733 /* However, the output device usually maintains a buffer
1734 too, hence the real sample currently played is a little
1736 if (s
->timing_info
.sink_usec
>= usec
)
1739 usec
-= s
->timing_info
.sink_usec
;
1743 pa_assert(s
->direction
== PA_STREAM_RECORD
);
1745 /* The last byte written into the server side queue had
1746 * this time value associated */
1747 usec
= pa_bytes_to_usec(s
->timing_info
.write_index
< 0 ? 0 : (uint64_t) s
->timing_info
.write_index
, &s
->sample_spec
);
1749 if (!s
->corked
&& !s
->suspended
) {
1751 if (!ignore_transport
)
1752 /* Add transport latency */
1753 usec
+= s
->timing_info
.transport_usec
;
1755 /* Add latency of data in device buffer */
1756 usec
+= s
->timing_info
.source_usec
;
1758 /* If this is a monitor source, we need to correct the
1759 * time by the playback device buffer */
1760 if (s
->timing_info
.sink_usec
>= usec
)
1763 usec
-= s
->timing_info
.sink_usec
;
1770 static void stream_get_timing_info_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
1771 pa_operation
*o
= userdata
;
1772 struct timeval local
, remote
, now
;
1774 bool playing
= false;
1775 uint64_t underrun_for
= 0, playing_for
= 0;
1779 pa_assert(PA_REFCNT_VALUE(o
) >= 1);
1781 if (!o
->context
|| !o
->stream
)
1784 i
= &o
->stream
->timing_info
;
1786 o
->stream
->timing_info_valid
= false;
1787 i
->write_index_corrupt
= true;
1788 i
->read_index_corrupt
= true;
1790 if (command
!= PA_COMMAND_REPLY
) {
1791 if (pa_context_handle_error(o
->context
, command
, t
, false) < 0)
1796 if (pa_tagstruct_get_usec(t
, &i
->sink_usec
) < 0 ||
1797 pa_tagstruct_get_usec(t
, &i
->source_usec
) < 0 ||
1798 pa_tagstruct_get_boolean(t
, &playing
) < 0 ||
1799 pa_tagstruct_get_timeval(t
, &local
) < 0 ||
1800 pa_tagstruct_get_timeval(t
, &remote
) < 0 ||
1801 pa_tagstruct_gets64(t
, &i
->write_index
) < 0 ||
1802 pa_tagstruct_gets64(t
, &i
->read_index
) < 0) {
1804 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
1808 if (o
->context
->version
>= 13 &&
1809 o
->stream
->direction
== PA_STREAM_PLAYBACK
)
1810 if (pa_tagstruct_getu64(t
, &underrun_for
) < 0 ||
1811 pa_tagstruct_getu64(t
, &playing_for
) < 0) {
1813 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
1817 if (!pa_tagstruct_eof(t
)) {
1818 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
1821 o
->stream
->timing_info_valid
= true;
1822 i
->write_index_corrupt
= false;
1823 i
->read_index_corrupt
= false;
1825 i
->playing
= (int) playing
;
1826 i
->since_underrun
= (int64_t) (playing
? playing_for
: underrun_for
);
1828 pa_gettimeofday(&now
);
1830 /* Calculate timestamps */
1831 if (pa_timeval_cmp(&local
, &remote
) <= 0 && pa_timeval_cmp(&remote
, &now
) <= 0) {
1832 /* local and remote seem to have synchronized clocks */
1834 if (o
->stream
->direction
== PA_STREAM_PLAYBACK
)
1835 i
->transport_usec
= pa_timeval_diff(&remote
, &local
);
1837 i
->transport_usec
= pa_timeval_diff(&now
, &remote
);
1839 i
->synchronized_clocks
= true;
1840 i
->timestamp
= remote
;
1842 /* clocks are not synchronized, let's estimate latency then */
1843 i
->transport_usec
= pa_timeval_diff(&now
, &local
)/2;
1844 i
->synchronized_clocks
= false;
1845 i
->timestamp
= local
;
1846 pa_timeval_add(&i
->timestamp
, i
->transport_usec
);
1849 /* Invalidate read and write indexes if necessary */
1850 if (tag
< o
->stream
->read_index_not_before
)
1851 i
->read_index_corrupt
= true;
1853 if (tag
< o
->stream
->write_index_not_before
)
1854 i
->write_index_corrupt
= true;
1856 if (o
->stream
->direction
== PA_STREAM_PLAYBACK
) {
1857 /* Write index correction */
1860 uint32_t ctag
= tag
;
1862 /* Go through the saved correction values and add up the
1863 * total correction.*/
1864 for (n
= 0, j
= o
->stream
->current_write_index_correction
+1;
1865 n
< PA_MAX_WRITE_INDEX_CORRECTIONS
;
1866 n
++, j
= (j
+ 1) % PA_MAX_WRITE_INDEX_CORRECTIONS
) {
1868 /* Step over invalid data or out-of-date data */
1869 if (!o
->stream
->write_index_corrections
[j
].valid
||
1870 o
->stream
->write_index_corrections
[j
].tag
< ctag
)
1873 /* Make sure that everything is in order */
1874 ctag
= o
->stream
->write_index_corrections
[j
].tag
+1;
1876 /* Now fix the write index */
1877 if (o
->stream
->write_index_corrections
[j
].corrupt
) {
1878 /* A corrupting seek was made */
1879 i
->write_index_corrupt
= true;
1880 } else if (o
->stream
->write_index_corrections
[j
].absolute
) {
1881 /* An absolute seek was made */
1882 i
->write_index
= o
->stream
->write_index_corrections
[j
].value
;
1883 i
->write_index_corrupt
= false;
1884 } else if (!i
->write_index_corrupt
) {
1885 /* A relative seek was made */
1886 i
->write_index
+= o
->stream
->write_index_corrections
[j
].value
;
1890 /* Clear old correction entries */
1891 for (n
= 0; n
< PA_MAX_WRITE_INDEX_CORRECTIONS
; n
++) {
1892 if (!o
->stream
->write_index_corrections
[n
].valid
)
1895 if (o
->stream
->write_index_corrections
[n
].tag
<= tag
)
1896 o
->stream
->write_index_corrections
[n
].valid
= false;
1900 if (o
->stream
->direction
== PA_STREAM_RECORD
) {
1901 /* Read index correction */
1903 if (!i
->read_index_corrupt
)
1904 i
->read_index
-= (int64_t) pa_memblockq_get_length(o
->stream
->record_memblockq
);
1907 /* Update smoother if we're not corked */
1908 if (o
->stream
->smoother
&& !o
->stream
->corked
) {
1911 u
= x
= pa_rtclock_now() - i
->transport_usec
;
1913 if (o
->stream
->direction
== PA_STREAM_PLAYBACK
&& o
->context
->version
>= 13) {
1916 /* If we weren't playing then it will take some time
1917 * until the audio will actually come out through the
1918 * speakers. Since we follow that timing here, we need
1919 * to try to fix this up */
1921 su
= pa_bytes_to_usec((uint64_t) i
->since_underrun
, &o
->stream
->sample_spec
);
1923 if (su
< i
->sink_usec
)
1924 x
+= i
->sink_usec
- su
;
1928 pa_smoother_pause(o
->stream
->smoother
, x
);
1930 /* Update the smoother */
1931 if ((o
->stream
->direction
== PA_STREAM_PLAYBACK
&& !i
->read_index_corrupt
) ||
1932 (o
->stream
->direction
== PA_STREAM_RECORD
&& !i
->write_index_corrupt
))
1933 pa_smoother_put(o
->stream
->smoother
, u
, calc_time(o
->stream
, true));
1936 pa_smoother_resume(o
->stream
->smoother
, x
, true);
1940 o
->stream
->auto_timing_update_requested
= false;
1942 if (o
->stream
->latency_update_callback
)
1943 o
->stream
->latency_update_callback(o
->stream
, o
->stream
->latency_update_userdata
);
1945 if (o
->callback
&& o
->stream
&& o
->stream
->state
== PA_STREAM_READY
) {
1946 pa_stream_success_cb_t cb
= (pa_stream_success_cb_t
) o
->callback
;
1947 cb(o
->stream
, o
->stream
->timing_info_valid
, o
->userdata
);
1952 pa_operation_done(o
);
1953 pa_operation_unref(o
);
1956 pa_operation
* pa_stream_update_timing_info(pa_stream
*s
, pa_stream_success_cb_t cb
, void *userdata
) {
1964 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
1966 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
1967 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
1968 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
1970 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1971 /* Find a place to store the write_index correction data for this entry */
1972 cidx
= (s
->current_write_index_correction
+ 1) % PA_MAX_WRITE_INDEX_CORRECTIONS
;
1974 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1975 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !s
->write_index_corrections
[cidx
].valid
, PA_ERR_INTERNAL
);
1977 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
1979 t
= pa_tagstruct_command(
1981 (uint32_t) (s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_GET_PLAYBACK_LATENCY
: PA_COMMAND_GET_RECORD_LATENCY
),
1983 pa_tagstruct_putu32(t
, s
->channel
);
1984 pa_tagstruct_put_timeval(t
, pa_gettimeofday(&now
));
1986 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
1987 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_get_timing_info_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
1989 if (s
->direction
== PA_STREAM_PLAYBACK
) {
1990 /* Fill in initial correction data */
1992 s
->current_write_index_correction
= cidx
;
1994 s
->write_index_corrections
[cidx
].valid
= true;
1995 s
->write_index_corrections
[cidx
].absolute
= false;
1996 s
->write_index_corrections
[cidx
].corrupt
= false;
1997 s
->write_index_corrections
[cidx
].tag
= tag
;
1998 s
->write_index_corrections
[cidx
].value
= 0;
2004 void pa_stream_disconnect_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
2005 pa_stream
*s
= userdata
;
2009 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2013 if (command
!= PA_COMMAND_REPLY
) {
2014 if (pa_context_handle_error(s
->context
, command
, t
, false) < 0)
2017 pa_stream_set_state(s
, PA_STREAM_FAILED
);
2019 } else if (!pa_tagstruct_eof(t
)) {
2020 pa_context_fail(s
->context
, PA_ERR_PROTOCOL
);
2024 pa_stream_set_state(s
, PA_STREAM_TERMINATED
);
2030 int pa_stream_disconnect(pa_stream
*s
) {
2035 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2037 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2038 PA_CHECK_VALIDITY(s
->context
, s
->channel_valid
, PA_ERR_BADSTATE
);
2039 PA_CHECK_VALIDITY(s
->context
, s
->context
->state
== PA_CONTEXT_READY
, PA_ERR_BADSTATE
);
2043 t
= pa_tagstruct_command(
2045 (uint32_t) (s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_DELETE_PLAYBACK_STREAM
:
2046 (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_DELETE_RECORD_STREAM
: PA_COMMAND_DELETE_UPLOAD_STREAM
)),
2048 pa_tagstruct_putu32(t
, s
->channel
);
2049 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2050 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_disconnect_callback
, s
, NULL
);
2056 void pa_stream_set_read_callback(pa_stream
*s
, pa_stream_request_cb_t cb
, void *userdata
) {
2058 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2060 if (pa_detect_fork())
2063 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2066 s
->read_callback
= cb
;
2067 s
->read_userdata
= userdata
;
2070 void pa_stream_set_write_callback(pa_stream
*s
, pa_stream_request_cb_t cb
, void *userdata
) {
2072 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2074 if (pa_detect_fork())
2077 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2080 s
->write_callback
= cb
;
2081 s
->write_userdata
= userdata
;
2084 void pa_stream_set_state_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2086 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2088 if (pa_detect_fork())
2091 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2094 s
->state_callback
= cb
;
2095 s
->state_userdata
= userdata
;
2098 void pa_stream_set_overflow_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2100 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2102 if (pa_detect_fork())
2105 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2108 s
->overflow_callback
= cb
;
2109 s
->overflow_userdata
= userdata
;
2112 void pa_stream_set_underflow_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2114 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2116 if (pa_detect_fork())
2119 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2122 s
->underflow_callback
= cb
;
2123 s
->underflow_userdata
= userdata
;
2126 void pa_stream_set_latency_update_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2128 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2130 if (pa_detect_fork())
2133 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2136 s
->latency_update_callback
= cb
;
2137 s
->latency_update_userdata
= userdata
;
2140 void pa_stream_set_moved_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2142 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2144 if (pa_detect_fork())
2147 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2150 s
->moved_callback
= cb
;
2151 s
->moved_userdata
= userdata
;
2154 void pa_stream_set_suspended_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2156 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2158 if (pa_detect_fork())
2161 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2164 s
->suspended_callback
= cb
;
2165 s
->suspended_userdata
= userdata
;
2168 void pa_stream_set_started_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2170 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2172 if (pa_detect_fork())
2175 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2178 s
->started_callback
= cb
;
2179 s
->started_userdata
= userdata
;
2182 void pa_stream_set_event_callback(pa_stream
*s
, pa_stream_event_cb_t cb
, void *userdata
) {
2184 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2186 if (pa_detect_fork())
2189 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2192 s
->event_callback
= cb
;
2193 s
->event_userdata
= userdata
;
2196 void pa_stream_set_buffer_attr_callback(pa_stream
*s
, pa_stream_notify_cb_t cb
, void *userdata
) {
2198 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2200 if (pa_detect_fork())
2203 if (s
->state
== PA_STREAM_TERMINATED
|| s
->state
== PA_STREAM_FAILED
)
2206 s
->buffer_attr_callback
= cb
;
2207 s
->buffer_attr_userdata
= userdata
;
2210 void pa_stream_simple_ack_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
2211 pa_operation
*o
= userdata
;
2216 pa_assert(PA_REFCNT_VALUE(o
) >= 1);
2221 if (command
!= PA_COMMAND_REPLY
) {
2222 if (pa_context_handle_error(o
->context
, command
, t
, false) < 0)
2226 } else if (!pa_tagstruct_eof(t
)) {
2227 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2232 pa_stream_success_cb_t cb
= (pa_stream_success_cb_t
) o
->callback
;
2233 cb(o
->stream
, success
, o
->userdata
);
2237 pa_operation_done(o
);
2238 pa_operation_unref(o
);
2241 pa_operation
* pa_stream_cork(pa_stream
*s
, int b
, pa_stream_success_cb_t cb
, void *userdata
) {
2247 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2249 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2250 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2251 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2253 /* Ask for a timing update before we cork/uncork to get the best
2254 * accuracy for the transport latency suitable for the
2255 * check_smoother_status() call in the started callback */
2256 request_auto_timing_update(s
, true);
2260 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2262 t
= pa_tagstruct_command(
2264 (uint32_t) (s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_CORK_PLAYBACK_STREAM
: PA_COMMAND_CORK_RECORD_STREAM
),
2266 pa_tagstruct_putu32(t
, s
->channel
);
2267 pa_tagstruct_put_boolean(t
, !!b
);
2268 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2269 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2271 check_smoother_status(s
, false, false, false);
2273 /* This might cause the indexes to hang/start again, hence let's
2274 * request a timing update, after the cork/uncork, too */
2275 request_auto_timing_update(s
, true);
2280 static pa_operation
* stream_send_simple_command(pa_stream
*s
, uint32_t command
, pa_stream_success_cb_t cb
, void *userdata
) {
2286 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2288 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2289 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2291 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2293 t
= pa_tagstruct_command(s
->context
, command
, &tag
);
2294 pa_tagstruct_putu32(t
, s
->channel
);
2295 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2296 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2301 pa_operation
* pa_stream_flush(pa_stream
*s
, pa_stream_success_cb_t cb
, void *userdata
) {
2305 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2307 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2308 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2309 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2311 /* Ask for a timing update *before* the flush, so that the
2312 * transport usec is as up to date as possible when we get the
2313 * underflow message and update the smoother status*/
2314 request_auto_timing_update(s
, true);
2316 if (!(o
= stream_send_simple_command(s
, (uint32_t) (s
->direction
== PA_STREAM_PLAYBACK
? PA_COMMAND_FLUSH_PLAYBACK_STREAM
: PA_COMMAND_FLUSH_RECORD_STREAM
), cb
, userdata
)))
2319 if (s
->direction
== PA_STREAM_PLAYBACK
) {
2321 if (s
->write_index_corrections
[s
->current_write_index_correction
].valid
)
2322 s
->write_index_corrections
[s
->current_write_index_correction
].corrupt
= true;
2324 if (s
->buffer_attr
.prebuf
> 0)
2325 check_smoother_status(s
, false, false, true);
2327 /* This will change the write index, but leave the
2328 * read index untouched. */
2329 invalidate_indexes(s
, false, true);
2332 /* For record streams this has no influence on the write
2333 * index, but the read index might jump. */
2334 invalidate_indexes(s
, true, false);
2336 /* Note that we do not update requested_bytes here. This is
2337 * because we cannot really know how data actually was dropped
2338 * from the write index due to this. This 'error' will be applied
2339 * by both client and server and hence we should be fine. */
2344 pa_operation
* pa_stream_prebuf(pa_stream
*s
, pa_stream_success_cb_t cb
, void *userdata
) {
2348 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2350 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2351 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2352 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
, PA_ERR_BADSTATE
);
2353 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->buffer_attr
.prebuf
> 0, PA_ERR_BADSTATE
);
2355 /* Ask for a timing update before we cork/uncork to get the best
2356 * accuracy for the transport latency suitable for the
2357 * check_smoother_status() call in the started callback */
2358 request_auto_timing_update(s
, true);
2360 if (!(o
= stream_send_simple_command(s
, PA_COMMAND_PREBUF_PLAYBACK_STREAM
, cb
, userdata
)))
2363 /* This might cause the read index to hang again, hence
2364 * let's request a timing update */
2365 request_auto_timing_update(s
, true);
2370 pa_operation
* pa_stream_trigger(pa_stream
*s
, pa_stream_success_cb_t cb
, void *userdata
) {
2374 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2376 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2377 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2378 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
== PA_STREAM_PLAYBACK
, PA_ERR_BADSTATE
);
2379 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->buffer_attr
.prebuf
> 0, PA_ERR_BADSTATE
);
2381 /* Ask for a timing update before we cork/uncork to get the best
2382 * accuracy for the transport latency suitable for the
2383 * check_smoother_status() call in the started callback */
2384 request_auto_timing_update(s
, true);
2386 if (!(o
= stream_send_simple_command(s
, PA_COMMAND_TRIGGER_PLAYBACK_STREAM
, cb
, userdata
)))
2389 /* This might cause the read index to start moving again, hence
2390 * let's request a timing update */
2391 request_auto_timing_update(s
, true);
2396 pa_operation
* pa_stream_set_name(pa_stream
*s
, const char *name
, pa_stream_success_cb_t cb
, void *userdata
) {
2400 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2403 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2404 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2405 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2407 if (s
->context
->version
>= 13) {
2408 pa_proplist
*p
= pa_proplist_new();
2410 pa_proplist_sets(p
, PA_PROP_MEDIA_NAME
, name
);
2411 o
= pa_stream_proplist_update(s
, PA_UPDATE_REPLACE
, p
, cb
, userdata
);
2412 pa_proplist_free(p
);
2417 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2418 t
= pa_tagstruct_command(
2420 (uint32_t) (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_SET_RECORD_STREAM_NAME
: PA_COMMAND_SET_PLAYBACK_STREAM_NAME
),
2422 pa_tagstruct_putu32(t
, s
->channel
);
2423 pa_tagstruct_puts(t
, name
);
2424 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2425 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2431 int pa_stream_get_time(pa_stream
*s
, pa_usec_t
*r_usec
) {
2435 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2437 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2438 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2439 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2440 PA_CHECK_VALIDITY(s
->context
, s
->timing_info_valid
, PA_ERR_NODATA
);
2441 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_PLAYBACK
|| !s
->timing_info
.read_index_corrupt
, PA_ERR_NODATA
);
2442 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_RECORD
|| !s
->timing_info
.write_index_corrupt
, PA_ERR_NODATA
);
2445 usec
= pa_smoother_get(s
->smoother
, pa_rtclock_now());
2447 usec
= calc_time(s
, false);
2449 /* Make sure the time runs monotonically */
2450 if (!(s
->flags
& PA_STREAM_NOT_MONOTONIC
)) {
2451 if (usec
< s
->previous_time
)
2452 usec
= s
->previous_time
;
2454 s
->previous_time
= usec
;
2463 static pa_usec_t
time_counter_diff(pa_stream
*s
, pa_usec_t a
, pa_usec_t b
, int *negative
) {
2465 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2473 if (negative
&& s
->direction
== PA_STREAM_RECORD
) {
2481 int pa_stream_get_latency(pa_stream
*s
, pa_usec_t
*r_usec
, int *negative
) {
2487 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2490 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2491 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2492 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2493 PA_CHECK_VALIDITY(s
->context
, s
->timing_info_valid
, PA_ERR_NODATA
);
2494 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_PLAYBACK
|| !s
->timing_info
.write_index_corrupt
, PA_ERR_NODATA
);
2495 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_RECORD
|| !s
->timing_info
.read_index_corrupt
, PA_ERR_NODATA
);
2497 if ((r
= pa_stream_get_time(s
, &t
)) < 0)
2500 if (s
->direction
== PA_STREAM_PLAYBACK
)
2501 cindex
= s
->timing_info
.write_index
;
2503 cindex
= s
->timing_info
.read_index
;
2508 c
= pa_bytes_to_usec((uint64_t) cindex
, &s
->sample_spec
);
2510 if (s
->direction
== PA_STREAM_PLAYBACK
)
2511 *r_usec
= time_counter_diff(s
, c
, t
, negative
);
2513 *r_usec
= time_counter_diff(s
, t
, c
, negative
);
2518 const pa_timing_info
* pa_stream_get_timing_info(pa_stream
*s
) {
2520 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2522 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2523 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2524 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2525 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->timing_info_valid
, PA_ERR_NODATA
);
2527 return &s
->timing_info
;
2530 const pa_sample_spec
* pa_stream_get_sample_spec(pa_stream
*s
) {
2532 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2534 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2536 return &s
->sample_spec
;
2539 const pa_channel_map
* pa_stream_get_channel_map(pa_stream
*s
) {
2541 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2543 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2545 return &s
->channel_map
;
2548 const pa_format_info
* pa_stream_get_format_info(pa_stream
*s
) {
2550 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2552 /* We don't have the format till routing is done */
2553 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2554 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2558 const pa_buffer_attr
* pa_stream_get_buffer_attr(pa_stream
*s
) {
2560 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2562 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2563 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2564 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 9, PA_ERR_NOTSUPPORTED
);
2566 return &s
->buffer_attr
;
2569 static void stream_set_buffer_attr_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
2570 pa_operation
*o
= userdata
;
2575 pa_assert(PA_REFCNT_VALUE(o
) >= 1);
2580 if (command
!= PA_COMMAND_REPLY
) {
2581 if (pa_context_handle_error(o
->context
, command
, t
, false) < 0)
2586 if (o
->stream
->direction
== PA_STREAM_PLAYBACK
) {
2587 if (pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.maxlength
) < 0 ||
2588 pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.tlength
) < 0 ||
2589 pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.prebuf
) < 0 ||
2590 pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.minreq
) < 0) {
2591 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2594 } else if (o
->stream
->direction
== PA_STREAM_RECORD
) {
2595 if (pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.maxlength
) < 0 ||
2596 pa_tagstruct_getu32(t
, &o
->stream
->buffer_attr
.fragsize
) < 0) {
2597 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2602 if (o
->stream
->context
->version
>= 13) {
2605 if (pa_tagstruct_get_usec(t
, &usec
) < 0) {
2606 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2610 if (o
->stream
->direction
== PA_STREAM_RECORD
)
2611 o
->stream
->timing_info
.configured_source_usec
= usec
;
2613 o
->stream
->timing_info
.configured_sink_usec
= usec
;
2616 if (!pa_tagstruct_eof(t
)) {
2617 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2623 pa_stream_success_cb_t cb
= (pa_stream_success_cb_t
) o
->callback
;
2624 cb(o
->stream
, success
, o
->userdata
);
2628 pa_operation_done(o
);
2629 pa_operation_unref(o
);
2632 pa_operation
* pa_stream_set_buffer_attr(pa_stream
*s
, const pa_buffer_attr
*attr
, pa_stream_success_cb_t cb
, void *userdata
) {
2636 pa_buffer_attr copy
;
2639 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2642 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2643 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2644 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2645 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 12, PA_ERR_NOTSUPPORTED
);
2647 /* Ask for a timing update before we cork/uncork to get the best
2648 * accuracy for the transport latency suitable for the
2649 * check_smoother_status() call in the started callback */
2650 request_auto_timing_update(s
, true);
2652 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2654 t
= pa_tagstruct_command(
2656 (uint32_t) (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR
: PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR
),
2658 pa_tagstruct_putu32(t
, s
->channel
);
2661 patch_buffer_attr(s
, ©
, NULL
);
2664 pa_tagstruct_putu32(t
, attr
->maxlength
);
2666 if (s
->direction
== PA_STREAM_PLAYBACK
)
2669 PA_TAG_U32
, attr
->tlength
,
2670 PA_TAG_U32
, attr
->prebuf
,
2671 PA_TAG_U32
, attr
->minreq
,
2674 pa_tagstruct_putu32(t
, attr
->fragsize
);
2676 if (s
->context
->version
>= 13)
2677 pa_tagstruct_put_boolean(t
, !!(s
->flags
& PA_STREAM_ADJUST_LATENCY
));
2679 if (s
->context
->version
>= 14)
2680 pa_tagstruct_put_boolean(t
, !!(s
->flags
& PA_STREAM_EARLY_REQUESTS
));
2682 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2683 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_set_buffer_attr_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2685 /* This might cause changes in the read/write index, hence let's
2686 * request a timing update */
2687 request_auto_timing_update(s
, true);
2692 uint32_t pa_stream_get_device_index(pa_stream
*s
) {
2694 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2696 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
, PA_INVALID_INDEX
);
2697 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
, PA_INVALID_INDEX
);
2698 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
, PA_INVALID_INDEX
);
2699 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->context
->version
>= 12, PA_ERR_NOTSUPPORTED
, PA_INVALID_INDEX
);
2700 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->device_index
!= PA_INVALID_INDEX
, PA_ERR_BADSTATE
, PA_INVALID_INDEX
);
2702 return s
->device_index
;
2705 const char *pa_stream_get_device_name(pa_stream
*s
) {
2707 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2709 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2710 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2711 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2712 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 12, PA_ERR_NOTSUPPORTED
);
2713 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->device_name
, PA_ERR_BADSTATE
);
2715 return s
->device_name
;
2718 int pa_stream_is_suspended(pa_stream
*s
) {
2720 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2722 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2723 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2724 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2725 PA_CHECK_VALIDITY(s
->context
, s
->context
->version
>= 12, PA_ERR_NOTSUPPORTED
);
2727 return s
->suspended
;
2730 int pa_stream_is_corked(pa_stream
*s
) {
2732 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2734 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2735 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2736 PA_CHECK_VALIDITY(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2741 static void stream_update_sample_rate_callback(pa_pdispatch
*pd
, uint32_t command
, uint32_t tag
, pa_tagstruct
*t
, void *userdata
) {
2742 pa_operation
*o
= userdata
;
2747 pa_assert(PA_REFCNT_VALUE(o
) >= 1);
2752 if (command
!= PA_COMMAND_REPLY
) {
2753 if (pa_context_handle_error(o
->context
, command
, t
, false) < 0)
2759 if (!pa_tagstruct_eof(t
)) {
2760 pa_context_fail(o
->context
, PA_ERR_PROTOCOL
);
2765 o
->stream
->sample_spec
.rate
= PA_PTR_TO_UINT(o
->private);
2766 pa_assert(pa_sample_spec_valid(&o
->stream
->sample_spec
));
2769 pa_stream_success_cb_t cb
= (pa_stream_success_cb_t
) o
->callback
;
2770 cb(o
->stream
, success
, o
->userdata
);
2774 pa_operation_done(o
);
2775 pa_operation_unref(o
);
2778 pa_operation
*pa_stream_update_sample_rate(pa_stream
*s
, uint32_t rate
, pa_stream_success_cb_t cb
, void *userdata
) {
2784 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2786 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2787 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, pa_sample_rate_valid(rate
), PA_ERR_INVALID
);
2788 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2789 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2790 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->flags
& PA_STREAM_VARIABLE_RATE
, PA_ERR_BADSTATE
);
2791 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 12, PA_ERR_NOTSUPPORTED
);
2793 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2794 o
->private = PA_UINT_TO_PTR(rate
);
2796 t
= pa_tagstruct_command(
2798 (uint32_t) (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE
: PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE
),
2800 pa_tagstruct_putu32(t
, s
->channel
);
2801 pa_tagstruct_putu32(t
, rate
);
2803 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2804 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, stream_update_sample_rate_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2809 pa_operation
*pa_stream_proplist_update(pa_stream
*s
, pa_update_mode_t mode
, pa_proplist
*p
, pa_stream_success_cb_t cb
, void *userdata
) {
2815 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2817 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2818 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, mode
== PA_UPDATE_SET
|| mode
== PA_UPDATE_MERGE
|| mode
== PA_UPDATE_REPLACE
, PA_ERR_INVALID
);
2819 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2820 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2821 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 13, PA_ERR_NOTSUPPORTED
);
2823 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2825 t
= pa_tagstruct_command(
2827 (uint32_t) (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST
: PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST
),
2829 pa_tagstruct_putu32(t
, s
->channel
);
2830 pa_tagstruct_putu32(t
, (uint32_t) mode
);
2831 pa_tagstruct_put_proplist(t
, p
);
2833 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2834 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2836 /* Please note that we don't update s->proplist here, because we
2837 * don't export that field */
2842 pa_operation
*pa_stream_proplist_remove(pa_stream
*s
, const char *const keys
[], pa_stream_success_cb_t cb
, void *userdata
) {
2846 const char * const*k
;
2849 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2851 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2852 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, keys
&& keys
[0], PA_ERR_INVALID
);
2853 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->state
== PA_STREAM_READY
, PA_ERR_BADSTATE
);
2854 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->direction
!= PA_STREAM_UPLOAD
, PA_ERR_BADSTATE
);
2855 PA_CHECK_VALIDITY_RETURN_NULL(s
->context
, s
->context
->version
>= 13, PA_ERR_NOTSUPPORTED
);
2857 o
= pa_operation_new(s
->context
, s
, (pa_operation_cb_t
) cb
, userdata
);
2859 t
= pa_tagstruct_command(
2861 (uint32_t) (s
->direction
== PA_STREAM_RECORD
? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST
: PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST
),
2863 pa_tagstruct_putu32(t
, s
->channel
);
2865 for (k
= keys
; *k
; k
++)
2866 pa_tagstruct_puts(t
, *k
);
2868 pa_tagstruct_puts(t
, NULL
);
2870 pa_pstream_send_tagstruct(s
->context
->pstream
, t
);
2871 pa_pdispatch_register_reply(s
->context
->pdispatch
, tag
, DEFAULT_TIMEOUT
, pa_stream_simple_ack_callback
, pa_operation_ref(o
), (pa_free_cb_t
) pa_operation_unref
);
2873 /* Please note that we don't update s->proplist here, because we
2874 * don't export that field */
2879 int pa_stream_set_monitor_stream(pa_stream
*s
, uint32_t sink_input_idx
) {
2881 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2883 PA_CHECK_VALIDITY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
);
2884 PA_CHECK_VALIDITY(s
->context
, sink_input_idx
!= PA_INVALID_INDEX
, PA_ERR_INVALID
);
2885 PA_CHECK_VALIDITY(s
->context
, s
->state
== PA_STREAM_UNCONNECTED
, PA_ERR_BADSTATE
);
2886 PA_CHECK_VALIDITY(s
->context
, s
->context
->version
>= 13, PA_ERR_NOTSUPPORTED
);
2888 s
->direct_on_input
= sink_input_idx
;
2893 uint32_t pa_stream_get_monitor_stream(pa_stream
*s
) {
2895 pa_assert(PA_REFCNT_VALUE(s
) >= 1);
2897 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, !pa_detect_fork(), PA_ERR_FORKED
, PA_INVALID_INDEX
);
2898 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->direct_on_input
!= PA_INVALID_INDEX
, PA_ERR_BADSTATE
, PA_INVALID_INDEX
);
2899 PA_CHECK_VALIDITY_RETURN_ANY(s
->context
, s
->context
->version
>= 13, PA_ERR_NOTSUPPORTED
, PA_INVALID_INDEX
);
2901 return s
->direct_on_input
;