]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
pulse: ask for timing updates both *before* and *after* triggering a stream state...
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35
36 #include <pulsecore/pstream-util.h>
37 #include <pulsecore/log.h>
38 #include <pulsecore/hashmap.h>
39 #include <pulsecore/macro.h>
40 #include <pulsecore/core-rtclock.h>
41
42 #include "fork-detect.h"
43 #include "internal.h"
44
/* Automatic timing updates start at the short interval and the interval
 * is doubled after each restart until the upper bound is reached. */
#define AUTO_TIMING_INTERVAL_START_USEC (10 * PA_USEC_PER_MSEC)
#define AUTO_TIMING_INTERVAL_END_USEC (1500 * PA_USEC_PER_MSEC)

/* Parameters handed to pa_smoother_new() when timing interpolation is
 * requested for a stream. */
#define SMOOTHER_ADJUST_TIME (1000 * PA_USEC_PER_MSEC)
#define SMOOTHER_HISTORY_TIME (5000 * PA_USEC_PER_MSEC)
#define SMOOTHER_MIN_HISTORY (4)
51
52 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
53 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
54 }
55
56 static void reset_callbacks(pa_stream *s) {
57 s->read_callback = NULL;
58 s->read_userdata = NULL;
59 s->write_callback = NULL;
60 s->write_userdata = NULL;
61 s->state_callback = NULL;
62 s->state_userdata = NULL;
63 s->overflow_callback = NULL;
64 s->overflow_userdata = NULL;
65 s->underflow_callback = NULL;
66 s->underflow_userdata = NULL;
67 s->latency_update_callback = NULL;
68 s->latency_update_userdata = NULL;
69 s->moved_callback = NULL;
70 s->moved_userdata = NULL;
71 s->suspended_callback = NULL;
72 s->suspended_userdata = NULL;
73 s->started_callback = NULL;
74 s->started_userdata = NULL;
75 s->event_callback = NULL;
76 s->event_userdata = NULL;
77 s->buffer_attr_callback = NULL;
78 s->buffer_attr_userdata = NULL;
79 }
80
81 pa_stream *pa_stream_new_with_proplist(
82 pa_context *c,
83 const char *name,
84 const pa_sample_spec *ss,
85 const pa_channel_map *map,
86 pa_proplist *p) {
87
88 pa_stream *s;
89 int i;
90 pa_channel_map tmap;
91
92 pa_assert(c);
93 pa_assert(PA_REFCNT_VALUE(c) >= 1);
94
95 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
102
103 if (!map)
104 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
105
106 s = pa_xnew(pa_stream, 1);
107 PA_REFCNT_INIT(s);
108 s->context = c;
109 s->mainloop = c->mainloop;
110
111 s->direction = PA_STREAM_NODIRECTION;
112 s->state = PA_STREAM_UNCONNECTED;
113 s->flags = 0;
114
115 s->sample_spec = *ss;
116 s->channel_map = *map;
117
118 s->direct_on_input = PA_INVALID_INDEX;
119
120 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
121 if (name)
122 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
123
124 s->channel = 0;
125 s->channel_valid = FALSE;
126 s->syncid = c->csyncid++;
127 s->stream_index = PA_INVALID_INDEX;
128
129 s->requested_bytes = 0;
130 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
131
132 /* We initialize der target length here, so that if the user
133 * passes no explicit buffering metrics the default is similar to
134 * what older PA versions provided. */
135
136 s->buffer_attr.maxlength = (uint32_t) -1;
137 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
138 s->buffer_attr.minreq = (uint32_t) -1;
139 s->buffer_attr.prebuf = (uint32_t) -1;
140 s->buffer_attr.fragsize = (uint32_t) -1;
141
142 s->device_index = PA_INVALID_INDEX;
143 s->device_name = NULL;
144 s->suspended = FALSE;
145 s->corked = FALSE;
146
147 s->write_memblock = NULL;
148 s->write_data = NULL;
149
150 pa_memchunk_reset(&s->peek_memchunk);
151 s->peek_data = NULL;
152 s->record_memblockq = NULL;
153
154 memset(&s->timing_info, 0, sizeof(s->timing_info));
155 s->timing_info_valid = FALSE;
156
157 s->previous_time = 0;
158
159 s->read_index_not_before = 0;
160 s->write_index_not_before = 0;
161 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
162 s->write_index_corrections[i].valid = 0;
163 s->current_write_index_correction = 0;
164
165 s->auto_timing_update_event = NULL;
166 s->auto_timing_update_requested = FALSE;
167 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
168
169 reset_callbacks(s);
170
171 s->smoother = NULL;
172
173 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
174 PA_LLIST_PREPEND(pa_stream, c->streams, s);
175 pa_stream_ref(s);
176
177 return s;
178 }
179
180 static void stream_unlink(pa_stream *s) {
181 pa_operation *o, *n;
182 pa_assert(s);
183
184 if (!s->context)
185 return;
186
187 /* Detach from context */
188
189 /* Unref all operatio object that point to us */
190 for (o = s->context->operations; o; o = n) {
191 n = o->next;
192
193 if (o->stream == s)
194 pa_operation_cancel(o);
195 }
196
197 /* Drop all outstanding replies for this stream */
198 if (s->context->pdispatch)
199 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
200
201 if (s->channel_valid) {
202 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
203 s->channel = 0;
204 s->channel_valid = FALSE;
205 }
206
207 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
208 pa_stream_unref(s);
209
210 s->context = NULL;
211
212 if (s->auto_timing_update_event) {
213 pa_assert(s->mainloop);
214 s->mainloop->time_free(s->auto_timing_update_event);
215 }
216
217 reset_callbacks(s);
218 }
219
220 static void stream_free(pa_stream *s) {
221 pa_assert(s);
222
223 stream_unlink(s);
224
225 if (s->write_memblock) {
226 pa_memblock_release(s->write_memblock);
227 pa_memblock_unref(s->write_data);
228 }
229
230 if (s->peek_memchunk.memblock) {
231 if (s->peek_data)
232 pa_memblock_release(s->peek_memchunk.memblock);
233 pa_memblock_unref(s->peek_memchunk.memblock);
234 }
235
236 if (s->record_memblockq)
237 pa_memblockq_free(s->record_memblockq);
238
239 if (s->proplist)
240 pa_proplist_free(s->proplist);
241
242 if (s->smoother)
243 pa_smoother_free(s->smoother);
244
245 pa_xfree(s->device_name);
246 pa_xfree(s);
247 }
248
249 void pa_stream_unref(pa_stream *s) {
250 pa_assert(s);
251 pa_assert(PA_REFCNT_VALUE(s) >= 1);
252
253 if (PA_REFCNT_DEC(s) <= 0)
254 stream_free(s);
255 }
256
257 pa_stream* pa_stream_ref(pa_stream *s) {
258 pa_assert(s);
259 pa_assert(PA_REFCNT_VALUE(s) >= 1);
260
261 PA_REFCNT_INC(s);
262 return s;
263 }
264
265 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
266 pa_assert(s);
267 pa_assert(PA_REFCNT_VALUE(s) >= 1);
268
269 return s->state;
270 }
271
272 pa_context* pa_stream_get_context(pa_stream *s) {
273 pa_assert(s);
274 pa_assert(PA_REFCNT_VALUE(s) >= 1);
275
276 return s->context;
277 }
278
279 uint32_t pa_stream_get_index(pa_stream *s) {
280 pa_assert(s);
281 pa_assert(PA_REFCNT_VALUE(s) >= 1);
282
283 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
284 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
285
286 return s->stream_index;
287 }
288
289 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
290 pa_assert(s);
291 pa_assert(PA_REFCNT_VALUE(s) >= 1);
292
293 if (s->state == st)
294 return;
295
296 pa_stream_ref(s);
297
298 s->state = st;
299
300 if (s->state_callback)
301 s->state_callback(s, s->state_userdata);
302
303 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
304 stream_unlink(s);
305
306 pa_stream_unref(s);
307 }
308
309 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
310 pa_assert(s);
311 pa_assert(PA_REFCNT_VALUE(s) >= 1);
312
313 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
314 return;
315
316 if (s->state == PA_STREAM_READY &&
317 (force || !s->auto_timing_update_requested)) {
318 pa_operation *o;
319
320 /* pa_log("Automatically requesting new timing data"); */
321
322 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
323 pa_operation_unref(o);
324 s->auto_timing_update_requested = TRUE;
325 }
326 }
327
328 if (s->auto_timing_update_event) {
329 if (force)
330 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
331
332 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
333
334 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
335 }
336 }
337
338 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
339 pa_context *c = userdata;
340 pa_stream *s;
341 uint32_t channel;
342
343 pa_assert(pd);
344 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
345 pa_assert(t);
346 pa_assert(c);
347 pa_assert(PA_REFCNT_VALUE(c) >= 1);
348
349 pa_context_ref(c);
350
351 if (pa_tagstruct_getu32(t, &channel) < 0 ||
352 !pa_tagstruct_eof(t)) {
353 pa_context_fail(c, PA_ERR_PROTOCOL);
354 goto finish;
355 }
356
357 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
358 goto finish;
359
360 if (s->state != PA_STREAM_READY)
361 goto finish;
362
363 pa_context_set_error(c, PA_ERR_KILLED);
364 pa_stream_set_state(s, PA_STREAM_FAILED);
365
366 finish:
367 pa_context_unref(c);
368 }
369
370 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
371 pa_usec_t x;
372
373 pa_assert(s);
374 pa_assert(!force_start || !force_stop);
375
376 if (!s->smoother)
377 return;
378
379 x = pa_rtclock_now();
380
381 if (s->timing_info_valid) {
382 if (aposteriori)
383 x -= s->timing_info.transport_usec;
384 else
385 x += s->timing_info.transport_usec;
386 }
387
388 if (s->suspended || s->corked || force_stop)
389 pa_smoother_pause(s->smoother, x);
390 else if (force_start || s->buffer_attr.prebuf == 0) {
391
392 if (!s->timing_info_valid &&
393 !aposteriori &&
394 !force_start &&
395 !force_stop &&
396 s->context->version >= 13) {
397
398 /* If the server supports STARTED events we take them as
399 * indications when audio really starts/stops playing, if
400 * we don't have any timing info yet -- instead of trying
401 * to be smart and guessing the server time. Otherwise the
402 * unknown transport delay add too much noise to our time
403 * calculations. */
404
405 return;
406 }
407
408 pa_smoother_resume(s->smoother, x, TRUE);
409 }
410
411 /* Please note that we have no idea if playback actually started
412 * if prebuf is non-zero! */
413 }
414
415 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
416 pa_context *c = userdata;
417 pa_stream *s;
418 uint32_t channel;
419 const char *dn;
420 pa_bool_t suspended;
421 uint32_t di;
422 pa_usec_t usec = 0;
423 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
424
425 pa_assert(pd);
426 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
427 pa_assert(t);
428 pa_assert(c);
429 pa_assert(PA_REFCNT_VALUE(c) >= 1);
430
431 pa_context_ref(c);
432
433 if (c->version < 12) {
434 pa_context_fail(c, PA_ERR_PROTOCOL);
435 goto finish;
436 }
437
438 if (pa_tagstruct_getu32(t, &channel) < 0 ||
439 pa_tagstruct_getu32(t, &di) < 0 ||
440 pa_tagstruct_gets(t, &dn) < 0 ||
441 pa_tagstruct_get_boolean(t, &suspended) < 0) {
442 pa_context_fail(c, PA_ERR_PROTOCOL);
443 goto finish;
444 }
445
446 if (c->version >= 13) {
447
448 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
449 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
450 pa_tagstruct_getu32(t, &fragsize) < 0 ||
451 pa_tagstruct_get_usec(t, &usec) < 0) {
452 pa_context_fail(c, PA_ERR_PROTOCOL);
453 goto finish;
454 }
455 } else {
456 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
457 pa_tagstruct_getu32(t, &tlength) < 0 ||
458 pa_tagstruct_getu32(t, &prebuf) < 0 ||
459 pa_tagstruct_getu32(t, &minreq) < 0 ||
460 pa_tagstruct_get_usec(t, &usec) < 0) {
461 pa_context_fail(c, PA_ERR_PROTOCOL);
462 goto finish;
463 }
464 }
465 }
466
467 if (!pa_tagstruct_eof(t)) {
468 pa_context_fail(c, PA_ERR_PROTOCOL);
469 goto finish;
470 }
471
472 if (!dn || di == PA_INVALID_INDEX) {
473 pa_context_fail(c, PA_ERR_PROTOCOL);
474 goto finish;
475 }
476
477 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
478 goto finish;
479
480 if (s->state != PA_STREAM_READY)
481 goto finish;
482
483 if (c->version >= 13) {
484 if (s->direction == PA_STREAM_RECORD)
485 s->timing_info.configured_source_usec = usec;
486 else
487 s->timing_info.configured_sink_usec = usec;
488
489 s->buffer_attr.maxlength = maxlength;
490 s->buffer_attr.fragsize = fragsize;
491 s->buffer_attr.tlength = tlength;
492 s->buffer_attr.prebuf = prebuf;
493 s->buffer_attr.minreq = minreq;
494 }
495
496 pa_xfree(s->device_name);
497 s->device_name = pa_xstrdup(dn);
498 s->device_index = di;
499
500 s->suspended = suspended;
501
502 check_smoother_status(s, TRUE, FALSE, FALSE);
503 request_auto_timing_update(s, TRUE);
504
505 if (s->moved_callback)
506 s->moved_callback(s, s->moved_userdata);
507
508 finish:
509 pa_context_unref(c);
510 }
511
512 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
513 pa_context *c = userdata;
514 pa_stream *s;
515 uint32_t channel;
516 pa_usec_t usec = 0;
517 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
518
519 pa_assert(pd);
520 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
521 pa_assert(t);
522 pa_assert(c);
523 pa_assert(PA_REFCNT_VALUE(c) >= 1);
524
525 pa_context_ref(c);
526
527 if (c->version < 15) {
528 pa_context_fail(c, PA_ERR_PROTOCOL);
529 goto finish;
530 }
531
532 if (pa_tagstruct_getu32(t, &channel) < 0) {
533 pa_context_fail(c, PA_ERR_PROTOCOL);
534 goto finish;
535 }
536
537 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
538 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
539 pa_tagstruct_getu32(t, &fragsize) < 0 ||
540 pa_tagstruct_get_usec(t, &usec) < 0) {
541 pa_context_fail(c, PA_ERR_PROTOCOL);
542 goto finish;
543 }
544 } else {
545 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
546 pa_tagstruct_getu32(t, &tlength) < 0 ||
547 pa_tagstruct_getu32(t, &prebuf) < 0 ||
548 pa_tagstruct_getu32(t, &minreq) < 0 ||
549 pa_tagstruct_get_usec(t, &usec) < 0) {
550 pa_context_fail(c, PA_ERR_PROTOCOL);
551 goto finish;
552 }
553 }
554
555 if (!pa_tagstruct_eof(t)) {
556 pa_context_fail(c, PA_ERR_PROTOCOL);
557 goto finish;
558 }
559
560 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, channel)))
561 goto finish;
562
563 if (s->state != PA_STREAM_READY)
564 goto finish;
565
566 if (s->direction == PA_STREAM_RECORD)
567 s->timing_info.configured_source_usec = usec;
568 else
569 s->timing_info.configured_sink_usec = usec;
570
571 s->buffer_attr.maxlength = maxlength;
572 s->buffer_attr.fragsize = fragsize;
573 s->buffer_attr.tlength = tlength;
574 s->buffer_attr.prebuf = prebuf;
575 s->buffer_attr.minreq = minreq;
576
577 request_auto_timing_update(s, TRUE);
578
579 if (s->buffer_attr_callback)
580 s->buffer_attr_callback(s, s->buffer_attr_userdata);
581
582 finish:
583 pa_context_unref(c);
584 }
585
586 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
587 pa_context *c = userdata;
588 pa_stream *s;
589 uint32_t channel;
590 pa_bool_t suspended;
591
592 pa_assert(pd);
593 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
594 pa_assert(t);
595 pa_assert(c);
596 pa_assert(PA_REFCNT_VALUE(c) >= 1);
597
598 pa_context_ref(c);
599
600 if (c->version < 12) {
601 pa_context_fail(c, PA_ERR_PROTOCOL);
602 goto finish;
603 }
604
605 if (pa_tagstruct_getu32(t, &channel) < 0 ||
606 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
607 !pa_tagstruct_eof(t)) {
608 pa_context_fail(c, PA_ERR_PROTOCOL);
609 goto finish;
610 }
611
612 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
613 goto finish;
614
615 if (s->state != PA_STREAM_READY)
616 goto finish;
617
618 s->suspended = suspended;
619
620 check_smoother_status(s, TRUE, FALSE, FALSE);
621 request_auto_timing_update(s, TRUE);
622
623 if (s->suspended_callback)
624 s->suspended_callback(s, s->suspended_userdata);
625
626 finish:
627 pa_context_unref(c);
628 }
629
630 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
631 pa_context *c = userdata;
632 pa_stream *s;
633 uint32_t channel;
634
635 pa_assert(pd);
636 pa_assert(command == PA_COMMAND_STARTED);
637 pa_assert(t);
638 pa_assert(c);
639 pa_assert(PA_REFCNT_VALUE(c) >= 1);
640
641 pa_context_ref(c);
642
643 if (c->version < 13) {
644 pa_context_fail(c, PA_ERR_PROTOCOL);
645 goto finish;
646 }
647
648 if (pa_tagstruct_getu32(t, &channel) < 0 ||
649 !pa_tagstruct_eof(t)) {
650 pa_context_fail(c, PA_ERR_PROTOCOL);
651 goto finish;
652 }
653
654 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
655 goto finish;
656
657 if (s->state != PA_STREAM_READY)
658 goto finish;
659
660 check_smoother_status(s, TRUE, TRUE, FALSE);
661 request_auto_timing_update(s, TRUE);
662
663 if (s->started_callback)
664 s->started_callback(s, s->started_userdata);
665
666 finish:
667 pa_context_unref(c);
668 }
669
670 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
671 pa_context *c = userdata;
672 pa_stream *s;
673 uint32_t channel;
674 pa_proplist *pl = NULL;
675 const char *event;
676
677 pa_assert(pd);
678 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
679 pa_assert(t);
680 pa_assert(c);
681 pa_assert(PA_REFCNT_VALUE(c) >= 1);
682
683 pa_context_ref(c);
684
685 if (c->version < 15) {
686 pa_context_fail(c, PA_ERR_PROTOCOL);
687 goto finish;
688 }
689
690 pl = pa_proplist_new();
691
692 if (pa_tagstruct_getu32(t, &channel) < 0 ||
693 pa_tagstruct_gets(t, &event) < 0 ||
694 pa_tagstruct_get_proplist(t, pl) < 0 ||
695 !pa_tagstruct_eof(t) || !event) {
696 pa_context_fail(c, PA_ERR_PROTOCOL);
697 goto finish;
698 }
699
700 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
701 goto finish;
702
703 if (s->state != PA_STREAM_READY)
704 goto finish;
705
706 if (s->event_callback)
707 s->event_callback(s, event, pl, s->event_userdata);
708
709 finish:
710 pa_context_unref(c);
711
712 if (pl)
713 pa_proplist_free(pl);
714 }
715
716 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
717 pa_stream *s;
718 pa_context *c = userdata;
719 uint32_t bytes, channel;
720
721 pa_assert(pd);
722 pa_assert(command == PA_COMMAND_REQUEST);
723 pa_assert(t);
724 pa_assert(c);
725 pa_assert(PA_REFCNT_VALUE(c) >= 1);
726
727 pa_context_ref(c);
728
729 if (pa_tagstruct_getu32(t, &channel) < 0 ||
730 pa_tagstruct_getu32(t, &bytes) < 0 ||
731 !pa_tagstruct_eof(t)) {
732 pa_context_fail(c, PA_ERR_PROTOCOL);
733 goto finish;
734 }
735
736 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
737 goto finish;
738
739 if (s->state != PA_STREAM_READY)
740 goto finish;
741
742 s->requested_bytes += bytes;
743
744 if (s->requested_bytes > 0 && s->write_callback)
745 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
746
747 finish:
748 pa_context_unref(c);
749 }
750
751 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
752 pa_stream *s;
753 pa_context *c = userdata;
754 uint32_t channel;
755
756 pa_assert(pd);
757 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
758 pa_assert(t);
759 pa_assert(c);
760 pa_assert(PA_REFCNT_VALUE(c) >= 1);
761
762 pa_context_ref(c);
763
764 if (pa_tagstruct_getu32(t, &channel) < 0 ||
765 !pa_tagstruct_eof(t)) {
766 pa_context_fail(c, PA_ERR_PROTOCOL);
767 goto finish;
768 }
769
770 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
771 goto finish;
772
773 if (s->state != PA_STREAM_READY)
774 goto finish;
775
776 if (s->buffer_attr.prebuf > 0)
777 check_smoother_status(s, TRUE, FALSE, TRUE);
778
779 request_auto_timing_update(s, TRUE);
780
781 if (command == PA_COMMAND_OVERFLOW) {
782 if (s->overflow_callback)
783 s->overflow_callback(s, s->overflow_userdata);
784 } else if (command == PA_COMMAND_UNDERFLOW) {
785 if (s->underflow_callback)
786 s->underflow_callback(s, s->underflow_userdata);
787 }
788
789 finish:
790 pa_context_unref(c);
791 }
792
793 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
794 pa_assert(s);
795 pa_assert(PA_REFCNT_VALUE(s) >= 1);
796
797 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
798
799 if (s->state != PA_STREAM_READY)
800 return;
801
802 if (w) {
803 s->write_index_not_before = s->context->ctag;
804
805 if (s->timing_info_valid)
806 s->timing_info.write_index_corrupt = TRUE;
807
808 /* pa_log("write_index invalidated"); */
809 }
810
811 if (r) {
812 s->read_index_not_before = s->context->ctag;
813
814 if (s->timing_info_valid)
815 s->timing_info.read_index_corrupt = TRUE;
816
817 /* pa_log("read_index invalidated"); */
818 }
819
820 request_auto_timing_update(s, TRUE);
821 }
822
823 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
824 pa_stream *s = userdata;
825
826 pa_assert(s);
827 pa_assert(PA_REFCNT_VALUE(s) >= 1);
828
829 pa_stream_ref(s);
830 request_auto_timing_update(s, FALSE);
831 pa_stream_unref(s);
832 }
833
834 static void create_stream_complete(pa_stream *s) {
835 pa_assert(s);
836 pa_assert(PA_REFCNT_VALUE(s) >= 1);
837 pa_assert(s->state == PA_STREAM_CREATING);
838
839 pa_stream_set_state(s, PA_STREAM_READY);
840
841 if (s->requested_bytes > 0 && s->write_callback)
842 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
843
844 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
845 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
846 pa_assert(!s->auto_timing_update_event);
847 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
848
849 request_auto_timing_update(s, TRUE);
850 }
851
852 check_smoother_status(s, TRUE, FALSE, FALSE);
853 }
854
855 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
856 pa_assert(s);
857 pa_assert(attr);
858 pa_assert(ss);
859
860 if (s->context->version >= 13)
861 return;
862
863 /* Version older than 0.9.10 didn't do server side buffer_attr
864 * selection, hence we have to fake it on the client side. */
865
866 /* We choose fairly conservative values here, to not confuse
867 * old clients with extremely large playback buffers */
868
869 if (attr->maxlength == (uint32_t) -1)
870 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
871
872 if (attr->tlength == (uint32_t) -1)
873 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
874
875 if (attr->minreq == (uint32_t) -1)
876 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
877
878 if (attr->prebuf == (uint32_t) -1)
879 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
880
881 if (attr->fragsize == (uint32_t) -1)
882 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
883 }
884
885 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
886 pa_stream *s = userdata;
887 uint32_t requested_bytes = 0;
888
889 pa_assert(pd);
890 pa_assert(s);
891 pa_assert(PA_REFCNT_VALUE(s) >= 1);
892 pa_assert(s->state == PA_STREAM_CREATING);
893
894 pa_stream_ref(s);
895
896 if (command != PA_COMMAND_REPLY) {
897 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
898 goto finish;
899
900 pa_stream_set_state(s, PA_STREAM_FAILED);
901 goto finish;
902 }
903
904 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
905 s->channel == PA_INVALID_INDEX ||
906 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
907 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
908 pa_context_fail(s->context, PA_ERR_PROTOCOL);
909 goto finish;
910 }
911
912 s->requested_bytes = (int64_t) requested_bytes;
913
914 if (s->context->version >= 9) {
915 if (s->direction == PA_STREAM_PLAYBACK) {
916 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
917 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
918 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
919 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
920 pa_context_fail(s->context, PA_ERR_PROTOCOL);
921 goto finish;
922 }
923 } else if (s->direction == PA_STREAM_RECORD) {
924 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
925 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
926 pa_context_fail(s->context, PA_ERR_PROTOCOL);
927 goto finish;
928 }
929 }
930 }
931
932 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
933 pa_sample_spec ss;
934 pa_channel_map cm;
935 const char *dn = NULL;
936 pa_bool_t suspended;
937
938 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
939 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
940 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
941 pa_tagstruct_gets(t, &dn) < 0 ||
942 pa_tagstruct_get_boolean(t, &suspended) < 0) {
943 pa_context_fail(s->context, PA_ERR_PROTOCOL);
944 goto finish;
945 }
946
947 if (!dn || s->device_index == PA_INVALID_INDEX ||
948 ss.channels != cm.channels ||
949 !pa_channel_map_valid(&cm) ||
950 !pa_sample_spec_valid(&ss) ||
951 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
952 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
953 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
954 pa_context_fail(s->context, PA_ERR_PROTOCOL);
955 goto finish;
956 }
957
958 pa_xfree(s->device_name);
959 s->device_name = pa_xstrdup(dn);
960 s->suspended = suspended;
961
962 s->channel_map = cm;
963 s->sample_spec = ss;
964 }
965
966 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
967 pa_usec_t usec;
968
969 if (pa_tagstruct_get_usec(t, &usec) < 0) {
970 pa_context_fail(s->context, PA_ERR_PROTOCOL);
971 goto finish;
972 }
973
974 if (s->direction == PA_STREAM_RECORD)
975 s->timing_info.configured_source_usec = usec;
976 else
977 s->timing_info.configured_sink_usec = usec;
978 }
979
980 if (!pa_tagstruct_eof(t)) {
981 pa_context_fail(s->context, PA_ERR_PROTOCOL);
982 goto finish;
983 }
984
985 if (s->direction == PA_STREAM_RECORD) {
986 pa_assert(!s->record_memblockq);
987
988 s->record_memblockq = pa_memblockq_new(
989 0,
990 s->buffer_attr.maxlength,
991 0,
992 pa_frame_size(&s->sample_spec),
993 1,
994 0,
995 0,
996 NULL);
997 }
998
999 s->channel_valid = TRUE;
1000 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
1001
1002 create_stream_complete(s);
1003
1004 finish:
1005 pa_stream_unref(s);
1006 }
1007
1008 static int create_stream(
1009 pa_stream_direction_t direction,
1010 pa_stream *s,
1011 const char *dev,
1012 const pa_buffer_attr *attr,
1013 pa_stream_flags_t flags,
1014 const pa_cvolume *volume,
1015 pa_stream *sync_stream) {
1016
1017 pa_tagstruct *t;
1018 uint32_t tag;
1019 pa_bool_t volume_set = FALSE;
1020
1021 pa_assert(s);
1022 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1023 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1024
1025 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1026 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1027 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1028 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1029 PA_STREAM_INTERPOLATE_TIMING|
1030 PA_STREAM_NOT_MONOTONIC|
1031 PA_STREAM_AUTO_TIMING_UPDATE|
1032 PA_STREAM_NO_REMAP_CHANNELS|
1033 PA_STREAM_NO_REMIX_CHANNELS|
1034 PA_STREAM_FIX_FORMAT|
1035 PA_STREAM_FIX_RATE|
1036 PA_STREAM_FIX_CHANNELS|
1037 PA_STREAM_DONT_MOVE|
1038 PA_STREAM_VARIABLE_RATE|
1039 PA_STREAM_PEAK_DETECT|
1040 PA_STREAM_START_MUTED|
1041 PA_STREAM_ADJUST_LATENCY|
1042 PA_STREAM_EARLY_REQUESTS|
1043 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1044 PA_STREAM_START_UNMUTED|
1045 PA_STREAM_FAIL_ON_SUSPEND|
1046 PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1047
1048 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1049 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1050 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1051 /* Althought some of the other flags are not supported on older
1052 * version, we don't check for them here, because it doesn't hurt
1053 * when they are passed but actually not supported. This makes
1054 * client development easier */
1055
1056 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1057 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1058 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1059 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1060 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1061
1062 pa_stream_ref(s);
1063
1064 s->direction = direction;
1065 s->flags = flags;
1066 s->corked = !!(flags & PA_STREAM_START_CORKED);
1067
1068 if (sync_stream)
1069 s->syncid = sync_stream->syncid;
1070
1071 if (attr)
1072 s->buffer_attr = *attr;
1073 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1074
1075 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1076 pa_usec_t x;
1077
1078 x = pa_rtclock_now();
1079
1080 pa_assert(!s->smoother);
1081 s->smoother = pa_smoother_new(
1082 SMOOTHER_ADJUST_TIME,
1083 SMOOTHER_HISTORY_TIME,
1084 !(flags & PA_STREAM_NOT_MONOTONIC),
1085 TRUE,
1086 SMOOTHER_MIN_HISTORY,
1087 x,
1088 TRUE);
1089 }
1090
1091 if (!dev)
1092 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1093
1094 t = pa_tagstruct_command(
1095 s->context,
1096 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1097 &tag);
1098
1099 if (s->context->version < 13)
1100 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1101
1102 pa_tagstruct_put(
1103 t,
1104 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1105 PA_TAG_CHANNEL_MAP, &s->channel_map,
1106 PA_TAG_U32, PA_INVALID_INDEX,
1107 PA_TAG_STRING, dev,
1108 PA_TAG_U32, s->buffer_attr.maxlength,
1109 PA_TAG_BOOLEAN, s->corked,
1110 PA_TAG_INVALID);
1111
1112 if (s->direction == PA_STREAM_PLAYBACK) {
1113 pa_cvolume cv;
1114
1115 pa_tagstruct_put(
1116 t,
1117 PA_TAG_U32, s->buffer_attr.tlength,
1118 PA_TAG_U32, s->buffer_attr.prebuf,
1119 PA_TAG_U32, s->buffer_attr.minreq,
1120 PA_TAG_U32, s->syncid,
1121 PA_TAG_INVALID);
1122
1123 volume_set = !!volume;
1124
1125 if (!volume)
1126 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1127
1128 pa_tagstruct_put_cvolume(t, volume);
1129 } else
1130 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1131
1132 if (s->context->version >= 12) {
1133 pa_tagstruct_put(
1134 t,
1135 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1136 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1137 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1138 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1139 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1140 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1141 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1142 PA_TAG_INVALID);
1143 }
1144
1145 if (s->context->version >= 13) {
1146
1147 if (s->direction == PA_STREAM_PLAYBACK)
1148 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1149 else
1150 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1151
1152 pa_tagstruct_put(
1153 t,
1154 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1155 PA_TAG_PROPLIST, s->proplist,
1156 PA_TAG_INVALID);
1157
1158 if (s->direction == PA_STREAM_RECORD)
1159 pa_tagstruct_putu32(t, s->direct_on_input);
1160 }
1161
1162 if (s->context->version >= 14) {
1163
1164 if (s->direction == PA_STREAM_PLAYBACK)
1165 pa_tagstruct_put_boolean(t, volume_set);
1166
1167 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1168 }
1169
1170 if (s->context->version >= 15) {
1171
1172 if (s->direction == PA_STREAM_PLAYBACK)
1173 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1174
1175 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1176 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1177 }
1178
1179 if (s->context->version >= 17) {
1180
1181 if (s->direction == PA_STREAM_PLAYBACK)
1182 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1183
1184 }
1185
1186 pa_pstream_send_tagstruct(s->context->pstream, t);
1187 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1188
1189 pa_stream_set_state(s, PA_STREAM_CREATING);
1190
1191 pa_stream_unref(s);
1192 return 0;
1193 }
1194
1195 int pa_stream_connect_playback(
1196 pa_stream *s,
1197 const char *dev,
1198 const pa_buffer_attr *attr,
1199 pa_stream_flags_t flags,
1200 const pa_cvolume *volume,
1201 pa_stream *sync_stream) {
1202
1203 pa_assert(s);
1204 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1205
1206 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1207 }
1208
1209 int pa_stream_connect_record(
1210 pa_stream *s,
1211 const char *dev,
1212 const pa_buffer_attr *attr,
1213 pa_stream_flags_t flags) {
1214
1215 pa_assert(s);
1216 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1217
1218 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1219 }
1220
1221 int pa_stream_begin_write(
1222 pa_stream *s,
1223 void **data,
1224 size_t *nbytes) {
1225
1226 pa_assert(s);
1227 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1228
1229 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1230 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1231 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1232 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1233 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1234
1235 if (*nbytes != (size_t) -1) {
1236 size_t m, fs;
1237
1238 m = pa_mempool_block_size_max(s->context->mempool);
1239 fs = pa_frame_size(&s->sample_spec);
1240
1241 m = (m / fs) * fs;
1242 if (*nbytes > m)
1243 *nbytes = m;
1244 }
1245
1246 if (!s->write_memblock) {
1247 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1248 s->write_data = pa_memblock_acquire(s->write_memblock);
1249 }
1250
1251 *data = s->write_data;
1252 *nbytes = pa_memblock_get_length(s->write_memblock);
1253
1254 return 0;
1255 }
1256
1257 int pa_stream_cancel_write(
1258 pa_stream *s) {
1259
1260 pa_assert(s);
1261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1262
1263 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1264 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1265 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1266 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1267
1268 pa_assert(s->write_data);
1269
1270 pa_memblock_release(s->write_memblock);
1271 pa_memblock_unref(s->write_memblock);
1272 s->write_memblock = NULL;
1273 s->write_data = NULL;
1274
1275 return 0;
1276 }
1277
1278 int pa_stream_write(
1279 pa_stream *s,
1280 const void *data,
1281 size_t length,
1282 pa_free_cb_t free_cb,
1283 int64_t offset,
1284 pa_seek_mode_t seek) {
1285
1286 pa_assert(s);
1287 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1288 pa_assert(data);
1289
1290 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1291 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1292 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1293 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1294 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1295 PA_CHECK_VALIDITY(s->context,
1296 !s->write_memblock ||
1297 ((data >= s->write_data) &&
1298 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1299 PA_ERR_INVALID);
1300 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1301
1302 if (s->write_memblock) {
1303 pa_memchunk chunk;
1304
1305 /* pa_stream_write_begin() was called before */
1306
1307 pa_memblock_release(s->write_memblock);
1308
1309 chunk.memblock = s->write_memblock;
1310 chunk.index = (const char *) data - (const char *) s->write_data;
1311 chunk.length = length;
1312
1313 s->write_memblock = NULL;
1314 s->write_data = NULL;
1315
1316 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1317 pa_memblock_unref(chunk.memblock);
1318
1319 } else {
1320 pa_seek_mode_t t_seek = seek;
1321 int64_t t_offset = offset;
1322 size_t t_length = length;
1323 const void *t_data = data;
1324
1325 /* pa_stream_write_begin() was not called before */
1326
1327 while (t_length > 0) {
1328 pa_memchunk chunk;
1329
1330 chunk.index = 0;
1331
1332 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1333 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1334 chunk.length = t_length;
1335 } else {
1336 void *d;
1337
1338 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1339 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1340
1341 d = pa_memblock_acquire(chunk.memblock);
1342 memcpy(d, t_data, chunk.length);
1343 pa_memblock_release(chunk.memblock);
1344 }
1345
1346 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1347
1348 t_offset = 0;
1349 t_seek = PA_SEEK_RELATIVE;
1350
1351 t_data = (const uint8_t*) t_data + chunk.length;
1352 t_length -= chunk.length;
1353
1354 pa_memblock_unref(chunk.memblock);
1355 }
1356
1357 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1358 free_cb((void*) data);
1359 }
1360
1361 /* This is obviously wrong since we ignore the seeking index . But
1362 * that's OK, the server side applies the same error */
1363 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1364
1365 if (s->direction == PA_STREAM_PLAYBACK) {
1366
1367 /* Update latency request correction */
1368 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1369
1370 if (seek == PA_SEEK_ABSOLUTE) {
1371 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1372 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1373 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1374 } else if (seek == PA_SEEK_RELATIVE) {
1375 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1376 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1377 } else
1378 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1379 }
1380
1381 /* Update the write index in the already available latency data */
1382 if (s->timing_info_valid) {
1383
1384 if (seek == PA_SEEK_ABSOLUTE) {
1385 s->timing_info.write_index_corrupt = FALSE;
1386 s->timing_info.write_index = offset + (int64_t) length;
1387 } else if (seek == PA_SEEK_RELATIVE) {
1388 if (!s->timing_info.write_index_corrupt)
1389 s->timing_info.write_index += offset + (int64_t) length;
1390 } else
1391 s->timing_info.write_index_corrupt = TRUE;
1392 }
1393
1394 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1395 request_auto_timing_update(s, TRUE);
1396 }
1397
1398 return 0;
1399 }
1400
1401 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1402 pa_assert(s);
1403 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1404 pa_assert(data);
1405 pa_assert(length);
1406
1407 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1408 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1409 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1410
1411 if (!s->peek_memchunk.memblock) {
1412
1413 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1414 *data = NULL;
1415 *length = 0;
1416 return 0;
1417 }
1418
1419 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1420 }
1421
1422 pa_assert(s->peek_data);
1423 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1424 *length = s->peek_memchunk.length;
1425 return 0;
1426 }
1427
1428 int pa_stream_drop(pa_stream *s) {
1429 pa_assert(s);
1430 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1431
1432 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1433 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1434 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1435 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1436
1437 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1438
1439 /* Fix the simulated local read index */
1440 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1441 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1442
1443 pa_assert(s->peek_data);
1444 pa_memblock_release(s->peek_memchunk.memblock);
1445 pa_memblock_unref(s->peek_memchunk.memblock);
1446 pa_memchunk_reset(&s->peek_memchunk);
1447
1448 return 0;
1449 }
1450
1451 size_t pa_stream_writable_size(pa_stream *s) {
1452 pa_assert(s);
1453 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1454
1455 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1456 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1457 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1458
1459 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1460 }
1461
1462 size_t pa_stream_readable_size(pa_stream *s) {
1463 pa_assert(s);
1464 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1465
1466 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1467 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1468 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1469
1470 return pa_memblockq_get_length(s->record_memblockq);
1471 }
1472
1473 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1474 pa_operation *o;
1475 pa_tagstruct *t;
1476 uint32_t tag;
1477
1478 pa_assert(s);
1479 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1480
1481 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1482 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1483 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1484
1485 /* Ask for a timing update before we cork/uncork to get the best
1486 * accuracy for the transport latency suitable for the
1487 * check_smoother_status() call in the started callback */
1488 request_auto_timing_update(s, TRUE);
1489
1490 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1491
1492 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1493 pa_tagstruct_putu32(t, s->channel);
1494 pa_pstream_send_tagstruct(s->context->pstream, t);
1495 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1496
1497 /* This might cause the read index to conitnue again, hence
1498 * let's request a timing update */
1499 request_auto_timing_update(s, TRUE);
1500
1501 return o;
1502 }
1503
1504 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1505 pa_usec_t usec;
1506
1507 pa_assert(s);
1508 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1509 pa_assert(s->state == PA_STREAM_READY);
1510 pa_assert(s->direction != PA_STREAM_UPLOAD);
1511 pa_assert(s->timing_info_valid);
1512 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1513 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1514
1515 if (s->direction == PA_STREAM_PLAYBACK) {
1516 /* The last byte that was written into the output device
1517 * had this time value associated */
1518 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1519
1520 if (!s->corked && !s->suspended) {
1521
1522 if (!ignore_transport)
1523 /* Because the latency info took a little time to come
1524 * to us, we assume that the real output time is actually
1525 * a little ahead */
1526 usec += s->timing_info.transport_usec;
1527
1528 /* However, the output device usually maintains a buffer
1529 too, hence the real sample currently played is a little
1530 back */
1531 if (s->timing_info.sink_usec >= usec)
1532 usec = 0;
1533 else
1534 usec -= s->timing_info.sink_usec;
1535 }
1536
1537 } else {
1538 pa_assert(s->direction == PA_STREAM_RECORD);
1539
1540 /* The last byte written into the server side queue had
1541 * this time value associated */
1542 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1543
1544 if (!s->corked && !s->suspended) {
1545
1546 if (!ignore_transport)
1547 /* Add transport latency */
1548 usec += s->timing_info.transport_usec;
1549
1550 /* Add latency of data in device buffer */
1551 usec += s->timing_info.source_usec;
1552
1553 /* If this is a monitor source, we need to correct the
1554 * time by the playback device buffer */
1555 if (s->timing_info.sink_usec >= usec)
1556 usec = 0;
1557 else
1558 usec -= s->timing_info.sink_usec;
1559 }
1560 }
1561
1562 return usec;
1563 }
1564
/* Reply handler for the latency query sent by pa_stream_update_timing_info().
 *
 * pd        dispatcher that delivered the reply (only asserted non-NULL)
 * command   PA_COMMAND_REPLY on success; anything else is passed to
 *           pa_context_handle_error()
 * tag       tag of the request this reply belongs to; compared against the
 *           stream's *_index_not_before values and the saved write index
 *           corrections
 * t         reply payload
 * userdata  the pa_operation created for the request; it is marked done and
 *           unreferenced on every exit path
 *
 * The stream's timing info is marked invalid first and only marked valid
 * again once the whole reply has been parsed. After a valid reply the
 * transport latency and timestamp are derived, the write index (playback) or
 * read index (record) is corrected, and the smoother, if any, is updated.
 * Finally the latency update callback and the operation's own callback are
 * invoked. */
static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_operation *o = userdata;
    struct timeval local, remote, now;
    pa_timing_info *i;
    pa_bool_t playing = FALSE;
    uint64_t underrun_for = 0, playing_for = 0;

    pa_assert(pd);
    pa_assert(o);
    pa_assert(PA_REFCNT_VALUE(o) >= 1);

    /* The operation may have lost its context or stream in the meantime */
    if (!o->context || !o->stream)
        goto finish;

    i = &o->stream->timing_info;

    /* Assume the worst until the reply has been parsed completely */
    o->stream->timing_info_valid = FALSE;
    i->write_index_corrupt = TRUE;
    i->read_index_corrupt = TRUE;

    if (command != PA_COMMAND_REPLY) {
        if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
            goto finish;

    } else {

        if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
            pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
            pa_tagstruct_get_boolean(t, &playing) < 0 ||
            pa_tagstruct_get_timeval(t, &local) < 0 ||
            pa_tagstruct_get_timeval(t, &remote) < 0 ||
            pa_tagstruct_gets64(t, &i->write_index) < 0 ||
            pa_tagstruct_gets64(t, &i->read_index) < 0) {

            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }

        /* The two extra counters are only read for playback streams when
         * the context version is at least 13 */
        if (o->context->version >= 13 &&
            o->stream->direction == PA_STREAM_PLAYBACK)
            if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
                pa_tagstruct_getu64(t, &playing_for) < 0) {

                pa_context_fail(o->context, PA_ERR_PROTOCOL);
                goto finish;
            }


        /* Trailing data in the reply is treated as a protocol error */
        if (!pa_tagstruct_eof(t)) {
            pa_context_fail(o->context, PA_ERR_PROTOCOL);
            goto finish;
        }
        o->stream->timing_info_valid = TRUE;
        i->write_index_corrupt = FALSE;
        i->read_index_corrupt = FALSE;

        i->playing = (int) playing;
        i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);

        pa_gettimeofday(&now);

        /* Calculate timestamps */
        /* NOTE(review): local is presumably the timestamp we put into the
         * request and remote the server's clock when it answered - confirm
         * against the server side */
        if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
            /* local and remote seem to have synchronized clocks */

            if (o->stream->direction == PA_STREAM_PLAYBACK)
                i->transport_usec = pa_timeval_diff(&remote, &local);
            else
                i->transport_usec = pa_timeval_diff(&now, &remote);

            i->synchronized_clocks = TRUE;
            i->timestamp = remote;
        } else {
            /* clocks are not synchronized, let's estimate latency then */
            /* Half of the round trip time is used as the one-way latency */
            i->transport_usec = pa_timeval_diff(&now, &local)/2;
            i->synchronized_clocks = FALSE;
            i->timestamp = local;
            pa_timeval_add(&i->timestamp, i->transport_usec);
        }

        /* Invalidate read and write indexes if necessary */
        if (tag < o->stream->read_index_not_before)
            i->read_index_corrupt = TRUE;

        if (tag < o->stream->write_index_not_before)
            i->write_index_corrupt = TRUE;

        if (o->stream->direction == PA_STREAM_PLAYBACK) {
            /* Write index correction */

            int n, j;
            uint32_t ctag = tag;

            /* Go through the saved correction values and add up the
             * total correction.*/
            /* The walk starts at the slot after the current one and visits
             * every slot of the ring exactly once */
            for (n = 0, j = o->stream->current_write_index_correction+1;
                 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
                 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {

                /* Step over invalid data or out-of-date data */
                if (!o->stream->write_index_corrections[j].valid ||
                    o->stream->write_index_corrections[j].tag < ctag)
                    continue;

                /* Make sure that everything is in order */
                ctag = o->stream->write_index_corrections[j].tag+1;

                /* Now fix the write index */
                if (o->stream->write_index_corrections[j].corrupt) {
                    /* A corrupting seek was made */
                    i->write_index_corrupt = TRUE;
                } else if (o->stream->write_index_corrections[j].absolute) {
                    /* An absolute seek was made */
                    i->write_index = o->stream->write_index_corrections[j].value;
                    i->write_index_corrupt = FALSE;
                } else if (!i->write_index_corrupt) {
                    /* A relative seek was made */
                    i->write_index += o->stream->write_index_corrections[j].value;
                }
            }

            /* Clear old correction entries */
            for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
                if (!o->stream->write_index_corrections[n].valid)
                    continue;

                if (o->stream->write_index_corrections[n].tag <= tag)
                    o->stream->write_index_corrections[n].valid = FALSE;
            }
        }

        if (o->stream->direction == PA_STREAM_RECORD) {
            /* Read index correction */

            /* Subtract the bytes still sitting in the local record queue */
            if (!i->read_index_corrupt)
                i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
        }

        /* Update smoother */
        if (o->stream->smoother) {
            pa_usec_t u, x;

            /* u is the time passed to pa_smoother_put(), x the time passed
             * to pause/resume; both start as "now minus transport latency" */
            u = x = pa_rtclock_now() - i->transport_usec;

            if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
                pa_usec_t su;

                /* If we weren't playing then it will take some time
                 * until the audio will actually come out through the
                 * speakers. Since we follow that timing here, we need
                 * to try to fix this up */

                su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);

                if (su < i->sink_usec)
                    x += i->sink_usec - su;
            }

            if (!i->playing)
                pa_smoother_pause(o->stream->smoother, x);

            /* Update the smoother */
            /* calc_time() asserts that the relevant index is not corrupt,
             * hence the same condition guards the call here */
            if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
                (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
                pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));

            if (i->playing)
                pa_smoother_resume(o->stream->smoother, x, TRUE);
        }
    }

    /* The pending automatic request, if any, has now been answered */
    o->stream->auto_timing_update_requested = FALSE;

    if (o->stream->latency_update_callback)
        o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);

    /* The operation's callback only runs if the operation still has a
     * stream and that stream is READY at this point */
    if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
        pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
        cb(o->stream, o->stream->timing_info_valid, o->userdata);
    }

finish:

    pa_operation_done(o);
    pa_operation_unref(o);
}
1751
1752 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1753 uint32_t tag;
1754 pa_operation *o;
1755 pa_tagstruct *t;
1756 struct timeval now;
1757 int cidx = 0;
1758
1759 pa_assert(s);
1760 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1761
1762 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1763 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1764 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1765
1766 if (s->direction == PA_STREAM_PLAYBACK) {
1767 /* Find a place to store the write_index correction data for this entry */
1768 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1769
1770 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1771 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1772 }
1773 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1774
1775 t = pa_tagstruct_command(
1776 s->context,
1777 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1778 &tag);
1779 pa_tagstruct_putu32(t, s->channel);
1780 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1781
1782 pa_pstream_send_tagstruct(s->context->pstream, t);
1783 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1784
1785 if (s->direction == PA_STREAM_PLAYBACK) {
1786 /* Fill in initial correction data */
1787
1788 s->current_write_index_correction = cidx;
1789
1790 s->write_index_corrections[cidx].valid = TRUE;
1791 s->write_index_corrections[cidx].absolute = FALSE;
1792 s->write_index_corrections[cidx].corrupt = FALSE;
1793 s->write_index_corrections[cidx].tag = tag;
1794 s->write_index_corrections[cidx].value = 0;
1795 }
1796
1797 return o;
1798 }
1799
1800 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1801 pa_stream *s = userdata;
1802
1803 pa_assert(pd);
1804 pa_assert(s);
1805 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1806
1807 pa_stream_ref(s);
1808
1809 if (command != PA_COMMAND_REPLY) {
1810 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1811 goto finish;
1812
1813 pa_stream_set_state(s, PA_STREAM_FAILED);
1814 goto finish;
1815 } else if (!pa_tagstruct_eof(t)) {
1816 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1817 goto finish;
1818 }
1819
1820 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1821
1822 finish:
1823 pa_stream_unref(s);
1824 }
1825
1826 int pa_stream_disconnect(pa_stream *s) {
1827 pa_tagstruct *t;
1828 uint32_t tag;
1829
1830 pa_assert(s);
1831 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1832
1833 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1834 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1835 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1836
1837 pa_stream_ref(s);
1838
1839 t = pa_tagstruct_command(
1840 s->context,
1841 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1842 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1843 &tag);
1844 pa_tagstruct_putu32(t, s->channel);
1845 pa_pstream_send_tagstruct(s->context->pstream, t);
1846 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1847
1848 pa_stream_unref(s);
1849 return 0;
1850 }
1851
1852 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1853 pa_assert(s);
1854 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1855
1856 if (pa_detect_fork())
1857 return;
1858
1859 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1860 return;
1861
1862 s->read_callback = cb;
1863 s->read_userdata = userdata;
1864 }
1865
1866 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1867 pa_assert(s);
1868 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1869
1870 if (pa_detect_fork())
1871 return;
1872
1873 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1874 return;
1875
1876 s->write_callback = cb;
1877 s->write_userdata = userdata;
1878 }
1879
1880 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1881 pa_assert(s);
1882 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1883
1884 if (pa_detect_fork())
1885 return;
1886
1887 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1888 return;
1889
1890 s->state_callback = cb;
1891 s->state_userdata = userdata;
1892 }
1893
1894 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1895 pa_assert(s);
1896 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1897
1898 if (pa_detect_fork())
1899 return;
1900
1901 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1902 return;
1903
1904 s->overflow_callback = cb;
1905 s->overflow_userdata = userdata;
1906 }
1907
1908 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1909 pa_assert(s);
1910 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1911
1912 if (pa_detect_fork())
1913 return;
1914
1915 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1916 return;
1917
1918 s->underflow_callback = cb;
1919 s->underflow_userdata = userdata;
1920 }
1921
1922 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1923 pa_assert(s);
1924 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1925
1926 if (pa_detect_fork())
1927 return;
1928
1929 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1930 return;
1931
1932 s->latency_update_callback = cb;
1933 s->latency_update_userdata = userdata;
1934 }
1935
1936 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1937 pa_assert(s);
1938 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1939
1940 if (pa_detect_fork())
1941 return;
1942
1943 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1944 return;
1945
1946 s->moved_callback = cb;
1947 s->moved_userdata = userdata;
1948 }
1949
1950 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1951 pa_assert(s);
1952 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1953
1954 if (pa_detect_fork())
1955 return;
1956
1957 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1958 return;
1959
1960 s->suspended_callback = cb;
1961 s->suspended_userdata = userdata;
1962 }
1963
1964 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1965 pa_assert(s);
1966 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1967
1968 if (pa_detect_fork())
1969 return;
1970
1971 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1972 return;
1973
1974 s->started_callback = cb;
1975 s->started_userdata = userdata;
1976 }
1977
1978 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1979 pa_assert(s);
1980 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1981
1982 if (pa_detect_fork())
1983 return;
1984
1985 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1986 return;
1987
1988 s->event_callback = cb;
1989 s->event_userdata = userdata;
1990 }
1991
1992 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1993 pa_assert(s);
1994 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1995
1996 if (pa_detect_fork())
1997 return;
1998
1999 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2000 return;
2001
2002 s->buffer_attr_callback = cb;
2003 s->buffer_attr_userdata = userdata;
2004 }
2005
2006 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2007 pa_operation *o = userdata;
2008 int success = 1;
2009
2010 pa_assert(pd);
2011 pa_assert(o);
2012 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2013
2014 if (!o->context)
2015 goto finish;
2016
2017 if (command != PA_COMMAND_REPLY) {
2018 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2019 goto finish;
2020
2021 success = 0;
2022 } else if (!pa_tagstruct_eof(t)) {
2023 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2024 goto finish;
2025 }
2026
2027 if (o->callback) {
2028 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2029 cb(o->stream, success, o->userdata);
2030 }
2031
2032 finish:
2033 pa_operation_done(o);
2034 pa_operation_unref(o);
2035 }
2036
2037 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2038 pa_operation *o;
2039 pa_tagstruct *t;
2040 uint32_t tag;
2041
2042 pa_assert(s);
2043 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2044
2045 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2046 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2047 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2048
2049 /* Ask for a timing update before we cork/uncork to get the best
2050 * accuracy for the transport latency suitable for the
2051 * check_smoother_status() call in the started callback */
2052 request_auto_timing_update(s, TRUE);
2053
2054 s->corked = b;
2055
2056 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2057
2058 t = pa_tagstruct_command(
2059 s->context,
2060 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2061 &tag);
2062 pa_tagstruct_putu32(t, s->channel);
2063 pa_tagstruct_put_boolean(t, !!b);
2064 pa_pstream_send_tagstruct(s->context->pstream, t);
2065 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2066
2067 check_smoother_status(s, FALSE, FALSE, FALSE);
2068
2069 /* This might cause the indexes to hang/start again, hence let's
2070 * request a timing update, after the cork/uncork, too */
2071 request_auto_timing_update(s, TRUE);
2072
2073 return o;
2074 }
2075
2076 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2077 pa_tagstruct *t;
2078 pa_operation *o;
2079 uint32_t tag;
2080
2081 pa_assert(s);
2082 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2083
2084 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2085 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2086
2087 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2088
2089 t = pa_tagstruct_command(s->context, command, &tag);
2090 pa_tagstruct_putu32(t, s->channel);
2091 pa_pstream_send_tagstruct(s->context->pstream, t);
2092 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2093
2094 return o;
2095 }
2096
2097 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2098 pa_operation *o;
2099
2100 pa_assert(s);
2101 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2102
2103 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2104 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2105 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2106
2107 /* Ask for a timing update *before* the flush, so that the
2108 * transport usec is as up to date as possible when we get the
2109 * underflow message and update the smoother status*/
2110 request_auto_timing_update(s, TRUE);
2111
2112 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2113 return NULL;
2114
2115 if (s->direction == PA_STREAM_PLAYBACK) {
2116
2117 if (s->write_index_corrections[s->current_write_index_correction].valid)
2118 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2119
2120 if (s->buffer_attr.prebuf > 0)
2121 check_smoother_status(s, FALSE, FALSE, TRUE);
2122
2123 /* This will change the write index, but leave the
2124 * read index untouched. */
2125 invalidate_indexes(s, FALSE, TRUE);
2126
2127 } else
2128 /* For record streams this has no influence on the write
2129 * index, but the read index might jump. */
2130 invalidate_indexes(s, TRUE, FALSE);
2131
2132 return o;
2133 }
2134
2135 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2136 pa_operation *o;
2137
2138 pa_assert(s);
2139 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2140
2141 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2142 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2143 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2144 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2145
2146 /* Ask for a timing update before we cork/uncork to get the best
2147 * accuracy for the transport latency suitable for the
2148 * check_smoother_status() call in the started callback */
2149 request_auto_timing_update(s, TRUE);
2150
2151 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2152 return NULL;
2153
2154 /* This might cause the read index to hang again, hence
2155 * let's request a timing update */
2156 request_auto_timing_update(s, TRUE);
2157
2158 return o;
2159 }
2160
2161 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2162 pa_operation *o;
2163
2164 pa_assert(s);
2165 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2166
2167 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2168 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2169 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2170 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2171
2172 /* Ask for a timing update before we cork/uncork to get the best
2173 * accuracy for the transport latency suitable for the
2174 * check_smoother_status() call in the started callback */
2175 request_auto_timing_update(s, TRUE);
2176
2177 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2178 return NULL;
2179
2180 /* This might cause the read index to start moving again, hence
2181 * let's request a timing update */
2182 request_auto_timing_update(s, TRUE);
2183
2184 return o;
2185 }
2186
2187 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2188 pa_operation *o;
2189
2190 pa_assert(s);
2191 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2192 pa_assert(name);
2193
2194 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2195 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2196 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2197
2198 if (s->context->version >= 13) {
2199 pa_proplist *p = pa_proplist_new();
2200
2201 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2202 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2203 pa_proplist_free(p);
2204 } else {
2205 pa_tagstruct *t;
2206 uint32_t tag;
2207
2208 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2209 t = pa_tagstruct_command(
2210 s->context,
2211 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2212 &tag);
2213 pa_tagstruct_putu32(t, s->channel);
2214 pa_tagstruct_puts(t, name);
2215 pa_pstream_send_tagstruct(s->context->pstream, t);
2216 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2217 }
2218
2219 return o;
2220 }
2221
2222 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2223 pa_usec_t usec;
2224
2225 pa_assert(s);
2226 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2227
2228 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2229 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2230 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2231 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2232 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2233 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2234
2235 if (s->smoother)
2236 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2237 else
2238 usec = calc_time(s, FALSE);
2239
2240 /* Make sure the time runs monotonically */
2241 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2242 if (usec < s->previous_time)
2243 usec = s->previous_time;
2244 else
2245 s->previous_time = usec;
2246 }
2247
2248 if (r_usec)
2249 *r_usec = usec;
2250
2251 return 0;
2252 }
2253
2254 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2255 pa_assert(s);
2256 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2257
2258 if (negative)
2259 *negative = 0;
2260
2261 if (a >= b)
2262 return a-b;
2263 else {
2264 if (negative && s->direction == PA_STREAM_RECORD) {
2265 *negative = 1;
2266 return b-a;
2267 } else
2268 return 0;
2269 }
2270 }
2271
2272 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2273 pa_usec_t t, c;
2274 int r;
2275 int64_t cindex;
2276
2277 pa_assert(s);
2278 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2279 pa_assert(r_usec);
2280
2281 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2282 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2283 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2284 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2285 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2286 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2287
2288 if ((r = pa_stream_get_time(s, &t)) < 0)
2289 return r;
2290
2291 if (s->direction == PA_STREAM_PLAYBACK)
2292 cindex = s->timing_info.write_index;
2293 else
2294 cindex = s->timing_info.read_index;
2295
2296 if (cindex < 0)
2297 cindex = 0;
2298
2299 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2300
2301 if (s->direction == PA_STREAM_PLAYBACK)
2302 *r_usec = time_counter_diff(s, c, t, negative);
2303 else
2304 *r_usec = time_counter_diff(s, t, c, negative);
2305
2306 return 0;
2307 }
2308
2309 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2310 pa_assert(s);
2311 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2312
2313 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2314 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2315 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2316 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2317
2318 return &s->timing_info;
2319 }
2320
2321 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2322 pa_assert(s);
2323 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2324
2325 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2326
2327 return &s->sample_spec;
2328 }
2329
2330 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2331 pa_assert(s);
2332 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2333
2334 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2335
2336 return &s->channel_map;
2337 }
2338
2339 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2340 pa_assert(s);
2341 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2342
2343 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2344 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2345 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2346
2347 return &s->buffer_attr;
2348 }
2349
2350 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2351 pa_operation *o = userdata;
2352 int success = 1;
2353
2354 pa_assert(pd);
2355 pa_assert(o);
2356 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2357
2358 if (!o->context)
2359 goto finish;
2360
2361 if (command != PA_COMMAND_REPLY) {
2362 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2363 goto finish;
2364
2365 success = 0;
2366 } else {
2367 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2368 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2369 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2370 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2371 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2372 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2373 goto finish;
2374 }
2375 } else if (o->stream->direction == PA_STREAM_RECORD) {
2376 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2377 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2378 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2379 goto finish;
2380 }
2381 }
2382
2383 if (o->stream->context->version >= 13) {
2384 pa_usec_t usec;
2385
2386 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2387 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2388 goto finish;
2389 }
2390
2391 if (o->stream->direction == PA_STREAM_RECORD)
2392 o->stream->timing_info.configured_source_usec = usec;
2393 else
2394 o->stream->timing_info.configured_sink_usec = usec;
2395 }
2396
2397 if (!pa_tagstruct_eof(t)) {
2398 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2399 goto finish;
2400 }
2401 }
2402
2403 if (o->callback) {
2404 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2405 cb(o->stream, success, o->userdata);
2406 }
2407
2408 finish:
2409 pa_operation_done(o);
2410 pa_operation_unref(o);
2411 }
2412
2413
2414 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2415 pa_operation *o;
2416 pa_tagstruct *t;
2417 uint32_t tag;
2418
2419 pa_assert(s);
2420 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2421 pa_assert(attr);
2422
2423 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2424 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2425 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2426 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2427
2428 /* Ask for a timing update before we cork/uncork to get the best
2429 * accuracy for the transport latency suitable for the
2430 * check_smoother_status() call in the started callback */
2431 request_auto_timing_update(s, TRUE);
2432
2433 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2434
2435 t = pa_tagstruct_command(
2436 s->context,
2437 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2438 &tag);
2439 pa_tagstruct_putu32(t, s->channel);
2440
2441 pa_tagstruct_putu32(t, attr->maxlength);
2442
2443 if (s->direction == PA_STREAM_PLAYBACK)
2444 pa_tagstruct_put(
2445 t,
2446 PA_TAG_U32, attr->tlength,
2447 PA_TAG_U32, attr->prebuf,
2448 PA_TAG_U32, attr->minreq,
2449 PA_TAG_INVALID);
2450 else
2451 pa_tagstruct_putu32(t, attr->fragsize);
2452
2453 if (s->context->version >= 13)
2454 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2455
2456 if (s->context->version >= 14)
2457 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2458
2459 pa_pstream_send_tagstruct(s->context->pstream, t);
2460 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2461
2462 /* This might cause changes in the read/write indexex, hence let's
2463 * request a timing update */
2464 request_auto_timing_update(s, TRUE);
2465
2466 return o;
2467 }
2468
2469 uint32_t pa_stream_get_device_index(pa_stream *s) {
2470 pa_assert(s);
2471 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2472
2473 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2474 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2475 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2476 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2477 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2478
2479 return s->device_index;
2480 }
2481
2482 const char *pa_stream_get_device_name(pa_stream *s) {
2483 pa_assert(s);
2484 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2485
2486 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2487 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2488 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2489 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2490 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2491
2492 return s->device_name;
2493 }
2494
2495 int pa_stream_is_suspended(pa_stream *s) {
2496 pa_assert(s);
2497 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2498
2499 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2500 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2501 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2502 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2503
2504 return s->suspended;
2505 }
2506
2507 int pa_stream_is_corked(pa_stream *s) {
2508 pa_assert(s);
2509 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2510
2511 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2512 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2513 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2514
2515 return s->corked;
2516 }
2517
2518 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2519 pa_operation *o = userdata;
2520 int success = 1;
2521
2522 pa_assert(pd);
2523 pa_assert(o);
2524 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2525
2526 if (!o->context)
2527 goto finish;
2528
2529 if (command != PA_COMMAND_REPLY) {
2530 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2531 goto finish;
2532
2533 success = 0;
2534 } else {
2535
2536 if (!pa_tagstruct_eof(t)) {
2537 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2538 goto finish;
2539 }
2540 }
2541
2542 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2543 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2544
2545 if (o->callback) {
2546 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2547 cb(o->stream, success, o->userdata);
2548 }
2549
2550 finish:
2551 pa_operation_done(o);
2552 pa_operation_unref(o);
2553 }
2554
2555
2556 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2557 pa_operation *o;
2558 pa_tagstruct *t;
2559 uint32_t tag;
2560
2561 pa_assert(s);
2562 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2563
2564 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2565 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2566 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2567 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2568 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2569 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2570
2571 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2572 o->private = PA_UINT_TO_PTR(rate);
2573
2574 t = pa_tagstruct_command(
2575 s->context,
2576 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2577 &tag);
2578 pa_tagstruct_putu32(t, s->channel);
2579 pa_tagstruct_putu32(t, rate);
2580
2581 pa_pstream_send_tagstruct(s->context->pstream, t);
2582 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2583
2584 return o;
2585 }
2586
2587 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2588 pa_operation *o;
2589 pa_tagstruct *t;
2590 uint32_t tag;
2591
2592 pa_assert(s);
2593 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2594
2595 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2596 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2597 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2598 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2599 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2600
2601 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2602
2603 t = pa_tagstruct_command(
2604 s->context,
2605 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2606 &tag);
2607 pa_tagstruct_putu32(t, s->channel);
2608 pa_tagstruct_putu32(t, (uint32_t) mode);
2609 pa_tagstruct_put_proplist(t, p);
2610
2611 pa_pstream_send_tagstruct(s->context->pstream, t);
2612 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2613
2614 /* Please note that we don't update s->proplist here, because we
2615 * don't export that field */
2616
2617 return o;
2618 }
2619
2620 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2621 pa_operation *o;
2622 pa_tagstruct *t;
2623 uint32_t tag;
2624 const char * const*k;
2625
2626 pa_assert(s);
2627 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2628
2629 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2630 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2631 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2632 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2633 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2634
2635 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2636
2637 t = pa_tagstruct_command(
2638 s->context,
2639 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2640 &tag);
2641 pa_tagstruct_putu32(t, s->channel);
2642
2643 for (k = keys; *k; k++)
2644 pa_tagstruct_puts(t, *k);
2645
2646 pa_tagstruct_puts(t, NULL);
2647
2648 pa_pstream_send_tagstruct(s->context->pstream, t);
2649 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2650
2651 /* Please note that we don't update s->proplist here, because we
2652 * don't export that field */
2653
2654 return o;
2655 }
2656
2657 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2658 pa_assert(s);
2659 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2660
2661 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2662 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2663 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2664 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2665
2666 s->direct_on_input = sink_input_idx;
2667
2668 return 0;
2669 }
2670
2671 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2672 pa_assert(s);
2673 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2674
2675 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2676 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2677 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2678
2679 return s->direct_on_input;
2680 }