]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
alsa-mixer: Add surround 2.1 profile
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 /* #define STREAM_DEBUG */
48
49 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
50 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
51
52 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
53 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
54 #define SMOOTHER_MIN_HISTORY (4)
55
56 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
57 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
58 }
59
60 static void reset_callbacks(pa_stream *s) {
61 s->read_callback = NULL;
62 s->read_userdata = NULL;
63 s->write_callback = NULL;
64 s->write_userdata = NULL;
65 s->state_callback = NULL;
66 s->state_userdata = NULL;
67 s->overflow_callback = NULL;
68 s->overflow_userdata = NULL;
69 s->underflow_callback = NULL;
70 s->underflow_userdata = NULL;
71 s->latency_update_callback = NULL;
72 s->latency_update_userdata = NULL;
73 s->moved_callback = NULL;
74 s->moved_userdata = NULL;
75 s->suspended_callback = NULL;
76 s->suspended_userdata = NULL;
77 s->started_callback = NULL;
78 s->started_userdata = NULL;
79 s->event_callback = NULL;
80 s->event_userdata = NULL;
81 s->buffer_attr_callback = NULL;
82 s->buffer_attr_userdata = NULL;
83 }
84
85 static pa_stream *pa_stream_new_with_proplist_internal(
86 pa_context *c,
87 const char *name,
88 const pa_sample_spec *ss,
89 const pa_channel_map *map,
90 pa_format_info * const *formats,
91 unsigned int n_formats,
92 pa_proplist *p) {
93
94 pa_stream *s;
95 unsigned int i;
96
97 pa_assert(c);
98 pa_assert(PA_REFCNT_VALUE(c) >= 1);
99 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
100 pa_assert(n_formats < PA_MAX_FORMATS);
101
102 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
103 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
104
105 s = pa_xnew(pa_stream, 1);
106 PA_REFCNT_INIT(s);
107 s->context = c;
108 s->mainloop = c->mainloop;
109
110 s->direction = PA_STREAM_NODIRECTION;
111 s->state = PA_STREAM_UNCONNECTED;
112 s->flags = 0;
113
114 if (ss)
115 s->sample_spec = *ss;
116 else
117 pa_sample_spec_init(&s->sample_spec);
118
119 if (map)
120 s->channel_map = *map;
121 else
122 pa_channel_map_init(&s->channel_map);
123
124 s->n_formats = 0;
125 if (formats) {
126 s->n_formats = n_formats;
127 for (i = 0; i < n_formats; i++)
128 s->req_formats[i] = pa_format_info_copy(formats[i]);
129 }
130
131 /* We'll get the final negotiated format after connecting */
132 s->format = NULL;
133
134 s->direct_on_input = PA_INVALID_INDEX;
135
136 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
137 if (name)
138 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
139
140 s->channel = 0;
141 s->channel_valid = false;
142 s->syncid = c->csyncid++;
143 s->stream_index = PA_INVALID_INDEX;
144
145 s->requested_bytes = 0;
146 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
147
148 /* We initialize the target length here, so that if the user
149 * passes no explicit buffering metrics the default is similar to
150 * what older PA versions provided. */
151
152 s->buffer_attr.maxlength = (uint32_t) -1;
153 if (ss)
154 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
155 else {
156 /* FIXME: We assume a worst-case compressed format corresponding to
157 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
158 pa_sample_spec tmp_ss = {
159 .format = PA_SAMPLE_S16NE,
160 .rate = 48000,
161 .channels = 2,
162 };
163 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
164 }
165 s->buffer_attr.minreq = (uint32_t) -1;
166 s->buffer_attr.prebuf = (uint32_t) -1;
167 s->buffer_attr.fragsize = (uint32_t) -1;
168
169 s->device_index = PA_INVALID_INDEX;
170 s->device_name = NULL;
171 s->suspended = false;
172 s->corked = false;
173
174 s->write_memblock = NULL;
175 s->write_data = NULL;
176
177 pa_memchunk_reset(&s->peek_memchunk);
178 s->peek_data = NULL;
179 s->record_memblockq = NULL;
180
181 memset(&s->timing_info, 0, sizeof(s->timing_info));
182 s->timing_info_valid = false;
183
184 s->previous_time = 0;
185 s->latest_underrun_at_index = -1;
186
187 s->read_index_not_before = 0;
188 s->write_index_not_before = 0;
189 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
190 s->write_index_corrections[i].valid = 0;
191 s->current_write_index_correction = 0;
192
193 s->auto_timing_update_event = NULL;
194 s->auto_timing_update_requested = false;
195 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
196
197 reset_callbacks(s);
198
199 s->smoother = NULL;
200
201 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
202 PA_LLIST_PREPEND(pa_stream, c->streams, s);
203 pa_stream_ref(s);
204
205 return s;
206 }
207
208 pa_stream *pa_stream_new_with_proplist(
209 pa_context *c,
210 const char *name,
211 const pa_sample_spec *ss,
212 const pa_channel_map *map,
213 pa_proplist *p) {
214
215 pa_channel_map tmap;
216
217 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
220 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
221 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
222
223 if (!map)
224 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
225
226 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
227 }
228
229 pa_stream *pa_stream_new_extended(
230 pa_context *c,
231 const char *name,
232 pa_format_info * const *formats,
233 unsigned int n_formats,
234 pa_proplist *p) {
235
236 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
237
238 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
239 }
240
241 static void stream_unlink(pa_stream *s) {
242 pa_operation *o, *n;
243 pa_assert(s);
244
245 if (!s->context)
246 return;
247
248 /* Detach from context */
249
250 /* Unref all operation objects that point to us */
251 for (o = s->context->operations; o; o = n) {
252 n = o->next;
253
254 if (o->stream == s)
255 pa_operation_cancel(o);
256 }
257
258 /* Drop all outstanding replies for this stream */
259 if (s->context->pdispatch)
260 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
261
262 if (s->channel_valid) {
263 pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
264 s->channel = 0;
265 s->channel_valid = false;
266 }
267
268 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
269 pa_stream_unref(s);
270
271 s->context = NULL;
272
273 if (s->auto_timing_update_event) {
274 pa_assert(s->mainloop);
275 s->mainloop->time_free(s->auto_timing_update_event);
276 }
277
278 reset_callbacks(s);
279 }
280
281 static void stream_free(pa_stream *s) {
282 unsigned int i;
283
284 pa_assert(s);
285
286 stream_unlink(s);
287
288 if (s->write_memblock) {
289 if (s->write_data)
290 pa_memblock_release(s->write_memblock);
291 pa_memblock_unref(s->write_memblock);
292 }
293
294 if (s->peek_memchunk.memblock) {
295 if (s->peek_data)
296 pa_memblock_release(s->peek_memchunk.memblock);
297 pa_memblock_unref(s->peek_memchunk.memblock);
298 }
299
300 if (s->record_memblockq)
301 pa_memblockq_free(s->record_memblockq);
302
303 if (s->proplist)
304 pa_proplist_free(s->proplist);
305
306 if (s->smoother)
307 pa_smoother_free(s->smoother);
308
309 for (i = 0; i < s->n_formats; i++)
310 pa_format_info_free(s->req_formats[i]);
311
312 if (s->format)
313 pa_format_info_free(s->format);
314
315 pa_xfree(s->device_name);
316 pa_xfree(s);
317 }
318
319 void pa_stream_unref(pa_stream *s) {
320 pa_assert(s);
321 pa_assert(PA_REFCNT_VALUE(s) >= 1);
322
323 if (PA_REFCNT_DEC(s) <= 0)
324 stream_free(s);
325 }
326
327 pa_stream* pa_stream_ref(pa_stream *s) {
328 pa_assert(s);
329 pa_assert(PA_REFCNT_VALUE(s) >= 1);
330
331 PA_REFCNT_INC(s);
332 return s;
333 }
334
335 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
336 pa_assert(s);
337 pa_assert(PA_REFCNT_VALUE(s) >= 1);
338
339 return s->state;
340 }
341
342 pa_context* pa_stream_get_context(pa_stream *s) {
343 pa_assert(s);
344 pa_assert(PA_REFCNT_VALUE(s) >= 1);
345
346 return s->context;
347 }
348
349 uint32_t pa_stream_get_index(pa_stream *s) {
350 pa_assert(s);
351 pa_assert(PA_REFCNT_VALUE(s) >= 1);
352
353 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
354 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
355
356 return s->stream_index;
357 }
358
359 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
360 pa_assert(s);
361 pa_assert(PA_REFCNT_VALUE(s) >= 1);
362
363 if (s->state == st)
364 return;
365
366 pa_stream_ref(s);
367
368 s->state = st;
369
370 if (s->state_callback)
371 s->state_callback(s, s->state_userdata);
372
373 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
374 stream_unlink(s);
375
376 pa_stream_unref(s);
377 }
378
379 static void request_auto_timing_update(pa_stream *s, bool force) {
380 pa_assert(s);
381 pa_assert(PA_REFCNT_VALUE(s) >= 1);
382
383 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
384 return;
385
386 if (s->state == PA_STREAM_READY &&
387 (force || !s->auto_timing_update_requested)) {
388 pa_operation *o;
389
390 #ifdef STREAM_DEBUG
391 pa_log_debug("Automatically requesting new timing data");
392 #endif
393
394 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
395 pa_operation_unref(o);
396 s->auto_timing_update_requested = true;
397 }
398 }
399
400 if (s->auto_timing_update_event) {
401 if (s->suspended && !force) {
402 pa_assert(s->mainloop);
403 s->mainloop->time_free(s->auto_timing_update_event);
404 s->auto_timing_update_event = NULL;
405 } else {
406 if (force)
407 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
408
409 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
410
411 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
412 }
413 }
414 }
415
416 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
417 pa_context *c = userdata;
418 pa_stream *s;
419 uint32_t channel;
420
421 pa_assert(pd);
422 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
423 pa_assert(t);
424 pa_assert(c);
425 pa_assert(PA_REFCNT_VALUE(c) >= 1);
426
427 pa_context_ref(c);
428
429 if (pa_tagstruct_getu32(t, &channel) < 0 ||
430 !pa_tagstruct_eof(t)) {
431 pa_context_fail(c, PA_ERR_PROTOCOL);
432 goto finish;
433 }
434
435 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
436 goto finish;
437
438 if (s->state != PA_STREAM_READY)
439 goto finish;
440
441 pa_context_set_error(c, PA_ERR_KILLED);
442 pa_stream_set_state(s, PA_STREAM_FAILED);
443
444 finish:
445 pa_context_unref(c);
446 }
447
448 static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
449 pa_usec_t x;
450
451 pa_assert(s);
452 pa_assert(!force_start || !force_stop);
453
454 if (!s->smoother)
455 return;
456
457 x = pa_rtclock_now();
458
459 if (s->timing_info_valid) {
460 if (aposteriori)
461 x -= s->timing_info.transport_usec;
462 else
463 x += s->timing_info.transport_usec;
464 }
465
466 if (s->suspended || s->corked || force_stop)
467 pa_smoother_pause(s->smoother, x);
468 else if (force_start || s->buffer_attr.prebuf == 0) {
469
470 if (!s->timing_info_valid &&
471 !aposteriori &&
472 !force_start &&
473 !force_stop &&
474 s->context->version >= 13) {
475
476 /* If the server supports STARTED events we take them as
477 * indications when audio really starts/stops playing, if
478 * we don't have any timing info yet -- instead of trying
479 * to be smart and guessing the server time. Otherwise the
480 * unknown transport delay adds too much noise to our time
481 * calculations. */
482
483 return;
484 }
485
486 pa_smoother_resume(s->smoother, x, true);
487 }
488
489 /* Please note that we have no idea if playback actually started
490 * if prebuf is non-zero! */
491 }
492
493 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
494
495 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
496 pa_context *c = userdata;
497 pa_stream *s;
498 uint32_t channel;
499 const char *dn;
500 bool suspended;
501 uint32_t di;
502 pa_usec_t usec = 0;
503 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
504
505 pa_assert(pd);
506 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
507 pa_assert(t);
508 pa_assert(c);
509 pa_assert(PA_REFCNT_VALUE(c) >= 1);
510
511 pa_context_ref(c);
512
513 if (c->version < 12) {
514 pa_context_fail(c, PA_ERR_PROTOCOL);
515 goto finish;
516 }
517
518 if (pa_tagstruct_getu32(t, &channel) < 0 ||
519 pa_tagstruct_getu32(t, &di) < 0 ||
520 pa_tagstruct_gets(t, &dn) < 0 ||
521 pa_tagstruct_get_boolean(t, &suspended) < 0) {
522 pa_context_fail(c, PA_ERR_PROTOCOL);
523 goto finish;
524 }
525
526 if (c->version >= 13) {
527
528 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
529 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
530 pa_tagstruct_getu32(t, &fragsize) < 0 ||
531 pa_tagstruct_get_usec(t, &usec) < 0) {
532 pa_context_fail(c, PA_ERR_PROTOCOL);
533 goto finish;
534 }
535 } else {
536 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
537 pa_tagstruct_getu32(t, &tlength) < 0 ||
538 pa_tagstruct_getu32(t, &prebuf) < 0 ||
539 pa_tagstruct_getu32(t, &minreq) < 0 ||
540 pa_tagstruct_get_usec(t, &usec) < 0) {
541 pa_context_fail(c, PA_ERR_PROTOCOL);
542 goto finish;
543 }
544 }
545 }
546
547 if (!pa_tagstruct_eof(t)) {
548 pa_context_fail(c, PA_ERR_PROTOCOL);
549 goto finish;
550 }
551
552 if (!dn || di == PA_INVALID_INDEX) {
553 pa_context_fail(c, PA_ERR_PROTOCOL);
554 goto finish;
555 }
556
557 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
558 goto finish;
559
560 if (s->state != PA_STREAM_READY)
561 goto finish;
562
563 if (c->version >= 13) {
564 if (s->direction == PA_STREAM_RECORD)
565 s->timing_info.configured_source_usec = usec;
566 else
567 s->timing_info.configured_sink_usec = usec;
568
569 s->buffer_attr.maxlength = maxlength;
570 s->buffer_attr.fragsize = fragsize;
571 s->buffer_attr.tlength = tlength;
572 s->buffer_attr.prebuf = prebuf;
573 s->buffer_attr.minreq = minreq;
574 }
575
576 pa_xfree(s->device_name);
577 s->device_name = pa_xstrdup(dn);
578 s->device_index = di;
579
580 s->suspended = suspended;
581
582 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
583 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
584 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
585 request_auto_timing_update(s, true);
586 }
587
588 check_smoother_status(s, true, false, false);
589 request_auto_timing_update(s, true);
590
591 if (s->moved_callback)
592 s->moved_callback(s, s->moved_userdata);
593
594 finish:
595 pa_context_unref(c);
596 }
597
598 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
599 pa_context *c = userdata;
600 pa_stream *s;
601 uint32_t channel;
602 pa_usec_t usec = 0;
603 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
604
605 pa_assert(pd);
606 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
607 pa_assert(t);
608 pa_assert(c);
609 pa_assert(PA_REFCNT_VALUE(c) >= 1);
610
611 pa_context_ref(c);
612
613 if (c->version < 15) {
614 pa_context_fail(c, PA_ERR_PROTOCOL);
615 goto finish;
616 }
617
618 if (pa_tagstruct_getu32(t, &channel) < 0) {
619 pa_context_fail(c, PA_ERR_PROTOCOL);
620 goto finish;
621 }
622
623 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
624 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
625 pa_tagstruct_getu32(t, &fragsize) < 0 ||
626 pa_tagstruct_get_usec(t, &usec) < 0) {
627 pa_context_fail(c, PA_ERR_PROTOCOL);
628 goto finish;
629 }
630 } else {
631 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
632 pa_tagstruct_getu32(t, &tlength) < 0 ||
633 pa_tagstruct_getu32(t, &prebuf) < 0 ||
634 pa_tagstruct_getu32(t, &minreq) < 0 ||
635 pa_tagstruct_get_usec(t, &usec) < 0) {
636 pa_context_fail(c, PA_ERR_PROTOCOL);
637 goto finish;
638 }
639 }
640
641 if (!pa_tagstruct_eof(t)) {
642 pa_context_fail(c, PA_ERR_PROTOCOL);
643 goto finish;
644 }
645
646 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
647 goto finish;
648
649 if (s->state != PA_STREAM_READY)
650 goto finish;
651
652 if (s->direction == PA_STREAM_RECORD)
653 s->timing_info.configured_source_usec = usec;
654 else
655 s->timing_info.configured_sink_usec = usec;
656
657 s->buffer_attr.maxlength = maxlength;
658 s->buffer_attr.fragsize = fragsize;
659 s->buffer_attr.tlength = tlength;
660 s->buffer_attr.prebuf = prebuf;
661 s->buffer_attr.minreq = minreq;
662
663 request_auto_timing_update(s, true);
664
665 if (s->buffer_attr_callback)
666 s->buffer_attr_callback(s, s->buffer_attr_userdata);
667
668 finish:
669 pa_context_unref(c);
670 }
671
672 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
673 pa_context *c = userdata;
674 pa_stream *s;
675 uint32_t channel;
676 bool suspended;
677
678 pa_assert(pd);
679 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
680 pa_assert(t);
681 pa_assert(c);
682 pa_assert(PA_REFCNT_VALUE(c) >= 1);
683
684 pa_context_ref(c);
685
686 if (c->version < 12) {
687 pa_context_fail(c, PA_ERR_PROTOCOL);
688 goto finish;
689 }
690
691 if (pa_tagstruct_getu32(t, &channel) < 0 ||
692 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
693 !pa_tagstruct_eof(t)) {
694 pa_context_fail(c, PA_ERR_PROTOCOL);
695 goto finish;
696 }
697
698 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
699 goto finish;
700
701 if (s->state != PA_STREAM_READY)
702 goto finish;
703
704 s->suspended = suspended;
705
706 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
707 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
708 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
709 request_auto_timing_update(s, true);
710 }
711
712 check_smoother_status(s, true, false, false);
713 request_auto_timing_update(s, true);
714
715 if (s->suspended_callback)
716 s->suspended_callback(s, s->suspended_userdata);
717
718 finish:
719 pa_context_unref(c);
720 }
721
722 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
723 pa_context *c = userdata;
724 pa_stream *s;
725 uint32_t channel;
726
727 pa_assert(pd);
728 pa_assert(command == PA_COMMAND_STARTED);
729 pa_assert(t);
730 pa_assert(c);
731 pa_assert(PA_REFCNT_VALUE(c) >= 1);
732
733 pa_context_ref(c);
734
735 if (c->version < 13) {
736 pa_context_fail(c, PA_ERR_PROTOCOL);
737 goto finish;
738 }
739
740 if (pa_tagstruct_getu32(t, &channel) < 0 ||
741 !pa_tagstruct_eof(t)) {
742 pa_context_fail(c, PA_ERR_PROTOCOL);
743 goto finish;
744 }
745
746 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
747 goto finish;
748
749 if (s->state != PA_STREAM_READY)
750 goto finish;
751
752 check_smoother_status(s, true, true, false);
753 request_auto_timing_update(s, true);
754
755 if (s->started_callback)
756 s->started_callback(s, s->started_userdata);
757
758 finish:
759 pa_context_unref(c);
760 }
761
762 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
763 pa_context *c = userdata;
764 pa_stream *s;
765 uint32_t channel;
766 pa_proplist *pl = NULL;
767 const char *event;
768
769 pa_assert(pd);
770 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
771 pa_assert(t);
772 pa_assert(c);
773 pa_assert(PA_REFCNT_VALUE(c) >= 1);
774
775 pa_context_ref(c);
776
777 if (c->version < 15) {
778 pa_context_fail(c, PA_ERR_PROTOCOL);
779 goto finish;
780 }
781
782 pl = pa_proplist_new();
783
784 if (pa_tagstruct_getu32(t, &channel) < 0 ||
785 pa_tagstruct_gets(t, &event) < 0 ||
786 pa_tagstruct_get_proplist(t, pl) < 0 ||
787 !pa_tagstruct_eof(t) || !event) {
788 pa_context_fail(c, PA_ERR_PROTOCOL);
789 goto finish;
790 }
791
792 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
793 goto finish;
794
795 if (s->state != PA_STREAM_READY)
796 goto finish;
797
798 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
799 /* Let client know what the running time was when the stream had to be killed */
800 pa_usec_t stream_time;
801 if (pa_stream_get_time(s, &stream_time) == 0)
802 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
803 }
804
805 if (s->event_callback)
806 s->event_callback(s, event, pl, s->event_userdata);
807
808 finish:
809 pa_context_unref(c);
810
811 if (pl)
812 pa_proplist_free(pl);
813 }
814
815 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
816 pa_stream *s;
817 pa_context *c = userdata;
818 uint32_t bytes, channel;
819
820 pa_assert(pd);
821 pa_assert(command == PA_COMMAND_REQUEST);
822 pa_assert(t);
823 pa_assert(c);
824 pa_assert(PA_REFCNT_VALUE(c) >= 1);
825
826 pa_context_ref(c);
827
828 if (pa_tagstruct_getu32(t, &channel) < 0 ||
829 pa_tagstruct_getu32(t, &bytes) < 0 ||
830 !pa_tagstruct_eof(t)) {
831 pa_context_fail(c, PA_ERR_PROTOCOL);
832 goto finish;
833 }
834
835 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
836 goto finish;
837
838 if (s->state != PA_STREAM_READY)
839 goto finish;
840
841 s->requested_bytes += bytes;
842
843 #ifdef STREAM_DEBUG
844 pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
845 #endif
846
847 if (s->requested_bytes > 0 && s->write_callback)
848 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
849
850 finish:
851 pa_context_unref(c);
852 }
853
854 int64_t pa_stream_get_underflow_index(pa_stream *p) {
855 pa_assert(p);
856 return p->latest_underrun_at_index;
857 }
858
859 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
860 pa_stream *s;
861 pa_context *c = userdata;
862 uint32_t channel;
863 int64_t offset = -1;
864
865 pa_assert(pd);
866 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
867 pa_assert(t);
868 pa_assert(c);
869 pa_assert(PA_REFCNT_VALUE(c) >= 1);
870
871 pa_context_ref(c);
872
873 if (pa_tagstruct_getu32(t, &channel) < 0) {
874 pa_context_fail(c, PA_ERR_PROTOCOL);
875 goto finish;
876 }
877
878 if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
879 if (pa_tagstruct_gets64(t, &offset) < 0) {
880 pa_context_fail(c, PA_ERR_PROTOCOL);
881 goto finish;
882 }
883 }
884
885 if (!pa_tagstruct_eof(t)) {
886 pa_context_fail(c, PA_ERR_PROTOCOL);
887 goto finish;
888 }
889
890 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
891 goto finish;
892
893 if (s->state != PA_STREAM_READY)
894 goto finish;
895
896 if (offset != -1)
897 s->latest_underrun_at_index = offset;
898
899 if (s->buffer_attr.prebuf > 0)
900 check_smoother_status(s, true, false, true);
901
902 request_auto_timing_update(s, true);
903
904 if (command == PA_COMMAND_OVERFLOW) {
905 if (s->overflow_callback)
906 s->overflow_callback(s, s->overflow_userdata);
907 } else if (command == PA_COMMAND_UNDERFLOW) {
908 if (s->underflow_callback)
909 s->underflow_callback(s, s->underflow_userdata);
910 }
911
912 finish:
913 pa_context_unref(c);
914 }
915
916 static void invalidate_indexes(pa_stream *s, bool r, bool w) {
917 pa_assert(s);
918 pa_assert(PA_REFCNT_VALUE(s) >= 1);
919
920 #ifdef STREAM_DEBUG
921 pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
922 #endif
923
924 if (s->state != PA_STREAM_READY)
925 return;
926
927 if (w) {
928 s->write_index_not_before = s->context->ctag;
929
930 if (s->timing_info_valid)
931 s->timing_info.write_index_corrupt = true;
932
933 #ifdef STREAM_DEBUG
934 pa_log_debug("write_index invalidated");
935 #endif
936 }
937
938 if (r) {
939 s->read_index_not_before = s->context->ctag;
940
941 if (s->timing_info_valid)
942 s->timing_info.read_index_corrupt = true;
943
944 #ifdef STREAM_DEBUG
945 pa_log_debug("read_index invalidated");
946 #endif
947 }
948
949 request_auto_timing_update(s, true);
950 }
951
952 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
953 pa_stream *s = userdata;
954
955 pa_assert(s);
956 pa_assert(PA_REFCNT_VALUE(s) >= 1);
957
958 pa_stream_ref(s);
959 request_auto_timing_update(s, false);
960 pa_stream_unref(s);
961 }
962
963 static void create_stream_complete(pa_stream *s) {
964 pa_assert(s);
965 pa_assert(PA_REFCNT_VALUE(s) >= 1);
966 pa_assert(s->state == PA_STREAM_CREATING);
967
968 pa_stream_set_state(s, PA_STREAM_READY);
969
970 if (s->requested_bytes > 0 && s->write_callback)
971 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
972
973 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
974 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
975 pa_assert(!s->auto_timing_update_event);
976 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
977
978 request_auto_timing_update(s, true);
979 }
980
981 check_smoother_status(s, true, false, false);
982 }
983
984 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
985 const char *e;
986
987 pa_assert(s);
988 pa_assert(attr);
989
990 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
991 uint32_t ms;
992
993 if (pa_atou(e, &ms) < 0 || ms <= 0)
994 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
995 else {
996 attr->maxlength = (uint32_t) -1;
997 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
998 attr->minreq = (uint32_t) -1;
999 attr->prebuf = (uint32_t) -1;
1000 attr->fragsize = attr->tlength;
1001 }
1002
1003 if (flags)
1004 *flags |= PA_STREAM_ADJUST_LATENCY;
1005 }
1006
1007 if (s->context->version >= 13)
1008 return;
1009
1010 /* Version older than 0.9.10 didn't do server side buffer_attr
1011 * selection, hence we have to fake it on the client side. */
1012
1013 /* We choose fairly conservative values here, to not confuse
1014 * old clients with extremely large playback buffers */
1015
1016 if (attr->maxlength == (uint32_t) -1)
1017 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1018
1019 if (attr->tlength == (uint32_t) -1)
1020 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1021
1022 if (attr->minreq == (uint32_t) -1)
1023 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1024
1025 if (attr->prebuf == (uint32_t) -1)
1026 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1027
1028 if (attr->fragsize == (uint32_t) -1)
1029 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1030 }
1031
1032 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1033 pa_stream *s = userdata;
1034 uint32_t requested_bytes = 0;
1035
1036 pa_assert(pd);
1037 pa_assert(s);
1038 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1039 pa_assert(s->state == PA_STREAM_CREATING);
1040
1041 pa_stream_ref(s);
1042
1043 if (command != PA_COMMAND_REPLY) {
1044 if (pa_context_handle_error(s->context, command, t, false) < 0)
1045 goto finish;
1046
1047 pa_stream_set_state(s, PA_STREAM_FAILED);
1048 goto finish;
1049 }
1050
1051 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1052 s->channel == PA_INVALID_INDEX ||
1053 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1054 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1055 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1056 goto finish;
1057 }
1058
1059 s->requested_bytes = (int64_t) requested_bytes;
1060
1061 if (s->context->version >= 9) {
1062 if (s->direction == PA_STREAM_PLAYBACK) {
1063 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1064 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1065 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1066 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1067 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1068 goto finish;
1069 }
1070 } else if (s->direction == PA_STREAM_RECORD) {
1071 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1072 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1073 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1074 goto finish;
1075 }
1076 }
1077 }
1078
1079 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1080 pa_sample_spec ss;
1081 pa_channel_map cm;
1082 const char *dn = NULL;
1083 bool suspended;
1084
1085 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1086 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1087 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1088 pa_tagstruct_gets(t, &dn) < 0 ||
1089 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1090 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1091 goto finish;
1092 }
1093
1094 if (!dn || s->device_index == PA_INVALID_INDEX ||
1095 ss.channels != cm.channels ||
1096 !pa_channel_map_valid(&cm) ||
1097 !pa_sample_spec_valid(&ss) ||
1098 (s->n_formats == 0 && (
1099 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1100 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1101 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1102 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1103 goto finish;
1104 }
1105
1106 pa_xfree(s->device_name);
1107 s->device_name = pa_xstrdup(dn);
1108 s->suspended = suspended;
1109
1110 s->channel_map = cm;
1111 s->sample_spec = ss;
1112 }
1113
1114 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1115 pa_usec_t usec;
1116
1117 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1118 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1119 goto finish;
1120 }
1121
1122 if (s->direction == PA_STREAM_RECORD)
1123 s->timing_info.configured_source_usec = usec;
1124 else
1125 s->timing_info.configured_sink_usec = usec;
1126 }
1127
1128 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1129 || s->context->version >= 22) {
1130
1131 pa_format_info *f = pa_format_info_new();
1132 pa_tagstruct_get_format_info(t, f);
1133
1134 if (pa_format_info_valid(f))
1135 s->format = f;
1136 else {
1137 pa_format_info_free(f);
1138 if (s->n_formats > 0) {
1139 /* We used the extended API, so we should have got back a proper format */
1140 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1141 goto finish;
1142 }
1143 }
1144 }
1145
1146 if (!pa_tagstruct_eof(t)) {
1147 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1148 goto finish;
1149 }
1150
1151 if (s->direction == PA_STREAM_RECORD) {
1152 pa_assert(!s->record_memblockq);
1153
1154 s->record_memblockq = pa_memblockq_new(
1155 "client side record memblockq",
1156 0,
1157 s->buffer_attr.maxlength,
1158 0,
1159 &s->sample_spec,
1160 1,
1161 0,
1162 0,
1163 NULL);
1164 }
1165
1166 s->channel_valid = true;
1167 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1168
1169 create_stream_complete(s);
1170
1171 finish:
1172 pa_stream_unref(s);
1173 }
1174
1175 static int create_stream(
1176 pa_stream_direction_t direction,
1177 pa_stream *s,
1178 const char *dev,
1179 const pa_buffer_attr *attr,
1180 pa_stream_flags_t flags,
1181 const pa_cvolume *volume,
1182 pa_stream *sync_stream) {
1183
1184 pa_tagstruct *t;
1185 uint32_t tag;
1186 bool volume_set = !!volume;
1187 pa_cvolume cv;
1188 uint32_t i;
1189
1190 pa_assert(s);
1191 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1192 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1193
1194 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1195 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1196 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1197 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1198 PA_STREAM_INTERPOLATE_TIMING|
1199 PA_STREAM_NOT_MONOTONIC|
1200 PA_STREAM_AUTO_TIMING_UPDATE|
1201 PA_STREAM_NO_REMAP_CHANNELS|
1202 PA_STREAM_NO_REMIX_CHANNELS|
1203 PA_STREAM_FIX_FORMAT|
1204 PA_STREAM_FIX_RATE|
1205 PA_STREAM_FIX_CHANNELS|
1206 PA_STREAM_DONT_MOVE|
1207 PA_STREAM_VARIABLE_RATE|
1208 PA_STREAM_PEAK_DETECT|
1209 PA_STREAM_START_MUTED|
1210 PA_STREAM_ADJUST_LATENCY|
1211 PA_STREAM_EARLY_REQUESTS|
1212 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1213 PA_STREAM_START_UNMUTED|
1214 PA_STREAM_FAIL_ON_SUSPEND|
1215 PA_STREAM_RELATIVE_VOLUME|
1216 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1217
1218 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1219 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1220 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1221 /* Although some of the other flags are not supported on older
1222 * version, we don't check for them here, because it doesn't hurt
1223 * when they are passed but actually not supported. This makes
1224 * client development easier */
1225
1226 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1227 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1228 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1229
1230 pa_stream_ref(s);
1231
1232 s->direction = direction;
1233
1234 if (sync_stream)
1235 s->syncid = sync_stream->syncid;
1236
1237 if (attr)
1238 s->buffer_attr = *attr;
1239 patch_buffer_attr(s, &s->buffer_attr, &flags);
1240
1241 s->flags = flags;
1242 s->corked = !!(flags & PA_STREAM_START_CORKED);
1243
1244 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1245 pa_usec_t x;
1246
1247 x = pa_rtclock_now();
1248
1249 pa_assert(!s->smoother);
1250 s->smoother = pa_smoother_new(
1251 SMOOTHER_ADJUST_TIME,
1252 SMOOTHER_HISTORY_TIME,
1253 !(flags & PA_STREAM_NOT_MONOTONIC),
1254 true,
1255 SMOOTHER_MIN_HISTORY,
1256 x,
1257 true);
1258 }
1259
1260 if (!dev)
1261 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1262
1263 t = pa_tagstruct_command(
1264 s->context,
1265 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1266 &tag);
1267
1268 if (s->context->version < 13)
1269 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1270
1271 pa_tagstruct_put(
1272 t,
1273 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1274 PA_TAG_CHANNEL_MAP, &s->channel_map,
1275 PA_TAG_U32, PA_INVALID_INDEX,
1276 PA_TAG_STRING, dev,
1277 PA_TAG_U32, s->buffer_attr.maxlength,
1278 PA_TAG_BOOLEAN, s->corked,
1279 PA_TAG_INVALID);
1280
1281 if (!volume) {
1282 if (pa_sample_spec_valid(&s->sample_spec))
1283 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1284 else {
1285 /* This is not really relevant, since no volume was set, and
1286 * the real number of channels is embedded in the format_info
1287 * structure */
1288 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1289 }
1290 }
1291
1292 if (s->direction == PA_STREAM_PLAYBACK) {
1293 pa_tagstruct_put(
1294 t,
1295 PA_TAG_U32, s->buffer_attr.tlength,
1296 PA_TAG_U32, s->buffer_attr.prebuf,
1297 PA_TAG_U32, s->buffer_attr.minreq,
1298 PA_TAG_U32, s->syncid,
1299 PA_TAG_INVALID);
1300
1301 pa_tagstruct_put_cvolume(t, volume);
1302 } else
1303 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1304
1305 if (s->context->version >= 12) {
1306 pa_tagstruct_put(
1307 t,
1308 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1309 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1310 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1311 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1312 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1313 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1314 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1315 PA_TAG_INVALID);
1316 }
1317
1318 if (s->context->version >= 13) {
1319
1320 if (s->direction == PA_STREAM_PLAYBACK)
1321 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1322 else
1323 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1324
1325 pa_tagstruct_put(
1326 t,
1327 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1328 PA_TAG_PROPLIST, s->proplist,
1329 PA_TAG_INVALID);
1330
1331 if (s->direction == PA_STREAM_RECORD)
1332 pa_tagstruct_putu32(t, s->direct_on_input);
1333 }
1334
1335 if (s->context->version >= 14) {
1336
1337 if (s->direction == PA_STREAM_PLAYBACK)
1338 pa_tagstruct_put_boolean(t, volume_set);
1339
1340 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1341 }
1342
1343 if (s->context->version >= 15) {
1344
1345 if (s->direction == PA_STREAM_PLAYBACK)
1346 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1347
1348 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1349 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1350 }
1351
1352 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1353 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1354
1355 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1356 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1357
1358 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1359 || s->context->version >= 22) {
1360
1361 pa_tagstruct_putu8(t, s->n_formats);
1362 for (i = 0; i < s->n_formats; i++)
1363 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1364 }
1365
1366 if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1367 pa_tagstruct_put_cvolume(t, volume);
1368 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1369 pa_tagstruct_put_boolean(t, volume_set);
1370 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1371 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1372 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1373 }
1374
1375 pa_pstream_send_tagstruct(s->context->pstream, t);
1376 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1377
1378 pa_stream_set_state(s, PA_STREAM_CREATING);
1379
1380 pa_stream_unref(s);
1381 return 0;
1382 }
1383
1384 int pa_stream_connect_playback(
1385 pa_stream *s,
1386 const char *dev,
1387 const pa_buffer_attr *attr,
1388 pa_stream_flags_t flags,
1389 const pa_cvolume *volume,
1390 pa_stream *sync_stream) {
1391
1392 pa_assert(s);
1393 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1394
1395 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1396 }
1397
1398 int pa_stream_connect_record(
1399 pa_stream *s,
1400 const char *dev,
1401 const pa_buffer_attr *attr,
1402 pa_stream_flags_t flags) {
1403
1404 pa_assert(s);
1405 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1406
1407 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1408 }
1409
1410 int pa_stream_begin_write(
1411 pa_stream *s,
1412 void **data,
1413 size_t *nbytes) {
1414
1415 pa_assert(s);
1416 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1417
1418 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1419 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1420 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1421 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1422 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1423
1424 if (*nbytes != (size_t) -1) {
1425 size_t m, fs;
1426
1427 m = pa_mempool_block_size_max(s->context->mempool);
1428 fs = pa_frame_size(&s->sample_spec);
1429
1430 m = (m / fs) * fs;
1431 if (*nbytes > m)
1432 *nbytes = m;
1433 }
1434
1435 if (!s->write_memblock) {
1436 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1437 s->write_data = pa_memblock_acquire(s->write_memblock);
1438 }
1439
1440 *data = s->write_data;
1441 *nbytes = pa_memblock_get_length(s->write_memblock);
1442
1443 return 0;
1444 }
1445
1446 int pa_stream_cancel_write(
1447 pa_stream *s) {
1448
1449 pa_assert(s);
1450 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1451
1452 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1453 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1454 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1455 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1456
1457 pa_assert(s->write_data);
1458
1459 pa_memblock_release(s->write_memblock);
1460 pa_memblock_unref(s->write_memblock);
1461 s->write_memblock = NULL;
1462 s->write_data = NULL;
1463
1464 return 0;
1465 }
1466
1467 int pa_stream_write(
1468 pa_stream *s,
1469 const void *data,
1470 size_t length,
1471 pa_free_cb_t free_cb,
1472 int64_t offset,
1473 pa_seek_mode_t seek) {
1474
1475 pa_assert(s);
1476 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1477 pa_assert(data);
1478
1479 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1480 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1481 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1482 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1483 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1484 PA_CHECK_VALIDITY(s->context,
1485 !s->write_memblock ||
1486 ((data >= s->write_data) &&
1487 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1488 PA_ERR_INVALID);
1489 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1490
1491 if (s->write_memblock) {
1492 pa_memchunk chunk;
1493
1494 /* pa_stream_write_begin() was called before */
1495
1496 pa_memblock_release(s->write_memblock);
1497
1498 chunk.memblock = s->write_memblock;
1499 chunk.index = (const char *) data - (const char *) s->write_data;
1500 chunk.length = length;
1501
1502 s->write_memblock = NULL;
1503 s->write_data = NULL;
1504
1505 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1506 pa_memblock_unref(chunk.memblock);
1507
1508 } else {
1509 pa_seek_mode_t t_seek = seek;
1510 int64_t t_offset = offset;
1511 size_t t_length = length;
1512 const void *t_data = data;
1513
1514 /* pa_stream_write_begin() was not called before */
1515
1516 while (t_length > 0) {
1517 pa_memchunk chunk;
1518
1519 chunk.index = 0;
1520
1521 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1522 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1523 chunk.length = t_length;
1524 } else {
1525 void *d;
1526
1527 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1528 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1529
1530 d = pa_memblock_acquire(chunk.memblock);
1531 memcpy(d, t_data, chunk.length);
1532 pa_memblock_release(chunk.memblock);
1533 }
1534
1535 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1536
1537 t_offset = 0;
1538 t_seek = PA_SEEK_RELATIVE;
1539
1540 t_data = (const uint8_t*) t_data + chunk.length;
1541 t_length -= chunk.length;
1542
1543 pa_memblock_unref(chunk.memblock);
1544 }
1545
1546 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1547 free_cb((void*) data);
1548 }
1549
1550 /* This is obviously wrong since we ignore the seeking index . But
1551 * that's OK, the server side applies the same error */
1552 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1553
1554 #ifdef STREAM_DEBUG
1555 pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
1556 #endif
1557
1558 if (s->direction == PA_STREAM_PLAYBACK) {
1559
1560 /* Update latency request correction */
1561 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1562
1563 if (seek == PA_SEEK_ABSOLUTE) {
1564 s->write_index_corrections[s->current_write_index_correction].corrupt = false;
1565 s->write_index_corrections[s->current_write_index_correction].absolute = true;
1566 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1567 } else if (seek == PA_SEEK_RELATIVE) {
1568 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1569 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1570 } else
1571 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
1572 }
1573
1574 /* Update the write index in the already available latency data */
1575 if (s->timing_info_valid) {
1576
1577 if (seek == PA_SEEK_ABSOLUTE) {
1578 s->timing_info.write_index_corrupt = false;
1579 s->timing_info.write_index = offset + (int64_t) length;
1580 } else if (seek == PA_SEEK_RELATIVE) {
1581 if (!s->timing_info.write_index_corrupt)
1582 s->timing_info.write_index += offset + (int64_t) length;
1583 } else
1584 s->timing_info.write_index_corrupt = true;
1585 }
1586
1587 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1588 request_auto_timing_update(s, true);
1589 }
1590
1591 return 0;
1592 }
1593
1594 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1595 pa_assert(s);
1596 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1597 pa_assert(data);
1598 pa_assert(length);
1599
1600 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1601 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1602 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1603
1604 if (!s->peek_memchunk.memblock) {
1605
1606 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1607 /* record_memblockq is empty. */
1608 *data = NULL;
1609 *length = 0;
1610 return 0;
1611
1612 } else if (!s->peek_memchunk.memblock) {
1613 /* record_memblockq isn't empty, but it doesn't have any data at
1614 * the current read index. */
1615 *data = NULL;
1616 *length = s->peek_memchunk.length;
1617 return 0;
1618 }
1619
1620 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1621 }
1622
1623 pa_assert(s->peek_data);
1624 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1625 *length = s->peek_memchunk.length;
1626 return 0;
1627 }
1628
1629 int pa_stream_drop(pa_stream *s) {
1630 pa_assert(s);
1631 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1632
1633 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1634 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1635 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1636 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);
1637
1638 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1639
1640 /* Fix the simulated local read index */
1641 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1642 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1643
1644 if (s->peek_memchunk.memblock) {
1645 pa_assert(s->peek_data);
1646 s->peek_data = NULL;
1647 pa_memblock_release(s->peek_memchunk.memblock);
1648 pa_memblock_unref(s->peek_memchunk.memblock);
1649 }
1650
1651 pa_memchunk_reset(&s->peek_memchunk);
1652
1653 return 0;
1654 }
1655
1656 size_t pa_stream_writable_size(pa_stream *s) {
1657 pa_assert(s);
1658 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1659
1660 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1661 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1662 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1663
1664 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1665 }
1666
1667 size_t pa_stream_readable_size(pa_stream *s) {
1668 pa_assert(s);
1669 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1670
1671 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1672 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1673 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1674
1675 return pa_memblockq_get_length(s->record_memblockq);
1676 }
1677
1678 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1679 pa_operation *o;
1680 pa_tagstruct *t;
1681 uint32_t tag;
1682
1683 pa_assert(s);
1684 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1685
1686 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1687 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1688 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1689
1690 /* Ask for a timing update before we cork/uncork to get the best
1691 * accuracy for the transport latency suitable for the
1692 * check_smoother_status() call in the started callback */
1693 request_auto_timing_update(s, true);
1694
1695 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1696
1697 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1698 pa_tagstruct_putu32(t, s->channel);
1699 pa_pstream_send_tagstruct(s->context->pstream, t);
1700 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1701
1702 /* This might cause the read index to continue again, hence
1703 * let's request a timing update */
1704 request_auto_timing_update(s, true);
1705
1706 return o;
1707 }
1708
1709 static pa_usec_t calc_time(pa_stream *s, bool ignore_transport) {
1710 pa_usec_t usec;
1711
1712 pa_assert(s);
1713 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1714 pa_assert(s->state == PA_STREAM_READY);
1715 pa_assert(s->direction != PA_STREAM_UPLOAD);
1716 pa_assert(s->timing_info_valid);
1717 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1718 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1719
1720 if (s->direction == PA_STREAM_PLAYBACK) {
1721 /* The last byte that was written into the output device
1722 * had this time value associated */
1723 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1724
1725 if (!s->corked && !s->suspended) {
1726
1727 if (!ignore_transport)
1728 /* Because the latency info took a little time to come
1729 * to us, we assume that the real output time is actually
1730 * a little ahead */
1731 usec += s->timing_info.transport_usec;
1732
1733 /* However, the output device usually maintains a buffer
1734 too, hence the real sample currently played is a little
1735 back */
1736 if (s->timing_info.sink_usec >= usec)
1737 usec = 0;
1738 else
1739 usec -= s->timing_info.sink_usec;
1740 }
1741
1742 } else {
1743 pa_assert(s->direction == PA_STREAM_RECORD);
1744
1745 /* The last byte written into the server side queue had
1746 * this time value associated */
1747 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1748
1749 if (!s->corked && !s->suspended) {
1750
1751 if (!ignore_transport)
1752 /* Add transport latency */
1753 usec += s->timing_info.transport_usec;
1754
1755 /* Add latency of data in device buffer */
1756 usec += s->timing_info.source_usec;
1757
1758 /* If this is a monitor source, we need to correct the
1759 * time by the playback device buffer */
1760 if (s->timing_info.sink_usec >= usec)
1761 usec = 0;
1762 else
1763 usec -= s->timing_info.sink_usec;
1764 }
1765 }
1766
1767 return usec;
1768 }
1769
1770 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1771 pa_operation *o = userdata;
1772 struct timeval local, remote, now;
1773 pa_timing_info *i;
1774 bool playing = false;
1775 uint64_t underrun_for = 0, playing_for = 0;
1776
1777 pa_assert(pd);
1778 pa_assert(o);
1779 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1780
1781 if (!o->context || !o->stream)
1782 goto finish;
1783
1784 i = &o->stream->timing_info;
1785
1786 o->stream->timing_info_valid = false;
1787 i->write_index_corrupt = true;
1788 i->read_index_corrupt = true;
1789
1790 if (command != PA_COMMAND_REPLY) {
1791 if (pa_context_handle_error(o->context, command, t, false) < 0)
1792 goto finish;
1793
1794 } else {
1795
1796 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1797 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1798 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1799 pa_tagstruct_get_timeval(t, &local) < 0 ||
1800 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1801 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1802 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1803
1804 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1805 goto finish;
1806 }
1807
1808 if (o->context->version >= 13 &&
1809 o->stream->direction == PA_STREAM_PLAYBACK)
1810 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1811 pa_tagstruct_getu64(t, &playing_for) < 0) {
1812
1813 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1814 goto finish;
1815 }
1816
1817 if (!pa_tagstruct_eof(t)) {
1818 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1819 goto finish;
1820 }
1821 o->stream->timing_info_valid = true;
1822 i->write_index_corrupt = false;
1823 i->read_index_corrupt = false;
1824
1825 i->playing = (int) playing;
1826 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1827
1828 pa_gettimeofday(&now);
1829
1830 /* Calculate timestamps */
1831 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1832 /* local and remote seem to have synchronized clocks */
1833
1834 if (o->stream->direction == PA_STREAM_PLAYBACK)
1835 i->transport_usec = pa_timeval_diff(&remote, &local);
1836 else
1837 i->transport_usec = pa_timeval_diff(&now, &remote);
1838
1839 i->synchronized_clocks = true;
1840 i->timestamp = remote;
1841 } else {
1842 /* clocks are not synchronized, let's estimate latency then */
1843 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1844 i->synchronized_clocks = false;
1845 i->timestamp = local;
1846 pa_timeval_add(&i->timestamp, i->transport_usec);
1847 }
1848
1849 /* Invalidate read and write indexes if necessary */
1850 if (tag < o->stream->read_index_not_before)
1851 i->read_index_corrupt = true;
1852
1853 if (tag < o->stream->write_index_not_before)
1854 i->write_index_corrupt = true;
1855
1856 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1857 /* Write index correction */
1858
1859 int n, j;
1860 uint32_t ctag = tag;
1861
1862 /* Go through the saved correction values and add up the
1863 * total correction.*/
1864 for (n = 0, j = o->stream->current_write_index_correction+1;
1865 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1866 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1867
1868 /* Step over invalid data or out-of-date data */
1869 if (!o->stream->write_index_corrections[j].valid ||
1870 o->stream->write_index_corrections[j].tag < ctag)
1871 continue;
1872
1873 /* Make sure that everything is in order */
1874 ctag = o->stream->write_index_corrections[j].tag+1;
1875
1876 /* Now fix the write index */
1877 if (o->stream->write_index_corrections[j].corrupt) {
1878 /* A corrupting seek was made */
1879 i->write_index_corrupt = true;
1880 } else if (o->stream->write_index_corrections[j].absolute) {
1881 /* An absolute seek was made */
1882 i->write_index = o->stream->write_index_corrections[j].value;
1883 i->write_index_corrupt = false;
1884 } else if (!i->write_index_corrupt) {
1885 /* A relative seek was made */
1886 i->write_index += o->stream->write_index_corrections[j].value;
1887 }
1888 }
1889
1890 /* Clear old correction entries */
1891 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1892 if (!o->stream->write_index_corrections[n].valid)
1893 continue;
1894
1895 if (o->stream->write_index_corrections[n].tag <= tag)
1896 o->stream->write_index_corrections[n].valid = false;
1897 }
1898 }
1899
1900 if (o->stream->direction == PA_STREAM_RECORD) {
1901 /* Read index correction */
1902
1903 if (!i->read_index_corrupt)
1904 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1905 }
1906
1907 /* Update smoother if we're not corked */
1908 if (o->stream->smoother && !o->stream->corked) {
1909 pa_usec_t u, x;
1910
1911 u = x = pa_rtclock_now() - i->transport_usec;
1912
1913 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1914 pa_usec_t su;
1915
1916 /* If we weren't playing then it will take some time
1917 * until the audio will actually come out through the
1918 * speakers. Since we follow that timing here, we need
1919 * to try to fix this up */
1920
1921 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1922
1923 if (su < i->sink_usec)
1924 x += i->sink_usec - su;
1925 }
1926
1927 if (!i->playing)
1928 pa_smoother_pause(o->stream->smoother, x);
1929
1930 /* Update the smoother */
1931 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1932 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1933 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
1934
1935 if (i->playing)
1936 pa_smoother_resume(o->stream->smoother, x, true);
1937 }
1938 }
1939
1940 o->stream->auto_timing_update_requested = false;
1941
1942 if (o->stream->latency_update_callback)
1943 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1944
1945 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1946 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1947 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1948 }
1949
1950 finish:
1951
1952 pa_operation_done(o);
1953 pa_operation_unref(o);
1954 }
1955
1956 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1957 uint32_t tag;
1958 pa_operation *o;
1959 pa_tagstruct *t;
1960 struct timeval now;
1961 int cidx = 0;
1962
1963 pa_assert(s);
1964 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1965
1966 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1967 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1968 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1969
1970 if (s->direction == PA_STREAM_PLAYBACK) {
1971 /* Find a place to store the write_index correction data for this entry */
1972 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1973
1974 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1975 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1976 }
1977 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1978
1979 t = pa_tagstruct_command(
1980 s->context,
1981 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1982 &tag);
1983 pa_tagstruct_putu32(t, s->channel);
1984 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1985
1986 pa_pstream_send_tagstruct(s->context->pstream, t);
1987 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1988
1989 if (s->direction == PA_STREAM_PLAYBACK) {
1990 /* Fill in initial correction data */
1991
1992 s->current_write_index_correction = cidx;
1993
1994 s->write_index_corrections[cidx].valid = true;
1995 s->write_index_corrections[cidx].absolute = false;
1996 s->write_index_corrections[cidx].corrupt = false;
1997 s->write_index_corrections[cidx].tag = tag;
1998 s->write_index_corrections[cidx].value = 0;
1999 }
2000
2001 return o;
2002 }
2003
2004 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2005 pa_stream *s = userdata;
2006
2007 pa_assert(pd);
2008 pa_assert(s);
2009 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2010
2011 pa_stream_ref(s);
2012
2013 if (command != PA_COMMAND_REPLY) {
2014 if (pa_context_handle_error(s->context, command, t, false) < 0)
2015 goto finish;
2016
2017 pa_stream_set_state(s, PA_STREAM_FAILED);
2018 goto finish;
2019 } else if (!pa_tagstruct_eof(t)) {
2020 pa_context_fail(s->context, PA_ERR_PROTOCOL);
2021 goto finish;
2022 }
2023
2024 pa_stream_set_state(s, PA_STREAM_TERMINATED);
2025
2026 finish:
2027 pa_stream_unref(s);
2028 }
2029
2030 int pa_stream_disconnect(pa_stream *s) {
2031 pa_tagstruct *t;
2032 uint32_t tag;
2033
2034 pa_assert(s);
2035 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2036
2037 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2038 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2039 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2040
2041 pa_stream_ref(s);
2042
2043 t = pa_tagstruct_command(
2044 s->context,
2045 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2046 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2047 &tag);
2048 pa_tagstruct_putu32(t, s->channel);
2049 pa_pstream_send_tagstruct(s->context->pstream, t);
2050 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2051
2052 pa_stream_unref(s);
2053 return 0;
2054 }
2055
2056 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2057 pa_assert(s);
2058 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2059
2060 if (pa_detect_fork())
2061 return;
2062
2063 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2064 return;
2065
2066 s->read_callback = cb;
2067 s->read_userdata = userdata;
2068 }
2069
2070 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2071 pa_assert(s);
2072 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2073
2074 if (pa_detect_fork())
2075 return;
2076
2077 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2078 return;
2079
2080 s->write_callback = cb;
2081 s->write_userdata = userdata;
2082 }
2083
2084 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2085 pa_assert(s);
2086 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2087
2088 if (pa_detect_fork())
2089 return;
2090
2091 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2092 return;
2093
2094 s->state_callback = cb;
2095 s->state_userdata = userdata;
2096 }
2097
2098 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2099 pa_assert(s);
2100 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2101
2102 if (pa_detect_fork())
2103 return;
2104
2105 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2106 return;
2107
2108 s->overflow_callback = cb;
2109 s->overflow_userdata = userdata;
2110 }
2111
2112 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2113 pa_assert(s);
2114 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2115
2116 if (pa_detect_fork())
2117 return;
2118
2119 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2120 return;
2121
2122 s->underflow_callback = cb;
2123 s->underflow_userdata = userdata;
2124 }
2125
2126 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2127 pa_assert(s);
2128 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2129
2130 if (pa_detect_fork())
2131 return;
2132
2133 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2134 return;
2135
2136 s->latency_update_callback = cb;
2137 s->latency_update_userdata = userdata;
2138 }
2139
2140 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2141 pa_assert(s);
2142 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2143
2144 if (pa_detect_fork())
2145 return;
2146
2147 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2148 return;
2149
2150 s->moved_callback = cb;
2151 s->moved_userdata = userdata;
2152 }
2153
2154 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2155 pa_assert(s);
2156 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2157
2158 if (pa_detect_fork())
2159 return;
2160
2161 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2162 return;
2163
2164 s->suspended_callback = cb;
2165 s->suspended_userdata = userdata;
2166 }
2167
2168 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2169 pa_assert(s);
2170 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2171
2172 if (pa_detect_fork())
2173 return;
2174
2175 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2176 return;
2177
2178 s->started_callback = cb;
2179 s->started_userdata = userdata;
2180 }
2181
2182 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2183 pa_assert(s);
2184 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2185
2186 if (pa_detect_fork())
2187 return;
2188
2189 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2190 return;
2191
2192 s->event_callback = cb;
2193 s->event_userdata = userdata;
2194 }
2195
2196 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2197 pa_assert(s);
2198 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2199
2200 if (pa_detect_fork())
2201 return;
2202
2203 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2204 return;
2205
2206 s->buffer_attr_callback = cb;
2207 s->buffer_attr_userdata = userdata;
2208 }
2209
2210 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2211 pa_operation *o = userdata;
2212 int success = 1;
2213
2214 pa_assert(pd);
2215 pa_assert(o);
2216 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2217
2218 if (!o->context)
2219 goto finish;
2220
2221 if (command != PA_COMMAND_REPLY) {
2222 if (pa_context_handle_error(o->context, command, t, false) < 0)
2223 goto finish;
2224
2225 success = 0;
2226 } else if (!pa_tagstruct_eof(t)) {
2227 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2228 goto finish;
2229 }
2230
2231 if (o->callback) {
2232 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2233 cb(o->stream, success, o->userdata);
2234 }
2235
2236 finish:
2237 pa_operation_done(o);
2238 pa_operation_unref(o);
2239 }
2240
2241 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2242 pa_operation *o;
2243 pa_tagstruct *t;
2244 uint32_t tag;
2245
2246 pa_assert(s);
2247 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2248
2249 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2250 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2251 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2252
2253 /* Ask for a timing update before we cork/uncork to get the best
2254 * accuracy for the transport latency suitable for the
2255 * check_smoother_status() call in the started callback */
2256 request_auto_timing_update(s, true);
2257
2258 s->corked = b;
2259
2260 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2261
2262 t = pa_tagstruct_command(
2263 s->context,
2264 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2265 &tag);
2266 pa_tagstruct_putu32(t, s->channel);
2267 pa_tagstruct_put_boolean(t, !!b);
2268 pa_pstream_send_tagstruct(s->context->pstream, t);
2269 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2270
2271 check_smoother_status(s, false, false, false);
2272
2273 /* This might cause the indexes to hang/start again, hence let's
2274 * request a timing update, after the cork/uncork, too */
2275 request_auto_timing_update(s, true);
2276
2277 return o;
2278 }
2279
2280 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2281 pa_tagstruct *t;
2282 pa_operation *o;
2283 uint32_t tag;
2284
2285 pa_assert(s);
2286 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2287
2288 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2289 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2290
2291 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2292
2293 t = pa_tagstruct_command(s->context, command, &tag);
2294 pa_tagstruct_putu32(t, s->channel);
2295 pa_pstream_send_tagstruct(s->context->pstream, t);
2296 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2297
2298 return o;
2299 }
2300
2301 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2302 pa_operation *o;
2303
2304 pa_assert(s);
2305 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2306
2307 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2308 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2309 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2310
2311 /* Ask for a timing update *before* the flush, so that the
2312 * transport usec is as up to date as possible when we get the
2313 * underflow message and update the smoother status*/
2314 request_auto_timing_update(s, true);
2315
2316 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2317 return NULL;
2318
2319 if (s->direction == PA_STREAM_PLAYBACK) {
2320
2321 if (s->write_index_corrections[s->current_write_index_correction].valid)
2322 s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2323
2324 if (s->buffer_attr.prebuf > 0)
2325 check_smoother_status(s, false, false, true);
2326
2327 /* This will change the write index, but leave the
2328 * read index untouched. */
2329 invalidate_indexes(s, false, true);
2330
2331 } else
2332 /* For record streams this has no influence on the write
2333 * index, but the read index might jump. */
2334 invalidate_indexes(s, true, false);
2335
2336 /* Note that we do not update requested_bytes here. This is
2337 * because we cannot really know how data actually was dropped
2338 * from the write index due to this. This 'error' will be applied
2339 * by both client and server and hence we should be fine. */
2340
2341 return o;
2342 }
2343
2344 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2345 pa_operation *o;
2346
2347 pa_assert(s);
2348 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2349
2350 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2351 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2352 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2353 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2354
2355 /* Ask for a timing update before we cork/uncork to get the best
2356 * accuracy for the transport latency suitable for the
2357 * check_smoother_status() call in the started callback */
2358 request_auto_timing_update(s, true);
2359
2360 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2361 return NULL;
2362
2363 /* This might cause the read index to hang again, hence
2364 * let's request a timing update */
2365 request_auto_timing_update(s, true);
2366
2367 return o;
2368 }
2369
2370 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2371 pa_operation *o;
2372
2373 pa_assert(s);
2374 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2375
2376 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2377 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2378 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2379 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2380
2381 /* Ask for a timing update before we cork/uncork to get the best
2382 * accuracy for the transport latency suitable for the
2383 * check_smoother_status() call in the started callback */
2384 request_auto_timing_update(s, true);
2385
2386 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2387 return NULL;
2388
2389 /* This might cause the read index to start moving again, hence
2390 * let's request a timing update */
2391 request_auto_timing_update(s, true);
2392
2393 return o;
2394 }
2395
2396 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2397 pa_operation *o;
2398
2399 pa_assert(s);
2400 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2401 pa_assert(name);
2402
2403 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2404 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2405 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2406
2407 if (s->context->version >= 13) {
2408 pa_proplist *p = pa_proplist_new();
2409
2410 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2411 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2412 pa_proplist_free(p);
2413 } else {
2414 pa_tagstruct *t;
2415 uint32_t tag;
2416
2417 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2418 t = pa_tagstruct_command(
2419 s->context,
2420 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2421 &tag);
2422 pa_tagstruct_putu32(t, s->channel);
2423 pa_tagstruct_puts(t, name);
2424 pa_pstream_send_tagstruct(s->context->pstream, t);
2425 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2426 }
2427
2428 return o;
2429 }
2430
2431 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2432 pa_usec_t usec;
2433
2434 pa_assert(s);
2435 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2436
2437 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2438 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2439 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2440 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2441 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2442 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2443
2444 if (s->smoother)
2445 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2446 else
2447 usec = calc_time(s, false);
2448
2449 /* Make sure the time runs monotonically */
2450 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2451 if (usec < s->previous_time)
2452 usec = s->previous_time;
2453 else
2454 s->previous_time = usec;
2455 }
2456
2457 if (r_usec)
2458 *r_usec = usec;
2459
2460 return 0;
2461 }
2462
2463 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2464 pa_assert(s);
2465 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2466
2467 if (negative)
2468 *negative = 0;
2469
2470 if (a >= b)
2471 return a-b;
2472 else {
2473 if (negative && s->direction == PA_STREAM_RECORD) {
2474 *negative = 1;
2475 return b-a;
2476 } else
2477 return 0;
2478 }
2479 }
2480
2481 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2482 pa_usec_t t, c;
2483 int r;
2484 int64_t cindex;
2485
2486 pa_assert(s);
2487 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2488 pa_assert(r_usec);
2489
2490 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2491 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2492 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2493 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2494 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2495 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2496
2497 if ((r = pa_stream_get_time(s, &t)) < 0)
2498 return r;
2499
2500 if (s->direction == PA_STREAM_PLAYBACK)
2501 cindex = s->timing_info.write_index;
2502 else
2503 cindex = s->timing_info.read_index;
2504
2505 if (cindex < 0)
2506 cindex = 0;
2507
2508 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2509
2510 if (s->direction == PA_STREAM_PLAYBACK)
2511 *r_usec = time_counter_diff(s, c, t, negative);
2512 else
2513 *r_usec = time_counter_diff(s, t, c, negative);
2514
2515 return 0;
2516 }
2517
2518 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2519 pa_assert(s);
2520 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2521
2522 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2523 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2524 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2525 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2526
2527 return &s->timing_info;
2528 }
2529
2530 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2531 pa_assert(s);
2532 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2533
2534 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2535
2536 return &s->sample_spec;
2537 }
2538
2539 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2540 pa_assert(s);
2541 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2542
2543 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2544
2545 return &s->channel_map;
2546 }
2547
2548 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2549 pa_assert(s);
2550 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2551
2552 /* We don't have the format till routing is done */
2553 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2554 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2555
2556 return s->format;
2557 }
2558 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2559 pa_assert(s);
2560 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2561
2562 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2563 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2564 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2565
2566 return &s->buffer_attr;
2567 }
2568
2569 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2570 pa_operation *o = userdata;
2571 int success = 1;
2572
2573 pa_assert(pd);
2574 pa_assert(o);
2575 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2576
2577 if (!o->context)
2578 goto finish;
2579
2580 if (command != PA_COMMAND_REPLY) {
2581 if (pa_context_handle_error(o->context, command, t, false) < 0)
2582 goto finish;
2583
2584 success = 0;
2585 } else {
2586 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2587 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2588 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2589 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2590 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2591 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2592 goto finish;
2593 }
2594 } else if (o->stream->direction == PA_STREAM_RECORD) {
2595 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2596 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2597 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2598 goto finish;
2599 }
2600 }
2601
2602 if (o->stream->context->version >= 13) {
2603 pa_usec_t usec;
2604
2605 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2606 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2607 goto finish;
2608 }
2609
2610 if (o->stream->direction == PA_STREAM_RECORD)
2611 o->stream->timing_info.configured_source_usec = usec;
2612 else
2613 o->stream->timing_info.configured_sink_usec = usec;
2614 }
2615
2616 if (!pa_tagstruct_eof(t)) {
2617 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2618 goto finish;
2619 }
2620 }
2621
2622 if (o->callback) {
2623 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2624 cb(o->stream, success, o->userdata);
2625 }
2626
2627 finish:
2628 pa_operation_done(o);
2629 pa_operation_unref(o);
2630 }
2631
2632 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2633 pa_operation *o;
2634 pa_tagstruct *t;
2635 uint32_t tag;
2636 pa_buffer_attr copy;
2637
2638 pa_assert(s);
2639 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2640 pa_assert(attr);
2641
2642 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2643 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2644 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2645 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2646
2647 /* Ask for a timing update before we cork/uncork to get the best
2648 * accuracy for the transport latency suitable for the
2649 * check_smoother_status() call in the started callback */
2650 request_auto_timing_update(s, true);
2651
2652 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2653
2654 t = pa_tagstruct_command(
2655 s->context,
2656 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2657 &tag);
2658 pa_tagstruct_putu32(t, s->channel);
2659
2660 copy = *attr;
2661 patch_buffer_attr(s, &copy, NULL);
2662 attr = &copy;
2663
2664 pa_tagstruct_putu32(t, attr->maxlength);
2665
2666 if (s->direction == PA_STREAM_PLAYBACK)
2667 pa_tagstruct_put(
2668 t,
2669 PA_TAG_U32, attr->tlength,
2670 PA_TAG_U32, attr->prebuf,
2671 PA_TAG_U32, attr->minreq,
2672 PA_TAG_INVALID);
2673 else
2674 pa_tagstruct_putu32(t, attr->fragsize);
2675
2676 if (s->context->version >= 13)
2677 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2678
2679 if (s->context->version >= 14)
2680 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2681
2682 pa_pstream_send_tagstruct(s->context->pstream, t);
2683 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2684
2685 /* This might cause changes in the read/write index, hence let's
2686 * request a timing update */
2687 request_auto_timing_update(s, true);
2688
2689 return o;
2690 }
2691
2692 uint32_t pa_stream_get_device_index(pa_stream *s) {
2693 pa_assert(s);
2694 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2695
2696 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2697 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2698 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2699 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2700 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2701
2702 return s->device_index;
2703 }
2704
2705 const char *pa_stream_get_device_name(pa_stream *s) {
2706 pa_assert(s);
2707 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2708
2709 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2710 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2711 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2712 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2713 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2714
2715 return s->device_name;
2716 }
2717
2718 int pa_stream_is_suspended(pa_stream *s) {
2719 pa_assert(s);
2720 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2721
2722 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2723 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2724 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2725 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2726
2727 return s->suspended;
2728 }
2729
2730 int pa_stream_is_corked(pa_stream *s) {
2731 pa_assert(s);
2732 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2733
2734 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2735 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2736 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2737
2738 return s->corked;
2739 }
2740
2741 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2742 pa_operation *o = userdata;
2743 int success = 1;
2744
2745 pa_assert(pd);
2746 pa_assert(o);
2747 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2748
2749 if (!o->context)
2750 goto finish;
2751
2752 if (command != PA_COMMAND_REPLY) {
2753 if (pa_context_handle_error(o->context, command, t, false) < 0)
2754 goto finish;
2755
2756 success = 0;
2757 } else {
2758
2759 if (!pa_tagstruct_eof(t)) {
2760 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2761 goto finish;
2762 }
2763 }
2764
2765 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2766 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2767
2768 if (o->callback) {
2769 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2770 cb(o->stream, success, o->userdata);
2771 }
2772
2773 finish:
2774 pa_operation_done(o);
2775 pa_operation_unref(o);
2776 }
2777
2778 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2779 pa_operation *o;
2780 pa_tagstruct *t;
2781 uint32_t tag;
2782
2783 pa_assert(s);
2784 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2785
2786 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2787 PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2788 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2789 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2790 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2791 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2792
2793 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2794 o->private = PA_UINT_TO_PTR(rate);
2795
2796 t = pa_tagstruct_command(
2797 s->context,
2798 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2799 &tag);
2800 pa_tagstruct_putu32(t, s->channel);
2801 pa_tagstruct_putu32(t, rate);
2802
2803 pa_pstream_send_tagstruct(s->context->pstream, t);
2804 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2805
2806 return o;
2807 }
2808
2809 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2810 pa_operation *o;
2811 pa_tagstruct *t;
2812 uint32_t tag;
2813
2814 pa_assert(s);
2815 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2816
2817 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2818 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2819 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2820 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2821 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2822
2823 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2824
2825 t = pa_tagstruct_command(
2826 s->context,
2827 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2828 &tag);
2829 pa_tagstruct_putu32(t, s->channel);
2830 pa_tagstruct_putu32(t, (uint32_t) mode);
2831 pa_tagstruct_put_proplist(t, p);
2832
2833 pa_pstream_send_tagstruct(s->context->pstream, t);
2834 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2835
2836 /* Please note that we don't update s->proplist here, because we
2837 * don't export that field */
2838
2839 return o;
2840 }
2841
2842 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2843 pa_operation *o;
2844 pa_tagstruct *t;
2845 uint32_t tag;
2846 const char * const*k;
2847
2848 pa_assert(s);
2849 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2850
2851 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2852 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2853 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2854 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2855 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2856
2857 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2858
2859 t = pa_tagstruct_command(
2860 s->context,
2861 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2862 &tag);
2863 pa_tagstruct_putu32(t, s->channel);
2864
2865 for (k = keys; *k; k++)
2866 pa_tagstruct_puts(t, *k);
2867
2868 pa_tagstruct_puts(t, NULL);
2869
2870 pa_pstream_send_tagstruct(s->context->pstream, t);
2871 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2872
2873 /* Please note that we don't update s->proplist here, because we
2874 * don't export that field */
2875
2876 return o;
2877 }
2878
2879 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2880 pa_assert(s);
2881 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2882
2883 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2884 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2885 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2886 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2887
2888 s->direct_on_input = sink_input_idx;
2889
2890 return 0;
2891 }
2892
2893 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2894 pa_assert(s);
2895 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2896
2897 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2898 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2899 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2900
2901 return s->direct_on_input;
2902 }