]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
addc36aedfd2d240a9cb9cb817ecacfd17aff5fd
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/stream.h>
33 #include <pulse/timeval.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "fork-detect.h"
45 #include "internal.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
81 }
82
83 pa_stream *pa_stream_new_with_proplist(
84 pa_context *c,
85 const char *name,
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_proplist *p) {
89
90 pa_stream *s;
91 int i;
92 pa_channel_map tmap;
93
94 pa_assert(c);
95 pa_assert(PA_REFCNT_VALUE(c) >= 1);
96
97 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
102 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
103 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
104
105 if (!map)
106 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
107
108 s = pa_xnew(pa_stream, 1);
109 PA_REFCNT_INIT(s);
110 s->context = c;
111 s->mainloop = c->mainloop;
112
113 s->direction = PA_STREAM_NODIRECTION;
114 s->state = PA_STREAM_UNCONNECTED;
115 s->flags = 0;
116
117 s->sample_spec = *ss;
118 s->channel_map = *map;
119
120 s->direct_on_input = PA_INVALID_INDEX;
121
122 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
123 if (name)
124 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
125
126 s->channel = 0;
127 s->channel_valid = FALSE;
128 s->syncid = c->csyncid++;
129 s->stream_index = PA_INVALID_INDEX;
130
131 s->requested_bytes = 0;
132 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
133
134 /* We initialize der target length here, so that if the user
135 * passes no explicit buffering metrics the default is similar to
136 * what older PA versions provided. */
137
138 s->buffer_attr.maxlength = (uint32_t) -1;
139 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
140 s->buffer_attr.minreq = (uint32_t) -1;
141 s->buffer_attr.prebuf = (uint32_t) -1;
142 s->buffer_attr.fragsize = (uint32_t) -1;
143
144 s->device_index = PA_INVALID_INDEX;
145 s->device_name = NULL;
146 s->suspended = FALSE;
147 s->corked = FALSE;
148
149 s->write_memblock = NULL;
150 s->write_data = NULL;
151
152 pa_memchunk_reset(&s->peek_memchunk);
153 s->peek_data = NULL;
154 s->record_memblockq = NULL;
155
156 memset(&s->timing_info, 0, sizeof(s->timing_info));
157 s->timing_info_valid = FALSE;
158
159 s->previous_time = 0;
160
161 s->read_index_not_before = 0;
162 s->write_index_not_before = 0;
163 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
164 s->write_index_corrections[i].valid = 0;
165 s->current_write_index_correction = 0;
166
167 s->auto_timing_update_event = NULL;
168 s->auto_timing_update_requested = FALSE;
169 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
170
171 reset_callbacks(s);
172
173 s->smoother = NULL;
174
175 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
176 PA_LLIST_PREPEND(pa_stream, c->streams, s);
177 pa_stream_ref(s);
178
179 return s;
180 }
181
182 static void stream_unlink(pa_stream *s) {
183 pa_operation *o, *n;
184 pa_assert(s);
185
186 if (!s->context)
187 return;
188
189 /* Detach from context */
190
191 /* Unref all operatio object that point to us */
192 for (o = s->context->operations; o; o = n) {
193 n = o->next;
194
195 if (o->stream == s)
196 pa_operation_cancel(o);
197 }
198
199 /* Drop all outstanding replies for this stream */
200 if (s->context->pdispatch)
201 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
202
203 if (s->channel_valid) {
204 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
205 s->channel = 0;
206 s->channel_valid = FALSE;
207 }
208
209 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
210 pa_stream_unref(s);
211
212 s->context = NULL;
213
214 if (s->auto_timing_update_event) {
215 pa_assert(s->mainloop);
216 s->mainloop->time_free(s->auto_timing_update_event);
217 }
218
219 reset_callbacks(s);
220 }
221
222 static void stream_free(pa_stream *s) {
223 pa_assert(s);
224
225 stream_unlink(s);
226
227 if (s->write_memblock) {
228 pa_memblock_release(s->write_memblock);
229 pa_memblock_unref(s->write_data);
230 }
231
232 if (s->peek_memchunk.memblock) {
233 if (s->peek_data)
234 pa_memblock_release(s->peek_memchunk.memblock);
235 pa_memblock_unref(s->peek_memchunk.memblock);
236 }
237
238 if (s->record_memblockq)
239 pa_memblockq_free(s->record_memblockq);
240
241 if (s->proplist)
242 pa_proplist_free(s->proplist);
243
244 if (s->smoother)
245 pa_smoother_free(s->smoother);
246
247 pa_xfree(s->device_name);
248 pa_xfree(s);
249 }
250
251 void pa_stream_unref(pa_stream *s) {
252 pa_assert(s);
253 pa_assert(PA_REFCNT_VALUE(s) >= 1);
254
255 if (PA_REFCNT_DEC(s) <= 0)
256 stream_free(s);
257 }
258
259 pa_stream* pa_stream_ref(pa_stream *s) {
260 pa_assert(s);
261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
262
263 PA_REFCNT_INC(s);
264 return s;
265 }
266
267 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
268 pa_assert(s);
269 pa_assert(PA_REFCNT_VALUE(s) >= 1);
270
271 return s->state;
272 }
273
274 pa_context* pa_stream_get_context(pa_stream *s) {
275 pa_assert(s);
276 pa_assert(PA_REFCNT_VALUE(s) >= 1);
277
278 return s->context;
279 }
280
281 uint32_t pa_stream_get_index(pa_stream *s) {
282 pa_assert(s);
283 pa_assert(PA_REFCNT_VALUE(s) >= 1);
284
285 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
286 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
287
288 return s->stream_index;
289 }
290
291 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
292 pa_assert(s);
293 pa_assert(PA_REFCNT_VALUE(s) >= 1);
294
295 if (s->state == st)
296 return;
297
298 pa_stream_ref(s);
299
300 s->state = st;
301
302 if (s->state_callback)
303 s->state_callback(s, s->state_userdata);
304
305 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
306 stream_unlink(s);
307
308 pa_stream_unref(s);
309 }
310
311 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
312 pa_assert(s);
313 pa_assert(PA_REFCNT_VALUE(s) >= 1);
314
315 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
316 return;
317
318 if (s->state == PA_STREAM_READY &&
319 (force || !s->auto_timing_update_requested)) {
320 pa_operation *o;
321
322 /* pa_log("Automatically requesting new timing data"); */
323
324 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
325 pa_operation_unref(o);
326 s->auto_timing_update_requested = TRUE;
327 }
328 }
329
330 if (s->auto_timing_update_event) {
331 if (force)
332 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
333
334 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
335
336 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
337 }
338 }
339
340 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
341 pa_context *c = userdata;
342 pa_stream *s;
343 uint32_t channel;
344
345 pa_assert(pd);
346 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
347 pa_assert(t);
348 pa_assert(c);
349 pa_assert(PA_REFCNT_VALUE(c) >= 1);
350
351 pa_context_ref(c);
352
353 if (pa_tagstruct_getu32(t, &channel) < 0 ||
354 !pa_tagstruct_eof(t)) {
355 pa_context_fail(c, PA_ERR_PROTOCOL);
356 goto finish;
357 }
358
359 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
360 goto finish;
361
362 if (s->state != PA_STREAM_READY)
363 goto finish;
364
365 pa_context_set_error(c, PA_ERR_KILLED);
366 pa_stream_set_state(s, PA_STREAM_FAILED);
367
368 finish:
369 pa_context_unref(c);
370 }
371
372 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
373 pa_usec_t x;
374
375 pa_assert(s);
376 pa_assert(!force_start || !force_stop);
377
378 if (!s->smoother)
379 return;
380
381 x = pa_rtclock_now();
382
383 if (s->timing_info_valid) {
384 if (aposteriori)
385 x -= s->timing_info.transport_usec;
386 else
387 x += s->timing_info.transport_usec;
388 }
389
390 if (s->suspended || s->corked || force_stop)
391 pa_smoother_pause(s->smoother, x);
392 else if (force_start || s->buffer_attr.prebuf == 0) {
393
394 if (!s->timing_info_valid &&
395 !aposteriori &&
396 !force_start &&
397 !force_stop &&
398 s->context->version >= 13) {
399
400 /* If the server supports STARTED events we take them as
401 * indications when audio really starts/stops playing, if
402 * we don't have any timing info yet -- instead of trying
403 * to be smart and guessing the server time. Otherwise the
404 * unknown transport delay add too much noise to our time
405 * calculations. */
406
407 return;
408 }
409
410 pa_smoother_resume(s->smoother, x, TRUE);
411 }
412
413 /* Please note that we have no idea if playback actually started
414 * if prebuf is non-zero! */
415 }
416
417 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
418 pa_context *c = userdata;
419 pa_stream *s;
420 uint32_t channel;
421 const char *dn;
422 pa_bool_t suspended;
423 uint32_t di;
424 pa_usec_t usec = 0;
425 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
426
427 pa_assert(pd);
428 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
429 pa_assert(t);
430 pa_assert(c);
431 pa_assert(PA_REFCNT_VALUE(c) >= 1);
432
433 pa_context_ref(c);
434
435 if (c->version < 12) {
436 pa_context_fail(c, PA_ERR_PROTOCOL);
437 goto finish;
438 }
439
440 if (pa_tagstruct_getu32(t, &channel) < 0 ||
441 pa_tagstruct_getu32(t, &di) < 0 ||
442 pa_tagstruct_gets(t, &dn) < 0 ||
443 pa_tagstruct_get_boolean(t, &suspended) < 0) {
444 pa_context_fail(c, PA_ERR_PROTOCOL);
445 goto finish;
446 }
447
448 if (c->version >= 13) {
449
450 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
451 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
452 pa_tagstruct_getu32(t, &fragsize) < 0 ||
453 pa_tagstruct_get_usec(t, &usec) < 0) {
454 pa_context_fail(c, PA_ERR_PROTOCOL);
455 goto finish;
456 }
457 } else {
458 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
459 pa_tagstruct_getu32(t, &tlength) < 0 ||
460 pa_tagstruct_getu32(t, &prebuf) < 0 ||
461 pa_tagstruct_getu32(t, &minreq) < 0 ||
462 pa_tagstruct_get_usec(t, &usec) < 0) {
463 pa_context_fail(c, PA_ERR_PROTOCOL);
464 goto finish;
465 }
466 }
467 }
468
469 if (!pa_tagstruct_eof(t)) {
470 pa_context_fail(c, PA_ERR_PROTOCOL);
471 goto finish;
472 }
473
474 if (!dn || di == PA_INVALID_INDEX) {
475 pa_context_fail(c, PA_ERR_PROTOCOL);
476 goto finish;
477 }
478
479 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
480 goto finish;
481
482 if (s->state != PA_STREAM_READY)
483 goto finish;
484
485 if (c->version >= 13) {
486 if (s->direction == PA_STREAM_RECORD)
487 s->timing_info.configured_source_usec = usec;
488 else
489 s->timing_info.configured_sink_usec = usec;
490
491 s->buffer_attr.maxlength = maxlength;
492 s->buffer_attr.fragsize = fragsize;
493 s->buffer_attr.tlength = tlength;
494 s->buffer_attr.prebuf = prebuf;
495 s->buffer_attr.minreq = minreq;
496 }
497
498 pa_xfree(s->device_name);
499 s->device_name = pa_xstrdup(dn);
500 s->device_index = di;
501
502 s->suspended = suspended;
503
504 check_smoother_status(s, TRUE, FALSE, FALSE);
505 request_auto_timing_update(s, TRUE);
506
507 if (s->moved_callback)
508 s->moved_callback(s, s->moved_userdata);
509
510 finish:
511 pa_context_unref(c);
512 }
513
514 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
515 pa_context *c = userdata;
516 pa_stream *s;
517 uint32_t channel;
518 pa_usec_t usec = 0;
519 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
520
521 pa_assert(pd);
522 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
523 pa_assert(t);
524 pa_assert(c);
525 pa_assert(PA_REFCNT_VALUE(c) >= 1);
526
527 pa_context_ref(c);
528
529 if (c->version < 15) {
530 pa_context_fail(c, PA_ERR_PROTOCOL);
531 goto finish;
532 }
533
534 if (pa_tagstruct_getu32(t, &channel) < 0) {
535 pa_context_fail(c, PA_ERR_PROTOCOL);
536 goto finish;
537 }
538
539 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
540 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
541 pa_tagstruct_getu32(t, &fragsize) < 0 ||
542 pa_tagstruct_get_usec(t, &usec) < 0) {
543 pa_context_fail(c, PA_ERR_PROTOCOL);
544 goto finish;
545 }
546 } else {
547 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
548 pa_tagstruct_getu32(t, &tlength) < 0 ||
549 pa_tagstruct_getu32(t, &prebuf) < 0 ||
550 pa_tagstruct_getu32(t, &minreq) < 0 ||
551 pa_tagstruct_get_usec(t, &usec) < 0) {
552 pa_context_fail(c, PA_ERR_PROTOCOL);
553 goto finish;
554 }
555 }
556
557 if (!pa_tagstruct_eof(t)) {
558 pa_context_fail(c, PA_ERR_PROTOCOL);
559 goto finish;
560 }
561
562 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
563 goto finish;
564
565 if (s->state != PA_STREAM_READY)
566 goto finish;
567
568 if (s->direction == PA_STREAM_RECORD)
569 s->timing_info.configured_source_usec = usec;
570 else
571 s->timing_info.configured_sink_usec = usec;
572
573 s->buffer_attr.maxlength = maxlength;
574 s->buffer_attr.fragsize = fragsize;
575 s->buffer_attr.tlength = tlength;
576 s->buffer_attr.prebuf = prebuf;
577 s->buffer_attr.minreq = minreq;
578
579 request_auto_timing_update(s, TRUE);
580
581 if (s->buffer_attr_callback)
582 s->buffer_attr_callback(s, s->buffer_attr_userdata);
583
584 finish:
585 pa_context_unref(c);
586 }
587
588 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
589 pa_context *c = userdata;
590 pa_stream *s;
591 uint32_t channel;
592 pa_bool_t suspended;
593
594 pa_assert(pd);
595 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
596 pa_assert(t);
597 pa_assert(c);
598 pa_assert(PA_REFCNT_VALUE(c) >= 1);
599
600 pa_context_ref(c);
601
602 if (c->version < 12) {
603 pa_context_fail(c, PA_ERR_PROTOCOL);
604 goto finish;
605 }
606
607 if (pa_tagstruct_getu32(t, &channel) < 0 ||
608 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
609 !pa_tagstruct_eof(t)) {
610 pa_context_fail(c, PA_ERR_PROTOCOL);
611 goto finish;
612 }
613
614 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
615 goto finish;
616
617 if (s->state != PA_STREAM_READY)
618 goto finish;
619
620 s->suspended = suspended;
621
622 check_smoother_status(s, TRUE, FALSE, FALSE);
623 request_auto_timing_update(s, TRUE);
624
625 if (s->suspended_callback)
626 s->suspended_callback(s, s->suspended_userdata);
627
628 finish:
629 pa_context_unref(c);
630 }
631
632 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
633 pa_context *c = userdata;
634 pa_stream *s;
635 uint32_t channel;
636
637 pa_assert(pd);
638 pa_assert(command == PA_COMMAND_STARTED);
639 pa_assert(t);
640 pa_assert(c);
641 pa_assert(PA_REFCNT_VALUE(c) >= 1);
642
643 pa_context_ref(c);
644
645 if (c->version < 13) {
646 pa_context_fail(c, PA_ERR_PROTOCOL);
647 goto finish;
648 }
649
650 if (pa_tagstruct_getu32(t, &channel) < 0 ||
651 !pa_tagstruct_eof(t)) {
652 pa_context_fail(c, PA_ERR_PROTOCOL);
653 goto finish;
654 }
655
656 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
657 goto finish;
658
659 if (s->state != PA_STREAM_READY)
660 goto finish;
661
662 check_smoother_status(s, TRUE, TRUE, FALSE);
663 request_auto_timing_update(s, TRUE);
664
665 if (s->started_callback)
666 s->started_callback(s, s->started_userdata);
667
668 finish:
669 pa_context_unref(c);
670 }
671
672 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
673 pa_context *c = userdata;
674 pa_stream *s;
675 uint32_t channel;
676 pa_proplist *pl = NULL;
677 const char *event;
678
679 pa_assert(pd);
680 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
681 pa_assert(t);
682 pa_assert(c);
683 pa_assert(PA_REFCNT_VALUE(c) >= 1);
684
685 pa_context_ref(c);
686
687 if (c->version < 15) {
688 pa_context_fail(c, PA_ERR_PROTOCOL);
689 goto finish;
690 }
691
692 pl = pa_proplist_new();
693
694 if (pa_tagstruct_getu32(t, &channel) < 0 ||
695 pa_tagstruct_gets(t, &event) < 0 ||
696 pa_tagstruct_get_proplist(t, pl) < 0 ||
697 !pa_tagstruct_eof(t) || !event) {
698 pa_context_fail(c, PA_ERR_PROTOCOL);
699 goto finish;
700 }
701
702 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
703 goto finish;
704
705 if (s->state != PA_STREAM_READY)
706 goto finish;
707
708 if (s->event_callback)
709 s->event_callback(s, event, pl, s->event_userdata);
710
711 finish:
712 pa_context_unref(c);
713
714 if (pl)
715 pa_proplist_free(pl);
716 }
717
718 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
719 pa_stream *s;
720 pa_context *c = userdata;
721 uint32_t bytes, channel;
722
723 pa_assert(pd);
724 pa_assert(command == PA_COMMAND_REQUEST);
725 pa_assert(t);
726 pa_assert(c);
727 pa_assert(PA_REFCNT_VALUE(c) >= 1);
728
729 pa_context_ref(c);
730
731 if (pa_tagstruct_getu32(t, &channel) < 0 ||
732 pa_tagstruct_getu32(t, &bytes) < 0 ||
733 !pa_tagstruct_eof(t)) {
734 pa_context_fail(c, PA_ERR_PROTOCOL);
735 goto finish;
736 }
737
738 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
739 goto finish;
740
741 if (s->state != PA_STREAM_READY)
742 goto finish;
743
744 s->requested_bytes += bytes;
745
746 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
747
748 if (s->requested_bytes > 0 && s->write_callback)
749 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
750
751 finish:
752 pa_context_unref(c);
753 }
754
755 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
756 pa_stream *s;
757 pa_context *c = userdata;
758 uint32_t channel;
759
760 pa_assert(pd);
761 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
762 pa_assert(t);
763 pa_assert(c);
764 pa_assert(PA_REFCNT_VALUE(c) >= 1);
765
766 pa_context_ref(c);
767
768 if (pa_tagstruct_getu32(t, &channel) < 0 ||
769 !pa_tagstruct_eof(t)) {
770 pa_context_fail(c, PA_ERR_PROTOCOL);
771 goto finish;
772 }
773
774 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
775 goto finish;
776
777 if (s->state != PA_STREAM_READY)
778 goto finish;
779
780 if (s->buffer_attr.prebuf > 0)
781 check_smoother_status(s, TRUE, FALSE, TRUE);
782
783 request_auto_timing_update(s, TRUE);
784
785 if (command == PA_COMMAND_OVERFLOW) {
786 if (s->overflow_callback)
787 s->overflow_callback(s, s->overflow_userdata);
788 } else if (command == PA_COMMAND_UNDERFLOW) {
789 if (s->underflow_callback)
790 s->underflow_callback(s, s->underflow_userdata);
791 }
792
793 finish:
794 pa_context_unref(c);
795 }
796
797 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
798 pa_assert(s);
799 pa_assert(PA_REFCNT_VALUE(s) >= 1);
800
801 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
802
803 if (s->state != PA_STREAM_READY)
804 return;
805
806 if (w) {
807 s->write_index_not_before = s->context->ctag;
808
809 if (s->timing_info_valid)
810 s->timing_info.write_index_corrupt = TRUE;
811
812 /* pa_log("write_index invalidated"); */
813 }
814
815 if (r) {
816 s->read_index_not_before = s->context->ctag;
817
818 if (s->timing_info_valid)
819 s->timing_info.read_index_corrupt = TRUE;
820
821 /* pa_log("read_index invalidated"); */
822 }
823
824 request_auto_timing_update(s, TRUE);
825 }
826
827 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
828 pa_stream *s = userdata;
829
830 pa_assert(s);
831 pa_assert(PA_REFCNT_VALUE(s) >= 1);
832
833 pa_stream_ref(s);
834 request_auto_timing_update(s, FALSE);
835 pa_stream_unref(s);
836 }
837
838 static void create_stream_complete(pa_stream *s) {
839 pa_assert(s);
840 pa_assert(PA_REFCNT_VALUE(s) >= 1);
841 pa_assert(s->state == PA_STREAM_CREATING);
842
843 pa_stream_set_state(s, PA_STREAM_READY);
844
845 if (s->requested_bytes > 0 && s->write_callback)
846 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
847
848 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
849 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
850 pa_assert(!s->auto_timing_update_event);
851 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
852
853 request_auto_timing_update(s, TRUE);
854 }
855
856 check_smoother_status(s, TRUE, FALSE, FALSE);
857 }
858
859 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
860 const char *e;
861
862 pa_assert(s);
863 pa_assert(attr);
864
865 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
866 uint32_t ms;
867
868 if (pa_atou(e, &ms) < 0 || ms <= 0)
869 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
870 else {
871 attr->maxlength = (uint32_t) -1;
872 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
873 attr->minreq = (uint32_t) -1;
874 attr->prebuf = (uint32_t) -1;
875 attr->fragsize = attr->tlength;
876 }
877
878 if (flags)
879 *flags |= PA_STREAM_ADJUST_LATENCY;
880 }
881
882 if (s->context->version >= 13)
883 return;
884
885 /* Version older than 0.9.10 didn't do server side buffer_attr
886 * selection, hence we have to fake it on the client side. */
887
888 /* We choose fairly conservative values here, to not confuse
889 * old clients with extremely large playback buffers */
890
891 if (attr->maxlength == (uint32_t) -1)
892 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
893
894 if (attr->tlength == (uint32_t) -1)
895 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
896
897 if (attr->minreq == (uint32_t) -1)
898 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
899
900 if (attr->prebuf == (uint32_t) -1)
901 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
902
903 if (attr->fragsize == (uint32_t) -1)
904 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
905 }
906
907 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
908 pa_stream *s = userdata;
909 uint32_t requested_bytes = 0;
910
911 pa_assert(pd);
912 pa_assert(s);
913 pa_assert(PA_REFCNT_VALUE(s) >= 1);
914 pa_assert(s->state == PA_STREAM_CREATING);
915
916 pa_stream_ref(s);
917
918 if (command != PA_COMMAND_REPLY) {
919 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
920 goto finish;
921
922 pa_stream_set_state(s, PA_STREAM_FAILED);
923 goto finish;
924 }
925
926 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
927 s->channel == PA_INVALID_INDEX ||
928 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
929 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
930 pa_context_fail(s->context, PA_ERR_PROTOCOL);
931 goto finish;
932 }
933
934 s->requested_bytes = (int64_t) requested_bytes;
935
936 if (s->context->version >= 9) {
937 if (s->direction == PA_STREAM_PLAYBACK) {
938 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
939 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
940 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
941 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
942 pa_context_fail(s->context, PA_ERR_PROTOCOL);
943 goto finish;
944 }
945 } else if (s->direction == PA_STREAM_RECORD) {
946 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
947 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
948 pa_context_fail(s->context, PA_ERR_PROTOCOL);
949 goto finish;
950 }
951 }
952 }
953
954 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
955 pa_sample_spec ss;
956 pa_channel_map cm;
957 const char *dn = NULL;
958 pa_bool_t suspended;
959
960 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
961 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
962 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
963 pa_tagstruct_gets(t, &dn) < 0 ||
964 pa_tagstruct_get_boolean(t, &suspended) < 0) {
965 pa_context_fail(s->context, PA_ERR_PROTOCOL);
966 goto finish;
967 }
968
969 if (!dn || s->device_index == PA_INVALID_INDEX ||
970 ss.channels != cm.channels ||
971 !pa_channel_map_valid(&cm) ||
972 !pa_sample_spec_valid(&ss) ||
973 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
974 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
975 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
976 pa_context_fail(s->context, PA_ERR_PROTOCOL);
977 goto finish;
978 }
979
980 pa_xfree(s->device_name);
981 s->device_name = pa_xstrdup(dn);
982 s->suspended = suspended;
983
984 s->channel_map = cm;
985 s->sample_spec = ss;
986 }
987
988 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
989 pa_usec_t usec;
990
991 if (pa_tagstruct_get_usec(t, &usec) < 0) {
992 pa_context_fail(s->context, PA_ERR_PROTOCOL);
993 goto finish;
994 }
995
996 if (s->direction == PA_STREAM_RECORD)
997 s->timing_info.configured_source_usec = usec;
998 else
999 s->timing_info.configured_sink_usec = usec;
1000 }
1001
1002 if (!pa_tagstruct_eof(t)) {
1003 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1004 goto finish;
1005 }
1006
1007 if (s->direction == PA_STREAM_RECORD) {
1008 pa_assert(!s->record_memblockq);
1009
1010 s->record_memblockq = pa_memblockq_new(
1011 0,
1012 s->buffer_attr.maxlength,
1013 0,
1014 pa_frame_size(&s->sample_spec),
1015 1,
1016 0,
1017 0,
1018 NULL);
1019 }
1020
1021 s->channel_valid = TRUE;
1022 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1023
1024 create_stream_complete(s);
1025
1026 finish:
1027 pa_stream_unref(s);
1028 }
1029
1030 static int create_stream(
1031 pa_stream_direction_t direction,
1032 pa_stream *s,
1033 const char *dev,
1034 const pa_buffer_attr *attr,
1035 pa_stream_flags_t flags,
1036 const pa_cvolume *volume,
1037 pa_stream *sync_stream) {
1038
1039 pa_tagstruct *t;
1040 uint32_t tag;
1041 pa_bool_t volume_set = FALSE;
1042
1043 pa_assert(s);
1044 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1045 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1046
1047 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1048 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1049 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1050 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1051 PA_STREAM_INTERPOLATE_TIMING|
1052 PA_STREAM_NOT_MONOTONIC|
1053 PA_STREAM_AUTO_TIMING_UPDATE|
1054 PA_STREAM_NO_REMAP_CHANNELS|
1055 PA_STREAM_NO_REMIX_CHANNELS|
1056 PA_STREAM_FIX_FORMAT|
1057 PA_STREAM_FIX_RATE|
1058 PA_STREAM_FIX_CHANNELS|
1059 PA_STREAM_DONT_MOVE|
1060 PA_STREAM_VARIABLE_RATE|
1061 PA_STREAM_PEAK_DETECT|
1062 PA_STREAM_START_MUTED|
1063 PA_STREAM_ADJUST_LATENCY|
1064 PA_STREAM_EARLY_REQUESTS|
1065 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1066 PA_STREAM_START_UNMUTED|
1067 PA_STREAM_FAIL_ON_SUSPEND|
1068 PA_STREAM_RELATIVE_VOLUME|
1069 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1070
1071
1072 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1073 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1074 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1075 /* Althought some of the other flags are not supported on older
1076 * version, we don't check for them here, because it doesn't hurt
1077 * when they are passed but actually not supported. This makes
1078 * client development easier */
1079
1080 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1081 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1082 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1083 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1084 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1085
1086 pa_stream_ref(s);
1087
1088 s->direction = direction;
1089
1090 if (sync_stream)
1091 s->syncid = sync_stream->syncid;
1092
1093 if (attr)
1094 s->buffer_attr = *attr;
1095 patch_buffer_attr(s, &s->buffer_attr, &flags);
1096
1097 s->flags = flags;
1098 s->corked = !!(flags & PA_STREAM_START_CORKED);
1099
1100 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1101 pa_usec_t x;
1102
1103 x = pa_rtclock_now();
1104
1105 pa_assert(!s->smoother);
1106 s->smoother = pa_smoother_new(
1107 SMOOTHER_ADJUST_TIME,
1108 SMOOTHER_HISTORY_TIME,
1109 !(flags & PA_STREAM_NOT_MONOTONIC),
1110 TRUE,
1111 SMOOTHER_MIN_HISTORY,
1112 x,
1113 TRUE);
1114 }
1115
1116 if (!dev)
1117 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1118
1119 t = pa_tagstruct_command(
1120 s->context,
1121 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1122 &tag);
1123
1124 if (s->context->version < 13)
1125 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1126
1127 pa_tagstruct_put(
1128 t,
1129 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1130 PA_TAG_CHANNEL_MAP, &s->channel_map,
1131 PA_TAG_U32, PA_INVALID_INDEX,
1132 PA_TAG_STRING, dev,
1133 PA_TAG_U32, s->buffer_attr.maxlength,
1134 PA_TAG_BOOLEAN, s->corked,
1135 PA_TAG_INVALID);
1136
1137 if (s->direction == PA_STREAM_PLAYBACK) {
1138 pa_cvolume cv;
1139
1140 pa_tagstruct_put(
1141 t,
1142 PA_TAG_U32, s->buffer_attr.tlength,
1143 PA_TAG_U32, s->buffer_attr.prebuf,
1144 PA_TAG_U32, s->buffer_attr.minreq,
1145 PA_TAG_U32, s->syncid,
1146 PA_TAG_INVALID);
1147
1148 volume_set = !!volume;
1149
1150 if (!volume)
1151 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1152
1153 pa_tagstruct_put_cvolume(t, volume);
1154 } else
1155 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1156
1157 if (s->context->version >= 12) {
1158 pa_tagstruct_put(
1159 t,
1160 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1161 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1162 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1163 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1164 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1165 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1166 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1167 PA_TAG_INVALID);
1168 }
1169
1170 if (s->context->version >= 13) {
1171
1172 if (s->direction == PA_STREAM_PLAYBACK)
1173 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1174 else
1175 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1176
1177 pa_tagstruct_put(
1178 t,
1179 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1180 PA_TAG_PROPLIST, s->proplist,
1181 PA_TAG_INVALID);
1182
1183 if (s->direction == PA_STREAM_RECORD)
1184 pa_tagstruct_putu32(t, s->direct_on_input);
1185 }
1186
1187 if (s->context->version >= 14) {
1188
1189 if (s->direction == PA_STREAM_PLAYBACK)
1190 pa_tagstruct_put_boolean(t, volume_set);
1191
1192 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1193 }
1194
1195 if (s->context->version >= 15) {
1196
1197 if (s->direction == PA_STREAM_PLAYBACK)
1198 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1199
1200 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1201 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1202 }
1203
1204 if (s->context->version >= 17) {
1205
1206 if (s->direction == PA_STREAM_PLAYBACK)
1207 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1208
1209 }
1210
1211 if (s->context->version >= 18) {
1212
1213 if (s->direction == PA_STREAM_PLAYBACK)
1214 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1215 }
1216
1217 pa_pstream_send_tagstruct(s->context->pstream, t);
1218 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1219
1220 pa_stream_set_state(s, PA_STREAM_CREATING);
1221
1222 pa_stream_unref(s);
1223 return 0;
1224 }
1225
1226 int pa_stream_connect_playback(
1227 pa_stream *s,
1228 const char *dev,
1229 const pa_buffer_attr *attr,
1230 pa_stream_flags_t flags,
1231 const pa_cvolume *volume,
1232 pa_stream *sync_stream) {
1233
1234 pa_assert(s);
1235 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1236
1237 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1238 }
1239
1240 int pa_stream_connect_record(
1241 pa_stream *s,
1242 const char *dev,
1243 const pa_buffer_attr *attr,
1244 pa_stream_flags_t flags) {
1245
1246 pa_assert(s);
1247 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1248
1249 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1250 }
1251
1252 int pa_stream_begin_write(
1253 pa_stream *s,
1254 void **data,
1255 size_t *nbytes) {
1256
1257 pa_assert(s);
1258 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1259
1260 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1261 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1262 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1263 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1264 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1265
1266 if (*nbytes != (size_t) -1) {
1267 size_t m, fs;
1268
1269 m = pa_mempool_block_size_max(s->context->mempool);
1270 fs = pa_frame_size(&s->sample_spec);
1271
1272 m = (m / fs) * fs;
1273 if (*nbytes > m)
1274 *nbytes = m;
1275 }
1276
1277 if (!s->write_memblock) {
1278 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1279 s->write_data = pa_memblock_acquire(s->write_memblock);
1280 }
1281
1282 *data = s->write_data;
1283 *nbytes = pa_memblock_get_length(s->write_memblock);
1284
1285 return 0;
1286 }
1287
1288 int pa_stream_cancel_write(
1289 pa_stream *s) {
1290
1291 pa_assert(s);
1292 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1293
1294 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1295 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1296 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1297 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1298
1299 pa_assert(s->write_data);
1300
1301 pa_memblock_release(s->write_memblock);
1302 pa_memblock_unref(s->write_memblock);
1303 s->write_memblock = NULL;
1304 s->write_data = NULL;
1305
1306 return 0;
1307 }
1308
1309 int pa_stream_write(
1310 pa_stream *s,
1311 const void *data,
1312 size_t length,
1313 pa_free_cb_t free_cb,
1314 int64_t offset,
1315 pa_seek_mode_t seek) {
1316
1317 pa_assert(s);
1318 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1319 pa_assert(data);
1320
1321 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1322 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1323 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1324 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1325 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1326 PA_CHECK_VALIDITY(s->context,
1327 !s->write_memblock ||
1328 ((data >= s->write_data) &&
1329 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1330 PA_ERR_INVALID);
1331 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1332
1333 if (s->write_memblock) {
1334 pa_memchunk chunk;
1335
1336 /* pa_stream_write_begin() was called before */
1337
1338 pa_memblock_release(s->write_memblock);
1339
1340 chunk.memblock = s->write_memblock;
1341 chunk.index = (const char *) data - (const char *) s->write_data;
1342 chunk.length = length;
1343
1344 s->write_memblock = NULL;
1345 s->write_data = NULL;
1346
1347 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1348 pa_memblock_unref(chunk.memblock);
1349
1350 } else {
1351 pa_seek_mode_t t_seek = seek;
1352 int64_t t_offset = offset;
1353 size_t t_length = length;
1354 const void *t_data = data;
1355
1356 /* pa_stream_write_begin() was not called before */
1357
1358 while (t_length > 0) {
1359 pa_memchunk chunk;
1360
1361 chunk.index = 0;
1362
1363 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1364 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1365 chunk.length = t_length;
1366 } else {
1367 void *d;
1368
1369 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1370 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1371
1372 d = pa_memblock_acquire(chunk.memblock);
1373 memcpy(d, t_data, chunk.length);
1374 pa_memblock_release(chunk.memblock);
1375 }
1376
1377 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1378
1379 t_offset = 0;
1380 t_seek = PA_SEEK_RELATIVE;
1381
1382 t_data = (const uint8_t*) t_data + chunk.length;
1383 t_length -= chunk.length;
1384
1385 pa_memblock_unref(chunk.memblock);
1386 }
1387
1388 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1389 free_cb((void*) data);
1390 }
1391
1392 /* This is obviously wrong since we ignore the seeking index . But
1393 * that's OK, the server side applies the same error */
1394 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1395
1396 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1397
1398 if (s->direction == PA_STREAM_PLAYBACK) {
1399
1400 /* Update latency request correction */
1401 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1402
1403 if (seek == PA_SEEK_ABSOLUTE) {
1404 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1405 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1406 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1407 } else if (seek == PA_SEEK_RELATIVE) {
1408 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1409 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1410 } else
1411 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1412 }
1413
1414 /* Update the write index in the already available latency data */
1415 if (s->timing_info_valid) {
1416
1417 if (seek == PA_SEEK_ABSOLUTE) {
1418 s->timing_info.write_index_corrupt = FALSE;
1419 s->timing_info.write_index = offset + (int64_t) length;
1420 } else if (seek == PA_SEEK_RELATIVE) {
1421 if (!s->timing_info.write_index_corrupt)
1422 s->timing_info.write_index += offset + (int64_t) length;
1423 } else
1424 s->timing_info.write_index_corrupt = TRUE;
1425 }
1426
1427 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1428 request_auto_timing_update(s, TRUE);
1429 }
1430
1431 return 0;
1432 }
1433
1434 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1435 pa_assert(s);
1436 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1437 pa_assert(data);
1438 pa_assert(length);
1439
1440 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1441 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1442 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1443
1444 if (!s->peek_memchunk.memblock) {
1445
1446 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1447 *data = NULL;
1448 *length = 0;
1449 return 0;
1450 }
1451
1452 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1453 }
1454
1455 pa_assert(s->peek_data);
1456 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1457 *length = s->peek_memchunk.length;
1458 return 0;
1459 }
1460
1461 int pa_stream_drop(pa_stream *s) {
1462 pa_assert(s);
1463 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1464
1465 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1466 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1467 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1468 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1469
1470 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1471
1472 /* Fix the simulated local read index */
1473 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1474 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1475
1476 pa_assert(s->peek_data);
1477 pa_memblock_release(s->peek_memchunk.memblock);
1478 pa_memblock_unref(s->peek_memchunk.memblock);
1479 pa_memchunk_reset(&s->peek_memchunk);
1480
1481 return 0;
1482 }
1483
1484 size_t pa_stream_writable_size(pa_stream *s) {
1485 pa_assert(s);
1486 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1487
1488 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1489 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1490 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1491
1492 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1493 }
1494
1495 size_t pa_stream_readable_size(pa_stream *s) {
1496 pa_assert(s);
1497 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1498
1499 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1500 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1501 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1502
1503 return pa_memblockq_get_length(s->record_memblockq);
1504 }
1505
1506 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1507 pa_operation *o;
1508 pa_tagstruct *t;
1509 uint32_t tag;
1510
1511 pa_assert(s);
1512 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1513
1514 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1515 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1516 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1517
1518 /* Ask for a timing update before we cork/uncork to get the best
1519 * accuracy for the transport latency suitable for the
1520 * check_smoother_status() call in the started callback */
1521 request_auto_timing_update(s, TRUE);
1522
1523 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1524
1525 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1526 pa_tagstruct_putu32(t, s->channel);
1527 pa_pstream_send_tagstruct(s->context->pstream, t);
1528 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1529
1530 /* This might cause the read index to conitnue again, hence
1531 * let's request a timing update */
1532 request_auto_timing_update(s, TRUE);
1533
1534 return o;
1535 }
1536
1537 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1538 pa_usec_t usec;
1539
1540 pa_assert(s);
1541 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1542 pa_assert(s->state == PA_STREAM_READY);
1543 pa_assert(s->direction != PA_STREAM_UPLOAD);
1544 pa_assert(s->timing_info_valid);
1545 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1546 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1547
1548 if (s->direction == PA_STREAM_PLAYBACK) {
1549 /* The last byte that was written into the output device
1550 * had this time value associated */
1551 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1552
1553 if (!s->corked && !s->suspended) {
1554
1555 if (!ignore_transport)
1556 /* Because the latency info took a little time to come
1557 * to us, we assume that the real output time is actually
1558 * a little ahead */
1559 usec += s->timing_info.transport_usec;
1560
1561 /* However, the output device usually maintains a buffer
1562 too, hence the real sample currently played is a little
1563 back */
1564 if (s->timing_info.sink_usec >= usec)
1565 usec = 0;
1566 else
1567 usec -= s->timing_info.sink_usec;
1568 }
1569
1570 } else {
1571 pa_assert(s->direction == PA_STREAM_RECORD);
1572
1573 /* The last byte written into the server side queue had
1574 * this time value associated */
1575 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1576
1577 if (!s->corked && !s->suspended) {
1578
1579 if (!ignore_transport)
1580 /* Add transport latency */
1581 usec += s->timing_info.transport_usec;
1582
1583 /* Add latency of data in device buffer */
1584 usec += s->timing_info.source_usec;
1585
1586 /* If this is a monitor source, we need to correct the
1587 * time by the playback device buffer */
1588 if (s->timing_info.sink_usec >= usec)
1589 usec = 0;
1590 else
1591 usec -= s->timing_info.sink_usec;
1592 }
1593 }
1594
1595 return usec;
1596 }
1597
1598 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1599 pa_operation *o = userdata;
1600 struct timeval local, remote, now;
1601 pa_timing_info *i;
1602 pa_bool_t playing = FALSE;
1603 uint64_t underrun_for = 0, playing_for = 0;
1604
1605 pa_assert(pd);
1606 pa_assert(o);
1607 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1608
1609 if (!o->context || !o->stream)
1610 goto finish;
1611
1612 i = &o->stream->timing_info;
1613
1614 o->stream->timing_info_valid = FALSE;
1615 i->write_index_corrupt = TRUE;
1616 i->read_index_corrupt = TRUE;
1617
1618 if (command != PA_COMMAND_REPLY) {
1619 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1620 goto finish;
1621
1622 } else {
1623
1624 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1625 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1626 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1627 pa_tagstruct_get_timeval(t, &local) < 0 ||
1628 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1629 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1630 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1631
1632 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1633 goto finish;
1634 }
1635
1636 if (o->context->version >= 13 &&
1637 o->stream->direction == PA_STREAM_PLAYBACK)
1638 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1639 pa_tagstruct_getu64(t, &playing_for) < 0) {
1640
1641 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1642 goto finish;
1643 }
1644
1645
1646 if (!pa_tagstruct_eof(t)) {
1647 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1648 goto finish;
1649 }
1650 o->stream->timing_info_valid = TRUE;
1651 i->write_index_corrupt = FALSE;
1652 i->read_index_corrupt = FALSE;
1653
1654 i->playing = (int) playing;
1655 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1656
1657 pa_gettimeofday(&now);
1658
1659 /* Calculcate timestamps */
1660 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1661 /* local and remote seem to have synchronized clocks */
1662
1663 if (o->stream->direction == PA_STREAM_PLAYBACK)
1664 i->transport_usec = pa_timeval_diff(&remote, &local);
1665 else
1666 i->transport_usec = pa_timeval_diff(&now, &remote);
1667
1668 i->synchronized_clocks = TRUE;
1669 i->timestamp = remote;
1670 } else {
1671 /* clocks are not synchronized, let's estimate latency then */
1672 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1673 i->synchronized_clocks = FALSE;
1674 i->timestamp = local;
1675 pa_timeval_add(&i->timestamp, i->transport_usec);
1676 }
1677
1678 /* Invalidate read and write indexes if necessary */
1679 if (tag < o->stream->read_index_not_before)
1680 i->read_index_corrupt = TRUE;
1681
1682 if (tag < o->stream->write_index_not_before)
1683 i->write_index_corrupt = TRUE;
1684
1685 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1686 /* Write index correction */
1687
1688 int n, j;
1689 uint32_t ctag = tag;
1690
1691 /* Go through the saved correction values and add up the
1692 * total correction.*/
1693 for (n = 0, j = o->stream->current_write_index_correction+1;
1694 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1695 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1696
1697 /* Step over invalid data or out-of-date data */
1698 if (!o->stream->write_index_corrections[j].valid ||
1699 o->stream->write_index_corrections[j].tag < ctag)
1700 continue;
1701
1702 /* Make sure that everything is in order */
1703 ctag = o->stream->write_index_corrections[j].tag+1;
1704
1705 /* Now fix the write index */
1706 if (o->stream->write_index_corrections[j].corrupt) {
1707 /* A corrupting seek was made */
1708 i->write_index_corrupt = TRUE;
1709 } else if (o->stream->write_index_corrections[j].absolute) {
1710 /* An absolute seek was made */
1711 i->write_index = o->stream->write_index_corrections[j].value;
1712 i->write_index_corrupt = FALSE;
1713 } else if (!i->write_index_corrupt) {
1714 /* A relative seek was made */
1715 i->write_index += o->stream->write_index_corrections[j].value;
1716 }
1717 }
1718
1719 /* Clear old correction entries */
1720 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1721 if (!o->stream->write_index_corrections[n].valid)
1722 continue;
1723
1724 if (o->stream->write_index_corrections[n].tag <= tag)
1725 o->stream->write_index_corrections[n].valid = FALSE;
1726 }
1727 }
1728
1729 if (o->stream->direction == PA_STREAM_RECORD) {
1730 /* Read index correction */
1731
1732 if (!i->read_index_corrupt)
1733 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1734 }
1735
1736 /* Update smoother if we're not corked */
1737 if (o->stream->smoother && !o->stream->corked) {
1738 pa_usec_t u, x;
1739
1740 u = x = pa_rtclock_now() - i->transport_usec;
1741
1742 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1743 pa_usec_t su;
1744
1745 /* If we weren't playing then it will take some time
1746 * until the audio will actually come out through the
1747 * speakers. Since we follow that timing here, we need
1748 * to try to fix this up */
1749
1750 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1751
1752 if (su < i->sink_usec)
1753 x += i->sink_usec - su;
1754 }
1755
1756 if (!i->playing)
1757 pa_smoother_pause(o->stream->smoother, x);
1758
1759 /* Update the smoother */
1760 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1761 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1762 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1763
1764 if (i->playing)
1765 pa_smoother_resume(o->stream->smoother, x, TRUE);
1766 }
1767 }
1768
1769 o->stream->auto_timing_update_requested = FALSE;
1770
1771 if (o->stream->latency_update_callback)
1772 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1773
1774 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1775 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1776 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1777 }
1778
1779 finish:
1780
1781 pa_operation_done(o);
1782 pa_operation_unref(o);
1783 }
1784
1785 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1786 uint32_t tag;
1787 pa_operation *o;
1788 pa_tagstruct *t;
1789 struct timeval now;
1790 int cidx = 0;
1791
1792 pa_assert(s);
1793 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1794
1795 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1796 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1797 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1798
1799 if (s->direction == PA_STREAM_PLAYBACK) {
1800 /* Find a place to store the write_index correction data for this entry */
1801 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1802
1803 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1804 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1805 }
1806 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1807
1808 t = pa_tagstruct_command(
1809 s->context,
1810 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1811 &tag);
1812 pa_tagstruct_putu32(t, s->channel);
1813 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1814
1815 pa_pstream_send_tagstruct(s->context->pstream, t);
1816 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1817
1818 if (s->direction == PA_STREAM_PLAYBACK) {
1819 /* Fill in initial correction data */
1820
1821 s->current_write_index_correction = cidx;
1822
1823 s->write_index_corrections[cidx].valid = TRUE;
1824 s->write_index_corrections[cidx].absolute = FALSE;
1825 s->write_index_corrections[cidx].corrupt = FALSE;
1826 s->write_index_corrections[cidx].tag = tag;
1827 s->write_index_corrections[cidx].value = 0;
1828 }
1829
1830 return o;
1831 }
1832
1833 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1834 pa_stream *s = userdata;
1835
1836 pa_assert(pd);
1837 pa_assert(s);
1838 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1839
1840 pa_stream_ref(s);
1841
1842 if (command != PA_COMMAND_REPLY) {
1843 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1844 goto finish;
1845
1846 pa_stream_set_state(s, PA_STREAM_FAILED);
1847 goto finish;
1848 } else if (!pa_tagstruct_eof(t)) {
1849 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1850 goto finish;
1851 }
1852
1853 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1854
1855 finish:
1856 pa_stream_unref(s);
1857 }
1858
1859 int pa_stream_disconnect(pa_stream *s) {
1860 pa_tagstruct *t;
1861 uint32_t tag;
1862
1863 pa_assert(s);
1864 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1865
1866 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1867 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1868 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1869
1870 pa_stream_ref(s);
1871
1872 t = pa_tagstruct_command(
1873 s->context,
1874 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1875 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1876 &tag);
1877 pa_tagstruct_putu32(t, s->channel);
1878 pa_pstream_send_tagstruct(s->context->pstream, t);
1879 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1880
1881 pa_stream_unref(s);
1882 return 0;
1883 }
1884
1885 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1886 pa_assert(s);
1887 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1888
1889 if (pa_detect_fork())
1890 return;
1891
1892 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1893 return;
1894
1895 s->read_callback = cb;
1896 s->read_userdata = userdata;
1897 }
1898
1899 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1900 pa_assert(s);
1901 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1902
1903 if (pa_detect_fork())
1904 return;
1905
1906 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1907 return;
1908
1909 s->write_callback = cb;
1910 s->write_userdata = userdata;
1911 }
1912
1913 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1914 pa_assert(s);
1915 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1916
1917 if (pa_detect_fork())
1918 return;
1919
1920 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1921 return;
1922
1923 s->state_callback = cb;
1924 s->state_userdata = userdata;
1925 }
1926
1927 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1928 pa_assert(s);
1929 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1930
1931 if (pa_detect_fork())
1932 return;
1933
1934 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1935 return;
1936
1937 s->overflow_callback = cb;
1938 s->overflow_userdata = userdata;
1939 }
1940
1941 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1942 pa_assert(s);
1943 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1944
1945 if (pa_detect_fork())
1946 return;
1947
1948 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1949 return;
1950
1951 s->underflow_callback = cb;
1952 s->underflow_userdata = userdata;
1953 }
1954
1955 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1956 pa_assert(s);
1957 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1958
1959 if (pa_detect_fork())
1960 return;
1961
1962 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1963 return;
1964
1965 s->latency_update_callback = cb;
1966 s->latency_update_userdata = userdata;
1967 }
1968
1969 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1970 pa_assert(s);
1971 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1972
1973 if (pa_detect_fork())
1974 return;
1975
1976 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1977 return;
1978
1979 s->moved_callback = cb;
1980 s->moved_userdata = userdata;
1981 }
1982
1983 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1984 pa_assert(s);
1985 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1986
1987 if (pa_detect_fork())
1988 return;
1989
1990 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1991 return;
1992
1993 s->suspended_callback = cb;
1994 s->suspended_userdata = userdata;
1995 }
1996
1997 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1998 pa_assert(s);
1999 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2000
2001 if (pa_detect_fork())
2002 return;
2003
2004 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2005 return;
2006
2007 s->started_callback = cb;
2008 s->started_userdata = userdata;
2009 }
2010
2011 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2012 pa_assert(s);
2013 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2014
2015 if (pa_detect_fork())
2016 return;
2017
2018 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2019 return;
2020
2021 s->event_callback = cb;
2022 s->event_userdata = userdata;
2023 }
2024
2025 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2026 pa_assert(s);
2027 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2028
2029 if (pa_detect_fork())
2030 return;
2031
2032 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2033 return;
2034
2035 s->buffer_attr_callback = cb;
2036 s->buffer_attr_userdata = userdata;
2037 }
2038
2039 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2040 pa_operation *o = userdata;
2041 int success = 1;
2042
2043 pa_assert(pd);
2044 pa_assert(o);
2045 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2046
2047 if (!o->context)
2048 goto finish;
2049
2050 if (command != PA_COMMAND_REPLY) {
2051 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2052 goto finish;
2053
2054 success = 0;
2055 } else if (!pa_tagstruct_eof(t)) {
2056 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2057 goto finish;
2058 }
2059
2060 if (o->callback) {
2061 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2062 cb(o->stream, success, o->userdata);
2063 }
2064
2065 finish:
2066 pa_operation_done(o);
2067 pa_operation_unref(o);
2068 }
2069
2070 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2071 pa_operation *o;
2072 pa_tagstruct *t;
2073 uint32_t tag;
2074
2075 pa_assert(s);
2076 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2077
2078 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2079 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2080 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2081
2082 /* Ask for a timing update before we cork/uncork to get the best
2083 * accuracy for the transport latency suitable for the
2084 * check_smoother_status() call in the started callback */
2085 request_auto_timing_update(s, TRUE);
2086
2087 s->corked = b;
2088
2089 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2090
2091 t = pa_tagstruct_command(
2092 s->context,
2093 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2094 &tag);
2095 pa_tagstruct_putu32(t, s->channel);
2096 pa_tagstruct_put_boolean(t, !!b);
2097 pa_pstream_send_tagstruct(s->context->pstream, t);
2098 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2099
2100 check_smoother_status(s, FALSE, FALSE, FALSE);
2101
2102 /* This might cause the indexes to hang/start again, hence let's
2103 * request a timing update, after the cork/uncork, too */
2104 request_auto_timing_update(s, TRUE);
2105
2106 return o;
2107 }
2108
2109 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2110 pa_tagstruct *t;
2111 pa_operation *o;
2112 uint32_t tag;
2113
2114 pa_assert(s);
2115 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2116
2117 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2118 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2119
2120 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2121
2122 t = pa_tagstruct_command(s->context, command, &tag);
2123 pa_tagstruct_putu32(t, s->channel);
2124 pa_pstream_send_tagstruct(s->context->pstream, t);
2125 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2126
2127 return o;
2128 }
2129
2130 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2131 pa_operation *o;
2132
2133 pa_assert(s);
2134 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2135
2136 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2137 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2138 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2139
2140 /* Ask for a timing update *before* the flush, so that the
2141 * transport usec is as up to date as possible when we get the
2142 * underflow message and update the smoother status*/
2143 request_auto_timing_update(s, TRUE);
2144
2145 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2146 return NULL;
2147
2148 if (s->direction == PA_STREAM_PLAYBACK) {
2149
2150 if (s->write_index_corrections[s->current_write_index_correction].valid)
2151 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2152
2153 if (s->buffer_attr.prebuf > 0)
2154 check_smoother_status(s, FALSE, FALSE, TRUE);
2155
2156 /* This will change the write index, but leave the
2157 * read index untouched. */
2158 invalidate_indexes(s, FALSE, TRUE);
2159
2160 } else
2161 /* For record streams this has no influence on the write
2162 * index, but the read index might jump. */
2163 invalidate_indexes(s, TRUE, FALSE);
2164
2165 /* Note that we do not update requested_bytes here. This is
2166 * because we cannot really know how data actually was dropped
2167 * from the write index due to this. This 'error' will be applied
2168 * by both client and server and hence we should be fine. */
2169
2170 return o;
2171 }
2172
2173 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2174 pa_operation *o;
2175
2176 pa_assert(s);
2177 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2178
2179 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2180 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2181 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2182 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2183
2184 /* Ask for a timing update before we cork/uncork to get the best
2185 * accuracy for the transport latency suitable for the
2186 * check_smoother_status() call in the started callback */
2187 request_auto_timing_update(s, TRUE);
2188
2189 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2190 return NULL;
2191
2192 /* This might cause the read index to hang again, hence
2193 * let's request a timing update */
2194 request_auto_timing_update(s, TRUE);
2195
2196 return o;
2197 }
2198
2199 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2200 pa_operation *o;
2201
2202 pa_assert(s);
2203 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2204
2205 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2206 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2207 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2208 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2209
2210 /* Ask for a timing update before we cork/uncork to get the best
2211 * accuracy for the transport latency suitable for the
2212 * check_smoother_status() call in the started callback */
2213 request_auto_timing_update(s, TRUE);
2214
2215 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2216 return NULL;
2217
2218 /* This might cause the read index to start moving again, hence
2219 * let's request a timing update */
2220 request_auto_timing_update(s, TRUE);
2221
2222 return o;
2223 }
2224
2225 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2226 pa_operation *o;
2227
2228 pa_assert(s);
2229 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2230 pa_assert(name);
2231
2232 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2233 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2234 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2235
2236 if (s->context->version >= 13) {
2237 pa_proplist *p = pa_proplist_new();
2238
2239 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2240 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2241 pa_proplist_free(p);
2242 } else {
2243 pa_tagstruct *t;
2244 uint32_t tag;
2245
2246 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2247 t = pa_tagstruct_command(
2248 s->context,
2249 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2250 &tag);
2251 pa_tagstruct_putu32(t, s->channel);
2252 pa_tagstruct_puts(t, name);
2253 pa_pstream_send_tagstruct(s->context->pstream, t);
2254 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2255 }
2256
2257 return o;
2258 }
2259
2260 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2261 pa_usec_t usec;
2262
2263 pa_assert(s);
2264 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2265
2266 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2267 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2268 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2269 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2270 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2271 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2272
2273 if (s->smoother)
2274 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2275 else
2276 usec = calc_time(s, FALSE);
2277
2278 /* Make sure the time runs monotonically */
2279 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2280 if (usec < s->previous_time)
2281 usec = s->previous_time;
2282 else
2283 s->previous_time = usec;
2284 }
2285
2286 if (r_usec)
2287 *r_usec = usec;
2288
2289 return 0;
2290 }
2291
2292 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2293 pa_assert(s);
2294 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2295
2296 if (negative)
2297 *negative = 0;
2298
2299 if (a >= b)
2300 return a-b;
2301 else {
2302 if (negative && s->direction == PA_STREAM_RECORD) {
2303 *negative = 1;
2304 return b-a;
2305 } else
2306 return 0;
2307 }
2308 }
2309
2310 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2311 pa_usec_t t, c;
2312 int r;
2313 int64_t cindex;
2314
2315 pa_assert(s);
2316 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2317 pa_assert(r_usec);
2318
2319 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2320 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2321 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2322 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2323 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2324 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2325
2326 if ((r = pa_stream_get_time(s, &t)) < 0)
2327 return r;
2328
2329 if (s->direction == PA_STREAM_PLAYBACK)
2330 cindex = s->timing_info.write_index;
2331 else
2332 cindex = s->timing_info.read_index;
2333
2334 if (cindex < 0)
2335 cindex = 0;
2336
2337 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2338
2339 if (s->direction == PA_STREAM_PLAYBACK)
2340 *r_usec = time_counter_diff(s, c, t, negative);
2341 else
2342 *r_usec = time_counter_diff(s, t, c, negative);
2343
2344 return 0;
2345 }
2346
2347 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2348 pa_assert(s);
2349 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2350
2351 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2352 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2353 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2354 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2355
2356 return &s->timing_info;
2357 }
2358
2359 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2360 pa_assert(s);
2361 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2362
2363 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2364
2365 return &s->sample_spec;
2366 }
2367
2368 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2369 pa_assert(s);
2370 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2371
2372 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2373
2374 return &s->channel_map;
2375 }
2376
2377 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2378 pa_assert(s);
2379 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2380
2381 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2382 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2383 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2384
2385 return &s->buffer_attr;
2386 }
2387
2388 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2389 pa_operation *o = userdata;
2390 int success = 1;
2391
2392 pa_assert(pd);
2393 pa_assert(o);
2394 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2395
2396 if (!o->context)
2397 goto finish;
2398
2399 if (command != PA_COMMAND_REPLY) {
2400 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2401 goto finish;
2402
2403 success = 0;
2404 } else {
2405 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2406 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2407 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2408 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2409 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2410 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2411 goto finish;
2412 }
2413 } else if (o->stream->direction == PA_STREAM_RECORD) {
2414 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2415 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2416 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2417 goto finish;
2418 }
2419 }
2420
2421 if (o->stream->context->version >= 13) {
2422 pa_usec_t usec;
2423
2424 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2425 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2426 goto finish;
2427 }
2428
2429 if (o->stream->direction == PA_STREAM_RECORD)
2430 o->stream->timing_info.configured_source_usec = usec;
2431 else
2432 o->stream->timing_info.configured_sink_usec = usec;
2433 }
2434
2435 if (!pa_tagstruct_eof(t)) {
2436 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2437 goto finish;
2438 }
2439 }
2440
2441 if (o->callback) {
2442 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2443 cb(o->stream, success, o->userdata);
2444 }
2445
2446 finish:
2447 pa_operation_done(o);
2448 pa_operation_unref(o);
2449 }
2450
2451
2452 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2453 pa_operation *o;
2454 pa_tagstruct *t;
2455 uint32_t tag;
2456 pa_buffer_attr copy;
2457
2458 pa_assert(s);
2459 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2460 pa_assert(attr);
2461
2462 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2463 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2464 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2465 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2466
2467 /* Ask for a timing update before we cork/uncork to get the best
2468 * accuracy for the transport latency suitable for the
2469 * check_smoother_status() call in the started callback */
2470 request_auto_timing_update(s, TRUE);
2471
2472 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2473
2474 t = pa_tagstruct_command(
2475 s->context,
2476 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2477 &tag);
2478 pa_tagstruct_putu32(t, s->channel);
2479
2480 copy = *attr;
2481 patch_buffer_attr(s, &copy, NULL);
2482 attr = &copy;
2483
2484 pa_tagstruct_putu32(t, attr->maxlength);
2485
2486 if (s->direction == PA_STREAM_PLAYBACK)
2487 pa_tagstruct_put(
2488 t,
2489 PA_TAG_U32, attr->tlength,
2490 PA_TAG_U32, attr->prebuf,
2491 PA_TAG_U32, attr->minreq,
2492 PA_TAG_INVALID);
2493 else
2494 pa_tagstruct_putu32(t, attr->fragsize);
2495
2496 if (s->context->version >= 13)
2497 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2498
2499 if (s->context->version >= 14)
2500 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2501
2502 pa_pstream_send_tagstruct(s->context->pstream, t);
2503 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2504
2505 /* This might cause changes in the read/write indexex, hence let's
2506 * request a timing update */
2507 request_auto_timing_update(s, TRUE);
2508
2509 return o;
2510 }
2511
2512 uint32_t pa_stream_get_device_index(pa_stream *s) {
2513 pa_assert(s);
2514 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2515
2516 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2517 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2518 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2519 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2520 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2521
2522 return s->device_index;
2523 }
2524
2525 const char *pa_stream_get_device_name(pa_stream *s) {
2526 pa_assert(s);
2527 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2528
2529 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2530 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2531 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2532 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2533 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2534
2535 return s->device_name;
2536 }
2537
2538 int pa_stream_is_suspended(pa_stream *s) {
2539 pa_assert(s);
2540 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2541
2542 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2543 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2544 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2545 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2546
2547 return s->suspended;
2548 }
2549
2550 int pa_stream_is_corked(pa_stream *s) {
2551 pa_assert(s);
2552 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2553
2554 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2555 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2556 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2557
2558 return s->corked;
2559 }
2560
2561 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2562 pa_operation *o = userdata;
2563 int success = 1;
2564
2565 pa_assert(pd);
2566 pa_assert(o);
2567 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2568
2569 if (!o->context)
2570 goto finish;
2571
2572 if (command != PA_COMMAND_REPLY) {
2573 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2574 goto finish;
2575
2576 success = 0;
2577 } else {
2578
2579 if (!pa_tagstruct_eof(t)) {
2580 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2581 goto finish;
2582 }
2583 }
2584
2585 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2586 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2587
2588 if (o->callback) {
2589 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2590 cb(o->stream, success, o->userdata);
2591 }
2592
2593 finish:
2594 pa_operation_done(o);
2595 pa_operation_unref(o);
2596 }
2597
2598
2599 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2600 pa_operation *o;
2601 pa_tagstruct *t;
2602 uint32_t tag;
2603
2604 pa_assert(s);
2605 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2606
2607 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2608 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2609 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2610 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2611 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2612 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2613
2614 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2615 o->private = PA_UINT_TO_PTR(rate);
2616
2617 t = pa_tagstruct_command(
2618 s->context,
2619 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2620 &tag);
2621 pa_tagstruct_putu32(t, s->channel);
2622 pa_tagstruct_putu32(t, rate);
2623
2624 pa_pstream_send_tagstruct(s->context->pstream, t);
2625 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2626
2627 return o;
2628 }
2629
2630 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2631 pa_operation *o;
2632 pa_tagstruct *t;
2633 uint32_t tag;
2634
2635 pa_assert(s);
2636 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2637
2638 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2639 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2640 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2641 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2642 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2643
2644 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2645
2646 t = pa_tagstruct_command(
2647 s->context,
2648 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2649 &tag);
2650 pa_tagstruct_putu32(t, s->channel);
2651 pa_tagstruct_putu32(t, (uint32_t) mode);
2652 pa_tagstruct_put_proplist(t, p);
2653
2654 pa_pstream_send_tagstruct(s->context->pstream, t);
2655 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2656
2657 /* Please note that we don't update s->proplist here, because we
2658 * don't export that field */
2659
2660 return o;
2661 }
2662
2663 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2664 pa_operation *o;
2665 pa_tagstruct *t;
2666 uint32_t tag;
2667 const char * const*k;
2668
2669 pa_assert(s);
2670 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2671
2672 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2673 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2674 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2675 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2676 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2677
2678 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2679
2680 t = pa_tagstruct_command(
2681 s->context,
2682 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2683 &tag);
2684 pa_tagstruct_putu32(t, s->channel);
2685
2686 for (k = keys; *k; k++)
2687 pa_tagstruct_puts(t, *k);
2688
2689 pa_tagstruct_puts(t, NULL);
2690
2691 pa_pstream_send_tagstruct(s->context->pstream, t);
2692 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2693
2694 /* Please note that we don't update s->proplist here, because we
2695 * don't export that field */
2696
2697 return o;
2698 }
2699
2700 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2701 pa_assert(s);
2702 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2703
2704 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2705 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2706 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2707 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2708
2709 s->direct_on_input = sink_input_idx;
2710
2711 return 0;
2712 }
2713
2714 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2715 pa_assert(s);
2716 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2717
2718 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2719 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2720 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2721
2722 return s->direct_on_input;
2723 }