]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
d3763260839c93776537f4d7408cb466cb2f57ad
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 /* #define STREAM_DEBUG */
48
49 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
50 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
51
52 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
53 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
54 #define SMOOTHER_MIN_HISTORY (4)
55
56 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
57 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
58 }
59
60 static void reset_callbacks(pa_stream *s) {
61 s->read_callback = NULL;
62 s->read_userdata = NULL;
63 s->write_callback = NULL;
64 s->write_userdata = NULL;
65 s->state_callback = NULL;
66 s->state_userdata = NULL;
67 s->overflow_callback = NULL;
68 s->overflow_userdata = NULL;
69 s->underflow_callback = NULL;
70 s->underflow_userdata = NULL;
71 s->latency_update_callback = NULL;
72 s->latency_update_userdata = NULL;
73 s->moved_callback = NULL;
74 s->moved_userdata = NULL;
75 s->suspended_callback = NULL;
76 s->suspended_userdata = NULL;
77 s->started_callback = NULL;
78 s->started_userdata = NULL;
79 s->event_callback = NULL;
80 s->event_userdata = NULL;
81 s->buffer_attr_callback = NULL;
82 s->buffer_attr_userdata = NULL;
83 }
84
85 static pa_stream *pa_stream_new_with_proplist_internal(
86 pa_context *c,
87 const char *name,
88 const pa_sample_spec *ss,
89 const pa_channel_map *map,
90 pa_format_info * const *formats,
91 unsigned int n_formats,
92 pa_proplist *p) {
93
94 pa_stream *s;
95 unsigned int i;
96
97 pa_assert(c);
98 pa_assert(PA_REFCNT_VALUE(c) >= 1);
99 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
100 pa_assert(n_formats < PA_MAX_FORMATS);
101
102 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
103 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
104
105 s = pa_xnew(pa_stream, 1);
106 PA_REFCNT_INIT(s);
107 s->context = c;
108 s->mainloop = c->mainloop;
109
110 s->direction = PA_STREAM_NODIRECTION;
111 s->state = PA_STREAM_UNCONNECTED;
112 s->flags = 0;
113
114 if (ss)
115 s->sample_spec = *ss;
116 else
117 pa_sample_spec_init(&s->sample_spec);
118
119 if (map)
120 s->channel_map = *map;
121 else
122 pa_channel_map_init(&s->channel_map);
123
124 s->n_formats = 0;
125 if (formats) {
126 s->n_formats = n_formats;
127 for (i = 0; i < n_formats; i++)
128 s->req_formats[i] = pa_format_info_copy(formats[i]);
129 }
130
131 /* We'll get the final negotiated format after connecting */
132 s->format = NULL;
133
134 s->direct_on_input = PA_INVALID_INDEX;
135
136 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
137 if (name)
138 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
139
140 s->channel = 0;
141 s->channel_valid = false;
142 s->syncid = c->csyncid++;
143 s->stream_index = PA_INVALID_INDEX;
144
145 s->requested_bytes = 0;
146 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
147
148 /* We initialize the target length here, so that if the user
149 * passes no explicit buffering metrics the default is similar to
150 * what older PA versions provided. */
151
152 s->buffer_attr.maxlength = (uint32_t) -1;
153 if (ss)
154 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
155 else {
156 /* FIXME: We assume a worst-case compressed format corresponding to
157 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
158 pa_sample_spec tmp_ss = {
159 .format = PA_SAMPLE_S16NE,
160 .rate = 48000,
161 .channels = 2,
162 };
163 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
164 }
165 s->buffer_attr.minreq = (uint32_t) -1;
166 s->buffer_attr.prebuf = (uint32_t) -1;
167 s->buffer_attr.fragsize = (uint32_t) -1;
168
169 s->device_index = PA_INVALID_INDEX;
170 s->device_name = NULL;
171 s->suspended = false;
172 s->corked = false;
173
174 s->write_memblock = NULL;
175 s->write_data = NULL;
176
177 pa_memchunk_reset(&s->peek_memchunk);
178 s->peek_data = NULL;
179 s->record_memblockq = NULL;
180
181 memset(&s->timing_info, 0, sizeof(s->timing_info));
182 s->timing_info_valid = false;
183
184 s->previous_time = 0;
185 s->latest_underrun_at_index = -1;
186
187 s->read_index_not_before = 0;
188 s->write_index_not_before = 0;
189 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
190 s->write_index_corrections[i].valid = 0;
191 s->current_write_index_correction = 0;
192
193 s->auto_timing_update_event = NULL;
194 s->auto_timing_update_requested = false;
195 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
196
197 reset_callbacks(s);
198
199 s->smoother = NULL;
200
201 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
202 PA_LLIST_PREPEND(pa_stream, c->streams, s);
203 pa_stream_ref(s);
204
205 return s;
206 }
207
208 pa_stream *pa_stream_new_with_proplist(
209 pa_context *c,
210 const char *name,
211 const pa_sample_spec *ss,
212 const pa_channel_map *map,
213 pa_proplist *p) {
214
215 pa_channel_map tmap;
216
217 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
220 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
221 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
222
223 if (!map)
224 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
225
226 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
227 }
228
229 pa_stream *pa_stream_new_extended(
230 pa_context *c,
231 const char *name,
232 pa_format_info * const *formats,
233 unsigned int n_formats,
234 pa_proplist *p) {
235
236 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
237
238 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
239 }
240
241 static void stream_unlink(pa_stream *s) {
242 pa_operation *o, *n;
243 pa_assert(s);
244
245 if (!s->context)
246 return;
247
248 /* Detach from context */
249
250 /* Unref all operation objects that point to us */
251 for (o = s->context->operations; o; o = n) {
252 n = o->next;
253
254 if (o->stream == s)
255 pa_operation_cancel(o);
256 }
257
258 /* Drop all outstanding replies for this stream */
259 if (s->context->pdispatch)
260 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
261
262 if (s->channel_valid) {
263 pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
264 s->channel = 0;
265 s->channel_valid = false;
266 }
267
268 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
269 pa_stream_unref(s);
270
271 s->context = NULL;
272
273 if (s->auto_timing_update_event) {
274 pa_assert(s->mainloop);
275 s->mainloop->time_free(s->auto_timing_update_event);
276 }
277
278 reset_callbacks(s);
279 }
280
281 static void stream_free(pa_stream *s) {
282 unsigned int i;
283
284 pa_assert(s);
285
286 stream_unlink(s);
287
288 if (s->write_memblock) {
289 if (s->write_data)
290 pa_memblock_release(s->write_memblock);
291 pa_memblock_unref(s->write_memblock);
292 }
293
294 if (s->peek_memchunk.memblock) {
295 if (s->peek_data)
296 pa_memblock_release(s->peek_memchunk.memblock);
297 pa_memblock_unref(s->peek_memchunk.memblock);
298 }
299
300 if (s->record_memblockq)
301 pa_memblockq_free(s->record_memblockq);
302
303 if (s->proplist)
304 pa_proplist_free(s->proplist);
305
306 if (s->smoother)
307 pa_smoother_free(s->smoother);
308
309 for (i = 0; i < s->n_formats; i++)
310 pa_format_info_free(s->req_formats[i]);
311
312 if (s->format)
313 pa_format_info_free(s->format);
314
315 pa_xfree(s->device_name);
316 pa_xfree(s);
317 }
318
319 void pa_stream_unref(pa_stream *s) {
320 pa_assert(s);
321 pa_assert(PA_REFCNT_VALUE(s) >= 1);
322
323 if (PA_REFCNT_DEC(s) <= 0)
324 stream_free(s);
325 }
326
327 pa_stream* pa_stream_ref(pa_stream *s) {
328 pa_assert(s);
329 pa_assert(PA_REFCNT_VALUE(s) >= 1);
330
331 PA_REFCNT_INC(s);
332 return s;
333 }
334
335 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
336 pa_assert(s);
337 pa_assert(PA_REFCNT_VALUE(s) >= 1);
338
339 return s->state;
340 }
341
342 pa_context* pa_stream_get_context(pa_stream *s) {
343 pa_assert(s);
344 pa_assert(PA_REFCNT_VALUE(s) >= 1);
345
346 return s->context;
347 }
348
349 uint32_t pa_stream_get_index(pa_stream *s) {
350 pa_assert(s);
351 pa_assert(PA_REFCNT_VALUE(s) >= 1);
352
353 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
354 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
355
356 return s->stream_index;
357 }
358
359 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
360 pa_assert(s);
361 pa_assert(PA_REFCNT_VALUE(s) >= 1);
362
363 if (s->state == st)
364 return;
365
366 pa_stream_ref(s);
367
368 s->state = st;
369
370 if (s->state_callback)
371 s->state_callback(s, s->state_userdata);
372
373 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
374 stream_unlink(s);
375
376 pa_stream_unref(s);
377 }
378
379 static void request_auto_timing_update(pa_stream *s, bool force) {
380 pa_assert(s);
381 pa_assert(PA_REFCNT_VALUE(s) >= 1);
382
383 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
384 return;
385
386 if (s->state == PA_STREAM_READY &&
387 (force || !s->auto_timing_update_requested)) {
388 pa_operation *o;
389
390 #ifdef STREAM_DEBUG
391 pa_log_debug("Automatically requesting new timing data");
392 #endif
393
394 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
395 pa_operation_unref(o);
396 s->auto_timing_update_requested = true;
397 }
398 }
399
400 if (s->auto_timing_update_event) {
401 if (s->suspended && !force) {
402 pa_assert(s->mainloop);
403 s->mainloop->time_free(s->auto_timing_update_event);
404 s->auto_timing_update_event = NULL;
405 } else {
406 if (force)
407 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
408
409 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
410
411 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
412 }
413 }
414 }
415
416 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
417 pa_context *c = userdata;
418 pa_stream *s;
419 uint32_t channel;
420
421 pa_assert(pd);
422 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
423 pa_assert(t);
424 pa_assert(c);
425 pa_assert(PA_REFCNT_VALUE(c) >= 1);
426
427 pa_context_ref(c);
428
429 if (pa_tagstruct_getu32(t, &channel) < 0 ||
430 !pa_tagstruct_eof(t)) {
431 pa_context_fail(c, PA_ERR_PROTOCOL);
432 goto finish;
433 }
434
435 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
436 goto finish;
437
438 if (s->state != PA_STREAM_READY)
439 goto finish;
440
441 pa_context_set_error(c, PA_ERR_KILLED);
442 pa_stream_set_state(s, PA_STREAM_FAILED);
443
444 finish:
445 pa_context_unref(c);
446 }
447
448 static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
449 pa_usec_t x;
450
451 pa_assert(s);
452 pa_assert(!force_start || !force_stop);
453
454 if (!s->smoother)
455 return;
456
457 x = pa_rtclock_now();
458
459 if (s->timing_info_valid) {
460 if (aposteriori)
461 x -= s->timing_info.transport_usec;
462 else
463 x += s->timing_info.transport_usec;
464 }
465
466 if (s->suspended || s->corked || force_stop)
467 pa_smoother_pause(s->smoother, x);
468 else if (force_start || s->buffer_attr.prebuf == 0) {
469
470 if (!s->timing_info_valid &&
471 !aposteriori &&
472 !force_start &&
473 !force_stop &&
474 s->context->version >= 13) {
475
476 /* If the server supports STARTED events we take them as
477 * indications when audio really starts/stops playing, if
478 * we don't have any timing info yet -- instead of trying
479 * to be smart and guessing the server time. Otherwise the
480 * unknown transport delay adds too much noise to our time
481 * calculations. */
482
483 return;
484 }
485
486 pa_smoother_resume(s->smoother, x, true);
487 }
488
489 /* Please note that we have no idea if playback actually started
490 * if prebuf is non-zero! */
491 }
492
493 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
494
495 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
496 pa_context *c = userdata;
497 pa_stream *s;
498 uint32_t channel;
499 const char *dn;
500 bool suspended;
501 uint32_t di;
502 pa_usec_t usec = 0;
503 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
504
505 pa_assert(pd);
506 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
507 pa_assert(t);
508 pa_assert(c);
509 pa_assert(PA_REFCNT_VALUE(c) >= 1);
510
511 pa_context_ref(c);
512
513 if (c->version < 12) {
514 pa_context_fail(c, PA_ERR_PROTOCOL);
515 goto finish;
516 }
517
518 if (pa_tagstruct_getu32(t, &channel) < 0 ||
519 pa_tagstruct_getu32(t, &di) < 0 ||
520 pa_tagstruct_gets(t, &dn) < 0 ||
521 pa_tagstruct_get_boolean(t, &suspended) < 0) {
522 pa_context_fail(c, PA_ERR_PROTOCOL);
523 goto finish;
524 }
525
526 if (c->version >= 13) {
527
528 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
529 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
530 pa_tagstruct_getu32(t, &fragsize) < 0 ||
531 pa_tagstruct_get_usec(t, &usec) < 0) {
532 pa_context_fail(c, PA_ERR_PROTOCOL);
533 goto finish;
534 }
535 } else {
536 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
537 pa_tagstruct_getu32(t, &tlength) < 0 ||
538 pa_tagstruct_getu32(t, &prebuf) < 0 ||
539 pa_tagstruct_getu32(t, &minreq) < 0 ||
540 pa_tagstruct_get_usec(t, &usec) < 0) {
541 pa_context_fail(c, PA_ERR_PROTOCOL);
542 goto finish;
543 }
544 }
545 }
546
547 if (!pa_tagstruct_eof(t)) {
548 pa_context_fail(c, PA_ERR_PROTOCOL);
549 goto finish;
550 }
551
552 if (!dn || di == PA_INVALID_INDEX) {
553 pa_context_fail(c, PA_ERR_PROTOCOL);
554 goto finish;
555 }
556
557 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
558 goto finish;
559
560 if (s->state != PA_STREAM_READY)
561 goto finish;
562
563 if (c->version >= 13) {
564 if (s->direction == PA_STREAM_RECORD)
565 s->timing_info.configured_source_usec = usec;
566 else
567 s->timing_info.configured_sink_usec = usec;
568
569 s->buffer_attr.maxlength = maxlength;
570 s->buffer_attr.fragsize = fragsize;
571 s->buffer_attr.tlength = tlength;
572 s->buffer_attr.prebuf = prebuf;
573 s->buffer_attr.minreq = minreq;
574 }
575
576 pa_xfree(s->device_name);
577 s->device_name = pa_xstrdup(dn);
578 s->device_index = di;
579
580 s->suspended = suspended;
581
582 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
583 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
584 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
585 request_auto_timing_update(s, true);
586 }
587
588 check_smoother_status(s, true, false, false);
589 request_auto_timing_update(s, true);
590
591 if (s->moved_callback)
592 s->moved_callback(s, s->moved_userdata);
593
594 finish:
595 pa_context_unref(c);
596 }
597
598 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
599 pa_context *c = userdata;
600 pa_stream *s;
601 uint32_t channel;
602 pa_usec_t usec = 0;
603 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
604
605 pa_assert(pd);
606 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
607 pa_assert(t);
608 pa_assert(c);
609 pa_assert(PA_REFCNT_VALUE(c) >= 1);
610
611 pa_context_ref(c);
612
613 if (c->version < 15) {
614 pa_context_fail(c, PA_ERR_PROTOCOL);
615 goto finish;
616 }
617
618 if (pa_tagstruct_getu32(t, &channel) < 0) {
619 pa_context_fail(c, PA_ERR_PROTOCOL);
620 goto finish;
621 }
622
623 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
624 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
625 pa_tagstruct_getu32(t, &fragsize) < 0 ||
626 pa_tagstruct_get_usec(t, &usec) < 0) {
627 pa_context_fail(c, PA_ERR_PROTOCOL);
628 goto finish;
629 }
630 } else {
631 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
632 pa_tagstruct_getu32(t, &tlength) < 0 ||
633 pa_tagstruct_getu32(t, &prebuf) < 0 ||
634 pa_tagstruct_getu32(t, &minreq) < 0 ||
635 pa_tagstruct_get_usec(t, &usec) < 0) {
636 pa_context_fail(c, PA_ERR_PROTOCOL);
637 goto finish;
638 }
639 }
640
641 if (!pa_tagstruct_eof(t)) {
642 pa_context_fail(c, PA_ERR_PROTOCOL);
643 goto finish;
644 }
645
646 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
647 goto finish;
648
649 if (s->state != PA_STREAM_READY)
650 goto finish;
651
652 if (s->direction == PA_STREAM_RECORD)
653 s->timing_info.configured_source_usec = usec;
654 else
655 s->timing_info.configured_sink_usec = usec;
656
657 s->buffer_attr.maxlength = maxlength;
658 s->buffer_attr.fragsize = fragsize;
659 s->buffer_attr.tlength = tlength;
660 s->buffer_attr.prebuf = prebuf;
661 s->buffer_attr.minreq = minreq;
662
663 request_auto_timing_update(s, true);
664
665 if (s->buffer_attr_callback)
666 s->buffer_attr_callback(s, s->buffer_attr_userdata);
667
668 finish:
669 pa_context_unref(c);
670 }
671
672 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
673 pa_context *c = userdata;
674 pa_stream *s;
675 uint32_t channel;
676 bool suspended;
677
678 pa_assert(pd);
679 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
680 pa_assert(t);
681 pa_assert(c);
682 pa_assert(PA_REFCNT_VALUE(c) >= 1);
683
684 pa_context_ref(c);
685
686 if (c->version < 12) {
687 pa_context_fail(c, PA_ERR_PROTOCOL);
688 goto finish;
689 }
690
691 if (pa_tagstruct_getu32(t, &channel) < 0 ||
692 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
693 !pa_tagstruct_eof(t)) {
694 pa_context_fail(c, PA_ERR_PROTOCOL);
695 goto finish;
696 }
697
698 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
699 goto finish;
700
701 if (s->state != PA_STREAM_READY)
702 goto finish;
703
704 s->suspended = suspended;
705
706 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
707 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
708 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
709 request_auto_timing_update(s, true);
710 }
711
712 check_smoother_status(s, true, false, false);
713 request_auto_timing_update(s, true);
714
715 if (s->suspended_callback)
716 s->suspended_callback(s, s->suspended_userdata);
717
718 finish:
719 pa_context_unref(c);
720 }
721
722 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
723 pa_context *c = userdata;
724 pa_stream *s;
725 uint32_t channel;
726
727 pa_assert(pd);
728 pa_assert(command == PA_COMMAND_STARTED);
729 pa_assert(t);
730 pa_assert(c);
731 pa_assert(PA_REFCNT_VALUE(c) >= 1);
732
733 pa_context_ref(c);
734
735 if (c->version < 13) {
736 pa_context_fail(c, PA_ERR_PROTOCOL);
737 goto finish;
738 }
739
740 if (pa_tagstruct_getu32(t, &channel) < 0 ||
741 !pa_tagstruct_eof(t)) {
742 pa_context_fail(c, PA_ERR_PROTOCOL);
743 goto finish;
744 }
745
746 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
747 goto finish;
748
749 if (s->state != PA_STREAM_READY)
750 goto finish;
751
752 check_smoother_status(s, true, true, false);
753 request_auto_timing_update(s, true);
754
755 if (s->started_callback)
756 s->started_callback(s, s->started_userdata);
757
758 finish:
759 pa_context_unref(c);
760 }
761
762 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
763 pa_context *c = userdata;
764 pa_stream *s;
765 uint32_t channel;
766 pa_proplist *pl = NULL;
767 const char *event;
768
769 pa_assert(pd);
770 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
771 pa_assert(t);
772 pa_assert(c);
773 pa_assert(PA_REFCNT_VALUE(c) >= 1);
774
775 pa_context_ref(c);
776
777 if (c->version < 15) {
778 pa_context_fail(c, PA_ERR_PROTOCOL);
779 goto finish;
780 }
781
782 pl = pa_proplist_new();
783
784 if (pa_tagstruct_getu32(t, &channel) < 0 ||
785 pa_tagstruct_gets(t, &event) < 0 ||
786 pa_tagstruct_get_proplist(t, pl) < 0 ||
787 !pa_tagstruct_eof(t) || !event) {
788 pa_context_fail(c, PA_ERR_PROTOCOL);
789 goto finish;
790 }
791
792 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
793 goto finish;
794
795 if (s->state != PA_STREAM_READY)
796 goto finish;
797
798 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
799 /* Let client know what the running time was when the stream had to be killed */
800 pa_usec_t stream_time;
801 if (pa_stream_get_time(s, &stream_time) == 0)
802 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
803 }
804
805 if (s->event_callback)
806 s->event_callback(s, event, pl, s->event_userdata);
807
808 finish:
809 pa_context_unref(c);
810
811 if (pl)
812 pa_proplist_free(pl);
813 }
814
815 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
816 pa_stream *s;
817 pa_context *c = userdata;
818 uint32_t bytes, channel;
819
820 pa_assert(pd);
821 pa_assert(command == PA_COMMAND_REQUEST);
822 pa_assert(t);
823 pa_assert(c);
824 pa_assert(PA_REFCNT_VALUE(c) >= 1);
825
826 pa_context_ref(c);
827
828 if (pa_tagstruct_getu32(t, &channel) < 0 ||
829 pa_tagstruct_getu32(t, &bytes) < 0 ||
830 !pa_tagstruct_eof(t)) {
831 pa_context_fail(c, PA_ERR_PROTOCOL);
832 goto finish;
833 }
834
835 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
836 goto finish;
837
838 if (s->state != PA_STREAM_READY)
839 goto finish;
840
841 s->requested_bytes += bytes;
842
843 #ifdef STREAM_DEBUG
844 pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
845 #endif
846
847 if (s->requested_bytes > 0 && s->write_callback)
848 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
849
850 finish:
851 pa_context_unref(c);
852 }
853
854 int64_t pa_stream_get_underflow_index(pa_stream *p) {
855 pa_assert(p);
856 return p->latest_underrun_at_index;
857 }
858
859 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
860 pa_stream *s;
861 pa_context *c = userdata;
862 uint32_t channel;
863 int64_t offset = -1;
864
865 pa_assert(pd);
866 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
867 pa_assert(t);
868 pa_assert(c);
869 pa_assert(PA_REFCNT_VALUE(c) >= 1);
870
871 pa_context_ref(c);
872
873 if (pa_tagstruct_getu32(t, &channel) < 0) {
874 pa_context_fail(c, PA_ERR_PROTOCOL);
875 goto finish;
876 }
877
878 if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
879 if (pa_tagstruct_gets64(t, &offset) < 0) {
880 pa_context_fail(c, PA_ERR_PROTOCOL);
881 goto finish;
882 }
883 }
884
885 if (!pa_tagstruct_eof(t)) {
886 pa_context_fail(c, PA_ERR_PROTOCOL);
887 goto finish;
888 }
889
890 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
891 goto finish;
892
893 if (s->state != PA_STREAM_READY)
894 goto finish;
895
896 if (offset != -1)
897 s->latest_underrun_at_index = offset;
898
899 if (s->buffer_attr.prebuf > 0)
900 check_smoother_status(s, true, false, true);
901
902 request_auto_timing_update(s, true);
903
904 if (command == PA_COMMAND_OVERFLOW) {
905 if (s->overflow_callback)
906 s->overflow_callback(s, s->overflow_userdata);
907 } else if (command == PA_COMMAND_UNDERFLOW) {
908 if (s->underflow_callback)
909 s->underflow_callback(s, s->underflow_userdata);
910 }
911
912 finish:
913 pa_context_unref(c);
914 }
915
916 static void invalidate_indexes(pa_stream *s, bool r, bool w) {
917 pa_assert(s);
918 pa_assert(PA_REFCNT_VALUE(s) >= 1);
919
920 #ifdef STREAM_DEBUG
921 pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
922 #endif
923
924 if (s->state != PA_STREAM_READY)
925 return;
926
927 if (w) {
928 s->write_index_not_before = s->context->ctag;
929
930 if (s->timing_info_valid)
931 s->timing_info.write_index_corrupt = true;
932
933 #ifdef STREAM_DEBUG
934 pa_log_debug("write_index invalidated");
935 #endif
936 }
937
938 if (r) {
939 s->read_index_not_before = s->context->ctag;
940
941 if (s->timing_info_valid)
942 s->timing_info.read_index_corrupt = true;
943
944 #ifdef STREAM_DEBUG
945 pa_log_debug("read_index invalidated");
946 #endif
947 }
948
949 request_auto_timing_update(s, true);
950 }
951
952 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
953 pa_stream *s = userdata;
954
955 pa_assert(s);
956 pa_assert(PA_REFCNT_VALUE(s) >= 1);
957
958 pa_stream_ref(s);
959 request_auto_timing_update(s, false);
960 pa_stream_unref(s);
961 }
962
963 static void create_stream_complete(pa_stream *s) {
964 pa_assert(s);
965 pa_assert(PA_REFCNT_VALUE(s) >= 1);
966 pa_assert(s->state == PA_STREAM_CREATING);
967
968 pa_stream_set_state(s, PA_STREAM_READY);
969
970 if (s->requested_bytes > 0 && s->write_callback)
971 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
972
973 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
974 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
975 pa_assert(!s->auto_timing_update_event);
976 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
977
978 request_auto_timing_update(s, true);
979 }
980
981 check_smoother_status(s, true, false, false);
982 }
983
984 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
985 const char *e;
986
987 pa_assert(s);
988 pa_assert(attr);
989
990 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
991 uint32_t ms;
992
993 if (pa_atou(e, &ms) < 0 || ms <= 0)
994 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
995 else {
996 attr->maxlength = (uint32_t) -1;
997 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
998 attr->minreq = (uint32_t) -1;
999 attr->prebuf = (uint32_t) -1;
1000 attr->fragsize = attr->tlength;
1001 }
1002
1003 if (flags)
1004 *flags |= PA_STREAM_ADJUST_LATENCY;
1005 }
1006
1007 if (s->context->version >= 13)
1008 return;
1009
1010 /* Version older than 0.9.10 didn't do server side buffer_attr
1011 * selection, hence we have to fake it on the client side. */
1012
1013 /* We choose fairly conservative values here, to not confuse
1014 * old clients with extremely large playback buffers */
1015
1016 if (attr->maxlength == (uint32_t) -1)
1017 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1018
1019 if (attr->tlength == (uint32_t) -1)
1020 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1021
1022 if (attr->minreq == (uint32_t) -1)
1023 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1024
1025 if (attr->prebuf == (uint32_t) -1)
1026 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1027
1028 if (attr->fragsize == (uint32_t) -1)
1029 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1030 }
1031
1032 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1033 pa_stream *s = userdata;
1034 uint32_t requested_bytes = 0;
1035
1036 pa_assert(pd);
1037 pa_assert(s);
1038 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1039 pa_assert(s->state == PA_STREAM_CREATING);
1040
1041 pa_stream_ref(s);
1042
1043 if (command != PA_COMMAND_REPLY) {
1044 if (pa_context_handle_error(s->context, command, t, false) < 0)
1045 goto finish;
1046
1047 pa_stream_set_state(s, PA_STREAM_FAILED);
1048 goto finish;
1049 }
1050
1051 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1052 s->channel == PA_INVALID_INDEX ||
1053 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1054 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1055 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1056 goto finish;
1057 }
1058
1059 s->requested_bytes = (int64_t) requested_bytes;
1060
1061 if (s->context->version >= 9) {
1062 if (s->direction == PA_STREAM_PLAYBACK) {
1063 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1064 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1065 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1066 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1067 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1068 goto finish;
1069 }
1070 } else if (s->direction == PA_STREAM_RECORD) {
1071 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1072 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1073 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1074 goto finish;
1075 }
1076 }
1077 }
1078
1079 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1080 pa_sample_spec ss;
1081 pa_channel_map cm;
1082 const char *dn = NULL;
1083 bool suspended;
1084
1085 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1086 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1087 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1088 pa_tagstruct_gets(t, &dn) < 0 ||
1089 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1090 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1091 goto finish;
1092 }
1093
1094 if (!dn || s->device_index == PA_INVALID_INDEX ||
1095 ss.channels != cm.channels ||
1096 !pa_channel_map_valid(&cm) ||
1097 !pa_sample_spec_valid(&ss) ||
1098 (s->n_formats == 0 && (
1099 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1100 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1101 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1102 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1103 goto finish;
1104 }
1105
1106 pa_xfree(s->device_name);
1107 s->device_name = pa_xstrdup(dn);
1108 s->suspended = suspended;
1109
1110 s->channel_map = cm;
1111 s->sample_spec = ss;
1112 }
1113
1114 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1115 pa_usec_t usec;
1116
1117 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1118 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1119 goto finish;
1120 }
1121
1122 if (s->direction == PA_STREAM_RECORD)
1123 s->timing_info.configured_source_usec = usec;
1124 else
1125 s->timing_info.configured_sink_usec = usec;
1126 }
1127
1128 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1129 || s->context->version >= 22) {
1130
1131 pa_format_info *f = pa_format_info_new();
1132 pa_tagstruct_get_format_info(t, f);
1133
1134 if (pa_format_info_valid(f))
1135 s->format = f;
1136 else {
1137 pa_format_info_free(f);
1138 if (s->n_formats > 0) {
1139 /* We used the extended API, so we should have got back a proper format */
1140 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1141 goto finish;
1142 }
1143 }
1144 }
1145
1146 if (!pa_tagstruct_eof(t)) {
1147 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1148 goto finish;
1149 }
1150
1151 if (s->direction == PA_STREAM_RECORD) {
1152 pa_assert(!s->record_memblockq);
1153
1154 s->record_memblockq = pa_memblockq_new(
1155 "client side record memblockq",
1156 0,
1157 s->buffer_attr.maxlength,
1158 0,
1159 &s->sample_spec,
1160 1,
1161 0,
1162 0,
1163 NULL);
1164 }
1165
1166 s->channel_valid = true;
1167 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1168
1169 create_stream_complete(s);
1170
1171 finish:
1172 pa_stream_unref(s);
1173 }
1174
1175 static int create_stream(
1176 pa_stream_direction_t direction,
1177 pa_stream *s,
1178 const char *dev,
1179 const pa_buffer_attr *attr,
1180 pa_stream_flags_t flags,
1181 const pa_cvolume *volume,
1182 pa_stream *sync_stream) {
1183
1184 pa_tagstruct *t;
1185 uint32_t tag;
1186 bool volume_set = !!volume;
1187 pa_cvolume cv;
1188 uint32_t i;
1189
1190 pa_assert(s);
1191 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1192 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1193
1194 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1195 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1196 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1197 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1198 PA_STREAM_INTERPOLATE_TIMING|
1199 PA_STREAM_NOT_MONOTONIC|
1200 PA_STREAM_AUTO_TIMING_UPDATE|
1201 PA_STREAM_NO_REMAP_CHANNELS|
1202 PA_STREAM_NO_REMIX_CHANNELS|
1203 PA_STREAM_FIX_FORMAT|
1204 PA_STREAM_FIX_RATE|
1205 PA_STREAM_FIX_CHANNELS|
1206 PA_STREAM_DONT_MOVE|
1207 PA_STREAM_VARIABLE_RATE|
1208 PA_STREAM_PEAK_DETECT|
1209 PA_STREAM_START_MUTED|
1210 PA_STREAM_ADJUST_LATENCY|
1211 PA_STREAM_EARLY_REQUESTS|
1212 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1213 PA_STREAM_START_UNMUTED|
1214 PA_STREAM_FAIL_ON_SUSPEND|
1215 PA_STREAM_RELATIVE_VOLUME|
1216 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1217
1218 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1219 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1220 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1221 /* Although some of the other flags are not supported on older
1222 * version, we don't check for them here, because it doesn't hurt
1223 * when they are passed but actually not supported. This makes
1224 * client development easier */
1225
1226 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1227 PA_CHECK_VALIDITY(s->context, !volume || s->n_formats || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1228 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1229 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1230
1231 pa_stream_ref(s);
1232
1233 s->direction = direction;
1234
1235 if (sync_stream)
1236 s->syncid = sync_stream->syncid;
1237
1238 if (attr)
1239 s->buffer_attr = *attr;
1240 patch_buffer_attr(s, &s->buffer_attr, &flags);
1241
1242 s->flags = flags;
1243 s->corked = !!(flags & PA_STREAM_START_CORKED);
1244
1245 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1246 pa_usec_t x;
1247
1248 x = pa_rtclock_now();
1249
1250 pa_assert(!s->smoother);
1251 s->smoother = pa_smoother_new(
1252 SMOOTHER_ADJUST_TIME,
1253 SMOOTHER_HISTORY_TIME,
1254 !(flags & PA_STREAM_NOT_MONOTONIC),
1255 true,
1256 SMOOTHER_MIN_HISTORY,
1257 x,
1258 true);
1259 }
1260
1261 if (!dev)
1262 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1263
1264 t = pa_tagstruct_command(
1265 s->context,
1266 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1267 &tag);
1268
1269 if (s->context->version < 13)
1270 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1271
1272 pa_tagstruct_put(
1273 t,
1274 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1275 PA_TAG_CHANNEL_MAP, &s->channel_map,
1276 PA_TAG_U32, PA_INVALID_INDEX,
1277 PA_TAG_STRING, dev,
1278 PA_TAG_U32, s->buffer_attr.maxlength,
1279 PA_TAG_BOOLEAN, s->corked,
1280 PA_TAG_INVALID);
1281
1282 if (!volume) {
1283 if (pa_sample_spec_valid(&s->sample_spec))
1284 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1285 else {
1286 /* This is not really relevant, since no volume was set, and
1287 * the real number of channels is embedded in the format_info
1288 * structure */
1289 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1290 }
1291 }
1292
1293 if (s->direction == PA_STREAM_PLAYBACK) {
1294 pa_tagstruct_put(
1295 t,
1296 PA_TAG_U32, s->buffer_attr.tlength,
1297 PA_TAG_U32, s->buffer_attr.prebuf,
1298 PA_TAG_U32, s->buffer_attr.minreq,
1299 PA_TAG_U32, s->syncid,
1300 PA_TAG_INVALID);
1301
1302 pa_tagstruct_put_cvolume(t, volume);
1303 } else
1304 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1305
1306 if (s->context->version >= 12) {
1307 pa_tagstruct_put(
1308 t,
1309 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1310 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1311 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1312 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1313 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1314 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1315 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1316 PA_TAG_INVALID);
1317 }
1318
1319 if (s->context->version >= 13) {
1320
1321 if (s->direction == PA_STREAM_PLAYBACK)
1322 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1323 else
1324 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1325
1326 pa_tagstruct_put(
1327 t,
1328 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1329 PA_TAG_PROPLIST, s->proplist,
1330 PA_TAG_INVALID);
1331
1332 if (s->direction == PA_STREAM_RECORD)
1333 pa_tagstruct_putu32(t, s->direct_on_input);
1334 }
1335
1336 if (s->context->version >= 14) {
1337
1338 if (s->direction == PA_STREAM_PLAYBACK)
1339 pa_tagstruct_put_boolean(t, volume_set);
1340
1341 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1342 }
1343
1344 if (s->context->version >= 15) {
1345
1346 if (s->direction == PA_STREAM_PLAYBACK)
1347 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1348
1349 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1350 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1351 }
1352
1353 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1354 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1355
1356 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1357 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1358
1359 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1360 || s->context->version >= 22) {
1361
1362 pa_tagstruct_putu8(t, s->n_formats);
1363 for (i = 0; i < s->n_formats; i++)
1364 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1365 }
1366
1367 if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1368 pa_tagstruct_put_cvolume(t, volume);
1369 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1370 pa_tagstruct_put_boolean(t, volume_set);
1371 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1372 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1373 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1374 }
1375
1376 pa_pstream_send_tagstruct(s->context->pstream, t);
1377 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1378
1379 pa_stream_set_state(s, PA_STREAM_CREATING);
1380
1381 pa_stream_unref(s);
1382 return 0;
1383 }
1384
1385 int pa_stream_connect_playback(
1386 pa_stream *s,
1387 const char *dev,
1388 const pa_buffer_attr *attr,
1389 pa_stream_flags_t flags,
1390 const pa_cvolume *volume,
1391 pa_stream *sync_stream) {
1392
1393 pa_assert(s);
1394 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1395
1396 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1397 }
1398
1399 int pa_stream_connect_record(
1400 pa_stream *s,
1401 const char *dev,
1402 const pa_buffer_attr *attr,
1403 pa_stream_flags_t flags) {
1404
1405 pa_assert(s);
1406 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1407
1408 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1409 }
1410
1411 int pa_stream_begin_write(
1412 pa_stream *s,
1413 void **data,
1414 size_t *nbytes) {
1415
1416 pa_assert(s);
1417 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1418
1419 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1420 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1421 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1422 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1423 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1424
1425 if (*nbytes != (size_t) -1) {
1426 size_t m, fs;
1427
1428 m = pa_mempool_block_size_max(s->context->mempool);
1429 fs = pa_frame_size(&s->sample_spec);
1430
1431 m = (m / fs) * fs;
1432 if (*nbytes > m)
1433 *nbytes = m;
1434 }
1435
1436 if (!s->write_memblock) {
1437 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1438 s->write_data = pa_memblock_acquire(s->write_memblock);
1439 }
1440
1441 *data = s->write_data;
1442 *nbytes = pa_memblock_get_length(s->write_memblock);
1443
1444 return 0;
1445 }
1446
1447 int pa_stream_cancel_write(
1448 pa_stream *s) {
1449
1450 pa_assert(s);
1451 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1452
1453 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1454 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1455 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1456 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1457
1458 pa_assert(s->write_data);
1459
1460 pa_memblock_release(s->write_memblock);
1461 pa_memblock_unref(s->write_memblock);
1462 s->write_memblock = NULL;
1463 s->write_data = NULL;
1464
1465 return 0;
1466 }
1467
1468 int pa_stream_write(
1469 pa_stream *s,
1470 const void *data,
1471 size_t length,
1472 pa_free_cb_t free_cb,
1473 int64_t offset,
1474 pa_seek_mode_t seek) {
1475
1476 pa_assert(s);
1477 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1478 pa_assert(data);
1479
1480 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1481 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1482 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1483 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1484 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1485 PA_CHECK_VALIDITY(s->context,
1486 !s->write_memblock ||
1487 ((data >= s->write_data) &&
1488 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1489 PA_ERR_INVALID);
1490 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1491
1492 if (s->write_memblock) {
1493 pa_memchunk chunk;
1494
1495 /* pa_stream_write_begin() was called before */
1496
1497 pa_memblock_release(s->write_memblock);
1498
1499 chunk.memblock = s->write_memblock;
1500 chunk.index = (const char *) data - (const char *) s->write_data;
1501 chunk.length = length;
1502
1503 s->write_memblock = NULL;
1504 s->write_data = NULL;
1505
1506 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1507 pa_memblock_unref(chunk.memblock);
1508
1509 } else {
1510 pa_seek_mode_t t_seek = seek;
1511 int64_t t_offset = offset;
1512 size_t t_length = length;
1513 const void *t_data = data;
1514
1515 /* pa_stream_write_begin() was not called before */
1516
1517 while (t_length > 0) {
1518 pa_memchunk chunk;
1519
1520 chunk.index = 0;
1521
1522 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1523 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1524 chunk.length = t_length;
1525 } else {
1526 void *d;
1527
1528 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1529 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1530
1531 d = pa_memblock_acquire(chunk.memblock);
1532 memcpy(d, t_data, chunk.length);
1533 pa_memblock_release(chunk.memblock);
1534 }
1535
1536 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1537
1538 t_offset = 0;
1539 t_seek = PA_SEEK_RELATIVE;
1540
1541 t_data = (const uint8_t*) t_data + chunk.length;
1542 t_length -= chunk.length;
1543
1544 pa_memblock_unref(chunk.memblock);
1545 }
1546
1547 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1548 free_cb((void*) data);
1549 }
1550
1551 /* This is obviously wrong since we ignore the seeking index . But
1552 * that's OK, the server side applies the same error */
1553 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1554
1555 #ifdef STREAM_DEBUG
1556 pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
1557 #endif
1558
1559 if (s->direction == PA_STREAM_PLAYBACK) {
1560
1561 /* Update latency request correction */
1562 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1563
1564 if (seek == PA_SEEK_ABSOLUTE) {
1565 s->write_index_corrections[s->current_write_index_correction].corrupt = false;
1566 s->write_index_corrections[s->current_write_index_correction].absolute = true;
1567 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1568 } else if (seek == PA_SEEK_RELATIVE) {
1569 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1570 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1571 } else
1572 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
1573 }
1574
1575 /* Update the write index in the already available latency data */
1576 if (s->timing_info_valid) {
1577
1578 if (seek == PA_SEEK_ABSOLUTE) {
1579 s->timing_info.write_index_corrupt = false;
1580 s->timing_info.write_index = offset + (int64_t) length;
1581 } else if (seek == PA_SEEK_RELATIVE) {
1582 if (!s->timing_info.write_index_corrupt)
1583 s->timing_info.write_index += offset + (int64_t) length;
1584 } else
1585 s->timing_info.write_index_corrupt = true;
1586 }
1587
1588 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1589 request_auto_timing_update(s, true);
1590 }
1591
1592 return 0;
1593 }
1594
1595 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1596 pa_assert(s);
1597 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1598 pa_assert(data);
1599 pa_assert(length);
1600
1601 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1602 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1603 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1604
1605 if (!s->peek_memchunk.memblock) {
1606
1607 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1608 /* record_memblockq is empty. */
1609 *data = NULL;
1610 *length = 0;
1611 return 0;
1612
1613 } else if (!s->peek_memchunk.memblock) {
1614 /* record_memblockq isn't empty, but it doesn't have any data at
1615 * the current read index. */
1616 *data = NULL;
1617 *length = s->peek_memchunk.length;
1618 return 0;
1619 }
1620
1621 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1622 }
1623
1624 pa_assert(s->peek_data);
1625 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1626 *length = s->peek_memchunk.length;
1627 return 0;
1628 }
1629
1630 int pa_stream_drop(pa_stream *s) {
1631 pa_assert(s);
1632 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1633
1634 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1635 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1636 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1637 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);
1638
1639 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1640
1641 /* Fix the simulated local read index */
1642 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1643 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1644
1645 if (s->peek_memchunk.memblock) {
1646 pa_assert(s->peek_data);
1647 s->peek_data = NULL;
1648 pa_memblock_release(s->peek_memchunk.memblock);
1649 pa_memblock_unref(s->peek_memchunk.memblock);
1650 }
1651
1652 pa_memchunk_reset(&s->peek_memchunk);
1653
1654 return 0;
1655 }
1656
1657 size_t pa_stream_writable_size(pa_stream *s) {
1658 pa_assert(s);
1659 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1660
1661 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1662 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1663 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1664
1665 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1666 }
1667
1668 size_t pa_stream_readable_size(pa_stream *s) {
1669 pa_assert(s);
1670 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1671
1672 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1673 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1674 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1675
1676 return pa_memblockq_get_length(s->record_memblockq);
1677 }
1678
1679 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1680 pa_operation *o;
1681 pa_tagstruct *t;
1682 uint32_t tag;
1683
1684 pa_assert(s);
1685 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1686
1687 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1688 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1689 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1690
1691 /* Ask for a timing update before we cork/uncork to get the best
1692 * accuracy for the transport latency suitable for the
1693 * check_smoother_status() call in the started callback */
1694 request_auto_timing_update(s, true);
1695
1696 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1697
1698 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1699 pa_tagstruct_putu32(t, s->channel);
1700 pa_pstream_send_tagstruct(s->context->pstream, t);
1701 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1702
1703 /* This might cause the read index to continue again, hence
1704 * let's request a timing update */
1705 request_auto_timing_update(s, true);
1706
1707 return o;
1708 }
1709
1710 static pa_usec_t calc_time(pa_stream *s, bool ignore_transport) {
1711 pa_usec_t usec;
1712
1713 pa_assert(s);
1714 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1715 pa_assert(s->state == PA_STREAM_READY);
1716 pa_assert(s->direction != PA_STREAM_UPLOAD);
1717 pa_assert(s->timing_info_valid);
1718 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1719 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1720
1721 if (s->direction == PA_STREAM_PLAYBACK) {
1722 /* The last byte that was written into the output device
1723 * had this time value associated */
1724 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1725
1726 if (!s->corked && !s->suspended) {
1727
1728 if (!ignore_transport)
1729 /* Because the latency info took a little time to come
1730 * to us, we assume that the real output time is actually
1731 * a little ahead */
1732 usec += s->timing_info.transport_usec;
1733
1734 /* However, the output device usually maintains a buffer
1735 too, hence the real sample currently played is a little
1736 back */
1737 if (s->timing_info.sink_usec >= usec)
1738 usec = 0;
1739 else
1740 usec -= s->timing_info.sink_usec;
1741 }
1742
1743 } else {
1744 pa_assert(s->direction == PA_STREAM_RECORD);
1745
1746 /* The last byte written into the server side queue had
1747 * this time value associated */
1748 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1749
1750 if (!s->corked && !s->suspended) {
1751
1752 if (!ignore_transport)
1753 /* Add transport latency */
1754 usec += s->timing_info.transport_usec;
1755
1756 /* Add latency of data in device buffer */
1757 usec += s->timing_info.source_usec;
1758
1759 /* If this is a monitor source, we need to correct the
1760 * time by the playback device buffer */
1761 if (s->timing_info.sink_usec >= usec)
1762 usec = 0;
1763 else
1764 usec -= s->timing_info.sink_usec;
1765 }
1766 }
1767
1768 return usec;
1769 }
1770
1771 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1772 pa_operation *o = userdata;
1773 struct timeval local, remote, now;
1774 pa_timing_info *i;
1775 bool playing = false;
1776 uint64_t underrun_for = 0, playing_for = 0;
1777
1778 pa_assert(pd);
1779 pa_assert(o);
1780 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1781
1782 if (!o->context || !o->stream)
1783 goto finish;
1784
1785 i = &o->stream->timing_info;
1786
1787 o->stream->timing_info_valid = false;
1788 i->write_index_corrupt = true;
1789 i->read_index_corrupt = true;
1790
1791 if (command != PA_COMMAND_REPLY) {
1792 if (pa_context_handle_error(o->context, command, t, false) < 0)
1793 goto finish;
1794
1795 } else {
1796
1797 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1798 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1799 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1800 pa_tagstruct_get_timeval(t, &local) < 0 ||
1801 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1802 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1803 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1804
1805 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1806 goto finish;
1807 }
1808
1809 if (o->context->version >= 13 &&
1810 o->stream->direction == PA_STREAM_PLAYBACK)
1811 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1812 pa_tagstruct_getu64(t, &playing_for) < 0) {
1813
1814 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1815 goto finish;
1816 }
1817
1818 if (!pa_tagstruct_eof(t)) {
1819 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1820 goto finish;
1821 }
1822 o->stream->timing_info_valid = true;
1823 i->write_index_corrupt = false;
1824 i->read_index_corrupt = false;
1825
1826 i->playing = (int) playing;
1827 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1828
1829 pa_gettimeofday(&now);
1830
1831 /* Calculate timestamps */
1832 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1833 /* local and remote seem to have synchronized clocks */
1834
1835 if (o->stream->direction == PA_STREAM_PLAYBACK)
1836 i->transport_usec = pa_timeval_diff(&remote, &local);
1837 else
1838 i->transport_usec = pa_timeval_diff(&now, &remote);
1839
1840 i->synchronized_clocks = true;
1841 i->timestamp = remote;
1842 } else {
1843 /* clocks are not synchronized, let's estimate latency then */
1844 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1845 i->synchronized_clocks = false;
1846 i->timestamp = local;
1847 pa_timeval_add(&i->timestamp, i->transport_usec);
1848 }
1849
1850 /* Invalidate read and write indexes if necessary */
1851 if (tag < o->stream->read_index_not_before)
1852 i->read_index_corrupt = true;
1853
1854 if (tag < o->stream->write_index_not_before)
1855 i->write_index_corrupt = true;
1856
1857 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1858 /* Write index correction */
1859
1860 int n, j;
1861 uint32_t ctag = tag;
1862
1863 /* Go through the saved correction values and add up the
1864 * total correction.*/
1865 for (n = 0, j = o->stream->current_write_index_correction+1;
1866 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1867 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1868
1869 /* Step over invalid data or out-of-date data */
1870 if (!o->stream->write_index_corrections[j].valid ||
1871 o->stream->write_index_corrections[j].tag < ctag)
1872 continue;
1873
1874 /* Make sure that everything is in order */
1875 ctag = o->stream->write_index_corrections[j].tag+1;
1876
1877 /* Now fix the write index */
1878 if (o->stream->write_index_corrections[j].corrupt) {
1879 /* A corrupting seek was made */
1880 i->write_index_corrupt = true;
1881 } else if (o->stream->write_index_corrections[j].absolute) {
1882 /* An absolute seek was made */
1883 i->write_index = o->stream->write_index_corrections[j].value;
1884 i->write_index_corrupt = false;
1885 } else if (!i->write_index_corrupt) {
1886 /* A relative seek was made */
1887 i->write_index += o->stream->write_index_corrections[j].value;
1888 }
1889 }
1890
1891 /* Clear old correction entries */
1892 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1893 if (!o->stream->write_index_corrections[n].valid)
1894 continue;
1895
1896 if (o->stream->write_index_corrections[n].tag <= tag)
1897 o->stream->write_index_corrections[n].valid = false;
1898 }
1899 }
1900
1901 if (o->stream->direction == PA_STREAM_RECORD) {
1902 /* Read index correction */
1903
1904 if (!i->read_index_corrupt)
1905 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1906 }
1907
1908 /* Update smoother if we're not corked */
1909 if (o->stream->smoother && !o->stream->corked) {
1910 pa_usec_t u, x;
1911
1912 u = x = pa_rtclock_now() - i->transport_usec;
1913
1914 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1915 pa_usec_t su;
1916
1917 /* If we weren't playing then it will take some time
1918 * until the audio will actually come out through the
1919 * speakers. Since we follow that timing here, we need
1920 * to try to fix this up */
1921
1922 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1923
1924 if (su < i->sink_usec)
1925 x += i->sink_usec - su;
1926 }
1927
1928 if (!i->playing)
1929 pa_smoother_pause(o->stream->smoother, x);
1930
1931 /* Update the smoother */
1932 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1933 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1934 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
1935
1936 if (i->playing)
1937 pa_smoother_resume(o->stream->smoother, x, true);
1938 }
1939 }
1940
1941 o->stream->auto_timing_update_requested = false;
1942
1943 if (o->stream->latency_update_callback)
1944 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1945
1946 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1947 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1948 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1949 }
1950
1951 finish:
1952
1953 pa_operation_done(o);
1954 pa_operation_unref(o);
1955 }
1956
1957 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1958 uint32_t tag;
1959 pa_operation *o;
1960 pa_tagstruct *t;
1961 struct timeval now;
1962 int cidx = 0;
1963
1964 pa_assert(s);
1965 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1966
1967 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1968 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1969 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1970
1971 if (s->direction == PA_STREAM_PLAYBACK) {
1972 /* Find a place to store the write_index correction data for this entry */
1973 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1974
1975 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1976 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1977 }
1978 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1979
1980 t = pa_tagstruct_command(
1981 s->context,
1982 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1983 &tag);
1984 pa_tagstruct_putu32(t, s->channel);
1985 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1986
1987 pa_pstream_send_tagstruct(s->context->pstream, t);
1988 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1989
1990 if (s->direction == PA_STREAM_PLAYBACK) {
1991 /* Fill in initial correction data */
1992
1993 s->current_write_index_correction = cidx;
1994
1995 s->write_index_corrections[cidx].valid = true;
1996 s->write_index_corrections[cidx].absolute = false;
1997 s->write_index_corrections[cidx].corrupt = false;
1998 s->write_index_corrections[cidx].tag = tag;
1999 s->write_index_corrections[cidx].value = 0;
2000 }
2001
2002 return o;
2003 }
2004
2005 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2006 pa_stream *s = userdata;
2007
2008 pa_assert(pd);
2009 pa_assert(s);
2010 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2011
2012 pa_stream_ref(s);
2013
2014 if (command != PA_COMMAND_REPLY) {
2015 if (pa_context_handle_error(s->context, command, t, false) < 0)
2016 goto finish;
2017
2018 pa_stream_set_state(s, PA_STREAM_FAILED);
2019 goto finish;
2020 } else if (!pa_tagstruct_eof(t)) {
2021 pa_context_fail(s->context, PA_ERR_PROTOCOL);
2022 goto finish;
2023 }
2024
2025 pa_stream_set_state(s, PA_STREAM_TERMINATED);
2026
2027 finish:
2028 pa_stream_unref(s);
2029 }
2030
2031 int pa_stream_disconnect(pa_stream *s) {
2032 pa_tagstruct *t;
2033 uint32_t tag;
2034
2035 pa_assert(s);
2036 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2037
2038 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2039 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2040 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2041
2042 pa_stream_ref(s);
2043
2044 t = pa_tagstruct_command(
2045 s->context,
2046 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2047 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2048 &tag);
2049 pa_tagstruct_putu32(t, s->channel);
2050 pa_pstream_send_tagstruct(s->context->pstream, t);
2051 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2052
2053 pa_stream_unref(s);
2054 return 0;
2055 }
2056
2057 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2058 pa_assert(s);
2059 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2060
2061 if (pa_detect_fork())
2062 return;
2063
2064 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2065 return;
2066
2067 s->read_callback = cb;
2068 s->read_userdata = userdata;
2069 }
2070
2071 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2072 pa_assert(s);
2073 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2074
2075 if (pa_detect_fork())
2076 return;
2077
2078 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2079 return;
2080
2081 s->write_callback = cb;
2082 s->write_userdata = userdata;
2083 }
2084
2085 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2086 pa_assert(s);
2087 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2088
2089 if (pa_detect_fork())
2090 return;
2091
2092 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2093 return;
2094
2095 s->state_callback = cb;
2096 s->state_userdata = userdata;
2097 }
2098
2099 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2100 pa_assert(s);
2101 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2102
2103 if (pa_detect_fork())
2104 return;
2105
2106 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2107 return;
2108
2109 s->overflow_callback = cb;
2110 s->overflow_userdata = userdata;
2111 }
2112
2113 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2114 pa_assert(s);
2115 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2116
2117 if (pa_detect_fork())
2118 return;
2119
2120 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2121 return;
2122
2123 s->underflow_callback = cb;
2124 s->underflow_userdata = userdata;
2125 }
2126
2127 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2128 pa_assert(s);
2129 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2130
2131 if (pa_detect_fork())
2132 return;
2133
2134 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2135 return;
2136
2137 s->latency_update_callback = cb;
2138 s->latency_update_userdata = userdata;
2139 }
2140
2141 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2142 pa_assert(s);
2143 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2144
2145 if (pa_detect_fork())
2146 return;
2147
2148 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2149 return;
2150
2151 s->moved_callback = cb;
2152 s->moved_userdata = userdata;
2153 }
2154
2155 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2156 pa_assert(s);
2157 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2158
2159 if (pa_detect_fork())
2160 return;
2161
2162 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2163 return;
2164
2165 s->suspended_callback = cb;
2166 s->suspended_userdata = userdata;
2167 }
2168
2169 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2170 pa_assert(s);
2171 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2172
2173 if (pa_detect_fork())
2174 return;
2175
2176 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2177 return;
2178
2179 s->started_callback = cb;
2180 s->started_userdata = userdata;
2181 }
2182
2183 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2184 pa_assert(s);
2185 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2186
2187 if (pa_detect_fork())
2188 return;
2189
2190 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2191 return;
2192
2193 s->event_callback = cb;
2194 s->event_userdata = userdata;
2195 }
2196
2197 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2198 pa_assert(s);
2199 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2200
2201 if (pa_detect_fork())
2202 return;
2203
2204 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2205 return;
2206
2207 s->buffer_attr_callback = cb;
2208 s->buffer_attr_userdata = userdata;
2209 }
2210
2211 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2212 pa_operation *o = userdata;
2213 int success = 1;
2214
2215 pa_assert(pd);
2216 pa_assert(o);
2217 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2218
2219 if (!o->context)
2220 goto finish;
2221
2222 if (command != PA_COMMAND_REPLY) {
2223 if (pa_context_handle_error(o->context, command, t, false) < 0)
2224 goto finish;
2225
2226 success = 0;
2227 } else if (!pa_tagstruct_eof(t)) {
2228 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2229 goto finish;
2230 }
2231
2232 if (o->callback) {
2233 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2234 cb(o->stream, success, o->userdata);
2235 }
2236
2237 finish:
2238 pa_operation_done(o);
2239 pa_operation_unref(o);
2240 }
2241
2242 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2243 pa_operation *o;
2244 pa_tagstruct *t;
2245 uint32_t tag;
2246
2247 pa_assert(s);
2248 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2249
2250 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2251 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2252 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2253
2254 /* Ask for a timing update before we cork/uncork to get the best
2255 * accuracy for the transport latency suitable for the
2256 * check_smoother_status() call in the started callback */
2257 request_auto_timing_update(s, true);
2258
2259 s->corked = b;
2260
2261 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2262
2263 t = pa_tagstruct_command(
2264 s->context,
2265 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2266 &tag);
2267 pa_tagstruct_putu32(t, s->channel);
2268 pa_tagstruct_put_boolean(t, !!b);
2269 pa_pstream_send_tagstruct(s->context->pstream, t);
2270 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2271
2272 check_smoother_status(s, false, false, false);
2273
2274 /* This might cause the indexes to hang/start again, hence let's
2275 * request a timing update, after the cork/uncork, too */
2276 request_auto_timing_update(s, true);
2277
2278 return o;
2279 }
2280
2281 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2282 pa_tagstruct *t;
2283 pa_operation *o;
2284 uint32_t tag;
2285
2286 pa_assert(s);
2287 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2288
2289 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2290 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2291
2292 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2293
2294 t = pa_tagstruct_command(s->context, command, &tag);
2295 pa_tagstruct_putu32(t, s->channel);
2296 pa_pstream_send_tagstruct(s->context->pstream, t);
2297 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2298
2299 return o;
2300 }
2301
2302 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2303 pa_operation *o;
2304
2305 pa_assert(s);
2306 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2307
2308 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2309 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2310 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2311
2312 /* Ask for a timing update *before* the flush, so that the
2313 * transport usec is as up to date as possible when we get the
2314 * underflow message and update the smoother status*/
2315 request_auto_timing_update(s, true);
2316
2317 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2318 return NULL;
2319
2320 if (s->direction == PA_STREAM_PLAYBACK) {
2321
2322 if (s->write_index_corrections[s->current_write_index_correction].valid)
2323 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2324
2325 if (s->buffer_attr.prebuf > 0)
2326 check_smoother_status(s, false, false, true);
2327
2328 /* This will change the write index, but leave the
2329 * read index untouched. */
2330 invalidate_indexes(s, false, true);
2331
2332 } else
2333 /* For record streams this has no influence on the write
2334 * index, but the read index might jump. */
2335 invalidate_indexes(s, true, false);
2336
2337 /* Note that we do not update requested_bytes here. This is
2338 * because we cannot really know how data actually was dropped
2339 * from the write index due to this. This 'error' will be applied
2340 * by both client and server and hence we should be fine. */
2341
2342 return o;
2343 }
2344
2345 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2346 pa_operation *o;
2347
2348 pa_assert(s);
2349 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2350
2351 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2352 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2353 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2354 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2355
2356 /* Ask for a timing update before we cork/uncork to get the best
2357 * accuracy for the transport latency suitable for the
2358 * check_smoother_status() call in the started callback */
2359 request_auto_timing_update(s, true);
2360
2361 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2362 return NULL;
2363
2364 /* This might cause the read index to hang again, hence
2365 * let's request a timing update */
2366 request_auto_timing_update(s, true);
2367
2368 return o;
2369 }
2370
2371 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2372 pa_operation *o;
2373
2374 pa_assert(s);
2375 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2376
2377 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2378 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2379 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2380 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2381
2382 /* Ask for a timing update before we cork/uncork to get the best
2383 * accuracy for the transport latency suitable for the
2384 * check_smoother_status() call in the started callback */
2385 request_auto_timing_update(s, true);
2386
2387 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2388 return NULL;
2389
2390 /* This might cause the read index to start moving again, hence
2391 * let's request a timing update */
2392 request_auto_timing_update(s, true);
2393
2394 return o;
2395 }
2396
2397 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2398 pa_operation *o;
2399
2400 pa_assert(s);
2401 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2402 pa_assert(name);
2403
2404 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2405 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2406 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2407
2408 if (s->context->version >= 13) {
2409 pa_proplist *p = pa_proplist_new();
2410
2411 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2412 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2413 pa_proplist_free(p);
2414 } else {
2415 pa_tagstruct *t;
2416 uint32_t tag;
2417
2418 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2419 t = pa_tagstruct_command(
2420 s->context,
2421 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2422 &tag);
2423 pa_tagstruct_putu32(t, s->channel);
2424 pa_tagstruct_puts(t, name);
2425 pa_pstream_send_tagstruct(s->context->pstream, t);
2426 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2427 }
2428
2429 return o;
2430 }
2431
2432 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2433 pa_usec_t usec;
2434
2435 pa_assert(s);
2436 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2437
2438 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2439 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2440 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2441 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2442 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2443 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2444
2445 if (s->smoother)
2446 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2447 else
2448 usec = calc_time(s, false);
2449
2450 /* Make sure the time runs monotonically */
2451 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2452 if (usec < s->previous_time)
2453 usec = s->previous_time;
2454 else
2455 s->previous_time = usec;
2456 }
2457
2458 if (r_usec)
2459 *r_usec = usec;
2460
2461 return 0;
2462 }
2463
2464 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2465 pa_assert(s);
2466 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2467
2468 if (negative)
2469 *negative = 0;
2470
2471 if (a >= b)
2472 return a-b;
2473 else {
2474 if (negative && s->direction == PA_STREAM_RECORD) {
2475 *negative = 1;
2476 return b-a;
2477 } else
2478 return 0;
2479 }
2480 }
2481
2482 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2483 pa_usec_t t, c;
2484 int r;
2485 int64_t cindex;
2486
2487 pa_assert(s);
2488 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2489 pa_assert(r_usec);
2490
2491 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2492 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2493 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2494 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2495 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2496 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2497
2498 if ((r = pa_stream_get_time(s, &t)) < 0)
2499 return r;
2500
2501 if (s->direction == PA_STREAM_PLAYBACK)
2502 cindex = s->timing_info.write_index;
2503 else
2504 cindex = s->timing_info.read_index;
2505
2506 if (cindex < 0)
2507 cindex = 0;
2508
2509 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2510
2511 if (s->direction == PA_STREAM_PLAYBACK)
2512 *r_usec = time_counter_diff(s, c, t, negative);
2513 else
2514 *r_usec = time_counter_diff(s, t, c, negative);
2515
2516 return 0;
2517 }
2518
2519 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2520 pa_assert(s);
2521 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2522
2523 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2524 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2525 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2526 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2527
2528 return &s->timing_info;
2529 }
2530
2531 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2532 pa_assert(s);
2533 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2534
2535 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2536
2537 return &s->sample_spec;
2538 }
2539
2540 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2541 pa_assert(s);
2542 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2543
2544 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2545
2546 return &s->channel_map;
2547 }
2548
2549 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2550 pa_assert(s);
2551 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2552
2553 /* We don't have the format till routing is done */
2554 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2555 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2556
2557 return s->format;
2558 }
2559 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2560 pa_assert(s);
2561 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2562
2563 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2564 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2565 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2566
2567 return &s->buffer_attr;
2568 }
2569
2570 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2571 pa_operation *o = userdata;
2572 int success = 1;
2573
2574 pa_assert(pd);
2575 pa_assert(o);
2576 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2577
2578 if (!o->context)
2579 goto finish;
2580
2581 if (command != PA_COMMAND_REPLY) {
2582 if (pa_context_handle_error(o->context, command, t, false) < 0)
2583 goto finish;
2584
2585 success = 0;
2586 } else {
2587 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2588 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2589 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2590 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2591 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2592 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2593 goto finish;
2594 }
2595 } else if (o->stream->direction == PA_STREAM_RECORD) {
2596 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2597 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2598 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2599 goto finish;
2600 }
2601 }
2602
2603 if (o->stream->context->version >= 13) {
2604 pa_usec_t usec;
2605
2606 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2607 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2608 goto finish;
2609 }
2610
2611 if (o->stream->direction == PA_STREAM_RECORD)
2612 o->stream->timing_info.configured_source_usec = usec;
2613 else
2614 o->stream->timing_info.configured_sink_usec = usec;
2615 }
2616
2617 if (!pa_tagstruct_eof(t)) {
2618 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2619 goto finish;
2620 }
2621 }
2622
2623 if (o->callback) {
2624 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2625 cb(o->stream, success, o->userdata);
2626 }
2627
2628 finish:
2629 pa_operation_done(o);
2630 pa_operation_unref(o);
2631 }
2632
2633 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2634 pa_operation *o;
2635 pa_tagstruct *t;
2636 uint32_t tag;
2637 pa_buffer_attr copy;
2638
2639 pa_assert(s);
2640 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2641 pa_assert(attr);
2642
2643 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2644 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2645 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2646 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2647
2648 /* Ask for a timing update before we cork/uncork to get the best
2649 * accuracy for the transport latency suitable for the
2650 * check_smoother_status() call in the started callback */
2651 request_auto_timing_update(s, true);
2652
2653 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2654
2655 t = pa_tagstruct_command(
2656 s->context,
2657 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2658 &tag);
2659 pa_tagstruct_putu32(t, s->channel);
2660
2661 copy = *attr;
2662 patch_buffer_attr(s, &copy, NULL);
2663 attr = &copy;
2664
2665 pa_tagstruct_putu32(t, attr->maxlength);
2666
2667 if (s->direction == PA_STREAM_PLAYBACK)
2668 pa_tagstruct_put(
2669 t,
2670 PA_TAG_U32, attr->tlength,
2671 PA_TAG_U32, attr->prebuf,
2672 PA_TAG_U32, attr->minreq,
2673 PA_TAG_INVALID);
2674 else
2675 pa_tagstruct_putu32(t, attr->fragsize);
2676
2677 if (s->context->version >= 13)
2678 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2679
2680 if (s->context->version >= 14)
2681 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2682
2683 pa_pstream_send_tagstruct(s->context->pstream, t);
2684 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2685
2686 /* This might cause changes in the read/write index, hence let's
2687 * request a timing update */
2688 request_auto_timing_update(s, true);
2689
2690 return o;
2691 }
2692
2693 uint32_t pa_stream_get_device_index(pa_stream *s) {
2694 pa_assert(s);
2695 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2696
2697 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2698 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2699 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2700 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2701 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2702
2703 return s->device_index;
2704 }
2705
2706 const char *pa_stream_get_device_name(pa_stream *s) {
2707 pa_assert(s);
2708 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2709
2710 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2711 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2712 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2713 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2714 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2715
2716 return s->device_name;
2717 }
2718
2719 int pa_stream_is_suspended(pa_stream *s) {
2720 pa_assert(s);
2721 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2722
2723 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2724 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2725 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2726 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2727
2728 return s->suspended;
2729 }
2730
2731 int pa_stream_is_corked(pa_stream *s) {
2732 pa_assert(s);
2733 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2734
2735 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2736 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2737 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2738
2739 return s->corked;
2740 }
2741
2742 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2743 pa_operation *o = userdata;
2744 int success = 1;
2745
2746 pa_assert(pd);
2747 pa_assert(o);
2748 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2749
2750 if (!o->context)
2751 goto finish;
2752
2753 if (command != PA_COMMAND_REPLY) {
2754 if (pa_context_handle_error(o->context, command, t, false) < 0)
2755 goto finish;
2756
2757 success = 0;
2758 } else {
2759
2760 if (!pa_tagstruct_eof(t)) {
2761 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2762 goto finish;
2763 }
2764 }
2765
2766 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2767 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2768
2769 if (o->callback) {
2770 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2771 cb(o->stream, success, o->userdata);
2772 }
2773
2774 finish:
2775 pa_operation_done(o);
2776 pa_operation_unref(o);
2777 }
2778
2779 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2780 pa_operation *o;
2781 pa_tagstruct *t;
2782 uint32_t tag;
2783
2784 pa_assert(s);
2785 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2786
2787 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2788 PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2789 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2790 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2791 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2792 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2793
2794 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2795 o->private = PA_UINT_TO_PTR(rate);
2796
2797 t = pa_tagstruct_command(
2798 s->context,
2799 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2800 &tag);
2801 pa_tagstruct_putu32(t, s->channel);
2802 pa_tagstruct_putu32(t, rate);
2803
2804 pa_pstream_send_tagstruct(s->context->pstream, t);
2805 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2806
2807 return o;
2808 }
2809
2810 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2811 pa_operation *o;
2812 pa_tagstruct *t;
2813 uint32_t tag;
2814
2815 pa_assert(s);
2816 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2817
2818 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2819 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2820 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2821 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2822 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2823
2824 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2825
2826 t = pa_tagstruct_command(
2827 s->context,
2828 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2829 &tag);
2830 pa_tagstruct_putu32(t, s->channel);
2831 pa_tagstruct_putu32(t, (uint32_t) mode);
2832 pa_tagstruct_put_proplist(t, p);
2833
2834 pa_pstream_send_tagstruct(s->context->pstream, t);
2835 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2836
2837 /* Please note that we don't update s->proplist here, because we
2838 * don't export that field */
2839
2840 return o;
2841 }
2842
2843 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2844 pa_operation *o;
2845 pa_tagstruct *t;
2846 uint32_t tag;
2847 const char * const*k;
2848
2849 pa_assert(s);
2850 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2851
2852 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2853 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2854 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2855 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2856 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2857
2858 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2859
2860 t = pa_tagstruct_command(
2861 s->context,
2862 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2863 &tag);
2864 pa_tagstruct_putu32(t, s->channel);
2865
2866 for (k = keys; *k; k++)
2867 pa_tagstruct_puts(t, *k);
2868
2869 pa_tagstruct_puts(t, NULL);
2870
2871 pa_pstream_send_tagstruct(s->context->pstream, t);
2872 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2873
2874 /* Please note that we don't update s->proplist here, because we
2875 * don't export that field */
2876
2877 return o;
2878 }
2879
2880 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2881 pa_assert(s);
2882 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2883
2884 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2885 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2886 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2887 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2888
2889 s->direct_on_input = sink_input_idx;
2890
2891 return 0;
2892 }
2893
2894 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2895 pa_assert(s);
2896 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2897
2898 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2899 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2900 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2901
2902 return s->direct_on_input;
2903 }