]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
Merge commit '2d903bae9a2e57f997a3d3f335379c3880f95c77'
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
40
41 #include "fork-detect.h"
42 #include "internal.h"
43
44 #define LATENCY_IPOL_INTERVAL_USEC (333*PA_USEC_PER_MSEC)
45
46 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
47 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
48 #define SMOOTHER_MIN_HISTORY (4)
49
50 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
51 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
52 }
53
54 static void reset_callbacks(pa_stream *s) {
55 s->read_callback = NULL;
56 s->read_userdata = NULL;
57 s->write_callback = NULL;
58 s->write_userdata = NULL;
59 s->state_callback = NULL;
60 s->state_userdata = NULL;
61 s->overflow_callback = NULL;
62 s->overflow_userdata = NULL;
63 s->underflow_callback = NULL;
64 s->underflow_userdata = NULL;
65 s->latency_update_callback = NULL;
66 s->latency_update_userdata = NULL;
67 s->moved_callback = NULL;
68 s->moved_userdata = NULL;
69 s->suspended_callback = NULL;
70 s->suspended_userdata = NULL;
71 s->started_callback = NULL;
72 s->started_userdata = NULL;
73 s->event_callback = NULL;
74 s->event_userdata = NULL;
75 }
76
77 pa_stream *pa_stream_new_with_proplist(
78 pa_context *c,
79 const char *name,
80 const pa_sample_spec *ss,
81 const pa_channel_map *map,
82 pa_proplist *p) {
83
84 pa_stream *s;
85 int i;
86 pa_channel_map tmap;
87
88 pa_assert(c);
89 pa_assert(PA_REFCNT_VALUE(c) >= 1);
90
91 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
92 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
93 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
94 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
98
99 if (!map)
100 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
101
102 s = pa_xnew(pa_stream, 1);
103 PA_REFCNT_INIT(s);
104 s->context = c;
105 s->mainloop = c->mainloop;
106
107 s->direction = PA_STREAM_NODIRECTION;
108 s->state = PA_STREAM_UNCONNECTED;
109 s->flags = 0;
110
111 s->sample_spec = *ss;
112 s->channel_map = *map;
113
114 s->direct_on_input = PA_INVALID_INDEX;
115
116 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
117 if (name)
118 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
119
120 s->channel = 0;
121 s->channel_valid = FALSE;
122 s->syncid = c->csyncid++;
123 s->stream_index = PA_INVALID_INDEX;
124
125 s->requested_bytes = 0;
126 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
127
128 /* We initialize der target length here, so that if the user
129 * passes no explicit buffering metrics the default is similar to
130 * what older PA versions provided. */
131
132 s->buffer_attr.maxlength = (uint32_t) -1;
133 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
134 s->buffer_attr.minreq = (uint32_t) -1;
135 s->buffer_attr.prebuf = (uint32_t) -1;
136 s->buffer_attr.fragsize = (uint32_t) -1;
137
138 s->device_index = PA_INVALID_INDEX;
139 s->device_name = NULL;
140 s->suspended = FALSE;
141
142 pa_memchunk_reset(&s->peek_memchunk);
143 s->peek_data = NULL;
144
145 s->record_memblockq = NULL;
146
147 s->corked = FALSE;
148
149 memset(&s->timing_info, 0, sizeof(s->timing_info));
150 s->timing_info_valid = FALSE;
151
152 s->previous_time = 0;
153
154 s->read_index_not_before = 0;
155 s->write_index_not_before = 0;
156 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
157 s->write_index_corrections[i].valid = 0;
158 s->current_write_index_correction = 0;
159
160 s->auto_timing_update_event = NULL;
161 s->auto_timing_update_requested = FALSE;
162
163 reset_callbacks(s);
164
165 s->smoother = NULL;
166
167 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
168 PA_LLIST_PREPEND(pa_stream, c->streams, s);
169 pa_stream_ref(s);
170
171 return s;
172 }
173
174 static void stream_unlink(pa_stream *s) {
175 pa_operation *o, *n;
176 pa_assert(s);
177
178 if (!s->context)
179 return;
180
181 /* Detach from context */
182
183 /* Unref all operatio object that point to us */
184 for (o = s->context->operations; o; o = n) {
185 n = o->next;
186
187 if (o->stream == s)
188 pa_operation_cancel(o);
189 }
190
191 /* Drop all outstanding replies for this stream */
192 if (s->context->pdispatch)
193 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
194
195 if (s->channel_valid) {
196 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
197 s->channel = 0;
198 s->channel_valid = FALSE;
199 }
200
201 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
202 pa_stream_unref(s);
203
204 s->context = NULL;
205
206 if (s->auto_timing_update_event) {
207 pa_assert(s->mainloop);
208 s->mainloop->time_free(s->auto_timing_update_event);
209 }
210
211 reset_callbacks(s);
212 }
213
214 static void stream_free(pa_stream *s) {
215 pa_assert(s);
216
217 stream_unlink(s);
218
219 if (s->peek_memchunk.memblock) {
220 if (s->peek_data)
221 pa_memblock_release(s->peek_memchunk.memblock);
222 pa_memblock_unref(s->peek_memchunk.memblock);
223 }
224
225 if (s->record_memblockq)
226 pa_memblockq_free(s->record_memblockq);
227
228 if (s->proplist)
229 pa_proplist_free(s->proplist);
230
231 if (s->smoother)
232 pa_smoother_free(s->smoother);
233
234 pa_xfree(s->device_name);
235 pa_xfree(s);
236 }
237
238 void pa_stream_unref(pa_stream *s) {
239 pa_assert(s);
240 pa_assert(PA_REFCNT_VALUE(s) >= 1);
241
242 if (PA_REFCNT_DEC(s) <= 0)
243 stream_free(s);
244 }
245
246 pa_stream* pa_stream_ref(pa_stream *s) {
247 pa_assert(s);
248 pa_assert(PA_REFCNT_VALUE(s) >= 1);
249
250 PA_REFCNT_INC(s);
251 return s;
252 }
253
254 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
255 pa_assert(s);
256 pa_assert(PA_REFCNT_VALUE(s) >= 1);
257
258 return s->state;
259 }
260
261 pa_context* pa_stream_get_context(pa_stream *s) {
262 pa_assert(s);
263 pa_assert(PA_REFCNT_VALUE(s) >= 1);
264
265 return s->context;
266 }
267
268 uint32_t pa_stream_get_index(pa_stream *s) {
269 pa_assert(s);
270 pa_assert(PA_REFCNT_VALUE(s) >= 1);
271
272 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
273 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
274
275 return s->stream_index;
276 }
277
278 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
279 pa_assert(s);
280 pa_assert(PA_REFCNT_VALUE(s) >= 1);
281
282 if (s->state == st)
283 return;
284
285 pa_stream_ref(s);
286
287 s->state = st;
288
289 if (s->state_callback)
290 s->state_callback(s, s->state_userdata);
291
292 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
293 stream_unlink(s);
294
295 pa_stream_unref(s);
296 }
297
298 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
299 pa_assert(s);
300 pa_assert(PA_REFCNT_VALUE(s) >= 1);
301
302 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
303 return;
304
305 if (s->state == PA_STREAM_READY &&
306 (force || !s->auto_timing_update_requested)) {
307 pa_operation *o;
308
309 /* pa_log("automatically requesting new timing data"); */
310
311 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
312 pa_operation_unref(o);
313 s->auto_timing_update_requested = TRUE;
314 }
315 }
316
317 if (s->auto_timing_update_event) {
318 struct timeval next;
319 pa_gettimeofday(&next);
320 pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
321 s->mainloop->time_restart(s->auto_timing_update_event, &next);
322 }
323 }
324
325 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
326 pa_context *c = userdata;
327 pa_stream *s;
328 uint32_t channel;
329
330 pa_assert(pd);
331 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
332 pa_assert(t);
333 pa_assert(c);
334 pa_assert(PA_REFCNT_VALUE(c) >= 1);
335
336 pa_context_ref(c);
337
338 if (pa_tagstruct_getu32(t, &channel) < 0 ||
339 !pa_tagstruct_eof(t)) {
340 pa_context_fail(c, PA_ERR_PROTOCOL);
341 goto finish;
342 }
343
344 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
345 goto finish;
346
347 if (s->state != PA_STREAM_READY)
348 goto finish;
349
350 pa_context_set_error(c, PA_ERR_KILLED);
351 pa_stream_set_state(s, PA_STREAM_FAILED);
352
353 finish:
354 pa_context_unref(c);
355 }
356
357 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
358 pa_usec_t x;
359
360 pa_assert(s);
361 pa_assert(!force_start || !force_stop);
362
363 if (!s->smoother)
364 return;
365
366 x = pa_rtclock_usec();
367
368 if (s->timing_info_valid) {
369 if (aposteriori)
370 x -= s->timing_info.transport_usec;
371 else
372 x += s->timing_info.transport_usec;
373
374 if (s->direction == PA_STREAM_PLAYBACK)
375 /* it takes a while until the pause/resume is actually
376 * audible */
377 x += s->timing_info.sink_usec;
378 else
379 /* Data froma while back will be dropped */
380 x -= s->timing_info.source_usec;
381 }
382
383 if (s->suspended || s->corked || force_stop)
384 pa_smoother_pause(s->smoother, x);
385 else if (force_start || s->buffer_attr.prebuf == 0)
386 pa_smoother_resume(s->smoother, x);
387
388 /* Please note that we have no idea if playback actually started
389 * if prebuf is non-zero! */
390 }
391
392 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
393 pa_context *c = userdata;
394 pa_stream *s;
395 uint32_t channel;
396 const char *dn;
397 pa_bool_t suspended;
398 uint32_t di;
399 pa_usec_t usec;
400 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
401
402 pa_assert(pd);
403 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
404 pa_assert(t);
405 pa_assert(c);
406 pa_assert(PA_REFCNT_VALUE(c) >= 1);
407
408 pa_context_ref(c);
409
410 if (c->version < 12) {
411 pa_context_fail(c, PA_ERR_PROTOCOL);
412 goto finish;
413 }
414
415 if (pa_tagstruct_getu32(t, &channel) < 0 ||
416 pa_tagstruct_getu32(t, &di) < 0 ||
417 pa_tagstruct_gets(t, &dn) < 0 ||
418 pa_tagstruct_get_boolean(t, &suspended) < 0) {
419 pa_context_fail(c, PA_ERR_PROTOCOL);
420 goto finish;
421 }
422
423 if (c->version >= 13) {
424
425 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
426 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
427 pa_tagstruct_getu32(t, &fragsize) < 0 ||
428 pa_tagstruct_get_usec(t, &usec) < 0) {
429 pa_context_fail(c, PA_ERR_PROTOCOL);
430 goto finish;
431 }
432 } else {
433 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
434 pa_tagstruct_getu32(t, &tlength) < 0 ||
435 pa_tagstruct_getu32(t, &prebuf) < 0 ||
436 pa_tagstruct_getu32(t, &minreq) < 0 ||
437 pa_tagstruct_get_usec(t, &usec) < 0) {
438 pa_context_fail(c, PA_ERR_PROTOCOL);
439 goto finish;
440 }
441 }
442 }
443
444 if (!pa_tagstruct_eof(t)) {
445 pa_context_fail(c, PA_ERR_PROTOCOL);
446 goto finish;
447 }
448
449 if (!dn || di == PA_INVALID_INDEX) {
450 pa_context_fail(c, PA_ERR_PROTOCOL);
451 goto finish;
452 }
453
454 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
455 goto finish;
456
457 if (s->state != PA_STREAM_READY)
458 goto finish;
459
460 if (c->version >= 13) {
461 if (s->direction == PA_STREAM_RECORD)
462 s->timing_info.configured_source_usec = usec;
463 else
464 s->timing_info.configured_sink_usec = usec;
465
466 s->buffer_attr.maxlength = maxlength;
467 s->buffer_attr.fragsize = fragsize;
468 s->buffer_attr.tlength = tlength;
469 s->buffer_attr.prebuf = prebuf;
470 s->buffer_attr.minreq = minreq;
471 }
472
473 pa_xfree(s->device_name);
474 s->device_name = pa_xstrdup(dn);
475 s->device_index = di;
476
477 s->suspended = suspended;
478
479 check_smoother_status(s, TRUE, FALSE, FALSE);
480 request_auto_timing_update(s, TRUE);
481
482 if (s->moved_callback)
483 s->moved_callback(s, s->moved_userdata);
484
485 finish:
486 pa_context_unref(c);
487 }
488
489 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490 pa_context *c = userdata;
491 pa_stream *s;
492 uint32_t channel;
493 pa_bool_t suspended;
494
495 pa_assert(pd);
496 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
497 pa_assert(t);
498 pa_assert(c);
499 pa_assert(PA_REFCNT_VALUE(c) >= 1);
500
501 pa_context_ref(c);
502
503 if (c->version < 12) {
504 pa_context_fail(c, PA_ERR_PROTOCOL);
505 goto finish;
506 }
507
508 if (pa_tagstruct_getu32(t, &channel) < 0 ||
509 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
510 !pa_tagstruct_eof(t)) {
511 pa_context_fail(c, PA_ERR_PROTOCOL);
512 goto finish;
513 }
514
515 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
516 goto finish;
517
518 if (s->state != PA_STREAM_READY)
519 goto finish;
520
521 s->suspended = suspended;
522
523 check_smoother_status(s, TRUE, FALSE, FALSE);
524 request_auto_timing_update(s, TRUE);
525
526 if (s->suspended_callback)
527 s->suspended_callback(s, s->suspended_userdata);
528
529 finish:
530 pa_context_unref(c);
531 }
532
533 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
534 pa_context *c = userdata;
535 pa_stream *s;
536 uint32_t channel;
537
538 pa_assert(pd);
539 pa_assert(command == PA_COMMAND_STARTED);
540 pa_assert(t);
541 pa_assert(c);
542 pa_assert(PA_REFCNT_VALUE(c) >= 1);
543
544 pa_context_ref(c);
545
546 if (c->version < 13) {
547 pa_context_fail(c, PA_ERR_PROTOCOL);
548 goto finish;
549 }
550
551 if (pa_tagstruct_getu32(t, &channel) < 0 ||
552 !pa_tagstruct_eof(t)) {
553 pa_context_fail(c, PA_ERR_PROTOCOL);
554 goto finish;
555 }
556
557 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
558 goto finish;
559
560 if (s->state != PA_STREAM_READY)
561 goto finish;
562
563 check_smoother_status(s, TRUE, TRUE, FALSE);
564 request_auto_timing_update(s, TRUE);
565
566 if (s->started_callback)
567 s->started_callback(s, s->started_userdata);
568
569 finish:
570 pa_context_unref(c);
571 }
572
573 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
574 pa_context *c = userdata;
575 pa_stream *s;
576 uint32_t channel;
577 pa_proplist *pl = NULL;
578 const char *event;
579
580 pa_assert(pd);
581 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
582 pa_assert(t);
583 pa_assert(c);
584 pa_assert(PA_REFCNT_VALUE(c) >= 1);
585
586 pa_context_ref(c);
587
588 if (c->version < 15) {
589 pa_context_fail(c, PA_ERR_PROTOCOL);
590 goto finish;
591 }
592
593 pl = pa_proplist_new();
594
595 if (pa_tagstruct_getu32(t, &channel) < 0 ||
596 pa_tagstruct_gets(t, &event) < 0 ||
597 pa_tagstruct_get_proplist(t, pl) < 0 ||
598 !pa_tagstruct_eof(t) || !event) {
599 pa_context_fail(c, PA_ERR_PROTOCOL);
600 goto finish;
601 }
602
603 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
604 goto finish;
605
606 if (s->state != PA_STREAM_READY)
607 goto finish;
608
609 if (s->event_callback)
610 s->event_callback(s, event, pl, s->event_userdata);
611
612 finish:
613 pa_context_unref(c);
614
615 if (pl)
616 pa_proplist_free(pl);
617 }
618
619 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
620 pa_stream *s;
621 pa_context *c = userdata;
622 uint32_t bytes, channel;
623
624 pa_assert(pd);
625 pa_assert(command == PA_COMMAND_REQUEST);
626 pa_assert(t);
627 pa_assert(c);
628 pa_assert(PA_REFCNT_VALUE(c) >= 1);
629
630 pa_context_ref(c);
631
632 if (pa_tagstruct_getu32(t, &channel) < 0 ||
633 pa_tagstruct_getu32(t, &bytes) < 0 ||
634 !pa_tagstruct_eof(t)) {
635 pa_context_fail(c, PA_ERR_PROTOCOL);
636 goto finish;
637 }
638
639 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
640 goto finish;
641
642 if (s->state != PA_STREAM_READY)
643 goto finish;
644
645 s->requested_bytes += bytes;
646
647 if (s->requested_bytes > 0 && s->write_callback)
648 s->write_callback(s, s->requested_bytes, s->write_userdata);
649
650 finish:
651 pa_context_unref(c);
652 }
653
654 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
655 pa_stream *s;
656 pa_context *c = userdata;
657 uint32_t channel;
658
659 pa_assert(pd);
660 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
661 pa_assert(t);
662 pa_assert(c);
663 pa_assert(PA_REFCNT_VALUE(c) >= 1);
664
665 pa_context_ref(c);
666
667 if (pa_tagstruct_getu32(t, &channel) < 0 ||
668 !pa_tagstruct_eof(t)) {
669 pa_context_fail(c, PA_ERR_PROTOCOL);
670 goto finish;
671 }
672
673 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
674 goto finish;
675
676 if (s->state != PA_STREAM_READY)
677 goto finish;
678
679 if (s->buffer_attr.prebuf > 0)
680 check_smoother_status(s, TRUE, FALSE, TRUE);
681
682 request_auto_timing_update(s, TRUE);
683
684 if (command == PA_COMMAND_OVERFLOW) {
685 if (s->overflow_callback)
686 s->overflow_callback(s, s->overflow_userdata);
687 } else if (command == PA_COMMAND_UNDERFLOW) {
688 if (s->underflow_callback)
689 s->underflow_callback(s, s->underflow_userdata);
690 }
691
692 finish:
693 pa_context_unref(c);
694 }
695
696 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
697 pa_assert(s);
698 pa_assert(PA_REFCNT_VALUE(s) >= 1);
699
700 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
701
702 if (s->state != PA_STREAM_READY)
703 return;
704
705 if (w) {
706 s->write_index_not_before = s->context->ctag;
707
708 if (s->timing_info_valid)
709 s->timing_info.write_index_corrupt = TRUE;
710
711 /* pa_log("write_index invalidated"); */
712 }
713
714 if (r) {
715 s->read_index_not_before = s->context->ctag;
716
717 if (s->timing_info_valid)
718 s->timing_info.read_index_corrupt = TRUE;
719
720 /* pa_log("read_index invalidated"); */
721 }
722
723 request_auto_timing_update(s, TRUE);
724 }
725
726 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
727 pa_stream *s = userdata;
728
729 pa_assert(s);
730 pa_assert(PA_REFCNT_VALUE(s) >= 1);
731
732 pa_stream_ref(s);
733 request_auto_timing_update(s, FALSE);
734 pa_stream_unref(s);
735 }
736
737 static void create_stream_complete(pa_stream *s) {
738 pa_assert(s);
739 pa_assert(PA_REFCNT_VALUE(s) >= 1);
740 pa_assert(s->state == PA_STREAM_CREATING);
741
742 pa_stream_set_state(s, PA_STREAM_READY);
743
744 if (s->requested_bytes > 0 && s->write_callback)
745 s->write_callback(s, s->requested_bytes, s->write_userdata);
746
747 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
748 struct timeval tv;
749 pa_gettimeofday(&tv);
750 tv.tv_usec += (suseconds_t) LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
751 pa_assert(!s->auto_timing_update_event);
752 s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
753
754 request_auto_timing_update(s, TRUE);
755 }
756
757 check_smoother_status(s, TRUE, FALSE, FALSE);
758 }
759
760 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
761 pa_assert(s);
762 pa_assert(attr);
763 pa_assert(ss);
764
765 if (s->context->version >= 13)
766 return;
767
768 /* Version older than 0.9.10 didn't do server side buffer_attr
769 * selection, hence we have to fake it on the client side. */
770
771 /* We choose fairly conservative values here, to not confuse
772 * old clients with extremely large playback buffers */
773
774 if (attr->maxlength == (uint32_t) -1)
775 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
776
777 if (attr->tlength == (uint32_t) -1)
778 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
779
780 if (attr->minreq == (uint32_t) -1)
781 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
782
783 if (attr->prebuf == (uint32_t) -1)
784 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
785
786 if (attr->fragsize == (uint32_t) -1)
787 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
788 }
789
790 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
791 pa_stream *s = userdata;
792
793 pa_assert(pd);
794 pa_assert(s);
795 pa_assert(PA_REFCNT_VALUE(s) >= 1);
796 pa_assert(s->state == PA_STREAM_CREATING);
797
798 pa_stream_ref(s);
799
800 if (command != PA_COMMAND_REPLY) {
801 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
802 goto finish;
803
804 pa_stream_set_state(s, PA_STREAM_FAILED);
805 goto finish;
806 }
807
808 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
809 s->channel == PA_INVALID_INDEX ||
810 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
811 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
812 pa_context_fail(s->context, PA_ERR_PROTOCOL);
813 goto finish;
814 }
815
816 if (s->context->version >= 9) {
817 if (s->direction == PA_STREAM_PLAYBACK) {
818 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
819 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
820 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
821 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
822 pa_context_fail(s->context, PA_ERR_PROTOCOL);
823 goto finish;
824 }
825 } else if (s->direction == PA_STREAM_RECORD) {
826 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
827 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
828 pa_context_fail(s->context, PA_ERR_PROTOCOL);
829 goto finish;
830 }
831 }
832 }
833
834 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
835 pa_sample_spec ss;
836 pa_channel_map cm;
837 const char *dn = NULL;
838 pa_bool_t suspended;
839
840 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
841 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
842 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
843 pa_tagstruct_gets(t, &dn) < 0 ||
844 pa_tagstruct_get_boolean(t, &suspended) < 0) {
845 pa_context_fail(s->context, PA_ERR_PROTOCOL);
846 goto finish;
847 }
848
849 if (!dn || s->device_index == PA_INVALID_INDEX ||
850 ss.channels != cm.channels ||
851 !pa_channel_map_valid(&cm) ||
852 !pa_sample_spec_valid(&ss) ||
853 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
854 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
855 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
856 pa_context_fail(s->context, PA_ERR_PROTOCOL);
857 goto finish;
858 }
859
860 pa_xfree(s->device_name);
861 s->device_name = pa_xstrdup(dn);
862 s->suspended = suspended;
863
864 s->channel_map = cm;
865 s->sample_spec = ss;
866 }
867
868 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
869 pa_usec_t usec;
870
871 if (pa_tagstruct_get_usec(t, &usec) < 0) {
872 pa_context_fail(s->context, PA_ERR_PROTOCOL);
873 goto finish;
874 }
875
876 if (s->direction == PA_STREAM_RECORD)
877 s->timing_info.configured_source_usec = usec;
878 else
879 s->timing_info.configured_sink_usec = usec;
880 }
881
882 if (!pa_tagstruct_eof(t)) {
883 pa_context_fail(s->context, PA_ERR_PROTOCOL);
884 goto finish;
885 }
886
887 if (s->direction == PA_STREAM_RECORD) {
888 pa_assert(!s->record_memblockq);
889
890 s->record_memblockq = pa_memblockq_new(
891 0,
892 s->buffer_attr.maxlength,
893 0,
894 pa_frame_size(&s->sample_spec),
895 1,
896 0,
897 0,
898 NULL);
899 }
900
901 s->channel_valid = TRUE;
902 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
903
904 create_stream_complete(s);
905
906 finish:
907 pa_stream_unref(s);
908 }
909
910 static int create_stream(
911 pa_stream_direction_t direction,
912 pa_stream *s,
913 const char *dev,
914 const pa_buffer_attr *attr,
915 pa_stream_flags_t flags,
916 const pa_cvolume *volume,
917 pa_stream *sync_stream) {
918
919 pa_tagstruct *t;
920 uint32_t tag;
921 pa_bool_t volume_set = FALSE;
922
923 pa_assert(s);
924 pa_assert(PA_REFCNT_VALUE(s) >= 1);
925 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
926
927 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
928 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
929 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
930 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
931 PA_STREAM_INTERPOLATE_TIMING|
932 PA_STREAM_NOT_MONOTONIC|
933 PA_STREAM_AUTO_TIMING_UPDATE|
934 PA_STREAM_NO_REMAP_CHANNELS|
935 PA_STREAM_NO_REMIX_CHANNELS|
936 PA_STREAM_FIX_FORMAT|
937 PA_STREAM_FIX_RATE|
938 PA_STREAM_FIX_CHANNELS|
939 PA_STREAM_DONT_MOVE|
940 PA_STREAM_VARIABLE_RATE|
941 PA_STREAM_PEAK_DETECT|
942 PA_STREAM_START_MUTED|
943 PA_STREAM_ADJUST_LATENCY|
944 PA_STREAM_EARLY_REQUESTS|
945 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
946 PA_STREAM_START_UNMUTED|
947 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
948
949 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
950 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
951 /* Althought some of the other flags are not supported on older
952 * version, we don't check for them here, because it doesn't hurt
953 * when they are passed but actually not supported. This makes
954 * client development easier */
955
956 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
957 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
958 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
959 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
960 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
961
962 pa_stream_ref(s);
963
964 s->direction = direction;
965 s->flags = flags;
966 s->corked = !!(flags & PA_STREAM_START_CORKED);
967
968 if (sync_stream)
969 s->syncid = sync_stream->syncid;
970
971 if (attr)
972 s->buffer_attr = *attr;
973 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
974
975 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
976 pa_usec_t x;
977
978 if (s->smoother)
979 pa_smoother_free(s->smoother);
980
981 s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
982
983 x = pa_rtclock_usec();
984 pa_smoother_set_time_offset(s->smoother, x);
985 pa_smoother_pause(s->smoother, x);
986 }
987
988 if (!dev)
989 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
990
991 t = pa_tagstruct_command(
992 s->context,
993 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
994 &tag);
995
996 if (s->context->version < 13)
997 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
998
999 pa_tagstruct_put(
1000 t,
1001 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1002 PA_TAG_CHANNEL_MAP, &s->channel_map,
1003 PA_TAG_U32, PA_INVALID_INDEX,
1004 PA_TAG_STRING, dev,
1005 PA_TAG_U32, s->buffer_attr.maxlength,
1006 PA_TAG_BOOLEAN, s->corked,
1007 PA_TAG_INVALID);
1008
1009 if (s->direction == PA_STREAM_PLAYBACK) {
1010 pa_cvolume cv;
1011
1012 pa_tagstruct_put(
1013 t,
1014 PA_TAG_U32, s->buffer_attr.tlength,
1015 PA_TAG_U32, s->buffer_attr.prebuf,
1016 PA_TAG_U32, s->buffer_attr.minreq,
1017 PA_TAG_U32, s->syncid,
1018 PA_TAG_INVALID);
1019
1020 volume_set = !!volume;
1021
1022 if (!volume)
1023 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1024
1025 pa_tagstruct_put_cvolume(t, volume);
1026 } else
1027 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1028
1029 if (s->context->version >= 12) {
1030 pa_tagstruct_put(
1031 t,
1032 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1033 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1034 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1035 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1036 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1037 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1038 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1039 PA_TAG_INVALID);
1040 }
1041
1042 if (s->context->version >= 13) {
1043
1044 if (s->direction == PA_STREAM_PLAYBACK)
1045 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1046 else
1047 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1048
1049 pa_tagstruct_put(
1050 t,
1051 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1052 PA_TAG_PROPLIST, s->proplist,
1053 PA_TAG_INVALID);
1054
1055 if (s->direction == PA_STREAM_RECORD)
1056 pa_tagstruct_putu32(t, s->direct_on_input);
1057 }
1058
1059 if (s->context->version >= 14) {
1060
1061 if (s->direction == PA_STREAM_PLAYBACK)
1062 pa_tagstruct_put_boolean(t, volume_set);
1063
1064 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1065 }
1066
1067 if (s->context->version >= 15) {
1068
1069 if (s->direction == PA_STREAM_PLAYBACK)
1070 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1071
1072 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1073 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1074 }
1075
1076 pa_pstream_send_tagstruct(s->context->pstream, t);
1077 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1078
1079 pa_stream_set_state(s, PA_STREAM_CREATING);
1080
1081 pa_stream_unref(s);
1082 return 0;
1083 }
1084
1085 int pa_stream_connect_playback(
1086 pa_stream *s,
1087 const char *dev,
1088 const pa_buffer_attr *attr,
1089 pa_stream_flags_t flags,
1090 pa_cvolume *volume,
1091 pa_stream *sync_stream) {
1092
1093 pa_assert(s);
1094 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1095
1096 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1097 }
1098
1099 int pa_stream_connect_record(
1100 pa_stream *s,
1101 const char *dev,
1102 const pa_buffer_attr *attr,
1103 pa_stream_flags_t flags) {
1104
1105 pa_assert(s);
1106 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1107
1108 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1109 }
1110
1111 int pa_stream_write(
1112 pa_stream *s,
1113 const void *data,
1114 size_t length,
1115 void (*free_cb)(void *p),
1116 int64_t offset,
1117 pa_seek_mode_t seek) {
1118
1119 pa_memchunk chunk;
1120 pa_seek_mode_t t_seek;
1121 int64_t t_offset;
1122 size_t t_length;
1123 const void *t_data;
1124
1125 pa_assert(s);
1126 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1127 pa_assert(data);
1128
1129 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1130 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1131 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1132 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1133 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1134
1135 if (length <= 0)
1136 return 0;
1137
1138 t_seek = seek;
1139 t_offset = offset;
1140 t_length = length;
1141 t_data = data;
1142
1143 while (t_length > 0) {
1144
1145 chunk.index = 0;
1146
1147 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1148 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1149 chunk.length = t_length;
1150 } else {
1151 void *d;
1152
1153 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1154 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1155
1156 d = pa_memblock_acquire(chunk.memblock);
1157 memcpy(d, t_data, chunk.length);
1158 pa_memblock_release(chunk.memblock);
1159 }
1160
1161 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1162
1163 t_offset = 0;
1164 t_seek = PA_SEEK_RELATIVE;
1165
1166 t_data = (const uint8_t*) t_data + chunk.length;
1167 t_length -= chunk.length;
1168
1169 pa_memblock_unref(chunk.memblock);
1170 }
1171
1172 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1173 free_cb((void*) data);
1174
1175 if (length < s->requested_bytes)
1176 s->requested_bytes -= (uint32_t) length;
1177 else
1178 s->requested_bytes = 0;
1179
1180 /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
1181
1182 if (s->direction == PA_STREAM_PLAYBACK) {
1183
1184 /* Update latency request correction */
1185 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1186
1187 if (seek == PA_SEEK_ABSOLUTE) {
1188 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1189 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1190 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1191 } else if (seek == PA_SEEK_RELATIVE) {
1192 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1193 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1194 } else
1195 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1196 }
1197
1198 /* Update the write index in the already available latency data */
1199 if (s->timing_info_valid) {
1200
1201 if (seek == PA_SEEK_ABSOLUTE) {
1202 s->timing_info.write_index_corrupt = FALSE;
1203 s->timing_info.write_index = offset + (int64_t) length;
1204 } else if (seek == PA_SEEK_RELATIVE) {
1205 if (!s->timing_info.write_index_corrupt)
1206 s->timing_info.write_index += offset + (int64_t) length;
1207 } else
1208 s->timing_info.write_index_corrupt = TRUE;
1209 }
1210
1211 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1212 request_auto_timing_update(s, TRUE);
1213 }
1214
1215 return 0;
1216 }
1217
1218 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1219 pa_assert(s);
1220 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1221 pa_assert(data);
1222 pa_assert(length);
1223
1224 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1225 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1226 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1227
1228 if (!s->peek_memchunk.memblock) {
1229
1230 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1231 *data = NULL;
1232 *length = 0;
1233 return 0;
1234 }
1235
1236 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1237 }
1238
1239 pa_assert(s->peek_data);
1240 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1241 *length = s->peek_memchunk.length;
1242 return 0;
1243 }
1244
1245 int pa_stream_drop(pa_stream *s) {
1246 pa_assert(s);
1247 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1248
1249 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1250 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1251 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1252 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1253
1254 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1255
1256 /* Fix the simulated local read index */
1257 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1258 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1259
1260 pa_assert(s->peek_data);
1261 pa_memblock_release(s->peek_memchunk.memblock);
1262 pa_memblock_unref(s->peek_memchunk.memblock);
1263 pa_memchunk_reset(&s->peek_memchunk);
1264
1265 return 0;
1266 }
1267
1268 size_t pa_stream_writable_size(pa_stream *s) {
1269 pa_assert(s);
1270 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1271
1272 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1273 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1274 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1275
1276 return s->requested_bytes;
1277 }
1278
1279 size_t pa_stream_readable_size(pa_stream *s) {
1280 pa_assert(s);
1281 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1282
1283 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1284 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1285 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1286
1287 return pa_memblockq_get_length(s->record_memblockq);
1288 }
1289
1290 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1291 pa_operation *o;
1292 pa_tagstruct *t;
1293 uint32_t tag;
1294
1295 pa_assert(s);
1296 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1297
1298 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1299 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1300 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1301
1302 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1303
1304 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1305 pa_tagstruct_putu32(t, s->channel);
1306 pa_pstream_send_tagstruct(s->context->pstream, t);
1307 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1308
1309 return o;
1310 }
1311
1312 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1313 pa_usec_t usec;
1314
1315 pa_assert(s);
1316 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1317 pa_assert(s->state == PA_STREAM_READY);
1318 pa_assert(s->direction != PA_STREAM_UPLOAD);
1319 pa_assert(s->timing_info_valid);
1320 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1321 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1322
1323 if (s->direction == PA_STREAM_PLAYBACK) {
1324 /* The last byte that was written into the output device
1325 * had this time value associated */
1326 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1327
1328 if (!s->corked && !s->suspended) {
1329
1330 if (!ignore_transport)
1331 /* Because the latency info took a little time to come
1332 * to us, we assume that the real output time is actually
1333 * a little ahead */
1334 usec += s->timing_info.transport_usec;
1335
1336 /* However, the output device usually maintains a buffer
1337 too, hence the real sample currently played is a little
1338 back */
1339 if (s->timing_info.sink_usec >= usec)
1340 usec = 0;
1341 else
1342 usec -= s->timing_info.sink_usec;
1343 }
1344
1345 } else {
1346 pa_assert(s->direction == PA_STREAM_RECORD);
1347
1348 /* The last byte written into the server side queue had
1349 * this time value associated */
1350 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1351
1352 if (!s->corked && !s->suspended) {
1353
1354 if (!ignore_transport)
1355 /* Add transport latency */
1356 usec += s->timing_info.transport_usec;
1357
1358 /* Add latency of data in device buffer */
1359 usec += s->timing_info.source_usec;
1360
1361 /* If this is a monitor source, we need to correct the
1362 * time by the playback device buffer */
1363 if (s->timing_info.sink_usec >= usec)
1364 usec = 0;
1365 else
1366 usec -= s->timing_info.sink_usec;
1367 }
1368 }
1369
1370 return usec;
1371 }
1372
1373 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1374 pa_operation *o = userdata;
1375 struct timeval local, remote, now;
1376 pa_timing_info *i;
1377 pa_bool_t playing = FALSE;
1378 uint64_t underrun_for = 0, playing_for = 0;
1379
1380 pa_assert(pd);
1381 pa_assert(o);
1382 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1383
1384 if (!o->context || !o->stream)
1385 goto finish;
1386
1387 i = &o->stream->timing_info;
1388
1389 o->stream->timing_info_valid = FALSE;
1390 i->write_index_corrupt = TRUE;
1391 i->read_index_corrupt = TRUE;
1392
1393 if (command != PA_COMMAND_REPLY) {
1394 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1395 goto finish;
1396
1397 } else {
1398
1399 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1400 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1401 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1402 pa_tagstruct_get_timeval(t, &local) < 0 ||
1403 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1404 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1405 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1406
1407 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1408 goto finish;
1409 }
1410
1411 if (o->context->version >= 13 &&
1412 o->stream->direction == PA_STREAM_PLAYBACK)
1413 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1414 pa_tagstruct_getu64(t, &playing_for) < 0) {
1415
1416 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1417 goto finish;
1418 }
1419
1420
1421 if (!pa_tagstruct_eof(t)) {
1422 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1423 goto finish;
1424 }
1425 o->stream->timing_info_valid = TRUE;
1426 i->write_index_corrupt = FALSE;
1427 i->read_index_corrupt = FALSE;
1428
1429 i->playing = (int) playing;
1430 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1431
1432 pa_gettimeofday(&now);
1433
1434 /* Calculcate timestamps */
1435 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1436 /* local and remote seem to have synchronized clocks */
1437
1438 if (o->stream->direction == PA_STREAM_PLAYBACK)
1439 i->transport_usec = pa_timeval_diff(&remote, &local);
1440 else
1441 i->transport_usec = pa_timeval_diff(&now, &remote);
1442
1443 i->synchronized_clocks = TRUE;
1444 i->timestamp = remote;
1445 } else {
1446 /* clocks are not synchronized, let's estimate latency then */
1447 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1448 i->synchronized_clocks = FALSE;
1449 i->timestamp = local;
1450 pa_timeval_add(&i->timestamp, i->transport_usec);
1451 }
1452
1453 /* Invalidate read and write indexes if necessary */
1454 if (tag < o->stream->read_index_not_before)
1455 i->read_index_corrupt = TRUE;
1456
1457 if (tag < o->stream->write_index_not_before)
1458 i->write_index_corrupt = TRUE;
1459
1460 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1461 /* Write index correction */
1462
1463 int n, j;
1464 uint32_t ctag = tag;
1465
1466 /* Go through the saved correction values and add up the
1467 * total correction.*/
1468 for (n = 0, j = o->stream->current_write_index_correction+1;
1469 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1470 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1471
1472 /* Step over invalid data or out-of-date data */
1473 if (!o->stream->write_index_corrections[j].valid ||
1474 o->stream->write_index_corrections[j].tag < ctag)
1475 continue;
1476
1477 /* Make sure that everything is in order */
1478 ctag = o->stream->write_index_corrections[j].tag+1;
1479
1480 /* Now fix the write index */
1481 if (o->stream->write_index_corrections[j].corrupt) {
1482 /* A corrupting seek was made */
1483 i->write_index_corrupt = TRUE;
1484 } else if (o->stream->write_index_corrections[j].absolute) {
1485 /* An absolute seek was made */
1486 i->write_index = o->stream->write_index_corrections[j].value;
1487 i->write_index_corrupt = FALSE;
1488 } else if (!i->write_index_corrupt) {
1489 /* A relative seek was made */
1490 i->write_index += o->stream->write_index_corrections[j].value;
1491 }
1492 }
1493
1494 /* Clear old correction entries */
1495 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1496 if (!o->stream->write_index_corrections[n].valid)
1497 continue;
1498
1499 if (o->stream->write_index_corrections[n].tag <= tag)
1500 o->stream->write_index_corrections[n].valid = FALSE;
1501 }
1502 }
1503
1504 if (o->stream->direction == PA_STREAM_RECORD) {
1505 /* Read index correction */
1506
1507 if (!i->read_index_corrupt)
1508 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1509 }
1510
1511 /* Update smoother */
1512 if (o->stream->smoother) {
1513 pa_usec_t u, x;
1514
1515 u = x = pa_rtclock_usec() - i->transport_usec;
1516
1517 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1518 pa_usec_t su;
1519
1520 /* If we weren't playing then it will take some time
1521 * until the audio will actually come out through the
1522 * speakers. Since we follow that timing here, we need
1523 * to try to fix this up */
1524
1525 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1526
1527 if (su < i->sink_usec)
1528 x += i->sink_usec - su;
1529 }
1530
1531 if (!i->playing)
1532 pa_smoother_pause(o->stream->smoother, x);
1533
1534 /* Update the smoother */
1535 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1536 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1537 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1538
1539 if (i->playing)
1540 pa_smoother_resume(o->stream->smoother, x);
1541 }
1542 }
1543
1544 o->stream->auto_timing_update_requested = FALSE;
1545
1546 if (o->stream->latency_update_callback)
1547 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1548
1549 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1550 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1551 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1552 }
1553
1554 finish:
1555
1556 pa_operation_done(o);
1557 pa_operation_unref(o);
1558 }
1559
1560 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1561 uint32_t tag;
1562 pa_operation *o;
1563 pa_tagstruct *t;
1564 struct timeval now;
1565 int cidx = 0;
1566
1567 pa_assert(s);
1568 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1569
1570 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1571 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1572 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1573
1574 if (s->direction == PA_STREAM_PLAYBACK) {
1575 /* Find a place to store the write_index correction data for this entry */
1576 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1577
1578 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1579 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1580 }
1581 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1582
1583 t = pa_tagstruct_command(
1584 s->context,
1585 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1586 &tag);
1587 pa_tagstruct_putu32(t, s->channel);
1588 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1589
1590 pa_pstream_send_tagstruct(s->context->pstream, t);
1591 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1592
1593 if (s->direction == PA_STREAM_PLAYBACK) {
1594 /* Fill in initial correction data */
1595
1596 s->current_write_index_correction = cidx;
1597
1598 s->write_index_corrections[cidx].valid = TRUE;
1599 s->write_index_corrections[cidx].absolute = FALSE;
1600 s->write_index_corrections[cidx].corrupt = FALSE;
1601 s->write_index_corrections[cidx].tag = tag;
1602 s->write_index_corrections[cidx].value = 0;
1603 }
1604
1605 return o;
1606 }
1607
1608 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1609 pa_stream *s = userdata;
1610
1611 pa_assert(pd);
1612 pa_assert(s);
1613 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1614
1615 pa_stream_ref(s);
1616
1617 if (command != PA_COMMAND_REPLY) {
1618 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1619 goto finish;
1620
1621 pa_stream_set_state(s, PA_STREAM_FAILED);
1622 goto finish;
1623 } else if (!pa_tagstruct_eof(t)) {
1624 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1625 goto finish;
1626 }
1627
1628 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1629
1630 finish:
1631 pa_stream_unref(s);
1632 }
1633
1634 int pa_stream_disconnect(pa_stream *s) {
1635 pa_tagstruct *t;
1636 uint32_t tag;
1637
1638 pa_assert(s);
1639 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1640
1641 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1642 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1643 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1644
1645 pa_stream_ref(s);
1646
1647 t = pa_tagstruct_command(
1648 s->context,
1649 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1650 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1651 &tag);
1652 pa_tagstruct_putu32(t, s->channel);
1653 pa_pstream_send_tagstruct(s->context->pstream, t);
1654 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1655
1656 pa_stream_unref(s);
1657 return 0;
1658 }
1659
1660 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1661 pa_assert(s);
1662 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1663
1664 if (pa_detect_fork())
1665 return;
1666
1667 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1668 return;
1669
1670 s->read_callback = cb;
1671 s->read_userdata = userdata;
1672 }
1673
1674 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1675 pa_assert(s);
1676 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1677
1678 if (pa_detect_fork())
1679 return;
1680
1681 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1682 return;
1683
1684 s->write_callback = cb;
1685 s->write_userdata = userdata;
1686 }
1687
1688 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1689 pa_assert(s);
1690 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1691
1692 if (pa_detect_fork())
1693 return;
1694
1695 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1696 return;
1697
1698 s->state_callback = cb;
1699 s->state_userdata = userdata;
1700 }
1701
1702 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1703 pa_assert(s);
1704 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1705
1706 if (pa_detect_fork())
1707 return;
1708
1709 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1710 return;
1711
1712 s->overflow_callback = cb;
1713 s->overflow_userdata = userdata;
1714 }
1715
1716 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1717 pa_assert(s);
1718 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1719
1720 if (pa_detect_fork())
1721 return;
1722
1723 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1724 return;
1725
1726 s->underflow_callback = cb;
1727 s->underflow_userdata = userdata;
1728 }
1729
1730 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1731 pa_assert(s);
1732 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1733
1734 if (pa_detect_fork())
1735 return;
1736
1737 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1738 return;
1739
1740 s->latency_update_callback = cb;
1741 s->latency_update_userdata = userdata;
1742 }
1743
1744 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1745 pa_assert(s);
1746 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1747
1748 if (pa_detect_fork())
1749 return;
1750
1751 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1752 return;
1753
1754 s->moved_callback = cb;
1755 s->moved_userdata = userdata;
1756 }
1757
1758 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1759 pa_assert(s);
1760 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1761
1762 if (pa_detect_fork())
1763 return;
1764
1765 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1766 return;
1767
1768 s->suspended_callback = cb;
1769 s->suspended_userdata = userdata;
1770 }
1771
1772 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1773 pa_assert(s);
1774 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1775
1776 if (pa_detect_fork())
1777 return;
1778
1779 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1780 return;
1781
1782 s->started_callback = cb;
1783 s->started_userdata = userdata;
1784 }
1785
1786 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1787 pa_assert(s);
1788 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1789
1790 if (pa_detect_fork())
1791 return;
1792
1793 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1794 return;
1795
1796 s->event_callback = cb;
1797 s->event_userdata = userdata;
1798 }
1799
1800 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1801 pa_operation *o = userdata;
1802 int success = 1;
1803
1804 pa_assert(pd);
1805 pa_assert(o);
1806 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1807
1808 if (!o->context)
1809 goto finish;
1810
1811 if (command != PA_COMMAND_REPLY) {
1812 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1813 goto finish;
1814
1815 success = 0;
1816 } else if (!pa_tagstruct_eof(t)) {
1817 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1818 goto finish;
1819 }
1820
1821 if (o->callback) {
1822 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1823 cb(o->stream, success, o->userdata);
1824 }
1825
1826 finish:
1827 pa_operation_done(o);
1828 pa_operation_unref(o);
1829 }
1830
1831 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1832 pa_operation *o;
1833 pa_tagstruct *t;
1834 uint32_t tag;
1835
1836 pa_assert(s);
1837 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1838
1839 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1840 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1841 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1842
1843 s->corked = b;
1844
1845 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1846
1847 t = pa_tagstruct_command(
1848 s->context,
1849 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1850 &tag);
1851 pa_tagstruct_putu32(t, s->channel);
1852 pa_tagstruct_put_boolean(t, !!b);
1853 pa_pstream_send_tagstruct(s->context->pstream, t);
1854 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1855
1856 check_smoother_status(s, FALSE, FALSE, FALSE);
1857
1858 /* This might cause the indexes to hang/start again, hence
1859 * let's request a timing update */
1860 request_auto_timing_update(s, TRUE);
1861
1862 return o;
1863 }
1864
1865 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1866 pa_tagstruct *t;
1867 pa_operation *o;
1868 uint32_t tag;
1869
1870 pa_assert(s);
1871 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1872
1873 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1874 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1875
1876 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1877
1878 t = pa_tagstruct_command(s->context, command, &tag);
1879 pa_tagstruct_putu32(t, s->channel);
1880 pa_pstream_send_tagstruct(s->context->pstream, t);
1881 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1882
1883 return o;
1884 }
1885
1886 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1887 pa_operation *o;
1888
1889 pa_assert(s);
1890 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1891
1892 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1893 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1894 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1895
1896 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1897 return NULL;
1898
1899 if (s->direction == PA_STREAM_PLAYBACK) {
1900
1901 if (s->write_index_corrections[s->current_write_index_correction].valid)
1902 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1903
1904 if (s->buffer_attr.prebuf > 0)
1905 check_smoother_status(s, FALSE, FALSE, TRUE);
1906
1907 /* This will change the write index, but leave the
1908 * read index untouched. */
1909 invalidate_indexes(s, FALSE, TRUE);
1910
1911 } else
1912 /* For record streams this has no influence on the write
1913 * index, but the read index might jump. */
1914 invalidate_indexes(s, TRUE, FALSE);
1915
1916 return o;
1917 }
1918
1919 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1920 pa_operation *o;
1921
1922 pa_assert(s);
1923 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1924
1925 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1926 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1927 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1928 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1929
1930 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
1931 return NULL;
1932
1933 /* This might cause the read index to hang again, hence
1934 * let's request a timing update */
1935 request_auto_timing_update(s, TRUE);
1936
1937 return o;
1938 }
1939
1940 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1941 pa_operation *o;
1942
1943 pa_assert(s);
1944 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1945
1946 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1947 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1948 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1949 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1950
1951 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
1952 return NULL;
1953
1954 /* This might cause the read index to start moving again, hence
1955 * let's request a timing update */
1956 request_auto_timing_update(s, TRUE);
1957
1958 return o;
1959 }
1960
1961 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
1962 pa_operation *o;
1963
1964 pa_assert(s);
1965 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1966 pa_assert(name);
1967
1968 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1969 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1970 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1971
1972 if (s->context->version >= 13) {
1973 pa_proplist *p = pa_proplist_new();
1974
1975 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1976 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
1977 pa_proplist_free(p);
1978 } else {
1979 pa_tagstruct *t;
1980 uint32_t tag;
1981
1982 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1983 t = pa_tagstruct_command(
1984 s->context,
1985 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
1986 &tag);
1987 pa_tagstruct_putu32(t, s->channel);
1988 pa_tagstruct_puts(t, name);
1989 pa_pstream_send_tagstruct(s->context->pstream, t);
1990 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1991 }
1992
1993 return o;
1994 }
1995
1996 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
1997 pa_usec_t usec;
1998
1999 pa_assert(s);
2000 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2001
2002 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2003 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2004 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2005 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2006 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2007 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2008
2009 if (s->smoother)
2010 usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2011 else
2012 usec = calc_time(s, FALSE);
2013
2014 /* Make sure the time runs monotonically */
2015 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2016 if (usec < s->previous_time)
2017 usec = s->previous_time;
2018 else
2019 s->previous_time = usec;
2020 }
2021
2022 if (r_usec)
2023 *r_usec = usec;
2024
2025 return 0;
2026 }
2027
2028 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2029 pa_assert(s);
2030 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2031
2032 if (negative)
2033 *negative = 0;
2034
2035 if (a >= b)
2036 return a-b;
2037 else {
2038 if (negative && s->direction == PA_STREAM_RECORD) {
2039 *negative = 1;
2040 return b-a;
2041 } else
2042 return 0;
2043 }
2044 }
2045
2046 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2047 pa_usec_t t, c;
2048 int r;
2049 int64_t cindex;
2050
2051 pa_assert(s);
2052 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2053 pa_assert(r_usec);
2054
2055 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2056 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2057 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2058 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2059 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2060 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2061
2062 if ((r = pa_stream_get_time(s, &t)) < 0)
2063 return r;
2064
2065 if (s->direction == PA_STREAM_PLAYBACK)
2066 cindex = s->timing_info.write_index;
2067 else
2068 cindex = s->timing_info.read_index;
2069
2070 if (cindex < 0)
2071 cindex = 0;
2072
2073 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2074
2075 if (s->direction == PA_STREAM_PLAYBACK)
2076 *r_usec = time_counter_diff(s, c, t, negative);
2077 else
2078 *r_usec = time_counter_diff(s, t, c, negative);
2079
2080 return 0;
2081 }
2082
2083 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2084 pa_assert(s);
2085 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2086
2087 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2088 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2089 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2090 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2091
2092 return &s->timing_info;
2093 }
2094
2095 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2096 pa_assert(s);
2097 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2098
2099 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2100
2101 return &s->sample_spec;
2102 }
2103
2104 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2105 pa_assert(s);
2106 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2107
2108 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2109
2110 return &s->channel_map;
2111 }
2112
2113 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2114 pa_assert(s);
2115 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2116
2117 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2118 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2119 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2120
2121 return &s->buffer_attr;
2122 }
2123
2124 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2125 pa_operation *o = userdata;
2126 int success = 1;
2127
2128 pa_assert(pd);
2129 pa_assert(o);
2130 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2131
2132 if (!o->context)
2133 goto finish;
2134
2135 if (command != PA_COMMAND_REPLY) {
2136 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2137 goto finish;
2138
2139 success = 0;
2140 } else {
2141 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2142 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2143 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2144 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2145 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2146 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2147 goto finish;
2148 }
2149 } else if (o->stream->direction == PA_STREAM_RECORD) {
2150 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2151 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2152 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2153 goto finish;
2154 }
2155 }
2156
2157 if (o->stream->context->version >= 13) {
2158 pa_usec_t usec;
2159
2160 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2161 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2162 goto finish;
2163 }
2164
2165 if (o->stream->direction == PA_STREAM_RECORD)
2166 o->stream->timing_info.configured_source_usec = usec;
2167 else
2168 o->stream->timing_info.configured_sink_usec = usec;
2169 }
2170
2171 if (!pa_tagstruct_eof(t)) {
2172 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2173 goto finish;
2174 }
2175 }
2176
2177 if (o->callback) {
2178 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2179 cb(o->stream, success, o->userdata);
2180 }
2181
2182 finish:
2183 pa_operation_done(o);
2184 pa_operation_unref(o);
2185 }
2186
2187
2188 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2189 pa_operation *o;
2190 pa_tagstruct *t;
2191 uint32_t tag;
2192
2193 pa_assert(s);
2194 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2195 pa_assert(attr);
2196
2197 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2198 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2199 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2200 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2201
2202 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2203
2204 t = pa_tagstruct_command(
2205 s->context,
2206 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2207 &tag);
2208 pa_tagstruct_putu32(t, s->channel);
2209
2210 pa_tagstruct_putu32(t, attr->maxlength);
2211
2212 if (s->direction == PA_STREAM_PLAYBACK)
2213 pa_tagstruct_put(
2214 t,
2215 PA_TAG_U32, attr->tlength,
2216 PA_TAG_U32, attr->prebuf,
2217 PA_TAG_U32, attr->minreq,
2218 PA_TAG_INVALID);
2219 else
2220 pa_tagstruct_putu32(t, attr->fragsize);
2221
2222 if (s->context->version >= 13)
2223 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2224
2225 if (s->context->version >= 14)
2226 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2227
2228 pa_pstream_send_tagstruct(s->context->pstream, t);
2229 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2230
2231 /* This might cause changes in the read/write indexex, hence let's
2232 * request a timing update */
2233 request_auto_timing_update(s, TRUE);
2234
2235 return o;
2236 }
2237
2238 uint32_t pa_stream_get_device_index(pa_stream *s) {
2239 pa_assert(s);
2240 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2241
2242 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2243 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2244 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2245 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2246 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2247
2248 return s->device_index;
2249 }
2250
2251 const char *pa_stream_get_device_name(pa_stream *s) {
2252 pa_assert(s);
2253 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2254
2255 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2256 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2257 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2258 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2259 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2260
2261 return s->device_name;
2262 }
2263
2264 int pa_stream_is_suspended(pa_stream *s) {
2265 pa_assert(s);
2266 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2267
2268 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2269 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2270 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2271 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2272
2273 return s->suspended;
2274 }
2275
2276 int pa_stream_is_corked(pa_stream *s) {
2277 pa_assert(s);
2278 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2279
2280 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2281 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2282 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2283
2284 return s->corked;
2285 }
2286
2287 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2288 pa_operation *o = userdata;
2289 int success = 1;
2290
2291 pa_assert(pd);
2292 pa_assert(o);
2293 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2294
2295 if (!o->context)
2296 goto finish;
2297
2298 if (command != PA_COMMAND_REPLY) {
2299 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2300 goto finish;
2301
2302 success = 0;
2303 } else {
2304
2305 if (!pa_tagstruct_eof(t)) {
2306 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2307 goto finish;
2308 }
2309 }
2310
2311 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2312 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2313
2314 if (o->callback) {
2315 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2316 cb(o->stream, success, o->userdata);
2317 }
2318
2319 finish:
2320 pa_operation_done(o);
2321 pa_operation_unref(o);
2322 }
2323
2324
2325 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2326 pa_operation *o;
2327 pa_tagstruct *t;
2328 uint32_t tag;
2329
2330 pa_assert(s);
2331 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2332
2333 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2334 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2335 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2336 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2337 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2338 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2339
2340 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2341 o->private = PA_UINT_TO_PTR(rate);
2342
2343 t = pa_tagstruct_command(
2344 s->context,
2345 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2346 &tag);
2347 pa_tagstruct_putu32(t, s->channel);
2348 pa_tagstruct_putu32(t, rate);
2349
2350 pa_pstream_send_tagstruct(s->context->pstream, t);
2351 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2352
2353 return o;
2354 }
2355
2356 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2357 pa_operation *o;
2358 pa_tagstruct *t;
2359 uint32_t tag;
2360
2361 pa_assert(s);
2362 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2363
2364 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2365 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2366 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2367 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2368 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2369
2370 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2371
2372 t = pa_tagstruct_command(
2373 s->context,
2374 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2375 &tag);
2376 pa_tagstruct_putu32(t, s->channel);
2377 pa_tagstruct_putu32(t, (uint32_t) mode);
2378 pa_tagstruct_put_proplist(t, p);
2379
2380 pa_pstream_send_tagstruct(s->context->pstream, t);
2381 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2382
2383 /* Please note that we don't update s->proplist here, because we
2384 * don't export that field */
2385
2386 return o;
2387 }
2388
2389 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2390 pa_operation *o;
2391 pa_tagstruct *t;
2392 uint32_t tag;
2393 const char * const*k;
2394
2395 pa_assert(s);
2396 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2397
2398 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2399 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2400 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2401 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2402 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2403
2404 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2405
2406 t = pa_tagstruct_command(
2407 s->context,
2408 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2409 &tag);
2410 pa_tagstruct_putu32(t, s->channel);
2411
2412 for (k = keys; *k; k++)
2413 pa_tagstruct_puts(t, *k);
2414
2415 pa_tagstruct_puts(t, NULL);
2416
2417 pa_pstream_send_tagstruct(s->context->pstream, t);
2418 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2419
2420 /* Please note that we don't update s->proplist here, because we
2421 * don't export that field */
2422
2423 return o;
2424 }
2425
2426 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2427 pa_assert(s);
2428 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2429
2430 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2431 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2432 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2433 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2434
2435 s->direct_on_input = sink_input_idx;
2436
2437 return 0;
2438 }
2439
2440 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2441 pa_assert(s);
2442 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2443
2444 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2445 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2446 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2447
2448 return s->direct_on_input;
2449 }