]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
minimal reordering
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/pstream-util.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/hashmap.h>
38 #include <pulsecore/macro.h>
39 #include <pulsecore/rtclock.h>
40
41 #include "fork-detect.h"
42 #include "internal.h"
43
/* Delay between two automatic timing-info requests; used to (re)arm the
 * auto_timing_update_event timer of a stream. */
#define LATENCY_IPOL_INTERVAL_USEC (333 * PA_USEC_PER_MSEC)

/* Arguments handed to pa_smoother_new() when a stream is created with
 * PA_STREAM_INTERPOLATE_TIMING. */
#define SMOOTHER_ADJUST_TIME (1000 * PA_USEC_PER_MSEC)
#define SMOOTHER_HISTORY_TIME (5000 * PA_USEC_PER_MSEC)
#define SMOOTHER_MIN_HISTORY (4)
49
50 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
51 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
52 }
53
54 static void reset_callbacks(pa_stream *s) {
55 s->read_callback = NULL;
56 s->read_userdata = NULL;
57 s->write_callback = NULL;
58 s->write_userdata = NULL;
59 s->state_callback = NULL;
60 s->state_userdata = NULL;
61 s->overflow_callback = NULL;
62 s->overflow_userdata = NULL;
63 s->underflow_callback = NULL;
64 s->underflow_userdata = NULL;
65 s->latency_update_callback = NULL;
66 s->latency_update_userdata = NULL;
67 s->moved_callback = NULL;
68 s->moved_userdata = NULL;
69 s->suspended_callback = NULL;
70 s->suspended_userdata = NULL;
71 s->started_callback = NULL;
72 s->started_userdata = NULL;
73 s->event_callback = NULL;
74 s->event_userdata = NULL;
75 }
76
77 pa_stream *pa_stream_new_with_proplist(
78 pa_context *c,
79 const char *name,
80 const pa_sample_spec *ss,
81 const pa_channel_map *map,
82 pa_proplist *p) {
83
84 pa_stream *s;
85 int i;
86 pa_channel_map tmap;
87
88 pa_assert(c);
89 pa_assert(PA_REFCNT_VALUE(c) >= 1);
90
91 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
92 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
93 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
94 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
95 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
96 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
98
99 if (!map)
100 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
101
102 s = pa_xnew(pa_stream, 1);
103 PA_REFCNT_INIT(s);
104 s->context = c;
105 s->mainloop = c->mainloop;
106
107 s->direction = PA_STREAM_NODIRECTION;
108 s->state = PA_STREAM_UNCONNECTED;
109 s->flags = 0;
110
111 s->sample_spec = *ss;
112 s->channel_map = *map;
113
114 s->direct_on_input = PA_INVALID_INDEX;
115
116 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
117 if (name)
118 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
119
120 s->channel = 0;
121 s->channel_valid = FALSE;
122 s->syncid = c->csyncid++;
123 s->stream_index = PA_INVALID_INDEX;
124
125 s->requested_bytes = 0;
126 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
127
128 /* We initialize der target length here, so that if the user
129 * passes no explicit buffering metrics the default is similar to
130 * what older PA versions provided. */
131
132 s->buffer_attr.maxlength = (uint32_t) -1;
133 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
134 s->buffer_attr.minreq = (uint32_t) -1;
135 s->buffer_attr.prebuf = (uint32_t) -1;
136 s->buffer_attr.fragsize = (uint32_t) -1;
137
138 s->device_index = PA_INVALID_INDEX;
139 s->device_name = NULL;
140 s->suspended = FALSE;
141 s->corked = FALSE;
142
143 pa_memchunk_reset(&s->peek_memchunk);
144 s->peek_data = NULL;
145
146 s->record_memblockq = NULL;
147
148
149 memset(&s->timing_info, 0, sizeof(s->timing_info));
150 s->timing_info_valid = FALSE;
151
152 s->previous_time = 0;
153
154 s->read_index_not_before = 0;
155 s->write_index_not_before = 0;
156 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
157 s->write_index_corrections[i].valid = 0;
158 s->current_write_index_correction = 0;
159
160 s->auto_timing_update_event = NULL;
161 s->auto_timing_update_requested = FALSE;
162
163 reset_callbacks(s);
164
165 s->smoother = NULL;
166
167 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
168 PA_LLIST_PREPEND(pa_stream, c->streams, s);
169 pa_stream_ref(s);
170
171 return s;
172 }
173
174 static void stream_unlink(pa_stream *s) {
175 pa_operation *o, *n;
176 pa_assert(s);
177
178 if (!s->context)
179 return;
180
181 /* Detach from context */
182
183 /* Unref all operatio object that point to us */
184 for (o = s->context->operations; o; o = n) {
185 n = o->next;
186
187 if (o->stream == s)
188 pa_operation_cancel(o);
189 }
190
191 /* Drop all outstanding replies for this stream */
192 if (s->context->pdispatch)
193 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
194
195 if (s->channel_valid) {
196 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
197 s->channel = 0;
198 s->channel_valid = FALSE;
199 }
200
201 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
202 pa_stream_unref(s);
203
204 s->context = NULL;
205
206 if (s->auto_timing_update_event) {
207 pa_assert(s->mainloop);
208 s->mainloop->time_free(s->auto_timing_update_event);
209 }
210
211 reset_callbacks(s);
212 }
213
214 static void stream_free(pa_stream *s) {
215 pa_assert(s);
216
217 stream_unlink(s);
218
219 if (s->peek_memchunk.memblock) {
220 if (s->peek_data)
221 pa_memblock_release(s->peek_memchunk.memblock);
222 pa_memblock_unref(s->peek_memchunk.memblock);
223 }
224
225 if (s->record_memblockq)
226 pa_memblockq_free(s->record_memblockq);
227
228 if (s->proplist)
229 pa_proplist_free(s->proplist);
230
231 if (s->smoother)
232 pa_smoother_free(s->smoother);
233
234 pa_xfree(s->device_name);
235 pa_xfree(s);
236 }
237
238 void pa_stream_unref(pa_stream *s) {
239 pa_assert(s);
240 pa_assert(PA_REFCNT_VALUE(s) >= 1);
241
242 if (PA_REFCNT_DEC(s) <= 0)
243 stream_free(s);
244 }
245
246 pa_stream* pa_stream_ref(pa_stream *s) {
247 pa_assert(s);
248 pa_assert(PA_REFCNT_VALUE(s) >= 1);
249
250 PA_REFCNT_INC(s);
251 return s;
252 }
253
254 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
255 pa_assert(s);
256 pa_assert(PA_REFCNT_VALUE(s) >= 1);
257
258 return s->state;
259 }
260
261 pa_context* pa_stream_get_context(pa_stream *s) {
262 pa_assert(s);
263 pa_assert(PA_REFCNT_VALUE(s) >= 1);
264
265 return s->context;
266 }
267
268 uint32_t pa_stream_get_index(pa_stream *s) {
269 pa_assert(s);
270 pa_assert(PA_REFCNT_VALUE(s) >= 1);
271
272 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
273 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
274
275 return s->stream_index;
276 }
277
278 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
279 pa_assert(s);
280 pa_assert(PA_REFCNT_VALUE(s) >= 1);
281
282 if (s->state == st)
283 return;
284
285 pa_stream_ref(s);
286
287 s->state = st;
288
289 if (s->state_callback)
290 s->state_callback(s, s->state_userdata);
291
292 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
293 stream_unlink(s);
294
295 pa_stream_unref(s);
296 }
297
298 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
299 pa_assert(s);
300 pa_assert(PA_REFCNT_VALUE(s) >= 1);
301
302 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
303 return;
304
305 if (s->state == PA_STREAM_READY &&
306 (force || !s->auto_timing_update_requested)) {
307 pa_operation *o;
308
309 /* pa_log("automatically requesting new timing data"); */
310
311 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
312 pa_operation_unref(o);
313 s->auto_timing_update_requested = TRUE;
314 }
315 }
316
317 if (s->auto_timing_update_event) {
318 struct timeval next;
319 pa_gettimeofday(&next);
320 pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
321 s->mainloop->time_restart(s->auto_timing_update_event, &next);
322 }
323 }
324
325 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
326 pa_context *c = userdata;
327 pa_stream *s;
328 uint32_t channel;
329
330 pa_assert(pd);
331 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
332 pa_assert(t);
333 pa_assert(c);
334 pa_assert(PA_REFCNT_VALUE(c) >= 1);
335
336 pa_context_ref(c);
337
338 if (pa_tagstruct_getu32(t, &channel) < 0 ||
339 !pa_tagstruct_eof(t)) {
340 pa_context_fail(c, PA_ERR_PROTOCOL);
341 goto finish;
342 }
343
344 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
345 goto finish;
346
347 if (s->state != PA_STREAM_READY)
348 goto finish;
349
350 pa_context_set_error(c, PA_ERR_KILLED);
351 pa_stream_set_state(s, PA_STREAM_FAILED);
352
353 finish:
354 pa_context_unref(c);
355 }
356
357 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
358 pa_usec_t x;
359
360 pa_assert(s);
361 pa_assert(!force_start || !force_stop);
362
363 if (!s->smoother)
364 return;
365
366 x = pa_rtclock_usec();
367
368 if (s->timing_info_valid) {
369 if (aposteriori)
370 x -= s->timing_info.transport_usec;
371 else
372 x += s->timing_info.transport_usec;
373
374 if (s->direction == PA_STREAM_PLAYBACK)
375 /* it takes a while until the pause/resume is actually
376 * audible */
377 x += s->timing_info.sink_usec;
378 else
379 /* Data froma while back will be dropped */
380 x -= s->timing_info.source_usec;
381 }
382
383 if (s->suspended || s->corked || force_stop)
384 pa_smoother_pause(s->smoother, x);
385 else if (force_start || s->buffer_attr.prebuf == 0)
386 pa_smoother_resume(s->smoother, x);
387
388 /* Please note that we have no idea if playback actually started
389 * if prebuf is non-zero! */
390 }
391
392 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
393 pa_context *c = userdata;
394 pa_stream *s;
395 uint32_t channel;
396 const char *dn;
397 pa_bool_t suspended;
398 uint32_t di;
399 pa_usec_t usec;
400 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
401
402 pa_assert(pd);
403 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
404 pa_assert(t);
405 pa_assert(c);
406 pa_assert(PA_REFCNT_VALUE(c) >= 1);
407
408 pa_context_ref(c);
409
410 if (c->version < 12) {
411 pa_context_fail(c, PA_ERR_PROTOCOL);
412 goto finish;
413 }
414
415 if (pa_tagstruct_getu32(t, &channel) < 0 ||
416 pa_tagstruct_getu32(t, &di) < 0 ||
417 pa_tagstruct_gets(t, &dn) < 0 ||
418 pa_tagstruct_get_boolean(t, &suspended) < 0) {
419 pa_context_fail(c, PA_ERR_PROTOCOL);
420 goto finish;
421 }
422
423 if (c->version >= 13) {
424
425 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
426 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
427 pa_tagstruct_getu32(t, &fragsize) < 0 ||
428 pa_tagstruct_get_usec(t, &usec) < 0) {
429 pa_context_fail(c, PA_ERR_PROTOCOL);
430 goto finish;
431 }
432 } else {
433 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
434 pa_tagstruct_getu32(t, &tlength) < 0 ||
435 pa_tagstruct_getu32(t, &prebuf) < 0 ||
436 pa_tagstruct_getu32(t, &minreq) < 0 ||
437 pa_tagstruct_get_usec(t, &usec) < 0) {
438 pa_context_fail(c, PA_ERR_PROTOCOL);
439 goto finish;
440 }
441 }
442 }
443
444 if (!pa_tagstruct_eof(t)) {
445 pa_context_fail(c, PA_ERR_PROTOCOL);
446 goto finish;
447 }
448
449 if (!dn || di == PA_INVALID_INDEX) {
450 pa_context_fail(c, PA_ERR_PROTOCOL);
451 goto finish;
452 }
453
454 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, channel)))
455 goto finish;
456
457 if (s->state != PA_STREAM_READY)
458 goto finish;
459
460 if (c->version >= 13) {
461 if (s->direction == PA_STREAM_RECORD)
462 s->timing_info.configured_source_usec = usec;
463 else
464 s->timing_info.configured_sink_usec = usec;
465
466 s->buffer_attr.maxlength = maxlength;
467 s->buffer_attr.fragsize = fragsize;
468 s->buffer_attr.tlength = tlength;
469 s->buffer_attr.prebuf = prebuf;
470 s->buffer_attr.minreq = minreq;
471 }
472
473 pa_xfree(s->device_name);
474 s->device_name = pa_xstrdup(dn);
475 s->device_index = di;
476
477 s->suspended = suspended;
478
479 check_smoother_status(s, TRUE, FALSE, FALSE);
480 request_auto_timing_update(s, TRUE);
481
482 if (s->moved_callback)
483 s->moved_callback(s, s->moved_userdata);
484
485 finish:
486 pa_context_unref(c);
487 }
488
489 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
490 pa_context *c = userdata;
491 pa_stream *s;
492 uint32_t channel;
493 pa_bool_t suspended;
494
495 pa_assert(pd);
496 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
497 pa_assert(t);
498 pa_assert(c);
499 pa_assert(PA_REFCNT_VALUE(c) >= 1);
500
501 pa_context_ref(c);
502
503 if (c->version < 12) {
504 pa_context_fail(c, PA_ERR_PROTOCOL);
505 goto finish;
506 }
507
508 if (pa_tagstruct_getu32(t, &channel) < 0 ||
509 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
510 !pa_tagstruct_eof(t)) {
511 pa_context_fail(c, PA_ERR_PROTOCOL);
512 goto finish;
513 }
514
515 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, channel)))
516 goto finish;
517
518 if (s->state != PA_STREAM_READY)
519 goto finish;
520
521 s->suspended = suspended;
522
523 check_smoother_status(s, TRUE, FALSE, FALSE);
524 request_auto_timing_update(s, TRUE);
525
526 if (s->suspended_callback)
527 s->suspended_callback(s, s->suspended_userdata);
528
529 finish:
530 pa_context_unref(c);
531 }
532
533 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
534 pa_context *c = userdata;
535 pa_stream *s;
536 uint32_t channel;
537
538 pa_assert(pd);
539 pa_assert(command == PA_COMMAND_STARTED);
540 pa_assert(t);
541 pa_assert(c);
542 pa_assert(PA_REFCNT_VALUE(c) >= 1);
543
544 pa_context_ref(c);
545
546 if (c->version < 13) {
547 pa_context_fail(c, PA_ERR_PROTOCOL);
548 goto finish;
549 }
550
551 if (pa_tagstruct_getu32(t, &channel) < 0 ||
552 !pa_tagstruct_eof(t)) {
553 pa_context_fail(c, PA_ERR_PROTOCOL);
554 goto finish;
555 }
556
557 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
558 goto finish;
559
560 if (s->state != PA_STREAM_READY)
561 goto finish;
562
563 check_smoother_status(s, TRUE, TRUE, FALSE);
564 request_auto_timing_update(s, TRUE);
565
566 if (s->started_callback)
567 s->started_callback(s, s->started_userdata);
568
569 finish:
570 pa_context_unref(c);
571 }
572
573 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
574 pa_context *c = userdata;
575 pa_stream *s;
576 uint32_t channel;
577 pa_proplist *pl = NULL;
578 const char *event;
579
580 pa_assert(pd);
581 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
582 pa_assert(t);
583 pa_assert(c);
584 pa_assert(PA_REFCNT_VALUE(c) >= 1);
585
586 pa_context_ref(c);
587
588 if (c->version < 15) {
589 pa_context_fail(c, PA_ERR_PROTOCOL);
590 goto finish;
591 }
592
593 pl = pa_proplist_new();
594
595 if (pa_tagstruct_getu32(t, &channel) < 0 ||
596 pa_tagstruct_gets(t, &event) < 0 ||
597 pa_tagstruct_get_proplist(t, pl) < 0 ||
598 !pa_tagstruct_eof(t) || !event) {
599 pa_context_fail(c, PA_ERR_PROTOCOL);
600 goto finish;
601 }
602
603 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, channel)))
604 goto finish;
605
606 if (s->state != PA_STREAM_READY)
607 goto finish;
608
609 if (s->event_callback)
610 s->event_callback(s, event, pl, s->event_userdata);
611
612 finish:
613 pa_context_unref(c);
614
615 if (pl)
616 pa_proplist_free(pl);
617 }
618
619 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
620 pa_stream *s;
621 pa_context *c = userdata;
622 uint32_t bytes, channel;
623
624 pa_assert(pd);
625 pa_assert(command == PA_COMMAND_REQUEST);
626 pa_assert(t);
627 pa_assert(c);
628 pa_assert(PA_REFCNT_VALUE(c) >= 1);
629
630 pa_context_ref(c);
631
632 if (pa_tagstruct_getu32(t, &channel) < 0 ||
633 pa_tagstruct_getu32(t, &bytes) < 0 ||
634 !pa_tagstruct_eof(t)) {
635 pa_context_fail(c, PA_ERR_PROTOCOL);
636 goto finish;
637 }
638
639 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
640 goto finish;
641
642 if (s->state != PA_STREAM_READY)
643 goto finish;
644
645 s->requested_bytes += bytes;
646
647 if (s->requested_bytes > 0 && s->write_callback)
648 s->write_callback(s, s->requested_bytes, s->write_userdata);
649
650 finish:
651 pa_context_unref(c);
652 }
653
654 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
655 pa_stream *s;
656 pa_context *c = userdata;
657 uint32_t channel;
658
659 pa_assert(pd);
660 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
661 pa_assert(t);
662 pa_assert(c);
663 pa_assert(PA_REFCNT_VALUE(c) >= 1);
664
665 pa_context_ref(c);
666
667 if (pa_tagstruct_getu32(t, &channel) < 0 ||
668 !pa_tagstruct_eof(t)) {
669 pa_context_fail(c, PA_ERR_PROTOCOL);
670 goto finish;
671 }
672
673 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
674 goto finish;
675
676 if (s->state != PA_STREAM_READY)
677 goto finish;
678
679 if (s->buffer_attr.prebuf > 0)
680 check_smoother_status(s, TRUE, FALSE, TRUE);
681
682 request_auto_timing_update(s, TRUE);
683
684 if (command == PA_COMMAND_OVERFLOW) {
685 if (s->overflow_callback)
686 s->overflow_callback(s, s->overflow_userdata);
687 } else if (command == PA_COMMAND_UNDERFLOW) {
688 if (s->underflow_callback)
689 s->underflow_callback(s, s->underflow_userdata);
690 }
691
692 finish:
693 pa_context_unref(c);
694 }
695
696 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
697 pa_assert(s);
698 pa_assert(PA_REFCNT_VALUE(s) >= 1);
699
700 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
701
702 if (s->state != PA_STREAM_READY)
703 return;
704
705 if (w) {
706 s->write_index_not_before = s->context->ctag;
707
708 if (s->timing_info_valid)
709 s->timing_info.write_index_corrupt = TRUE;
710
711 /* pa_log("write_index invalidated"); */
712 }
713
714 if (r) {
715 s->read_index_not_before = s->context->ctag;
716
717 if (s->timing_info_valid)
718 s->timing_info.read_index_corrupt = TRUE;
719
720 /* pa_log("read_index invalidated"); */
721 }
722
723 request_auto_timing_update(s, TRUE);
724 }
725
726 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *tv, void *userdata) {
727 pa_stream *s = userdata;
728
729 pa_assert(s);
730 pa_assert(PA_REFCNT_VALUE(s) >= 1);
731
732 pa_stream_ref(s);
733 request_auto_timing_update(s, FALSE);
734 pa_stream_unref(s);
735 }
736
737 static void create_stream_complete(pa_stream *s) {
738 pa_assert(s);
739 pa_assert(PA_REFCNT_VALUE(s) >= 1);
740 pa_assert(s->state == PA_STREAM_CREATING);
741
742 pa_stream_set_state(s, PA_STREAM_READY);
743
744 if (s->requested_bytes > 0 && s->write_callback)
745 s->write_callback(s, s->requested_bytes, s->write_userdata);
746
747 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
748 struct timeval tv;
749 pa_gettimeofday(&tv);
750 tv.tv_usec += (suseconds_t) LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
751 pa_assert(!s->auto_timing_update_event);
752 s->auto_timing_update_event = s->mainloop->time_new(s->mainloop, &tv, &auto_timing_update_callback, s);
753
754 request_auto_timing_update(s, TRUE);
755 }
756
757 check_smoother_status(s, TRUE, FALSE, FALSE);
758 }
759
760 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
761 pa_assert(s);
762 pa_assert(attr);
763 pa_assert(ss);
764
765 if (s->context->version >= 13)
766 return;
767
768 /* Version older than 0.9.10 didn't do server side buffer_attr
769 * selection, hence we have to fake it on the client side. */
770
771 /* We choose fairly conservative values here, to not confuse
772 * old clients with extremely large playback buffers */
773
774 if (attr->maxlength == (uint32_t) -1)
775 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
776
777 if (attr->tlength == (uint32_t) -1)
778 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
779
780 if (attr->minreq == (uint32_t) -1)
781 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
782
783 if (attr->prebuf == (uint32_t) -1)
784 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
785
786 if (attr->fragsize == (uint32_t) -1)
787 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
788 }
789
790 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
791 pa_stream *s = userdata;
792
793 pa_assert(pd);
794 pa_assert(s);
795 pa_assert(PA_REFCNT_VALUE(s) >= 1);
796 pa_assert(s->state == PA_STREAM_CREATING);
797
798 pa_stream_ref(s);
799
800 if (command != PA_COMMAND_REPLY) {
801 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
802 goto finish;
803
804 pa_stream_set_state(s, PA_STREAM_FAILED);
805 goto finish;
806 }
807
808 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
809 s->channel == PA_INVALID_INDEX ||
810 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
811 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0)) {
812 pa_context_fail(s->context, PA_ERR_PROTOCOL);
813 goto finish;
814 }
815
816 if (s->context->version >= 9) {
817 if (s->direction == PA_STREAM_PLAYBACK) {
818 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
819 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
820 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
821 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
822 pa_context_fail(s->context, PA_ERR_PROTOCOL);
823 goto finish;
824 }
825 } else if (s->direction == PA_STREAM_RECORD) {
826 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
827 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
828 pa_context_fail(s->context, PA_ERR_PROTOCOL);
829 goto finish;
830 }
831 }
832 }
833
834 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
835 pa_sample_spec ss;
836 pa_channel_map cm;
837 const char *dn = NULL;
838 pa_bool_t suspended;
839
840 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
841 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
842 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
843 pa_tagstruct_gets(t, &dn) < 0 ||
844 pa_tagstruct_get_boolean(t, &suspended) < 0) {
845 pa_context_fail(s->context, PA_ERR_PROTOCOL);
846 goto finish;
847 }
848
849 if (!dn || s->device_index == PA_INVALID_INDEX ||
850 ss.channels != cm.channels ||
851 !pa_channel_map_valid(&cm) ||
852 !pa_sample_spec_valid(&ss) ||
853 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
854 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
855 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
856 pa_context_fail(s->context, PA_ERR_PROTOCOL);
857 goto finish;
858 }
859
860 pa_xfree(s->device_name);
861 s->device_name = pa_xstrdup(dn);
862 s->suspended = suspended;
863
864 s->channel_map = cm;
865 s->sample_spec = ss;
866 }
867
868 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
869 pa_usec_t usec;
870
871 if (pa_tagstruct_get_usec(t, &usec) < 0) {
872 pa_context_fail(s->context, PA_ERR_PROTOCOL);
873 goto finish;
874 }
875
876 if (s->direction == PA_STREAM_RECORD)
877 s->timing_info.configured_source_usec = usec;
878 else
879 s->timing_info.configured_sink_usec = usec;
880 }
881
882 if (!pa_tagstruct_eof(t)) {
883 pa_context_fail(s->context, PA_ERR_PROTOCOL);
884 goto finish;
885 }
886
887 if (s->direction == PA_STREAM_RECORD) {
888 pa_assert(!s->record_memblockq);
889
890 s->record_memblockq = pa_memblockq_new(
891 0,
892 s->buffer_attr.maxlength,
893 0,
894 pa_frame_size(&s->sample_spec),
895 1,
896 0,
897 0,
898 NULL);
899 }
900
901 s->channel_valid = TRUE;
902 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
903
904 create_stream_complete(s);
905
906 finish:
907 pa_stream_unref(s);
908 }
909
910 static int create_stream(
911 pa_stream_direction_t direction,
912 pa_stream *s,
913 const char *dev,
914 const pa_buffer_attr *attr,
915 pa_stream_flags_t flags,
916 const pa_cvolume *volume,
917 pa_stream *sync_stream) {
918
919 pa_tagstruct *t;
920 uint32_t tag;
921 pa_bool_t volume_set = FALSE;
922
923 pa_assert(s);
924 pa_assert(PA_REFCNT_VALUE(s) >= 1);
925 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
926
927 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
928 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
929 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
930 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
931 PA_STREAM_INTERPOLATE_TIMING|
932 PA_STREAM_NOT_MONOTONIC|
933 PA_STREAM_AUTO_TIMING_UPDATE|
934 PA_STREAM_NO_REMAP_CHANNELS|
935 PA_STREAM_NO_REMIX_CHANNELS|
936 PA_STREAM_FIX_FORMAT|
937 PA_STREAM_FIX_RATE|
938 PA_STREAM_FIX_CHANNELS|
939 PA_STREAM_DONT_MOVE|
940 PA_STREAM_VARIABLE_RATE|
941 PA_STREAM_PEAK_DETECT|
942 PA_STREAM_START_MUTED|
943 PA_STREAM_ADJUST_LATENCY|
944 PA_STREAM_EARLY_REQUESTS|
945 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
946 PA_STREAM_START_UNMUTED|
947 PA_STREAM_FAIL_ON_SUSPEND)), PA_ERR_INVALID);
948
949 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
950 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
951 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
952 /* Althought some of the other flags are not supported on older
953 * version, we don't check for them here, because it doesn't hurt
954 * when they are passed but actually not supported. This makes
955 * client development easier */
956
957 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
958 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
959 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
960 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
961 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
962
963 pa_stream_ref(s);
964
965 s->direction = direction;
966 s->flags = flags;
967 s->corked = !!(flags & PA_STREAM_START_CORKED);
968
969 if (sync_stream)
970 s->syncid = sync_stream->syncid;
971
972 if (attr)
973 s->buffer_attr = *attr;
974 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
975
976 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
977 pa_usec_t x;
978
979 if (s->smoother)
980 pa_smoother_free(s->smoother);
981
982 s->smoother = pa_smoother_new(SMOOTHER_ADJUST_TIME, SMOOTHER_HISTORY_TIME, !(flags & PA_STREAM_NOT_MONOTONIC), SMOOTHER_MIN_HISTORY);
983
984 x = pa_rtclock_usec();
985 pa_smoother_set_time_offset(s->smoother, x);
986 pa_smoother_pause(s->smoother, x);
987 }
988
989 if (!dev)
990 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
991
992 t = pa_tagstruct_command(
993 s->context,
994 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
995 &tag);
996
997 if (s->context->version < 13)
998 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
999
1000 pa_tagstruct_put(
1001 t,
1002 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1003 PA_TAG_CHANNEL_MAP, &s->channel_map,
1004 PA_TAG_U32, PA_INVALID_INDEX,
1005 PA_TAG_STRING, dev,
1006 PA_TAG_U32, s->buffer_attr.maxlength,
1007 PA_TAG_BOOLEAN, s->corked,
1008 PA_TAG_INVALID);
1009
1010 if (s->direction == PA_STREAM_PLAYBACK) {
1011 pa_cvolume cv;
1012
1013 pa_tagstruct_put(
1014 t,
1015 PA_TAG_U32, s->buffer_attr.tlength,
1016 PA_TAG_U32, s->buffer_attr.prebuf,
1017 PA_TAG_U32, s->buffer_attr.minreq,
1018 PA_TAG_U32, s->syncid,
1019 PA_TAG_INVALID);
1020
1021 volume_set = !!volume;
1022
1023 if (!volume)
1024 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1025
1026 pa_tagstruct_put_cvolume(t, volume);
1027 } else
1028 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1029
1030 if (s->context->version >= 12) {
1031 pa_tagstruct_put(
1032 t,
1033 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1034 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1035 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1036 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1037 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1038 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1039 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1040 PA_TAG_INVALID);
1041 }
1042
1043 if (s->context->version >= 13) {
1044
1045 if (s->direction == PA_STREAM_PLAYBACK)
1046 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1047 else
1048 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1049
1050 pa_tagstruct_put(
1051 t,
1052 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1053 PA_TAG_PROPLIST, s->proplist,
1054 PA_TAG_INVALID);
1055
1056 if (s->direction == PA_STREAM_RECORD)
1057 pa_tagstruct_putu32(t, s->direct_on_input);
1058 }
1059
1060 if (s->context->version >= 14) {
1061
1062 if (s->direction == PA_STREAM_PLAYBACK)
1063 pa_tagstruct_put_boolean(t, volume_set);
1064
1065 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1066 }
1067
1068 if (s->context->version >= 15) {
1069
1070 if (s->direction == PA_STREAM_PLAYBACK)
1071 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1072
1073 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1074 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1075 }
1076
1077 pa_pstream_send_tagstruct(s->context->pstream, t);
1078 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1079
1080 pa_stream_set_state(s, PA_STREAM_CREATING);
1081
1082 pa_stream_unref(s);
1083 return 0;
1084 }
1085
1086 int pa_stream_connect_playback(
1087 pa_stream *s,
1088 const char *dev,
1089 const pa_buffer_attr *attr,
1090 pa_stream_flags_t flags,
1091 pa_cvolume *volume,
1092 pa_stream *sync_stream) {
1093
1094 pa_assert(s);
1095 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1096
1097 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1098 }
1099
1100 int pa_stream_connect_record(
1101 pa_stream *s,
1102 const char *dev,
1103 const pa_buffer_attr *attr,
1104 pa_stream_flags_t flags) {
1105
1106 pa_assert(s);
1107 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1108
1109 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1110 }
1111
1112 int pa_stream_write(
1113 pa_stream *s,
1114 const void *data,
1115 size_t length,
1116 void (*free_cb)(void *p),
1117 int64_t offset,
1118 pa_seek_mode_t seek) {
1119
1120 pa_memchunk chunk;
1121 pa_seek_mode_t t_seek;
1122 int64_t t_offset;
1123 size_t t_length;
1124 const void *t_data;
1125
1126 pa_assert(s);
1127 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1128 pa_assert(data);
1129
1130 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1131 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1132 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1133 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1134 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1135
1136 if (length <= 0)
1137 return 0;
1138
1139 t_seek = seek;
1140 t_offset = offset;
1141 t_length = length;
1142 t_data = data;
1143
1144 while (t_length > 0) {
1145
1146 chunk.index = 0;
1147
1148 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1149 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1150 chunk.length = t_length;
1151 } else {
1152 void *d;
1153
1154 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1155 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1156
1157 d = pa_memblock_acquire(chunk.memblock);
1158 memcpy(d, t_data, chunk.length);
1159 pa_memblock_release(chunk.memblock);
1160 }
1161
1162 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1163
1164 t_offset = 0;
1165 t_seek = PA_SEEK_RELATIVE;
1166
1167 t_data = (const uint8_t*) t_data + chunk.length;
1168 t_length -= chunk.length;
1169
1170 pa_memblock_unref(chunk.memblock);
1171 }
1172
1173 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1174 free_cb((void*) data);
1175
1176 if (length < s->requested_bytes)
1177 s->requested_bytes -= (uint32_t) length;
1178 else
1179 s->requested_bytes = 0;
1180
1181 /* FIXME!!! ^^^ will break when offset is != 0 and mode is not RELATIVE*/
1182
1183 if (s->direction == PA_STREAM_PLAYBACK) {
1184
1185 /* Update latency request correction */
1186 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1187
1188 if (seek == PA_SEEK_ABSOLUTE) {
1189 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1190 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1191 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1192 } else if (seek == PA_SEEK_RELATIVE) {
1193 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1194 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1195 } else
1196 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1197 }
1198
1199 /* Update the write index in the already available latency data */
1200 if (s->timing_info_valid) {
1201
1202 if (seek == PA_SEEK_ABSOLUTE) {
1203 s->timing_info.write_index_corrupt = FALSE;
1204 s->timing_info.write_index = offset + (int64_t) length;
1205 } else if (seek == PA_SEEK_RELATIVE) {
1206 if (!s->timing_info.write_index_corrupt)
1207 s->timing_info.write_index += offset + (int64_t) length;
1208 } else
1209 s->timing_info.write_index_corrupt = TRUE;
1210 }
1211
1212 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1213 request_auto_timing_update(s, TRUE);
1214 }
1215
1216 return 0;
1217 }
1218
1219 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1220 pa_assert(s);
1221 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1222 pa_assert(data);
1223 pa_assert(length);
1224
1225 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1226 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1227 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1228
1229 if (!s->peek_memchunk.memblock) {
1230
1231 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1232 *data = NULL;
1233 *length = 0;
1234 return 0;
1235 }
1236
1237 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1238 }
1239
1240 pa_assert(s->peek_data);
1241 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1242 *length = s->peek_memchunk.length;
1243 return 0;
1244 }
1245
1246 int pa_stream_drop(pa_stream *s) {
1247 pa_assert(s);
1248 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1249
1250 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1251 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1252 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1253 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1254
1255 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1256
1257 /* Fix the simulated local read index */
1258 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1259 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1260
1261 pa_assert(s->peek_data);
1262 pa_memblock_release(s->peek_memchunk.memblock);
1263 pa_memblock_unref(s->peek_memchunk.memblock);
1264 pa_memchunk_reset(&s->peek_memchunk);
1265
1266 return 0;
1267 }
1268
1269 size_t pa_stream_writable_size(pa_stream *s) {
1270 pa_assert(s);
1271 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1272
1273 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1274 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1275 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1276
1277 return s->requested_bytes;
1278 }
1279
1280 size_t pa_stream_readable_size(pa_stream *s) {
1281 pa_assert(s);
1282 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1283
1284 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1285 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1286 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1287
1288 return pa_memblockq_get_length(s->record_memblockq);
1289 }
1290
1291 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1292 pa_operation *o;
1293 pa_tagstruct *t;
1294 uint32_t tag;
1295
1296 pa_assert(s);
1297 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1298
1299 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1300 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1301 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1302
1303 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1304
1305 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1306 pa_tagstruct_putu32(t, s->channel);
1307 pa_pstream_send_tagstruct(s->context->pstream, t);
1308 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1309
1310 return o;
1311 }
1312
1313 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1314 pa_usec_t usec;
1315
1316 pa_assert(s);
1317 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1318 pa_assert(s->state == PA_STREAM_READY);
1319 pa_assert(s->direction != PA_STREAM_UPLOAD);
1320 pa_assert(s->timing_info_valid);
1321 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1322 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1323
1324 if (s->direction == PA_STREAM_PLAYBACK) {
1325 /* The last byte that was written into the output device
1326 * had this time value associated */
1327 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1328
1329 if (!s->corked && !s->suspended) {
1330
1331 if (!ignore_transport)
1332 /* Because the latency info took a little time to come
1333 * to us, we assume that the real output time is actually
1334 * a little ahead */
1335 usec += s->timing_info.transport_usec;
1336
1337 /* However, the output device usually maintains a buffer
1338 too, hence the real sample currently played is a little
1339 back */
1340 if (s->timing_info.sink_usec >= usec)
1341 usec = 0;
1342 else
1343 usec -= s->timing_info.sink_usec;
1344 }
1345
1346 } else {
1347 pa_assert(s->direction == PA_STREAM_RECORD);
1348
1349 /* The last byte written into the server side queue had
1350 * this time value associated */
1351 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1352
1353 if (!s->corked && !s->suspended) {
1354
1355 if (!ignore_transport)
1356 /* Add transport latency */
1357 usec += s->timing_info.transport_usec;
1358
1359 /* Add latency of data in device buffer */
1360 usec += s->timing_info.source_usec;
1361
1362 /* If this is a monitor source, we need to correct the
1363 * time by the playback device buffer */
1364 if (s->timing_info.sink_usec >= usec)
1365 usec = 0;
1366 else
1367 usec -= s->timing_info.sink_usec;
1368 }
1369 }
1370
1371 return usec;
1372 }
1373
1374 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1375 pa_operation *o = userdata;
1376 struct timeval local, remote, now;
1377 pa_timing_info *i;
1378 pa_bool_t playing = FALSE;
1379 uint64_t underrun_for = 0, playing_for = 0;
1380
1381 pa_assert(pd);
1382 pa_assert(o);
1383 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1384
1385 if (!o->context || !o->stream)
1386 goto finish;
1387
1388 i = &o->stream->timing_info;
1389
1390 o->stream->timing_info_valid = FALSE;
1391 i->write_index_corrupt = TRUE;
1392 i->read_index_corrupt = TRUE;
1393
1394 if (command != PA_COMMAND_REPLY) {
1395 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1396 goto finish;
1397
1398 } else {
1399
1400 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1401 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1402 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1403 pa_tagstruct_get_timeval(t, &local) < 0 ||
1404 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1405 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1406 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1407
1408 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1409 goto finish;
1410 }
1411
1412 if (o->context->version >= 13 &&
1413 o->stream->direction == PA_STREAM_PLAYBACK)
1414 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1415 pa_tagstruct_getu64(t, &playing_for) < 0) {
1416
1417 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1418 goto finish;
1419 }
1420
1421
1422 if (!pa_tagstruct_eof(t)) {
1423 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1424 goto finish;
1425 }
1426 o->stream->timing_info_valid = TRUE;
1427 i->write_index_corrupt = FALSE;
1428 i->read_index_corrupt = FALSE;
1429
1430 i->playing = (int) playing;
1431 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1432
1433 pa_gettimeofday(&now);
1434
1435 /* Calculcate timestamps */
1436 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1437 /* local and remote seem to have synchronized clocks */
1438
1439 if (o->stream->direction == PA_STREAM_PLAYBACK)
1440 i->transport_usec = pa_timeval_diff(&remote, &local);
1441 else
1442 i->transport_usec = pa_timeval_diff(&now, &remote);
1443
1444 i->synchronized_clocks = TRUE;
1445 i->timestamp = remote;
1446 } else {
1447 /* clocks are not synchronized, let's estimate latency then */
1448 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1449 i->synchronized_clocks = FALSE;
1450 i->timestamp = local;
1451 pa_timeval_add(&i->timestamp, i->transport_usec);
1452 }
1453
1454 /* Invalidate read and write indexes if necessary */
1455 if (tag < o->stream->read_index_not_before)
1456 i->read_index_corrupt = TRUE;
1457
1458 if (tag < o->stream->write_index_not_before)
1459 i->write_index_corrupt = TRUE;
1460
1461 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1462 /* Write index correction */
1463
1464 int n, j;
1465 uint32_t ctag = tag;
1466
1467 /* Go through the saved correction values and add up the
1468 * total correction.*/
1469 for (n = 0, j = o->stream->current_write_index_correction+1;
1470 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1471 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1472
1473 /* Step over invalid data or out-of-date data */
1474 if (!o->stream->write_index_corrections[j].valid ||
1475 o->stream->write_index_corrections[j].tag < ctag)
1476 continue;
1477
1478 /* Make sure that everything is in order */
1479 ctag = o->stream->write_index_corrections[j].tag+1;
1480
1481 /* Now fix the write index */
1482 if (o->stream->write_index_corrections[j].corrupt) {
1483 /* A corrupting seek was made */
1484 i->write_index_corrupt = TRUE;
1485 } else if (o->stream->write_index_corrections[j].absolute) {
1486 /* An absolute seek was made */
1487 i->write_index = o->stream->write_index_corrections[j].value;
1488 i->write_index_corrupt = FALSE;
1489 } else if (!i->write_index_corrupt) {
1490 /* A relative seek was made */
1491 i->write_index += o->stream->write_index_corrections[j].value;
1492 }
1493 }
1494
1495 /* Clear old correction entries */
1496 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1497 if (!o->stream->write_index_corrections[n].valid)
1498 continue;
1499
1500 if (o->stream->write_index_corrections[n].tag <= tag)
1501 o->stream->write_index_corrections[n].valid = FALSE;
1502 }
1503 }
1504
1505 if (o->stream->direction == PA_STREAM_RECORD) {
1506 /* Read index correction */
1507
1508 if (!i->read_index_corrupt)
1509 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1510 }
1511
1512 /* Update smoother */
1513 if (o->stream->smoother) {
1514 pa_usec_t u, x;
1515
1516 u = x = pa_rtclock_usec() - i->transport_usec;
1517
1518 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1519 pa_usec_t su;
1520
1521 /* If we weren't playing then it will take some time
1522 * until the audio will actually come out through the
1523 * speakers. Since we follow that timing here, we need
1524 * to try to fix this up */
1525
1526 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1527
1528 if (su < i->sink_usec)
1529 x += i->sink_usec - su;
1530 }
1531
1532 if (!i->playing)
1533 pa_smoother_pause(o->stream->smoother, x);
1534
1535 /* Update the smoother */
1536 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1537 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1538 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1539
1540 if (i->playing)
1541 pa_smoother_resume(o->stream->smoother, x);
1542 }
1543 }
1544
1545 o->stream->auto_timing_update_requested = FALSE;
1546
1547 if (o->stream->latency_update_callback)
1548 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1549
1550 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1551 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1552 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1553 }
1554
1555 finish:
1556
1557 pa_operation_done(o);
1558 pa_operation_unref(o);
1559 }
1560
1561 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1562 uint32_t tag;
1563 pa_operation *o;
1564 pa_tagstruct *t;
1565 struct timeval now;
1566 int cidx = 0;
1567
1568 pa_assert(s);
1569 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1570
1571 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1572 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1573 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1574
1575 if (s->direction == PA_STREAM_PLAYBACK) {
1576 /* Find a place to store the write_index correction data for this entry */
1577 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1578
1579 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1580 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1581 }
1582 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1583
1584 t = pa_tagstruct_command(
1585 s->context,
1586 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1587 &tag);
1588 pa_tagstruct_putu32(t, s->channel);
1589 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1590
1591 pa_pstream_send_tagstruct(s->context->pstream, t);
1592 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1593
1594 if (s->direction == PA_STREAM_PLAYBACK) {
1595 /* Fill in initial correction data */
1596
1597 s->current_write_index_correction = cidx;
1598
1599 s->write_index_corrections[cidx].valid = TRUE;
1600 s->write_index_corrections[cidx].absolute = FALSE;
1601 s->write_index_corrections[cidx].corrupt = FALSE;
1602 s->write_index_corrections[cidx].tag = tag;
1603 s->write_index_corrections[cidx].value = 0;
1604 }
1605
1606 return o;
1607 }
1608
1609 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1610 pa_stream *s = userdata;
1611
1612 pa_assert(pd);
1613 pa_assert(s);
1614 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1615
1616 pa_stream_ref(s);
1617
1618 if (command != PA_COMMAND_REPLY) {
1619 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1620 goto finish;
1621
1622 pa_stream_set_state(s, PA_STREAM_FAILED);
1623 goto finish;
1624 } else if (!pa_tagstruct_eof(t)) {
1625 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1626 goto finish;
1627 }
1628
1629 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1630
1631 finish:
1632 pa_stream_unref(s);
1633 }
1634
1635 int pa_stream_disconnect(pa_stream *s) {
1636 pa_tagstruct *t;
1637 uint32_t tag;
1638
1639 pa_assert(s);
1640 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1641
1642 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1643 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1644 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1645
1646 pa_stream_ref(s);
1647
1648 t = pa_tagstruct_command(
1649 s->context,
1650 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1651 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1652 &tag);
1653 pa_tagstruct_putu32(t, s->channel);
1654 pa_pstream_send_tagstruct(s->context->pstream, t);
1655 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1656
1657 pa_stream_unref(s);
1658 return 0;
1659 }
1660
1661 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1662 pa_assert(s);
1663 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1664
1665 if (pa_detect_fork())
1666 return;
1667
1668 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1669 return;
1670
1671 s->read_callback = cb;
1672 s->read_userdata = userdata;
1673 }
1674
1675 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1676 pa_assert(s);
1677 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1678
1679 if (pa_detect_fork())
1680 return;
1681
1682 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1683 return;
1684
1685 s->write_callback = cb;
1686 s->write_userdata = userdata;
1687 }
1688
1689 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1690 pa_assert(s);
1691 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1692
1693 if (pa_detect_fork())
1694 return;
1695
1696 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1697 return;
1698
1699 s->state_callback = cb;
1700 s->state_userdata = userdata;
1701 }
1702
1703 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1704 pa_assert(s);
1705 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1706
1707 if (pa_detect_fork())
1708 return;
1709
1710 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1711 return;
1712
1713 s->overflow_callback = cb;
1714 s->overflow_userdata = userdata;
1715 }
1716
1717 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1718 pa_assert(s);
1719 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1720
1721 if (pa_detect_fork())
1722 return;
1723
1724 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1725 return;
1726
1727 s->underflow_callback = cb;
1728 s->underflow_userdata = userdata;
1729 }
1730
1731 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1732 pa_assert(s);
1733 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1734
1735 if (pa_detect_fork())
1736 return;
1737
1738 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1739 return;
1740
1741 s->latency_update_callback = cb;
1742 s->latency_update_userdata = userdata;
1743 }
1744
1745 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1746 pa_assert(s);
1747 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1748
1749 if (pa_detect_fork())
1750 return;
1751
1752 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1753 return;
1754
1755 s->moved_callback = cb;
1756 s->moved_userdata = userdata;
1757 }
1758
1759 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1760 pa_assert(s);
1761 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1762
1763 if (pa_detect_fork())
1764 return;
1765
1766 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1767 return;
1768
1769 s->suspended_callback = cb;
1770 s->suspended_userdata = userdata;
1771 }
1772
1773 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1774 pa_assert(s);
1775 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1776
1777 if (pa_detect_fork())
1778 return;
1779
1780 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1781 return;
1782
1783 s->started_callback = cb;
1784 s->started_userdata = userdata;
1785 }
1786
1787 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1788 pa_assert(s);
1789 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1790
1791 if (pa_detect_fork())
1792 return;
1793
1794 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1795 return;
1796
1797 s->event_callback = cb;
1798 s->event_userdata = userdata;
1799 }
1800
1801 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1802 pa_operation *o = userdata;
1803 int success = 1;
1804
1805 pa_assert(pd);
1806 pa_assert(o);
1807 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1808
1809 if (!o->context)
1810 goto finish;
1811
1812 if (command != PA_COMMAND_REPLY) {
1813 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1814 goto finish;
1815
1816 success = 0;
1817 } else if (!pa_tagstruct_eof(t)) {
1818 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1819 goto finish;
1820 }
1821
1822 if (o->callback) {
1823 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1824 cb(o->stream, success, o->userdata);
1825 }
1826
1827 finish:
1828 pa_operation_done(o);
1829 pa_operation_unref(o);
1830 }
1831
1832 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
1833 pa_operation *o;
1834 pa_tagstruct *t;
1835 uint32_t tag;
1836
1837 pa_assert(s);
1838 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1839
1840 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1841 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1842 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1843
1844 s->corked = b;
1845
1846 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1847
1848 t = pa_tagstruct_command(
1849 s->context,
1850 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
1851 &tag);
1852 pa_tagstruct_putu32(t, s->channel);
1853 pa_tagstruct_put_boolean(t, !!b);
1854 pa_pstream_send_tagstruct(s->context->pstream, t);
1855 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1856
1857 check_smoother_status(s, FALSE, FALSE, FALSE);
1858
1859 /* This might cause the indexes to hang/start again, hence
1860 * let's request a timing update */
1861 request_auto_timing_update(s, TRUE);
1862
1863 return o;
1864 }
1865
1866 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
1867 pa_tagstruct *t;
1868 pa_operation *o;
1869 uint32_t tag;
1870
1871 pa_assert(s);
1872 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1873
1874 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1875 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1876
1877 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1878
1879 t = pa_tagstruct_command(s->context, command, &tag);
1880 pa_tagstruct_putu32(t, s->channel);
1881 pa_pstream_send_tagstruct(s->context->pstream, t);
1882 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1883
1884 return o;
1885 }
1886
1887 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1888 pa_operation *o;
1889
1890 pa_assert(s);
1891 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1892
1893 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1894 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1895 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1896
1897 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
1898 return NULL;
1899
1900 if (s->direction == PA_STREAM_PLAYBACK) {
1901
1902 if (s->write_index_corrections[s->current_write_index_correction].valid)
1903 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1904
1905 if (s->buffer_attr.prebuf > 0)
1906 check_smoother_status(s, FALSE, FALSE, TRUE);
1907
1908 /* This will change the write index, but leave the
1909 * read index untouched. */
1910 invalidate_indexes(s, FALSE, TRUE);
1911
1912 } else
1913 /* For record streams this has no influence on the write
1914 * index, but the read index might jump. */
1915 invalidate_indexes(s, TRUE, FALSE);
1916
1917 return o;
1918 }
1919
1920 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1921 pa_operation *o;
1922
1923 pa_assert(s);
1924 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1925
1926 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1927 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1928 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1929 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1930
1931 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
1932 return NULL;
1933
1934 /* This might cause the read index to hang again, hence
1935 * let's request a timing update */
1936 request_auto_timing_update(s, TRUE);
1937
1938 return o;
1939 }
1940
1941 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1942 pa_operation *o;
1943
1944 pa_assert(s);
1945 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1946
1947 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1948 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1949 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1950 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
1951
1952 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
1953 return NULL;
1954
1955 /* This might cause the read index to start moving again, hence
1956 * let's request a timing update */
1957 request_auto_timing_update(s, TRUE);
1958
1959 return o;
1960 }
1961
1962 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
1963 pa_operation *o;
1964
1965 pa_assert(s);
1966 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1967 pa_assert(name);
1968
1969 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1970 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1971 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1972
1973 if (s->context->version >= 13) {
1974 pa_proplist *p = pa_proplist_new();
1975
1976 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1977 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
1978 pa_proplist_free(p);
1979 } else {
1980 pa_tagstruct *t;
1981 uint32_t tag;
1982
1983 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1984 t = pa_tagstruct_command(
1985 s->context,
1986 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
1987 &tag);
1988 pa_tagstruct_putu32(t, s->channel);
1989 pa_tagstruct_puts(t, name);
1990 pa_pstream_send_tagstruct(s->context->pstream, t);
1991 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1992 }
1993
1994 return o;
1995 }
1996
1997 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
1998 pa_usec_t usec;
1999
2000 pa_assert(s);
2001 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2002
2003 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2004 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2005 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2006 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2007 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2008 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2009
2010 if (s->smoother)
2011 usec = pa_smoother_get(s->smoother, pa_rtclock_usec());
2012 else
2013 usec = calc_time(s, FALSE);
2014
2015 /* Make sure the time runs monotonically */
2016 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2017 if (usec < s->previous_time)
2018 usec = s->previous_time;
2019 else
2020 s->previous_time = usec;
2021 }
2022
2023 if (r_usec)
2024 *r_usec = usec;
2025
2026 return 0;
2027 }
2028
2029 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2030 pa_assert(s);
2031 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2032
2033 if (negative)
2034 *negative = 0;
2035
2036 if (a >= b)
2037 return a-b;
2038 else {
2039 if (negative && s->direction == PA_STREAM_RECORD) {
2040 *negative = 1;
2041 return b-a;
2042 } else
2043 return 0;
2044 }
2045 }
2046
2047 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2048 pa_usec_t t, c;
2049 int r;
2050 int64_t cindex;
2051
2052 pa_assert(s);
2053 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2054 pa_assert(r_usec);
2055
2056 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2057 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2058 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2059 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2060 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2061 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2062
2063 if ((r = pa_stream_get_time(s, &t)) < 0)
2064 return r;
2065
2066 if (s->direction == PA_STREAM_PLAYBACK)
2067 cindex = s->timing_info.write_index;
2068 else
2069 cindex = s->timing_info.read_index;
2070
2071 if (cindex < 0)
2072 cindex = 0;
2073
2074 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2075
2076 if (s->direction == PA_STREAM_PLAYBACK)
2077 *r_usec = time_counter_diff(s, c, t, negative);
2078 else
2079 *r_usec = time_counter_diff(s, t, c, negative);
2080
2081 return 0;
2082 }
2083
2084 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2085 pa_assert(s);
2086 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2087
2088 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2089 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2090 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2091 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2092
2093 return &s->timing_info;
2094 }
2095
2096 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2097 pa_assert(s);
2098 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2099
2100 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2101
2102 return &s->sample_spec;
2103 }
2104
2105 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2106 pa_assert(s);
2107 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2108
2109 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2110
2111 return &s->channel_map;
2112 }
2113
2114 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2115 pa_assert(s);
2116 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2117
2118 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2119 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2120 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2121
2122 return &s->buffer_attr;
2123 }
2124
2125 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2126 pa_operation *o = userdata;
2127 int success = 1;
2128
2129 pa_assert(pd);
2130 pa_assert(o);
2131 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2132
2133 if (!o->context)
2134 goto finish;
2135
2136 if (command != PA_COMMAND_REPLY) {
2137 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2138 goto finish;
2139
2140 success = 0;
2141 } else {
2142 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2143 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2144 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2145 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2146 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2147 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2148 goto finish;
2149 }
2150 } else if (o->stream->direction == PA_STREAM_RECORD) {
2151 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2152 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2153 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2154 goto finish;
2155 }
2156 }
2157
2158 if (o->stream->context->version >= 13) {
2159 pa_usec_t usec;
2160
2161 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2162 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2163 goto finish;
2164 }
2165
2166 if (o->stream->direction == PA_STREAM_RECORD)
2167 o->stream->timing_info.configured_source_usec = usec;
2168 else
2169 o->stream->timing_info.configured_sink_usec = usec;
2170 }
2171
2172 if (!pa_tagstruct_eof(t)) {
2173 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2174 goto finish;
2175 }
2176 }
2177
2178 if (o->callback) {
2179 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2180 cb(o->stream, success, o->userdata);
2181 }
2182
2183 finish:
2184 pa_operation_done(o);
2185 pa_operation_unref(o);
2186 }
2187
2188
2189 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2190 pa_operation *o;
2191 pa_tagstruct *t;
2192 uint32_t tag;
2193
2194 pa_assert(s);
2195 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2196 pa_assert(attr);
2197
2198 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2199 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2200 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2201 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2202
2203 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2204
2205 t = pa_tagstruct_command(
2206 s->context,
2207 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2208 &tag);
2209 pa_tagstruct_putu32(t, s->channel);
2210
2211 pa_tagstruct_putu32(t, attr->maxlength);
2212
2213 if (s->direction == PA_STREAM_PLAYBACK)
2214 pa_tagstruct_put(
2215 t,
2216 PA_TAG_U32, attr->tlength,
2217 PA_TAG_U32, attr->prebuf,
2218 PA_TAG_U32, attr->minreq,
2219 PA_TAG_INVALID);
2220 else
2221 pa_tagstruct_putu32(t, attr->fragsize);
2222
2223 if (s->context->version >= 13)
2224 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2225
2226 if (s->context->version >= 14)
2227 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2228
2229 pa_pstream_send_tagstruct(s->context->pstream, t);
2230 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2231
2232 /* This might cause changes in the read/write indexex, hence let's
2233 * request a timing update */
2234 request_auto_timing_update(s, TRUE);
2235
2236 return o;
2237 }
2238
2239 uint32_t pa_stream_get_device_index(pa_stream *s) {
2240 pa_assert(s);
2241 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2242
2243 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2244 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2245 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2246 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2247 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2248
2249 return s->device_index;
2250 }
2251
2252 const char *pa_stream_get_device_name(pa_stream *s) {
2253 pa_assert(s);
2254 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2255
2256 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2257 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2258 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2259 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2260 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2261
2262 return s->device_name;
2263 }
2264
2265 int pa_stream_is_suspended(pa_stream *s) {
2266 pa_assert(s);
2267 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2268
2269 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2270 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2271 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2272 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2273
2274 return s->suspended;
2275 }
2276
2277 int pa_stream_is_corked(pa_stream *s) {
2278 pa_assert(s);
2279 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2280
2281 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2282 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2283 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2284
2285 return s->corked;
2286 }
2287
2288 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2289 pa_operation *o = userdata;
2290 int success = 1;
2291
2292 pa_assert(pd);
2293 pa_assert(o);
2294 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2295
2296 if (!o->context)
2297 goto finish;
2298
2299 if (command != PA_COMMAND_REPLY) {
2300 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2301 goto finish;
2302
2303 success = 0;
2304 } else {
2305
2306 if (!pa_tagstruct_eof(t)) {
2307 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2308 goto finish;
2309 }
2310 }
2311
2312 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2313 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2314
2315 if (o->callback) {
2316 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2317 cb(o->stream, success, o->userdata);
2318 }
2319
2320 finish:
2321 pa_operation_done(o);
2322 pa_operation_unref(o);
2323 }
2324
2325
2326 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2327 pa_operation *o;
2328 pa_tagstruct *t;
2329 uint32_t tag;
2330
2331 pa_assert(s);
2332 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2333
2334 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2335 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2336 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2337 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2338 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2339 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2340
2341 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2342 o->private = PA_UINT_TO_PTR(rate);
2343
2344 t = pa_tagstruct_command(
2345 s->context,
2346 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2347 &tag);
2348 pa_tagstruct_putu32(t, s->channel);
2349 pa_tagstruct_putu32(t, rate);
2350
2351 pa_pstream_send_tagstruct(s->context->pstream, t);
2352 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2353
2354 return o;
2355 }
2356
2357 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2358 pa_operation *o;
2359 pa_tagstruct *t;
2360 uint32_t tag;
2361
2362 pa_assert(s);
2363 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2364
2365 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2366 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2367 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2368 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2369 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2370
2371 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2372
2373 t = pa_tagstruct_command(
2374 s->context,
2375 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2376 &tag);
2377 pa_tagstruct_putu32(t, s->channel);
2378 pa_tagstruct_putu32(t, (uint32_t) mode);
2379 pa_tagstruct_put_proplist(t, p);
2380
2381 pa_pstream_send_tagstruct(s->context->pstream, t);
2382 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2383
2384 /* Please note that we don't update s->proplist here, because we
2385 * don't export that field */
2386
2387 return o;
2388 }
2389
2390 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2391 pa_operation *o;
2392 pa_tagstruct *t;
2393 uint32_t tag;
2394 const char * const*k;
2395
2396 pa_assert(s);
2397 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2398
2399 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2400 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2401 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2402 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2403 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2404
2405 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2406
2407 t = pa_tagstruct_command(
2408 s->context,
2409 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2410 &tag);
2411 pa_tagstruct_putu32(t, s->channel);
2412
2413 for (k = keys; *k; k++)
2414 pa_tagstruct_puts(t, *k);
2415
2416 pa_tagstruct_puts(t, NULL);
2417
2418 pa_pstream_send_tagstruct(s->context->pstream, t);
2419 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2420
2421 /* Please note that we don't update s->proplist here, because we
2422 * don't export that field */
2423
2424 return o;
2425 }
2426
2427 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2428 pa_assert(s);
2429 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2430
2431 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2432 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2433 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2434 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2435
2436 s->direct_on_input = sink_input_idx;
2437
2438 return 0;
2439 }
2440
2441 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2442 pa_assert(s);
2443 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2444
2445 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2446 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2447 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2448
2449 return s->direct_on_input;
2450 }