]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
protocol-native: Allow clients to know at what index underrun occurred
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84 pa_context *c,
85 const char *name,
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
89 unsigned int n_formats,
90 pa_proplist *p) {
91
92 pa_stream *s;
93 unsigned int i;
94
95 pa_assert(c);
96 pa_assert(PA_REFCNT_VALUE(c) >= 1);
97 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98 pa_assert(n_formats < PA_MAX_FORMATS);
99
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103 s = pa_xnew(pa_stream, 1);
104 PA_REFCNT_INIT(s);
105 s->context = c;
106 s->mainloop = c->mainloop;
107
108 s->direction = PA_STREAM_NODIRECTION;
109 s->state = PA_STREAM_UNCONNECTED;
110 s->flags = 0;
111
112 if (ss)
113 s->sample_spec = *ss;
114 else
115 s->sample_spec.format = PA_SAMPLE_INVALID;
116
117 if (map)
118 s->channel_map = *map;
119 else
120 pa_channel_map_init(&s->channel_map);
121
122 s->n_formats = 0;
123 if (formats) {
124 s->n_formats = n_formats;
125 for (i = 0; i < n_formats; i++)
126 s->req_formats[i] = pa_format_info_copy(formats[i]);
127 }
128
129 /* We'll get the final negotiated format after connecting */
130 s->format = NULL;
131
132 s->direct_on_input = PA_INVALID_INDEX;
133
134 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
135 if (name)
136 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
137
138 s->channel = 0;
139 s->channel_valid = FALSE;
140 s->syncid = c->csyncid++;
141 s->stream_index = PA_INVALID_INDEX;
142
143 s->requested_bytes = 0;
144 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
145
146 /* We initialize der target length here, so that if the user
147 * passes no explicit buffering metrics the default is similar to
148 * what older PA versions provided. */
149
150 s->buffer_attr.maxlength = (uint32_t) -1;
151 if (ss)
152 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
153 else {
154 /* FIXME: We assume a worst-case compressed format corresponding to
155 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156 pa_sample_spec tmp_ss = {
157 .format = PA_SAMPLE_S16NE,
158 .rate = 48000,
159 .channels = 2,
160 };
161 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
162 }
163 s->buffer_attr.minreq = (uint32_t) -1;
164 s->buffer_attr.prebuf = (uint32_t) -1;
165 s->buffer_attr.fragsize = (uint32_t) -1;
166
167 s->device_index = PA_INVALID_INDEX;
168 s->device_name = NULL;
169 s->suspended = FALSE;
170 s->corked = FALSE;
171
172 s->write_memblock = NULL;
173 s->write_data = NULL;
174
175 pa_memchunk_reset(&s->peek_memchunk);
176 s->peek_data = NULL;
177 s->record_memblockq = NULL;
178
179 memset(&s->timing_info, 0, sizeof(s->timing_info));
180 s->timing_info_valid = FALSE;
181
182 s->previous_time = 0;
183 s->latest_underrun_at_index = -1;
184
185 s->read_index_not_before = 0;
186 s->write_index_not_before = 0;
187 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188 s->write_index_corrections[i].valid = 0;
189 s->current_write_index_correction = 0;
190
191 s->auto_timing_update_event = NULL;
192 s->auto_timing_update_requested = FALSE;
193 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
194
195 reset_callbacks(s);
196
197 s->smoother = NULL;
198
199 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200 PA_LLIST_PREPEND(pa_stream, c->streams, s);
201 pa_stream_ref(s);
202
203 return s;
204 }
205
206 pa_stream *pa_stream_new_with_proplist(
207 pa_context *c,
208 const char *name,
209 const pa_sample_spec *ss,
210 const pa_channel_map *map,
211 pa_proplist *p) {
212
213 pa_channel_map tmap;
214
215 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
220
221 if (!map)
222 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223
224 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
225 }
226
227 pa_stream *pa_stream_new_extended(
228 pa_context *c,
229 const char *name,
230 pa_format_info * const *formats,
231 unsigned int n_formats,
232 pa_proplist *p) {
233
234 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
235
236 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
237 }
238
239 static void stream_unlink(pa_stream *s) {
240 pa_operation *o, *n;
241 pa_assert(s);
242
243 if (!s->context)
244 return;
245
246 /* Detach from context */
247
248 /* Unref all operatio object that point to us */
249 for (o = s->context->operations; o; o = n) {
250 n = o->next;
251
252 if (o->stream == s)
253 pa_operation_cancel(o);
254 }
255
256 /* Drop all outstanding replies for this stream */
257 if (s->context->pdispatch)
258 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
259
260 if (s->channel_valid) {
261 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
262 s->channel = 0;
263 s->channel_valid = FALSE;
264 }
265
266 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
267 pa_stream_unref(s);
268
269 s->context = NULL;
270
271 if (s->auto_timing_update_event) {
272 pa_assert(s->mainloop);
273 s->mainloop->time_free(s->auto_timing_update_event);
274 }
275
276 reset_callbacks(s);
277 }
278
279 static void stream_free(pa_stream *s) {
280 unsigned int i;
281
282 pa_assert(s);
283
284 stream_unlink(s);
285
286 if (s->write_memblock) {
287 pa_memblock_release(s->write_memblock);
288 pa_memblock_unref(s->write_data);
289 }
290
291 if (s->peek_memchunk.memblock) {
292 if (s->peek_data)
293 pa_memblock_release(s->peek_memchunk.memblock);
294 pa_memblock_unref(s->peek_memchunk.memblock);
295 }
296
297 if (s->record_memblockq)
298 pa_memblockq_free(s->record_memblockq);
299
300 if (s->proplist)
301 pa_proplist_free(s->proplist);
302
303 if (s->smoother)
304 pa_smoother_free(s->smoother);
305
306 for (i = 0; i < s->n_formats; i++)
307 pa_format_info_free(s->req_formats[i]);
308
309 if (s->format)
310 pa_format_info_free(s->format);
311
312 pa_xfree(s->device_name);
313 pa_xfree(s);
314 }
315
316 void pa_stream_unref(pa_stream *s) {
317 pa_assert(s);
318 pa_assert(PA_REFCNT_VALUE(s) >= 1);
319
320 if (PA_REFCNT_DEC(s) <= 0)
321 stream_free(s);
322 }
323
324 pa_stream* pa_stream_ref(pa_stream *s) {
325 pa_assert(s);
326 pa_assert(PA_REFCNT_VALUE(s) >= 1);
327
328 PA_REFCNT_INC(s);
329 return s;
330 }
331
332 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
333 pa_assert(s);
334 pa_assert(PA_REFCNT_VALUE(s) >= 1);
335
336 return s->state;
337 }
338
339 pa_context* pa_stream_get_context(pa_stream *s) {
340 pa_assert(s);
341 pa_assert(PA_REFCNT_VALUE(s) >= 1);
342
343 return s->context;
344 }
345
346 uint32_t pa_stream_get_index(pa_stream *s) {
347 pa_assert(s);
348 pa_assert(PA_REFCNT_VALUE(s) >= 1);
349
350 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
351 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
352
353 return s->stream_index;
354 }
355
356 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
357 pa_assert(s);
358 pa_assert(PA_REFCNT_VALUE(s) >= 1);
359
360 if (s->state == st)
361 return;
362
363 pa_stream_ref(s);
364
365 s->state = st;
366
367 if (s->state_callback)
368 s->state_callback(s, s->state_userdata);
369
370 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
371 stream_unlink(s);
372
373 pa_stream_unref(s);
374 }
375
376 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
377 pa_assert(s);
378 pa_assert(PA_REFCNT_VALUE(s) >= 1);
379
380 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
381 return;
382
383 if (s->state == PA_STREAM_READY &&
384 (force || !s->auto_timing_update_requested)) {
385 pa_operation *o;
386
387 /* pa_log("Automatically requesting new timing data"); */
388
389 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
390 pa_operation_unref(o);
391 s->auto_timing_update_requested = TRUE;
392 }
393 }
394
395 if (s->auto_timing_update_event) {
396 if (s->suspended && !force) {
397 pa_assert(s->mainloop);
398 s->mainloop->time_free(s->auto_timing_update_event);
399 s->auto_timing_update_event = NULL;
400 } else {
401 if (force)
402 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
403
404 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
405
406 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
407 }
408 }
409 }
410
411 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
412 pa_context *c = userdata;
413 pa_stream *s;
414 uint32_t channel;
415
416 pa_assert(pd);
417 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
418 pa_assert(t);
419 pa_assert(c);
420 pa_assert(PA_REFCNT_VALUE(c) >= 1);
421
422 pa_context_ref(c);
423
424 if (pa_tagstruct_getu32(t, &channel) < 0 ||
425 !pa_tagstruct_eof(t)) {
426 pa_context_fail(c, PA_ERR_PROTOCOL);
427 goto finish;
428 }
429
430 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
431 goto finish;
432
433 if (s->state != PA_STREAM_READY)
434 goto finish;
435
436 pa_context_set_error(c, PA_ERR_KILLED);
437 pa_stream_set_state(s, PA_STREAM_FAILED);
438
439 finish:
440 pa_context_unref(c);
441 }
442
443 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
444 pa_usec_t x;
445
446 pa_assert(s);
447 pa_assert(!force_start || !force_stop);
448
449 if (!s->smoother)
450 return;
451
452 x = pa_rtclock_now();
453
454 if (s->timing_info_valid) {
455 if (aposteriori)
456 x -= s->timing_info.transport_usec;
457 else
458 x += s->timing_info.transport_usec;
459 }
460
461 if (s->suspended || s->corked || force_stop)
462 pa_smoother_pause(s->smoother, x);
463 else if (force_start || s->buffer_attr.prebuf == 0) {
464
465 if (!s->timing_info_valid &&
466 !aposteriori &&
467 !force_start &&
468 !force_stop &&
469 s->context->version >= 13) {
470
471 /* If the server supports STARTED events we take them as
472 * indications when audio really starts/stops playing, if
473 * we don't have any timing info yet -- instead of trying
474 * to be smart and guessing the server time. Otherwise the
475 * unknown transport delay add too much noise to our time
476 * calculations. */
477
478 return;
479 }
480
481 pa_smoother_resume(s->smoother, x, TRUE);
482 }
483
484 /* Please note that we have no idea if playback actually started
485 * if prebuf is non-zero! */
486 }
487
488 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
489
490 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
491 pa_context *c = userdata;
492 pa_stream *s;
493 uint32_t channel;
494 const char *dn;
495 pa_bool_t suspended;
496 uint32_t di;
497 pa_usec_t usec = 0;
498 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
499
500 pa_assert(pd);
501 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
502 pa_assert(t);
503 pa_assert(c);
504 pa_assert(PA_REFCNT_VALUE(c) >= 1);
505
506 pa_context_ref(c);
507
508 if (c->version < 12) {
509 pa_context_fail(c, PA_ERR_PROTOCOL);
510 goto finish;
511 }
512
513 if (pa_tagstruct_getu32(t, &channel) < 0 ||
514 pa_tagstruct_getu32(t, &di) < 0 ||
515 pa_tagstruct_gets(t, &dn) < 0 ||
516 pa_tagstruct_get_boolean(t, &suspended) < 0) {
517 pa_context_fail(c, PA_ERR_PROTOCOL);
518 goto finish;
519 }
520
521 if (c->version >= 13) {
522
523 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
524 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
525 pa_tagstruct_getu32(t, &fragsize) < 0 ||
526 pa_tagstruct_get_usec(t, &usec) < 0) {
527 pa_context_fail(c, PA_ERR_PROTOCOL);
528 goto finish;
529 }
530 } else {
531 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
532 pa_tagstruct_getu32(t, &tlength) < 0 ||
533 pa_tagstruct_getu32(t, &prebuf) < 0 ||
534 pa_tagstruct_getu32(t, &minreq) < 0 ||
535 pa_tagstruct_get_usec(t, &usec) < 0) {
536 pa_context_fail(c, PA_ERR_PROTOCOL);
537 goto finish;
538 }
539 }
540 }
541
542 if (!pa_tagstruct_eof(t)) {
543 pa_context_fail(c, PA_ERR_PROTOCOL);
544 goto finish;
545 }
546
547 if (!dn || di == PA_INVALID_INDEX) {
548 pa_context_fail(c, PA_ERR_PROTOCOL);
549 goto finish;
550 }
551
552 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
553 goto finish;
554
555 if (s->state != PA_STREAM_READY)
556 goto finish;
557
558 if (c->version >= 13) {
559 if (s->direction == PA_STREAM_RECORD)
560 s->timing_info.configured_source_usec = usec;
561 else
562 s->timing_info.configured_sink_usec = usec;
563
564 s->buffer_attr.maxlength = maxlength;
565 s->buffer_attr.fragsize = fragsize;
566 s->buffer_attr.tlength = tlength;
567 s->buffer_attr.prebuf = prebuf;
568 s->buffer_attr.minreq = minreq;
569 }
570
571 pa_xfree(s->device_name);
572 s->device_name = pa_xstrdup(dn);
573 s->device_index = di;
574
575 s->suspended = suspended;
576
577 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
578 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
579 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
580 request_auto_timing_update(s, TRUE);
581 }
582
583 check_smoother_status(s, TRUE, FALSE, FALSE);
584 request_auto_timing_update(s, TRUE);
585
586 if (s->moved_callback)
587 s->moved_callback(s, s->moved_userdata);
588
589 finish:
590 pa_context_unref(c);
591 }
592
593 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
594 pa_context *c = userdata;
595 pa_stream *s;
596 uint32_t channel;
597 pa_usec_t usec = 0;
598 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
599
600 pa_assert(pd);
601 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
602 pa_assert(t);
603 pa_assert(c);
604 pa_assert(PA_REFCNT_VALUE(c) >= 1);
605
606 pa_context_ref(c);
607
608 if (c->version < 15) {
609 pa_context_fail(c, PA_ERR_PROTOCOL);
610 goto finish;
611 }
612
613 if (pa_tagstruct_getu32(t, &channel) < 0) {
614 pa_context_fail(c, PA_ERR_PROTOCOL);
615 goto finish;
616 }
617
618 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
619 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
620 pa_tagstruct_getu32(t, &fragsize) < 0 ||
621 pa_tagstruct_get_usec(t, &usec) < 0) {
622 pa_context_fail(c, PA_ERR_PROTOCOL);
623 goto finish;
624 }
625 } else {
626 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
627 pa_tagstruct_getu32(t, &tlength) < 0 ||
628 pa_tagstruct_getu32(t, &prebuf) < 0 ||
629 pa_tagstruct_getu32(t, &minreq) < 0 ||
630 pa_tagstruct_get_usec(t, &usec) < 0) {
631 pa_context_fail(c, PA_ERR_PROTOCOL);
632 goto finish;
633 }
634 }
635
636 if (!pa_tagstruct_eof(t)) {
637 pa_context_fail(c, PA_ERR_PROTOCOL);
638 goto finish;
639 }
640
641 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
642 goto finish;
643
644 if (s->state != PA_STREAM_READY)
645 goto finish;
646
647 if (s->direction == PA_STREAM_RECORD)
648 s->timing_info.configured_source_usec = usec;
649 else
650 s->timing_info.configured_sink_usec = usec;
651
652 s->buffer_attr.maxlength = maxlength;
653 s->buffer_attr.fragsize = fragsize;
654 s->buffer_attr.tlength = tlength;
655 s->buffer_attr.prebuf = prebuf;
656 s->buffer_attr.minreq = minreq;
657
658 request_auto_timing_update(s, TRUE);
659
660 if (s->buffer_attr_callback)
661 s->buffer_attr_callback(s, s->buffer_attr_userdata);
662
663 finish:
664 pa_context_unref(c);
665 }
666
667 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
668 pa_context *c = userdata;
669 pa_stream *s;
670 uint32_t channel;
671 pa_bool_t suspended;
672
673 pa_assert(pd);
674 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
675 pa_assert(t);
676 pa_assert(c);
677 pa_assert(PA_REFCNT_VALUE(c) >= 1);
678
679 pa_context_ref(c);
680
681 if (c->version < 12) {
682 pa_context_fail(c, PA_ERR_PROTOCOL);
683 goto finish;
684 }
685
686 if (pa_tagstruct_getu32(t, &channel) < 0 ||
687 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
688 !pa_tagstruct_eof(t)) {
689 pa_context_fail(c, PA_ERR_PROTOCOL);
690 goto finish;
691 }
692
693 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
694 goto finish;
695
696 if (s->state != PA_STREAM_READY)
697 goto finish;
698
699 s->suspended = suspended;
700
701 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
702 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
703 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
704 request_auto_timing_update(s, TRUE);
705 }
706
707 check_smoother_status(s, TRUE, FALSE, FALSE);
708 request_auto_timing_update(s, TRUE);
709
710 if (s->suspended_callback)
711 s->suspended_callback(s, s->suspended_userdata);
712
713 finish:
714 pa_context_unref(c);
715 }
716
717 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
718 pa_context *c = userdata;
719 pa_stream *s;
720 uint32_t channel;
721
722 pa_assert(pd);
723 pa_assert(command == PA_COMMAND_STARTED);
724 pa_assert(t);
725 pa_assert(c);
726 pa_assert(PA_REFCNT_VALUE(c) >= 1);
727
728 pa_context_ref(c);
729
730 if (c->version < 13) {
731 pa_context_fail(c, PA_ERR_PROTOCOL);
732 goto finish;
733 }
734
735 if (pa_tagstruct_getu32(t, &channel) < 0 ||
736 !pa_tagstruct_eof(t)) {
737 pa_context_fail(c, PA_ERR_PROTOCOL);
738 goto finish;
739 }
740
741 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
742 goto finish;
743
744 if (s->state != PA_STREAM_READY)
745 goto finish;
746
747 check_smoother_status(s, TRUE, TRUE, FALSE);
748 request_auto_timing_update(s, TRUE);
749
750 if (s->started_callback)
751 s->started_callback(s, s->started_userdata);
752
753 finish:
754 pa_context_unref(c);
755 }
756
757 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
758 pa_context *c = userdata;
759 pa_stream *s;
760 uint32_t channel;
761 pa_proplist *pl = NULL;
762 const char *event;
763
764 pa_assert(pd);
765 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
766 pa_assert(t);
767 pa_assert(c);
768 pa_assert(PA_REFCNT_VALUE(c) >= 1);
769
770 pa_context_ref(c);
771
772 if (c->version < 15) {
773 pa_context_fail(c, PA_ERR_PROTOCOL);
774 goto finish;
775 }
776
777 pl = pa_proplist_new();
778
779 if (pa_tagstruct_getu32(t, &channel) < 0 ||
780 pa_tagstruct_gets(t, &event) < 0 ||
781 pa_tagstruct_get_proplist(t, pl) < 0 ||
782 !pa_tagstruct_eof(t) || !event) {
783 pa_context_fail(c, PA_ERR_PROTOCOL);
784 goto finish;
785 }
786
787 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
788 goto finish;
789
790 if (s->state != PA_STREAM_READY)
791 goto finish;
792
793 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
794 /* Let client know what the running time was when the stream had to be killed */
795 pa_usec_t stream_time;
796 if (pa_stream_get_time(s, &stream_time) == 0)
797 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
798 }
799
800 if (s->event_callback)
801 s->event_callback(s, event, pl, s->event_userdata);
802
803 finish:
804 pa_context_unref(c);
805
806 if (pl)
807 pa_proplist_free(pl);
808 }
809
810 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
811 pa_stream *s;
812 pa_context *c = userdata;
813 uint32_t bytes, channel;
814
815 pa_assert(pd);
816 pa_assert(command == PA_COMMAND_REQUEST);
817 pa_assert(t);
818 pa_assert(c);
819 pa_assert(PA_REFCNT_VALUE(c) >= 1);
820
821 pa_context_ref(c);
822
823 if (pa_tagstruct_getu32(t, &channel) < 0 ||
824 pa_tagstruct_getu32(t, &bytes) < 0 ||
825 !pa_tagstruct_eof(t)) {
826 pa_context_fail(c, PA_ERR_PROTOCOL);
827 goto finish;
828 }
829
830 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
831 goto finish;
832
833 if (s->state != PA_STREAM_READY)
834 goto finish;
835
836 s->requested_bytes += bytes;
837
838 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
839
840 if (s->requested_bytes > 0 && s->write_callback)
841 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
842
843 finish:
844 pa_context_unref(c);
845 }
846
847 int64_t pa_stream_get_underflow_index(pa_stream *p)
848 {
849 pa_assert(p);
850 return p->latest_underrun_at_index;
851 }
852
853 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
854 pa_stream *s;
855 pa_context *c = userdata;
856 uint32_t channel;
857 int64_t offset = -1;
858
859 pa_assert(pd);
860 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
861 pa_assert(t);
862 pa_assert(c);
863 pa_assert(PA_REFCNT_VALUE(c) >= 1);
864
865 pa_context_ref(c);
866
867 if (pa_tagstruct_getu32(t, &channel) < 0) {
868 pa_context_fail(c, PA_ERR_PROTOCOL);
869 goto finish;
870 }
871
872 if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
873 if (pa_tagstruct_gets64(t, &offset) < 0) {
874 pa_context_fail(c, PA_ERR_PROTOCOL);
875 goto finish;
876 }
877 }
878
879 if (!pa_tagstruct_eof(t)) {
880 pa_context_fail(c, PA_ERR_PROTOCOL);
881 goto finish;
882 }
883
884 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
885 goto finish;
886
887 if (s->state != PA_STREAM_READY)
888 goto finish;
889
890 if (offset != -1)
891 s->latest_underrun_at_index = offset;
892
893 if (s->buffer_attr.prebuf > 0)
894 check_smoother_status(s, TRUE, FALSE, TRUE);
895
896 request_auto_timing_update(s, TRUE);
897
898 if (command == PA_COMMAND_OVERFLOW) {
899 if (s->overflow_callback)
900 s->overflow_callback(s, s->overflow_userdata);
901 } else if (command == PA_COMMAND_UNDERFLOW) {
902 if (s->underflow_callback)
903 s->underflow_callback(s, s->underflow_userdata);
904 }
905
906 finish:
907 pa_context_unref(c);
908 }
909
910 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
911 pa_assert(s);
912 pa_assert(PA_REFCNT_VALUE(s) >= 1);
913
914 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
915
916 if (s->state != PA_STREAM_READY)
917 return;
918
919 if (w) {
920 s->write_index_not_before = s->context->ctag;
921
922 if (s->timing_info_valid)
923 s->timing_info.write_index_corrupt = TRUE;
924
925 /* pa_log("write_index invalidated"); */
926 }
927
928 if (r) {
929 s->read_index_not_before = s->context->ctag;
930
931 if (s->timing_info_valid)
932 s->timing_info.read_index_corrupt = TRUE;
933
934 /* pa_log("read_index invalidated"); */
935 }
936
937 request_auto_timing_update(s, TRUE);
938 }
939
940 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
941 pa_stream *s = userdata;
942
943 pa_assert(s);
944 pa_assert(PA_REFCNT_VALUE(s) >= 1);
945
946 pa_stream_ref(s);
947 request_auto_timing_update(s, FALSE);
948 pa_stream_unref(s);
949 }
950
951 static void create_stream_complete(pa_stream *s) {
952 pa_assert(s);
953 pa_assert(PA_REFCNT_VALUE(s) >= 1);
954 pa_assert(s->state == PA_STREAM_CREATING);
955
956 pa_stream_set_state(s, PA_STREAM_READY);
957
958 if (s->requested_bytes > 0 && s->write_callback)
959 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
960
961 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
962 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
963 pa_assert(!s->auto_timing_update_event);
964 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
965
966 request_auto_timing_update(s, TRUE);
967 }
968
969 check_smoother_status(s, TRUE, FALSE, FALSE);
970 }
971
972 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
973 const char *e;
974
975 pa_assert(s);
976 pa_assert(attr);
977
978 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
979 uint32_t ms;
980
981 if (pa_atou(e, &ms) < 0 || ms <= 0)
982 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
983 else {
984 attr->maxlength = (uint32_t) -1;
985 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
986 attr->minreq = (uint32_t) -1;
987 attr->prebuf = (uint32_t) -1;
988 attr->fragsize = attr->tlength;
989 }
990
991 if (flags)
992 *flags |= PA_STREAM_ADJUST_LATENCY;
993 }
994
995 if (s->context->version >= 13)
996 return;
997
998 /* Version older than 0.9.10 didn't do server side buffer_attr
999 * selection, hence we have to fake it on the client side. */
1000
1001 /* We choose fairly conservative values here, to not confuse
1002 * old clients with extremely large playback buffers */
1003
1004 if (attr->maxlength == (uint32_t) -1)
1005 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1006
1007 if (attr->tlength == (uint32_t) -1)
1008 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1009
1010 if (attr->minreq == (uint32_t) -1)
1011 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1012
1013 if (attr->prebuf == (uint32_t) -1)
1014 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1015
1016 if (attr->fragsize == (uint32_t) -1)
1017 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1018 }
1019
1020 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1021 pa_stream *s = userdata;
1022 uint32_t requested_bytes = 0;
1023
1024 pa_assert(pd);
1025 pa_assert(s);
1026 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1027 pa_assert(s->state == PA_STREAM_CREATING);
1028
1029 pa_stream_ref(s);
1030
1031 if (command != PA_COMMAND_REPLY) {
1032 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1033 goto finish;
1034
1035 pa_stream_set_state(s, PA_STREAM_FAILED);
1036 goto finish;
1037 }
1038
1039 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1040 s->channel == PA_INVALID_INDEX ||
1041 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1042 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1043 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1044 goto finish;
1045 }
1046
1047 s->requested_bytes = (int64_t) requested_bytes;
1048
1049 if (s->context->version >= 9) {
1050 if (s->direction == PA_STREAM_PLAYBACK) {
1051 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1052 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1053 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1054 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1055 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1056 goto finish;
1057 }
1058 } else if (s->direction == PA_STREAM_RECORD) {
1059 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1060 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1061 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1062 goto finish;
1063 }
1064 }
1065 }
1066
1067 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1068 pa_sample_spec ss;
1069 pa_channel_map cm;
1070 const char *dn = NULL;
1071 pa_bool_t suspended;
1072
1073 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1074 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1075 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1076 pa_tagstruct_gets(t, &dn) < 0 ||
1077 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1078 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1079 goto finish;
1080 }
1081
1082 if (!dn || s->device_index == PA_INVALID_INDEX ||
1083 ss.channels != cm.channels ||
1084 !pa_channel_map_valid(&cm) ||
1085 !pa_sample_spec_valid(&ss) ||
1086 (s->n_formats == 0 && (
1087 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1088 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1089 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1090 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1091 goto finish;
1092 }
1093
1094 pa_xfree(s->device_name);
1095 s->device_name = pa_xstrdup(dn);
1096 s->suspended = suspended;
1097
1098 s->channel_map = cm;
1099 s->sample_spec = ss;
1100 }
1101
1102 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1103 pa_usec_t usec;
1104
1105 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1106 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1107 goto finish;
1108 }
1109
1110 if (s->direction == PA_STREAM_RECORD)
1111 s->timing_info.configured_source_usec = usec;
1112 else
1113 s->timing_info.configured_sink_usec = usec;
1114 }
1115
1116 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1117 || s->context->version >= 22) {
1118
1119 pa_format_info *f = pa_format_info_new();
1120 pa_tagstruct_get_format_info(t, f);
1121
1122 if (pa_format_info_valid(f))
1123 s->format = f;
1124 else {
1125 pa_format_info_free(f);
1126 if (s->n_formats > 0) {
1127 /* We used the extended API, so we should have got back a proper format */
1128 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1129 goto finish;
1130 }
1131 }
1132 }
1133
1134 if (!pa_tagstruct_eof(t)) {
1135 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1136 goto finish;
1137 }
1138
1139 if (s->direction == PA_STREAM_RECORD) {
1140 pa_assert(!s->record_memblockq);
1141
1142 s->record_memblockq = pa_memblockq_new(
1143 0,
1144 s->buffer_attr.maxlength,
1145 0,
1146 pa_frame_size(&s->sample_spec),
1147 1,
1148 0,
1149 0,
1150 NULL);
1151 }
1152
1153 s->channel_valid = TRUE;
1154 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1155
1156 create_stream_complete(s);
1157
1158 finish:
1159 pa_stream_unref(s);
1160 }
1161
1162 static int create_stream(
1163 pa_stream_direction_t direction,
1164 pa_stream *s,
1165 const char *dev,
1166 const pa_buffer_attr *attr,
1167 pa_stream_flags_t flags,
1168 const pa_cvolume *volume,
1169 pa_stream *sync_stream) {
1170
1171 pa_tagstruct *t;
1172 uint32_t tag;
1173 pa_bool_t volume_set = !!volume;
1174 pa_cvolume cv;
1175 uint32_t i;
1176
1177 pa_assert(s);
1178 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1179 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1180
1181 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1182 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1183 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1184 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1185 PA_STREAM_INTERPOLATE_TIMING|
1186 PA_STREAM_NOT_MONOTONIC|
1187 PA_STREAM_AUTO_TIMING_UPDATE|
1188 PA_STREAM_NO_REMAP_CHANNELS|
1189 PA_STREAM_NO_REMIX_CHANNELS|
1190 PA_STREAM_FIX_FORMAT|
1191 PA_STREAM_FIX_RATE|
1192 PA_STREAM_FIX_CHANNELS|
1193 PA_STREAM_DONT_MOVE|
1194 PA_STREAM_VARIABLE_RATE|
1195 PA_STREAM_PEAK_DETECT|
1196 PA_STREAM_START_MUTED|
1197 PA_STREAM_ADJUST_LATENCY|
1198 PA_STREAM_EARLY_REQUESTS|
1199 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1200 PA_STREAM_START_UNMUTED|
1201 PA_STREAM_FAIL_ON_SUSPEND|
1202 PA_STREAM_RELATIVE_VOLUME|
1203 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1204
1205
1206 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1207 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1208 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1209 /* Althought some of the other flags are not supported on older
1210 * version, we don't check for them here, because it doesn't hurt
1211 * when they are passed but actually not supported. This makes
1212 * client development easier */
1213
1214 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1215 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1216 PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1217 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1218 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1219
1220 pa_stream_ref(s);
1221
1222 s->direction = direction;
1223
1224 if (sync_stream)
1225 s->syncid = sync_stream->syncid;
1226
1227 if (attr)
1228 s->buffer_attr = *attr;
1229 patch_buffer_attr(s, &s->buffer_attr, &flags);
1230
1231 s->flags = flags;
1232 s->corked = !!(flags & PA_STREAM_START_CORKED);
1233
1234 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1235 pa_usec_t x;
1236
1237 x = pa_rtclock_now();
1238
1239 pa_assert(!s->smoother);
1240 s->smoother = pa_smoother_new(
1241 SMOOTHER_ADJUST_TIME,
1242 SMOOTHER_HISTORY_TIME,
1243 !(flags & PA_STREAM_NOT_MONOTONIC),
1244 TRUE,
1245 SMOOTHER_MIN_HISTORY,
1246 x,
1247 TRUE);
1248 }
1249
1250 if (!dev)
1251 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1252
1253 t = pa_tagstruct_command(
1254 s->context,
1255 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1256 &tag);
1257
1258 if (s->context->version < 13)
1259 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1260
1261 pa_tagstruct_put(
1262 t,
1263 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1264 PA_TAG_CHANNEL_MAP, &s->channel_map,
1265 PA_TAG_U32, PA_INVALID_INDEX,
1266 PA_TAG_STRING, dev,
1267 PA_TAG_U32, s->buffer_attr.maxlength,
1268 PA_TAG_BOOLEAN, s->corked,
1269 PA_TAG_INVALID);
1270
1271 if (!volume) {
1272 if (pa_sample_spec_valid(&s->sample_spec))
1273 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1274 else {
1275 /* This is not really relevant, since no volume was set, and
1276 * the real number of channels is embedded in the format_info
1277 * structure */
1278 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1279 }
1280 }
1281
1282 if (s->direction == PA_STREAM_PLAYBACK) {
1283 pa_tagstruct_put(
1284 t,
1285 PA_TAG_U32, s->buffer_attr.tlength,
1286 PA_TAG_U32, s->buffer_attr.prebuf,
1287 PA_TAG_U32, s->buffer_attr.minreq,
1288 PA_TAG_U32, s->syncid,
1289 PA_TAG_INVALID);
1290
1291 pa_tagstruct_put_cvolume(t, volume);
1292 } else
1293 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1294
1295 if (s->context->version >= 12) {
1296 pa_tagstruct_put(
1297 t,
1298 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1299 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1300 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1301 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1302 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1303 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1304 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1305 PA_TAG_INVALID);
1306 }
1307
1308 if (s->context->version >= 13) {
1309
1310 if (s->direction == PA_STREAM_PLAYBACK)
1311 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1312 else
1313 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1314
1315 pa_tagstruct_put(
1316 t,
1317 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1318 PA_TAG_PROPLIST, s->proplist,
1319 PA_TAG_INVALID);
1320
1321 if (s->direction == PA_STREAM_RECORD)
1322 pa_tagstruct_putu32(t, s->direct_on_input);
1323 }
1324
1325 if (s->context->version >= 14) {
1326
1327 if (s->direction == PA_STREAM_PLAYBACK)
1328 pa_tagstruct_put_boolean(t, volume_set);
1329
1330 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1331 }
1332
1333 if (s->context->version >= 15) {
1334
1335 if (s->direction == PA_STREAM_PLAYBACK)
1336 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1337
1338 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1339 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1340 }
1341
1342 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1343 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1344
1345 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1346 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1347
1348 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1349 || s->context->version >= 22) {
1350
1351 pa_tagstruct_putu8(t, s->n_formats);
1352 for (i = 0; i < s->n_formats; i++)
1353 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1354 }
1355
1356 if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1357 pa_tagstruct_put_cvolume(t, volume);
1358 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1359 pa_tagstruct_put_boolean(t, volume_set);
1360 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1361 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1362 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1363 }
1364
1365 pa_pstream_send_tagstruct(s->context->pstream, t);
1366 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1367
1368 pa_stream_set_state(s, PA_STREAM_CREATING);
1369
1370 pa_stream_unref(s);
1371 return 0;
1372 }
1373
1374 int pa_stream_connect_playback(
1375 pa_stream *s,
1376 const char *dev,
1377 const pa_buffer_attr *attr,
1378 pa_stream_flags_t flags,
1379 const pa_cvolume *volume,
1380 pa_stream *sync_stream) {
1381
1382 pa_assert(s);
1383 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1384
1385 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1386 }
1387
1388 int pa_stream_connect_record(
1389 pa_stream *s,
1390 const char *dev,
1391 const pa_buffer_attr *attr,
1392 pa_stream_flags_t flags) {
1393
1394 pa_assert(s);
1395 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1396
1397 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1398 }
1399
1400 int pa_stream_begin_write(
1401 pa_stream *s,
1402 void **data,
1403 size_t *nbytes) {
1404
1405 pa_assert(s);
1406 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1407
1408 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1409 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1410 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1411 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1412 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1413
1414 if (*nbytes != (size_t) -1) {
1415 size_t m, fs;
1416
1417 m = pa_mempool_block_size_max(s->context->mempool);
1418 fs = pa_frame_size(&s->sample_spec);
1419
1420 m = (m / fs) * fs;
1421 if (*nbytes > m)
1422 *nbytes = m;
1423 }
1424
1425 if (!s->write_memblock) {
1426 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1427 s->write_data = pa_memblock_acquire(s->write_memblock);
1428 }
1429
1430 *data = s->write_data;
1431 *nbytes = pa_memblock_get_length(s->write_memblock);
1432
1433 return 0;
1434 }
1435
1436 int pa_stream_cancel_write(
1437 pa_stream *s) {
1438
1439 pa_assert(s);
1440 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1441
1442 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1443 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1444 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1445 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1446
1447 pa_assert(s->write_data);
1448
1449 pa_memblock_release(s->write_memblock);
1450 pa_memblock_unref(s->write_memblock);
1451 s->write_memblock = NULL;
1452 s->write_data = NULL;
1453
1454 return 0;
1455 }
1456
1457 int pa_stream_write(
1458 pa_stream *s,
1459 const void *data,
1460 size_t length,
1461 pa_free_cb_t free_cb,
1462 int64_t offset,
1463 pa_seek_mode_t seek) {
1464
1465 pa_assert(s);
1466 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1467 pa_assert(data);
1468
1469 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1470 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1471 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1472 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1473 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1474 PA_CHECK_VALIDITY(s->context,
1475 !s->write_memblock ||
1476 ((data >= s->write_data) &&
1477 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1478 PA_ERR_INVALID);
1479 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1480
1481 if (s->write_memblock) {
1482 pa_memchunk chunk;
1483
1484 /* pa_stream_write_begin() was called before */
1485
1486 pa_memblock_release(s->write_memblock);
1487
1488 chunk.memblock = s->write_memblock;
1489 chunk.index = (const char *) data - (const char *) s->write_data;
1490 chunk.length = length;
1491
1492 s->write_memblock = NULL;
1493 s->write_data = NULL;
1494
1495 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1496 pa_memblock_unref(chunk.memblock);
1497
1498 } else {
1499 pa_seek_mode_t t_seek = seek;
1500 int64_t t_offset = offset;
1501 size_t t_length = length;
1502 const void *t_data = data;
1503
1504 /* pa_stream_write_begin() was not called before */
1505
1506 while (t_length > 0) {
1507 pa_memchunk chunk;
1508
1509 chunk.index = 0;
1510
1511 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1512 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1513 chunk.length = t_length;
1514 } else {
1515 void *d;
1516
1517 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1518 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1519
1520 d = pa_memblock_acquire(chunk.memblock);
1521 memcpy(d, t_data, chunk.length);
1522 pa_memblock_release(chunk.memblock);
1523 }
1524
1525 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1526
1527 t_offset = 0;
1528 t_seek = PA_SEEK_RELATIVE;
1529
1530 t_data = (const uint8_t*) t_data + chunk.length;
1531 t_length -= chunk.length;
1532
1533 pa_memblock_unref(chunk.memblock);
1534 }
1535
1536 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1537 free_cb((void*) data);
1538 }
1539
1540 /* This is obviously wrong since we ignore the seeking index . But
1541 * that's OK, the server side applies the same error */
1542 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1543
1544 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1545
1546 if (s->direction == PA_STREAM_PLAYBACK) {
1547
1548 /* Update latency request correction */
1549 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1550
1551 if (seek == PA_SEEK_ABSOLUTE) {
1552 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1553 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1554 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1555 } else if (seek == PA_SEEK_RELATIVE) {
1556 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1557 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1558 } else
1559 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1560 }
1561
1562 /* Update the write index in the already available latency data */
1563 if (s->timing_info_valid) {
1564
1565 if (seek == PA_SEEK_ABSOLUTE) {
1566 s->timing_info.write_index_corrupt = FALSE;
1567 s->timing_info.write_index = offset + (int64_t) length;
1568 } else if (seek == PA_SEEK_RELATIVE) {
1569 if (!s->timing_info.write_index_corrupt)
1570 s->timing_info.write_index += offset + (int64_t) length;
1571 } else
1572 s->timing_info.write_index_corrupt = TRUE;
1573 }
1574
1575 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1576 request_auto_timing_update(s, TRUE);
1577 }
1578
1579 return 0;
1580 }
1581
1582 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1583 pa_assert(s);
1584 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1585 pa_assert(data);
1586 pa_assert(length);
1587
1588 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1589 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1590 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1591
1592 if (!s->peek_memchunk.memblock) {
1593
1594 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1595 *data = NULL;
1596 *length = 0;
1597 return 0;
1598 }
1599
1600 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1601 }
1602
1603 pa_assert(s->peek_data);
1604 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1605 *length = s->peek_memchunk.length;
1606 return 0;
1607 }
1608
1609 int pa_stream_drop(pa_stream *s) {
1610 pa_assert(s);
1611 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1612
1613 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1614 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1615 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1616 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1617
1618 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1619
1620 /* Fix the simulated local read index */
1621 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1622 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1623
1624 pa_assert(s->peek_data);
1625 pa_memblock_release(s->peek_memchunk.memblock);
1626 pa_memblock_unref(s->peek_memchunk.memblock);
1627 pa_memchunk_reset(&s->peek_memchunk);
1628
1629 return 0;
1630 }
1631
1632 size_t pa_stream_writable_size(pa_stream *s) {
1633 pa_assert(s);
1634 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1635
1636 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1637 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1638 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1639
1640 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1641 }
1642
1643 size_t pa_stream_readable_size(pa_stream *s) {
1644 pa_assert(s);
1645 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1646
1647 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1648 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1649 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1650
1651 return pa_memblockq_get_length(s->record_memblockq);
1652 }
1653
1654 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1655 pa_operation *o;
1656 pa_tagstruct *t;
1657 uint32_t tag;
1658
1659 pa_assert(s);
1660 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1661
1662 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1663 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1664 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1665
1666 /* Ask for a timing update before we cork/uncork to get the best
1667 * accuracy for the transport latency suitable for the
1668 * check_smoother_status() call in the started callback */
1669 request_auto_timing_update(s, TRUE);
1670
1671 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1672
1673 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1674 pa_tagstruct_putu32(t, s->channel);
1675 pa_pstream_send_tagstruct(s->context->pstream, t);
1676 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1677
1678 /* This might cause the read index to continue again, hence
1679 * let's request a timing update */
1680 request_auto_timing_update(s, TRUE);
1681
1682 return o;
1683 }
1684
1685 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1686 pa_usec_t usec;
1687
1688 pa_assert(s);
1689 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1690 pa_assert(s->state == PA_STREAM_READY);
1691 pa_assert(s->direction != PA_STREAM_UPLOAD);
1692 pa_assert(s->timing_info_valid);
1693 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1694 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1695
1696 if (s->direction == PA_STREAM_PLAYBACK) {
1697 /* The last byte that was written into the output device
1698 * had this time value associated */
1699 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1700
1701 if (!s->corked && !s->suspended) {
1702
1703 if (!ignore_transport)
1704 /* Because the latency info took a little time to come
1705 * to us, we assume that the real output time is actually
1706 * a little ahead */
1707 usec += s->timing_info.transport_usec;
1708
1709 /* However, the output device usually maintains a buffer
1710 too, hence the real sample currently played is a little
1711 back */
1712 if (s->timing_info.sink_usec >= usec)
1713 usec = 0;
1714 else
1715 usec -= s->timing_info.sink_usec;
1716 }
1717
1718 } else {
1719 pa_assert(s->direction == PA_STREAM_RECORD);
1720
1721 /* The last byte written into the server side queue had
1722 * this time value associated */
1723 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1724
1725 if (!s->corked && !s->suspended) {
1726
1727 if (!ignore_transport)
1728 /* Add transport latency */
1729 usec += s->timing_info.transport_usec;
1730
1731 /* Add latency of data in device buffer */
1732 usec += s->timing_info.source_usec;
1733
1734 /* If this is a monitor source, we need to correct the
1735 * time by the playback device buffer */
1736 if (s->timing_info.sink_usec >= usec)
1737 usec = 0;
1738 else
1739 usec -= s->timing_info.sink_usec;
1740 }
1741 }
1742
1743 return usec;
1744 }
1745
1746 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1747 pa_operation *o = userdata;
1748 struct timeval local, remote, now;
1749 pa_timing_info *i;
1750 pa_bool_t playing = FALSE;
1751 uint64_t underrun_for = 0, playing_for = 0;
1752
1753 pa_assert(pd);
1754 pa_assert(o);
1755 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1756
1757 if (!o->context || !o->stream)
1758 goto finish;
1759
1760 i = &o->stream->timing_info;
1761
1762 o->stream->timing_info_valid = FALSE;
1763 i->write_index_corrupt = TRUE;
1764 i->read_index_corrupt = TRUE;
1765
1766 if (command != PA_COMMAND_REPLY) {
1767 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1768 goto finish;
1769
1770 } else {
1771
1772 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1773 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1774 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1775 pa_tagstruct_get_timeval(t, &local) < 0 ||
1776 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1777 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1778 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1779
1780 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1781 goto finish;
1782 }
1783
1784 if (o->context->version >= 13 &&
1785 o->stream->direction == PA_STREAM_PLAYBACK)
1786 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1787 pa_tagstruct_getu64(t, &playing_for) < 0) {
1788
1789 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1790 goto finish;
1791 }
1792
1793
1794 if (!pa_tagstruct_eof(t)) {
1795 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1796 goto finish;
1797 }
1798 o->stream->timing_info_valid = TRUE;
1799 i->write_index_corrupt = FALSE;
1800 i->read_index_corrupt = FALSE;
1801
1802 i->playing = (int) playing;
1803 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1804
1805 pa_gettimeofday(&now);
1806
1807 /* Calculcate timestamps */
1808 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1809 /* local and remote seem to have synchronized clocks */
1810
1811 if (o->stream->direction == PA_STREAM_PLAYBACK)
1812 i->transport_usec = pa_timeval_diff(&remote, &local);
1813 else
1814 i->transport_usec = pa_timeval_diff(&now, &remote);
1815
1816 i->synchronized_clocks = TRUE;
1817 i->timestamp = remote;
1818 } else {
1819 /* clocks are not synchronized, let's estimate latency then */
1820 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1821 i->synchronized_clocks = FALSE;
1822 i->timestamp = local;
1823 pa_timeval_add(&i->timestamp, i->transport_usec);
1824 }
1825
1826 /* Invalidate read and write indexes if necessary */
1827 if (tag < o->stream->read_index_not_before)
1828 i->read_index_corrupt = TRUE;
1829
1830 if (tag < o->stream->write_index_not_before)
1831 i->write_index_corrupt = TRUE;
1832
1833 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1834 /* Write index correction */
1835
1836 int n, j;
1837 uint32_t ctag = tag;
1838
1839 /* Go through the saved correction values and add up the
1840 * total correction.*/
1841 for (n = 0, j = o->stream->current_write_index_correction+1;
1842 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1843 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1844
1845 /* Step over invalid data or out-of-date data */
1846 if (!o->stream->write_index_corrections[j].valid ||
1847 o->stream->write_index_corrections[j].tag < ctag)
1848 continue;
1849
1850 /* Make sure that everything is in order */
1851 ctag = o->stream->write_index_corrections[j].tag+1;
1852
1853 /* Now fix the write index */
1854 if (o->stream->write_index_corrections[j].corrupt) {
1855 /* A corrupting seek was made */
1856 i->write_index_corrupt = TRUE;
1857 } else if (o->stream->write_index_corrections[j].absolute) {
1858 /* An absolute seek was made */
1859 i->write_index = o->stream->write_index_corrections[j].value;
1860 i->write_index_corrupt = FALSE;
1861 } else if (!i->write_index_corrupt) {
1862 /* A relative seek was made */
1863 i->write_index += o->stream->write_index_corrections[j].value;
1864 }
1865 }
1866
1867 /* Clear old correction entries */
1868 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1869 if (!o->stream->write_index_corrections[n].valid)
1870 continue;
1871
1872 if (o->stream->write_index_corrections[n].tag <= tag)
1873 o->stream->write_index_corrections[n].valid = FALSE;
1874 }
1875 }
1876
1877 if (o->stream->direction == PA_STREAM_RECORD) {
1878 /* Read index correction */
1879
1880 if (!i->read_index_corrupt)
1881 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1882 }
1883
1884 /* Update smoother if we're not corked */
1885 if (o->stream->smoother && !o->stream->corked) {
1886 pa_usec_t u, x;
1887
1888 u = x = pa_rtclock_now() - i->transport_usec;
1889
1890 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1891 pa_usec_t su;
1892
1893 /* If we weren't playing then it will take some time
1894 * until the audio will actually come out through the
1895 * speakers. Since we follow that timing here, we need
1896 * to try to fix this up */
1897
1898 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1899
1900 if (su < i->sink_usec)
1901 x += i->sink_usec - su;
1902 }
1903
1904 if (!i->playing)
1905 pa_smoother_pause(o->stream->smoother, x);
1906
1907 /* Update the smoother */
1908 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1909 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1910 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1911
1912 if (i->playing)
1913 pa_smoother_resume(o->stream->smoother, x, TRUE);
1914 }
1915 }
1916
1917 o->stream->auto_timing_update_requested = FALSE;
1918
1919 if (o->stream->latency_update_callback)
1920 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1921
1922 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1923 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1924 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1925 }
1926
1927 finish:
1928
1929 pa_operation_done(o);
1930 pa_operation_unref(o);
1931 }
1932
1933 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1934 uint32_t tag;
1935 pa_operation *o;
1936 pa_tagstruct *t;
1937 struct timeval now;
1938 int cidx = 0;
1939
1940 pa_assert(s);
1941 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1942
1943 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1944 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1945 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1946
1947 if (s->direction == PA_STREAM_PLAYBACK) {
1948 /* Find a place to store the write_index correction data for this entry */
1949 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1950
1951 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1952 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1953 }
1954 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1955
1956 t = pa_tagstruct_command(
1957 s->context,
1958 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1959 &tag);
1960 pa_tagstruct_putu32(t, s->channel);
1961 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1962
1963 pa_pstream_send_tagstruct(s->context->pstream, t);
1964 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1965
1966 if (s->direction == PA_STREAM_PLAYBACK) {
1967 /* Fill in initial correction data */
1968
1969 s->current_write_index_correction = cidx;
1970
1971 s->write_index_corrections[cidx].valid = TRUE;
1972 s->write_index_corrections[cidx].absolute = FALSE;
1973 s->write_index_corrections[cidx].corrupt = FALSE;
1974 s->write_index_corrections[cidx].tag = tag;
1975 s->write_index_corrections[cidx].value = 0;
1976 }
1977
1978 return o;
1979 }
1980
1981 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1982 pa_stream *s = userdata;
1983
1984 pa_assert(pd);
1985 pa_assert(s);
1986 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1987
1988 pa_stream_ref(s);
1989
1990 if (command != PA_COMMAND_REPLY) {
1991 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1992 goto finish;
1993
1994 pa_stream_set_state(s, PA_STREAM_FAILED);
1995 goto finish;
1996 } else if (!pa_tagstruct_eof(t)) {
1997 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1998 goto finish;
1999 }
2000
2001 pa_stream_set_state(s, PA_STREAM_TERMINATED);
2002
2003 finish:
2004 pa_stream_unref(s);
2005 }
2006
2007 int pa_stream_disconnect(pa_stream *s) {
2008 pa_tagstruct *t;
2009 uint32_t tag;
2010
2011 pa_assert(s);
2012 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2013
2014 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2015 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2016 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2017
2018 pa_stream_ref(s);
2019
2020 t = pa_tagstruct_command(
2021 s->context,
2022 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2023 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2024 &tag);
2025 pa_tagstruct_putu32(t, s->channel);
2026 pa_pstream_send_tagstruct(s->context->pstream, t);
2027 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2028
2029 pa_stream_unref(s);
2030 return 0;
2031 }
2032
2033 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2034 pa_assert(s);
2035 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2036
2037 if (pa_detect_fork())
2038 return;
2039
2040 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2041 return;
2042
2043 s->read_callback = cb;
2044 s->read_userdata = userdata;
2045 }
2046
2047 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2048 pa_assert(s);
2049 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2050
2051 if (pa_detect_fork())
2052 return;
2053
2054 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2055 return;
2056
2057 s->write_callback = cb;
2058 s->write_userdata = userdata;
2059 }
2060
2061 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2062 pa_assert(s);
2063 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2064
2065 if (pa_detect_fork())
2066 return;
2067
2068 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2069 return;
2070
2071 s->state_callback = cb;
2072 s->state_userdata = userdata;
2073 }
2074
2075 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2076 pa_assert(s);
2077 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2078
2079 if (pa_detect_fork())
2080 return;
2081
2082 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2083 return;
2084
2085 s->overflow_callback = cb;
2086 s->overflow_userdata = userdata;
2087 }
2088
2089 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2090 pa_assert(s);
2091 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2092
2093 if (pa_detect_fork())
2094 return;
2095
2096 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2097 return;
2098
2099 s->underflow_callback = cb;
2100 s->underflow_userdata = userdata;
2101 }
2102
2103 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2104 pa_assert(s);
2105 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2106
2107 if (pa_detect_fork())
2108 return;
2109
2110 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2111 return;
2112
2113 s->latency_update_callback = cb;
2114 s->latency_update_userdata = userdata;
2115 }
2116
2117 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2118 pa_assert(s);
2119 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2120
2121 if (pa_detect_fork())
2122 return;
2123
2124 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2125 return;
2126
2127 s->moved_callback = cb;
2128 s->moved_userdata = userdata;
2129 }
2130
2131 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2132 pa_assert(s);
2133 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2134
2135 if (pa_detect_fork())
2136 return;
2137
2138 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2139 return;
2140
2141 s->suspended_callback = cb;
2142 s->suspended_userdata = userdata;
2143 }
2144
2145 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2146 pa_assert(s);
2147 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2148
2149 if (pa_detect_fork())
2150 return;
2151
2152 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2153 return;
2154
2155 s->started_callback = cb;
2156 s->started_userdata = userdata;
2157 }
2158
2159 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2160 pa_assert(s);
2161 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2162
2163 if (pa_detect_fork())
2164 return;
2165
2166 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2167 return;
2168
2169 s->event_callback = cb;
2170 s->event_userdata = userdata;
2171 }
2172
2173 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2174 pa_assert(s);
2175 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2176
2177 if (pa_detect_fork())
2178 return;
2179
2180 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2181 return;
2182
2183 s->buffer_attr_callback = cb;
2184 s->buffer_attr_userdata = userdata;
2185 }
2186
2187 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2188 pa_operation *o = userdata;
2189 int success = 1;
2190
2191 pa_assert(pd);
2192 pa_assert(o);
2193 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2194
2195 if (!o->context)
2196 goto finish;
2197
2198 if (command != PA_COMMAND_REPLY) {
2199 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2200 goto finish;
2201
2202 success = 0;
2203 } else if (!pa_tagstruct_eof(t)) {
2204 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2205 goto finish;
2206 }
2207
2208 if (o->callback) {
2209 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2210 cb(o->stream, success, o->userdata);
2211 }
2212
2213 finish:
2214 pa_operation_done(o);
2215 pa_operation_unref(o);
2216 }
2217
2218 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2219 pa_operation *o;
2220 pa_tagstruct *t;
2221 uint32_t tag;
2222
2223 pa_assert(s);
2224 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2225
2226 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2227 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2228 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2229
2230 /* Ask for a timing update before we cork/uncork to get the best
2231 * accuracy for the transport latency suitable for the
2232 * check_smoother_status() call in the started callback */
2233 request_auto_timing_update(s, TRUE);
2234
2235 s->corked = b;
2236
2237 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2238
2239 t = pa_tagstruct_command(
2240 s->context,
2241 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2242 &tag);
2243 pa_tagstruct_putu32(t, s->channel);
2244 pa_tagstruct_put_boolean(t, !!b);
2245 pa_pstream_send_tagstruct(s->context->pstream, t);
2246 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2247
2248 check_smoother_status(s, FALSE, FALSE, FALSE);
2249
2250 /* This might cause the indexes to hang/start again, hence let's
2251 * request a timing update, after the cork/uncork, too */
2252 request_auto_timing_update(s, TRUE);
2253
2254 return o;
2255 }
2256
2257 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2258 pa_tagstruct *t;
2259 pa_operation *o;
2260 uint32_t tag;
2261
2262 pa_assert(s);
2263 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2264
2265 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2266 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2267
2268 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2269
2270 t = pa_tagstruct_command(s->context, command, &tag);
2271 pa_tagstruct_putu32(t, s->channel);
2272 pa_pstream_send_tagstruct(s->context->pstream, t);
2273 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2274
2275 return o;
2276 }
2277
2278 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2279 pa_operation *o;
2280
2281 pa_assert(s);
2282 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2283
2284 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2285 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2286 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2287
2288 /* Ask for a timing update *before* the flush, so that the
2289 * transport usec is as up to date as possible when we get the
2290 * underflow message and update the smoother status*/
2291 request_auto_timing_update(s, TRUE);
2292
2293 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2294 return NULL;
2295
2296 if (s->direction == PA_STREAM_PLAYBACK) {
2297
2298 if (s->write_index_corrections[s->current_write_index_correction].valid)
2299 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2300
2301 if (s->buffer_attr.prebuf > 0)
2302 check_smoother_status(s, FALSE, FALSE, TRUE);
2303
2304 /* This will change the write index, but leave the
2305 * read index untouched. */
2306 invalidate_indexes(s, FALSE, TRUE);
2307
2308 } else
2309 /* For record streams this has no influence on the write
2310 * index, but the read index might jump. */
2311 invalidate_indexes(s, TRUE, FALSE);
2312
2313 /* Note that we do not update requested_bytes here. This is
2314 * because we cannot really know how data actually was dropped
2315 * from the write index due to this. This 'error' will be applied
2316 * by both client and server and hence we should be fine. */
2317
2318 return o;
2319 }
2320
2321 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2322 pa_operation *o;
2323
2324 pa_assert(s);
2325 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2326
2327 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2328 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2329 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2330 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2331
2332 /* Ask for a timing update before we cork/uncork to get the best
2333 * accuracy for the transport latency suitable for the
2334 * check_smoother_status() call in the started callback */
2335 request_auto_timing_update(s, TRUE);
2336
2337 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2338 return NULL;
2339
2340 /* This might cause the read index to hang again, hence
2341 * let's request a timing update */
2342 request_auto_timing_update(s, TRUE);
2343
2344 return o;
2345 }
2346
2347 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2348 pa_operation *o;
2349
2350 pa_assert(s);
2351 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2352
2353 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2354 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2355 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2356 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2357
2358 /* Ask for a timing update before we cork/uncork to get the best
2359 * accuracy for the transport latency suitable for the
2360 * check_smoother_status() call in the started callback */
2361 request_auto_timing_update(s, TRUE);
2362
2363 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2364 return NULL;
2365
2366 /* This might cause the read index to start moving again, hence
2367 * let's request a timing update */
2368 request_auto_timing_update(s, TRUE);
2369
2370 return o;
2371 }
2372
2373 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2374 pa_operation *o;
2375
2376 pa_assert(s);
2377 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2378 pa_assert(name);
2379
2380 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2381 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2382 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2383
2384 if (s->context->version >= 13) {
2385 pa_proplist *p = pa_proplist_new();
2386
2387 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2388 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2389 pa_proplist_free(p);
2390 } else {
2391 pa_tagstruct *t;
2392 uint32_t tag;
2393
2394 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2395 t = pa_tagstruct_command(
2396 s->context,
2397 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2398 &tag);
2399 pa_tagstruct_putu32(t, s->channel);
2400 pa_tagstruct_puts(t, name);
2401 pa_pstream_send_tagstruct(s->context->pstream, t);
2402 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2403 }
2404
2405 return o;
2406 }
2407
2408 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2409 pa_usec_t usec;
2410
2411 pa_assert(s);
2412 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2413
2414 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2415 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2416 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2417 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2418 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2419 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2420
2421 if (s->smoother)
2422 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2423 else
2424 usec = calc_time(s, FALSE);
2425
2426 /* Make sure the time runs monotonically */
2427 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2428 if (usec < s->previous_time)
2429 usec = s->previous_time;
2430 else
2431 s->previous_time = usec;
2432 }
2433
2434 if (r_usec)
2435 *r_usec = usec;
2436
2437 return 0;
2438 }
2439
2440 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2441 pa_assert(s);
2442 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2443
2444 if (negative)
2445 *negative = 0;
2446
2447 if (a >= b)
2448 return a-b;
2449 else {
2450 if (negative && s->direction == PA_STREAM_RECORD) {
2451 *negative = 1;
2452 return b-a;
2453 } else
2454 return 0;
2455 }
2456 }
2457
2458 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2459 pa_usec_t t, c;
2460 int r;
2461 int64_t cindex;
2462
2463 pa_assert(s);
2464 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2465 pa_assert(r_usec);
2466
2467 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2468 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2469 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2470 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2471 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2472 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2473
2474 if ((r = pa_stream_get_time(s, &t)) < 0)
2475 return r;
2476
2477 if (s->direction == PA_STREAM_PLAYBACK)
2478 cindex = s->timing_info.write_index;
2479 else
2480 cindex = s->timing_info.read_index;
2481
2482 if (cindex < 0)
2483 cindex = 0;
2484
2485 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2486
2487 if (s->direction == PA_STREAM_PLAYBACK)
2488 *r_usec = time_counter_diff(s, c, t, negative);
2489 else
2490 *r_usec = time_counter_diff(s, t, c, negative);
2491
2492 return 0;
2493 }
2494
2495 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2496 pa_assert(s);
2497 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2498
2499 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2500 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2501 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2502 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2503
2504 return &s->timing_info;
2505 }
2506
2507 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2508 pa_assert(s);
2509 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2510
2511 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2512
2513 return &s->sample_spec;
2514 }
2515
2516 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2517 pa_assert(s);
2518 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2519
2520 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2521
2522 return &s->channel_map;
2523 }
2524
2525 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2526 pa_assert(s);
2527 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2528
2529 /* We don't have the format till routing is done */
2530 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2531 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2532
2533 return s->format;
2534 }
2535 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2536 pa_assert(s);
2537 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2538
2539 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2540 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2541 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2542
2543 return &s->buffer_attr;
2544 }
2545
2546 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2547 pa_operation *o = userdata;
2548 int success = 1;
2549
2550 pa_assert(pd);
2551 pa_assert(o);
2552 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2553
2554 if (!o->context)
2555 goto finish;
2556
2557 if (command != PA_COMMAND_REPLY) {
2558 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2559 goto finish;
2560
2561 success = 0;
2562 } else {
2563 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2564 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2565 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2566 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2567 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2568 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2569 goto finish;
2570 }
2571 } else if (o->stream->direction == PA_STREAM_RECORD) {
2572 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2573 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2574 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2575 goto finish;
2576 }
2577 }
2578
2579 if (o->stream->context->version >= 13) {
2580 pa_usec_t usec;
2581
2582 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2583 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2584 goto finish;
2585 }
2586
2587 if (o->stream->direction == PA_STREAM_RECORD)
2588 o->stream->timing_info.configured_source_usec = usec;
2589 else
2590 o->stream->timing_info.configured_sink_usec = usec;
2591 }
2592
2593 if (!pa_tagstruct_eof(t)) {
2594 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2595 goto finish;
2596 }
2597 }
2598
2599 if (o->callback) {
2600 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2601 cb(o->stream, success, o->userdata);
2602 }
2603
2604 finish:
2605 pa_operation_done(o);
2606 pa_operation_unref(o);
2607 }
2608
2609
2610 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2611 pa_operation *o;
2612 pa_tagstruct *t;
2613 uint32_t tag;
2614 pa_buffer_attr copy;
2615
2616 pa_assert(s);
2617 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2618 pa_assert(attr);
2619
2620 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2621 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2622 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2623 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2624
2625 /* Ask for a timing update before we cork/uncork to get the best
2626 * accuracy for the transport latency suitable for the
2627 * check_smoother_status() call in the started callback */
2628 request_auto_timing_update(s, TRUE);
2629
2630 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2631
2632 t = pa_tagstruct_command(
2633 s->context,
2634 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2635 &tag);
2636 pa_tagstruct_putu32(t, s->channel);
2637
2638 copy = *attr;
2639 patch_buffer_attr(s, &copy, NULL);
2640 attr = &copy;
2641
2642 pa_tagstruct_putu32(t, attr->maxlength);
2643
2644 if (s->direction == PA_STREAM_PLAYBACK)
2645 pa_tagstruct_put(
2646 t,
2647 PA_TAG_U32, attr->tlength,
2648 PA_TAG_U32, attr->prebuf,
2649 PA_TAG_U32, attr->minreq,
2650 PA_TAG_INVALID);
2651 else
2652 pa_tagstruct_putu32(t, attr->fragsize);
2653
2654 if (s->context->version >= 13)
2655 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2656
2657 if (s->context->version >= 14)
2658 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2659
2660 pa_pstream_send_tagstruct(s->context->pstream, t);
2661 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2662
2663 /* This might cause changes in the read/write indexex, hence let's
2664 * request a timing update */
2665 request_auto_timing_update(s, TRUE);
2666
2667 return o;
2668 }
2669
2670 uint32_t pa_stream_get_device_index(pa_stream *s) {
2671 pa_assert(s);
2672 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2673
2674 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2675 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2676 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2677 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2678 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2679
2680 return s->device_index;
2681 }
2682
2683 const char *pa_stream_get_device_name(pa_stream *s) {
2684 pa_assert(s);
2685 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2686
2687 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2688 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2689 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2690 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2691 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2692
2693 return s->device_name;
2694 }
2695
2696 int pa_stream_is_suspended(pa_stream *s) {
2697 pa_assert(s);
2698 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2699
2700 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2701 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2702 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2703 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2704
2705 return s->suspended;
2706 }
2707
2708 int pa_stream_is_corked(pa_stream *s) {
2709 pa_assert(s);
2710 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2711
2712 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2713 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2714 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2715
2716 return s->corked;
2717 }
2718
2719 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2720 pa_operation *o = userdata;
2721 int success = 1;
2722
2723 pa_assert(pd);
2724 pa_assert(o);
2725 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2726
2727 if (!o->context)
2728 goto finish;
2729
2730 if (command != PA_COMMAND_REPLY) {
2731 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2732 goto finish;
2733
2734 success = 0;
2735 } else {
2736
2737 if (!pa_tagstruct_eof(t)) {
2738 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2739 goto finish;
2740 }
2741 }
2742
2743 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2744 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2745
2746 if (o->callback) {
2747 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2748 cb(o->stream, success, o->userdata);
2749 }
2750
2751 finish:
2752 pa_operation_done(o);
2753 pa_operation_unref(o);
2754 }
2755
2756
2757 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2758 pa_operation *o;
2759 pa_tagstruct *t;
2760 uint32_t tag;
2761
2762 pa_assert(s);
2763 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2764
2765 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2766 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2767 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2768 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2769 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2770 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2771
2772 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2773 o->private = PA_UINT_TO_PTR(rate);
2774
2775 t = pa_tagstruct_command(
2776 s->context,
2777 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2778 &tag);
2779 pa_tagstruct_putu32(t, s->channel);
2780 pa_tagstruct_putu32(t, rate);
2781
2782 pa_pstream_send_tagstruct(s->context->pstream, t);
2783 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2784
2785 return o;
2786 }
2787
2788 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2789 pa_operation *o;
2790 pa_tagstruct *t;
2791 uint32_t tag;
2792
2793 pa_assert(s);
2794 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2795
2796 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2797 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2798 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2799 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2800 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2801
2802 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2803
2804 t = pa_tagstruct_command(
2805 s->context,
2806 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2807 &tag);
2808 pa_tagstruct_putu32(t, s->channel);
2809 pa_tagstruct_putu32(t, (uint32_t) mode);
2810 pa_tagstruct_put_proplist(t, p);
2811
2812 pa_pstream_send_tagstruct(s->context->pstream, t);
2813 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2814
2815 /* Please note that we don't update s->proplist here, because we
2816 * don't export that field */
2817
2818 return o;
2819 }
2820
2821 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2822 pa_operation *o;
2823 pa_tagstruct *t;
2824 uint32_t tag;
2825 const char * const*k;
2826
2827 pa_assert(s);
2828 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2829
2830 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2831 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2832 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2833 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2834 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2835
2836 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2837
2838 t = pa_tagstruct_command(
2839 s->context,
2840 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2841 &tag);
2842 pa_tagstruct_putu32(t, s->channel);
2843
2844 for (k = keys; *k; k++)
2845 pa_tagstruct_puts(t, *k);
2846
2847 pa_tagstruct_puts(t, NULL);
2848
2849 pa_pstream_send_tagstruct(s->context->pstream, t);
2850 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2851
2852 /* Please note that we don't update s->proplist here, because we
2853 * don't export that field */
2854
2855 return o;
2856 }
2857
2858 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2859 pa_assert(s);
2860 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2861
2862 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2863 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2864 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2865 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2866
2867 s->direct_on_input = sink_input_idx;
2868
2869 return 0;
2870 }
2871
2872 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2873 pa_assert(s);
2874 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2875
2876 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2877 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2878 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2879
2880 return s->direct_on_input;
2881 }