]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
3658064fa40bc53fbf417cfc40331ba89fc25011
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84 pa_context *c,
85 const char *name,
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
89 unsigned int n_formats,
90 pa_proplist *p) {
91
92 pa_stream *s;
93 unsigned int i;
94
95 pa_assert(c);
96 pa_assert(PA_REFCNT_VALUE(c) >= 1);
97 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98 pa_assert(n_formats < PA_MAX_FORMATS);
99
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103 s = pa_xnew(pa_stream, 1);
104 PA_REFCNT_INIT(s);
105 s->context = c;
106 s->mainloop = c->mainloop;
107
108 s->direction = PA_STREAM_NODIRECTION;
109 s->state = PA_STREAM_UNCONNECTED;
110 s->flags = 0;
111
112 if (ss)
113 s->sample_spec = *ss;
114 else
115 s->sample_spec.format = PA_SAMPLE_INVALID;
116
117 if (map)
118 s->channel_map = *map;
119 else
120 pa_channel_map_init(&s->channel_map);
121
122 s->n_formats = 0;
123 if (formats) {
124 s->n_formats = n_formats;
125 for (i = 0; i < n_formats; i++)
126 s->req_formats[i] = pa_format_info_copy(formats[i]);
127 }
128
129 /* We'll get the final negotiated format after connecting */
130 s->format = NULL;
131
132 s->direct_on_input = PA_INVALID_INDEX;
133
134 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
135 if (name)
136 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
137
138 s->channel = 0;
139 s->channel_valid = FALSE;
140 s->syncid = c->csyncid++;
141 s->stream_index = PA_INVALID_INDEX;
142
143 s->requested_bytes = 0;
144 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
145
146 /* We initialize der target length here, so that if the user
147 * passes no explicit buffering metrics the default is similar to
148 * what older PA versions provided. */
149
150 s->buffer_attr.maxlength = (uint32_t) -1;
151 if (ss)
152 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
153 else {
154 /* FIXME: We assume a worst-case compressed format corresponding to
155 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156 pa_sample_spec tmp_ss = {
157 .format = PA_SAMPLE_S16NE,
158 .rate = 48000,
159 .channels = 2,
160 };
161 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
162 }
163 s->buffer_attr.minreq = (uint32_t) -1;
164 s->buffer_attr.prebuf = (uint32_t) -1;
165 s->buffer_attr.fragsize = (uint32_t) -1;
166
167 s->device_index = PA_INVALID_INDEX;
168 s->device_name = NULL;
169 s->suspended = FALSE;
170 s->corked = FALSE;
171
172 s->write_memblock = NULL;
173 s->write_data = NULL;
174
175 pa_memchunk_reset(&s->peek_memchunk);
176 s->peek_data = NULL;
177 s->record_memblockq = NULL;
178
179 memset(&s->timing_info, 0, sizeof(s->timing_info));
180 s->timing_info_valid = FALSE;
181
182 s->previous_time = 0;
183
184 s->read_index_not_before = 0;
185 s->write_index_not_before = 0;
186 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
187 s->write_index_corrections[i].valid = 0;
188 s->current_write_index_correction = 0;
189
190 s->auto_timing_update_event = NULL;
191 s->auto_timing_update_requested = FALSE;
192 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
193
194 reset_callbacks(s);
195
196 s->smoother = NULL;
197
198 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
199 PA_LLIST_PREPEND(pa_stream, c->streams, s);
200 pa_stream_ref(s);
201
202 return s;
203 }
204
205 pa_stream *pa_stream_new_with_proplist(
206 pa_context *c,
207 const char *name,
208 const pa_sample_spec *ss,
209 const pa_channel_map *map,
210 pa_proplist *p) {
211
212 pa_channel_map tmap;
213
214 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
215 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
216 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
219
220 if (!map)
221 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
222
223 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
224 }
225
226 pa_stream *pa_stream_new_extended(
227 pa_context *c,
228 const char *name,
229 pa_format_info * const *formats,
230 unsigned int n_formats,
231 pa_proplist *p) {
232
233 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
234
235 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
236 }
237
238 static void stream_unlink(pa_stream *s) {
239 pa_operation *o, *n;
240 pa_assert(s);
241
242 if (!s->context)
243 return;
244
245 /* Detach from context */
246
247 /* Unref all operatio object that point to us */
248 for (o = s->context->operations; o; o = n) {
249 n = o->next;
250
251 if (o->stream == s)
252 pa_operation_cancel(o);
253 }
254
255 /* Drop all outstanding replies for this stream */
256 if (s->context->pdispatch)
257 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
258
259 if (s->channel_valid) {
260 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
261 s->channel = 0;
262 s->channel_valid = FALSE;
263 }
264
265 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
266 pa_stream_unref(s);
267
268 s->context = NULL;
269
270 if (s->auto_timing_update_event) {
271 pa_assert(s->mainloop);
272 s->mainloop->time_free(s->auto_timing_update_event);
273 }
274
275 reset_callbacks(s);
276 }
277
278 static void stream_free(pa_stream *s) {
279 unsigned int i;
280
281 pa_assert(s);
282
283 stream_unlink(s);
284
285 if (s->write_memblock) {
286 pa_memblock_release(s->write_memblock);
287 pa_memblock_unref(s->write_data);
288 }
289
290 if (s->peek_memchunk.memblock) {
291 if (s->peek_data)
292 pa_memblock_release(s->peek_memchunk.memblock);
293 pa_memblock_unref(s->peek_memchunk.memblock);
294 }
295
296 if (s->record_memblockq)
297 pa_memblockq_free(s->record_memblockq);
298
299 if (s->proplist)
300 pa_proplist_free(s->proplist);
301
302 if (s->smoother)
303 pa_smoother_free(s->smoother);
304
305 for (i = 0; i < s->n_formats; i++)
306 pa_format_info_free(s->req_formats[i]);
307
308 if (s->format)
309 pa_format_info_free(s->format);
310
311 pa_xfree(s->device_name);
312 pa_xfree(s);
313 }
314
315 void pa_stream_unref(pa_stream *s) {
316 pa_assert(s);
317 pa_assert(PA_REFCNT_VALUE(s) >= 1);
318
319 if (PA_REFCNT_DEC(s) <= 0)
320 stream_free(s);
321 }
322
323 pa_stream* pa_stream_ref(pa_stream *s) {
324 pa_assert(s);
325 pa_assert(PA_REFCNT_VALUE(s) >= 1);
326
327 PA_REFCNT_INC(s);
328 return s;
329 }
330
331 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
332 pa_assert(s);
333 pa_assert(PA_REFCNT_VALUE(s) >= 1);
334
335 return s->state;
336 }
337
338 pa_context* pa_stream_get_context(pa_stream *s) {
339 pa_assert(s);
340 pa_assert(PA_REFCNT_VALUE(s) >= 1);
341
342 return s->context;
343 }
344
345 uint32_t pa_stream_get_index(pa_stream *s) {
346 pa_assert(s);
347 pa_assert(PA_REFCNT_VALUE(s) >= 1);
348
349 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
350 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
351
352 return s->stream_index;
353 }
354
355 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
356 pa_assert(s);
357 pa_assert(PA_REFCNT_VALUE(s) >= 1);
358
359 if (s->state == st)
360 return;
361
362 pa_stream_ref(s);
363
364 s->state = st;
365
366 if (s->state_callback)
367 s->state_callback(s, s->state_userdata);
368
369 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
370 stream_unlink(s);
371
372 pa_stream_unref(s);
373 }
374
375 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
376 pa_assert(s);
377 pa_assert(PA_REFCNT_VALUE(s) >= 1);
378
379 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
380 return;
381
382 if (s->state == PA_STREAM_READY &&
383 (force || !s->auto_timing_update_requested)) {
384 pa_operation *o;
385
386 /* pa_log("Automatically requesting new timing data"); */
387
388 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
389 pa_operation_unref(o);
390 s->auto_timing_update_requested = TRUE;
391 }
392 }
393
394 if (s->auto_timing_update_event) {
395 if (s->suspended && !force) {
396 pa_assert(s->mainloop);
397 s->mainloop->time_free(s->auto_timing_update_event);
398 s->auto_timing_update_event = NULL;
399 } else {
400 if (force)
401 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
402
403 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
404
405 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
406 }
407 }
408 }
409
410 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
411 pa_context *c = userdata;
412 pa_stream *s;
413 uint32_t channel;
414
415 pa_assert(pd);
416 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
417 pa_assert(t);
418 pa_assert(c);
419 pa_assert(PA_REFCNT_VALUE(c) >= 1);
420
421 pa_context_ref(c);
422
423 if (pa_tagstruct_getu32(t, &channel) < 0 ||
424 !pa_tagstruct_eof(t)) {
425 pa_context_fail(c, PA_ERR_PROTOCOL);
426 goto finish;
427 }
428
429 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
430 goto finish;
431
432 if (s->state != PA_STREAM_READY)
433 goto finish;
434
435 pa_context_set_error(c, PA_ERR_KILLED);
436 pa_stream_set_state(s, PA_STREAM_FAILED);
437
438 finish:
439 pa_context_unref(c);
440 }
441
442 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
443 pa_usec_t x;
444
445 pa_assert(s);
446 pa_assert(!force_start || !force_stop);
447
448 if (!s->smoother)
449 return;
450
451 x = pa_rtclock_now();
452
453 if (s->timing_info_valid) {
454 if (aposteriori)
455 x -= s->timing_info.transport_usec;
456 else
457 x += s->timing_info.transport_usec;
458 }
459
460 if (s->suspended || s->corked || force_stop)
461 pa_smoother_pause(s->smoother, x);
462 else if (force_start || s->buffer_attr.prebuf == 0) {
463
464 if (!s->timing_info_valid &&
465 !aposteriori &&
466 !force_start &&
467 !force_stop &&
468 s->context->version >= 13) {
469
470 /* If the server supports STARTED events we take them as
471 * indications when audio really starts/stops playing, if
472 * we don't have any timing info yet -- instead of trying
473 * to be smart and guessing the server time. Otherwise the
474 * unknown transport delay add too much noise to our time
475 * calculations. */
476
477 return;
478 }
479
480 pa_smoother_resume(s->smoother, x, TRUE);
481 }
482
483 /* Please note that we have no idea if playback actually started
484 * if prebuf is non-zero! */
485 }
486
487 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
488
489 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490 pa_context *c = userdata;
491 pa_stream *s;
492 uint32_t channel;
493 const char *dn;
494 pa_bool_t suspended;
495 uint32_t di;
496 pa_usec_t usec = 0;
497 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
498
499 pa_assert(pd);
500 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
501 pa_assert(t);
502 pa_assert(c);
503 pa_assert(PA_REFCNT_VALUE(c) >= 1);
504
505 pa_context_ref(c);
506
507 if (c->version < 12) {
508 pa_context_fail(c, PA_ERR_PROTOCOL);
509 goto finish;
510 }
511
512 if (pa_tagstruct_getu32(t, &channel) < 0 ||
513 pa_tagstruct_getu32(t, &di) < 0 ||
514 pa_tagstruct_gets(t, &dn) < 0 ||
515 pa_tagstruct_get_boolean(t, &suspended) < 0) {
516 pa_context_fail(c, PA_ERR_PROTOCOL);
517 goto finish;
518 }
519
520 if (c->version >= 13) {
521
522 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
523 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
524 pa_tagstruct_getu32(t, &fragsize) < 0 ||
525 pa_tagstruct_get_usec(t, &usec) < 0) {
526 pa_context_fail(c, PA_ERR_PROTOCOL);
527 goto finish;
528 }
529 } else {
530 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
531 pa_tagstruct_getu32(t, &tlength) < 0 ||
532 pa_tagstruct_getu32(t, &prebuf) < 0 ||
533 pa_tagstruct_getu32(t, &minreq) < 0 ||
534 pa_tagstruct_get_usec(t, &usec) < 0) {
535 pa_context_fail(c, PA_ERR_PROTOCOL);
536 goto finish;
537 }
538 }
539 }
540
541 if (!pa_tagstruct_eof(t)) {
542 pa_context_fail(c, PA_ERR_PROTOCOL);
543 goto finish;
544 }
545
546 if (!dn || di == PA_INVALID_INDEX) {
547 pa_context_fail(c, PA_ERR_PROTOCOL);
548 goto finish;
549 }
550
551 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
552 goto finish;
553
554 if (s->state != PA_STREAM_READY)
555 goto finish;
556
557 if (c->version >= 13) {
558 if (s->direction == PA_STREAM_RECORD)
559 s->timing_info.configured_source_usec = usec;
560 else
561 s->timing_info.configured_sink_usec = usec;
562
563 s->buffer_attr.maxlength = maxlength;
564 s->buffer_attr.fragsize = fragsize;
565 s->buffer_attr.tlength = tlength;
566 s->buffer_attr.prebuf = prebuf;
567 s->buffer_attr.minreq = minreq;
568 }
569
570 pa_xfree(s->device_name);
571 s->device_name = pa_xstrdup(dn);
572 s->device_index = di;
573
574 s->suspended = suspended;
575
576 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
577 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
578 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
579 request_auto_timing_update(s, TRUE);
580 }
581
582 check_smoother_status(s, TRUE, FALSE, FALSE);
583 request_auto_timing_update(s, TRUE);
584
585 if (s->moved_callback)
586 s->moved_callback(s, s->moved_userdata);
587
588 finish:
589 pa_context_unref(c);
590 }
591
592 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
593 pa_context *c = userdata;
594 pa_stream *s;
595 uint32_t channel;
596 pa_usec_t usec = 0;
597 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
598
599 pa_assert(pd);
600 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
601 pa_assert(t);
602 pa_assert(c);
603 pa_assert(PA_REFCNT_VALUE(c) >= 1);
604
605 pa_context_ref(c);
606
607 if (c->version < 15) {
608 pa_context_fail(c, PA_ERR_PROTOCOL);
609 goto finish;
610 }
611
612 if (pa_tagstruct_getu32(t, &channel) < 0) {
613 pa_context_fail(c, PA_ERR_PROTOCOL);
614 goto finish;
615 }
616
617 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
618 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
619 pa_tagstruct_getu32(t, &fragsize) < 0 ||
620 pa_tagstruct_get_usec(t, &usec) < 0) {
621 pa_context_fail(c, PA_ERR_PROTOCOL);
622 goto finish;
623 }
624 } else {
625 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
626 pa_tagstruct_getu32(t, &tlength) < 0 ||
627 pa_tagstruct_getu32(t, &prebuf) < 0 ||
628 pa_tagstruct_getu32(t, &minreq) < 0 ||
629 pa_tagstruct_get_usec(t, &usec) < 0) {
630 pa_context_fail(c, PA_ERR_PROTOCOL);
631 goto finish;
632 }
633 }
634
635 if (!pa_tagstruct_eof(t)) {
636 pa_context_fail(c, PA_ERR_PROTOCOL);
637 goto finish;
638 }
639
640 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
641 goto finish;
642
643 if (s->state != PA_STREAM_READY)
644 goto finish;
645
646 if (s->direction == PA_STREAM_RECORD)
647 s->timing_info.configured_source_usec = usec;
648 else
649 s->timing_info.configured_sink_usec = usec;
650
651 s->buffer_attr.maxlength = maxlength;
652 s->buffer_attr.fragsize = fragsize;
653 s->buffer_attr.tlength = tlength;
654 s->buffer_attr.prebuf = prebuf;
655 s->buffer_attr.minreq = minreq;
656
657 request_auto_timing_update(s, TRUE);
658
659 if (s->buffer_attr_callback)
660 s->buffer_attr_callback(s, s->buffer_attr_userdata);
661
662 finish:
663 pa_context_unref(c);
664 }
665
666 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
667 pa_context *c = userdata;
668 pa_stream *s;
669 uint32_t channel;
670 pa_bool_t suspended;
671
672 pa_assert(pd);
673 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
674 pa_assert(t);
675 pa_assert(c);
676 pa_assert(PA_REFCNT_VALUE(c) >= 1);
677
678 pa_context_ref(c);
679
680 if (c->version < 12) {
681 pa_context_fail(c, PA_ERR_PROTOCOL);
682 goto finish;
683 }
684
685 if (pa_tagstruct_getu32(t, &channel) < 0 ||
686 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
687 !pa_tagstruct_eof(t)) {
688 pa_context_fail(c, PA_ERR_PROTOCOL);
689 goto finish;
690 }
691
692 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
693 goto finish;
694
695 if (s->state != PA_STREAM_READY)
696 goto finish;
697
698 s->suspended = suspended;
699
700 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
701 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
702 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
703 request_auto_timing_update(s, TRUE);
704 }
705
706 check_smoother_status(s, TRUE, FALSE, FALSE);
707 request_auto_timing_update(s, TRUE);
708
709 if (s->suspended_callback)
710 s->suspended_callback(s, s->suspended_userdata);
711
712 finish:
713 pa_context_unref(c);
714 }
715
716 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717 pa_context *c = userdata;
718 pa_stream *s;
719 uint32_t channel;
720
721 pa_assert(pd);
722 pa_assert(command == PA_COMMAND_STARTED);
723 pa_assert(t);
724 pa_assert(c);
725 pa_assert(PA_REFCNT_VALUE(c) >= 1);
726
727 pa_context_ref(c);
728
729 if (c->version < 13) {
730 pa_context_fail(c, PA_ERR_PROTOCOL);
731 goto finish;
732 }
733
734 if (pa_tagstruct_getu32(t, &channel) < 0 ||
735 !pa_tagstruct_eof(t)) {
736 pa_context_fail(c, PA_ERR_PROTOCOL);
737 goto finish;
738 }
739
740 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
741 goto finish;
742
743 if (s->state != PA_STREAM_READY)
744 goto finish;
745
746 check_smoother_status(s, TRUE, TRUE, FALSE);
747 request_auto_timing_update(s, TRUE);
748
749 if (s->started_callback)
750 s->started_callback(s, s->started_userdata);
751
752 finish:
753 pa_context_unref(c);
754 }
755
756 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757 pa_context *c = userdata;
758 pa_stream *s;
759 uint32_t channel;
760 pa_proplist *pl = NULL;
761 const char *event;
762
763 pa_assert(pd);
764 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
765 pa_assert(t);
766 pa_assert(c);
767 pa_assert(PA_REFCNT_VALUE(c) >= 1);
768
769 pa_context_ref(c);
770
771 if (c->version < 15) {
772 pa_context_fail(c, PA_ERR_PROTOCOL);
773 goto finish;
774 }
775
776 pl = pa_proplist_new();
777
778 if (pa_tagstruct_getu32(t, &channel) < 0 ||
779 pa_tagstruct_gets(t, &event) < 0 ||
780 pa_tagstruct_get_proplist(t, pl) < 0 ||
781 !pa_tagstruct_eof(t) || !event) {
782 pa_context_fail(c, PA_ERR_PROTOCOL);
783 goto finish;
784 }
785
786 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
787 goto finish;
788
789 if (s->state != PA_STREAM_READY)
790 goto finish;
791
792 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
793 /* Let client know what the running time was when the stream had to be killed */
794 pa_usec_t stream_time;
795 if (pa_stream_get_time(s, &stream_time) == 0)
796 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
797 }
798
799 if (s->event_callback)
800 s->event_callback(s, event, pl, s->event_userdata);
801
802 finish:
803 pa_context_unref(c);
804
805 if (pl)
806 pa_proplist_free(pl);
807 }
808
809 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
810 pa_stream *s;
811 pa_context *c = userdata;
812 uint32_t bytes, channel;
813
814 pa_assert(pd);
815 pa_assert(command == PA_COMMAND_REQUEST);
816 pa_assert(t);
817 pa_assert(c);
818 pa_assert(PA_REFCNT_VALUE(c) >= 1);
819
820 pa_context_ref(c);
821
822 if (pa_tagstruct_getu32(t, &channel) < 0 ||
823 pa_tagstruct_getu32(t, &bytes) < 0 ||
824 !pa_tagstruct_eof(t)) {
825 pa_context_fail(c, PA_ERR_PROTOCOL);
826 goto finish;
827 }
828
829 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
830 goto finish;
831
832 if (s->state != PA_STREAM_READY)
833 goto finish;
834
835 s->requested_bytes += bytes;
836
837 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
838
839 if (s->requested_bytes > 0 && s->write_callback)
840 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
841
842 finish:
843 pa_context_unref(c);
844 }
845
846 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
847 pa_stream *s;
848 pa_context *c = userdata;
849 uint32_t channel;
850
851 pa_assert(pd);
852 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
853 pa_assert(t);
854 pa_assert(c);
855 pa_assert(PA_REFCNT_VALUE(c) >= 1);
856
857 pa_context_ref(c);
858
859 if (pa_tagstruct_getu32(t, &channel) < 0 ||
860 !pa_tagstruct_eof(t)) {
861 pa_context_fail(c, PA_ERR_PROTOCOL);
862 goto finish;
863 }
864
865 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
866 goto finish;
867
868 if (s->state != PA_STREAM_READY)
869 goto finish;
870
871 if (s->buffer_attr.prebuf > 0)
872 check_smoother_status(s, TRUE, FALSE, TRUE);
873
874 request_auto_timing_update(s, TRUE);
875
876 if (command == PA_COMMAND_OVERFLOW) {
877 if (s->overflow_callback)
878 s->overflow_callback(s, s->overflow_userdata);
879 } else if (command == PA_COMMAND_UNDERFLOW) {
880 if (s->underflow_callback)
881 s->underflow_callback(s, s->underflow_userdata);
882 }
883
884 finish:
885 pa_context_unref(c);
886 }
887
888 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
889 pa_assert(s);
890 pa_assert(PA_REFCNT_VALUE(s) >= 1);
891
892 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
893
894 if (s->state != PA_STREAM_READY)
895 return;
896
897 if (w) {
898 s->write_index_not_before = s->context->ctag;
899
900 if (s->timing_info_valid)
901 s->timing_info.write_index_corrupt = TRUE;
902
903 /* pa_log("write_index invalidated"); */
904 }
905
906 if (r) {
907 s->read_index_not_before = s->context->ctag;
908
909 if (s->timing_info_valid)
910 s->timing_info.read_index_corrupt = TRUE;
911
912 /* pa_log("read_index invalidated"); */
913 }
914
915 request_auto_timing_update(s, TRUE);
916 }
917
918 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
919 pa_stream *s = userdata;
920
921 pa_assert(s);
922 pa_assert(PA_REFCNT_VALUE(s) >= 1);
923
924 pa_stream_ref(s);
925 request_auto_timing_update(s, FALSE);
926 pa_stream_unref(s);
927 }
928
929 static void create_stream_complete(pa_stream *s) {
930 pa_assert(s);
931 pa_assert(PA_REFCNT_VALUE(s) >= 1);
932 pa_assert(s->state == PA_STREAM_CREATING);
933
934 pa_stream_set_state(s, PA_STREAM_READY);
935
936 if (s->requested_bytes > 0 && s->write_callback)
937 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
938
939 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
940 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
941 pa_assert(!s->auto_timing_update_event);
942 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
943
944 request_auto_timing_update(s, TRUE);
945 }
946
947 check_smoother_status(s, TRUE, FALSE, FALSE);
948 }
949
950 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
951 const char *e;
952
953 pa_assert(s);
954 pa_assert(attr);
955
956 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
957 uint32_t ms;
958
959 if (pa_atou(e, &ms) < 0 || ms <= 0)
960 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
961 else {
962 attr->maxlength = (uint32_t) -1;
963 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
964 attr->minreq = (uint32_t) -1;
965 attr->prebuf = (uint32_t) -1;
966 attr->fragsize = attr->tlength;
967 }
968
969 if (flags)
970 *flags |= PA_STREAM_ADJUST_LATENCY;
971 }
972
973 if (s->context->version >= 13)
974 return;
975
976 /* Version older than 0.9.10 didn't do server side buffer_attr
977 * selection, hence we have to fake it on the client side. */
978
979 /* We choose fairly conservative values here, to not confuse
980 * old clients with extremely large playback buffers */
981
982 if (attr->maxlength == (uint32_t) -1)
983 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
984
985 if (attr->tlength == (uint32_t) -1)
986 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
987
988 if (attr->minreq == (uint32_t) -1)
989 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
990
991 if (attr->prebuf == (uint32_t) -1)
992 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
993
994 if (attr->fragsize == (uint32_t) -1)
995 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
996 }
997
998 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
999 pa_stream *s = userdata;
1000 uint32_t requested_bytes = 0;
1001
1002 pa_assert(pd);
1003 pa_assert(s);
1004 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1005 pa_assert(s->state == PA_STREAM_CREATING);
1006
1007 pa_stream_ref(s);
1008
1009 if (command != PA_COMMAND_REPLY) {
1010 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1011 goto finish;
1012
1013 pa_stream_set_state(s, PA_STREAM_FAILED);
1014 goto finish;
1015 }
1016
1017 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1018 s->channel == PA_INVALID_INDEX ||
1019 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1020 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1021 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1022 goto finish;
1023 }
1024
1025 s->requested_bytes = (int64_t) requested_bytes;
1026
1027 if (s->context->version >= 9) {
1028 if (s->direction == PA_STREAM_PLAYBACK) {
1029 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1030 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1031 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1032 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1033 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1034 goto finish;
1035 }
1036 } else if (s->direction == PA_STREAM_RECORD) {
1037 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1038 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1039 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1040 goto finish;
1041 }
1042 }
1043 }
1044
1045 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1046 pa_sample_spec ss;
1047 pa_channel_map cm;
1048 const char *dn = NULL;
1049 pa_bool_t suspended;
1050
1051 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1052 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1053 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1054 pa_tagstruct_gets(t, &dn) < 0 ||
1055 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1056 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1057 goto finish;
1058 }
1059
1060 if (!dn || s->device_index == PA_INVALID_INDEX ||
1061 ss.channels != cm.channels ||
1062 !pa_channel_map_valid(&cm) ||
1063 !pa_sample_spec_valid(&ss) ||
1064 (s->n_formats == 0 && (
1065 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1066 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1067 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1068 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1069 goto finish;
1070 }
1071
1072 pa_xfree(s->device_name);
1073 s->device_name = pa_xstrdup(dn);
1074 s->suspended = suspended;
1075
1076 s->channel_map = cm;
1077 s->sample_spec = ss;
1078 }
1079
1080 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1081 pa_usec_t usec;
1082
1083 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1084 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1085 goto finish;
1086 }
1087
1088 if (s->direction == PA_STREAM_RECORD)
1089 s->timing_info.configured_source_usec = usec;
1090 else
1091 s->timing_info.configured_sink_usec = usec;
1092 }
1093
1094 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1095 || s->context->version >= 22) {
1096
1097 pa_format_info *f = pa_format_info_new();
1098 pa_tagstruct_get_format_info(t, f);
1099
1100 if (pa_format_info_valid(f))
1101 s->format = f;
1102 else {
1103 pa_format_info_free(f);
1104 if (s->n_formats > 0) {
1105 /* We used the extended API, so we should have got back a proper format */
1106 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1107 goto finish;
1108 }
1109 }
1110 }
1111
1112 if (!pa_tagstruct_eof(t)) {
1113 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1114 goto finish;
1115 }
1116
1117 if (s->direction == PA_STREAM_RECORD) {
1118 pa_assert(!s->record_memblockq);
1119
1120 s->record_memblockq = pa_memblockq_new(
1121 0,
1122 s->buffer_attr.maxlength,
1123 0,
1124 pa_frame_size(&s->sample_spec),
1125 1,
1126 0,
1127 0,
1128 NULL);
1129 }
1130
1131 s->channel_valid = TRUE;
1132 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1133
1134 create_stream_complete(s);
1135
1136 finish:
1137 pa_stream_unref(s);
1138 }
1139
1140 static int create_stream(
1141 pa_stream_direction_t direction,
1142 pa_stream *s,
1143 const char *dev,
1144 const pa_buffer_attr *attr,
1145 pa_stream_flags_t flags,
1146 const pa_cvolume *volume,
1147 pa_stream *sync_stream) {
1148
1149 pa_tagstruct *t;
1150 uint32_t tag;
1151 pa_bool_t volume_set = !!volume;
1152 pa_cvolume cv;
1153 uint32_t i;
1154
1155 pa_assert(s);
1156 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1157 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1158
1159 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1160 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1161 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1162 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1163 PA_STREAM_INTERPOLATE_TIMING|
1164 PA_STREAM_NOT_MONOTONIC|
1165 PA_STREAM_AUTO_TIMING_UPDATE|
1166 PA_STREAM_NO_REMAP_CHANNELS|
1167 PA_STREAM_NO_REMIX_CHANNELS|
1168 PA_STREAM_FIX_FORMAT|
1169 PA_STREAM_FIX_RATE|
1170 PA_STREAM_FIX_CHANNELS|
1171 PA_STREAM_DONT_MOVE|
1172 PA_STREAM_VARIABLE_RATE|
1173 PA_STREAM_PEAK_DETECT|
1174 PA_STREAM_START_MUTED|
1175 PA_STREAM_ADJUST_LATENCY|
1176 PA_STREAM_EARLY_REQUESTS|
1177 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1178 PA_STREAM_START_UNMUTED|
1179 PA_STREAM_FAIL_ON_SUSPEND|
1180 PA_STREAM_RELATIVE_VOLUME|
1181 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1182
1183
1184 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1185 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1186 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1187 /* Althought some of the other flags are not supported on older
1188 * version, we don't check for them here, because it doesn't hurt
1189 * when they are passed but actually not supported. This makes
1190 * client development easier */
1191
1192 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1193 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1194 PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1195 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1196 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1197
1198 pa_stream_ref(s);
1199
1200 s->direction = direction;
1201
1202 if (sync_stream)
1203 s->syncid = sync_stream->syncid;
1204
1205 if (attr)
1206 s->buffer_attr = *attr;
1207 patch_buffer_attr(s, &s->buffer_attr, &flags);
1208
1209 s->flags = flags;
1210 s->corked = !!(flags & PA_STREAM_START_CORKED);
1211
1212 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1213 pa_usec_t x;
1214
1215 x = pa_rtclock_now();
1216
1217 pa_assert(!s->smoother);
1218 s->smoother = pa_smoother_new(
1219 SMOOTHER_ADJUST_TIME,
1220 SMOOTHER_HISTORY_TIME,
1221 !(flags & PA_STREAM_NOT_MONOTONIC),
1222 TRUE,
1223 SMOOTHER_MIN_HISTORY,
1224 x,
1225 TRUE);
1226 }
1227
1228 if (!dev)
1229 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1230
1231 t = pa_tagstruct_command(
1232 s->context,
1233 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1234 &tag);
1235
1236 if (s->context->version < 13)
1237 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1238
1239 pa_tagstruct_put(
1240 t,
1241 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1242 PA_TAG_CHANNEL_MAP, &s->channel_map,
1243 PA_TAG_U32, PA_INVALID_INDEX,
1244 PA_TAG_STRING, dev,
1245 PA_TAG_U32, s->buffer_attr.maxlength,
1246 PA_TAG_BOOLEAN, s->corked,
1247 PA_TAG_INVALID);
1248
1249 if (!volume) {
1250 if (pa_sample_spec_valid(&s->sample_spec))
1251 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1252 else {
1253 /* This is not really relevant, since no volume was set, and
1254 * the real number of channels is embedded in the format_info
1255 * structure */
1256 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1257 }
1258 }
1259
1260 if (s->direction == PA_STREAM_PLAYBACK) {
1261 pa_tagstruct_put(
1262 t,
1263 PA_TAG_U32, s->buffer_attr.tlength,
1264 PA_TAG_U32, s->buffer_attr.prebuf,
1265 PA_TAG_U32, s->buffer_attr.minreq,
1266 PA_TAG_U32, s->syncid,
1267 PA_TAG_INVALID);
1268
1269 pa_tagstruct_put_cvolume(t, volume);
1270 } else
1271 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1272
1273 if (s->context->version >= 12) {
1274 pa_tagstruct_put(
1275 t,
1276 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1277 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1278 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1279 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1280 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1281 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1282 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1283 PA_TAG_INVALID);
1284 }
1285
1286 if (s->context->version >= 13) {
1287
1288 if (s->direction == PA_STREAM_PLAYBACK)
1289 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1290 else
1291 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1292
1293 pa_tagstruct_put(
1294 t,
1295 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1296 PA_TAG_PROPLIST, s->proplist,
1297 PA_TAG_INVALID);
1298
1299 if (s->direction == PA_STREAM_RECORD)
1300 pa_tagstruct_putu32(t, s->direct_on_input);
1301 }
1302
1303 if (s->context->version >= 14) {
1304
1305 if (s->direction == PA_STREAM_PLAYBACK)
1306 pa_tagstruct_put_boolean(t, volume_set);
1307
1308 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1309 }
1310
1311 if (s->context->version >= 15) {
1312
1313 if (s->direction == PA_STREAM_PLAYBACK)
1314 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1315
1316 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1317 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1318 }
1319
1320 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1321 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1322
1323 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1324 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1325
1326 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1327 || s->context->version >= 22) {
1328
1329 pa_tagstruct_putu8(t, s->n_formats);
1330 for (i = 0; i < s->n_formats; i++)
1331 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1332 }
1333
1334 if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1335 pa_tagstruct_put_cvolume(t, volume);
1336 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1337 pa_tagstruct_put_boolean(t, volume_set);
1338 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1339 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1340 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1341 }
1342
1343 pa_pstream_send_tagstruct(s->context->pstream, t);
1344 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1345
1346 pa_stream_set_state(s, PA_STREAM_CREATING);
1347
1348 pa_stream_unref(s);
1349 return 0;
1350 }
1351
1352 int pa_stream_connect_playback(
1353 pa_stream *s,
1354 const char *dev,
1355 const pa_buffer_attr *attr,
1356 pa_stream_flags_t flags,
1357 const pa_cvolume *volume,
1358 pa_stream *sync_stream) {
1359
1360 pa_assert(s);
1361 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1362
1363 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1364 }
1365
1366 int pa_stream_connect_record(
1367 pa_stream *s,
1368 const char *dev,
1369 const pa_buffer_attr *attr,
1370 pa_stream_flags_t flags) {
1371
1372 pa_assert(s);
1373 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1374
1375 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1376 }
1377
1378 int pa_stream_begin_write(
1379 pa_stream *s,
1380 void **data,
1381 size_t *nbytes) {
1382
1383 pa_assert(s);
1384 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1385
1386 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1387 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1388 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1389 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1390 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1391
1392 if (*nbytes != (size_t) -1) {
1393 size_t m, fs;
1394
1395 m = pa_mempool_block_size_max(s->context->mempool);
1396 fs = pa_frame_size(&s->sample_spec);
1397
1398 m = (m / fs) * fs;
1399 if (*nbytes > m)
1400 *nbytes = m;
1401 }
1402
1403 if (!s->write_memblock) {
1404 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1405 s->write_data = pa_memblock_acquire(s->write_memblock);
1406 }
1407
1408 *data = s->write_data;
1409 *nbytes = pa_memblock_get_length(s->write_memblock);
1410
1411 return 0;
1412 }
1413
1414 int pa_stream_cancel_write(
1415 pa_stream *s) {
1416
1417 pa_assert(s);
1418 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1419
1420 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1421 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1422 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1423 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1424
1425 pa_assert(s->write_data);
1426
1427 pa_memblock_release(s->write_memblock);
1428 pa_memblock_unref(s->write_memblock);
1429 s->write_memblock = NULL;
1430 s->write_data = NULL;
1431
1432 return 0;
1433 }
1434
1435 int pa_stream_write(
1436 pa_stream *s,
1437 const void *data,
1438 size_t length,
1439 pa_free_cb_t free_cb,
1440 int64_t offset,
1441 pa_seek_mode_t seek) {
1442
1443 pa_assert(s);
1444 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1445 pa_assert(data);
1446
1447 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1448 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1449 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1450 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1451 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1452 PA_CHECK_VALIDITY(s->context,
1453 !s->write_memblock ||
1454 ((data >= s->write_data) &&
1455 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1456 PA_ERR_INVALID);
1457 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1458
1459 if (s->write_memblock) {
1460 pa_memchunk chunk;
1461
1462 /* pa_stream_write_begin() was called before */
1463
1464 pa_memblock_release(s->write_memblock);
1465
1466 chunk.memblock = s->write_memblock;
1467 chunk.index = (const char *) data - (const char *) s->write_data;
1468 chunk.length = length;
1469
1470 s->write_memblock = NULL;
1471 s->write_data = NULL;
1472
1473 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1474 pa_memblock_unref(chunk.memblock);
1475
1476 } else {
1477 pa_seek_mode_t t_seek = seek;
1478 int64_t t_offset = offset;
1479 size_t t_length = length;
1480 const void *t_data = data;
1481
1482 /* pa_stream_write_begin() was not called before */
1483
1484 while (t_length > 0) {
1485 pa_memchunk chunk;
1486
1487 chunk.index = 0;
1488
1489 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1490 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1491 chunk.length = t_length;
1492 } else {
1493 void *d;
1494
1495 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1496 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1497
1498 d = pa_memblock_acquire(chunk.memblock);
1499 memcpy(d, t_data, chunk.length);
1500 pa_memblock_release(chunk.memblock);
1501 }
1502
1503 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1504
1505 t_offset = 0;
1506 t_seek = PA_SEEK_RELATIVE;
1507
1508 t_data = (const uint8_t*) t_data + chunk.length;
1509 t_length -= chunk.length;
1510
1511 pa_memblock_unref(chunk.memblock);
1512 }
1513
1514 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1515 free_cb((void*) data);
1516 }
1517
1518 /* This is obviously wrong since we ignore the seeking index . But
1519 * that's OK, the server side applies the same error */
1520 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1521
1522 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1523
1524 if (s->direction == PA_STREAM_PLAYBACK) {
1525
1526 /* Update latency request correction */
1527 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1528
1529 if (seek == PA_SEEK_ABSOLUTE) {
1530 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1531 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1532 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1533 } else if (seek == PA_SEEK_RELATIVE) {
1534 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1535 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1536 } else
1537 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1538 }
1539
1540 /* Update the write index in the already available latency data */
1541 if (s->timing_info_valid) {
1542
1543 if (seek == PA_SEEK_ABSOLUTE) {
1544 s->timing_info.write_index_corrupt = FALSE;
1545 s->timing_info.write_index = offset + (int64_t) length;
1546 } else if (seek == PA_SEEK_RELATIVE) {
1547 if (!s->timing_info.write_index_corrupt)
1548 s->timing_info.write_index += offset + (int64_t) length;
1549 } else
1550 s->timing_info.write_index_corrupt = TRUE;
1551 }
1552
1553 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1554 request_auto_timing_update(s, TRUE);
1555 }
1556
1557 return 0;
1558 }
1559
1560 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1561 pa_assert(s);
1562 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1563 pa_assert(data);
1564 pa_assert(length);
1565
1566 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1567 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1568 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1569
1570 if (!s->peek_memchunk.memblock) {
1571
1572 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1573 *data = NULL;
1574 *length = 0;
1575 return 0;
1576 }
1577
1578 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1579 }
1580
1581 pa_assert(s->peek_data);
1582 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1583 *length = s->peek_memchunk.length;
1584 return 0;
1585 }
1586
1587 int pa_stream_drop(pa_stream *s) {
1588 pa_assert(s);
1589 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1590
1591 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1592 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1593 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1594 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1595
1596 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1597
1598 /* Fix the simulated local read index */
1599 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1600 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1601
1602 pa_assert(s->peek_data);
1603 pa_memblock_release(s->peek_memchunk.memblock);
1604 pa_memblock_unref(s->peek_memchunk.memblock);
1605 pa_memchunk_reset(&s->peek_memchunk);
1606
1607 return 0;
1608 }
1609
1610 size_t pa_stream_writable_size(pa_stream *s) {
1611 pa_assert(s);
1612 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1613
1614 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1615 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1616 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1617
1618 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1619 }
1620
1621 size_t pa_stream_readable_size(pa_stream *s) {
1622 pa_assert(s);
1623 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1624
1625 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1626 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1627 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1628
1629 return pa_memblockq_get_length(s->record_memblockq);
1630 }
1631
1632 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1633 pa_operation *o;
1634 pa_tagstruct *t;
1635 uint32_t tag;
1636
1637 pa_assert(s);
1638 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1639
1640 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1641 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1642 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1643
1644 /* Ask for a timing update before we cork/uncork to get the best
1645 * accuracy for the transport latency suitable for the
1646 * check_smoother_status() call in the started callback */
1647 request_auto_timing_update(s, TRUE);
1648
1649 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1650
1651 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1652 pa_tagstruct_putu32(t, s->channel);
1653 pa_pstream_send_tagstruct(s->context->pstream, t);
1654 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1655
1656 /* This might cause the read index to continue again, hence
1657 * let's request a timing update */
1658 request_auto_timing_update(s, TRUE);
1659
1660 return o;
1661 }
1662
1663 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1664 pa_usec_t usec;
1665
1666 pa_assert(s);
1667 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1668 pa_assert(s->state == PA_STREAM_READY);
1669 pa_assert(s->direction != PA_STREAM_UPLOAD);
1670 pa_assert(s->timing_info_valid);
1671 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1672 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1673
1674 if (s->direction == PA_STREAM_PLAYBACK) {
1675 /* The last byte that was written into the output device
1676 * had this time value associated */
1677 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1678
1679 if (!s->corked && !s->suspended) {
1680
1681 if (!ignore_transport)
1682 /* Because the latency info took a little time to come
1683 * to us, we assume that the real output time is actually
1684 * a little ahead */
1685 usec += s->timing_info.transport_usec;
1686
1687 /* However, the output device usually maintains a buffer
1688 too, hence the real sample currently played is a little
1689 back */
1690 if (s->timing_info.sink_usec >= usec)
1691 usec = 0;
1692 else
1693 usec -= s->timing_info.sink_usec;
1694 }
1695
1696 } else {
1697 pa_assert(s->direction == PA_STREAM_RECORD);
1698
1699 /* The last byte written into the server side queue had
1700 * this time value associated */
1701 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1702
1703 if (!s->corked && !s->suspended) {
1704
1705 if (!ignore_transport)
1706 /* Add transport latency */
1707 usec += s->timing_info.transport_usec;
1708
1709 /* Add latency of data in device buffer */
1710 usec += s->timing_info.source_usec;
1711
1712 /* If this is a monitor source, we need to correct the
1713 * time by the playback device buffer */
1714 if (s->timing_info.sink_usec >= usec)
1715 usec = 0;
1716 else
1717 usec -= s->timing_info.sink_usec;
1718 }
1719 }
1720
1721 return usec;
1722 }
1723
1724 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1725 pa_operation *o = userdata;
1726 struct timeval local, remote, now;
1727 pa_timing_info *i;
1728 pa_bool_t playing = FALSE;
1729 uint64_t underrun_for = 0, playing_for = 0;
1730
1731 pa_assert(pd);
1732 pa_assert(o);
1733 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1734
1735 if (!o->context || !o->stream)
1736 goto finish;
1737
1738 i = &o->stream->timing_info;
1739
1740 o->stream->timing_info_valid = FALSE;
1741 i->write_index_corrupt = TRUE;
1742 i->read_index_corrupt = TRUE;
1743
1744 if (command != PA_COMMAND_REPLY) {
1745 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1746 goto finish;
1747
1748 } else {
1749
1750 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1751 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1752 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1753 pa_tagstruct_get_timeval(t, &local) < 0 ||
1754 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1755 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1756 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1757
1758 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1759 goto finish;
1760 }
1761
1762 if (o->context->version >= 13 &&
1763 o->stream->direction == PA_STREAM_PLAYBACK)
1764 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1765 pa_tagstruct_getu64(t, &playing_for) < 0) {
1766
1767 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1768 goto finish;
1769 }
1770
1771
1772 if (!pa_tagstruct_eof(t)) {
1773 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1774 goto finish;
1775 }
1776 o->stream->timing_info_valid = TRUE;
1777 i->write_index_corrupt = FALSE;
1778 i->read_index_corrupt = FALSE;
1779
1780 i->playing = (int) playing;
1781 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1782
1783 pa_gettimeofday(&now);
1784
1785 /* Calculcate timestamps */
1786 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1787 /* local and remote seem to have synchronized clocks */
1788
1789 if (o->stream->direction == PA_STREAM_PLAYBACK)
1790 i->transport_usec = pa_timeval_diff(&remote, &local);
1791 else
1792 i->transport_usec = pa_timeval_diff(&now, &remote);
1793
1794 i->synchronized_clocks = TRUE;
1795 i->timestamp = remote;
1796 } else {
1797 /* clocks are not synchronized, let's estimate latency then */
1798 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1799 i->synchronized_clocks = FALSE;
1800 i->timestamp = local;
1801 pa_timeval_add(&i->timestamp, i->transport_usec);
1802 }
1803
1804 /* Invalidate read and write indexes if necessary */
1805 if (tag < o->stream->read_index_not_before)
1806 i->read_index_corrupt = TRUE;
1807
1808 if (tag < o->stream->write_index_not_before)
1809 i->write_index_corrupt = TRUE;
1810
1811 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1812 /* Write index correction */
1813
1814 int n, j;
1815 uint32_t ctag = tag;
1816
1817 /* Go through the saved correction values and add up the
1818 * total correction.*/
1819 for (n = 0, j = o->stream->current_write_index_correction+1;
1820 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1821 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1822
1823 /* Step over invalid data or out-of-date data */
1824 if (!o->stream->write_index_corrections[j].valid ||
1825 o->stream->write_index_corrections[j].tag < ctag)
1826 continue;
1827
1828 /* Make sure that everything is in order */
1829 ctag = o->stream->write_index_corrections[j].tag+1;
1830
1831 /* Now fix the write index */
1832 if (o->stream->write_index_corrections[j].corrupt) {
1833 /* A corrupting seek was made */
1834 i->write_index_corrupt = TRUE;
1835 } else if (o->stream->write_index_corrections[j].absolute) {
1836 /* An absolute seek was made */
1837 i->write_index = o->stream->write_index_corrections[j].value;
1838 i->write_index_corrupt = FALSE;
1839 } else if (!i->write_index_corrupt) {
1840 /* A relative seek was made */
1841 i->write_index += o->stream->write_index_corrections[j].value;
1842 }
1843 }
1844
1845 /* Clear old correction entries */
1846 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1847 if (!o->stream->write_index_corrections[n].valid)
1848 continue;
1849
1850 if (o->stream->write_index_corrections[n].tag <= tag)
1851 o->stream->write_index_corrections[n].valid = FALSE;
1852 }
1853 }
1854
1855 if (o->stream->direction == PA_STREAM_RECORD) {
1856 /* Read index correction */
1857
1858 if (!i->read_index_corrupt)
1859 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1860 }
1861
1862 /* Update smoother if we're not corked */
1863 if (o->stream->smoother && !o->stream->corked) {
1864 pa_usec_t u, x;
1865
1866 u = x = pa_rtclock_now() - i->transport_usec;
1867
1868 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1869 pa_usec_t su;
1870
1871 /* If we weren't playing then it will take some time
1872 * until the audio will actually come out through the
1873 * speakers. Since we follow that timing here, we need
1874 * to try to fix this up */
1875
1876 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1877
1878 if (su < i->sink_usec)
1879 x += i->sink_usec - su;
1880 }
1881
1882 if (!i->playing)
1883 pa_smoother_pause(o->stream->smoother, x);
1884
1885 /* Update the smoother */
1886 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1887 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1888 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1889
1890 if (i->playing)
1891 pa_smoother_resume(o->stream->smoother, x, TRUE);
1892 }
1893 }
1894
1895 o->stream->auto_timing_update_requested = FALSE;
1896
1897 if (o->stream->latency_update_callback)
1898 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1899
1900 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1901 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1902 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1903 }
1904
1905 finish:
1906
1907 pa_operation_done(o);
1908 pa_operation_unref(o);
1909 }
1910
1911 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1912 uint32_t tag;
1913 pa_operation *o;
1914 pa_tagstruct *t;
1915 struct timeval now;
1916 int cidx = 0;
1917
1918 pa_assert(s);
1919 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1920
1921 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1922 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1923 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1924
1925 if (s->direction == PA_STREAM_PLAYBACK) {
1926 /* Find a place to store the write_index correction data for this entry */
1927 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1928
1929 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1930 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1931 }
1932 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1933
1934 t = pa_tagstruct_command(
1935 s->context,
1936 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1937 &tag);
1938 pa_tagstruct_putu32(t, s->channel);
1939 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1940
1941 pa_pstream_send_tagstruct(s->context->pstream, t);
1942 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1943
1944 if (s->direction == PA_STREAM_PLAYBACK) {
1945 /* Fill in initial correction data */
1946
1947 s->current_write_index_correction = cidx;
1948
1949 s->write_index_corrections[cidx].valid = TRUE;
1950 s->write_index_corrections[cidx].absolute = FALSE;
1951 s->write_index_corrections[cidx].corrupt = FALSE;
1952 s->write_index_corrections[cidx].tag = tag;
1953 s->write_index_corrections[cidx].value = 0;
1954 }
1955
1956 return o;
1957 }
1958
1959 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1960 pa_stream *s = userdata;
1961
1962 pa_assert(pd);
1963 pa_assert(s);
1964 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1965
1966 pa_stream_ref(s);
1967
1968 if (command != PA_COMMAND_REPLY) {
1969 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1970 goto finish;
1971
1972 pa_stream_set_state(s, PA_STREAM_FAILED);
1973 goto finish;
1974 } else if (!pa_tagstruct_eof(t)) {
1975 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1976 goto finish;
1977 }
1978
1979 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1980
1981 finish:
1982 pa_stream_unref(s);
1983 }
1984
1985 int pa_stream_disconnect(pa_stream *s) {
1986 pa_tagstruct *t;
1987 uint32_t tag;
1988
1989 pa_assert(s);
1990 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1991
1992 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1993 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1994 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1995
1996 pa_stream_ref(s);
1997
1998 t = pa_tagstruct_command(
1999 s->context,
2000 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2001 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2002 &tag);
2003 pa_tagstruct_putu32(t, s->channel);
2004 pa_pstream_send_tagstruct(s->context->pstream, t);
2005 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2006
2007 pa_stream_unref(s);
2008 return 0;
2009 }
2010
2011 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2012 pa_assert(s);
2013 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2014
2015 if (pa_detect_fork())
2016 return;
2017
2018 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2019 return;
2020
2021 s->read_callback = cb;
2022 s->read_userdata = userdata;
2023 }
2024
2025 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2026 pa_assert(s);
2027 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2028
2029 if (pa_detect_fork())
2030 return;
2031
2032 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2033 return;
2034
2035 s->write_callback = cb;
2036 s->write_userdata = userdata;
2037 }
2038
2039 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2040 pa_assert(s);
2041 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2042
2043 if (pa_detect_fork())
2044 return;
2045
2046 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2047 return;
2048
2049 s->state_callback = cb;
2050 s->state_userdata = userdata;
2051 }
2052
2053 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2054 pa_assert(s);
2055 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2056
2057 if (pa_detect_fork())
2058 return;
2059
2060 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2061 return;
2062
2063 s->overflow_callback = cb;
2064 s->overflow_userdata = userdata;
2065 }
2066
2067 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2068 pa_assert(s);
2069 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2070
2071 if (pa_detect_fork())
2072 return;
2073
2074 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2075 return;
2076
2077 s->underflow_callback = cb;
2078 s->underflow_userdata = userdata;
2079 }
2080
2081 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2082 pa_assert(s);
2083 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2084
2085 if (pa_detect_fork())
2086 return;
2087
2088 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2089 return;
2090
2091 s->latency_update_callback = cb;
2092 s->latency_update_userdata = userdata;
2093 }
2094
2095 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2096 pa_assert(s);
2097 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2098
2099 if (pa_detect_fork())
2100 return;
2101
2102 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2103 return;
2104
2105 s->moved_callback = cb;
2106 s->moved_userdata = userdata;
2107 }
2108
2109 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2110 pa_assert(s);
2111 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2112
2113 if (pa_detect_fork())
2114 return;
2115
2116 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2117 return;
2118
2119 s->suspended_callback = cb;
2120 s->suspended_userdata = userdata;
2121 }
2122
2123 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2124 pa_assert(s);
2125 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2126
2127 if (pa_detect_fork())
2128 return;
2129
2130 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2131 return;
2132
2133 s->started_callback = cb;
2134 s->started_userdata = userdata;
2135 }
2136
2137 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2138 pa_assert(s);
2139 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2140
2141 if (pa_detect_fork())
2142 return;
2143
2144 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2145 return;
2146
2147 s->event_callback = cb;
2148 s->event_userdata = userdata;
2149 }
2150
2151 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2152 pa_assert(s);
2153 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2154
2155 if (pa_detect_fork())
2156 return;
2157
2158 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2159 return;
2160
2161 s->buffer_attr_callback = cb;
2162 s->buffer_attr_userdata = userdata;
2163 }
2164
2165 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2166 pa_operation *o = userdata;
2167 int success = 1;
2168
2169 pa_assert(pd);
2170 pa_assert(o);
2171 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2172
2173 if (!o->context)
2174 goto finish;
2175
2176 if (command != PA_COMMAND_REPLY) {
2177 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2178 goto finish;
2179
2180 success = 0;
2181 } else if (!pa_tagstruct_eof(t)) {
2182 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2183 goto finish;
2184 }
2185
2186 if (o->callback) {
2187 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2188 cb(o->stream, success, o->userdata);
2189 }
2190
2191 finish:
2192 pa_operation_done(o);
2193 pa_operation_unref(o);
2194 }
2195
2196 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2197 pa_operation *o;
2198 pa_tagstruct *t;
2199 uint32_t tag;
2200
2201 pa_assert(s);
2202 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2203
2204 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2205 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2206 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2207
2208 /* Ask for a timing update before we cork/uncork to get the best
2209 * accuracy for the transport latency suitable for the
2210 * check_smoother_status() call in the started callback */
2211 request_auto_timing_update(s, TRUE);
2212
2213 s->corked = b;
2214
2215 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2216
2217 t = pa_tagstruct_command(
2218 s->context,
2219 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2220 &tag);
2221 pa_tagstruct_putu32(t, s->channel);
2222 pa_tagstruct_put_boolean(t, !!b);
2223 pa_pstream_send_tagstruct(s->context->pstream, t);
2224 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2225
2226 check_smoother_status(s, FALSE, FALSE, FALSE);
2227
2228 /* This might cause the indexes to hang/start again, hence let's
2229 * request a timing update, after the cork/uncork, too */
2230 request_auto_timing_update(s, TRUE);
2231
2232 return o;
2233 }
2234
2235 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2236 pa_tagstruct *t;
2237 pa_operation *o;
2238 uint32_t tag;
2239
2240 pa_assert(s);
2241 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2242
2243 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2244 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2245
2246 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2247
2248 t = pa_tagstruct_command(s->context, command, &tag);
2249 pa_tagstruct_putu32(t, s->channel);
2250 pa_pstream_send_tagstruct(s->context->pstream, t);
2251 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2252
2253 return o;
2254 }
2255
2256 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2257 pa_operation *o;
2258
2259 pa_assert(s);
2260 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2261
2262 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2263 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2264 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2265
2266 /* Ask for a timing update *before* the flush, so that the
2267 * transport usec is as up to date as possible when we get the
2268 * underflow message and update the smoother status*/
2269 request_auto_timing_update(s, TRUE);
2270
2271 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2272 return NULL;
2273
2274 if (s->direction == PA_STREAM_PLAYBACK) {
2275
2276 if (s->write_index_corrections[s->current_write_index_correction].valid)
2277 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2278
2279 if (s->buffer_attr.prebuf > 0)
2280 check_smoother_status(s, FALSE, FALSE, TRUE);
2281
2282 /* This will change the write index, but leave the
2283 * read index untouched. */
2284 invalidate_indexes(s, FALSE, TRUE);
2285
2286 } else
2287 /* For record streams this has no influence on the write
2288 * index, but the read index might jump. */
2289 invalidate_indexes(s, TRUE, FALSE);
2290
2291 /* Note that we do not update requested_bytes here. This is
2292 * because we cannot really know how data actually was dropped
2293 * from the write index due to this. This 'error' will be applied
2294 * by both client and server and hence we should be fine. */
2295
2296 return o;
2297 }
2298
2299 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2300 pa_operation *o;
2301
2302 pa_assert(s);
2303 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2304
2305 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2306 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2307 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2308 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2309
2310 /* Ask for a timing update before we cork/uncork to get the best
2311 * accuracy for the transport latency suitable for the
2312 * check_smoother_status() call in the started callback */
2313 request_auto_timing_update(s, TRUE);
2314
2315 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2316 return NULL;
2317
2318 /* This might cause the read index to hang again, hence
2319 * let's request a timing update */
2320 request_auto_timing_update(s, TRUE);
2321
2322 return o;
2323 }
2324
2325 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2326 pa_operation *o;
2327
2328 pa_assert(s);
2329 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2330
2331 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2332 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2333 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2334 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2335
2336 /* Ask for a timing update before we cork/uncork to get the best
2337 * accuracy for the transport latency suitable for the
2338 * check_smoother_status() call in the started callback */
2339 request_auto_timing_update(s, TRUE);
2340
2341 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2342 return NULL;
2343
2344 /* This might cause the read index to start moving again, hence
2345 * let's request a timing update */
2346 request_auto_timing_update(s, TRUE);
2347
2348 return o;
2349 }
2350
2351 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2352 pa_operation *o;
2353
2354 pa_assert(s);
2355 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2356 pa_assert(name);
2357
2358 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2359 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2360 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2361
2362 if (s->context->version >= 13) {
2363 pa_proplist *p = pa_proplist_new();
2364
2365 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2366 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2367 pa_proplist_free(p);
2368 } else {
2369 pa_tagstruct *t;
2370 uint32_t tag;
2371
2372 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2373 t = pa_tagstruct_command(
2374 s->context,
2375 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2376 &tag);
2377 pa_tagstruct_putu32(t, s->channel);
2378 pa_tagstruct_puts(t, name);
2379 pa_pstream_send_tagstruct(s->context->pstream, t);
2380 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2381 }
2382
2383 return o;
2384 }
2385
2386 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2387 pa_usec_t usec;
2388
2389 pa_assert(s);
2390 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2391
2392 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2393 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2394 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2395 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2396 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2397 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2398
2399 if (s->smoother)
2400 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2401 else
2402 usec = calc_time(s, FALSE);
2403
2404 /* Make sure the time runs monotonically */
2405 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2406 if (usec < s->previous_time)
2407 usec = s->previous_time;
2408 else
2409 s->previous_time = usec;
2410 }
2411
2412 if (r_usec)
2413 *r_usec = usec;
2414
2415 return 0;
2416 }
2417
2418 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2419 pa_assert(s);
2420 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2421
2422 if (negative)
2423 *negative = 0;
2424
2425 if (a >= b)
2426 return a-b;
2427 else {
2428 if (negative && s->direction == PA_STREAM_RECORD) {
2429 *negative = 1;
2430 return b-a;
2431 } else
2432 return 0;
2433 }
2434 }
2435
2436 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2437 pa_usec_t t, c;
2438 int r;
2439 int64_t cindex;
2440
2441 pa_assert(s);
2442 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2443 pa_assert(r_usec);
2444
2445 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2446 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2447 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2448 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2449 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2450 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2451
2452 if ((r = pa_stream_get_time(s, &t)) < 0)
2453 return r;
2454
2455 if (s->direction == PA_STREAM_PLAYBACK)
2456 cindex = s->timing_info.write_index;
2457 else
2458 cindex = s->timing_info.read_index;
2459
2460 if (cindex < 0)
2461 cindex = 0;
2462
2463 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2464
2465 if (s->direction == PA_STREAM_PLAYBACK)
2466 *r_usec = time_counter_diff(s, c, t, negative);
2467 else
2468 *r_usec = time_counter_diff(s, t, c, negative);
2469
2470 return 0;
2471 }
2472
2473 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2474 pa_assert(s);
2475 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2476
2477 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2478 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2479 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2480 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2481
2482 return &s->timing_info;
2483 }
2484
2485 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2486 pa_assert(s);
2487 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2488
2489 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2490
2491 return &s->sample_spec;
2492 }
2493
2494 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2495 pa_assert(s);
2496 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2497
2498 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2499
2500 return &s->channel_map;
2501 }
2502
2503 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2504 pa_assert(s);
2505 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2506
2507 /* We don't have the format till routing is done */
2508 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2509 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2510
2511 return s->format;
2512 }
2513 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2514 pa_assert(s);
2515 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2516
2517 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2518 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2519 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2520
2521 return &s->buffer_attr;
2522 }
2523
2524 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2525 pa_operation *o = userdata;
2526 int success = 1;
2527
2528 pa_assert(pd);
2529 pa_assert(o);
2530 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2531
2532 if (!o->context)
2533 goto finish;
2534
2535 if (command != PA_COMMAND_REPLY) {
2536 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2537 goto finish;
2538
2539 success = 0;
2540 } else {
2541 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2542 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2543 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2544 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2545 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2546 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2547 goto finish;
2548 }
2549 } else if (o->stream->direction == PA_STREAM_RECORD) {
2550 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2551 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2552 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2553 goto finish;
2554 }
2555 }
2556
2557 if (o->stream->context->version >= 13) {
2558 pa_usec_t usec;
2559
2560 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2561 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2562 goto finish;
2563 }
2564
2565 if (o->stream->direction == PA_STREAM_RECORD)
2566 o->stream->timing_info.configured_source_usec = usec;
2567 else
2568 o->stream->timing_info.configured_sink_usec = usec;
2569 }
2570
2571 if (!pa_tagstruct_eof(t)) {
2572 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2573 goto finish;
2574 }
2575 }
2576
2577 if (o->callback) {
2578 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2579 cb(o->stream, success, o->userdata);
2580 }
2581
2582 finish:
2583 pa_operation_done(o);
2584 pa_operation_unref(o);
2585 }
2586
2587
2588 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2589 pa_operation *o;
2590 pa_tagstruct *t;
2591 uint32_t tag;
2592 pa_buffer_attr copy;
2593
2594 pa_assert(s);
2595 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2596 pa_assert(attr);
2597
2598 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2599 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2600 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2601 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2602
2603 /* Ask for a timing update before we cork/uncork to get the best
2604 * accuracy for the transport latency suitable for the
2605 * check_smoother_status() call in the started callback */
2606 request_auto_timing_update(s, TRUE);
2607
2608 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2609
2610 t = pa_tagstruct_command(
2611 s->context,
2612 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2613 &tag);
2614 pa_tagstruct_putu32(t, s->channel);
2615
2616 copy = *attr;
2617 patch_buffer_attr(s, &copy, NULL);
2618 attr = &copy;
2619
2620 pa_tagstruct_putu32(t, attr->maxlength);
2621
2622 if (s->direction == PA_STREAM_PLAYBACK)
2623 pa_tagstruct_put(
2624 t,
2625 PA_TAG_U32, attr->tlength,
2626 PA_TAG_U32, attr->prebuf,
2627 PA_TAG_U32, attr->minreq,
2628 PA_TAG_INVALID);
2629 else
2630 pa_tagstruct_putu32(t, attr->fragsize);
2631
2632 if (s->context->version >= 13)
2633 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2634
2635 if (s->context->version >= 14)
2636 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2637
2638 pa_pstream_send_tagstruct(s->context->pstream, t);
2639 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2640
2641 /* This might cause changes in the read/write indexex, hence let's
2642 * request a timing update */
2643 request_auto_timing_update(s, TRUE);
2644
2645 return o;
2646 }
2647
2648 uint32_t pa_stream_get_device_index(pa_stream *s) {
2649 pa_assert(s);
2650 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2651
2652 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2653 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2654 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2655 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2656 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2657
2658 return s->device_index;
2659 }
2660
2661 const char *pa_stream_get_device_name(pa_stream *s) {
2662 pa_assert(s);
2663 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2664
2665 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2666 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2667 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2668 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2669 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2670
2671 return s->device_name;
2672 }
2673
2674 int pa_stream_is_suspended(pa_stream *s) {
2675 pa_assert(s);
2676 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2677
2678 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2679 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2680 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2681 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2682
2683 return s->suspended;
2684 }
2685
2686 int pa_stream_is_corked(pa_stream *s) {
2687 pa_assert(s);
2688 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2689
2690 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2691 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2692 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2693
2694 return s->corked;
2695 }
2696
2697 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2698 pa_operation *o = userdata;
2699 int success = 1;
2700
2701 pa_assert(pd);
2702 pa_assert(o);
2703 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2704
2705 if (!o->context)
2706 goto finish;
2707
2708 if (command != PA_COMMAND_REPLY) {
2709 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2710 goto finish;
2711
2712 success = 0;
2713 } else {
2714
2715 if (!pa_tagstruct_eof(t)) {
2716 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2717 goto finish;
2718 }
2719 }
2720
2721 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2722 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2723
2724 if (o->callback) {
2725 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2726 cb(o->stream, success, o->userdata);
2727 }
2728
2729 finish:
2730 pa_operation_done(o);
2731 pa_operation_unref(o);
2732 }
2733
2734
2735 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2736 pa_operation *o;
2737 pa_tagstruct *t;
2738 uint32_t tag;
2739
2740 pa_assert(s);
2741 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2742
2743 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2744 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2745 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2746 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2747 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2748 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2749
2750 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2751 o->private = PA_UINT_TO_PTR(rate);
2752
2753 t = pa_tagstruct_command(
2754 s->context,
2755 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2756 &tag);
2757 pa_tagstruct_putu32(t, s->channel);
2758 pa_tagstruct_putu32(t, rate);
2759
2760 pa_pstream_send_tagstruct(s->context->pstream, t);
2761 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2762
2763 return o;
2764 }
2765
2766 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2767 pa_operation *o;
2768 pa_tagstruct *t;
2769 uint32_t tag;
2770
2771 pa_assert(s);
2772 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2773
2774 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2775 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2776 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2777 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2778 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2779
2780 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2781
2782 t = pa_tagstruct_command(
2783 s->context,
2784 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2785 &tag);
2786 pa_tagstruct_putu32(t, s->channel);
2787 pa_tagstruct_putu32(t, (uint32_t) mode);
2788 pa_tagstruct_put_proplist(t, p);
2789
2790 pa_pstream_send_tagstruct(s->context->pstream, t);
2791 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2792
2793 /* Please note that we don't update s->proplist here, because we
2794 * don't export that field */
2795
2796 return o;
2797 }
2798
2799 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2800 pa_operation *o;
2801 pa_tagstruct *t;
2802 uint32_t tag;
2803 const char * const*k;
2804
2805 pa_assert(s);
2806 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2807
2808 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2809 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2810 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2811 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2812 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2813
2814 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2815
2816 t = pa_tagstruct_command(
2817 s->context,
2818 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2819 &tag);
2820 pa_tagstruct_putu32(t, s->channel);
2821
2822 for (k = keys; *k; k++)
2823 pa_tagstruct_puts(t, *k);
2824
2825 pa_tagstruct_puts(t, NULL);
2826
2827 pa_pstream_send_tagstruct(s->context->pstream, t);
2828 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2829
2830 /* Please note that we don't update s->proplist here, because we
2831 * don't export that field */
2832
2833 return o;
2834 }
2835
2836 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2837 pa_assert(s);
2838 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2839
2840 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2841 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2842 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2843 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2844
2845 s->direct_on_input = sink_input_idx;
2846
2847 return 0;
2848 }
2849
2850 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2851 pa_assert(s);
2852 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2853
2854 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2855 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2856 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2857
2858 return s->direct_on_input;
2859 }