]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
capture: Implement per-stream volume control for capture streams.
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84 pa_context *c,
85 const char *name,
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
89 unsigned int n_formats,
90 pa_proplist *p) {
91
92 pa_stream *s;
93 unsigned int i;
94
95 pa_assert(c);
96 pa_assert(PA_REFCNT_VALUE(c) >= 1);
97 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98 pa_assert(n_formats < PA_MAX_FORMATS);
99
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103 s = pa_xnew(pa_stream, 1);
104 PA_REFCNT_INIT(s);
105 s->context = c;
106 s->mainloop = c->mainloop;
107
108 s->direction = PA_STREAM_NODIRECTION;
109 s->state = PA_STREAM_UNCONNECTED;
110 s->flags = 0;
111
112 if (ss)
113 s->sample_spec = *ss;
114 else
115 s->sample_spec.format = PA_SAMPLE_INVALID;
116
117 if (map)
118 s->channel_map = *map;
119 else
120 pa_channel_map_init(&s->channel_map);
121
122 s->n_formats = 0;
123 if (formats) {
124 s->n_formats = n_formats;
125 for (i = 0; i < n_formats; i++)
126 s->req_formats[i] = pa_format_info_copy(formats[i]);
127 }
128
129 /* We'll get the final negotiated format after connecting */
130 s->format = NULL;
131
132 s->direct_on_input = PA_INVALID_INDEX;
133
134 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
135 if (name)
136 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
137
138 s->channel = 0;
139 s->channel_valid = FALSE;
140 s->syncid = c->csyncid++;
141 s->stream_index = PA_INVALID_INDEX;
142
143 s->requested_bytes = 0;
144 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
145
146 /* We initialize der target length here, so that if the user
147 * passes no explicit buffering metrics the default is similar to
148 * what older PA versions provided. */
149
150 s->buffer_attr.maxlength = (uint32_t) -1;
151 if (ss)
152 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
153 else {
154 /* FIXME: We assume a worst-case compressed format corresponding to
155 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156 pa_sample_spec tmp_ss = {
157 .format = PA_SAMPLE_S16NE,
158 .rate = 48000,
159 .channels = 2,
160 };
161 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
162 }
163 s->buffer_attr.minreq = (uint32_t) -1;
164 s->buffer_attr.prebuf = (uint32_t) -1;
165 s->buffer_attr.fragsize = (uint32_t) -1;
166
167 s->device_index = PA_INVALID_INDEX;
168 s->device_name = NULL;
169 s->suspended = FALSE;
170 s->corked = FALSE;
171
172 s->write_memblock = NULL;
173 s->write_data = NULL;
174
175 pa_memchunk_reset(&s->peek_memchunk);
176 s->peek_data = NULL;
177 s->record_memblockq = NULL;
178
179 memset(&s->timing_info, 0, sizeof(s->timing_info));
180 s->timing_info_valid = FALSE;
181
182 s->previous_time = 0;
183
184 s->read_index_not_before = 0;
185 s->write_index_not_before = 0;
186 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
187 s->write_index_corrections[i].valid = 0;
188 s->current_write_index_correction = 0;
189
190 s->auto_timing_update_event = NULL;
191 s->auto_timing_update_requested = FALSE;
192 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
193
194 reset_callbacks(s);
195
196 s->smoother = NULL;
197
198 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
199 PA_LLIST_PREPEND(pa_stream, c->streams, s);
200 pa_stream_ref(s);
201
202 return s;
203 }
204
205 pa_stream *pa_stream_new_with_proplist(
206 pa_context *c,
207 const char *name,
208 const pa_sample_spec *ss,
209 const pa_channel_map *map,
210 pa_proplist *p) {
211
212 pa_channel_map tmap;
213
214 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
215 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
216 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
219
220 if (!map)
221 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
222
223 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
224 }
225
226 pa_stream *pa_stream_new_extended(
227 pa_context *c,
228 const char *name,
229 pa_format_info * const *formats,
230 unsigned int n_formats,
231 pa_proplist *p) {
232
233 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
234
235 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
236 }
237
238 static void stream_unlink(pa_stream *s) {
239 pa_operation *o, *n;
240 pa_assert(s);
241
242 if (!s->context)
243 return;
244
245 /* Detach from context */
246
247 /* Unref all operatio object that point to us */
248 for (o = s->context->operations; o; o = n) {
249 n = o->next;
250
251 if (o->stream == s)
252 pa_operation_cancel(o);
253 }
254
255 /* Drop all outstanding replies for this stream */
256 if (s->context->pdispatch)
257 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
258
259 if (s->channel_valid) {
260 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
261 s->channel = 0;
262 s->channel_valid = FALSE;
263 }
264
265 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
266 pa_stream_unref(s);
267
268 s->context = NULL;
269
270 if (s->auto_timing_update_event) {
271 pa_assert(s->mainloop);
272 s->mainloop->time_free(s->auto_timing_update_event);
273 }
274
275 reset_callbacks(s);
276 }
277
278 static void stream_free(pa_stream *s) {
279 unsigned int i;
280
281 pa_assert(s);
282
283 stream_unlink(s);
284
285 if (s->write_memblock) {
286 pa_memblock_release(s->write_memblock);
287 pa_memblock_unref(s->write_data);
288 }
289
290 if (s->peek_memchunk.memblock) {
291 if (s->peek_data)
292 pa_memblock_release(s->peek_memchunk.memblock);
293 pa_memblock_unref(s->peek_memchunk.memblock);
294 }
295
296 if (s->record_memblockq)
297 pa_memblockq_free(s->record_memblockq);
298
299 if (s->proplist)
300 pa_proplist_free(s->proplist);
301
302 if (s->smoother)
303 pa_smoother_free(s->smoother);
304
305 for (i = 0; i < s->n_formats; i++)
306 pa_format_info_free(s->req_formats[i]);
307
308 if (s->format)
309 pa_format_info_free(s->format);
310
311 pa_xfree(s->device_name);
312 pa_xfree(s);
313 }
314
315 void pa_stream_unref(pa_stream *s) {
316 pa_assert(s);
317 pa_assert(PA_REFCNT_VALUE(s) >= 1);
318
319 if (PA_REFCNT_DEC(s) <= 0)
320 stream_free(s);
321 }
322
323 pa_stream* pa_stream_ref(pa_stream *s) {
324 pa_assert(s);
325 pa_assert(PA_REFCNT_VALUE(s) >= 1);
326
327 PA_REFCNT_INC(s);
328 return s;
329 }
330
331 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
332 pa_assert(s);
333 pa_assert(PA_REFCNT_VALUE(s) >= 1);
334
335 return s->state;
336 }
337
338 pa_context* pa_stream_get_context(pa_stream *s) {
339 pa_assert(s);
340 pa_assert(PA_REFCNT_VALUE(s) >= 1);
341
342 return s->context;
343 }
344
345 uint32_t pa_stream_get_index(pa_stream *s) {
346 pa_assert(s);
347 pa_assert(PA_REFCNT_VALUE(s) >= 1);
348
349 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
350 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
351
352 return s->stream_index;
353 }
354
355 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
356 pa_assert(s);
357 pa_assert(PA_REFCNT_VALUE(s) >= 1);
358
359 if (s->state == st)
360 return;
361
362 pa_stream_ref(s);
363
364 s->state = st;
365
366 if (s->state_callback)
367 s->state_callback(s, s->state_userdata);
368
369 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
370 stream_unlink(s);
371
372 pa_stream_unref(s);
373 }
374
375 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
376 pa_assert(s);
377 pa_assert(PA_REFCNT_VALUE(s) >= 1);
378
379 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
380 return;
381
382 if (s->state == PA_STREAM_READY &&
383 (force || !s->auto_timing_update_requested)) {
384 pa_operation *o;
385
386 /* pa_log("Automatically requesting new timing data"); */
387
388 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
389 pa_operation_unref(o);
390 s->auto_timing_update_requested = TRUE;
391 }
392 }
393
394 if (s->auto_timing_update_event) {
395 if (s->suspended && !force) {
396 pa_assert(s->mainloop);
397 s->mainloop->time_free(s->auto_timing_update_event);
398 s->auto_timing_update_event = NULL;
399 } else {
400 if (force)
401 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
402
403 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
404
405 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
406 }
407 }
408 }
409
410 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
411 pa_context *c = userdata;
412 pa_stream *s;
413 uint32_t channel;
414
415 pa_assert(pd);
416 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
417 pa_assert(t);
418 pa_assert(c);
419 pa_assert(PA_REFCNT_VALUE(c) >= 1);
420
421 pa_context_ref(c);
422
423 if (pa_tagstruct_getu32(t, &channel) < 0 ||
424 !pa_tagstruct_eof(t)) {
425 pa_context_fail(c, PA_ERR_PROTOCOL);
426 goto finish;
427 }
428
429 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
430 goto finish;
431
432 if (s->state != PA_STREAM_READY)
433 goto finish;
434
435 pa_context_set_error(c, PA_ERR_KILLED);
436 pa_stream_set_state(s, PA_STREAM_FAILED);
437
438 finish:
439 pa_context_unref(c);
440 }
441
442 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
443 pa_usec_t x;
444
445 pa_assert(s);
446 pa_assert(!force_start || !force_stop);
447
448 if (!s->smoother)
449 return;
450
451 x = pa_rtclock_now();
452
453 if (s->timing_info_valid) {
454 if (aposteriori)
455 x -= s->timing_info.transport_usec;
456 else
457 x += s->timing_info.transport_usec;
458 }
459
460 if (s->suspended || s->corked || force_stop)
461 pa_smoother_pause(s->smoother, x);
462 else if (force_start || s->buffer_attr.prebuf == 0) {
463
464 if (!s->timing_info_valid &&
465 !aposteriori &&
466 !force_start &&
467 !force_stop &&
468 s->context->version >= 13) {
469
470 /* If the server supports STARTED events we take them as
471 * indications when audio really starts/stops playing, if
472 * we don't have any timing info yet -- instead of trying
473 * to be smart and guessing the server time. Otherwise the
474 * unknown transport delay add too much noise to our time
475 * calculations. */
476
477 return;
478 }
479
480 pa_smoother_resume(s->smoother, x, TRUE);
481 }
482
483 /* Please note that we have no idea if playback actually started
484 * if prebuf is non-zero! */
485 }
486
487 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
488
489 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490 pa_context *c = userdata;
491 pa_stream *s;
492 uint32_t channel;
493 const char *dn;
494 pa_bool_t suspended;
495 uint32_t di;
496 pa_usec_t usec = 0;
497 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
498
499 pa_assert(pd);
500 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
501 pa_assert(t);
502 pa_assert(c);
503 pa_assert(PA_REFCNT_VALUE(c) >= 1);
504
505 pa_context_ref(c);
506
507 if (c->version < 12) {
508 pa_context_fail(c, PA_ERR_PROTOCOL);
509 goto finish;
510 }
511
512 if (pa_tagstruct_getu32(t, &channel) < 0 ||
513 pa_tagstruct_getu32(t, &di) < 0 ||
514 pa_tagstruct_gets(t, &dn) < 0 ||
515 pa_tagstruct_get_boolean(t, &suspended) < 0) {
516 pa_context_fail(c, PA_ERR_PROTOCOL);
517 goto finish;
518 }
519
520 if (c->version >= 13) {
521
522 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
523 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
524 pa_tagstruct_getu32(t, &fragsize) < 0 ||
525 pa_tagstruct_get_usec(t, &usec) < 0) {
526 pa_context_fail(c, PA_ERR_PROTOCOL);
527 goto finish;
528 }
529 } else {
530 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
531 pa_tagstruct_getu32(t, &tlength) < 0 ||
532 pa_tagstruct_getu32(t, &prebuf) < 0 ||
533 pa_tagstruct_getu32(t, &minreq) < 0 ||
534 pa_tagstruct_get_usec(t, &usec) < 0) {
535 pa_context_fail(c, PA_ERR_PROTOCOL);
536 goto finish;
537 }
538 }
539 }
540
541 if (!pa_tagstruct_eof(t)) {
542 pa_context_fail(c, PA_ERR_PROTOCOL);
543 goto finish;
544 }
545
546 if (!dn || di == PA_INVALID_INDEX) {
547 pa_context_fail(c, PA_ERR_PROTOCOL);
548 goto finish;
549 }
550
551 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
552 goto finish;
553
554 if (s->state != PA_STREAM_READY)
555 goto finish;
556
557 if (c->version >= 13) {
558 if (s->direction == PA_STREAM_RECORD)
559 s->timing_info.configured_source_usec = usec;
560 else
561 s->timing_info.configured_sink_usec = usec;
562
563 s->buffer_attr.maxlength = maxlength;
564 s->buffer_attr.fragsize = fragsize;
565 s->buffer_attr.tlength = tlength;
566 s->buffer_attr.prebuf = prebuf;
567 s->buffer_attr.minreq = minreq;
568 }
569
570 pa_xfree(s->device_name);
571 s->device_name = pa_xstrdup(dn);
572 s->device_index = di;
573
574 s->suspended = suspended;
575
576 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
577 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
578 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
579 request_auto_timing_update(s, TRUE);
580 }
581
582 check_smoother_status(s, TRUE, FALSE, FALSE);
583 request_auto_timing_update(s, TRUE);
584
585 if (s->moved_callback)
586 s->moved_callback(s, s->moved_userdata);
587
588 finish:
589 pa_context_unref(c);
590 }
591
592 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
593 pa_context *c = userdata;
594 pa_stream *s;
595 uint32_t channel;
596 pa_usec_t usec = 0;
597 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
598
599 pa_assert(pd);
600 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
601 pa_assert(t);
602 pa_assert(c);
603 pa_assert(PA_REFCNT_VALUE(c) >= 1);
604
605 pa_context_ref(c);
606
607 if (c->version < 15) {
608 pa_context_fail(c, PA_ERR_PROTOCOL);
609 goto finish;
610 }
611
612 if (pa_tagstruct_getu32(t, &channel) < 0) {
613 pa_context_fail(c, PA_ERR_PROTOCOL);
614 goto finish;
615 }
616
617 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
618 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
619 pa_tagstruct_getu32(t, &fragsize) < 0 ||
620 pa_tagstruct_get_usec(t, &usec) < 0) {
621 pa_context_fail(c, PA_ERR_PROTOCOL);
622 goto finish;
623 }
624 } else {
625 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
626 pa_tagstruct_getu32(t, &tlength) < 0 ||
627 pa_tagstruct_getu32(t, &prebuf) < 0 ||
628 pa_tagstruct_getu32(t, &minreq) < 0 ||
629 pa_tagstruct_get_usec(t, &usec) < 0) {
630 pa_context_fail(c, PA_ERR_PROTOCOL);
631 goto finish;
632 }
633 }
634
635 if (!pa_tagstruct_eof(t)) {
636 pa_context_fail(c, PA_ERR_PROTOCOL);
637 goto finish;
638 }
639
640 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
641 goto finish;
642
643 if (s->state != PA_STREAM_READY)
644 goto finish;
645
646 if (s->direction == PA_STREAM_RECORD)
647 s->timing_info.configured_source_usec = usec;
648 else
649 s->timing_info.configured_sink_usec = usec;
650
651 s->buffer_attr.maxlength = maxlength;
652 s->buffer_attr.fragsize = fragsize;
653 s->buffer_attr.tlength = tlength;
654 s->buffer_attr.prebuf = prebuf;
655 s->buffer_attr.minreq = minreq;
656
657 request_auto_timing_update(s, TRUE);
658
659 if (s->buffer_attr_callback)
660 s->buffer_attr_callback(s, s->buffer_attr_userdata);
661
662 finish:
663 pa_context_unref(c);
664 }
665
666 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
667 pa_context *c = userdata;
668 pa_stream *s;
669 uint32_t channel;
670 pa_bool_t suspended;
671
672 pa_assert(pd);
673 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
674 pa_assert(t);
675 pa_assert(c);
676 pa_assert(PA_REFCNT_VALUE(c) >= 1);
677
678 pa_context_ref(c);
679
680 if (c->version < 12) {
681 pa_context_fail(c, PA_ERR_PROTOCOL);
682 goto finish;
683 }
684
685 if (pa_tagstruct_getu32(t, &channel) < 0 ||
686 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
687 !pa_tagstruct_eof(t)) {
688 pa_context_fail(c, PA_ERR_PROTOCOL);
689 goto finish;
690 }
691
692 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
693 goto finish;
694
695 if (s->state != PA_STREAM_READY)
696 goto finish;
697
698 s->suspended = suspended;
699
700 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
701 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
702 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
703 request_auto_timing_update(s, TRUE);
704 }
705
706 check_smoother_status(s, TRUE, FALSE, FALSE);
707 request_auto_timing_update(s, TRUE);
708
709 if (s->suspended_callback)
710 s->suspended_callback(s, s->suspended_userdata);
711
712 finish:
713 pa_context_unref(c);
714 }
715
716 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717 pa_context *c = userdata;
718 pa_stream *s;
719 uint32_t channel;
720
721 pa_assert(pd);
722 pa_assert(command == PA_COMMAND_STARTED);
723 pa_assert(t);
724 pa_assert(c);
725 pa_assert(PA_REFCNT_VALUE(c) >= 1);
726
727 pa_context_ref(c);
728
729 if (c->version < 13) {
730 pa_context_fail(c, PA_ERR_PROTOCOL);
731 goto finish;
732 }
733
734 if (pa_tagstruct_getu32(t, &channel) < 0 ||
735 !pa_tagstruct_eof(t)) {
736 pa_context_fail(c, PA_ERR_PROTOCOL);
737 goto finish;
738 }
739
740 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
741 goto finish;
742
743 if (s->state != PA_STREAM_READY)
744 goto finish;
745
746 check_smoother_status(s, TRUE, TRUE, FALSE);
747 request_auto_timing_update(s, TRUE);
748
749 if (s->started_callback)
750 s->started_callback(s, s->started_userdata);
751
752 finish:
753 pa_context_unref(c);
754 }
755
756 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757 pa_context *c = userdata;
758 pa_stream *s;
759 uint32_t channel;
760 pa_proplist *pl = NULL;
761 const char *event;
762
763 pa_assert(pd);
764 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
765 pa_assert(t);
766 pa_assert(c);
767 pa_assert(PA_REFCNT_VALUE(c) >= 1);
768
769 pa_context_ref(c);
770
771 if (c->version < 15) {
772 pa_context_fail(c, PA_ERR_PROTOCOL);
773 goto finish;
774 }
775
776 pl = pa_proplist_new();
777
778 if (pa_tagstruct_getu32(t, &channel) < 0 ||
779 pa_tagstruct_gets(t, &event) < 0 ||
780 pa_tagstruct_get_proplist(t, pl) < 0 ||
781 !pa_tagstruct_eof(t) || !event) {
782 pa_context_fail(c, PA_ERR_PROTOCOL);
783 goto finish;
784 }
785
786 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
787 goto finish;
788
789 if (s->state != PA_STREAM_READY)
790 goto finish;
791
792 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
793 /* Let client know what the running time was when the stream had to be
794 * killed */
795 pa_usec_t time;
796 if (pa_stream_get_time(s, &time) == 0)
797 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) time);
798 }
799
800 if (s->event_callback)
801 s->event_callback(s, event, pl, s->event_userdata);
802
803 finish:
804 pa_context_unref(c);
805
806 if (pl)
807 pa_proplist_free(pl);
808 }
809
810 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
811 pa_stream *s;
812 pa_context *c = userdata;
813 uint32_t bytes, channel;
814
815 pa_assert(pd);
816 pa_assert(command == PA_COMMAND_REQUEST);
817 pa_assert(t);
818 pa_assert(c);
819 pa_assert(PA_REFCNT_VALUE(c) >= 1);
820
821 pa_context_ref(c);
822
823 if (pa_tagstruct_getu32(t, &channel) < 0 ||
824 pa_tagstruct_getu32(t, &bytes) < 0 ||
825 !pa_tagstruct_eof(t)) {
826 pa_context_fail(c, PA_ERR_PROTOCOL);
827 goto finish;
828 }
829
830 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
831 goto finish;
832
833 if (s->state != PA_STREAM_READY)
834 goto finish;
835
836 s->requested_bytes += bytes;
837
838 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
839
840 if (s->requested_bytes > 0 && s->write_callback)
841 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
842
843 finish:
844 pa_context_unref(c);
845 }
846
847 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
848 pa_stream *s;
849 pa_context *c = userdata;
850 uint32_t channel;
851
852 pa_assert(pd);
853 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
854 pa_assert(t);
855 pa_assert(c);
856 pa_assert(PA_REFCNT_VALUE(c) >= 1);
857
858 pa_context_ref(c);
859
860 if (pa_tagstruct_getu32(t, &channel) < 0 ||
861 !pa_tagstruct_eof(t)) {
862 pa_context_fail(c, PA_ERR_PROTOCOL);
863 goto finish;
864 }
865
866 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
867 goto finish;
868
869 if (s->state != PA_STREAM_READY)
870 goto finish;
871
872 if (s->buffer_attr.prebuf > 0)
873 check_smoother_status(s, TRUE, FALSE, TRUE);
874
875 request_auto_timing_update(s, TRUE);
876
877 if (command == PA_COMMAND_OVERFLOW) {
878 if (s->overflow_callback)
879 s->overflow_callback(s, s->overflow_userdata);
880 } else if (command == PA_COMMAND_UNDERFLOW) {
881 if (s->underflow_callback)
882 s->underflow_callback(s, s->underflow_userdata);
883 }
884
885 finish:
886 pa_context_unref(c);
887 }
888
889 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
890 pa_assert(s);
891 pa_assert(PA_REFCNT_VALUE(s) >= 1);
892
893 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
894
895 if (s->state != PA_STREAM_READY)
896 return;
897
898 if (w) {
899 s->write_index_not_before = s->context->ctag;
900
901 if (s->timing_info_valid)
902 s->timing_info.write_index_corrupt = TRUE;
903
904 /* pa_log("write_index invalidated"); */
905 }
906
907 if (r) {
908 s->read_index_not_before = s->context->ctag;
909
910 if (s->timing_info_valid)
911 s->timing_info.read_index_corrupt = TRUE;
912
913 /* pa_log("read_index invalidated"); */
914 }
915
916 request_auto_timing_update(s, TRUE);
917 }
918
919 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
920 pa_stream *s = userdata;
921
922 pa_assert(s);
923 pa_assert(PA_REFCNT_VALUE(s) >= 1);
924
925 pa_stream_ref(s);
926 request_auto_timing_update(s, FALSE);
927 pa_stream_unref(s);
928 }
929
930 static void create_stream_complete(pa_stream *s) {
931 pa_assert(s);
932 pa_assert(PA_REFCNT_VALUE(s) >= 1);
933 pa_assert(s->state == PA_STREAM_CREATING);
934
935 pa_stream_set_state(s, PA_STREAM_READY);
936
937 if (s->requested_bytes > 0 && s->write_callback)
938 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
939
940 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
941 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
942 pa_assert(!s->auto_timing_update_event);
943 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
944
945 request_auto_timing_update(s, TRUE);
946 }
947
948 check_smoother_status(s, TRUE, FALSE, FALSE);
949 }
950
951 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
952 const char *e;
953
954 pa_assert(s);
955 pa_assert(attr);
956
957 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
958 uint32_t ms;
959
960 if (pa_atou(e, &ms) < 0 || ms <= 0)
961 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
962 else {
963 attr->maxlength = (uint32_t) -1;
964 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
965 attr->minreq = (uint32_t) -1;
966 attr->prebuf = (uint32_t) -1;
967 attr->fragsize = attr->tlength;
968 }
969
970 if (flags)
971 *flags |= PA_STREAM_ADJUST_LATENCY;
972 }
973
974 if (s->context->version >= 13)
975 return;
976
977 /* Version older than 0.9.10 didn't do server side buffer_attr
978 * selection, hence we have to fake it on the client side. */
979
980 /* We choose fairly conservative values here, to not confuse
981 * old clients with extremely large playback buffers */
982
983 if (attr->maxlength == (uint32_t) -1)
984 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
985
986 if (attr->tlength == (uint32_t) -1)
987 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
988
989 if (attr->minreq == (uint32_t) -1)
990 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
991
992 if (attr->prebuf == (uint32_t) -1)
993 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
994
995 if (attr->fragsize == (uint32_t) -1)
996 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
997 }
998
999 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1000 pa_stream *s = userdata;
1001 uint32_t requested_bytes = 0;
1002
1003 pa_assert(pd);
1004 pa_assert(s);
1005 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1006 pa_assert(s->state == PA_STREAM_CREATING);
1007
1008 pa_stream_ref(s);
1009
1010 if (command != PA_COMMAND_REPLY) {
1011 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1012 goto finish;
1013
1014 pa_stream_set_state(s, PA_STREAM_FAILED);
1015 goto finish;
1016 }
1017
1018 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1019 s->channel == PA_INVALID_INDEX ||
1020 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1021 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1022 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1023 goto finish;
1024 }
1025
1026 s->requested_bytes = (int64_t) requested_bytes;
1027
1028 if (s->context->version >= 9) {
1029 if (s->direction == PA_STREAM_PLAYBACK) {
1030 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1031 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1032 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1033 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1034 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1035 goto finish;
1036 }
1037 } else if (s->direction == PA_STREAM_RECORD) {
1038 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1039 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1040 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1041 goto finish;
1042 }
1043 }
1044 }
1045
1046 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1047 pa_sample_spec ss;
1048 pa_channel_map cm;
1049 const char *dn = NULL;
1050 pa_bool_t suspended;
1051
1052 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1053 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1054 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1055 pa_tagstruct_gets(t, &dn) < 0 ||
1056 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1057 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1058 goto finish;
1059 }
1060
1061 if (!dn || s->device_index == PA_INVALID_INDEX ||
1062 ss.channels != cm.channels ||
1063 !pa_channel_map_valid(&cm) ||
1064 !pa_sample_spec_valid(&ss) ||
1065 (s->n_formats == 0 && (
1066 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1067 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1068 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1069 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1070 goto finish;
1071 }
1072
1073 pa_xfree(s->device_name);
1074 s->device_name = pa_xstrdup(dn);
1075 s->suspended = suspended;
1076
1077 s->channel_map = cm;
1078 s->sample_spec = ss;
1079 }
1080
1081 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1082 pa_usec_t usec;
1083
1084 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1085 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1086 goto finish;
1087 }
1088
1089 if (s->direction == PA_STREAM_RECORD)
1090 s->timing_info.configured_source_usec = usec;
1091 else
1092 s->timing_info.configured_sink_usec = usec;
1093 }
1094
1095 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1096 || s->context->version >= 22) {
1097
1098 pa_format_info *f = pa_format_info_new();
1099 pa_tagstruct_get_format_info(t, f);
1100
1101 if (pa_format_info_valid(f))
1102 s->format = f;
1103 else {
1104 pa_format_info_free(f);
1105 if (s->n_formats > 0) {
1106 /* We used the extended API, so we should have got back a proper format */
1107 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1108 goto finish;
1109 }
1110 }
1111 }
1112
1113 if (!pa_tagstruct_eof(t)) {
1114 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1115 goto finish;
1116 }
1117
1118 if (s->direction == PA_STREAM_RECORD) {
1119 pa_assert(!s->record_memblockq);
1120
1121 s->record_memblockq = pa_memblockq_new(
1122 0,
1123 s->buffer_attr.maxlength,
1124 0,
1125 pa_frame_size(&s->sample_spec),
1126 1,
1127 0,
1128 0,
1129 NULL);
1130 }
1131
1132 s->channel_valid = TRUE;
1133 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1134
1135 create_stream_complete(s);
1136
1137 finish:
1138 pa_stream_unref(s);
1139 }
1140
1141 static int create_stream(
1142 pa_stream_direction_t direction,
1143 pa_stream *s,
1144 const char *dev,
1145 const pa_buffer_attr *attr,
1146 pa_stream_flags_t flags,
1147 const pa_cvolume *volume,
1148 pa_stream *sync_stream) {
1149
1150 pa_tagstruct *t;
1151 uint32_t tag;
1152 pa_bool_t volume_set = !!volume;
1153 pa_cvolume cv;
1154 uint32_t i;
1155
1156 pa_assert(s);
1157 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1158 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1159
1160 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1161 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1162 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1163 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1164 PA_STREAM_INTERPOLATE_TIMING|
1165 PA_STREAM_NOT_MONOTONIC|
1166 PA_STREAM_AUTO_TIMING_UPDATE|
1167 PA_STREAM_NO_REMAP_CHANNELS|
1168 PA_STREAM_NO_REMIX_CHANNELS|
1169 PA_STREAM_FIX_FORMAT|
1170 PA_STREAM_FIX_RATE|
1171 PA_STREAM_FIX_CHANNELS|
1172 PA_STREAM_DONT_MOVE|
1173 PA_STREAM_VARIABLE_RATE|
1174 PA_STREAM_PEAK_DETECT|
1175 PA_STREAM_START_MUTED|
1176 PA_STREAM_ADJUST_LATENCY|
1177 PA_STREAM_EARLY_REQUESTS|
1178 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1179 PA_STREAM_START_UNMUTED|
1180 PA_STREAM_FAIL_ON_SUSPEND|
1181 PA_STREAM_RELATIVE_VOLUME|
1182 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1183
1184
1185 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1186 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1187 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1188 /* Althought some of the other flags are not supported on older
1189 * version, we don't check for them here, because it doesn't hurt
1190 * when they are passed but actually not supported. This makes
1191 * client development easier */
1192
1193 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1194 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1195 PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1196 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1197 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1198
1199 pa_stream_ref(s);
1200
1201 s->direction = direction;
1202
1203 if (sync_stream)
1204 s->syncid = sync_stream->syncid;
1205
1206 if (attr)
1207 s->buffer_attr = *attr;
1208 patch_buffer_attr(s, &s->buffer_attr, &flags);
1209
1210 s->flags = flags;
1211 s->corked = !!(flags & PA_STREAM_START_CORKED);
1212
1213 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1214 pa_usec_t x;
1215
1216 x = pa_rtclock_now();
1217
1218 pa_assert(!s->smoother);
1219 s->smoother = pa_smoother_new(
1220 SMOOTHER_ADJUST_TIME,
1221 SMOOTHER_HISTORY_TIME,
1222 !(flags & PA_STREAM_NOT_MONOTONIC),
1223 TRUE,
1224 SMOOTHER_MIN_HISTORY,
1225 x,
1226 TRUE);
1227 }
1228
1229 if (!dev)
1230 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1231
1232 t = pa_tagstruct_command(
1233 s->context,
1234 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1235 &tag);
1236
1237 if (s->context->version < 13)
1238 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1239
1240 pa_tagstruct_put(
1241 t,
1242 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1243 PA_TAG_CHANNEL_MAP, &s->channel_map,
1244 PA_TAG_U32, PA_INVALID_INDEX,
1245 PA_TAG_STRING, dev,
1246 PA_TAG_U32, s->buffer_attr.maxlength,
1247 PA_TAG_BOOLEAN, s->corked,
1248 PA_TAG_INVALID);
1249
1250 if (!volume) {
1251 if (pa_sample_spec_valid(&s->sample_spec))
1252 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1253 else {
1254 /* This is not really relevant, since no volume was set, and
1255 * the real number of channels is embedded in the format_info
1256 * structure */
1257 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1258 }
1259 }
1260
1261 if (s->direction == PA_STREAM_PLAYBACK) {
1262 pa_tagstruct_put(
1263 t,
1264 PA_TAG_U32, s->buffer_attr.tlength,
1265 PA_TAG_U32, s->buffer_attr.prebuf,
1266 PA_TAG_U32, s->buffer_attr.minreq,
1267 PA_TAG_U32, s->syncid,
1268 PA_TAG_INVALID);
1269
1270 pa_tagstruct_put_cvolume(t, volume);
1271 } else
1272 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1273
1274 if (s->context->version >= 12) {
1275 pa_tagstruct_put(
1276 t,
1277 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1278 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1279 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1280 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1281 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1282 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1283 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1284 PA_TAG_INVALID);
1285 }
1286
1287 if (s->context->version >= 13) {
1288
1289 if (s->direction == PA_STREAM_PLAYBACK)
1290 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1291 else
1292 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1293
1294 pa_tagstruct_put(
1295 t,
1296 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1297 PA_TAG_PROPLIST, s->proplist,
1298 PA_TAG_INVALID);
1299
1300 if (s->direction == PA_STREAM_RECORD)
1301 pa_tagstruct_putu32(t, s->direct_on_input);
1302 }
1303
1304 if (s->context->version >= 14) {
1305
1306 if (s->direction == PA_STREAM_PLAYBACK)
1307 pa_tagstruct_put_boolean(t, volume_set);
1308
1309 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1310 }
1311
1312 if (s->context->version >= 15) {
1313
1314 if (s->direction == PA_STREAM_PLAYBACK)
1315 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1316
1317 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1318 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1319 }
1320
1321 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1322 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1323
1324 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1325 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1326
1327 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1328 || s->context->version >= 22) {
1329
1330 pa_tagstruct_putu8(t, s->n_formats);
1331 for (i = 0; i < s->n_formats; i++)
1332 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1333 }
1334
1335 if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1336 pa_tagstruct_put_cvolume(t, volume);
1337 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1338 pa_tagstruct_put_boolean(t, volume_set);
1339 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1340 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1341 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1342 }
1343
1344 pa_pstream_send_tagstruct(s->context->pstream, t);
1345 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1346
1347 pa_stream_set_state(s, PA_STREAM_CREATING);
1348
1349 pa_stream_unref(s);
1350 return 0;
1351 }
1352
1353 int pa_stream_connect_playback(
1354 pa_stream *s,
1355 const char *dev,
1356 const pa_buffer_attr *attr,
1357 pa_stream_flags_t flags,
1358 const pa_cvolume *volume,
1359 pa_stream *sync_stream) {
1360
1361 pa_assert(s);
1362 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1363
1364 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1365 }
1366
1367 int pa_stream_connect_record(
1368 pa_stream *s,
1369 const char *dev,
1370 const pa_buffer_attr *attr,
1371 pa_stream_flags_t flags) {
1372
1373 pa_assert(s);
1374 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1375
1376 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1377 }
1378
1379 int pa_stream_begin_write(
1380 pa_stream *s,
1381 void **data,
1382 size_t *nbytes) {
1383
1384 pa_assert(s);
1385 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1386
1387 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1388 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1389 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1390 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1391 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1392
1393 if (*nbytes != (size_t) -1) {
1394 size_t m, fs;
1395
1396 m = pa_mempool_block_size_max(s->context->mempool);
1397 fs = pa_frame_size(&s->sample_spec);
1398
1399 m = (m / fs) * fs;
1400 if (*nbytes > m)
1401 *nbytes = m;
1402 }
1403
1404 if (!s->write_memblock) {
1405 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1406 s->write_data = pa_memblock_acquire(s->write_memblock);
1407 }
1408
1409 *data = s->write_data;
1410 *nbytes = pa_memblock_get_length(s->write_memblock);
1411
1412 return 0;
1413 }
1414
1415 int pa_stream_cancel_write(
1416 pa_stream *s) {
1417
1418 pa_assert(s);
1419 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1420
1421 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1422 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1423 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1424 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1425
1426 pa_assert(s->write_data);
1427
1428 pa_memblock_release(s->write_memblock);
1429 pa_memblock_unref(s->write_memblock);
1430 s->write_memblock = NULL;
1431 s->write_data = NULL;
1432
1433 return 0;
1434 }
1435
1436 int pa_stream_write(
1437 pa_stream *s,
1438 const void *data,
1439 size_t length,
1440 pa_free_cb_t free_cb,
1441 int64_t offset,
1442 pa_seek_mode_t seek) {
1443
1444 pa_assert(s);
1445 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1446 pa_assert(data);
1447
1448 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1449 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1450 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1451 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1452 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1453 PA_CHECK_VALIDITY(s->context,
1454 !s->write_memblock ||
1455 ((data >= s->write_data) &&
1456 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1457 PA_ERR_INVALID);
1458 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1459
1460 if (s->write_memblock) {
1461 pa_memchunk chunk;
1462
1463 /* pa_stream_write_begin() was called before */
1464
1465 pa_memblock_release(s->write_memblock);
1466
1467 chunk.memblock = s->write_memblock;
1468 chunk.index = (const char *) data - (const char *) s->write_data;
1469 chunk.length = length;
1470
1471 s->write_memblock = NULL;
1472 s->write_data = NULL;
1473
1474 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1475 pa_memblock_unref(chunk.memblock);
1476
1477 } else {
1478 pa_seek_mode_t t_seek = seek;
1479 int64_t t_offset = offset;
1480 size_t t_length = length;
1481 const void *t_data = data;
1482
1483 /* pa_stream_write_begin() was not called before */
1484
1485 while (t_length > 0) {
1486 pa_memchunk chunk;
1487
1488 chunk.index = 0;
1489
1490 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1491 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1492 chunk.length = t_length;
1493 } else {
1494 void *d;
1495
1496 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1497 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1498
1499 d = pa_memblock_acquire(chunk.memblock);
1500 memcpy(d, t_data, chunk.length);
1501 pa_memblock_release(chunk.memblock);
1502 }
1503
1504 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1505
1506 t_offset = 0;
1507 t_seek = PA_SEEK_RELATIVE;
1508
1509 t_data = (const uint8_t*) t_data + chunk.length;
1510 t_length -= chunk.length;
1511
1512 pa_memblock_unref(chunk.memblock);
1513 }
1514
1515 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1516 free_cb((void*) data);
1517 }
1518
1519 /* This is obviously wrong since we ignore the seeking index . But
1520 * that's OK, the server side applies the same error */
1521 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1522
1523 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1524
1525 if (s->direction == PA_STREAM_PLAYBACK) {
1526
1527 /* Update latency request correction */
1528 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1529
1530 if (seek == PA_SEEK_ABSOLUTE) {
1531 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1532 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1533 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1534 } else if (seek == PA_SEEK_RELATIVE) {
1535 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1536 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1537 } else
1538 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1539 }
1540
1541 /* Update the write index in the already available latency data */
1542 if (s->timing_info_valid) {
1543
1544 if (seek == PA_SEEK_ABSOLUTE) {
1545 s->timing_info.write_index_corrupt = FALSE;
1546 s->timing_info.write_index = offset + (int64_t) length;
1547 } else if (seek == PA_SEEK_RELATIVE) {
1548 if (!s->timing_info.write_index_corrupt)
1549 s->timing_info.write_index += offset + (int64_t) length;
1550 } else
1551 s->timing_info.write_index_corrupt = TRUE;
1552 }
1553
1554 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1555 request_auto_timing_update(s, TRUE);
1556 }
1557
1558 return 0;
1559 }
1560
1561 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1562 pa_assert(s);
1563 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1564 pa_assert(data);
1565 pa_assert(length);
1566
1567 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1568 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1569 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1570
1571 if (!s->peek_memchunk.memblock) {
1572
1573 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1574 *data = NULL;
1575 *length = 0;
1576 return 0;
1577 }
1578
1579 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1580 }
1581
1582 pa_assert(s->peek_data);
1583 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1584 *length = s->peek_memchunk.length;
1585 return 0;
1586 }
1587
1588 int pa_stream_drop(pa_stream *s) {
1589 pa_assert(s);
1590 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1591
1592 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1593 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1594 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1595 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1596
1597 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1598
1599 /* Fix the simulated local read index */
1600 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1601 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1602
1603 pa_assert(s->peek_data);
1604 pa_memblock_release(s->peek_memchunk.memblock);
1605 pa_memblock_unref(s->peek_memchunk.memblock);
1606 pa_memchunk_reset(&s->peek_memchunk);
1607
1608 return 0;
1609 }
1610
1611 size_t pa_stream_writable_size(pa_stream *s) {
1612 pa_assert(s);
1613 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1614
1615 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1616 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1617 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1618
1619 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1620 }
1621
1622 size_t pa_stream_readable_size(pa_stream *s) {
1623 pa_assert(s);
1624 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1625
1626 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1627 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1628 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1629
1630 return pa_memblockq_get_length(s->record_memblockq);
1631 }
1632
1633 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1634 pa_operation *o;
1635 pa_tagstruct *t;
1636 uint32_t tag;
1637
1638 pa_assert(s);
1639 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1640
1641 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1642 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1643 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1644
1645 /* Ask for a timing update before we cork/uncork to get the best
1646 * accuracy for the transport latency suitable for the
1647 * check_smoother_status() call in the started callback */
1648 request_auto_timing_update(s, TRUE);
1649
1650 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1651
1652 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1653 pa_tagstruct_putu32(t, s->channel);
1654 pa_pstream_send_tagstruct(s->context->pstream, t);
1655 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1656
1657 /* This might cause the read index to conitnue again, hence
1658 * let's request a timing update */
1659 request_auto_timing_update(s, TRUE);
1660
1661 return o;
1662 }
1663
1664 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1665 pa_usec_t usec;
1666
1667 pa_assert(s);
1668 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1669 pa_assert(s->state == PA_STREAM_READY);
1670 pa_assert(s->direction != PA_STREAM_UPLOAD);
1671 pa_assert(s->timing_info_valid);
1672 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1673 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1674
1675 if (s->direction == PA_STREAM_PLAYBACK) {
1676 /* The last byte that was written into the output device
1677 * had this time value associated */
1678 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1679
1680 if (!s->corked && !s->suspended) {
1681
1682 if (!ignore_transport)
1683 /* Because the latency info took a little time to come
1684 * to us, we assume that the real output time is actually
1685 * a little ahead */
1686 usec += s->timing_info.transport_usec;
1687
1688 /* However, the output device usually maintains a buffer
1689 too, hence the real sample currently played is a little
1690 back */
1691 if (s->timing_info.sink_usec >= usec)
1692 usec = 0;
1693 else
1694 usec -= s->timing_info.sink_usec;
1695 }
1696
1697 } else {
1698 pa_assert(s->direction == PA_STREAM_RECORD);
1699
1700 /* The last byte written into the server side queue had
1701 * this time value associated */
1702 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1703
1704 if (!s->corked && !s->suspended) {
1705
1706 if (!ignore_transport)
1707 /* Add transport latency */
1708 usec += s->timing_info.transport_usec;
1709
1710 /* Add latency of data in device buffer */
1711 usec += s->timing_info.source_usec;
1712
1713 /* If this is a monitor source, we need to correct the
1714 * time by the playback device buffer */
1715 if (s->timing_info.sink_usec >= usec)
1716 usec = 0;
1717 else
1718 usec -= s->timing_info.sink_usec;
1719 }
1720 }
1721
1722 return usec;
1723 }
1724
1725 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1726 pa_operation *o = userdata;
1727 struct timeval local, remote, now;
1728 pa_timing_info *i;
1729 pa_bool_t playing = FALSE;
1730 uint64_t underrun_for = 0, playing_for = 0;
1731
1732 pa_assert(pd);
1733 pa_assert(o);
1734 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1735
1736 if (!o->context || !o->stream)
1737 goto finish;
1738
1739 i = &o->stream->timing_info;
1740
1741 o->stream->timing_info_valid = FALSE;
1742 i->write_index_corrupt = TRUE;
1743 i->read_index_corrupt = TRUE;
1744
1745 if (command != PA_COMMAND_REPLY) {
1746 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1747 goto finish;
1748
1749 } else {
1750
1751 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1752 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1753 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1754 pa_tagstruct_get_timeval(t, &local) < 0 ||
1755 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1756 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1757 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1758
1759 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1760 goto finish;
1761 }
1762
1763 if (o->context->version >= 13 &&
1764 o->stream->direction == PA_STREAM_PLAYBACK)
1765 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1766 pa_tagstruct_getu64(t, &playing_for) < 0) {
1767
1768 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1769 goto finish;
1770 }
1771
1772
1773 if (!pa_tagstruct_eof(t)) {
1774 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1775 goto finish;
1776 }
1777 o->stream->timing_info_valid = TRUE;
1778 i->write_index_corrupt = FALSE;
1779 i->read_index_corrupt = FALSE;
1780
1781 i->playing = (int) playing;
1782 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1783
1784 pa_gettimeofday(&now);
1785
1786 /* Calculcate timestamps */
1787 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1788 /* local and remote seem to have synchronized clocks */
1789
1790 if (o->stream->direction == PA_STREAM_PLAYBACK)
1791 i->transport_usec = pa_timeval_diff(&remote, &local);
1792 else
1793 i->transport_usec = pa_timeval_diff(&now, &remote);
1794
1795 i->synchronized_clocks = TRUE;
1796 i->timestamp = remote;
1797 } else {
1798 /* clocks are not synchronized, let's estimate latency then */
1799 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1800 i->synchronized_clocks = FALSE;
1801 i->timestamp = local;
1802 pa_timeval_add(&i->timestamp, i->transport_usec);
1803 }
1804
1805 /* Invalidate read and write indexes if necessary */
1806 if (tag < o->stream->read_index_not_before)
1807 i->read_index_corrupt = TRUE;
1808
1809 if (tag < o->stream->write_index_not_before)
1810 i->write_index_corrupt = TRUE;
1811
1812 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1813 /* Write index correction */
1814
1815 int n, j;
1816 uint32_t ctag = tag;
1817
1818 /* Go through the saved correction values and add up the
1819 * total correction.*/
1820 for (n = 0, j = o->stream->current_write_index_correction+1;
1821 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1822 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1823
1824 /* Step over invalid data or out-of-date data */
1825 if (!o->stream->write_index_corrections[j].valid ||
1826 o->stream->write_index_corrections[j].tag < ctag)
1827 continue;
1828
1829 /* Make sure that everything is in order */
1830 ctag = o->stream->write_index_corrections[j].tag+1;
1831
1832 /* Now fix the write index */
1833 if (o->stream->write_index_corrections[j].corrupt) {
1834 /* A corrupting seek was made */
1835 i->write_index_corrupt = TRUE;
1836 } else if (o->stream->write_index_corrections[j].absolute) {
1837 /* An absolute seek was made */
1838 i->write_index = o->stream->write_index_corrections[j].value;
1839 i->write_index_corrupt = FALSE;
1840 } else if (!i->write_index_corrupt) {
1841 /* A relative seek was made */
1842 i->write_index += o->stream->write_index_corrections[j].value;
1843 }
1844 }
1845
1846 /* Clear old correction entries */
1847 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1848 if (!o->stream->write_index_corrections[n].valid)
1849 continue;
1850
1851 if (o->stream->write_index_corrections[n].tag <= tag)
1852 o->stream->write_index_corrections[n].valid = FALSE;
1853 }
1854 }
1855
1856 if (o->stream->direction == PA_STREAM_RECORD) {
1857 /* Read index correction */
1858
1859 if (!i->read_index_corrupt)
1860 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1861 }
1862
1863 /* Update smoother if we're not corked */
1864 if (o->stream->smoother && !o->stream->corked) {
1865 pa_usec_t u, x;
1866
1867 u = x = pa_rtclock_now() - i->transport_usec;
1868
1869 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1870 pa_usec_t su;
1871
1872 /* If we weren't playing then it will take some time
1873 * until the audio will actually come out through the
1874 * speakers. Since we follow that timing here, we need
1875 * to try to fix this up */
1876
1877 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1878
1879 if (su < i->sink_usec)
1880 x += i->sink_usec - su;
1881 }
1882
1883 if (!i->playing)
1884 pa_smoother_pause(o->stream->smoother, x);
1885
1886 /* Update the smoother */
1887 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1888 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1889 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1890
1891 if (i->playing)
1892 pa_smoother_resume(o->stream->smoother, x, TRUE);
1893 }
1894 }
1895
1896 o->stream->auto_timing_update_requested = FALSE;
1897
1898 if (o->stream->latency_update_callback)
1899 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1900
1901 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1902 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1903 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1904 }
1905
1906 finish:
1907
1908 pa_operation_done(o);
1909 pa_operation_unref(o);
1910 }
1911
1912 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1913 uint32_t tag;
1914 pa_operation *o;
1915 pa_tagstruct *t;
1916 struct timeval now;
1917 int cidx = 0;
1918
1919 pa_assert(s);
1920 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1921
1922 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1923 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1924 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1925
1926 if (s->direction == PA_STREAM_PLAYBACK) {
1927 /* Find a place to store the write_index correction data for this entry */
1928 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1929
1930 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1931 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1932 }
1933 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1934
1935 t = pa_tagstruct_command(
1936 s->context,
1937 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1938 &tag);
1939 pa_tagstruct_putu32(t, s->channel);
1940 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1941
1942 pa_pstream_send_tagstruct(s->context->pstream, t);
1943 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1944
1945 if (s->direction == PA_STREAM_PLAYBACK) {
1946 /* Fill in initial correction data */
1947
1948 s->current_write_index_correction = cidx;
1949
1950 s->write_index_corrections[cidx].valid = TRUE;
1951 s->write_index_corrections[cidx].absolute = FALSE;
1952 s->write_index_corrections[cidx].corrupt = FALSE;
1953 s->write_index_corrections[cidx].tag = tag;
1954 s->write_index_corrections[cidx].value = 0;
1955 }
1956
1957 return o;
1958 }
1959
1960 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1961 pa_stream *s = userdata;
1962
1963 pa_assert(pd);
1964 pa_assert(s);
1965 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1966
1967 pa_stream_ref(s);
1968
1969 if (command != PA_COMMAND_REPLY) {
1970 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1971 goto finish;
1972
1973 pa_stream_set_state(s, PA_STREAM_FAILED);
1974 goto finish;
1975 } else if (!pa_tagstruct_eof(t)) {
1976 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1977 goto finish;
1978 }
1979
1980 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1981
1982 finish:
1983 pa_stream_unref(s);
1984 }
1985
1986 int pa_stream_disconnect(pa_stream *s) {
1987 pa_tagstruct *t;
1988 uint32_t tag;
1989
1990 pa_assert(s);
1991 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1992
1993 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1994 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1995 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1996
1997 pa_stream_ref(s);
1998
1999 t = pa_tagstruct_command(
2000 s->context,
2001 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2002 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2003 &tag);
2004 pa_tagstruct_putu32(t, s->channel);
2005 pa_pstream_send_tagstruct(s->context->pstream, t);
2006 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2007
2008 pa_stream_unref(s);
2009 return 0;
2010 }
2011
2012 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2013 pa_assert(s);
2014 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2015
2016 if (pa_detect_fork())
2017 return;
2018
2019 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2020 return;
2021
2022 s->read_callback = cb;
2023 s->read_userdata = userdata;
2024 }
2025
2026 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2027 pa_assert(s);
2028 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2029
2030 if (pa_detect_fork())
2031 return;
2032
2033 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2034 return;
2035
2036 s->write_callback = cb;
2037 s->write_userdata = userdata;
2038 }
2039
2040 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2041 pa_assert(s);
2042 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2043
2044 if (pa_detect_fork())
2045 return;
2046
2047 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2048 return;
2049
2050 s->state_callback = cb;
2051 s->state_userdata = userdata;
2052 }
2053
2054 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2055 pa_assert(s);
2056 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2057
2058 if (pa_detect_fork())
2059 return;
2060
2061 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2062 return;
2063
2064 s->overflow_callback = cb;
2065 s->overflow_userdata = userdata;
2066 }
2067
2068 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2069 pa_assert(s);
2070 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2071
2072 if (pa_detect_fork())
2073 return;
2074
2075 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2076 return;
2077
2078 s->underflow_callback = cb;
2079 s->underflow_userdata = userdata;
2080 }
2081
2082 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2083 pa_assert(s);
2084 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2085
2086 if (pa_detect_fork())
2087 return;
2088
2089 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2090 return;
2091
2092 s->latency_update_callback = cb;
2093 s->latency_update_userdata = userdata;
2094 }
2095
2096 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2097 pa_assert(s);
2098 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2099
2100 if (pa_detect_fork())
2101 return;
2102
2103 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2104 return;
2105
2106 s->moved_callback = cb;
2107 s->moved_userdata = userdata;
2108 }
2109
2110 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2111 pa_assert(s);
2112 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2113
2114 if (pa_detect_fork())
2115 return;
2116
2117 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2118 return;
2119
2120 s->suspended_callback = cb;
2121 s->suspended_userdata = userdata;
2122 }
2123
2124 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2125 pa_assert(s);
2126 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2127
2128 if (pa_detect_fork())
2129 return;
2130
2131 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2132 return;
2133
2134 s->started_callback = cb;
2135 s->started_userdata = userdata;
2136 }
2137
2138 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2139 pa_assert(s);
2140 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2141
2142 if (pa_detect_fork())
2143 return;
2144
2145 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2146 return;
2147
2148 s->event_callback = cb;
2149 s->event_userdata = userdata;
2150 }
2151
2152 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2153 pa_assert(s);
2154 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2155
2156 if (pa_detect_fork())
2157 return;
2158
2159 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2160 return;
2161
2162 s->buffer_attr_callback = cb;
2163 s->buffer_attr_userdata = userdata;
2164 }
2165
2166 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2167 pa_operation *o = userdata;
2168 int success = 1;
2169
2170 pa_assert(pd);
2171 pa_assert(o);
2172 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2173
2174 if (!o->context)
2175 goto finish;
2176
2177 if (command != PA_COMMAND_REPLY) {
2178 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2179 goto finish;
2180
2181 success = 0;
2182 } else if (!pa_tagstruct_eof(t)) {
2183 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2184 goto finish;
2185 }
2186
2187 if (o->callback) {
2188 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2189 cb(o->stream, success, o->userdata);
2190 }
2191
2192 finish:
2193 pa_operation_done(o);
2194 pa_operation_unref(o);
2195 }
2196
2197 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2198 pa_operation *o;
2199 pa_tagstruct *t;
2200 uint32_t tag;
2201
2202 pa_assert(s);
2203 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2204
2205 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2206 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2207 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2208
2209 /* Ask for a timing update before we cork/uncork to get the best
2210 * accuracy for the transport latency suitable for the
2211 * check_smoother_status() call in the started callback */
2212 request_auto_timing_update(s, TRUE);
2213
2214 s->corked = b;
2215
2216 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2217
2218 t = pa_tagstruct_command(
2219 s->context,
2220 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2221 &tag);
2222 pa_tagstruct_putu32(t, s->channel);
2223 pa_tagstruct_put_boolean(t, !!b);
2224 pa_pstream_send_tagstruct(s->context->pstream, t);
2225 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2226
2227 check_smoother_status(s, FALSE, FALSE, FALSE);
2228
2229 /* This might cause the indexes to hang/start again, hence let's
2230 * request a timing update, after the cork/uncork, too */
2231 request_auto_timing_update(s, TRUE);
2232
2233 return o;
2234 }
2235
2236 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2237 pa_tagstruct *t;
2238 pa_operation *o;
2239 uint32_t tag;
2240
2241 pa_assert(s);
2242 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2243
2244 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2245 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2246
2247 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2248
2249 t = pa_tagstruct_command(s->context, command, &tag);
2250 pa_tagstruct_putu32(t, s->channel);
2251 pa_pstream_send_tagstruct(s->context->pstream, t);
2252 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2253
2254 return o;
2255 }
2256
2257 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2258 pa_operation *o;
2259
2260 pa_assert(s);
2261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2262
2263 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2264 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2265 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2266
2267 /* Ask for a timing update *before* the flush, so that the
2268 * transport usec is as up to date as possible when we get the
2269 * underflow message and update the smoother status*/
2270 request_auto_timing_update(s, TRUE);
2271
2272 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2273 return NULL;
2274
2275 if (s->direction == PA_STREAM_PLAYBACK) {
2276
2277 if (s->write_index_corrections[s->current_write_index_correction].valid)
2278 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2279
2280 if (s->buffer_attr.prebuf > 0)
2281 check_smoother_status(s, FALSE, FALSE, TRUE);
2282
2283 /* This will change the write index, but leave the
2284 * read index untouched. */
2285 invalidate_indexes(s, FALSE, TRUE);
2286
2287 } else
2288 /* For record streams this has no influence on the write
2289 * index, but the read index might jump. */
2290 invalidate_indexes(s, TRUE, FALSE);
2291
2292 /* Note that we do not update requested_bytes here. This is
2293 * because we cannot really know how data actually was dropped
2294 * from the write index due to this. This 'error' will be applied
2295 * by both client and server and hence we should be fine. */
2296
2297 return o;
2298 }
2299
2300 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2301 pa_operation *o;
2302
2303 pa_assert(s);
2304 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2305
2306 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2307 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2308 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2309 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2310
2311 /* Ask for a timing update before we cork/uncork to get the best
2312 * accuracy for the transport latency suitable for the
2313 * check_smoother_status() call in the started callback */
2314 request_auto_timing_update(s, TRUE);
2315
2316 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2317 return NULL;
2318
2319 /* This might cause the read index to hang again, hence
2320 * let's request a timing update */
2321 request_auto_timing_update(s, TRUE);
2322
2323 return o;
2324 }
2325
2326 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2327 pa_operation *o;
2328
2329 pa_assert(s);
2330 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2331
2332 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2333 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2334 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2335 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2336
2337 /* Ask for a timing update before we cork/uncork to get the best
2338 * accuracy for the transport latency suitable for the
2339 * check_smoother_status() call in the started callback */
2340 request_auto_timing_update(s, TRUE);
2341
2342 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2343 return NULL;
2344
2345 /* This might cause the read index to start moving again, hence
2346 * let's request a timing update */
2347 request_auto_timing_update(s, TRUE);
2348
2349 return o;
2350 }
2351
2352 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2353 pa_operation *o;
2354
2355 pa_assert(s);
2356 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2357 pa_assert(name);
2358
2359 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2360 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2361 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2362
2363 if (s->context->version >= 13) {
2364 pa_proplist *p = pa_proplist_new();
2365
2366 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2367 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2368 pa_proplist_free(p);
2369 } else {
2370 pa_tagstruct *t;
2371 uint32_t tag;
2372
2373 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2374 t = pa_tagstruct_command(
2375 s->context,
2376 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2377 &tag);
2378 pa_tagstruct_putu32(t, s->channel);
2379 pa_tagstruct_puts(t, name);
2380 pa_pstream_send_tagstruct(s->context->pstream, t);
2381 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2382 }
2383
2384 return o;
2385 }
2386
2387 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2388 pa_usec_t usec;
2389
2390 pa_assert(s);
2391 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2392
2393 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2394 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2395 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2396 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2397 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2398 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2399
2400 if (s->smoother)
2401 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2402 else
2403 usec = calc_time(s, FALSE);
2404
2405 /* Make sure the time runs monotonically */
2406 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2407 if (usec < s->previous_time)
2408 usec = s->previous_time;
2409 else
2410 s->previous_time = usec;
2411 }
2412
2413 if (r_usec)
2414 *r_usec = usec;
2415
2416 return 0;
2417 }
2418
2419 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2420 pa_assert(s);
2421 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2422
2423 if (negative)
2424 *negative = 0;
2425
2426 if (a >= b)
2427 return a-b;
2428 else {
2429 if (negative && s->direction == PA_STREAM_RECORD) {
2430 *negative = 1;
2431 return b-a;
2432 } else
2433 return 0;
2434 }
2435 }
2436
2437 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2438 pa_usec_t t, c;
2439 int r;
2440 int64_t cindex;
2441
2442 pa_assert(s);
2443 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2444 pa_assert(r_usec);
2445
2446 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2447 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2448 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2449 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2450 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2451 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2452
2453 if ((r = pa_stream_get_time(s, &t)) < 0)
2454 return r;
2455
2456 if (s->direction == PA_STREAM_PLAYBACK)
2457 cindex = s->timing_info.write_index;
2458 else
2459 cindex = s->timing_info.read_index;
2460
2461 if (cindex < 0)
2462 cindex = 0;
2463
2464 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2465
2466 if (s->direction == PA_STREAM_PLAYBACK)
2467 *r_usec = time_counter_diff(s, c, t, negative);
2468 else
2469 *r_usec = time_counter_diff(s, t, c, negative);
2470
2471 return 0;
2472 }
2473
2474 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2475 pa_assert(s);
2476 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2477
2478 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2479 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2480 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2481 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2482
2483 return &s->timing_info;
2484 }
2485
2486 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2487 pa_assert(s);
2488 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2489
2490 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2491
2492 return &s->sample_spec;
2493 }
2494
2495 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2496 pa_assert(s);
2497 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2498
2499 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2500
2501 return &s->channel_map;
2502 }
2503
2504 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2505 pa_assert(s);
2506 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2507
2508 /* We don't have the format till routing is done */
2509 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2510 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2511
2512 return s->format;
2513 }
2514 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2515 pa_assert(s);
2516 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2517
2518 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2519 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2520 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2521
2522 return &s->buffer_attr;
2523 }
2524
2525 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2526 pa_operation *o = userdata;
2527 int success = 1;
2528
2529 pa_assert(pd);
2530 pa_assert(o);
2531 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2532
2533 if (!o->context)
2534 goto finish;
2535
2536 if (command != PA_COMMAND_REPLY) {
2537 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2538 goto finish;
2539
2540 success = 0;
2541 } else {
2542 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2543 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2544 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2545 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2546 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2547 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2548 goto finish;
2549 }
2550 } else if (o->stream->direction == PA_STREAM_RECORD) {
2551 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2552 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2553 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2554 goto finish;
2555 }
2556 }
2557
2558 if (o->stream->context->version >= 13) {
2559 pa_usec_t usec;
2560
2561 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2562 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2563 goto finish;
2564 }
2565
2566 if (o->stream->direction == PA_STREAM_RECORD)
2567 o->stream->timing_info.configured_source_usec = usec;
2568 else
2569 o->stream->timing_info.configured_sink_usec = usec;
2570 }
2571
2572 if (!pa_tagstruct_eof(t)) {
2573 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2574 goto finish;
2575 }
2576 }
2577
2578 if (o->callback) {
2579 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2580 cb(o->stream, success, o->userdata);
2581 }
2582
2583 finish:
2584 pa_operation_done(o);
2585 pa_operation_unref(o);
2586 }
2587
2588
2589 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2590 pa_operation *o;
2591 pa_tagstruct *t;
2592 uint32_t tag;
2593 pa_buffer_attr copy;
2594
2595 pa_assert(s);
2596 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2597 pa_assert(attr);
2598
2599 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2600 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2601 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2602 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2603
2604 /* Ask for a timing update before we cork/uncork to get the best
2605 * accuracy for the transport latency suitable for the
2606 * check_smoother_status() call in the started callback */
2607 request_auto_timing_update(s, TRUE);
2608
2609 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2610
2611 t = pa_tagstruct_command(
2612 s->context,
2613 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2614 &tag);
2615 pa_tagstruct_putu32(t, s->channel);
2616
2617 copy = *attr;
2618 patch_buffer_attr(s, &copy, NULL);
2619 attr = &copy;
2620
2621 pa_tagstruct_putu32(t, attr->maxlength);
2622
2623 if (s->direction == PA_STREAM_PLAYBACK)
2624 pa_tagstruct_put(
2625 t,
2626 PA_TAG_U32, attr->tlength,
2627 PA_TAG_U32, attr->prebuf,
2628 PA_TAG_U32, attr->minreq,
2629 PA_TAG_INVALID);
2630 else
2631 pa_tagstruct_putu32(t, attr->fragsize);
2632
2633 if (s->context->version >= 13)
2634 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2635
2636 if (s->context->version >= 14)
2637 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2638
2639 pa_pstream_send_tagstruct(s->context->pstream, t);
2640 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2641
2642 /* This might cause changes in the read/write indexex, hence let's
2643 * request a timing update */
2644 request_auto_timing_update(s, TRUE);
2645
2646 return o;
2647 }
2648
2649 uint32_t pa_stream_get_device_index(pa_stream *s) {
2650 pa_assert(s);
2651 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2652
2653 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2654 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2655 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2656 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2657 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2658
2659 return s->device_index;
2660 }
2661
2662 const char *pa_stream_get_device_name(pa_stream *s) {
2663 pa_assert(s);
2664 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2665
2666 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2667 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2668 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2669 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2670 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2671
2672 return s->device_name;
2673 }
2674
2675 int pa_stream_is_suspended(pa_stream *s) {
2676 pa_assert(s);
2677 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2678
2679 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2680 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2681 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2682 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2683
2684 return s->suspended;
2685 }
2686
2687 int pa_stream_is_corked(pa_stream *s) {
2688 pa_assert(s);
2689 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2690
2691 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2692 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2693 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2694
2695 return s->corked;
2696 }
2697
2698 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2699 pa_operation *o = userdata;
2700 int success = 1;
2701
2702 pa_assert(pd);
2703 pa_assert(o);
2704 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2705
2706 if (!o->context)
2707 goto finish;
2708
2709 if (command != PA_COMMAND_REPLY) {
2710 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2711 goto finish;
2712
2713 success = 0;
2714 } else {
2715
2716 if (!pa_tagstruct_eof(t)) {
2717 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2718 goto finish;
2719 }
2720 }
2721
2722 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2723 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2724
2725 if (o->callback) {
2726 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2727 cb(o->stream, success, o->userdata);
2728 }
2729
2730 finish:
2731 pa_operation_done(o);
2732 pa_operation_unref(o);
2733 }
2734
2735
2736 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2737 pa_operation *o;
2738 pa_tagstruct *t;
2739 uint32_t tag;
2740
2741 pa_assert(s);
2742 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2743
2744 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2745 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2746 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2747 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2748 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2749 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2750
2751 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2752 o->private = PA_UINT_TO_PTR(rate);
2753
2754 t = pa_tagstruct_command(
2755 s->context,
2756 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2757 &tag);
2758 pa_tagstruct_putu32(t, s->channel);
2759 pa_tagstruct_putu32(t, rate);
2760
2761 pa_pstream_send_tagstruct(s->context->pstream, t);
2762 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2763
2764 return o;
2765 }
2766
2767 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2768 pa_operation *o;
2769 pa_tagstruct *t;
2770 uint32_t tag;
2771
2772 pa_assert(s);
2773 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2774
2775 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2776 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2777 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2778 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2779 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2780
2781 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2782
2783 t = pa_tagstruct_command(
2784 s->context,
2785 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2786 &tag);
2787 pa_tagstruct_putu32(t, s->channel);
2788 pa_tagstruct_putu32(t, (uint32_t) mode);
2789 pa_tagstruct_put_proplist(t, p);
2790
2791 pa_pstream_send_tagstruct(s->context->pstream, t);
2792 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2793
2794 /* Please note that we don't update s->proplist here, because we
2795 * don't export that field */
2796
2797 return o;
2798 }
2799
2800 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2801 pa_operation *o;
2802 pa_tagstruct *t;
2803 uint32_t tag;
2804 const char * const*k;
2805
2806 pa_assert(s);
2807 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2808
2809 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2810 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2811 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2812 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2813 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2814
2815 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2816
2817 t = pa_tagstruct_command(
2818 s->context,
2819 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2820 &tag);
2821 pa_tagstruct_putu32(t, s->channel);
2822
2823 for (k = keys; *k; k++)
2824 pa_tagstruct_puts(t, *k);
2825
2826 pa_tagstruct_puts(t, NULL);
2827
2828 pa_pstream_send_tagstruct(s->context->pstream, t);
2829 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2830
2831 /* Please note that we don't update s->proplist here, because we
2832 * don't export that field */
2833
2834 return o;
2835 }
2836
2837 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2838 pa_assert(s);
2839 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2840
2841 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2842 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2843 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2844 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2845
2846 s->direct_on_input = sink_input_idx;
2847
2848 return 0;
2849 }
2850
2851 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2852 pa_assert(s);
2853 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2854
2855 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2856 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2857 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2858
2859 return s->direct_on_input;
2860 }