]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
508bfa11ef38f1516ed6aea9b7f62097b401928f
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 /* #define STREAM_DEBUG */
48
49 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
50 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
51
52 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
53 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
54 #define SMOOTHER_MIN_HISTORY (4)
55
56 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
57 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
58 }
59
60 static void reset_callbacks(pa_stream *s) {
61 s->read_callback = NULL;
62 s->read_userdata = NULL;
63 s->write_callback = NULL;
64 s->write_userdata = NULL;
65 s->state_callback = NULL;
66 s->state_userdata = NULL;
67 s->overflow_callback = NULL;
68 s->overflow_userdata = NULL;
69 s->underflow_callback = NULL;
70 s->underflow_userdata = NULL;
71 s->latency_update_callback = NULL;
72 s->latency_update_userdata = NULL;
73 s->moved_callback = NULL;
74 s->moved_userdata = NULL;
75 s->suspended_callback = NULL;
76 s->suspended_userdata = NULL;
77 s->started_callback = NULL;
78 s->started_userdata = NULL;
79 s->event_callback = NULL;
80 s->event_userdata = NULL;
81 s->buffer_attr_callback = NULL;
82 s->buffer_attr_userdata = NULL;
83 }
84
85 static pa_stream *pa_stream_new_with_proplist_internal(
86 pa_context *c,
87 const char *name,
88 const pa_sample_spec *ss,
89 const pa_channel_map *map,
90 pa_format_info * const *formats,
91 unsigned int n_formats,
92 pa_proplist *p) {
93
94 pa_stream *s;
95 unsigned int i;
96
97 pa_assert(c);
98 pa_assert(PA_REFCNT_VALUE(c) >= 1);
99 pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
100 pa_assert(n_formats < PA_MAX_FORMATS);
101
102 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
103 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
104
105 s = pa_xnew(pa_stream, 1);
106 PA_REFCNT_INIT(s);
107 s->context = c;
108 s->mainloop = c->mainloop;
109
110 s->direction = PA_STREAM_NODIRECTION;
111 s->state = PA_STREAM_UNCONNECTED;
112 s->flags = 0;
113
114 if (ss)
115 s->sample_spec = *ss;
116 else
117 pa_sample_spec_init(&s->sample_spec);
118
119 if (map)
120 s->channel_map = *map;
121 else
122 pa_channel_map_init(&s->channel_map);
123
124 s->n_formats = 0;
125 if (formats) {
126 s->n_formats = n_formats;
127 for (i = 0; i < n_formats; i++)
128 s->req_formats[i] = pa_format_info_copy(formats[i]);
129 }
130
131 /* We'll get the final negotiated format after connecting */
132 s->format = NULL;
133
134 s->direct_on_input = PA_INVALID_INDEX;
135
136 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
137 if (name)
138 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
139
140 s->channel = 0;
141 s->channel_valid = FALSE;
142 s->syncid = c->csyncid++;
143 s->stream_index = PA_INVALID_INDEX;
144
145 s->requested_bytes = 0;
146 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
147
148 /* We initialize the target length here, so that if the user
149 * passes no explicit buffering metrics the default is similar to
150 * what older PA versions provided. */
151
152 s->buffer_attr.maxlength = (uint32_t) -1;
153 if (ss)
154 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
155 else {
156 /* FIXME: We assume a worst-case compressed format corresponding to
157 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
158 pa_sample_spec tmp_ss = {
159 .format = PA_SAMPLE_S16NE,
160 .rate = 48000,
161 .channels = 2,
162 };
163 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
164 }
165 s->buffer_attr.minreq = (uint32_t) -1;
166 s->buffer_attr.prebuf = (uint32_t) -1;
167 s->buffer_attr.fragsize = (uint32_t) -1;
168
169 s->device_index = PA_INVALID_INDEX;
170 s->device_name = NULL;
171 s->suspended = FALSE;
172 s->corked = FALSE;
173
174 s->write_memblock = NULL;
175 s->write_data = NULL;
176
177 pa_memchunk_reset(&s->peek_memchunk);
178 s->peek_data = NULL;
179 s->record_memblockq = NULL;
180
181 memset(&s->timing_info, 0, sizeof(s->timing_info));
182 s->timing_info_valid = FALSE;
183
184 s->previous_time = 0;
185 s->latest_underrun_at_index = -1;
186
187 s->read_index_not_before = 0;
188 s->write_index_not_before = 0;
189 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
190 s->write_index_corrections[i].valid = 0;
191 s->current_write_index_correction = 0;
192
193 s->auto_timing_update_event = NULL;
194 s->auto_timing_update_requested = FALSE;
195 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
196
197 reset_callbacks(s);
198
199 s->smoother = NULL;
200
201 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
202 PA_LLIST_PREPEND(pa_stream, c->streams, s);
203 pa_stream_ref(s);
204
205 return s;
206 }
207
208 pa_stream *pa_stream_new_with_proplist(
209 pa_context *c,
210 const char *name,
211 const pa_sample_spec *ss,
212 const pa_channel_map *map,
213 pa_proplist *p) {
214
215 pa_channel_map tmap;
216
217 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
220 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
221 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
222
223 if (!map)
224 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
225
226 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
227 }
228
229 pa_stream *pa_stream_new_extended(
230 pa_context *c,
231 const char *name,
232 pa_format_info * const *formats,
233 unsigned int n_formats,
234 pa_proplist *p) {
235
236 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
237
238 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
239 }
240
241 static void stream_unlink(pa_stream *s) {
242 pa_operation *o, *n;
243 pa_assert(s);
244
245 if (!s->context)
246 return;
247
248 /* Detach from context */
249
250 /* Unref all operation objects that point to us */
251 for (o = s->context->operations; o; o = n) {
252 n = o->next;
253
254 if (o->stream == s)
255 pa_operation_cancel(o);
256 }
257
258 /* Drop all outstanding replies for this stream */
259 if (s->context->pdispatch)
260 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
261
262 if (s->channel_valid) {
263 pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
264 s->channel = 0;
265 s->channel_valid = FALSE;
266 }
267
268 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
269 pa_stream_unref(s);
270
271 s->context = NULL;
272
273 if (s->auto_timing_update_event) {
274 pa_assert(s->mainloop);
275 s->mainloop->time_free(s->auto_timing_update_event);
276 }
277
278 reset_callbacks(s);
279 }
280
281 static void stream_free(pa_stream *s) {
282 unsigned int i;
283
284 pa_assert(s);
285
286 stream_unlink(s);
287
288 if (s->write_memblock) {
289 if (s->write_data)
290 pa_memblock_release(s->write_memblock);
291 pa_memblock_unref(s->write_memblock);
292 }
293
294 if (s->peek_memchunk.memblock) {
295 if (s->peek_data)
296 pa_memblock_release(s->peek_memchunk.memblock);
297 pa_memblock_unref(s->peek_memchunk.memblock);
298 }
299
300 if (s->record_memblockq)
301 pa_memblockq_free(s->record_memblockq);
302
303 if (s->proplist)
304 pa_proplist_free(s->proplist);
305
306 if (s->smoother)
307 pa_smoother_free(s->smoother);
308
309 for (i = 0; i < s->n_formats; i++)
310 pa_format_info_free(s->req_formats[i]);
311
312 if (s->format)
313 pa_format_info_free(s->format);
314
315 pa_xfree(s->device_name);
316 pa_xfree(s);
317 }
318
319 void pa_stream_unref(pa_stream *s) {
320 pa_assert(s);
321 pa_assert(PA_REFCNT_VALUE(s) >= 1);
322
323 if (PA_REFCNT_DEC(s) <= 0)
324 stream_free(s);
325 }
326
327 pa_stream* pa_stream_ref(pa_stream *s) {
328 pa_assert(s);
329 pa_assert(PA_REFCNT_VALUE(s) >= 1);
330
331 PA_REFCNT_INC(s);
332 return s;
333 }
334
335 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
336 pa_assert(s);
337 pa_assert(PA_REFCNT_VALUE(s) >= 1);
338
339 return s->state;
340 }
341
342 pa_context* pa_stream_get_context(pa_stream *s) {
343 pa_assert(s);
344 pa_assert(PA_REFCNT_VALUE(s) >= 1);
345
346 return s->context;
347 }
348
349 uint32_t pa_stream_get_index(pa_stream *s) {
350 pa_assert(s);
351 pa_assert(PA_REFCNT_VALUE(s) >= 1);
352
353 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
354 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
355
356 return s->stream_index;
357 }
358
359 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
360 pa_assert(s);
361 pa_assert(PA_REFCNT_VALUE(s) >= 1);
362
363 if (s->state == st)
364 return;
365
366 pa_stream_ref(s);
367
368 s->state = st;
369
370 if (s->state_callback)
371 s->state_callback(s, s->state_userdata);
372
373 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
374 stream_unlink(s);
375
376 pa_stream_unref(s);
377 }
378
379 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
380 pa_assert(s);
381 pa_assert(PA_REFCNT_VALUE(s) >= 1);
382
383 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
384 return;
385
386 if (s->state == PA_STREAM_READY &&
387 (force || !s->auto_timing_update_requested)) {
388 pa_operation *o;
389
390 #ifdef STREAM_DEBUG
391 pa_log_debug("Automatically requesting new timing data");
392 #endif
393
394 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
395 pa_operation_unref(o);
396 s->auto_timing_update_requested = TRUE;
397 }
398 }
399
400 if (s->auto_timing_update_event) {
401 if (s->suspended && !force) {
402 pa_assert(s->mainloop);
403 s->mainloop->time_free(s->auto_timing_update_event);
404 s->auto_timing_update_event = NULL;
405 } else {
406 if (force)
407 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
408
409 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
410
411 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
412 }
413 }
414 }
415
416 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
417 pa_context *c = userdata;
418 pa_stream *s;
419 uint32_t channel;
420
421 pa_assert(pd);
422 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
423 pa_assert(t);
424 pa_assert(c);
425 pa_assert(PA_REFCNT_VALUE(c) >= 1);
426
427 pa_context_ref(c);
428
429 if (pa_tagstruct_getu32(t, &channel) < 0 ||
430 !pa_tagstruct_eof(t)) {
431 pa_context_fail(c, PA_ERR_PROTOCOL);
432 goto finish;
433 }
434
435 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
436 goto finish;
437
438 if (s->state != PA_STREAM_READY)
439 goto finish;
440
441 pa_context_set_error(c, PA_ERR_KILLED);
442 pa_stream_set_state(s, PA_STREAM_FAILED);
443
444 finish:
445 pa_context_unref(c);
446 }
447
448 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
449 pa_usec_t x;
450
451 pa_assert(s);
452 pa_assert(!force_start || !force_stop);
453
454 if (!s->smoother)
455 return;
456
457 x = pa_rtclock_now();
458
459 if (s->timing_info_valid) {
460 if (aposteriori)
461 x -= s->timing_info.transport_usec;
462 else
463 x += s->timing_info.transport_usec;
464 }
465
466 if (s->suspended || s->corked || force_stop)
467 pa_smoother_pause(s->smoother, x);
468 else if (force_start || s->buffer_attr.prebuf == 0) {
469
470 if (!s->timing_info_valid &&
471 !aposteriori &&
472 !force_start &&
473 !force_stop &&
474 s->context->version >= 13) {
475
476 /* If the server supports STARTED events we take them as
477 * indications when audio really starts/stops playing, if
478 * we don't have any timing info yet -- instead of trying
479 * to be smart and guessing the server time. Otherwise the
480 * unknown transport delay adds too much noise to our time
481 * calculations. */
482
483 return;
484 }
485
486 pa_smoother_resume(s->smoother, x, TRUE);
487 }
488
489 /* Please note that we have no idea if playback actually started
490 * if prebuf is non-zero! */
491 }
492
493 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
494
495 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
496 pa_context *c = userdata;
497 pa_stream *s;
498 uint32_t channel;
499 const char *dn;
500 pa_bool_t suspended;
501 uint32_t di;
502 pa_usec_t usec = 0;
503 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
504
505 pa_assert(pd);
506 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
507 pa_assert(t);
508 pa_assert(c);
509 pa_assert(PA_REFCNT_VALUE(c) >= 1);
510
511 pa_context_ref(c);
512
513 if (c->version < 12) {
514 pa_context_fail(c, PA_ERR_PROTOCOL);
515 goto finish;
516 }
517
518 if (pa_tagstruct_getu32(t, &channel) < 0 ||
519 pa_tagstruct_getu32(t, &di) < 0 ||
520 pa_tagstruct_gets(t, &dn) < 0 ||
521 pa_tagstruct_get_boolean(t, &suspended) < 0) {
522 pa_context_fail(c, PA_ERR_PROTOCOL);
523 goto finish;
524 }
525
526 if (c->version >= 13) {
527
528 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
529 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
530 pa_tagstruct_getu32(t, &fragsize) < 0 ||
531 pa_tagstruct_get_usec(t, &usec) < 0) {
532 pa_context_fail(c, PA_ERR_PROTOCOL);
533 goto finish;
534 }
535 } else {
536 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
537 pa_tagstruct_getu32(t, &tlength) < 0 ||
538 pa_tagstruct_getu32(t, &prebuf) < 0 ||
539 pa_tagstruct_getu32(t, &minreq) < 0 ||
540 pa_tagstruct_get_usec(t, &usec) < 0) {
541 pa_context_fail(c, PA_ERR_PROTOCOL);
542 goto finish;
543 }
544 }
545 }
546
547 if (!pa_tagstruct_eof(t)) {
548 pa_context_fail(c, PA_ERR_PROTOCOL);
549 goto finish;
550 }
551
552 if (!dn || di == PA_INVALID_INDEX) {
553 pa_context_fail(c, PA_ERR_PROTOCOL);
554 goto finish;
555 }
556
557 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
558 goto finish;
559
560 if (s->state != PA_STREAM_READY)
561 goto finish;
562
563 if (c->version >= 13) {
564 if (s->direction == PA_STREAM_RECORD)
565 s->timing_info.configured_source_usec = usec;
566 else
567 s->timing_info.configured_sink_usec = usec;
568
569 s->buffer_attr.maxlength = maxlength;
570 s->buffer_attr.fragsize = fragsize;
571 s->buffer_attr.tlength = tlength;
572 s->buffer_attr.prebuf = prebuf;
573 s->buffer_attr.minreq = minreq;
574 }
575
576 pa_xfree(s->device_name);
577 s->device_name = pa_xstrdup(dn);
578 s->device_index = di;
579
580 s->suspended = suspended;
581
582 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
583 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
584 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
585 request_auto_timing_update(s, TRUE);
586 }
587
588 check_smoother_status(s, TRUE, FALSE, FALSE);
589 request_auto_timing_update(s, TRUE);
590
591 if (s->moved_callback)
592 s->moved_callback(s, s->moved_userdata);
593
594 finish:
595 pa_context_unref(c);
596 }
597
598 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
599 pa_context *c = userdata;
600 pa_stream *s;
601 uint32_t channel;
602 pa_usec_t usec = 0;
603 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
604
605 pa_assert(pd);
606 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
607 pa_assert(t);
608 pa_assert(c);
609 pa_assert(PA_REFCNT_VALUE(c) >= 1);
610
611 pa_context_ref(c);
612
613 if (c->version < 15) {
614 pa_context_fail(c, PA_ERR_PROTOCOL);
615 goto finish;
616 }
617
618 if (pa_tagstruct_getu32(t, &channel) < 0) {
619 pa_context_fail(c, PA_ERR_PROTOCOL);
620 goto finish;
621 }
622
623 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
624 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
625 pa_tagstruct_getu32(t, &fragsize) < 0 ||
626 pa_tagstruct_get_usec(t, &usec) < 0) {
627 pa_context_fail(c, PA_ERR_PROTOCOL);
628 goto finish;
629 }
630 } else {
631 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
632 pa_tagstruct_getu32(t, &tlength) < 0 ||
633 pa_tagstruct_getu32(t, &prebuf) < 0 ||
634 pa_tagstruct_getu32(t, &minreq) < 0 ||
635 pa_tagstruct_get_usec(t, &usec) < 0) {
636 pa_context_fail(c, PA_ERR_PROTOCOL);
637 goto finish;
638 }
639 }
640
641 if (!pa_tagstruct_eof(t)) {
642 pa_context_fail(c, PA_ERR_PROTOCOL);
643 goto finish;
644 }
645
646 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
647 goto finish;
648
649 if (s->state != PA_STREAM_READY)
650 goto finish;
651
652 if (s->direction == PA_STREAM_RECORD)
653 s->timing_info.configured_source_usec = usec;
654 else
655 s->timing_info.configured_sink_usec = usec;
656
657 s->buffer_attr.maxlength = maxlength;
658 s->buffer_attr.fragsize = fragsize;
659 s->buffer_attr.tlength = tlength;
660 s->buffer_attr.prebuf = prebuf;
661 s->buffer_attr.minreq = minreq;
662
663 request_auto_timing_update(s, TRUE);
664
665 if (s->buffer_attr_callback)
666 s->buffer_attr_callback(s, s->buffer_attr_userdata);
667
668 finish:
669 pa_context_unref(c);
670 }
671
672 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
673 pa_context *c = userdata;
674 pa_stream *s;
675 uint32_t channel;
676 pa_bool_t suspended;
677
678 pa_assert(pd);
679 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
680 pa_assert(t);
681 pa_assert(c);
682 pa_assert(PA_REFCNT_VALUE(c) >= 1);
683
684 pa_context_ref(c);
685
686 if (c->version < 12) {
687 pa_context_fail(c, PA_ERR_PROTOCOL);
688 goto finish;
689 }
690
691 if (pa_tagstruct_getu32(t, &channel) < 0 ||
692 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
693 !pa_tagstruct_eof(t)) {
694 pa_context_fail(c, PA_ERR_PROTOCOL);
695 goto finish;
696 }
697
698 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
699 goto finish;
700
701 if (s->state != PA_STREAM_READY)
702 goto finish;
703
704 s->suspended = suspended;
705
706 if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
707 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
708 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
709 request_auto_timing_update(s, TRUE);
710 }
711
712 check_smoother_status(s, TRUE, FALSE, FALSE);
713 request_auto_timing_update(s, TRUE);
714
715 if (s->suspended_callback)
716 s->suspended_callback(s, s->suspended_userdata);
717
718 finish:
719 pa_context_unref(c);
720 }
721
722 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
723 pa_context *c = userdata;
724 pa_stream *s;
725 uint32_t channel;
726
727 pa_assert(pd);
728 pa_assert(command == PA_COMMAND_STARTED);
729 pa_assert(t);
730 pa_assert(c);
731 pa_assert(PA_REFCNT_VALUE(c) >= 1);
732
733 pa_context_ref(c);
734
735 if (c->version < 13) {
736 pa_context_fail(c, PA_ERR_PROTOCOL);
737 goto finish;
738 }
739
740 if (pa_tagstruct_getu32(t, &channel) < 0 ||
741 !pa_tagstruct_eof(t)) {
742 pa_context_fail(c, PA_ERR_PROTOCOL);
743 goto finish;
744 }
745
746 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
747 goto finish;
748
749 if (s->state != PA_STREAM_READY)
750 goto finish;
751
752 check_smoother_status(s, TRUE, TRUE, FALSE);
753 request_auto_timing_update(s, TRUE);
754
755 if (s->started_callback)
756 s->started_callback(s, s->started_userdata);
757
758 finish:
759 pa_context_unref(c);
760 }
761
762 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
763 pa_context *c = userdata;
764 pa_stream *s;
765 uint32_t channel;
766 pa_proplist *pl = NULL;
767 const char *event;
768
769 pa_assert(pd);
770 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
771 pa_assert(t);
772 pa_assert(c);
773 pa_assert(PA_REFCNT_VALUE(c) >= 1);
774
775 pa_context_ref(c);
776
777 if (c->version < 15) {
778 pa_context_fail(c, PA_ERR_PROTOCOL);
779 goto finish;
780 }
781
782 pl = pa_proplist_new();
783
784 if (pa_tagstruct_getu32(t, &channel) < 0 ||
785 pa_tagstruct_gets(t, &event) < 0 ||
786 pa_tagstruct_get_proplist(t, pl) < 0 ||
787 !pa_tagstruct_eof(t) || !event) {
788 pa_context_fail(c, PA_ERR_PROTOCOL);
789 goto finish;
790 }
791
792 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
793 goto finish;
794
795 if (s->state != PA_STREAM_READY)
796 goto finish;
797
798 if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
799 /* Let client know what the running time was when the stream had to be killed */
800 pa_usec_t stream_time;
801 if (pa_stream_get_time(s, &stream_time) == 0)
802 pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
803 }
804
805 if (s->event_callback)
806 s->event_callback(s, event, pl, s->event_userdata);
807
808 finish:
809 pa_context_unref(c);
810
811 if (pl)
812 pa_proplist_free(pl);
813 }
814
815 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
816 pa_stream *s;
817 pa_context *c = userdata;
818 uint32_t bytes, channel;
819
820 pa_assert(pd);
821 pa_assert(command == PA_COMMAND_REQUEST);
822 pa_assert(t);
823 pa_assert(c);
824 pa_assert(PA_REFCNT_VALUE(c) >= 1);
825
826 pa_context_ref(c);
827
828 if (pa_tagstruct_getu32(t, &channel) < 0 ||
829 pa_tagstruct_getu32(t, &bytes) < 0 ||
830 !pa_tagstruct_eof(t)) {
831 pa_context_fail(c, PA_ERR_PROTOCOL);
832 goto finish;
833 }
834
835 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
836 goto finish;
837
838 if (s->state != PA_STREAM_READY)
839 goto finish;
840
841 s->requested_bytes += bytes;
842
843 #ifdef STREAM_DEBUG
844 pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
845 #endif
846
847 if (s->requested_bytes > 0 && s->write_callback)
848 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
849
850 finish:
851 pa_context_unref(c);
852 }
853
854 int64_t pa_stream_get_underflow_index(pa_stream *p) {
855 pa_assert(p);
856 return p->latest_underrun_at_index;
857 }
858
859 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
860 pa_stream *s;
861 pa_context *c = userdata;
862 uint32_t channel;
863 int64_t offset = -1;
864
865 pa_assert(pd);
866 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
867 pa_assert(t);
868 pa_assert(c);
869 pa_assert(PA_REFCNT_VALUE(c) >= 1);
870
871 pa_context_ref(c);
872
873 if (pa_tagstruct_getu32(t, &channel) < 0) {
874 pa_context_fail(c, PA_ERR_PROTOCOL);
875 goto finish;
876 }
877
878 if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
879 if (pa_tagstruct_gets64(t, &offset) < 0) {
880 pa_context_fail(c, PA_ERR_PROTOCOL);
881 goto finish;
882 }
883 }
884
885 if (!pa_tagstruct_eof(t)) {
886 pa_context_fail(c, PA_ERR_PROTOCOL);
887 goto finish;
888 }
889
890 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
891 goto finish;
892
893 if (s->state != PA_STREAM_READY)
894 goto finish;
895
896 if (offset != -1)
897 s->latest_underrun_at_index = offset;
898
899 if (s->buffer_attr.prebuf > 0)
900 check_smoother_status(s, TRUE, FALSE, TRUE);
901
902 request_auto_timing_update(s, TRUE);
903
904 if (command == PA_COMMAND_OVERFLOW) {
905 if (s->overflow_callback)
906 s->overflow_callback(s, s->overflow_userdata);
907 } else if (command == PA_COMMAND_UNDERFLOW) {
908 if (s->underflow_callback)
909 s->underflow_callback(s, s->underflow_userdata);
910 }
911
912 finish:
913 pa_context_unref(c);
914 }
915
916 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
917 pa_assert(s);
918 pa_assert(PA_REFCNT_VALUE(s) >= 1);
919
920 #ifdef STREAM_DEBUG
921 pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
922 #endif
923
924 if (s->state != PA_STREAM_READY)
925 return;
926
927 if (w) {
928 s->write_index_not_before = s->context->ctag;
929
930 if (s->timing_info_valid)
931 s->timing_info.write_index_corrupt = TRUE;
932
933 #ifdef STREAM_DEBUG
934 pa_log_debug("write_index invalidated");
935 #endif
936 }
937
938 if (r) {
939 s->read_index_not_before = s->context->ctag;
940
941 if (s->timing_info_valid)
942 s->timing_info.read_index_corrupt = TRUE;
943
944 #ifdef STREAM_DEBUG
945 pa_log_debug("read_index invalidated");
946 #endif
947 }
948
949 request_auto_timing_update(s, TRUE);
950 }
951
952 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
953 pa_stream *s = userdata;
954
955 pa_assert(s);
956 pa_assert(PA_REFCNT_VALUE(s) >= 1);
957
958 pa_stream_ref(s);
959 request_auto_timing_update(s, FALSE);
960 pa_stream_unref(s);
961 }
962
963 static void create_stream_complete(pa_stream *s) {
964 pa_assert(s);
965 pa_assert(PA_REFCNT_VALUE(s) >= 1);
966 pa_assert(s->state == PA_STREAM_CREATING);
967
968 pa_stream_set_state(s, PA_STREAM_READY);
969
970 if (s->requested_bytes > 0 && s->write_callback)
971 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
972
973 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
974 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
975 pa_assert(!s->auto_timing_update_event);
976 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
977
978 request_auto_timing_update(s, TRUE);
979 }
980
981 check_smoother_status(s, TRUE, FALSE, FALSE);
982 }
983
984 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
985 const char *e;
986
987 pa_assert(s);
988 pa_assert(attr);
989
990 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
991 uint32_t ms;
992
993 if (pa_atou(e, &ms) < 0 || ms <= 0)
994 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
995 else {
996 attr->maxlength = (uint32_t) -1;
997 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
998 attr->minreq = (uint32_t) -1;
999 attr->prebuf = (uint32_t) -1;
1000 attr->fragsize = attr->tlength;
1001 }
1002
1003 if (flags)
1004 *flags |= PA_STREAM_ADJUST_LATENCY;
1005 }
1006
1007 if (s->context->version >= 13)
1008 return;
1009
1010 /* Version older than 0.9.10 didn't do server side buffer_attr
1011 * selection, hence we have to fake it on the client side. */
1012
1013 /* We choose fairly conservative values here, to not confuse
1014 * old clients with extremely large playback buffers */
1015
1016 if (attr->maxlength == (uint32_t) -1)
1017 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1018
1019 if (attr->tlength == (uint32_t) -1)
1020 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1021
1022 if (attr->minreq == (uint32_t) -1)
1023 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1024
1025 if (attr->prebuf == (uint32_t) -1)
1026 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1027
1028 if (attr->fragsize == (uint32_t) -1)
1029 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1030 }
1031
1032 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1033 pa_stream *s = userdata;
1034 uint32_t requested_bytes = 0;
1035
1036 pa_assert(pd);
1037 pa_assert(s);
1038 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1039 pa_assert(s->state == PA_STREAM_CREATING);
1040
1041 pa_stream_ref(s);
1042
1043 if (command != PA_COMMAND_REPLY) {
1044 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1045 goto finish;
1046
1047 pa_stream_set_state(s, PA_STREAM_FAILED);
1048 goto finish;
1049 }
1050
1051 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1052 s->channel == PA_INVALID_INDEX ||
1053 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1054 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1055 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1056 goto finish;
1057 }
1058
1059 s->requested_bytes = (int64_t) requested_bytes;
1060
1061 if (s->context->version >= 9) {
1062 if (s->direction == PA_STREAM_PLAYBACK) {
1063 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1064 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1065 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1066 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1067 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1068 goto finish;
1069 }
1070 } else if (s->direction == PA_STREAM_RECORD) {
1071 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1072 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1073 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1074 goto finish;
1075 }
1076 }
1077 }
1078
1079 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1080 pa_sample_spec ss;
1081 pa_channel_map cm;
1082 const char *dn = NULL;
1083 pa_bool_t suspended;
1084
1085 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1086 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1087 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1088 pa_tagstruct_gets(t, &dn) < 0 ||
1089 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1090 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1091 goto finish;
1092 }
1093
1094 if (!dn || s->device_index == PA_INVALID_INDEX ||
1095 ss.channels != cm.channels ||
1096 !pa_channel_map_valid(&cm) ||
1097 !pa_sample_spec_valid(&ss) ||
1098 (s->n_formats == 0 && (
1099 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1100 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1101 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1102 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1103 goto finish;
1104 }
1105
1106 pa_xfree(s->device_name);
1107 s->device_name = pa_xstrdup(dn);
1108 s->suspended = suspended;
1109
1110 s->channel_map = cm;
1111 s->sample_spec = ss;
1112 }
1113
1114 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1115 pa_usec_t usec;
1116
1117 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1118 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1119 goto finish;
1120 }
1121
1122 if (s->direction == PA_STREAM_RECORD)
1123 s->timing_info.configured_source_usec = usec;
1124 else
1125 s->timing_info.configured_sink_usec = usec;
1126 }
1127
1128 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1129 || s->context->version >= 22) {
1130
1131 pa_format_info *f = pa_format_info_new();
1132 pa_tagstruct_get_format_info(t, f);
1133
1134 if (pa_format_info_valid(f))
1135 s->format = f;
1136 else {
1137 pa_format_info_free(f);
1138 if (s->n_formats > 0) {
1139 /* We used the extended API, so we should have got back a proper format */
1140 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1141 goto finish;
1142 }
1143 }
1144 }
1145
1146 if (!pa_tagstruct_eof(t)) {
1147 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1148 goto finish;
1149 }
1150
1151 if (s->direction == PA_STREAM_RECORD) {
1152 pa_assert(!s->record_memblockq);
1153
1154 s->record_memblockq = pa_memblockq_new(
1155 "client side record memblockq",
1156 0,
1157 s->buffer_attr.maxlength,
1158 0,
1159 &s->sample_spec,
1160 1,
1161 0,
1162 0,
1163 NULL);
1164 }
1165
1166 s->channel_valid = TRUE;
1167 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1168
1169 create_stream_complete(s);
1170
1171 finish:
1172 pa_stream_unref(s);
1173 }
1174
1175 static int create_stream(
1176 pa_stream_direction_t direction,
1177 pa_stream *s,
1178 const char *dev,
1179 const pa_buffer_attr *attr,
1180 pa_stream_flags_t flags,
1181 const pa_cvolume *volume,
1182 pa_stream *sync_stream) {
1183
1184 pa_tagstruct *t;
1185 uint32_t tag;
1186 pa_bool_t volume_set = !!volume;
1187 pa_cvolume cv;
1188 uint32_t i;
1189
1190 pa_assert(s);
1191 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1192 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1193
1194 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1195 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1196 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1197 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1198 PA_STREAM_INTERPOLATE_TIMING|
1199 PA_STREAM_NOT_MONOTONIC|
1200 PA_STREAM_AUTO_TIMING_UPDATE|
1201 PA_STREAM_NO_REMAP_CHANNELS|
1202 PA_STREAM_NO_REMIX_CHANNELS|
1203 PA_STREAM_FIX_FORMAT|
1204 PA_STREAM_FIX_RATE|
1205 PA_STREAM_FIX_CHANNELS|
1206 PA_STREAM_DONT_MOVE|
1207 PA_STREAM_VARIABLE_RATE|
1208 PA_STREAM_PEAK_DETECT|
1209 PA_STREAM_START_MUTED|
1210 PA_STREAM_ADJUST_LATENCY|
1211 PA_STREAM_EARLY_REQUESTS|
1212 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1213 PA_STREAM_START_UNMUTED|
1214 PA_STREAM_FAIL_ON_SUSPEND|
1215 PA_STREAM_RELATIVE_VOLUME|
1216 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1217
1218
1219 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1220 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1221 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1222 /* Although some of the other flags are not supported on older
1223 * version, we don't check for them here, because it doesn't hurt
1224 * when they are passed but actually not supported. This makes
1225 * client development easier */
1226
1227 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1228 PA_CHECK_VALIDITY(s->context, !volume || s->n_formats || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1229 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1230 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1231
1232 pa_stream_ref(s);
1233
1234 s->direction = direction;
1235
1236 if (sync_stream)
1237 s->syncid = sync_stream->syncid;
1238
1239 if (attr)
1240 s->buffer_attr = *attr;
1241 patch_buffer_attr(s, &s->buffer_attr, &flags);
1242
1243 s->flags = flags;
1244 s->corked = !!(flags & PA_STREAM_START_CORKED);
1245
1246 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1247 pa_usec_t x;
1248
1249 x = pa_rtclock_now();
1250
1251 pa_assert(!s->smoother);
1252 s->smoother = pa_smoother_new(
1253 SMOOTHER_ADJUST_TIME,
1254 SMOOTHER_HISTORY_TIME,
1255 !(flags & PA_STREAM_NOT_MONOTONIC),
1256 TRUE,
1257 SMOOTHER_MIN_HISTORY,
1258 x,
1259 TRUE);
1260 }
1261
1262 if (!dev)
1263 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1264
1265 t = pa_tagstruct_command(
1266 s->context,
1267 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1268 &tag);
1269
1270 if (s->context->version < 13)
1271 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1272
1273 pa_tagstruct_put(
1274 t,
1275 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1276 PA_TAG_CHANNEL_MAP, &s->channel_map,
1277 PA_TAG_U32, PA_INVALID_INDEX,
1278 PA_TAG_STRING, dev,
1279 PA_TAG_U32, s->buffer_attr.maxlength,
1280 PA_TAG_BOOLEAN, s->corked,
1281 PA_TAG_INVALID);
1282
1283 if (!volume) {
1284 if (pa_sample_spec_valid(&s->sample_spec))
1285 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1286 else {
1287 /* This is not really relevant, since no volume was set, and
1288 * the real number of channels is embedded in the format_info
1289 * structure */
1290 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1291 }
1292 }
1293
1294 if (s->direction == PA_STREAM_PLAYBACK) {
1295 pa_tagstruct_put(
1296 t,
1297 PA_TAG_U32, s->buffer_attr.tlength,
1298 PA_TAG_U32, s->buffer_attr.prebuf,
1299 PA_TAG_U32, s->buffer_attr.minreq,
1300 PA_TAG_U32, s->syncid,
1301 PA_TAG_INVALID);
1302
1303 pa_tagstruct_put_cvolume(t, volume);
1304 } else
1305 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1306
1307 if (s->context->version >= 12) {
1308 pa_tagstruct_put(
1309 t,
1310 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1311 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1312 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1313 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1314 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1315 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1316 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1317 PA_TAG_INVALID);
1318 }
1319
1320 if (s->context->version >= 13) {
1321
1322 if (s->direction == PA_STREAM_PLAYBACK)
1323 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1324 else
1325 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1326
1327 pa_tagstruct_put(
1328 t,
1329 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1330 PA_TAG_PROPLIST, s->proplist,
1331 PA_TAG_INVALID);
1332
1333 if (s->direction == PA_STREAM_RECORD)
1334 pa_tagstruct_putu32(t, s->direct_on_input);
1335 }
1336
1337 if (s->context->version >= 14) {
1338
1339 if (s->direction == PA_STREAM_PLAYBACK)
1340 pa_tagstruct_put_boolean(t, volume_set);
1341
1342 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1343 }
1344
1345 if (s->context->version >= 15) {
1346
1347 if (s->direction == PA_STREAM_PLAYBACK)
1348 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1349
1350 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1351 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1352 }
1353
1354 if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1355 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1356
1357 if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1358 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1359
1360 if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1361 || s->context->version >= 22) {
1362
1363 pa_tagstruct_putu8(t, s->n_formats);
1364 for (i = 0; i < s->n_formats; i++)
1365 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1366 }
1367
1368 if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1369 pa_tagstruct_put_cvolume(t, volume);
1370 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1371 pa_tagstruct_put_boolean(t, volume_set);
1372 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1373 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1374 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1375 }
1376
1377 pa_pstream_send_tagstruct(s->context->pstream, t);
1378 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1379
1380 pa_stream_set_state(s, PA_STREAM_CREATING);
1381
1382 pa_stream_unref(s);
1383 return 0;
1384 }
1385
1386 int pa_stream_connect_playback(
1387 pa_stream *s,
1388 const char *dev,
1389 const pa_buffer_attr *attr,
1390 pa_stream_flags_t flags,
1391 const pa_cvolume *volume,
1392 pa_stream *sync_stream) {
1393
1394 pa_assert(s);
1395 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1396
1397 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1398 }
1399
1400 int pa_stream_connect_record(
1401 pa_stream *s,
1402 const char *dev,
1403 const pa_buffer_attr *attr,
1404 pa_stream_flags_t flags) {
1405
1406 pa_assert(s);
1407 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1408
1409 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1410 }
1411
1412 int pa_stream_begin_write(
1413 pa_stream *s,
1414 void **data,
1415 size_t *nbytes) {
1416
1417 pa_assert(s);
1418 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1419
1420 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1421 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1422 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1423 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1424 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1425
1426 if (*nbytes != (size_t) -1) {
1427 size_t m, fs;
1428
1429 m = pa_mempool_block_size_max(s->context->mempool);
1430 fs = pa_frame_size(&s->sample_spec);
1431
1432 m = (m / fs) * fs;
1433 if (*nbytes > m)
1434 *nbytes = m;
1435 }
1436
1437 if (!s->write_memblock) {
1438 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1439 s->write_data = pa_memblock_acquire(s->write_memblock);
1440 }
1441
1442 *data = s->write_data;
1443 *nbytes = pa_memblock_get_length(s->write_memblock);
1444
1445 return 0;
1446 }
1447
1448 int pa_stream_cancel_write(
1449 pa_stream *s) {
1450
1451 pa_assert(s);
1452 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1453
1454 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1455 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1456 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1457 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1458
1459 pa_assert(s->write_data);
1460
1461 pa_memblock_release(s->write_memblock);
1462 pa_memblock_unref(s->write_memblock);
1463 s->write_memblock = NULL;
1464 s->write_data = NULL;
1465
1466 return 0;
1467 }
1468
1469 int pa_stream_write(
1470 pa_stream *s,
1471 const void *data,
1472 size_t length,
1473 pa_free_cb_t free_cb,
1474 int64_t offset,
1475 pa_seek_mode_t seek) {
1476
1477 pa_assert(s);
1478 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1479 pa_assert(data);
1480
1481 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1482 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1483 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1484 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1485 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1486 PA_CHECK_VALIDITY(s->context,
1487 !s->write_memblock ||
1488 ((data >= s->write_data) &&
1489 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1490 PA_ERR_INVALID);
1491 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1492
1493 if (s->write_memblock) {
1494 pa_memchunk chunk;
1495
1496 /* pa_stream_write_begin() was called before */
1497
1498 pa_memblock_release(s->write_memblock);
1499
1500 chunk.memblock = s->write_memblock;
1501 chunk.index = (const char *) data - (const char *) s->write_data;
1502 chunk.length = length;
1503
1504 s->write_memblock = NULL;
1505 s->write_data = NULL;
1506
1507 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1508 pa_memblock_unref(chunk.memblock);
1509
1510 } else {
1511 pa_seek_mode_t t_seek = seek;
1512 int64_t t_offset = offset;
1513 size_t t_length = length;
1514 const void *t_data = data;
1515
1516 /* pa_stream_write_begin() was not called before */
1517
1518 while (t_length > 0) {
1519 pa_memchunk chunk;
1520
1521 chunk.index = 0;
1522
1523 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1524 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1525 chunk.length = t_length;
1526 } else {
1527 void *d;
1528
1529 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1530 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1531
1532 d = pa_memblock_acquire(chunk.memblock);
1533 memcpy(d, t_data, chunk.length);
1534 pa_memblock_release(chunk.memblock);
1535 }
1536
1537 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1538
1539 t_offset = 0;
1540 t_seek = PA_SEEK_RELATIVE;
1541
1542 t_data = (const uint8_t*) t_data + chunk.length;
1543 t_length -= chunk.length;
1544
1545 pa_memblock_unref(chunk.memblock);
1546 }
1547
1548 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1549 free_cb((void*) data);
1550 }
1551
1552 /* This is obviously wrong since we ignore the seeking index . But
1553 * that's OK, the server side applies the same error */
1554 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1555
1556 #ifdef STREAM_DEBUG
1557 pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
1558 #endif
1559
1560 if (s->direction == PA_STREAM_PLAYBACK) {
1561
1562 /* Update latency request correction */
1563 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1564
1565 if (seek == PA_SEEK_ABSOLUTE) {
1566 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1567 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1568 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1569 } else if (seek == PA_SEEK_RELATIVE) {
1570 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1571 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1572 } else
1573 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1574 }
1575
1576 /* Update the write index in the already available latency data */
1577 if (s->timing_info_valid) {
1578
1579 if (seek == PA_SEEK_ABSOLUTE) {
1580 s->timing_info.write_index_corrupt = FALSE;
1581 s->timing_info.write_index = offset + (int64_t) length;
1582 } else if (seek == PA_SEEK_RELATIVE) {
1583 if (!s->timing_info.write_index_corrupt)
1584 s->timing_info.write_index += offset + (int64_t) length;
1585 } else
1586 s->timing_info.write_index_corrupt = TRUE;
1587 }
1588
1589 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1590 request_auto_timing_update(s, TRUE);
1591 }
1592
1593 return 0;
1594 }
1595
1596 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1597 pa_assert(s);
1598 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1599 pa_assert(data);
1600 pa_assert(length);
1601
1602 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1603 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1604 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1605
1606 if (!s->peek_memchunk.memblock) {
1607
1608 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1609 /* record_memblockq is empty. */
1610 *data = NULL;
1611 *length = 0;
1612 return 0;
1613
1614 } else if (!s->peek_memchunk.memblock) {
1615 /* record_memblockq isn't empty, but it doesn't have any data at
1616 * the current read index. */
1617 *data = NULL;
1618 *length = s->peek_memchunk.length;
1619 return 0;
1620 }
1621
1622 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1623 }
1624
1625 pa_assert(s->peek_data);
1626 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1627 *length = s->peek_memchunk.length;
1628 return 0;
1629 }
1630
1631 int pa_stream_drop(pa_stream *s) {
1632 pa_assert(s);
1633 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1634
1635 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1636 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1637 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1638 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);
1639
1640 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1641
1642 /* Fix the simulated local read index */
1643 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1644 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1645
1646 if (s->peek_memchunk.memblock) {
1647 pa_assert(s->peek_data);
1648 s->peek_data = NULL;
1649 pa_memblock_release(s->peek_memchunk.memblock);
1650 pa_memblock_unref(s->peek_memchunk.memblock);
1651 }
1652
1653 pa_memchunk_reset(&s->peek_memchunk);
1654
1655 return 0;
1656 }
1657
1658 size_t pa_stream_writable_size(pa_stream *s) {
1659 pa_assert(s);
1660 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1661
1662 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1663 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1664 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1665
1666 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1667 }
1668
1669 size_t pa_stream_readable_size(pa_stream *s) {
1670 pa_assert(s);
1671 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1672
1673 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1674 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1675 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1676
1677 return pa_memblockq_get_length(s->record_memblockq);
1678 }
1679
1680 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1681 pa_operation *o;
1682 pa_tagstruct *t;
1683 uint32_t tag;
1684
1685 pa_assert(s);
1686 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1687
1688 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1689 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1690 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1691
1692 /* Ask for a timing update before we cork/uncork to get the best
1693 * accuracy for the transport latency suitable for the
1694 * check_smoother_status() call in the started callback */
1695 request_auto_timing_update(s, TRUE);
1696
1697 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1698
1699 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1700 pa_tagstruct_putu32(t, s->channel);
1701 pa_pstream_send_tagstruct(s->context->pstream, t);
1702 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1703
1704 /* This might cause the read index to continue again, hence
1705 * let's request a timing update */
1706 request_auto_timing_update(s, TRUE);
1707
1708 return o;
1709 }
1710
1711 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1712 pa_usec_t usec;
1713
1714 pa_assert(s);
1715 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1716 pa_assert(s->state == PA_STREAM_READY);
1717 pa_assert(s->direction != PA_STREAM_UPLOAD);
1718 pa_assert(s->timing_info_valid);
1719 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1720 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1721
1722 if (s->direction == PA_STREAM_PLAYBACK) {
1723 /* The last byte that was written into the output device
1724 * had this time value associated */
1725 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1726
1727 if (!s->corked && !s->suspended) {
1728
1729 if (!ignore_transport)
1730 /* Because the latency info took a little time to come
1731 * to us, we assume that the real output time is actually
1732 * a little ahead */
1733 usec += s->timing_info.transport_usec;
1734
1735 /* However, the output device usually maintains a buffer
1736 too, hence the real sample currently played is a little
1737 back */
1738 if (s->timing_info.sink_usec >= usec)
1739 usec = 0;
1740 else
1741 usec -= s->timing_info.sink_usec;
1742 }
1743
1744 } else {
1745 pa_assert(s->direction == PA_STREAM_RECORD);
1746
1747 /* The last byte written into the server side queue had
1748 * this time value associated */
1749 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1750
1751 if (!s->corked && !s->suspended) {
1752
1753 if (!ignore_transport)
1754 /* Add transport latency */
1755 usec += s->timing_info.transport_usec;
1756
1757 /* Add latency of data in device buffer */
1758 usec += s->timing_info.source_usec;
1759
1760 /* If this is a monitor source, we need to correct the
1761 * time by the playback device buffer */
1762 if (s->timing_info.sink_usec >= usec)
1763 usec = 0;
1764 else
1765 usec -= s->timing_info.sink_usec;
1766 }
1767 }
1768
1769 return usec;
1770 }
1771
1772 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1773 pa_operation *o = userdata;
1774 struct timeval local, remote, now;
1775 pa_timing_info *i;
1776 pa_bool_t playing = FALSE;
1777 uint64_t underrun_for = 0, playing_for = 0;
1778
1779 pa_assert(pd);
1780 pa_assert(o);
1781 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1782
1783 if (!o->context || !o->stream)
1784 goto finish;
1785
1786 i = &o->stream->timing_info;
1787
1788 o->stream->timing_info_valid = FALSE;
1789 i->write_index_corrupt = TRUE;
1790 i->read_index_corrupt = TRUE;
1791
1792 if (command != PA_COMMAND_REPLY) {
1793 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1794 goto finish;
1795
1796 } else {
1797
1798 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1799 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1800 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1801 pa_tagstruct_get_timeval(t, &local) < 0 ||
1802 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1803 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1804 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1805
1806 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1807 goto finish;
1808 }
1809
1810 if (o->context->version >= 13 &&
1811 o->stream->direction == PA_STREAM_PLAYBACK)
1812 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1813 pa_tagstruct_getu64(t, &playing_for) < 0) {
1814
1815 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1816 goto finish;
1817 }
1818
1819
1820 if (!pa_tagstruct_eof(t)) {
1821 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1822 goto finish;
1823 }
1824 o->stream->timing_info_valid = TRUE;
1825 i->write_index_corrupt = FALSE;
1826 i->read_index_corrupt = FALSE;
1827
1828 i->playing = (int) playing;
1829 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1830
1831 pa_gettimeofday(&now);
1832
1833 /* Calculate timestamps */
1834 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1835 /* local and remote seem to have synchronized clocks */
1836
1837 if (o->stream->direction == PA_STREAM_PLAYBACK)
1838 i->transport_usec = pa_timeval_diff(&remote, &local);
1839 else
1840 i->transport_usec = pa_timeval_diff(&now, &remote);
1841
1842 i->synchronized_clocks = TRUE;
1843 i->timestamp = remote;
1844 } else {
1845 /* clocks are not synchronized, let's estimate latency then */
1846 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1847 i->synchronized_clocks = FALSE;
1848 i->timestamp = local;
1849 pa_timeval_add(&i->timestamp, i->transport_usec);
1850 }
1851
1852 /* Invalidate read and write indexes if necessary */
1853 if (tag < o->stream->read_index_not_before)
1854 i->read_index_corrupt = TRUE;
1855
1856 if (tag < o->stream->write_index_not_before)
1857 i->write_index_corrupt = TRUE;
1858
1859 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1860 /* Write index correction */
1861
1862 int n, j;
1863 uint32_t ctag = tag;
1864
1865 /* Go through the saved correction values and add up the
1866 * total correction.*/
1867 for (n = 0, j = o->stream->current_write_index_correction+1;
1868 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1869 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1870
1871 /* Step over invalid data or out-of-date data */
1872 if (!o->stream->write_index_corrections[j].valid ||
1873 o->stream->write_index_corrections[j].tag < ctag)
1874 continue;
1875
1876 /* Make sure that everything is in order */
1877 ctag = o->stream->write_index_corrections[j].tag+1;
1878
1879 /* Now fix the write index */
1880 if (o->stream->write_index_corrections[j].corrupt) {
1881 /* A corrupting seek was made */
1882 i->write_index_corrupt = TRUE;
1883 } else if (o->stream->write_index_corrections[j].absolute) {
1884 /* An absolute seek was made */
1885 i->write_index = o->stream->write_index_corrections[j].value;
1886 i->write_index_corrupt = FALSE;
1887 } else if (!i->write_index_corrupt) {
1888 /* A relative seek was made */
1889 i->write_index += o->stream->write_index_corrections[j].value;
1890 }
1891 }
1892
1893 /* Clear old correction entries */
1894 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1895 if (!o->stream->write_index_corrections[n].valid)
1896 continue;
1897
1898 if (o->stream->write_index_corrections[n].tag <= tag)
1899 o->stream->write_index_corrections[n].valid = FALSE;
1900 }
1901 }
1902
1903 if (o->stream->direction == PA_STREAM_RECORD) {
1904 /* Read index correction */
1905
1906 if (!i->read_index_corrupt)
1907 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1908 }
1909
1910 /* Update smoother if we're not corked */
1911 if (o->stream->smoother && !o->stream->corked) {
1912 pa_usec_t u, x;
1913
1914 u = x = pa_rtclock_now() - i->transport_usec;
1915
1916 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1917 pa_usec_t su;
1918
1919 /* If we weren't playing then it will take some time
1920 * until the audio will actually come out through the
1921 * speakers. Since we follow that timing here, we need
1922 * to try to fix this up */
1923
1924 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1925
1926 if (su < i->sink_usec)
1927 x += i->sink_usec - su;
1928 }
1929
1930 if (!i->playing)
1931 pa_smoother_pause(o->stream->smoother, x);
1932
1933 /* Update the smoother */
1934 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1935 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1936 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1937
1938 if (i->playing)
1939 pa_smoother_resume(o->stream->smoother, x, TRUE);
1940 }
1941 }
1942
1943 o->stream->auto_timing_update_requested = FALSE;
1944
1945 if (o->stream->latency_update_callback)
1946 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1947
1948 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1949 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1950 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1951 }
1952
1953 finish:
1954
1955 pa_operation_done(o);
1956 pa_operation_unref(o);
1957 }
1958
1959 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1960 uint32_t tag;
1961 pa_operation *o;
1962 pa_tagstruct *t;
1963 struct timeval now;
1964 int cidx = 0;
1965
1966 pa_assert(s);
1967 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1968
1969 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1970 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1971 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1972
1973 if (s->direction == PA_STREAM_PLAYBACK) {
1974 /* Find a place to store the write_index correction data for this entry */
1975 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1976
1977 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1978 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1979 }
1980 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1981
1982 t = pa_tagstruct_command(
1983 s->context,
1984 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1985 &tag);
1986 pa_tagstruct_putu32(t, s->channel);
1987 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1988
1989 pa_pstream_send_tagstruct(s->context->pstream, t);
1990 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1991
1992 if (s->direction == PA_STREAM_PLAYBACK) {
1993 /* Fill in initial correction data */
1994
1995 s->current_write_index_correction = cidx;
1996
1997 s->write_index_corrections[cidx].valid = TRUE;
1998 s->write_index_corrections[cidx].absolute = FALSE;
1999 s->write_index_corrections[cidx].corrupt = FALSE;
2000 s->write_index_corrections[cidx].tag = tag;
2001 s->write_index_corrections[cidx].value = 0;
2002 }
2003
2004 return o;
2005 }
2006
2007 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2008 pa_stream *s = userdata;
2009
2010 pa_assert(pd);
2011 pa_assert(s);
2012 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2013
2014 pa_stream_ref(s);
2015
2016 if (command != PA_COMMAND_REPLY) {
2017 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
2018 goto finish;
2019
2020 pa_stream_set_state(s, PA_STREAM_FAILED);
2021 goto finish;
2022 } else if (!pa_tagstruct_eof(t)) {
2023 pa_context_fail(s->context, PA_ERR_PROTOCOL);
2024 goto finish;
2025 }
2026
2027 pa_stream_set_state(s, PA_STREAM_TERMINATED);
2028
2029 finish:
2030 pa_stream_unref(s);
2031 }
2032
2033 int pa_stream_disconnect(pa_stream *s) {
2034 pa_tagstruct *t;
2035 uint32_t tag;
2036
2037 pa_assert(s);
2038 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2039
2040 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2041 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2042 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2043
2044 pa_stream_ref(s);
2045
2046 t = pa_tagstruct_command(
2047 s->context,
2048 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2049 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2050 &tag);
2051 pa_tagstruct_putu32(t, s->channel);
2052 pa_pstream_send_tagstruct(s->context->pstream, t);
2053 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2054
2055 pa_stream_unref(s);
2056 return 0;
2057 }
2058
2059 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2060 pa_assert(s);
2061 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2062
2063 if (pa_detect_fork())
2064 return;
2065
2066 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2067 return;
2068
2069 s->read_callback = cb;
2070 s->read_userdata = userdata;
2071 }
2072
2073 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2074 pa_assert(s);
2075 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2076
2077 if (pa_detect_fork())
2078 return;
2079
2080 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2081 return;
2082
2083 s->write_callback = cb;
2084 s->write_userdata = userdata;
2085 }
2086
2087 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2088 pa_assert(s);
2089 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2090
2091 if (pa_detect_fork())
2092 return;
2093
2094 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2095 return;
2096
2097 s->state_callback = cb;
2098 s->state_userdata = userdata;
2099 }
2100
2101 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2102 pa_assert(s);
2103 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2104
2105 if (pa_detect_fork())
2106 return;
2107
2108 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2109 return;
2110
2111 s->overflow_callback = cb;
2112 s->overflow_userdata = userdata;
2113 }
2114
2115 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2116 pa_assert(s);
2117 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2118
2119 if (pa_detect_fork())
2120 return;
2121
2122 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2123 return;
2124
2125 s->underflow_callback = cb;
2126 s->underflow_userdata = userdata;
2127 }
2128
2129 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2130 pa_assert(s);
2131 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2132
2133 if (pa_detect_fork())
2134 return;
2135
2136 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2137 return;
2138
2139 s->latency_update_callback = cb;
2140 s->latency_update_userdata = userdata;
2141 }
2142
2143 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2144 pa_assert(s);
2145 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2146
2147 if (pa_detect_fork())
2148 return;
2149
2150 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2151 return;
2152
2153 s->moved_callback = cb;
2154 s->moved_userdata = userdata;
2155 }
2156
2157 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2158 pa_assert(s);
2159 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2160
2161 if (pa_detect_fork())
2162 return;
2163
2164 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2165 return;
2166
2167 s->suspended_callback = cb;
2168 s->suspended_userdata = userdata;
2169 }
2170
2171 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2172 pa_assert(s);
2173 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2174
2175 if (pa_detect_fork())
2176 return;
2177
2178 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2179 return;
2180
2181 s->started_callback = cb;
2182 s->started_userdata = userdata;
2183 }
2184
2185 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2186 pa_assert(s);
2187 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2188
2189 if (pa_detect_fork())
2190 return;
2191
2192 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2193 return;
2194
2195 s->event_callback = cb;
2196 s->event_userdata = userdata;
2197 }
2198
2199 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2200 pa_assert(s);
2201 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2202
2203 if (pa_detect_fork())
2204 return;
2205
2206 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2207 return;
2208
2209 s->buffer_attr_callback = cb;
2210 s->buffer_attr_userdata = userdata;
2211 }
2212
2213 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2214 pa_operation *o = userdata;
2215 int success = 1;
2216
2217 pa_assert(pd);
2218 pa_assert(o);
2219 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2220
2221 if (!o->context)
2222 goto finish;
2223
2224 if (command != PA_COMMAND_REPLY) {
2225 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2226 goto finish;
2227
2228 success = 0;
2229 } else if (!pa_tagstruct_eof(t)) {
2230 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2231 goto finish;
2232 }
2233
2234 if (o->callback) {
2235 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2236 cb(o->stream, success, o->userdata);
2237 }
2238
2239 finish:
2240 pa_operation_done(o);
2241 pa_operation_unref(o);
2242 }
2243
2244 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2245 pa_operation *o;
2246 pa_tagstruct *t;
2247 uint32_t tag;
2248
2249 pa_assert(s);
2250 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2251
2252 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2253 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2254 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2255
2256 /* Ask for a timing update before we cork/uncork to get the best
2257 * accuracy for the transport latency suitable for the
2258 * check_smoother_status() call in the started callback */
2259 request_auto_timing_update(s, TRUE);
2260
2261 s->corked = b;
2262
2263 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2264
2265 t = pa_tagstruct_command(
2266 s->context,
2267 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2268 &tag);
2269 pa_tagstruct_putu32(t, s->channel);
2270 pa_tagstruct_put_boolean(t, !!b);
2271 pa_pstream_send_tagstruct(s->context->pstream, t);
2272 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2273
2274 check_smoother_status(s, FALSE, FALSE, FALSE);
2275
2276 /* This might cause the indexes to hang/start again, hence let's
2277 * request a timing update, after the cork/uncork, too */
2278 request_auto_timing_update(s, TRUE);
2279
2280 return o;
2281 }
2282
2283 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2284 pa_tagstruct *t;
2285 pa_operation *o;
2286 uint32_t tag;
2287
2288 pa_assert(s);
2289 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2290
2291 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2292 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2293
2294 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2295
2296 t = pa_tagstruct_command(s->context, command, &tag);
2297 pa_tagstruct_putu32(t, s->channel);
2298 pa_pstream_send_tagstruct(s->context->pstream, t);
2299 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2300
2301 return o;
2302 }
2303
2304 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2305 pa_operation *o;
2306
2307 pa_assert(s);
2308 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2309
2310 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2311 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2312 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2313
2314 /* Ask for a timing update *before* the flush, so that the
2315 * transport usec is as up to date as possible when we get the
2316 * underflow message and update the smoother status*/
2317 request_auto_timing_update(s, TRUE);
2318
2319 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2320 return NULL;
2321
2322 if (s->direction == PA_STREAM_PLAYBACK) {
2323
2324 if (s->write_index_corrections[s->current_write_index_correction].valid)
2325 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2326
2327 if (s->buffer_attr.prebuf > 0)
2328 check_smoother_status(s, FALSE, FALSE, TRUE);
2329
2330 /* This will change the write index, but leave the
2331 * read index untouched. */
2332 invalidate_indexes(s, FALSE, TRUE);
2333
2334 } else
2335 /* For record streams this has no influence on the write
2336 * index, but the read index might jump. */
2337 invalidate_indexes(s, TRUE, FALSE);
2338
2339 /* Note that we do not update requested_bytes here. This is
2340 * because we cannot really know how data actually was dropped
2341 * from the write index due to this. This 'error' will be applied
2342 * by both client and server and hence we should be fine. */
2343
2344 return o;
2345 }
2346
2347 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2348 pa_operation *o;
2349
2350 pa_assert(s);
2351 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2352
2353 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2354 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2355 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2356 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2357
2358 /* Ask for a timing update before we cork/uncork to get the best
2359 * accuracy for the transport latency suitable for the
2360 * check_smoother_status() call in the started callback */
2361 request_auto_timing_update(s, TRUE);
2362
2363 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2364 return NULL;
2365
2366 /* This might cause the read index to hang again, hence
2367 * let's request a timing update */
2368 request_auto_timing_update(s, TRUE);
2369
2370 return o;
2371 }
2372
2373 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2374 pa_operation *o;
2375
2376 pa_assert(s);
2377 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2378
2379 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2380 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2381 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2382 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2383
2384 /* Ask for a timing update before we cork/uncork to get the best
2385 * accuracy for the transport latency suitable for the
2386 * check_smoother_status() call in the started callback */
2387 request_auto_timing_update(s, TRUE);
2388
2389 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2390 return NULL;
2391
2392 /* This might cause the read index to start moving again, hence
2393 * let's request a timing update */
2394 request_auto_timing_update(s, TRUE);
2395
2396 return o;
2397 }
2398
2399 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2400 pa_operation *o;
2401
2402 pa_assert(s);
2403 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2404 pa_assert(name);
2405
2406 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2407 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2408 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2409
2410 if (s->context->version >= 13) {
2411 pa_proplist *p = pa_proplist_new();
2412
2413 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2414 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2415 pa_proplist_free(p);
2416 } else {
2417 pa_tagstruct *t;
2418 uint32_t tag;
2419
2420 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2421 t = pa_tagstruct_command(
2422 s->context,
2423 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2424 &tag);
2425 pa_tagstruct_putu32(t, s->channel);
2426 pa_tagstruct_puts(t, name);
2427 pa_pstream_send_tagstruct(s->context->pstream, t);
2428 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2429 }
2430
2431 return o;
2432 }
2433
2434 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2435 pa_usec_t usec;
2436
2437 pa_assert(s);
2438 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2439
2440 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2441 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2442 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2443 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2444 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2445 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2446
2447 if (s->smoother)
2448 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2449 else
2450 usec = calc_time(s, FALSE);
2451
2452 /* Make sure the time runs monotonically */
2453 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2454 if (usec < s->previous_time)
2455 usec = s->previous_time;
2456 else
2457 s->previous_time = usec;
2458 }
2459
2460 if (r_usec)
2461 *r_usec = usec;
2462
2463 return 0;
2464 }
2465
2466 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2467 pa_assert(s);
2468 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2469
2470 if (negative)
2471 *negative = 0;
2472
2473 if (a >= b)
2474 return a-b;
2475 else {
2476 if (negative && s->direction == PA_STREAM_RECORD) {
2477 *negative = 1;
2478 return b-a;
2479 } else
2480 return 0;
2481 }
2482 }
2483
2484 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2485 pa_usec_t t, c;
2486 int r;
2487 int64_t cindex;
2488
2489 pa_assert(s);
2490 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2491 pa_assert(r_usec);
2492
2493 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2494 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2495 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2496 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2497 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2498 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2499
2500 if ((r = pa_stream_get_time(s, &t)) < 0)
2501 return r;
2502
2503 if (s->direction == PA_STREAM_PLAYBACK)
2504 cindex = s->timing_info.write_index;
2505 else
2506 cindex = s->timing_info.read_index;
2507
2508 if (cindex < 0)
2509 cindex = 0;
2510
2511 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2512
2513 if (s->direction == PA_STREAM_PLAYBACK)
2514 *r_usec = time_counter_diff(s, c, t, negative);
2515 else
2516 *r_usec = time_counter_diff(s, t, c, negative);
2517
2518 return 0;
2519 }
2520
2521 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2522 pa_assert(s);
2523 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2524
2525 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2526 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2527 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2528 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2529
2530 return &s->timing_info;
2531 }
2532
2533 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2534 pa_assert(s);
2535 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2536
2537 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2538
2539 return &s->sample_spec;
2540 }
2541
2542 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2543 pa_assert(s);
2544 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2545
2546 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2547
2548 return &s->channel_map;
2549 }
2550
2551 const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2552 pa_assert(s);
2553 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2554
2555 /* We don't have the format till routing is done */
2556 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2557 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2558
2559 return s->format;
2560 }
2561 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2562 pa_assert(s);
2563 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2564
2565 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2566 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2567 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2568
2569 return &s->buffer_attr;
2570 }
2571
2572 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2573 pa_operation *o = userdata;
2574 int success = 1;
2575
2576 pa_assert(pd);
2577 pa_assert(o);
2578 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2579
2580 if (!o->context)
2581 goto finish;
2582
2583 if (command != PA_COMMAND_REPLY) {
2584 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2585 goto finish;
2586
2587 success = 0;
2588 } else {
2589 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2590 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2591 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2592 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2593 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2594 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2595 goto finish;
2596 }
2597 } else if (o->stream->direction == PA_STREAM_RECORD) {
2598 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2599 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2600 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2601 goto finish;
2602 }
2603 }
2604
2605 if (o->stream->context->version >= 13) {
2606 pa_usec_t usec;
2607
2608 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2609 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2610 goto finish;
2611 }
2612
2613 if (o->stream->direction == PA_STREAM_RECORD)
2614 o->stream->timing_info.configured_source_usec = usec;
2615 else
2616 o->stream->timing_info.configured_sink_usec = usec;
2617 }
2618
2619 if (!pa_tagstruct_eof(t)) {
2620 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2621 goto finish;
2622 }
2623 }
2624
2625 if (o->callback) {
2626 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2627 cb(o->stream, success, o->userdata);
2628 }
2629
2630 finish:
2631 pa_operation_done(o);
2632 pa_operation_unref(o);
2633 }
2634
2635
2636 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2637 pa_operation *o;
2638 pa_tagstruct *t;
2639 uint32_t tag;
2640 pa_buffer_attr copy;
2641
2642 pa_assert(s);
2643 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2644 pa_assert(attr);
2645
2646 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2647 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2648 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2649 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2650
2651 /* Ask for a timing update before we cork/uncork to get the best
2652 * accuracy for the transport latency suitable for the
2653 * check_smoother_status() call in the started callback */
2654 request_auto_timing_update(s, TRUE);
2655
2656 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2657
2658 t = pa_tagstruct_command(
2659 s->context,
2660 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2661 &tag);
2662 pa_tagstruct_putu32(t, s->channel);
2663
2664 copy = *attr;
2665 patch_buffer_attr(s, &copy, NULL);
2666 attr = &copy;
2667
2668 pa_tagstruct_putu32(t, attr->maxlength);
2669
2670 if (s->direction == PA_STREAM_PLAYBACK)
2671 pa_tagstruct_put(
2672 t,
2673 PA_TAG_U32, attr->tlength,
2674 PA_TAG_U32, attr->prebuf,
2675 PA_TAG_U32, attr->minreq,
2676 PA_TAG_INVALID);
2677 else
2678 pa_tagstruct_putu32(t, attr->fragsize);
2679
2680 if (s->context->version >= 13)
2681 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2682
2683 if (s->context->version >= 14)
2684 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2685
2686 pa_pstream_send_tagstruct(s->context->pstream, t);
2687 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2688
2689 /* This might cause changes in the read/write index, hence let's
2690 * request a timing update */
2691 request_auto_timing_update(s, TRUE);
2692
2693 return o;
2694 }
2695
2696 uint32_t pa_stream_get_device_index(pa_stream *s) {
2697 pa_assert(s);
2698 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2699
2700 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2701 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2702 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2703 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2704 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2705
2706 return s->device_index;
2707 }
2708
2709 const char *pa_stream_get_device_name(pa_stream *s) {
2710 pa_assert(s);
2711 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2712
2713 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2714 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2715 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2716 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2717 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2718
2719 return s->device_name;
2720 }
2721
2722 int pa_stream_is_suspended(pa_stream *s) {
2723 pa_assert(s);
2724 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2725
2726 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2727 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2728 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2729 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2730
2731 return s->suspended;
2732 }
2733
2734 int pa_stream_is_corked(pa_stream *s) {
2735 pa_assert(s);
2736 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2737
2738 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2739 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2740 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2741
2742 return s->corked;
2743 }
2744
2745 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2746 pa_operation *o = userdata;
2747 int success = 1;
2748
2749 pa_assert(pd);
2750 pa_assert(o);
2751 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2752
2753 if (!o->context)
2754 goto finish;
2755
2756 if (command != PA_COMMAND_REPLY) {
2757 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2758 goto finish;
2759
2760 success = 0;
2761 } else {
2762
2763 if (!pa_tagstruct_eof(t)) {
2764 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2765 goto finish;
2766 }
2767 }
2768
2769 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2770 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2771
2772 if (o->callback) {
2773 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2774 cb(o->stream, success, o->userdata);
2775 }
2776
2777 finish:
2778 pa_operation_done(o);
2779 pa_operation_unref(o);
2780 }
2781
2782
2783 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2784 pa_operation *o;
2785 pa_tagstruct *t;
2786 uint32_t tag;
2787
2788 pa_assert(s);
2789 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2790
2791 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2792 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2793 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2794 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2795 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2796 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2797
2798 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2799 o->private = PA_UINT_TO_PTR(rate);
2800
2801 t = pa_tagstruct_command(
2802 s->context,
2803 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2804 &tag);
2805 pa_tagstruct_putu32(t, s->channel);
2806 pa_tagstruct_putu32(t, rate);
2807
2808 pa_pstream_send_tagstruct(s->context->pstream, t);
2809 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2810
2811 return o;
2812 }
2813
2814 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2815 pa_operation *o;
2816 pa_tagstruct *t;
2817 uint32_t tag;
2818
2819 pa_assert(s);
2820 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2821
2822 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2823 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2824 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2825 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2826 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2827
2828 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2829
2830 t = pa_tagstruct_command(
2831 s->context,
2832 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2833 &tag);
2834 pa_tagstruct_putu32(t, s->channel);
2835 pa_tagstruct_putu32(t, (uint32_t) mode);
2836 pa_tagstruct_put_proplist(t, p);
2837
2838 pa_pstream_send_tagstruct(s->context->pstream, t);
2839 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2840
2841 /* Please note that we don't update s->proplist here, because we
2842 * don't export that field */
2843
2844 return o;
2845 }
2846
2847 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2848 pa_operation *o;
2849 pa_tagstruct *t;
2850 uint32_t tag;
2851 const char * const*k;
2852
2853 pa_assert(s);
2854 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2855
2856 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2857 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2858 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2859 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2860 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2861
2862 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2863
2864 t = pa_tagstruct_command(
2865 s->context,
2866 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2867 &tag);
2868 pa_tagstruct_putu32(t, s->channel);
2869
2870 for (k = keys; *k; k++)
2871 pa_tagstruct_puts(t, *k);
2872
2873 pa_tagstruct_puts(t, NULL);
2874
2875 pa_pstream_send_tagstruct(s->context->pstream, t);
2876 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2877
2878 /* Please note that we don't update s->proplist here, because we
2879 * don't export that field */
2880
2881 return o;
2882 }
2883
2884 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2885 pa_assert(s);
2886 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2887
2888 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2889 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2890 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2891 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2892
2893 s->direct_on_input = sink_input_idx;
2894
2895 return 0;
2896 }
2897
2898 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2899 pa_assert(s);
2900 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2901
2902 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2903 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2904 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2905
2906 return s->direct_on_input;
2907 }